Skip to content

Commit

Permalink
refactor(go/adbc/driver): connection and statement bases
Browse files Browse the repository at this point in the history
  • Loading branch information
joellubi committed Mar 4, 2024
1 parent bf42dc6 commit 40be93d
Show file tree
Hide file tree
Showing 19 changed files with 1,131 additions and 617 deletions.
11 changes: 11 additions & 0 deletions go/adbc/adbc.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,17 @@ const (
InfoDriverADBCVersion InfoCode = 103 // DriverADBCVersion
)

type InfoValueTypeCode = arrow.UnionTypeCode

const (
InfoValueStringType InfoValueTypeCode = 0
InfoValueBooleanType InfoValueTypeCode = 1
InfoValueInt64Type InfoValueTypeCode = 2
InfoValueInt32BitmaskType InfoValueTypeCode = 3
InfoValueStringListType InfoValueTypeCode = 4
InfoValueInt32ToInt32ListMapType InfoValueTypeCode = 5
)

type ObjectDepth int

const (
Expand Down
183 changes: 183 additions & 0 deletions go/adbc/driver/driverbase/connection.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
package driverbase

import (
"context"
"fmt"

"github.com/apache/arrow-adbc/go/adbc"
"github.com/apache/arrow/go/v16/arrow"
"github.com/apache/arrow/go/v16/arrow/array"
"github.com/apache/arrow/go/v16/arrow/memory"
"golang.org/x/exp/slog"
)

const (
ConnectionMessageOptionUnknown = "Unknown connection option"
ConnectionMessageCannotCommit = "Cannot commit when autocommit is enabled"
ConnectionMessageCannotRollback = "Cannot rollback when autocommit is enabled"
)

// ConnectionImpl is an interface that drivers implement to provide
// vendor-specific functionality.
type ConnectionImpl interface {
adbc.Connection
adbc.GetSetOptions
adbc.DatabaseLogging
Base() *ConnectionImplBase
}

// ConnectionImplBase is a struct that provides default implementations of some of the
// methods defined in the ConnectionImpl interface. It is meant to be used as a composite
// struct for a driver's ConnectionImpl implementation.
//
// It is up to the driver implementor to understand the semantics of the default
// behavior provided. For example, in some cases the default implementation may provide
// a fallback value while in other cases it may provide a partial-result which must be
// merged with the driver-specific-result, if any.
type ConnectionImplBase struct {
Alloc memory.Allocator
ErrorHelper ErrorHelper
DriverInfo *DriverInfo
Logger *slog.Logger
}

// NewConnectionImplBase instantiates ConnectionImplBase.
//
// - database is a DatabaseImplBase containing the common resources from the parent
// database, allowing the Arrow allocator, error handler, and logger to be reused.
func NewConnectionImplBase(database *DatabaseImplBase) ConnectionImplBase {
return ConnectionImplBase{Alloc: database.Alloc, ErrorHelper: database.ErrorHelper, DriverInfo: database.DriverInfo, Logger: database.Logger}
}

func (base *ConnectionImplBase) Base() *ConnectionImplBase {
return base
}

func (base *ConnectionImplBase) SetLogger(logger *slog.Logger) {
if logger != nil {
base.Logger = logger
} else {
base.Logger = nilLogger()
}
}

func (base *ConnectionImplBase) Commit(ctx context.Context) error {
return base.ErrorHelper.Errorf(adbc.StatusInvalidState, ConnectionMessageCannotCommit)
}

func (base *ConnectionImplBase) Rollback(context.Context) error {
return base.ErrorHelper.Errorf(adbc.StatusInvalidState, ConnectionMessageCannotRollback)
}

func (base *ConnectionImplBase) GetInfo(ctx context.Context, infoCodes []adbc.InfoCode) (array.RecordReader, error) {

if len(infoCodes) == 0 {
infoCodes = base.DriverInfo.InfoSupportedCodes()
}

bldr := array.NewRecordBuilder(base.Alloc, adbc.GetInfoSchema)
defer bldr.Release()
bldr.Reserve(len(infoCodes))

infoNameBldr := bldr.Field(0).(*array.Uint32Builder)
infoValueBldr := bldr.Field(1).(*array.DenseUnionBuilder)
strInfoBldr := infoValueBldr.Child(int(adbc.InfoValueStringType)).(*array.StringBuilder)
intInfoBldr := infoValueBldr.Child(int(adbc.InfoValueInt64Type)).(*array.Int64Builder)

for _, code := range infoCodes {
switch code {
case adbc.InfoDriverName:
name, ok := base.DriverInfo.GetInfoDriverName()
if !ok {
continue
}

infoNameBldr.Append(uint32(code))
infoValueBldr.Append(adbc.InfoValueStringType)
strInfoBldr.Append(name)
case adbc.InfoDriverVersion:
version, ok := base.DriverInfo.GetInfoDriverVersion()
if !ok {
continue
}

infoNameBldr.Append(uint32(code))
infoValueBldr.Append(adbc.InfoValueStringType)
strInfoBldr.Append(version)
case adbc.InfoDriverArrowVersion:
arrowVersion, ok := base.DriverInfo.GetInfoDriverArrowVersion()
if !ok {
continue
}

infoNameBldr.Append(uint32(code))
infoValueBldr.Append(adbc.InfoValueStringType)
strInfoBldr.Append(arrowVersion)
case adbc.InfoDriverADBCVersion:
adbcVersion, ok := base.DriverInfo.GetInfoDriverADBCVersion()
if !ok {
continue
}

infoNameBldr.Append(uint32(code))
infoValueBldr.Append(adbc.InfoValueInt64Type)
intInfoBldr.Append(adbcVersion)
case adbc.InfoVendorName:
name, ok := base.DriverInfo.GetInfoVendorName()
if !ok {
continue
}

infoNameBldr.Append(uint32(code))
infoValueBldr.Append(adbc.InfoValueStringType)
strInfoBldr.Append(name)
default:
infoNameBldr.Append(uint32(code))
value, ok := base.DriverInfo.GetInfoForInfoCode(code)
if !ok {
infoValueBldr.AppendNull()
continue
}

// TODO: Handle other custom info types
infoValueBldr.Append(adbc.InfoValueStringType)
strInfoBldr.Append(fmt.Sprint(value))
}
}

final := bldr.NewRecord()
defer final.Release()
return array.NewRecordReader(adbc.GetInfoSchema, []arrow.Record{final})
}

func (base *ConnectionImplBase) GetOption(key string) (string, error) {
return "", base.ErrorHelper.Errorf(adbc.StatusNotFound, "%s '%s'", ConnectionMessageOptionUnknown, key)
}

func (base *ConnectionImplBase) GetOptionBytes(key string) ([]byte, error) {
return nil, base.ErrorHelper.Errorf(adbc.StatusNotFound, "%s '%s'", ConnectionMessageOptionUnknown, key)
}

func (base *ConnectionImplBase) GetOptionDouble(key string) (float64, error) {
return 0, base.ErrorHelper.Errorf(adbc.StatusNotFound, "%s '%s'", ConnectionMessageOptionUnknown, key)
}

func (base *ConnectionImplBase) GetOptionInt(key string) (int64, error) {
return 0, base.ErrorHelper.Errorf(adbc.StatusNotFound, "%s '%s'", ConnectionMessageOptionUnknown, key)
}

func (base *ConnectionImplBase) SetOption(key string, val string) error {
return base.ErrorHelper.Errorf(adbc.StatusNotImplemented, "%s '%s'", ConnectionMessageOptionUnknown, key)
}

func (base *ConnectionImplBase) SetOptionBytes(key string, val []byte) error {
return base.ErrorHelper.Errorf(adbc.StatusNotImplemented, "%s '%s'", ConnectionMessageOptionUnknown, key)
}

func (base *ConnectionImplBase) SetOptionDouble(key string, val float64) error {
return base.ErrorHelper.Errorf(adbc.StatusNotImplemented, "%s '%s'", ConnectionMessageOptionUnknown, key)
}

func (base *ConnectionImplBase) SetOptionInt(key string, val int64) error {
return base.ErrorHelper.Errorf(adbc.StatusNotImplemented, "%s '%s'", ConnectionMessageOptionUnknown, key)
}
120 changes: 36 additions & 84 deletions go/adbc/driver/driverbase/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,135 +18,87 @@
package driverbase

import (
"context"

"github.com/apache/arrow-adbc/go/adbc"
"github.com/apache/arrow/go/v16/arrow/memory"
"golang.org/x/exp/slog"
)

const (
DatabaseMessageOptionUnknown = "Unknown database option"
)

// DatabaseImpl is an interface that drivers implement to provide
// vendor-specific functionality.
type DatabaseImpl interface {
adbc.Database
adbc.GetSetOptions
adbc.DatabaseLogging
Base() *DatabaseImplBase
Open(context.Context) (adbc.Connection, error)
Close() error
SetOptions(map[string]string) error
}

// DatabaseImplBase is a struct that provides default implementations of the
// DatabaseImpl interface. It is meant to be used as a composite struct for a
// driver's DatabaseImpl implementation.
// DatabaseImplBase is a struct that provides default implementations of some of the
// methods defined in the DatabaseImpl interface. It is meant to be used as a composite
// struct for a driver's DatabaseImpl implementation.
//
// It is up to the driver implementor to understand the semantics of the default
// behavior provided. For example, in some cases the default implementation may provide
// a fallback value while in other cases it may provide a partial-result which must be
// merged with the driver-specific-result, if any.
type DatabaseImplBase struct {
Alloc memory.Allocator
ErrorHelper ErrorHelper
DriverInfo *DriverInfo
Logger *slog.Logger
}

// NewDatabaseImplBase instantiates DatabaseImplBase. name is the driver's
// name and is used to construct error messages. alloc is an Arrow allocator
// to use.
// NewDatabaseImplBase instantiates DatabaseImplBase.
//
// - driver is a DriverImplBase containing the common resources from the parent
// driver, allowing the Arrow allocator and error handler to be reused.
func NewDatabaseImplBase(driver *DriverImplBase) DatabaseImplBase {
return DatabaseImplBase{Alloc: driver.Alloc, ErrorHelper: driver.ErrorHelper, Logger: nilLogger()}
return DatabaseImplBase{Alloc: driver.Alloc, ErrorHelper: driver.ErrorHelper, DriverInfo: driver.DriverInfo, Logger: nilLogger()}
}

func (base *DatabaseImplBase) Base() *DatabaseImplBase {
return base
}

func (base *DatabaseImplBase) SetLogger(logger *slog.Logger) {
if logger != nil {
base.Logger = logger
} else {
base.Logger = nilLogger()
}
}

func (base *DatabaseImplBase) GetOption(key string) (string, error) {
return "", base.ErrorHelper.Errorf(adbc.StatusNotFound, "Unknown database option '%s'", key)
return "", base.ErrorHelper.Errorf(adbc.StatusNotFound, "%s '%s'", DatabaseMessageOptionUnknown, key)
}

func (base *DatabaseImplBase) GetOptionBytes(key string) ([]byte, error) {
return nil, base.ErrorHelper.Errorf(adbc.StatusNotFound, "Unknown database option '%s'", key)
return nil, base.ErrorHelper.Errorf(adbc.StatusNotFound, "%s '%s'", DatabaseMessageOptionUnknown, key)
}

func (base *DatabaseImplBase) GetOptionDouble(key string) (float64, error) {
return 0, base.ErrorHelper.Errorf(adbc.StatusNotFound, "Unknown database option '%s'", key)
return 0, base.ErrorHelper.Errorf(adbc.StatusNotFound, "%s '%s'", DatabaseMessageOptionUnknown, key)
}

func (base *DatabaseImplBase) GetOptionInt(key string) (int64, error) {
return 0, base.ErrorHelper.Errorf(adbc.StatusNotFound, "Unknown database option '%s'", key)
return 0, base.ErrorHelper.Errorf(adbc.StatusNotFound, "%s '%s'", DatabaseMessageOptionUnknown, key)
}

func (base *DatabaseImplBase) SetOption(key string, val string) error {
return base.ErrorHelper.Errorf(adbc.StatusNotImplemented, "Unknown database option '%s'", key)
return base.ErrorHelper.Errorf(adbc.StatusNotImplemented, "%s '%s'", DatabaseMessageOptionUnknown, key)
}

func (base *DatabaseImplBase) SetOptionBytes(key string, val []byte) error {
return base.ErrorHelper.Errorf(adbc.StatusNotImplemented, "Unknown database option '%s'", key)
return base.ErrorHelper.Errorf(adbc.StatusNotImplemented, "%s '%s'", DatabaseMessageOptionUnknown, key)
}

func (base *DatabaseImplBase) SetOptionDouble(key string, val float64) error {
return base.ErrorHelper.Errorf(adbc.StatusNotImplemented, "Unknown database option '%s'", key)
return base.ErrorHelper.Errorf(adbc.StatusNotImplemented, "%s '%s'", DatabaseMessageOptionUnknown, key)
}

func (base *DatabaseImplBase) SetOptionInt(key string, val int64) error {
return base.ErrorHelper.Errorf(adbc.StatusNotImplemented, "Unknown database option '%s'", key)
}

// database is the implementation of adbc.Database.
type database struct {
impl DatabaseImpl
}

// NewDatabase wraps a DatabaseImpl to create an adbc.Database.
func NewDatabase(impl DatabaseImpl) adbc.Database {
return &database{
impl: impl,
}
}

func (db *database) GetOption(key string) (string, error) {
return db.impl.GetOption(key)
}

func (db *database) GetOptionBytes(key string) ([]byte, error) {
return db.impl.GetOptionBytes(key)
}

func (db *database) GetOptionDouble(key string) (float64, error) {
return db.impl.GetOptionDouble(key)
}

func (db *database) GetOptionInt(key string) (int64, error) {
return db.impl.GetOptionInt(key)
}

func (db *database) SetOption(key string, val string) error {
return db.impl.SetOption(key, val)
}

func (db *database) SetOptionBytes(key string, val []byte) error {
return db.impl.SetOptionBytes(key, val)
}

func (db *database) SetOptionDouble(key string, val float64) error {
return db.impl.SetOptionDouble(key, val)
}

func (db *database) SetOptionInt(key string, val int64) error {
return db.impl.SetOptionInt(key, val)
}

func (db *database) Open(ctx context.Context) (adbc.Connection, error) {
return db.impl.Open(ctx)
}

func (db *database) Close() error {
return db.impl.Close()
}

func (db *database) SetLogger(logger *slog.Logger) {
if logger != nil {
db.impl.Base().Logger = logger
} else {
db.impl.Base().Logger = nilLogger()
}
}

func (db *database) SetOptions(opts map[string]string) error {
return db.impl.SetOptions(opts)
return base.ErrorHelper.Errorf(adbc.StatusNotImplemented, "%s '%s'", DatabaseMessageOptionUnknown, key)
}
Loading

0 comments on commit 40be93d

Please sign in to comment.