Skip to content

Commit

Permalink
lint and license
Browse files Browse the repository at this point in the history
  • Loading branch information
timwu20 committed Feb 24, 2025
1 parent 76fb97e commit ba5deda
Show file tree
Hide file tree
Showing 17 changed files with 108 additions and 25 deletions.
19 changes: 14 additions & 5 deletions internal/client/network-gossip/bridge.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package gossip

import (
Expand All @@ -19,8 +22,8 @@ var logger = log.NewFromGlobal(log.AddContext("pkg", "client/network-gossip"))
// In the scenario where messages have been received from the network but a subscribed message sink is not ready to
// receiver, we delay 10 ms and will remove the channel from the sinks if the message is not consumed by the end of
// the delay. To model this process a gossip engine can be in two forwarding states: idle, and busy.

// GossipEngine utilizes and implementation of [Network] and provides gossiping capabilities on
//
// GossipEngine utilises and implementation of [Network] and provides gossiping capabilities on
// top of it.
type GossipEngine[H runtime.Hash, N runtime.Number, Hasher runtime.Hasher[H]] struct {
stateMachine consensusGossip[H, Hasher]
Expand Down Expand Up @@ -65,7 +68,12 @@ func NewGossipEngine[H runtime.Hash, N runtime.Number, Hasher runtime.Hasher[H]]
validator Validator[H],
) GossipEngine[H, N, Hasher] {
ge := newGossipEngine[H, N, Hasher](network, sync, notificationService, protocol, validator)
go ge.poll()
go func() {
err := ge.poll()
if err != nil {
panic(err)
}
}()
return ge
}

Expand Down Expand Up @@ -163,7 +171,7 @@ func (ge *GossipEngine[H, N, Hasher]) Announce(block H, associatedData []byte) {
ge.sync.AnnounceBlock(block, associatedData)
}

func (ge *GossipEngine[H, N, Hasher]) poll() error {
func (ge *GossipEngine[H, N, Hasher]) poll() error { //nolint: gocyclo
var nextNotificationEvent <-chan service.NotificationEvent
// outer:
for {
Expand Down Expand Up @@ -197,7 +205,8 @@ func (ge *GossipEngine[H, N, Hasher]) poll() error {
case service.NotificationEventNotificationStreamClosed:
ge.stateMachine.PeerDisconnected(ge.notificationService, event.Peer)
case service.NotificationEventNotificationReceived:
toForward := ge.stateMachine.OnIncoming(ge.network, ge.notificationService, event.Peer, [][]byte{event.Notification})
toForward := ge.stateMachine.OnIncoming(
ge.network, ge.notificationService, event.Peer, [][]byte{event.Notification})
ge.forwardingState = forwardingStateBusy[H](toForward)
default:
panic("unreachable")
Expand Down
14 changes: 11 additions & 3 deletions internal/client/network-gossip/bridge_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package gossip

import (
Expand Down Expand Up @@ -144,7 +147,9 @@ func (ao TestValidator) NewPeer(context ValidatorContext[hash.H256], who peerid.
}
func (ao TestValidator) PeerDisconnected(context ValidatorContext[hash.H256], who peerid.PeerID) {
}
func (ao TestValidator) Validate(context ValidatorContext[hash.H256], sender peerid.PeerID, data []byte) ValidationResult {
func (ao TestValidator) Validate(
context ValidatorContext[hash.H256], sender peerid.PeerID, data []byte,
) ValidationResult {
return ValidationResultProcessAndKeep[hash.H256]{
Hash: hash.H256(data[0:32]),
}
Expand All @@ -154,7 +159,9 @@ func (ao TestValidator) MessageExpired() func(topic hash.H256, message []byte) b
return false
}
}
func (ao TestValidator) MessageAllowed() func(who peerid.PeerID, intent MessageIntent, topic hash.H256, data []byte) bool {
func (ao TestValidator) MessageAllowed() func(
who peerid.PeerID, intent MessageIntent, topic hash.H256, data []byte,
) bool {
return func(who peerid.PeerID, intent MessageIntent, topic hash.H256, data []byte) bool {
return true
}
Expand Down Expand Up @@ -296,7 +303,8 @@ func TestGossipEngine(t *testing.T) {
if !ok {
gossipEngine.messageSinks[topicChan.Topic] = make([]chan TopicNotification, 0)
}
gossipEngine.messageSinks[topicChan.Topic] = append(gossipEngine.messageSinks[topicChan.Topic], topicChan.Chan)
gossipEngine.messageSinks[topicChan.Topic] = append(
gossipEngine.messageSinks[topicChan.Topic], topicChan.Chan)
}

// Register the remote peer.
Expand Down
3 changes: 3 additions & 0 deletions internal/client/network-gossip/network_gossip.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package gossip

import (
Expand Down
40 changes: 31 additions & 9 deletions internal/client/network-gossip/state_machine.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package gossip

import (
Expand Down Expand Up @@ -137,7 +140,10 @@ func (h hasher[K]) Hash(key K) uint32 {
}

// Create a new instance using the given validator.
func newConsensusGossip[H runtime.Hash, Hasher runtime.Hasher[H]](validator Validator[H], protocol network.ProtocolName) consensusGossip[H, Hasher] {
func newConsensusGossip[H runtime.Hash, Hasher runtime.Hasher[H]](
validator Validator[H],
protocol network.ProtocolName,
) consensusGossip[H, Hasher] {
h := hasher[H]{maphash.NewHasher[H]()}
knownMessages, err := freelru.New[H, any](knownMessageCacheSize, h.Hash)
if err != nil {
Expand All @@ -154,15 +160,21 @@ func newConsensusGossip[H runtime.Hash, Hasher runtime.Hasher[H]](validator Vali
}

// Handle new connected peer.
func (cg *consensusGossip[H, Hasher]) NewPeer(notificationService service.NotificationService, who peerid.PeerID, role role.ObservedRole) {
func (cg *consensusGossip[H, Hasher]) NewPeer(
notificationService service.NotificationService,
who peerid.PeerID,
role role.ObservedRole,
) {
cg.peers[who] = peerConsensus[H]{knownMessages: make(map[H]any)}

validator := cg.validator
context := newtorkContext[H, Hasher]{gossip: cg, notificationService: notificationService}
validator.NewPeer(context, who, role)
}

func (cg *consensusGossip[H, Hasher]) registerMessageHashed(messageHash H, topic H, message []byte, sender *peerid.PeerID) {
func (cg *consensusGossip[H, Hasher]) registerMessageHashed(
messageHash H, topic H, message []byte, sender *peerid.PeerID,
) {
cg.knownMessages.Add(messageHash, nil)
cg.messages = append(cg.messages, messageEntry[H]{
messageHash: messageHash,
Expand All @@ -182,7 +194,9 @@ func (cg *consensusGossip[H, Hasher]) RegisterMessage(topic H, message []byte) {
}

// Call when a peer has been disconnected to stop tracking gossip status.
func (cg *consensusGossip[H, Hasher]) PeerDisconnected(notificationService service.NotificationService, who peerid.PeerID) {
func (cg *consensusGossip[H, Hasher]) PeerDisconnected(
notificationService service.NotificationService, who peerid.PeerID,
) {
validator := cg.validator
context := newtorkContext[H, Hasher]{gossip: cg, notificationService: notificationService}
validator.PeerDisconnected(context, who)
Expand All @@ -205,7 +219,9 @@ func (cg *consensusGossip[H, Hasher]) rebroadcast(notificationService service.No
}

// Broadcast all messages with given topic.
func (cg *consensusGossip[H, Hasher]) BroadcastTopic(notificationService service.NotificationService, topic H, force bool) {
func (cg *consensusGossip[H, Hasher]) BroadcastTopic(
notificationService service.NotificationService, topic H, force bool,
) {
var messages []messageEntry[H]
for _, entry := range cg.messages {
if entry.topic == topic {
Expand Down Expand Up @@ -240,7 +256,7 @@ func (cg *consensusGossip[H, Hasher]) CollectGarbage() {
// TODO: expired messages metric

for id, peer := range cg.peers {
for h, _ := range peer.knownMessages {
for h := range peer.knownMessages {
if _, ok := knownMessages.Get(h); !ok {
delete(peer.knownMessages, h)
}
Expand Down Expand Up @@ -335,7 +351,9 @@ func (cg *consensusGossip[H, Hasher]) OnIncoming(
}

// Send all messages with given topic to a peer.
func (cg *consensusGossip[H, Hasher]) SendTopic(notificationService service.NotificationService, who peerid.PeerID, topic H, force bool) {
func (cg *consensusGossip[H, Hasher]) SendTopic(
notificationService service.NotificationService, who peerid.PeerID, topic H, force bool,
) {
messageAllowed := cg.validator.MessageAllowed()

if peer, ok := cg.peers[who]; ok {
Expand Down Expand Up @@ -363,7 +381,9 @@ func (cg *consensusGossip[H, Hasher]) SendTopic(notificationService service.Noti
}

// Multicast a message to all peers.
func (cg *consensusGossip[H, Hasher]) Multicast(notificationService service.NotificationService, topic H, message []byte, force bool) {
func (cg *consensusGossip[H, Hasher]) Multicast(
notificationService service.NotificationService, topic H, message []byte, force bool,
) {
messageHash := (*new(Hasher)).Hash(message)
cg.registerMessageHashed(messageHash, topic, message, nil)
var intent MessageIntent = MessageIntentBroadcast
Expand All @@ -382,7 +402,9 @@ func (cg *consensusGossip[H, Hasher]) Multicast(notificationService service.Noti
}

// Send addressed message to a peer. The message is not kept or multicast later on.
func (cg *consensusGossip[H, Hasher]) SendMessage(notificationService service.NotificationService, who peerid.PeerID, message []byte) {
func (cg *consensusGossip[H, Hasher]) SendMessage(
notificationService service.NotificationService, who peerid.PeerID, message []byte,
) {
peer, ok := cg.peers[who]
if !ok {
return
Expand Down
7 changes: 6 additions & 1 deletion internal/client/network-gossip/state_machine_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package gossip

import (
Expand Down Expand Up @@ -191,7 +194,9 @@ func (NoOpNotificationService) MessageSink(peer peerid.PeerID) service.MessageSi

var _ service.NotificationService = NoOpNotificationService{}

func pushMessage(consensus *consensusGossip[hash.H256, runtime.BlakeTwo256], topic hash.H256, h hash.H256, message []byte) {
func pushMessage(
consensus *consensusGossip[hash.H256, runtime.BlakeTwo256], topic hash.H256, h hash.H256, message []byte,
) {
consensus.knownMessages.Add(h, nil)
consensus.messages = append(consensus.messages, messageEntry[hash.H256]{
messageHash: h,
Expand Down
3 changes: 3 additions & 0 deletions internal/client/network-gossip/validator.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package gossip

import (
Expand Down
3 changes: 3 additions & 0 deletions internal/client/network/config/config.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package config

import (
Expand Down
5 changes: 4 additions & 1 deletion internal/client/network/event/event.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package event

import (
Expand Down Expand Up @@ -26,7 +29,7 @@ type DHTEventValuePut kad.Key
// DHTEventValuePutFailed means an error has occurred while putting a record into the DHT.
type DHTEventValuePutFailed kad.Key

// DHTEventStartProvidingFailed means an error occured while registering as a content provider on the DHT.
// DHTEventStartProvidingFailed means an error occurred while registering as a content provider on the DHT.
type DHTEventStartProvidingFailed kad.Key

// DHTEventPutRecordRequest means the DHT received a put record request.
Expand Down
3 changes: 3 additions & 0 deletions internal/client/network/network.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package network

// The protocol name transmitted on the wire.
Expand Down
5 changes: 4 additions & 1 deletion internal/client/network/role/role.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package role

// Role that the peer sent to us during the handshake, with the addition of what our local node knows about that peer.
Expand Down Expand Up @@ -25,7 +28,7 @@ const (
RoleAuthority
)

// Roles are a bitmask of the roles that a node fulfills.
// Roles are a bitmask of the roles that a node fulfils.
type Roles uint8

const (
Expand Down
11 changes: 7 additions & 4 deletions internal/client/network/service/service.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package service

import (
Expand All @@ -15,7 +18,7 @@ type NetworkSyncForkRequest[BlockHash, BlockNumber any] interface {
//
// If the given slice of peers is empty then the underlying implementation
// should make a best effort to fetch the block from any peers it is
// connected to (NOTE: this assumption will change in the future #3629).
// connected to.
SetSyncForkRequest(peers []peerid.PeerID, hash BlockHash, number BlockNumber)
}

Expand Down Expand Up @@ -201,7 +204,7 @@ type NotificationEventNotificationReceived struct {

func (NotificationEventNotificationReceived) isNotificationEvent() {}

// NotificationService is the notification service. It defines behaviors that both the protocol implementations and
// NotificationService is the notification service. It defines behaviours that both the protocol implementations and
// NotificationService can expect from each other.
//
// NotificationService can send two different kinds of information to protocol:
Expand All @@ -214,7 +217,7 @@ func (NotificationEventNotificationReceived) isNotificationEvent() {}
// [ValidationResultAccept] or [ValidationResultReject].
//
// After the validation result has been received by NotificationService, it prepares the substream for communication by
// initializing the necessary sinks and emits [NotificationEventNotificationStreamOpened] which informs the protocol
// initialising the necessary sinks and emits [NotificationEventNotificationStreamOpened] which informs the protocol
// that the remote peer is ready to receive notifications.
//
// Both local and remote peer can close the substream at any time. Local peer can do so by calling CloseSubstream which
Expand Down Expand Up @@ -269,7 +272,7 @@ type NotificationService interface {
// notifications to the remote peer.
//
// Use of this API is discouraged as it's not as performant as sending notifications through [NotificationService] due
// to synchronization required to keep the underlying notification sink up to date with possible sink replacement
// to synchronisation required to keep the underlying notification sink up to date with possible sink replacement
// events.
type MessageSink interface {
// Send synchronous notification to the peer associated with this MessageSink.
Expand Down
3 changes: 3 additions & 0 deletions internal/client/network/sync/sync.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package sync

import (
Expand Down
3 changes: 3 additions & 0 deletions internal/client/network/types/kad/kad.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package kad

import (
Expand Down
3 changes: 3 additions & 0 deletions internal/client/network/types/multiaddr/multiaddr.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package multiaddr

import (
Expand Down
3 changes: 3 additions & 0 deletions internal/client/network/types/multihash/multihash.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package multihash

import "github.com/multiformats/go-multihash"
Expand Down
5 changes: 4 additions & 1 deletion internal/client/network/types/peer-id/peer_id.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package peerid

import (
Expand Down Expand Up @@ -80,7 +83,7 @@ func NewRandomPeerID() PeerID {
}
}

// Create a [PeerID] parsed from bytes.
// NewPeerID creates a [PeerID] parsed from bytes.
func NewPeerID(data []byte) (PeerID, error) {
peerID, err := peer.IDFromBytes(data)
if err != nil {
Expand Down
3 changes: 3 additions & 0 deletions internal/client/network/types/peer-id/peer_id_test.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
// Copyright 2025 ChainSafe Systems (ON)
// SPDX-License-Identifier: LGPL-3.0-only

package peerid

import (
Expand Down

0 comments on commit ba5deda

Please sign in to comment.