Skip to content

Commit

Permalink
improve error handling on non-existent columns, add flush on shutdown (
Browse files Browse the repository at this point in the history
  • Loading branch information
fschoell authored Oct 31, 2024
1 parent f96966f commit 8ebe195
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 9 deletions.
12 changes: 8 additions & 4 deletions db/dialect_clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,15 @@ func convertOpToClickhouseValues(o *Operation) ([]any, error) {
sort.Strings(columns)
values := make([]any, len(o.data))
for i, v := range columns {
convertedType, err := convertToType(o.data[v], o.table.columnsByName[v].scanType)
if err != nil {
return nil, fmt.Errorf("converting value %q to type %q in column %q: %w", o.data[v], o.table.columnsByName[v].scanType, v, err)
if col, exists := o.table.columnsByName[v]; exists {
convertedType, err := convertToType(o.data[v], col.scanType)
if err != nil {
return nil, fmt.Errorf("converting value %q to type %q in column %q: %w", o.data[v], o.table.columnsByName[v].scanType, v, err)
}
values[i] = convertedType
} else {
return nil, fmt.Errorf("received data for table column %q which does not exist on the database", v)
}
values[i] = convertedType
}
return values, nil
}
Expand Down
14 changes: 9 additions & 5 deletions sinker/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,14 +63,18 @@ func (s *SQLSinker) Run(ctx context.Context) {
}
}

s.Sinker.OnTerminating(s.Shutdown)
s.OnTerminating(func(err error) {
s.Sinker.OnTerminating(func(err error) {
s.stats.LogNow()
s.logger.Info("sql sinker terminating", zap.Stringer("last_block_written", s.stats.lastBlock))
s.Sinker.Shutdown(err)
if err == nil {
s.logger.Info("sql sinker terminating, flushing remaining rows")
_, err = s.loader.Flush(ctx, s.OutputModuleHash(), cursor, 0)
}
s.logger.Info("sql sinker terminated", zap.Stringer("last_block_written", s.stats.lastBlock), zap.Error(err))
s.Shutdown(err)
})
s.OnTerminating(s.Sinker.Shutdown)

s.OnTerminating(func(_ error) { s.stats.Close() })
s.stats.OnTerminating(func(_ error) { s.stats.Close() })
s.stats.OnTerminated(func(err error) { s.Shutdown(err) })

logEach := 15 * time.Second
Expand Down

0 comments on commit 8ebe195

Please sign in to comment.