Skip to content

Commit d59c0c9

Browse files
authored
feat(zeromq): add publisher raw tx (#1672)
1 parent 0a9ecfd commit d59c0c9

10 files changed

+229
-83
lines changed

www/zmq/config.go

-56
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,7 @@
11
package zmq
22

33
import (
4-
"errors"
54
"fmt"
6-
"net/url"
7-
"strings"
85
)
96

107
type Config struct {
@@ -26,62 +23,9 @@ func DefaultConfig() *Config {
2623
}
2724

2825
func (c *Config) BasicCheck() error {
29-
if c.ZmqPubBlockInfo != "" {
30-
if err := validateTopicSocket(c.ZmqPubBlockInfo); err != nil {
31-
return err
32-
}
33-
}
34-
35-
if c.ZmqPubTxInfo != "" {
36-
if err := validateTopicSocket(c.ZmqPubTxInfo); err != nil {
37-
return err
38-
}
39-
}
40-
41-
if c.ZmqPubRawBlock != "" {
42-
if err := validateTopicSocket(c.ZmqPubRawBlock); err != nil {
43-
return err
44-
}
45-
}
46-
47-
if c.ZmqPubRawTx != "" {
48-
if err := validateTopicSocket(c.ZmqPubRawTx); err != nil {
49-
return err
50-
}
51-
}
52-
5326
if c.ZmqPubHWM < 0 {
5427
return fmt.Errorf("invalid publisher hwm %d", c.ZmqPubHWM)
5528
}
5629

5730
return nil
5831
}
59-
60-
func validateTopicSocket(socket string) error {
61-
addr, err := url.Parse(socket)
62-
if err != nil {
63-
return errors.New("failed to parse URL: " + err.Error())
64-
}
65-
66-
if addr.Scheme != "tcp" {
67-
return errors.New("invalid scheme: zeromq socket schema")
68-
}
69-
70-
if addr.Host == "" {
71-
return errors.New("invalid host: host is empty")
72-
}
73-
74-
parts := strings.Split(addr.Host, ":")
75-
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
76-
return errors.New("invalid host: missing or malformed host/port")
77-
}
78-
79-
port := parts[1]
80-
for _, r := range port {
81-
if r < '0' || r > '9' {
82-
return errors.New("invalid port: non-numeric characters detected")
83-
}
84-
}
85-
86-
return nil
87-
}

www/zmq/config_test.go

-21
Original file line numberDiff line numberDiff line change
@@ -34,27 +34,6 @@ func TestBasicCheck(t *testing.T) {
3434
},
3535
expectErr: false,
3636
},
37-
{
38-
name: "Invalid scheme",
39-
config: &Config{
40-
ZmqPubBlockInfo: "udp://127.0.0.1:28332",
41-
},
42-
expectErr: true,
43-
},
44-
{
45-
name: "Missing port",
46-
config: &Config{
47-
ZmqPubBlockInfo: "tcp://127.0.0.1",
48-
},
49-
expectErr: true,
50-
},
51-
{
52-
name: "Empty host",
53-
config: &Config{
54-
ZmqPubTxInfo: "tcp://:28332",
55-
},
56-
expectErr: true,
57-
},
5837
{
5938
name: "Negative ZmqPubHWM",
6039
config: &Config{

www/zmq/publisher_block_info.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (b *blockInfoPub) onNewBlock(blk *block.Block) {
3737
return
3838
}
3939

40-
b.logger.Debug("zmq published message success",
40+
b.logger.Debug("ZMQ published the message successfully",
4141
"publisher", b.TopicName(),
4242
"block_height", blk.Height())
4343

www/zmq/publisher_raw_block.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (r *rawBlockPub) onNewBlock(blk *block.Block) {
4141
return
4242
}
4343

44-
r.logger.Debug("zmq published message success",
44+
r.logger.Debug("ZMQ published the message successfully",
4545
"publisher", r.TopicName(),
4646
"block_height", blk.Height())
4747

www/zmq/publisher_raw_tx.go

+24-3
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,28 @@ func newRawTxPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
2020
}
2121
}
2222

23-
func (*rawTxPub) onNewBlock(_ *block.Block) {
24-
// TODO implement me
25-
panic("implement me")
23+
func (r *rawTxPub) onNewBlock(blk *block.Block) {
24+
for _, tx := range blk.Transactions() {
25+
buf, err := tx.Bytes()
26+
if err != nil {
27+
r.logger.Error("failed to serializing raw tx", "err", err, "topic", r.TopicName())
28+
29+
return
30+
}
31+
32+
rawMsg := r.makeTopicMsg(buf, blk.Height())
33+
message := zmq4.NewMsg(rawMsg)
34+
35+
if err := r.zmqSocket.Send(message); err != nil {
36+
r.logger.Error("zmq publish message error", "err", err, "publisher", r.TopicName())
37+
38+
return
39+
}
40+
41+
r.logger.Debug("ZMQ published the message successfully",
42+
"publisher", r.TopicName(),
43+
"block_height", blk.Height())
44+
45+
r.seqNo++
46+
}
2647
}

www/zmq/publisher_raw_tx_test.go

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package zmq
2+
3+
import (
4+
"context"
5+
"encoding/binary"
6+
"fmt"
7+
"testing"
8+
9+
"github.com/go-zeromq/zmq4"
10+
"github.com/pactus-project/pactus/types/tx"
11+
"github.com/pactus-project/pactus/util/testsuite"
12+
"github.com/stretchr/testify/require"
13+
)
14+
15+
func TestRawTxPublisher(t *testing.T) {
16+
port := testsuite.FindFreePort()
17+
addr := fmt.Sprintf("tcp://localhost:%d", port)
18+
conf := DefaultConfig()
19+
conf.ZmqPubRawTx = addr
20+
21+
td := setup(t, conf)
22+
defer td.closeServer()
23+
24+
td.server.Publishers()
25+
26+
sub := zmq4.NewSub(context.TODO(), zmq4.WithAutomaticReconnect(false))
27+
28+
err := sub.Dial(addr)
29+
require.NoError(t, err)
30+
31+
err = sub.SetOption(zmq4.OptionSubscribe, string(TopicRawTransaction.Bytes()))
32+
require.NoError(t, err)
33+
34+
blk, _ := td.TestSuite.GenerateTestBlock(td.RandHeight())
35+
36+
td.eventCh <- blk
37+
38+
for i := 0; i < len(blk.Transactions()); i++ {
39+
received, err := sub.Recv()
40+
require.NoError(t, err)
41+
42+
require.NotNil(t, received.Frames)
43+
require.GreaterOrEqual(t, len(received.Frames), 1)
44+
45+
msg := received.Frames[0]
46+
47+
topic := msg[:2]
48+
rawTx := msg[2 : len(msg)-8]
49+
50+
blockNumberOffset := len(msg) - 8
51+
height := binary.BigEndian.Uint32(msg[blockNumberOffset : blockNumberOffset+4])
52+
seqNo := binary.BigEndian.Uint32(msg[len(msg)-4:])
53+
54+
txn, err := tx.FromBytes(rawTx)
55+
require.NoError(t, err)
56+
require.NotNil(t, txn)
57+
58+
require.Equal(t, TopicRawTransaction.Bytes(), topic)
59+
require.Equal(t, height, blk.Height())
60+
require.Equal(t, uint32(i), seqNo)
61+
require.NotEqual(t, 0, txn.SerializeSize())
62+
}
63+
64+
require.NoError(t, sub.Close())
65+
}

www/zmq/publisher_test.go

+111
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
package zmq
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/binary"
7+
"fmt"
8+
"testing"
9+
10+
"github.com/go-zeromq/zmq4"
11+
"github.com/pactus-project/pactus/crypto/hash"
12+
"github.com/pactus-project/pactus/types/block"
13+
"github.com/pactus-project/pactus/types/tx"
14+
"github.com/pactus-project/pactus/util/testsuite"
15+
"github.com/stretchr/testify/require"
16+
)
17+
18+
func TestPublisherOnSameSockets(t *testing.T) {
19+
port := testsuite.FindFreePort()
20+
addr := fmt.Sprintf("tcp://localhost:%d", port)
21+
conf := DefaultConfig()
22+
conf.ZmqPubRawTx = addr
23+
conf.ZmqPubTxInfo = addr
24+
conf.ZmqPubRawBlock = addr
25+
conf.ZmqPubBlockInfo = addr
26+
27+
td := setup(t, conf)
28+
defer td.closeServer()
29+
30+
td.server.Publishers()
31+
32+
sub := zmq4.NewSub(context.TODO(), zmq4.WithAutomaticReconnect(false))
33+
34+
err := sub.Dial(addr)
35+
require.NoError(t, err)
36+
37+
err = sub.SetOption(zmq4.OptionSubscribe, string(TopicTransactionInfo.Bytes()))
38+
require.NoError(t, err)
39+
40+
err = sub.SetOption(zmq4.OptionSubscribe, string(TopicRawTransaction.Bytes()))
41+
require.NoError(t, err)
42+
43+
err = sub.SetOption(zmq4.OptionSubscribe, string(TopicBlockInfo.Bytes()))
44+
require.NoError(t, err)
45+
46+
err = sub.SetOption(zmq4.OptionSubscribe, string(TopicRawBlock.Bytes()))
47+
require.NoError(t, err)
48+
49+
blk, _ := td.TestSuite.GenerateTestBlock(td.RandHeight())
50+
51+
td.eventCh <- blk
52+
53+
for i := 0; i < (len(blk.Transactions())*2)+2; i++ {
54+
received, err := sub.Recv()
55+
require.NoError(t, err)
56+
57+
require.NotNil(t, received.Frames)
58+
require.GreaterOrEqual(t, len(received.Frames), 1)
59+
60+
msg := received.Frames[0]
61+
62+
topic := TopicFromBytes(msg[:2])
63+
blockNumberOffset := len(msg) - 8
64+
height := binary.BigEndian.Uint32(msg[blockNumberOffset : blockNumberOffset+4])
65+
seqNo := binary.BigEndian.Uint32(msg[len(msg)-4:])
66+
t.Logf("[%s] %d", topic, seqNo)
67+
68+
require.Equal(t, height, blk.Height())
69+
70+
switch topic {
71+
case TopicRawTransaction:
72+
rawTx := msg[2 : len(msg)-8]
73+
74+
txn, err := tx.FromBytes(rawTx)
75+
76+
require.NoError(t, err)
77+
require.NotNil(t, txn)
78+
require.Equal(t, TopicRawTransaction, topic)
79+
require.NotEqual(t, 0, txn.SerializeSize())
80+
case TopicTransactionInfo:
81+
txHash := msg[2:34]
82+
id, err := hash.FromBytes(txHash)
83+
84+
require.NoError(t, err)
85+
require.NotNil(t, id)
86+
require.Equal(t, TopicTransactionInfo, topic)
87+
88+
case TopicRawBlock:
89+
blockHeader := msg[2 : len(msg)-8]
90+
buf := bytes.NewBuffer(blockHeader)
91+
header := new(block.Header)
92+
93+
require.NoError(t, header.Decode(buf))
94+
require.NotNil(t, header)
95+
require.Equal(t, TopicRawBlock, topic)
96+
require.Equal(t, header.PrevBlockHash(), blk.Header().PrevBlockHash())
97+
require.Equal(t, header.StateRoot(), blk.Header().StateRoot())
98+
case TopicBlockInfo:
99+
proposerBytes := msg[2:23]
100+
timestamp := binary.BigEndian.Uint32(msg[23:27])
101+
txCount := binary.BigEndian.Uint16(msg[27:29])
102+
103+
require.Equal(t, TopicBlockInfo, topic)
104+
require.Equal(t, blk.Header().ProposerAddress().Bytes(), proposerBytes)
105+
require.Equal(t, blk.Header().UnixTime(), timestamp)
106+
require.Equal(t, uint16(len(blk.Transactions())), txCount)
107+
}
108+
}
109+
110+
require.NoError(t, sub.Close())
111+
}

www/zmq/publisher_tx_info.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ func (t *txInfoPub) onNewBlock(blk *block.Block) {
3131
continue
3232
}
3333

34-
t.logger.Debug("zmq published message success",
34+
t.logger.Debug("ZMQ published the message successfully",
3535
"publisher", t.TopicName(),
3636
"block_height", blk.Height(),
3737
"tx_hash", txn.ID().String(),

www/zmq/topic.go

+8
Original file line numberDiff line numberDiff line change
@@ -36,3 +36,11 @@ func (t Topic) Bytes() []byte {
3636

3737
return b
3838
}
39+
40+
func TopicFromBytes(b []byte) Topic {
41+
if len(b) < 2 {
42+
return 0
43+
}
44+
45+
return Topic(binary.BigEndian.Uint16(b))
46+
}

www/zmq/topic_test.go

+18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package zmq
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/require"
7+
)
8+
9+
func TestTopicFromBytes(t *testing.T) {
10+
validRawTopic := TopicRawTransaction.Bytes()
11+
invalidRawTopic := make([]byte, 0)
12+
13+
topic := TopicFromBytes(validRawTopic)
14+
require.Equal(t, TopicRawTransaction, topic)
15+
16+
topic = TopicFromBytes(invalidRawTopic)
17+
require.Equal(t, 0, int(topic))
18+
}

0 commit comments

Comments
 (0)