Skip to content

Commit

Permalink
Merge pull request #138 from hyperledger/db-conflict
Browse files Browse the repository at this point in the history
DB-side UPSERT optimiziation support
  • Loading branch information
peterbroadhurst authored May 24, 2024
2 parents dab0027 + 38236e1 commit d4e905e
Show file tree
Hide file tree
Showing 21 changed files with 314 additions and 44 deletions.
2 changes: 1 addition & 1 deletion mocks/authmocks/plugin.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mocks/crudmocks/crud.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mocks/dbmigratemocks/driver.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mocks/httpservermocks/go_http_server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mocks/wsservermocks/protocol.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mocks/wsservermocks/web_socket_server.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/config/cobracmd.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -27,7 +27,7 @@ func ShowConfigCommand(initConf func() error) *cobra.Command {
Use: "showconfig",
Aliases: []string{"showconf"},
Short: "List out the configuration options",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
if err := initConf(); err != nil {
return err
}
Expand Down
82 changes: 68 additions & 14 deletions pkg/dbsql/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package dbsql
import (
"context"
"database/sql"
"database/sql/driver"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -50,6 +51,7 @@ const (
UpsertOptimizationSkip UpsertOptimization = iota
UpsertOptimizationNew
UpsertOptimizationExisting
UpsertOptimizationDB // only supported if the DB layer support ON CONFLICT semantics
)

type GetOption int
Expand Down Expand Up @@ -276,18 +278,23 @@ func (c *CrudBase[T]) idFilter(id string) sq.Eq {
return filter
}

func (c *CrudBase[T]) isImmutable(col string) bool {
for _, immutable := range append(c.ImmutableColumns, c.GetIDField(), ColumnCreated, ColumnUpdated, c.DB.sequenceColumn) {
if col == immutable {
return true
}
}
return false
}

func (c *CrudBase[T]) buildUpdateList(_ context.Context, update sq.UpdateBuilder, inst T, includeNil bool) sq.UpdateBuilder {
colLoop:
for _, col := range c.Columns {
for _, immutable := range append(c.ImmutableColumns, c.GetIDField(), ColumnCreated, ColumnUpdated, c.DB.sequenceColumn) {
if col == immutable {
continue colLoop
if !c.isImmutable(col) {
value := c.getFieldValue(inst, col)
if includeNil || !isNil(value) {
update = update.Set(col, value)
}
}
value := c.getFieldValue(inst, col)
if includeNil || !isNil(value) {
update = update.Set(col, value)
}
}
if !c.TimesDisabled {
update = update.Set(ColumnUpdated, fftypes.Now())
Expand Down Expand Up @@ -332,9 +339,8 @@ func (c *CrudBase[T]) getFieldValue(inst T, col string) interface{} {
return val
}

func (c *CrudBase[T]) setInsertTimestamps(inst T) {
func (c *CrudBase[T]) setInsertTimestamps(inst T, now *fftypes.FFTime) {
if !c.TimesDisabled {
now := fftypes.Now()
inst.SetCreated(now)
inst.SetUpdated(now)
}
Expand All @@ -353,7 +359,7 @@ func (c *CrudBase[T]) attemptInsert(ctx context.Context, tx *TXWrapper, inst T,
}
}

c.setInsertTimestamps(inst)
c.setInsertTimestamps(inst, fftypes.Now())
insert := sq.Insert(c.Table).Columns(c.Columns...)
values := make([]interface{}, len(c.Columns))
for i, col := range c.Columns {
Expand Down Expand Up @@ -383,11 +389,18 @@ func (c *CrudBase[T]) Upsert(ctx context.Context, inst T, optimization UpsertOpt
// The expectation is that the optimization will hit almost all of the time,
// as only recovery paths require us to go down the un-optimized route.
optimized := false
if optimization == UpsertOptimizationNew {
switch {
case optimization == UpsertOptimizationDB && c.DB.features.DBOptimizedUpsertBuilder != nil:
optimized = true // the DB does the work here, so any failure is a straight failure
created, err = c.dbOptimizedUpsert(ctx, tx, inst)
if err != nil {
return false, err
}
case optimization == UpsertOptimizationNew:
opErr := c.attemptInsert(ctx, tx, inst, true /* we want a failure here we can progress past */)
optimized = opErr == nil
created = optimized
} else if optimization == UpsertOptimizationExisting {
default: // UpsertOptimizationExisting, or fallback if DB optimization requested
rowsAffected, opErr := c.updateFromInstance(ctx, tx, inst, true /* full replace */)
optimized = opErr == nil && rowsAffected == 1
}
Expand Down Expand Up @@ -426,6 +439,47 @@ func (c *CrudBase[T]) Upsert(ctx context.Context, inst T, optimization UpsertOpt
return created, c.DB.CommitTx(ctx, tx, autoCommit)
}

func (c *CrudBase[T]) dbOptimizedUpsert(ctx context.Context, tx *TXWrapper, inst T) (created bool, err error) {

// Caller responsible for checking this is available before driving this path
optimizedInsertBuilder := c.DB.provider.Features().DBOptimizedUpsertBuilder

if c.IDValidator != nil {
if err := c.IDValidator(ctx, inst.GetID()); err != nil {
return false, err
}
}
now := fftypes.Now()
c.setInsertTimestamps(inst, now)

values := make(map[string]driver.Value)
updateCols := make([]string, 0, len(c.Columns))
for _, col := range c.Columns {
values[col] = c.getFieldValue(inst, col)
if !c.isImmutable(col) {
updateCols = append(updateCols, col)
}
}
var rows *sql.Rows
query, err := optimizedInsertBuilder(ctx, c.Table, c.GetIDField(), c.Columns, updateCols, ColumnCreated, values)
if err == nil {
rows, _, err = c.DB.RunAsQueryTx(ctx, c.Table, tx, query.PlaceholderFormat(c.DB.features.PlaceholderFormat))
}
if err != nil {
return false, err
}
defer rows.Close()
if rows.Next() {
var createTime fftypes.FFTime
if err = rows.Scan(&createTime); err != nil {
return false, i18n.NewError(ctx, i18n.MsgDBReadInsertTSFailed, err)
}
created = !createTime.Time().Before(*now.Time())
}
return created, nil

}

func (c *CrudBase[T]) InsertMany(ctx context.Context, instances []T, allowPartialSuccess bool, hooks ...PostCompletionHook) (err error) {

ctx, tx, autoCommit, err := c.DB.BeginOrUseTx(ctx)
Expand All @@ -436,7 +490,7 @@ func (c *CrudBase[T]) InsertMany(ctx context.Context, instances []T, allowPartia
if c.DB.Features().MultiRowInsert {
insert := sq.Insert(c.Table).Columns(c.Columns...)
for _, inst := range instances {
c.setInsertTimestamps(inst)
c.setInsertTimestamps(inst, fftypes.Now())
values := make([]interface{}, len(c.Columns))
for i, col := range c.Columns {
values[i] = c.getFieldValue(inst, col)
Expand Down
91 changes: 91 additions & 0 deletions pkg/dbsql/crud_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"os"
"testing"
"time"

"github.com/DATA-DOG/go-sqlmock"
"github.com/Masterminds/squirrel"
Expand Down Expand Up @@ -776,6 +777,96 @@ func TestUpsertFailUpdate(t *testing.T) {
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestUpsertPSQLOptimizedCreated(t *testing.T) {
after := (fftypes.FFTime)(fftypes.Now().Time().Add(1 * time.Hour))
mp := NewMockProvider()
mp.FakePSQLUpsertOptimization = true
db, mock := mp.UTInit()
tc := newCRUDCollection(&db.Database, "ns1")
mock.ExpectBegin()
mock.ExpectQuery("INSERT INTO crudables.*ON CONFLICT .* DO UPDATE SET.*RETURNING created").WillReturnRows(
sqlmock.NewRows([]string{"created"}).AddRow(after.String()),
)
mock.ExpectCommit()
created, err := tc.Upsert(context.Background(), &TestCRUDable{
ResourceBase: ResourceBase{
ID: fftypes.NewUUID(),
},
}, UpsertOptimizationDB)
assert.NoError(t, err)
assert.True(t, created)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestUpsertPSQLOptimizedUpdated(t *testing.T) {
before := fftypes.Now()
mp := NewMockProvider()
mp.FakePSQLUpsertOptimization = true
db, mock := mp.UTInit()
tc := newCRUDCollection(&db.Database, "ns1")
mock.ExpectBegin()
mock.ExpectQuery("INSERT INTO crudables.*ON CONFLICT .* DO UPDATE SET.*RETURNING created").WillReturnRows(
sqlmock.NewRows([]string{"created"}).AddRow(before.String()),
)
mock.ExpectCommit()
created, err := tc.Upsert(context.Background(), &TestCRUDable{
ResourceBase: ResourceBase{
ID: fftypes.NewUUID(),
},
}, UpsertOptimizationDB)
assert.NoError(t, err)
assert.False(t, created)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestUpsertPSQLOptimizedBadID(t *testing.T) {
mp := NewMockProvider()
mp.FakePSQLUpsertOptimization = true
db, mock := mp.UTInit()
tc := newCRUDCollection(&db.Database, "ns1")
mock.ExpectBegin()
mock.ExpectRollback()
_, err := tc.Upsert(context.Background(), &TestCRUDable{}, UpsertOptimizationDB)
assert.Regexp(t, "FF00138", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestUpsertPSQLOptimizedQueryFail(t *testing.T) {
mp := NewMockProvider()
mp.FakePSQLUpsertOptimization = true
db, mock := mp.UTInit()
tc := newCRUDCollection(&db.Database, "ns1")
mock.ExpectBegin()
mock.ExpectQuery("INSERT INTO crudables.*ON CONFLICT .* DO UPDATE SET.*RETURNING created").WillReturnError(fmt.Errorf("pop"))
mock.ExpectRollback()
_, err := tc.Upsert(context.Background(), &TestCRUDable{
ResourceBase: ResourceBase{
ID: fftypes.NewUUID(),
},
}, UpsertOptimizationDB)
assert.Regexp(t, "FF00176.*pop", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestUpsertPSQLOptimizedBadTimeReturn(t *testing.T) {
mp := NewMockProvider()
mp.FakePSQLUpsertOptimization = true
db, mock := mp.UTInit()
tc := newCRUDCollection(&db.Database, "ns1")
mock.ExpectBegin()
mock.ExpectQuery("INSERT INTO crudables.*ON CONFLICT .* DO UPDATE SET.*RETURNING created").WillReturnRows(
sqlmock.NewRows([]string{"created"}).AddRow("!!!this is not a time!!!"),
)
mock.ExpectRollback()
_, err := tc.Upsert(context.Background(), &TestCRUDable{
ResourceBase: ResourceBase{
ID: fftypes.NewUUID(),
},
}, UpsertOptimizationDB)
assert.Regexp(t, "FF00248.*FF00136", err)
assert.NoError(t, mock.ExpectationsWereMet())
}

func TestInsertManyBeginFail(t *testing.T) {
db, mock := NewMockProvider().UTInit()
tc := newCRUDCollection(&db.Database, "ns1")
Expand Down
11 changes: 8 additions & 3 deletions pkg/dbsql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
_ "github.com/golang-migrate/migrate/v4/source/file"
)

type QueryModifier = func(sq.SelectBuilder) (sq.SelectBuilder, error)

type Database struct {
db *sql.DB
provider Provider
Expand All @@ -42,8 +44,6 @@ type Database struct {
sequenceColumn string
}

type QueryModifier = func(sq.SelectBuilder) (sq.SelectBuilder, error)

// PreCommitAccumulator is a structure that can accumulate state during
// the transaction, then has a function that is called just before commit.
type PreCommitAccumulator interface {
Expand Down Expand Up @@ -202,12 +202,17 @@ func (s *Database) QueryTx(ctx context.Context, table string, tx *TXWrapper, q s
// in the read operations (read after insert for example).
tx = GetTXFromContext(ctx)
}
return s.RunAsQueryTx(ctx, table, tx, q.PlaceholderFormat(s.features.PlaceholderFormat))
}

func (s *Database) RunAsQueryTx(ctx context.Context, table string, tx *TXWrapper, q sq.Sqlizer) (*sql.Rows, *TXWrapper, error) {

l := log.L(ctx)
sqlQuery, args, err := q.PlaceholderFormat(s.features.PlaceholderFormat).ToSql()
sqlQuery, args, err := q.ToSql()
if err != nil {
return nil, tx, i18n.WrapError(ctx, err, i18n.MsgDBQueryBuildFailed)
}

before := time.Now()
l.Tracef(`SQL-> query: %s (args: %+v)`, sqlQuery, args)
var rows *sql.Rows
Expand Down
16 changes: 10 additions & 6 deletions pkg/dbsql/mock_provider.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -41,11 +41,12 @@ type MockProvider struct {
}

type MockProviderConfig struct {
FakePSQLInsert bool
OpenError error
GetMigrationDriverError error
IndividualSort bool
MultiRowInsert bool
FakePSQLInsert bool
OpenError error
GetMigrationDriverError error
IndividualSort bool
MultiRowInsert bool
FakePSQLUpsertOptimization bool
}

func NewMockProvider() *MockProvider {
Expand Down Expand Up @@ -87,6 +88,9 @@ func (mp *MockProvider) Features() SQLFeatures {
return fmt.Sprintf(`<acquire lock %s>`, lockName)
}
features.MultiRowInsert = mp.MultiRowInsert
if mp.FakePSQLUpsertOptimization {
features.DBOptimizedUpsertBuilder = BuildPostgreSQLOptimizedUpsert
}
return features
}

Expand Down
Loading

0 comments on commit d4e905e

Please sign in to comment.