Skip to content

Commit 10d6a0d

Browse files
authored
feat(zeromq): add block info publisher (#1666)
1 parent d0e66e5 commit 10d6a0d

16 files changed

+225
-158
lines changed

node/node.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,6 @@ func (n *Node) Stop() {
167167
// Wait for network to stop
168168
time.Sleep(1 * time.Second)
169169

170-
close(n.eventCh)
171170
n.consMgr.Stop()
172171
n.sync.Stop()
173172
n.state.Close()
@@ -176,6 +175,8 @@ func (n *Node) Stop() {
176175
n.http.StopServer()
177176
n.jsonrpc.StopServer()
178177
n.zeromq.Close()
178+
179+
close(n.eventCh)
179180
}
180181

181182
// these methods are using by GUI.

state/state.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -752,5 +752,8 @@ func (st *state) publishEvent(msg any) {
752752
return
753753
}
754754

755-
st.eventCh <- msg
755+
select {
756+
case st.eventCh <- msg:
757+
default:
758+
}
756759
}

util/testsuite/testsuite.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -868,7 +868,7 @@ func (*TestSuite) HelperSignTransaction(prv crypto.PrivateKey, trx *tx.Tx) {
868868
trx.SetPublicKey(prv.PublicKey())
869869
}
870870

871-
func (*TestSuite) FindFreePort() int {
871+
func FindFreePort() int {
872872
listener, _ := net.Listen("tcp", "localhost:0")
873873
defer func() {
874874
_ = listener.Close()

www/zmq/block_info_publisher.go

-26
This file was deleted.

www/zmq/config.go

+6-11
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ import (
55
"fmt"
66
"net/url"
77
"strings"
8-
"time"
98
)
109

1110
type Config struct {
@@ -14,19 +13,15 @@ type Config struct {
1413
ZmqPubRawBlock string `toml:"zmqpubrawblock"`
1514
ZmqPubRawTx string `toml:"zmqpubrawtx"`
1615
ZmqPubHWM int `toml:"zmqpubhwm"`
17-
18-
// Private config
19-
ZmqAutomaticReconnect bool `toml:"-"`
20-
ZmqDialerRetryTime time.Duration `toml:"-"`
21-
ZmqDialerMaxRetries int `toml:"-"`
2216
}
2317

2418
func DefaultConfig() *Config {
2519
return &Config{
26-
ZmqAutomaticReconnect: true,
27-
ZmqDialerMaxRetries: 10,
28-
ZmqDialerRetryTime: 250 * time.Millisecond,
29-
ZmqPubHWM: 1000,
20+
ZmqPubBlockInfo: "",
21+
ZmqPubTxInfo: "",
22+
ZmqPubRawBlock: "",
23+
ZmqPubRawTx: "",
24+
ZmqPubHWM: 1000,
3025
}
3126
}
3227

@@ -65,7 +60,7 @@ func (c *Config) BasicCheck() error {
6560
func validateTopicSocket(socket string) error {
6661
addr, err := url.Parse(socket)
6762
if err != nil {
68-
return errors.New("failed to parse ZmqPub value: " + err.Error())
63+
return errors.New("failed to parse URL: " + err.Error())
6964
}
7065

7166
if addr.Scheme != "tcp" {

www/zmq/config_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ func TestDefaultConfig(t *testing.T) {
1818
}
1919

2020
func TestBasicCheck(t *testing.T) {
21-
testCases := []struct {
21+
tests := []struct {
2222
name string
2323
config *Config
2424
expectErr bool
@@ -51,7 +51,7 @@ func TestBasicCheck(t *testing.T) {
5151
{
5252
name: "Empty host",
5353
config: &Config{
54-
ZmqPubBlockInfo: "tcp://:28332",
54+
ZmqPubTxInfo: "tcp://:28332",
5555
},
5656
expectErr: true,
5757
},
@@ -69,10 +69,10 @@ func TestBasicCheck(t *testing.T) {
6969
},
7070
}
7171

72-
for _, tc := range testCases {
73-
t.Run(tc.name, func(t *testing.T) {
74-
err := tc.config.BasicCheck()
75-
if tc.expectErr {
72+
for _, tt := range tests {
73+
t.Run(tt.name, func(t *testing.T) {
74+
err := tt.config.BasicCheck()
75+
if tt.expectErr {
7676
assert.Error(t, err, "BasicCheck should return an error")
7777
} else {
7878
assert.NoError(t, err, "BasicCheck should not return an error")

www/zmq/publisher.go

+44
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,25 @@
11
package zmq
22

33
import (
4+
"encoding/binary"
5+
46
"github.com/go-zeromq/zmq4"
7+
"github.com/pactus-project/pactus/crypto"
58
"github.com/pactus-project/pactus/types/block"
69
"github.com/pactus-project/pactus/util/logger"
710
)
811

912
type Publisher interface {
1013
Address() string
1114
TopicName() string
15+
HWM() int
1216

1317
onNewBlock(blk *block.Block)
1418
}
1519

1620
type basePub struct {
1721
topic Topic
22+
seqNo uint32
1823
zmqSocket zmq4.Socket
1924
logger *logger.SubLogger
2025
}
@@ -26,3 +31,42 @@ func (b *basePub) Address() string {
2631
func (b *basePub) TopicName() string {
2732
return b.topic.String()
2833
}
34+
35+
func (b *basePub) HWM() int {
36+
hwmOpt, _ := b.zmqSocket.GetOption(zmq4.OptionHWM)
37+
38+
return hwmOpt.(int)
39+
}
40+
41+
// makeTopicMsg constructs a ZMQ message with a topic ID, message body, and sequence number.
42+
// The message is constructed as a byte slice with the following structure:
43+
// - Topic ID (2 Bytes)
44+
// - Message body (varies based on provided parts)
45+
// - Sequence number (4 Bytes).
46+
func (b *basePub) makeTopicMsg(parts ...any) []byte {
47+
result := make([]byte, 0, 64)
48+
49+
// Append Topic ID to the message (2 Bytes)
50+
result = append(result, b.topic.Bytes()...)
51+
52+
// Append message body based on the provided parts
53+
for _, part := range parts {
54+
switch castedVal := part.(type) {
55+
case crypto.Address:
56+
result = append(result, castedVal.Bytes()...)
57+
case []byte:
58+
result = append(result, castedVal...)
59+
case uint32:
60+
result = binary.BigEndian.AppendUint32(result, castedVal)
61+
case uint16:
62+
result = binary.BigEndian.AppendUint16(result, castedVal)
63+
default:
64+
panic("implement me!!")
65+
}
66+
}
67+
68+
// Append sequence number to the message (4 Bytes, Big Endian encoding)
69+
result = binary.BigEndian.AppendUint32(result, b.seqNo)
70+
71+
return result
72+
}

www/zmq/publisher_block_info.go

+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package zmq
2+
3+
import (
4+
"github.com/go-zeromq/zmq4"
5+
"github.com/pactus-project/pactus/types/block"
6+
"github.com/pactus-project/pactus/util/logger"
7+
)
8+
9+
type blockInfoPub struct {
10+
basePub
11+
}
12+
13+
func newBlockInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
14+
return &blockInfoPub{
15+
basePub: basePub{
16+
topic: TopicBlockInfo,
17+
seqNo: 0,
18+
zmqSocket: socket,
19+
logger: logger,
20+
},
21+
}
22+
}
23+
24+
func (b *blockInfoPub) onNewBlock(blk *block.Block) {
25+
rawMsg := b.makeTopicMsg(
26+
blk.Header().ProposerAddress(),
27+
blk.Header().UnixTime(),
28+
uint16(len(blk.Transactions())),
29+
blk.Height(),
30+
)
31+
32+
message := zmq4.NewMsg(rawMsg)
33+
34+
if err := b.zmqSocket.Send(message); err != nil {
35+
b.logger.Error("zmq publish message error", "err", err, "publisher", b.TopicName())
36+
}
37+
38+
b.logger.Debug("zmq published message success",
39+
"publisher", b.TopicName(),
40+
"block_height", blk.Height())
41+
42+
b.seqNo++
43+
}

www/zmq/publisher_block_info_test.go

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package zmq
2+
3+
import (
4+
"context"
5+
"encoding/binary"
6+
"fmt"
7+
"testing"
8+
9+
"github.com/go-zeromq/zmq4"
10+
"github.com/pactus-project/pactus/util/testsuite"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func TestBlockInfoPublisher(t *testing.T) {
15+
port := testsuite.FindFreePort()
16+
addr := fmt.Sprintf("tcp://localhost:%d", port)
17+
conf := DefaultConfig()
18+
conf.ZmqPubBlockInfo = addr
19+
20+
td := setup(t, conf)
21+
defer td.closeServer()
22+
23+
td.server.Publishers()
24+
25+
sub := zmq4.NewSub(context.TODO(), zmq4.WithAutomaticReconnect(false))
26+
27+
err := sub.Dial(addr)
28+
require.NoError(t, err)
29+
30+
err = sub.SetOption(zmq4.OptionSubscribe, string(TopicBlockInfo.Bytes()))
31+
require.NoError(t, err)
32+
33+
blk, _ := td.TestSuite.GenerateTestBlock(td.RandHeight())
34+
35+
td.eventCh <- blk
36+
37+
received, err := sub.Recv()
38+
require.NoError(t, err)
39+
40+
require.NotNil(t, received.Frames)
41+
require.GreaterOrEqual(t, len(received.Frames), 1)
42+
43+
msg := received.Frames[0]
44+
require.Len(t, msg, 37)
45+
46+
topic := msg[:2]
47+
proposerBytes := msg[2:23]
48+
timestamp := binary.BigEndian.Uint32(msg[23:27])
49+
txCount := binary.BigEndian.Uint16(msg[27:29])
50+
height := binary.BigEndian.Uint32(msg[29:33])
51+
seqNo := binary.BigEndian.Uint32(msg[33:])
52+
53+
require.Equal(t, TopicBlockInfo.Bytes(), topic)
54+
require.Equal(t, blk.Header().ProposerAddress().Bytes(), proposerBytes)
55+
require.Equal(t, blk.Header().UnixTime(), timestamp)
56+
require.Equal(t, uint16(len(blk.Transactions())), txCount)
57+
require.Equal(t, blk.Height(), height)
58+
require.Equal(t, uint32(0), seqNo)
59+
60+
require.NoError(t, sub.Close())
61+
}

www/zmq/raw_block_publisher.go www/zmq/publisher_raw_block.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ type rawBlockPub struct {
1313
func newRawBlockPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
1414
return &rawBlockPub{
1515
basePub: basePub{
16-
topic: RawBlock,
16+
topic: TopicRawBlock,
1717
zmqSocket: socket,
1818
logger: logger,
1919
},

www/zmq/raw_tx_publisher.go www/zmq/publisher_raw_tx.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ type rawTxPub struct {
1313
func newRawTxPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
1414
return &rawTxPub{
1515
basePub: basePub{
16-
topic: RawTransaction,
16+
topic: TopicRawTransaction,
1717
zmqSocket: socket,
1818
logger: logger,
1919
},

www/zmq/publisher_test.go

-29
This file was deleted.

www/zmq/tx_info_publisher.go www/zmq/publisher_tx_info.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ type txInfoPub struct {
1313
func newTxInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
1414
return &txInfoPub{
1515
basePub: basePub{
16-
topic: TransactionInfo,
16+
topic: TopicTransactionInfo,
1717
zmqSocket: socket,
1818
logger: logger,
1919
},

0 commit comments

Comments
 (0)