Skip to content

Commit

Permalink
Merge pull request #134 from kaleido-io/add-monitoring-routes
Browse files Browse the repository at this point in the history
add monitoring routes
  • Loading branch information
Chengxuan authored Feb 24, 2025
2 parents b66a96a + a99198d commit 3e2fb0e
Show file tree
Hide file tree
Showing 26 changed files with 272 additions and 135 deletions.
4 changes: 2 additions & 2 deletions cmd/client_eventstreams_delete.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -30,7 +30,7 @@ func clientEventStreamsDeleteCommand(clientFactory func() (apiclient.FFTMClient,
Use: "delete",
Short: "Delete event streams",
Long: "",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
client, err := clientFactory()
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions cmd/client_eventstreams_list.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -30,7 +30,7 @@ func clientEventStreamsListCommand(clientFactory func() (apiclient.FFTMClient, e
Use: "list",
Short: "List event streams",
Long: "",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
client, err := clientFactory()
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions cmd/client_listeners_delete.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -30,7 +30,7 @@ func clientListenersDeleteCommand(clientFactory func() (apiclient.FFTMClient, er
Use: "delete",
Short: "Delete event streams",
Long: "",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
client, err := clientFactory()
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions cmd/client_listeners_list.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -30,7 +30,7 @@ func clientListenersListCommand(clientFactory func() (apiclient.FFTMClient, erro
Use: "list",
Short: "List listeners",
Long: "",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
client, err := clientFactory()
if err != nil {
return err
Expand Down
4 changes: 2 additions & 2 deletions cmd/migrate.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -41,7 +41,7 @@ func buildLeveldb2postgresCommand(initConfig func() error) *cobra.Command {
leveldb2postgresEventStreamsCmd := &cobra.Command{
Use: "leveldb2postgres",
Short: "Migrate from LevelDB to PostgreSQL persistence",
RunE: func(cmd *cobra.Command, args []string) error {
RunE: func(_ *cobra.Command, _ []string) error {
if err := initConfig(); err != nil {
return err
}
Expand Down
44 changes: 42 additions & 2 deletions config.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,8 @@
|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|address|The IP address on which the metrics HTTP API should listen|`int`|`127.0.0.1`
|enabled|Enables the metrics API|`boolean`|`false`
|path|The path from which to serve the Prometheus metrics|`string`|`/metrics`
|enabled|Deprecated: Please use 'monitoring.enabled' instead|`boolean`|`false`
|path|Deprecated: Please use 'monitoring.metricsPath' instead|`string`|`/metrics`
|port|The port on which the metrics HTTP API should listen|`int`|`6000`
|publicURL|The fully qualified public URL for the metrics API. This is used for building URLs in HTTP responses and in OpenAPI Spec generation|URL `string`|`<nil>`
|readTimeout|The maximum time to wait when reading from an HTTP connection|[`time.Duration`](https://pkg.go.dev/time#Duration)|`15s`
Expand Down Expand Up @@ -210,6 +210,46 @@
|keyFile|The path to the private key file for TLS on this API|`string`|`<nil>`
|requiredDNAttributes|A set of required subject DN attributes. Each entry is a regular expression, and the subject certificate must have a matching attribute of the specified type (CN, C, O, OU, ST, L, STREET, POSTALCODE, SERIALNUMBER are valid attributes)|`map[string]string`|`<nil>`

## monitoring

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|address|Listener address|`int`|`127.0.0.1`
|enabled|Enables the monitoring APIs|`boolean`|`false`
|metricsPath|The path from which to serve the Prometheus metrics|`string`|`/metrics`
|port|Listener port|`int`|`6000`
|publicURL|Externally available URL for the HTTP endpoint|`string`|`<nil>`
|readTimeout|HTTP server read timeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`15s`
|shutdownTimeout|HTTP server shutdown timeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`10s`
|writeTimeout|HTTP server write timeout|[`time.Duration`](https://pkg.go.dev/time#Duration)|`15s`

## monitoring.auth

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|type|The auth plugin to use for server side authentication of requests|`string`|`<nil>`

## monitoring.auth.basic

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|passwordfile|The path to a .htpasswd file to use for authenticating requests. Passwords should be hashed with bcrypt.|`string`|`<nil>`

## monitoring.tls

|Key|Description|Type|Default Value|
|---|-----------|----|-------------|
|ca|The TLS certificate authority in PEM format (this option is ignored if caFile is also set)|`string`|`<nil>`
|caFile|The path to the CA file for TLS on this API|`string`|`<nil>`
|cert|The TLS certificate in PEM format (this option is ignored if certFile is also set)|`string`|`<nil>`
|certFile|The path to the certificate file for TLS on this API|`string`|`<nil>`
|clientAuth|Enables or disables client auth for TLS on this API|`string`|`<nil>`
|enabled|Enables or disables TLS on this API|`boolean`|`false`
|insecureSkipHostVerify|When to true in unit test development environments to disable TLS verification. Use with extreme caution|`boolean`|`<nil>`
|key|The TLS certificate key in PEM format (this option is ignored if keyFile is also set)|`string`|`<nil>`
|keyFile|The path to the private key file for TLS on this API|`string`|`<nil>`
|requiredDNAttributes|A set of required subject DN attributes. Each entry is a regular expression, and the subject certificate must have a matching attribute of the specified type (CN, C, O, OU, ST, L, STREET, POSTALCODE, SERIALNUMBER are valid attributes)|`map[string]string`|`<nil>`

## persistence

|Key|Description|Type|Default Value|
Expand Down
11 changes: 6 additions & 5 deletions internal/apiclient/eventstreams.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand All @@ -18,6 +18,7 @@ package apiclient

import (
"context"
"errors"
"fmt"
"regexp"

Expand All @@ -31,7 +32,7 @@ func (c *fftmClient) GetEventStreams(ctx context.Context) ([]apitypes.EventStrea
SetResult(&eventStreams).
Get("eventstreams")
if !resp.IsSuccess() {
return nil, fmt.Errorf(string(resp.Body()))
return nil, errors.New(string(resp.Body()))
}
return eventStreams, err
}
Expand All @@ -43,7 +44,7 @@ func (c *fftmClient) GetListeners(ctx context.Context, eventStreamID string) ([]
SetResult(&listeners).
Get(fmt.Sprintf("eventstreams/%s/listeners", eventStreamID))
if !resp.IsSuccess() {
return nil, fmt.Errorf(string(resp.Body()))
return nil, errors.New(string(resp.Body()))
}
return listeners, err
}
Expand All @@ -56,7 +57,7 @@ func (c *fftmClient) DeleteEventStream(ctx context.Context, eventStreamID string
return err
}
if !resp.IsSuccess() {
return fmt.Errorf(string(resp.Body()))
return errors.New(string(resp.Body()))
}
return nil
}
Expand Down Expand Up @@ -92,7 +93,7 @@ func (c *fftmClient) DeleteListener(ctx context.Context, eventStreamID, listener
return err
}
if !resp.IsSuccess() {
return fmt.Errorf(string(resp.Body()))
return errors.New(string(resp.Body()))
}
return nil
}
Expand Down
3 changes: 2 additions & 1 deletion internal/confirmations/confirmations.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -311,6 +311,7 @@ func (bcm *blockConfirmationManager) getBlockByHash(blockHash string) (*apitypes

func (bcm *blockConfirmationManager) getBlockByNumber(blockNumber uint64, allowCache bool, expectedParentHash string) (*apitypes.BlockInfo, error) {
res, reason, err := bcm.connector.BlockInfoByNumber(bcm.ctx, &ffcapi.BlockInfoByNumberRequest{
//nolint:gosec
BlockNumber: fftypes.NewFFBigInt(int64(blockNumber)),
AllowCache: allowCache,
ExpectedParentHash: expectedParentHash,
Expand Down
3 changes: 2 additions & 1 deletion internal/confirmations/confirmed_block_listener.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -357,6 +357,7 @@ func (cbl *confirmedBlockListener) dispatchAllConfirmed() {
BlockEvent: &ffcapi.BlockEvent{
ListenerID: cbl.id,
BlockInfo: ffcapi.BlockInfo{
//nolint:gosec
BlockNumber: fftypes.NewFFBigInt(int64(block.BlockNumber)),
BlockHash: block.BlockHash,
ParentHash: block.ParentHash,
Expand Down
13 changes: 10 additions & 3 deletions internal/events/eventstream.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -65,6 +65,7 @@ var esDefaults struct {
}

func InitDefaults() {
//nolint:gosec
esDefaults.batchSize = config.GetInt64(tmconfig.EventStreamsDefaultsBatchSize)
esDefaults.batchTimeout = fftypes.FFDuration(config.GetDuration(tmconfig.EventStreamsDefaultsBatchTimeout))
esDefaults.errorHandling = fftypes.FFEnum(config.GetString(tmconfig.EventStreamsDefaultsErrorHandling))
Expand Down Expand Up @@ -218,6 +219,7 @@ func mergeValidateEsConfig(ctx context.Context, base *apitypes.EventStream, upda

// Batch timeout
if updates.EthCompatBatchTimeoutMS != nil {
//nolint:gosec
dv := fftypes.FFDuration(*updates.EthCompatBatchTimeoutMS) * fftypes.FFDuration(time.Millisecond)
changed = apitypes.CheckUpdateDuration(changed, &merged.BatchTimeout, base.BatchTimeout, &dv, esDefaults.batchTimeout)
} else {
Expand All @@ -226,6 +228,7 @@ func mergeValidateEsConfig(ctx context.Context, base *apitypes.EventStream, upda

// Retry timeout
if updates.EthCompatRetryTimeoutSec != nil {
//nolint:gosec
dv := fftypes.FFDuration(*updates.EthCompatRetryTimeoutSec) * fftypes.FFDuration(time.Second)
changed = apitypes.CheckUpdateDuration(changed, &merged.RetryTimeout, base.RetryTimeout, &dv, esDefaults.retryTimeout)
} else {
Expand All @@ -234,6 +237,7 @@ func mergeValidateEsConfig(ctx context.Context, base *apitypes.EventStream, upda

// Blocked retry delay
if updates.EthCompatBlockedRetryDelaySec != nil {
//nolint:gosec
dv := fftypes.FFDuration(*updates.EthCompatBlockedRetryDelaySec) * fftypes.FFDuration(time.Second)
changed = apitypes.CheckUpdateDuration(changed, &merged.BlockedRetryDelay, base.BlockedRetryDelay, &dv, esDefaults.blockedRetryDelay)
} else {
Expand Down Expand Up @@ -509,7 +513,8 @@ func (es *eventStream) Start(ctx context.Context) error {
startTime: fftypes.Now(),
eventLoopDone: make(chan struct{}),
batchLoopDone: make(chan struct{}),
updates: make(chan *ffcapi.ListenerEvent, int(*es.spec.BatchSize)),
//nolint:gosec
updates: make(chan *ffcapi.ListenerEvent, int(*es.spec.BatchSize)),
}
startedState.ctx, startedState.cancelCtx = context.WithCancel(es.bgCtx)
es.currentState = startedState
Expand Down Expand Up @@ -781,6 +786,8 @@ func (es *eventStream) checkConfirmedEventForBatch(e *ffcapi.ListenerEvent) (l *
func (es *eventStream) batchLoop(startedState *startedStreamState) {
defer close(startedState.batchLoopDone)
ctx := startedState.ctx

//nolint:gosec
maxSize := int(*es.spec.BatchSize)
batchNumber := int64(0)

Expand Down Expand Up @@ -955,7 +962,7 @@ func (es *eventStream) writeCheckpoint(startedState *startedStreamState, batch *
}

// We only return if the context is cancelled, or the checkpoint succeeds
return es.retry.Do(startedState.ctx, "checkpoint", func(attempt int) (retry bool, err error) {
return es.retry.Do(startedState.ctx, "checkpoint", func(_ int) (retry bool, err error) {
return true, es.persistence.WriteCheckpoint(startedState.ctx, cp)
})
}
Expand Down
9 changes: 6 additions & 3 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -58,7 +58,7 @@ func NewMetricsManager(ctx context.Context) Metrics {
)
mm := &metricsManager{
ctx: ctx,
metricsEnabled: config.GetBool(tmconfig.MetricsEnabled),
metricsEnabled: config.GetBool(tmconfig.DeprecatedMetricsEnabled) || config.GetBool(tmconfig.MonitoringEnabled),
timeMap: make(map[string]time.Time),
metricsRegistry: metricsRegistry,
eventsMetricsManager: eventsMetricsManager,
Expand All @@ -74,7 +74,10 @@ func (mm *metricsManager) IsMetricsEnabled() bool {
}

func (mm *metricsManager) HTTPHandler() http.Handler {
httpHandler, _ := mm.metricsRegistry.HTTPHandler(mm.ctx, promhttp.HandlerOpts{})
httpHandler, err := mm.metricsRegistry.HTTPHandler(mm.ctx, promhttp.HandlerOpts{})
if err != nil {
panic(err)
}
return httpHandler
}

Expand Down
7 changes: 4 additions & 3 deletions internal/persistence/leveldb/leveldb_persistence.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -478,6 +478,7 @@ func (p *leveldbPersistence) InsertTransactionWithNextNonce(ctx context.Context,
// From this point on, we will guide this transaction through to submission.
// We return an "ack" at this point, and dispatch the work of getting the transaction submitted
// to the background worker.
//nolint:gosec // Safe conversion as nonce is always positive
tx.Nonce = fftypes.NewFFBigInt(int64(lockedNonce.nonce))

if err = p.writeTransaction(ctx, &apitypes.TXWithStatus{
Expand Down Expand Up @@ -586,7 +587,7 @@ func (p *leveldbPersistence) UpdateTransaction(ctx context.Context, txID string,
return p.writeTransaction(ctx, tx, false)
}

func (p *leveldbPersistence) writeTransaction(ctx context.Context, tx *apitypes.TXWithStatus, new bool) (err error) {
func (p *leveldbPersistence) writeTransaction(ctx context.Context, tx *apitypes.TXWithStatus, newTx bool) (err error) {
// We take a write-lock here, because we are writing multiple values (the indexes), and anybody
// attempting to read the critical nonce allocation index must know the difference between a partial write
// (we crashed before we completed all the writes) and an incomplete write that's in process.
Expand All @@ -608,7 +609,7 @@ func (p *leveldbPersistence) writeTransaction(ctx context.Context, tx *apitypes.
return i18n.NewError(ctx, tmmsgs.MsgPersistenceTXIncomplete)
}
idKey := txDataKey(tx.ID)
if new {
if newTx {
if tx.SequenceID != "" {
// for new transactions sequence ID should always be generated by persistence layer
// as the format of its value is persistence service specific
Expand Down
3 changes: 2 additions & 1 deletion internal/persistence/postgres/sqlpersistence.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -103,6 +103,7 @@ func (p *sqlPersistence) RichQuery() persistence.RichQuery {
}

func (p *sqlPersistence) seqAfterFilter(ctx context.Context, qf *ffapi.QueryFields, after *int64, limit int, dir txhandler.SortDirection, conditions ...ffapi.Filter) (filter ffapi.Filter) {
//nolint:gosec // Safe conversion as limit is always positive
fb := qf.NewFilterLimit(ctx, uint64(limit))
if after != nil {
if dir == txhandler.SortDirectionDescending {
Expand Down
6 changes: 4 additions & 2 deletions internal/persistence/postgres/transaction_writer.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2024 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -98,7 +98,8 @@ func newTransactionWriter(bgCtx context.Context, p *sqlPersistence, conf config.
batchMaxSize := conf.GetInt(ConfigTXWriterBatchSize)
cacheSlots := conf.GetInt(ConfigTXWriterCacheSlots)
tw = &transactionWriter{
p: p,
p: p,
//nolint:gosec // Safe conversion as workerCount is always positive
workerCount: uint32(workerCount),
batchTimeout: conf.GetDuration(ConfigTXWriterBatchTimeout),
batchMaxSize: batchMaxSize,
Expand Down Expand Up @@ -354,6 +355,7 @@ func (tw *transactionWriter) assignNonces(ctx context.Context, txInsertsByFrom m
}
}
log.L(ctx).Infof("Assigned nonce %s / %d to %s", signer, cacheEntry.nextNonce, op.txInsert.ID)
//nolint:gosec
op.txInsert.Nonce = fftypes.NewFFBigInt(int64(cacheEntry.nextNonce))
cacheEntry.nextNonce++
tw.nextNonceCache.Add(signer, cacheEntry)
Expand Down
3 changes: 2 additions & 1 deletion internal/persistence/postgres/transactions.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2025 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -141,6 +141,7 @@ func (p *sqlPersistence) ListTransactionsByCreateTime(ctx context.Context, after
}

func (p *sqlPersistence) ListTransactionsByNonce(ctx context.Context, signer string, after *fftypes.FFBigInt, limit int, dir txhandler.SortDirection) ([]*apitypes.ManagedTX, error) {
//nolint:gosec // Safe conversion as limit is always positive
fb := persistence.TransactionFilters.NewFilterLimit(ctx, uint64(limit))
conditions := []ffapi.Filter{
fb.Eq("from", signer),
Expand Down
Loading

0 comments on commit 3e2fb0e

Please sign in to comment.