Skip to content

Commit 0a9ecfd

Browse files
authored
feat(zeromq): add publisher raw block (#1670)
1 parent 74b2214 commit 0a9ecfd

File tree

3 files changed

+93
-3
lines changed

3 files changed

+93
-3
lines changed

www/zmq/publisher_block_info.go

+2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@ func (b *blockInfoPub) onNewBlock(blk *block.Block) {
3333

3434
if err := b.zmqSocket.Send(message); err != nil {
3535
b.logger.Error("zmq publish message error", "err", err, "publisher", b.TopicName())
36+
37+
return
3638
}
3739

3840
b.logger.Debug("zmq published message success",

www/zmq/publisher_raw_block.go

+26-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package zmq
22

33
import (
4+
"bytes"
5+
46
"github.com/go-zeromq/zmq4"
57
"github.com/pactus-project/pactus/types/block"
68
"github.com/pactus-project/pactus/util/logger"
@@ -20,7 +22,28 @@ func newRawBlockPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
2022
}
2123
}
2224

23-
func (*rawBlockPub) onNewBlock(_ *block.Block) {
24-
// TODO implement me
25-
panic("implement me")
25+
func (r *rawBlockPub) onNewBlock(blk *block.Block) {
26+
rawHeader := make([]byte, 0)
27+
buf := bytes.NewBuffer(rawHeader)
28+
29+
if err := blk.Header().Encode(buf); err != nil {
30+
r.logger.Error("failed to encode block header", "err", err, "publisher", r.TopicName())
31+
32+
return
33+
}
34+
35+
rawMsg := r.makeTopicMsg(buf.Bytes(), blk.Height())
36+
message := zmq4.NewMsg(rawMsg)
37+
38+
if err := r.zmqSocket.Send(message); err != nil {
39+
r.logger.Error("zmq publish message error", "err", err, "publisher", r.TopicName())
40+
41+
return
42+
}
43+
44+
r.logger.Debug("zmq published message success",
45+
"publisher", r.TopicName(),
46+
"block_height", blk.Height())
47+
48+
r.seqNo++
2649
}

www/zmq/publisher_raw_block_test.go

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package zmq
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/binary"
7+
"fmt"
8+
"testing"
9+
10+
"github.com/go-zeromq/zmq4"
11+
"github.com/pactus-project/pactus/types/block"
12+
"github.com/pactus-project/pactus/util/testsuite"
13+
"github.com/stretchr/testify/require"
14+
)
15+
16+
func TestRawBlockPublisher(t *testing.T) {
17+
port := testsuite.FindFreePort()
18+
addr := fmt.Sprintf("tcp://localhost:%d", port)
19+
conf := DefaultConfig()
20+
conf.ZmqPubRawBlock = addr
21+
22+
td := setup(t, conf)
23+
defer td.closeServer()
24+
25+
td.server.Publishers()
26+
27+
sub := zmq4.NewSub(context.TODO(), zmq4.WithAutomaticReconnect(false))
28+
29+
err := sub.Dial(addr)
30+
require.NoError(t, err)
31+
32+
err = sub.SetOption(zmq4.OptionSubscribe, string(TopicRawBlock.Bytes()))
33+
require.NoError(t, err)
34+
35+
blk, _ := td.TestSuite.GenerateTestBlock(td.RandHeight())
36+
37+
td.eventCh <- blk
38+
39+
received, err := sub.Recv()
40+
require.NoError(t, err)
41+
42+
require.NotNil(t, received.Frames)
43+
require.GreaterOrEqual(t, len(received.Frames), 1)
44+
45+
msg := received.Frames[0]
46+
47+
topic := msg[:2]
48+
blockHeader := msg[2 : len(msg)-8]
49+
height := binary.BigEndian.Uint32(msg[140 : len(msg)-4])
50+
seqNo := binary.BigEndian.Uint32(msg[len(msg)-4:])
51+
52+
buf := bytes.NewBuffer(blockHeader)
53+
header := new(block.Header)
54+
55+
require.NoError(t, header.Decode(buf))
56+
57+
require.NotNil(t, header)
58+
require.Equal(t, uint32(0), seqNo)
59+
require.Equal(t, blk.Height(), height)
60+
require.Equal(t, TopicRawBlock.Bytes(), topic)
61+
require.Equal(t, header.PrevBlockHash(), blk.Header().PrevBlockHash())
62+
require.Equal(t, header.StateRoot(), blk.Header().StateRoot())
63+
64+
require.NoError(t, sub.Close())
65+
}

0 commit comments

Comments
 (0)