Skip to content

Commit 74b2214

Browse files
authored
feat(zeromq): add publisher transaction info (#1669)
1 parent 10d6a0d commit 74b2214

File tree

3 files changed

+79
-4
lines changed

3 files changed

+79
-4
lines changed

www/zmq/publisher.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (b *basePub) HWM() int {
4444
// - Message body (varies based on provided parts)
4545
// - Sequence number (4 Bytes).
4646
func (b *basePub) makeTopicMsg(parts ...any) []byte {
47-
result := make([]byte, 0, 64)
47+
result := make([]byte, 0)
4848

4949
// Append Topic ID to the message (2 Bytes)
5050
result = append(result, b.topic.Bytes()...)

www/zmq/publisher_tx_info.go

+19-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,23 @@ func newTxInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
2020
}
2121
}
2222

23-
func (*txInfoPub) onNewBlock(_ *block.Block) {
24-
// TODO implement me
25-
panic("implement me")
23+
func (t *txInfoPub) onNewBlock(blk *block.Block) {
24+
for _, txn := range blk.Transactions() {
25+
rawMsg := t.makeTopicMsg(txn.ID().Bytes(), blk.Height())
26+
message := zmq4.NewMsg(rawMsg)
27+
28+
if err := t.zmqSocket.Send(message); err != nil {
29+
t.logger.Error("zmq publish message error", "err", err, "publisher", t.TopicName())
30+
31+
continue
32+
}
33+
34+
t.logger.Debug("zmq published message success",
35+
"publisher", t.TopicName(),
36+
"block_height", blk.Height(),
37+
"tx_hash", txn.ID().String(),
38+
)
39+
40+
t.seqNo++
41+
}
2642
}

www/zmq/publisher_tx_info_test.go

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package zmq
2+
3+
import (
4+
"context"
5+
"encoding/binary"
6+
"fmt"
7+
"testing"
8+
9+
"github.com/go-zeromq/zmq4"
10+
"github.com/pactus-project/pactus/util/testsuite"
11+
"github.com/stretchr/testify/require"
12+
)
13+
14+
func TestTxInfoPublisher(t *testing.T) {
15+
port := testsuite.FindFreePort()
16+
addr := fmt.Sprintf("tcp://localhost:%d", port)
17+
conf := DefaultConfig()
18+
conf.ZmqPubTxInfo = addr
19+
20+
td := setup(t, conf)
21+
defer td.closeServer()
22+
23+
td.server.Publishers()
24+
25+
sub := zmq4.NewSub(context.TODO(), zmq4.WithAutomaticReconnect(false))
26+
27+
err := sub.Dial(addr)
28+
require.NoError(t, err)
29+
30+
err = sub.SetOption(zmq4.OptionSubscribe, string(TopicTransactionInfo.Bytes()))
31+
require.NoError(t, err)
32+
33+
blk, _ := td.TestSuite.GenerateTestBlock(td.RandHeight())
34+
35+
td.eventCh <- blk
36+
37+
for i := 0; i < len(blk.Transactions()); i++ {
38+
received, err := sub.Recv()
39+
require.NoError(t, err)
40+
41+
require.NotNil(t, received.Frames)
42+
require.GreaterOrEqual(t, len(received.Frames), 1)
43+
44+
msg := received.Frames[0]
45+
require.Len(t, msg, 42)
46+
47+
topic := msg[:2]
48+
txHash := msg[2:34]
49+
height := binary.BigEndian.Uint32(msg[34:38])
50+
seqNo := binary.BigEndian.Uint32(msg[38:])
51+
52+
require.Equal(t, TopicTransactionInfo.Bytes(), topic)
53+
require.Equal(t, blk.Transactions()[i].ID().Bytes(), txHash)
54+
require.Equal(t, blk.Height(), height)
55+
require.Equal(t, uint32(i), seqNo)
56+
}
57+
58+
require.NoError(t, sub.Close())
59+
}

0 commit comments

Comments
 (0)