From 8ebe19509855f97d94cd1a3fc2a816ffe5536fce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Frederik=20Sch=C3=B6ll?= Date: Thu, 31 Oct 2024 13:32:19 +0100 Subject: [PATCH] improve error handling on non-existent columns, add flush on shutdown (#25) --- db/dialect_clickhouse.go | 12 ++++++++---- sinker/sinker.go | 14 +++++++++----- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/db/dialect_clickhouse.go b/db/dialect_clickhouse.go index 10a57f3..9025cf6 100644 --- a/db/dialect_clickhouse.go +++ b/db/dialect_clickhouse.go @@ -171,11 +171,15 @@ func convertOpToClickhouseValues(o *Operation) ([]any, error) { sort.Strings(columns) values := make([]any, len(o.data)) for i, v := range columns { - convertedType, err := convertToType(o.data[v], o.table.columnsByName[v].scanType) - if err != nil { - return nil, fmt.Errorf("converting value %q to type %q in column %q: %w", o.data[v], o.table.columnsByName[v].scanType, v, err) + if col, exists := o.table.columnsByName[v]; exists { + convertedType, err := convertToType(o.data[v], col.scanType) + if err != nil { + return nil, fmt.Errorf("converting value %q to type %q in column %q: %w", o.data[v], o.table.columnsByName[v].scanType, v, err) + } + values[i] = convertedType + } else { + return nil, fmt.Errorf("received data for table column %q which does not exist on the database", v) } - values[i] = convertedType } return values, nil } diff --git a/sinker/sinker.go b/sinker/sinker.go index dd0ff2c..d910561 100644 --- a/sinker/sinker.go +++ b/sinker/sinker.go @@ -63,14 +63,18 @@ func (s *SQLSinker) Run(ctx context.Context) { } } - s.Sinker.OnTerminating(s.Shutdown) - s.OnTerminating(func(err error) { + s.Sinker.OnTerminating(func(err error) { s.stats.LogNow() - s.logger.Info("sql sinker terminating", zap.Stringer("last_block_written", s.stats.lastBlock)) - s.Sinker.Shutdown(err) + if err == nil { + s.logger.Info("sql sinker terminating, flushing remaining rows") + _, err = s.loader.Flush(ctx, s.OutputModuleHash(), cursor, 0) + } + s.logger.Info("sql sinker terminated", zap.Stringer("last_block_written", s.stats.lastBlock), zap.Error(err)) + s.Shutdown(err) }) + s.OnTerminating(s.Sinker.Shutdown) - s.OnTerminating(func(_ error) { s.stats.Close() }) + s.stats.OnTerminating(func(_ error) { s.stats.Close() }) s.stats.OnTerminated(func(err error) { s.Shutdown(err) }) logEach := 15 * time.Second