Skip to content

Commit 5f05122

Browse files
authored
feat(other): add zeromq server with configration (#1660)
1 parent 2ae3504 commit 5f05122

19 files changed

+654
-0
lines changed

.github/workflows/semantic-pr.yml

+1
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ jobs:
6060
http
6161
jsonrpc
6262
nanomsg
63+
zeromq
6364
windows
6465
linux
6566
macos

config/config.go

+11
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
"github.com/pactus-project/pactus/www/http"
2121
"github.com/pactus-project/pactus/www/jsonrpc"
2222
"github.com/pactus-project/pactus/www/nanomsg"
23+
"github.com/pactus-project/pactus/www/zmq"
2324
"github.com/pelletier/go-toml/v2"
2425
)
2526

@@ -47,6 +48,7 @@ type Config struct {
4748
HTTP *http.Config `toml:"http"`
4849
WalletManager *wallet.Config `toml:"-"`
4950
Nanomsg *nanomsg.Config `toml:"nanomsg"`
51+
ZeroMq *zmq.Config `toml:"zeromq"`
5052
}
5153

5254
type BootstrapInfo struct {
@@ -99,6 +101,7 @@ func defaultConfig() *Config {
99101
JSONRPC: jsonrpc.DefaultConfig(),
100102
HTTP: http.DefaultConfig(),
101103
Nanomsg: nanomsg.DefaultConfig(),
104+
ZeroMq: zmq.DefaultConfig(),
102105
WalletManager: wallet.DefaultConfig(),
103106
}
104107

@@ -219,6 +222,11 @@ func DefaultConfigLocalnet() *Config {
219222
conf.HTTP.EnablePprof = true
220223
conf.Nanomsg.Enable = true
221224
conf.Nanomsg.Listen = "tcp://[::]:40799"
225+
conf.ZeroMq.ZmqPubBlockInfo = "tcp://127.0.0.1:28332"
226+
conf.ZeroMq.ZmqPubTxInfo = "tcp://127.0.0.1:28333"
227+
conf.ZeroMq.ZmqPubRawBlock = "tcp://127.0.0.1:28334"
228+
conf.ZeroMq.ZmqPubRawTx = "tcp://127.0.0.1:28335"
229+
conf.ZeroMq.ZmqPubHWM = 1000
222230

223231
return conf
224232
}
@@ -296,6 +304,9 @@ func (conf *Config) BasicCheck() error {
296304
if err := conf.GRPC.BasicCheck(); err != nil {
297305
return err
298306
}
307+
if err := conf.ZeroMq.BasicCheck(); err != nil {
308+
return err
309+
}
299310

300311
return conf.HTTP.BasicCheck()
301312
}

config/example_config.toml

+30
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@
177177
_pool = 'error'
178178
_state = 'info'
179179
_sync = 'error'
180+
_zmq = 'info'
180181
default = 'info'
181182

182183
# `grpc` contains configuration of the gRPC module.
@@ -246,3 +247,32 @@
246247

247248
# `listen` is the address for incoming connections to the nanomsg server.
248249
listen = 'tcp://127.0.0.1:40899'
250+
251+
# ZeroMQ configuration.
252+
[zeromq]
253+
254+
# `zmqpubblockinfo` specifies the address for publishing block info notifications.
255+
# Example: 'tcp://127.0.0.1:28332'
256+
# Default is '', meaning the topic is disabled
257+
zmqpubblockinfo = ''
258+
259+
# `zmqpubtxinfo` specifies the address for publishing transaction info notifications.
260+
# Example: 'tcp://127.0.0.1:28332'
261+
# Default is '', meaning the topic is disabled
262+
zmqpubtxinfo = ''
263+
264+
# `zmqpubrawblock` specifies the address for publishing raw block notifications.
265+
# Example: 'tcp://127.0.0.1:28332'
266+
# Default is '', meaning the topic is disabled
267+
zmqpubrawblock = ''
268+
269+
# `zmqpubrawtx` specifies the address for publishing raw transaction notifications.
270+
# Example: 'tcp://127.0.0.1:28332'
271+
# Default is '', meaning the topic is disabled
272+
zmqpubrawtx = ''
273+
274+
# `zmqpubhwm` defines the High Watermark (HWM) for ZeroMQ message pipes.
275+
# This parameter determines the maximum number of messages ZeroMQ can buffer before blocking the publishing of further messages.
276+
# The watermark is applied uniformly to all active topics.
277+
# Default is 1000
278+
zmqpubhwm = 1000

go.mod

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ require (
88
github.com/c-bata/go-prompt v0.2.6
99
github.com/consensys/gnark-crypto v0.14.0
1010
github.com/fxamacker/cbor/v2 v2.7.0
11+
github.com/go-zeromq/zmq4 v0.17.0
1112
github.com/gofrs/flock v0.12.1
1213
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
1314
github.com/google/uuid v1.6.0
@@ -68,6 +69,7 @@ require (
6869
github.com/go-logr/logr v1.4.2 // indirect
6970
github.com/go-logr/stdr v1.2.2 // indirect
7071
github.com/go-task/slim-sprig/v3 v3.0.0 // indirect
72+
github.com/go-zeromq/goczmq/v4 v4.2.2 // indirect
7173
github.com/godbus/dbus/v5 v5.1.0 // indirect
7274
github.com/gogo/protobuf v1.3.2 // indirect
7375
github.com/golang/snappy v0.0.4 // indirect

go.sum

+4
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/me
116116
github.com/go-task/slim-sprig/v3 v3.0.0 h1:sUs3vkvUymDpBKi3qH1YSqBQk9+9D/8M2mN1vB6EwHI=
117117
github.com/go-task/slim-sprig/v3 v3.0.0/go.mod h1:W848ghGpv3Qj3dhTPRyJypKRiqCdHZiAzKg9hl15HA8=
118118
github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0=
119+
github.com/go-zeromq/goczmq/v4 v4.2.2 h1:HAJN+i+3NW55ijMJJhk7oWxHKXgAuSBkoFfvr8bYj4U=
120+
github.com/go-zeromq/goczmq/v4 v4.2.2/go.mod h1:Sm/lxrfxP/Oxqs0tnHD6WAhwkWrx+S+1MRrKzcxoaYE=
121+
github.com/go-zeromq/zmq4 v0.17.0 h1:r12/XdqPeRbuaF4C3QZJeWCt7a5vpJbslDH1rTXF+Kc=
122+
github.com/go-zeromq/zmq4 v0.17.0/go.mod h1:EQxjJD92qKnrsVMzAnx62giD6uJIPi1dMGZ781iCDtY=
119123
github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
120124
github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA=
121125
github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk=

util/logger/config.go

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ func DefaultConfig() *Config {
2929
conf.Levels["_grpc"] = "info"
3030
conf.Levels["_nonomsg"] = "info"
3131
conf.Levels["_jsonrpc"] = "info"
32+
conf.Levels["_zmq"] = "info"
3233
conf.Levels["_firewall"] = "warn"
3334

3435
return conf

util/logger/logger.go

+1
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ func getLoggersInst() *logger {
5454
conf.Levels["_pool"] = "debug"
5555
conf.Levels["_http"] = "debug"
5656
conf.Levels["_grpc"] = "debug"
57+
conf.Levels["_zmq"] = "debug"
5758
conf.Levels["_firewall"] = "debug"
5859
globalInst = &logger{
5960
config: conf,

util/testsuite/testsuite.go

+10
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package testsuite
33
import (
44
"encoding/hex"
55
"math/rand"
6+
"net"
67
"testing"
78
"time"
89

@@ -866,3 +867,12 @@ func (*TestSuite) HelperSignTransaction(prv crypto.PrivateKey, trx *tx.Tx) {
866867
trx.SetSignature(sig)
867868
trx.SetPublicKey(prv.PublicKey())
868869
}
870+
871+
func (*TestSuite) FindFreePort() int {
872+
listener, _ := net.Listen("tcp", "localhost:0")
873+
defer func() {
874+
_ = listener.Close()
875+
}()
876+
877+
return listener.Addr().(*net.TCPAddr).Port
878+
}

www/zmq/block_info_publisher.go

+26
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package zmq
2+
3+
import (
4+
"github.com/go-zeromq/zmq4"
5+
"github.com/pactus-project/pactus/types/block"
6+
"github.com/pactus-project/pactus/util/logger"
7+
)
8+
9+
type blockInfoPub struct {
10+
basePub
11+
}
12+
13+
func newBlockInfoPub(socket zmq4.Socket, logger *logger.SubLogger) Publisher {
14+
return &blockInfoPub{
15+
basePub: basePub{
16+
topic: BlockInfo,
17+
zmqSocket: socket,
18+
logger: logger,
19+
},
20+
}
21+
}
22+
23+
func (*blockInfoPub) onNewBlock(_ *block.Block) {
24+
// TODO implement me
25+
panic("implement me")
26+
}

www/zmq/config.go

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package zmq
2+
3+
import (
4+
"errors"
5+
"fmt"
6+
"net/url"
7+
"strings"
8+
"time"
9+
)
10+
11+
type Config struct {
12+
ZmqPubBlockInfo string `toml:"zmqpubblockinfo"`
13+
ZmqPubTxInfo string `toml:"zmqpubtxinfo"`
14+
ZmqPubRawBlock string `toml:"zmqpubrawblock"`
15+
ZmqPubRawTx string `toml:"zmqpubrawtx"`
16+
ZmqPubHWM int `toml:"zmqpubhwm"`
17+
18+
// Private config
19+
ZmqAutomaticReconnect bool `toml:"-"`
20+
ZmqDialerRetryTime time.Duration `toml:"-"`
21+
ZmqDialerMaxRetries int `toml:"-"`
22+
}
23+
24+
func DefaultConfig() *Config {
25+
return &Config{
26+
ZmqAutomaticReconnect: true,
27+
ZmqDialerMaxRetries: 10,
28+
ZmqDialerRetryTime: 250 * time.Millisecond,
29+
ZmqPubHWM: 1000,
30+
}
31+
}
32+
33+
func (c *Config) BasicCheck() error {
34+
if c.ZmqPubBlockInfo != "" {
35+
if err := validateTopicSocket(c.ZmqPubBlockInfo); err != nil {
36+
return err
37+
}
38+
}
39+
40+
if c.ZmqPubTxInfo != "" {
41+
if err := validateTopicSocket(c.ZmqPubTxInfo); err != nil {
42+
return err
43+
}
44+
}
45+
46+
if c.ZmqPubRawBlock != "" {
47+
if err := validateTopicSocket(c.ZmqPubRawBlock); err != nil {
48+
return err
49+
}
50+
}
51+
52+
if c.ZmqPubRawTx != "" {
53+
if err := validateTopicSocket(c.ZmqPubRawTx); err != nil {
54+
return err
55+
}
56+
}
57+
58+
if c.ZmqPubHWM < 0 {
59+
return fmt.Errorf("invalid publisher hwm %d", c.ZmqPubHWM)
60+
}
61+
62+
return nil
63+
}
64+
65+
func validateTopicSocket(socket string) error {
66+
addr, err := url.Parse(socket)
67+
if err != nil {
68+
return errors.New("failed to parse ZmqPub value: " + err.Error())
69+
}
70+
71+
if addr.Scheme != "tcp" {
72+
return errors.New("invalid scheme: zeromq socket schema")
73+
}
74+
75+
if addr.Host == "" {
76+
return errors.New("invalid host: host is empty")
77+
}
78+
79+
parts := strings.Split(addr.Host, ":")
80+
if len(parts) != 2 || parts[0] == "" || parts[1] == "" {
81+
return errors.New("invalid host: missing or malformed host/port")
82+
}
83+
84+
port := parts[1]
85+
for _, r := range port {
86+
if r < '0' || r > '9' {
87+
return errors.New("invalid port: non-numeric characters detected")
88+
}
89+
}
90+
91+
return nil
92+
}

www/zmq/config_test.go

+82
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
package zmq
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
)
8+
9+
func TestDefaultConfig(t *testing.T) {
10+
cfg := DefaultConfig()
11+
12+
assert.NotNil(t, cfg, "DefaultConfig should not return nil")
13+
assert.Equal(t, "", cfg.ZmqPubBlockInfo, "ZmqPubBlockInfo should be empty")
14+
assert.Equal(t, "", cfg.ZmqPubTxInfo, "ZmqPubTxInfo should be empty")
15+
assert.Equal(t, "", cfg.ZmqPubRawBlock, "ZmqPubRawBlock should be empty")
16+
assert.Equal(t, "", cfg.ZmqPubRawTx, "ZmqPubRawTx should be empty")
17+
assert.Equal(t, 1000, cfg.ZmqPubHWM, "ZmqPubHWM should default to 1000")
18+
}
19+
20+
func TestBasicCheck(t *testing.T) {
21+
testCases := []struct {
22+
name string
23+
config *Config
24+
expectErr bool
25+
}{
26+
{
27+
name: "Valid configuration",
28+
config: &Config{
29+
ZmqPubBlockInfo: "tcp://127.0.0.1:28332",
30+
ZmqPubTxInfo: "tcp://127.0.0.1:28333",
31+
ZmqPubRawBlock: "tcp://127.0.0.1:28334",
32+
ZmqPubRawTx: "tcp://127.0.0.1:28335",
33+
ZmqPubHWM: 1000,
34+
},
35+
expectErr: false,
36+
},
37+
{
38+
name: "Invalid scheme",
39+
config: &Config{
40+
ZmqPubBlockInfo: "udp://127.0.0.1:28332",
41+
},
42+
expectErr: true,
43+
},
44+
{
45+
name: "Missing port",
46+
config: &Config{
47+
ZmqPubBlockInfo: "tcp://127.0.0.1",
48+
},
49+
expectErr: true,
50+
},
51+
{
52+
name: "Empty host",
53+
config: &Config{
54+
ZmqPubBlockInfo: "tcp://:28332",
55+
},
56+
expectErr: true,
57+
},
58+
{
59+
name: "Negative ZmqPubHWM",
60+
config: &Config{
61+
ZmqPubHWM: -1,
62+
},
63+
expectErr: true,
64+
},
65+
{
66+
name: "Empty configuration",
67+
config: DefaultConfig(),
68+
expectErr: false,
69+
},
70+
}
71+
72+
for _, tc := range testCases {
73+
t.Run(tc.name, func(t *testing.T) {
74+
err := tc.config.BasicCheck()
75+
if tc.expectErr {
76+
assert.Error(t, err, "BasicCheck should return an error")
77+
} else {
78+
assert.NoError(t, err, "BasicCheck should not return an error")
79+
}
80+
})
81+
}
82+
}

www/zmq/publisher.go

+28
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package zmq
2+
3+
import (
4+
"github.com/go-zeromq/zmq4"
5+
"github.com/pactus-project/pactus/types/block"
6+
"github.com/pactus-project/pactus/util/logger"
7+
)
8+
9+
type Publisher interface {
10+
Address() string
11+
TopicName() string
12+
13+
onNewBlock(blk *block.Block)
14+
}
15+
16+
type basePub struct {
17+
topic Topic
18+
zmqSocket zmq4.Socket
19+
logger *logger.SubLogger
20+
}
21+
22+
func (b *basePub) Address() string {
23+
return b.zmqSocket.Addr().String()
24+
}
25+
26+
func (b *basePub) TopicName() string {
27+
return b.topic.String()
28+
}

0 commit comments

Comments
 (0)