Skip to content

Commit

Permalink
Merge pull request #1652 from kaleido-io/certmisatch-gauages
Browse files Browse the repository at this point in the history
[metrics] [dataexchange] [networkmap] DXConnect Callbacks for Node Identity Check Metrics
  • Loading branch information
peterbroadhurst authored Mar 4, 2025
2 parents 7f9364d + a5cb23a commit e486b2e
Show file tree
Hide file tree
Showing 49 changed files with 1,465 additions and 148 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ __debug*
!deploy/charts/firefly
containerlogs
.vscode/*.log
.idea
doc-site/site
.idea/
doc-site/site
*.iml
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ GOGC=30

all: build test go-mod-tidy
test: deps lint
$(VGO) test ./internal/... ./pkg/... ./cmd/... ./doc-site ./ffconfig/... -cover -coverprofile=coverage.txt -covermode=atomic -timeout=30s ${TEST_ARGS}
$(VGO) test ./internal/... ./pkg/... ./cmd/... ./doc-site ./ffconfig/... -cover -coverprofile=coverage.txt -covermode=atomic -timeout=45s ${TEST_ARGS}
coverage.html:
$(VGO) tool cover -html=coverage.txt
coverage: test coverage.html
Expand Down
208 changes: 202 additions & 6 deletions go.work.sum

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,4 @@ func TestGetSubscriptionEventsFilteredNoSequenceIDsProvided(t *testing.T) {

r.ServeHTTP(res, req)
assert.Equal(t, 200, res.Result().StatusCode)
}
}
3 changes: 3 additions & 0 deletions internal/coremsgs/en_error_messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,4 +318,7 @@ var (
MsgDuplicateContractListenerFilterLocation = ffe("FF10477", "Duplicate filter provided for contract listener for location", 400)
MsgInvalidNamespaceForOperationUpdate = ffe("FF10478", "Received different namespace for operation update '%s' than expected for manager '%s'")
MsgEmptyPluginForOperationUpdate = ffe("FF10479", "Received empty plugin for operation update '%s'")
MsgInvalidIdentityPatch = ffe("FF10480", "A profile must be provided when updating an identity", 400)
MsgNodeNotProvidedForCheck = ffe("FF10481", "Node not provided for check", 500)
MsgNodeMissingProfile = ffe("FF10482", "Node provided for check does not have a profile", 500)
)
5 changes: 2 additions & 3 deletions internal/database/sqlcommon/event_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func TestGetEventsInSequenceRangeE2EWithDB(t *testing.T) {
Type: core.EventTypeMessageConfirmed,
Reference: fftypes.NewUUID(),
Correlator: fftypes.NewUUID(),
Topic: fmt.Sprintf("topic%d", i % 2),
Topic: fmt.Sprintf("topic%d", i%2),
Created: fftypes.Now(),
}
err := s.InsertEvent(ctx, event)
Expand Down Expand Up @@ -322,10 +322,9 @@ func TestGetEventsInSequenceRangeBuildQueryFail(t *testing.T) {

func TestGetEventsInSequenceRangeShouldCallGetEventsWhenNoSequencedProvidedAndThrowAnError(t *testing.T) {
s, mock := newMockProvider().init()
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"id", }).AddRow("only one"))
mock.ExpectQuery("SELECT .*").WillReturnRows(sqlmock.NewRows([]string{"id"}).AddRow("only one"))
f := database.EventQueryFactory.NewFilter(context.Background()).And()
_, _, err := s.GetEventsInSequenceRange(context.Background(), "ns1", f, -1, -1)
assert.NotNil(t, err)
assert.NoError(t, mock.ExpectationsWereMet())
}

105 changes: 104 additions & 1 deletion internal/dataexchange/ffdx/ffdx.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,17 @@ package ffdx

import (
"context"
"crypto/x509"
"encoding/json"
"encoding/pem"
"errors"
"fmt"
"io"
"strings"
"sync"
"time"

"github.com/hyperledger/firefly/internal/metrics"

"github.com/go-resty/resty/v2"
"github.com/hyperledger/firefly-common/pkg/config"
Expand Down Expand Up @@ -54,6 +60,8 @@ type FFDX struct {
retry *retry.Retry
backgroundStart bool
backgroundRetry *retry.Retry

metrics metrics.Manager // optional
}

type dxNode struct {
Expand Down Expand Up @@ -168,7 +176,7 @@ func (h *FFDX) Name() string {
return "ffdx"
}

func (h *FFDX) Init(ctx context.Context, cancelCtx context.CancelFunc, config config.Section) (err error) {
func (h *FFDX) Init(ctx context.Context, cancelCtx context.CancelFunc, config config.Section, metrics metrics.Manager) (err error) {
h.ctx = log.WithLogField(ctx, "dx", "https")
h.cancelCtx = cancelCtx
h.ackChannel = make(chan *ack)
Expand All @@ -179,6 +187,7 @@ func (h *FFDX) Init(ctx context.Context, cancelCtx context.CancelFunc, config co
}
h.needsInit = config.GetBool(DataExchangeInitEnabled)
h.nodes = make(map[string]*dxNode)
h.metrics = metrics

if config.GetString(ffresty.HTTPConfigURL) == "" {
return i18n.NewError(ctx, coremsgs.MsgMissingPluginConfig, "url", "dataexchange.ffdx")
Expand Down Expand Up @@ -295,6 +304,11 @@ func (h *FFDX) beforeConnect(ctx context.Context, w wsclient.WSClient) error {
return fmt.Errorf("DX returned non-ready status: %s", status.Status)
}
}

for _, cb := range h.callbacks.handlers {
cb.DXConnect(h)
}

h.initialized = true
return nil
}
Expand Down Expand Up @@ -448,6 +462,95 @@ func (h *FFDX) TransferBlob(ctx context.Context, nsOpID string, peer, sender fft
return nil
}

func (h *FFDX) CheckNodeIdentityStatus(ctx context.Context, node *core.Identity) error {
if err := h.checkInitialized(ctx); err != nil {
return err
}

if node == nil {
return i18n.NewError(ctx, coremsgs.MsgNodeNotProvidedForCheck)
}

var mismatchState = metrics.NodeIdentityDXCertMismatchStatusUnknown
defer func() {
if h.metrics != nil && h.metrics.IsMetricsEnabled() {
h.metrics.NodeIdentityDXCertMismatch(node.Namespace, mismatchState)
}
log.L(ctx).Debugf("Identity status checked against DX node='%s' mismatch_state='%s'", node.Name, mismatchState)
}()

dxPeer, err := h.GetEndpointInfo(ctx, node.Name) // should be the same as the local node
if err != nil {
return err
}

dxPeerCert := dxPeer.GetString("cert")
// if this occurs, it is either a misconfigured / broken DX or likely a DX that is compatible from an API perspective
// but does not have the same peer info as the HTTPS mTLS DX
if dxPeerCert == "" {
log.L(ctx).Debugf("DX peer does not have a 'cert', DX plugin may be unsupported")
return nil
}

expiry, err := extractSoonestExpiryFromCertBundle(strings.ReplaceAll(dxPeerCert, `\n`, "\n"))
if err == nil {
if expiry.Before(time.Now()) {
log.L(ctx).Warnf("DX certificate for node '%s' has expired", node.Name)
}

if h.metrics != nil && h.metrics.IsMetricsEnabled() {
h.metrics.NodeIdentityDXCertExpiry(node.Namespace, expiry)
}
} else {
log.L(ctx).Errorf("Failed to find x509 cert within DX cert bundle node='%s' namespace='%s'", node.Name, node.Namespace)
}

if node.Profile == nil {
return i18n.NewError(ctx, coremsgs.MsgNodeNotProvidedForCheck)
}

nodeCert := node.Profile.GetString("cert")
if nodeCert != "" {
mismatchState = metrics.NodeIdentityDXCertMismatchStatusHealthy
if dxPeerCert != nodeCert {
log.L(ctx).Warnf("DX certificate for node '%s' is out-of-sync with on-chain identity", node.Name)
mismatchState = metrics.NodeIdentityDXCertMismatchStatusMismatched
}
}

return nil
}

// We assume the cert with the soonest expiry is the leaf cert, but even if its the CA,
// that's what will invalidate the leaf anyways, so really we only care about the soonest expiry.
// So we loop through the bundle finding the soonest expiry, not necessarily the leaf.
func extractSoonestExpiryFromCertBundle(certBundle string) (time.Time, error) {
var expiringCert *x509.Certificate
var block *pem.Block
var rest = []byte(certBundle)

for {
block, rest = pem.Decode(rest)
if block == nil {
break
}

cert, err := x509.ParseCertificate(block.Bytes)
if err != nil {
return time.Time{}, fmt.Errorf("failed to parse non-certificate within bundle: %v", err)
}
if expiringCert == nil || cert.NotAfter.Before(expiringCert.NotAfter) {
expiringCert = cert
}
}

if expiringCert == nil {
return time.Time{}, errors.New("no valid certificate found")
}

return expiringCert.NotAfter.UTC(), nil
}

func (h *FFDX) ackLoop() {
for {
select {
Expand Down
Loading

0 comments on commit e486b2e

Please sign in to comment.