diff --git a/api/clients/node_client.go b/api/clients/node_client.go index 4a86d50ece..a357f4d052 100644 --- a/api/clients/node_client.go +++ b/api/clients/node_client.go @@ -40,8 +40,12 @@ func (c client) GetBlobHeader( batchHeaderHash [32]byte, blobIndex uint32, ) (*core.BlobHeader, *merkletree.Proof, error) { + operatorSocket, err := core.ParseOperatorSocket(socket) + if err != nil { + return nil, nil, err + } conn, err := grpc.NewClient( - core.OperatorSocket(socket).GetV1RetrievalSocket(), + operatorSocket.GetV1RetrievalSocket(), grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { @@ -85,8 +89,18 @@ func (c client) GetChunks( quorumID core.QuorumID, chunksChan chan RetrievedChunks, ) { + operatorSocket, err := core.ParseOperatorSocket(opInfo.Socket) + if err != nil { + chunksChan <- RetrievedChunks{ + OperatorID: opID, + Err: err, + Chunks: nil, + } + return + } conn, err := grpc.NewClient( - core.OperatorSocket(opInfo.Socket).GetV1RetrievalSocket(), + //core.OperatorSocket(opInfo.Socket).GetV1RetrievalSocket(), + operatorSocket.GetV1RetrievalSocket(), grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { diff --git a/api/clients/v2/retrieval_client.go b/api/clients/v2/retrieval_client.go index fc715ccfed..b79f3451c0 100644 --- a/api/clients/v2/retrieval_client.go +++ b/api/clients/v2/retrieval_client.go @@ -175,8 +175,17 @@ func (r *retrievalClient) getChunksFromOperator( fudgeFactor := units.MiB // to allow for some overhead from things like protobuf encoding maxMessageSize := maxBlobSize*encodingRate + fudgeFactor + operatorSocket, err := core.ParseOperatorSocket(opInfo.Socket) + if err != nil { + chunksChan <- clients.RetrievedChunks{ + OperatorID: opID, + Err: err, + Chunks: nil, + } + return + } conn, err := grpc.NewClient( - core.OperatorSocket(opInfo.Socket).GetV2RetrievalSocket(), + operatorSocket.GetV2RetrievalSocket(), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMessageSize)), ) diff --git a/core/mock/state.go b/core/mock/state.go index f414af7d1e..1ef7c8b4a0 100644 --- a/core/mock/state.go +++ b/core/mock/state.go @@ -142,10 +142,10 @@ func (d *ChainDataMock) GetTotalOperatorStateWithQuorums(ctx context.Context, bl retrievalPort := fmt.Sprintf("3%03v", 2*i+1) v2DispersalPort := fmt.Sprintf("3%03v", 2*i+2) v2RetrievalPort := fmt.Sprintf("3%03v", 2*i+3) - socket := core.MakeOperatorSocket(host, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort) + socket := core.NewOperatorSocket(host, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort) indexed := &core.IndexedOperatorInfo{ - Socket: string(socket), + Socket: socket.String(), PubkeyG1: d.KeyPairs[id].GetPubKeyG1(), PubkeyG2: d.KeyPairs[id].GetPubKeyG2(), } diff --git a/core/serialization.go b/core/serialization.go index 5c06d682f2..5dde7be9e5 100644 --- a/core/serialization.go +++ b/core/serialization.go @@ -528,33 +528,33 @@ func decode(data []byte, obj any) error { } func (s OperatorSocket) GetV1DispersalSocket() string { - ip, v1DispersalPort, _, _, _, err := ParseOperatorSocket(string(s)) - if err != nil { + if s.host == "" || s.v1DispersalPort == "" { return "" } - return fmt.Sprintf("%s:%s", ip, v1DispersalPort) + return fmt.Sprintf("%s:%s", s.host, s.v1DispersalPort) } func (s OperatorSocket) GetV2DispersalSocket() string { - ip, _, _, v2DispersalPort, _, err := ParseOperatorSocket(string(s)) - if err != nil || v2DispersalPort == "" { + if s.host == "" || s.v2DispersalPort == "" { return "" } - return fmt.Sprintf("%s:%s", ip, v2DispersalPort) + return fmt.Sprintf("%s:%s", s.host, s.v2DispersalPort) } func (s OperatorSocket) GetV1RetrievalSocket() string { - ip, _, v1retrievalPort, _, _, err := ParseOperatorSocket(string(s)) - if err != nil { + if s.host == "" || s.v1RetrievalPort == "" { return "" } - return fmt.Sprintf("%s:%s", ip, v1retrievalPort) + return fmt.Sprintf("%s:%s", s.host, s.v1RetrievalPort) } func (s OperatorSocket) GetV2RetrievalSocket() string { - ip, _, _, _, v2RetrievalPort, err := ParseOperatorSocket(string(s)) - if err != nil || v2RetrievalPort == "" { + if s.host == "" || s.v2RetrievalPort == "" { return "" } - return fmt.Sprintf("%s:%s", ip, v2RetrievalPort) + return fmt.Sprintf("%s:%s", s.host, s.v2RetrievalPort) +} + +func (s OperatorSocket) GetHost() string { + return s.host } diff --git a/core/serialization_test.go b/core/serialization_test.go index 45592b37aa..71a0f3d0e0 100644 --- a/core/serialization_test.go +++ b/core/serialization_test.go @@ -195,123 +195,147 @@ func TestHashPubKeyG1(t *testing.T) { } func TestParseOperatorSocket(t *testing.T) { - operatorSocket := "localhost:1234;5678;9999;10001" - host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err := core.ParseOperatorSocket(operatorSocket) + opSocketStr := "localhost:1234;5678;9999;10001" + operatorSocket, err := core.ParseOperatorSocket(opSocketStr) assert.NoError(t, err) - assert.Equal(t, "localhost", host) - assert.Equal(t, "1234", v1DispersalPort) - assert.Equal(t, "5678", v1RetrievalPort) - assert.Equal(t, "9999", v2DispersalPort) - assert.Equal(t, "10001", v2RetrievalPort) - host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, _, err = core.ParseOperatorSocket("localhost:1234;5678") + assert.Equal(t, "localhost", operatorSocket.GetHost()) + assert.Equal(t, "localhost:1234", operatorSocket.GetV1DispersalSocket()) + assert.Equal(t, "localhost:5678", operatorSocket.GetV1RetrievalSocket()) + assert.Equal(t, "localhost:9999", operatorSocket.GetV2DispersalSocket()) + assert.Equal(t, "localhost:10001", operatorSocket.GetV2RetrievalSocket()) + + opSocketStr = "localhost:1234;5678" + operatorSocket, err = core.ParseOperatorSocket(opSocketStr) assert.NoError(t, err) - assert.Equal(t, "localhost", host) - assert.Equal(t, "1234", v1DispersalPort) - assert.Equal(t, "5678", v1RetrievalPort) - assert.Equal(t, "", v2DispersalPort) + assert.Equal(t, "localhost", operatorSocket.GetHost()) + assert.Equal(t, "localhost:1234", operatorSocket.GetV1DispersalSocket()) + assert.Equal(t, "localhost:5678", operatorSocket.GetV1RetrievalSocket()) + assert.Equal(t, "", operatorSocket.GetV2DispersalSocket()) - _, _, _, _, _, err = core.ParseOperatorSocket("localhost;1234;5678") + opSocketStr = "localhost;1234;5678" + _, err = core.ParseOperatorSocket(opSocketStr) assert.NotNil(t, err) assert.ErrorContains(t, err, "invalid host address format") - _, _, _, _, _, err = core.ParseOperatorSocket("localhost:12345678") + opSocketStr = "localhost:12345678" + _, err = core.ParseOperatorSocket(opSocketStr) assert.NotNil(t, err) assert.ErrorContains(t, err, "invalid v1 dispersal port format") - _, _, _, _, _, err = core.ParseOperatorSocket("localhost1234;5678") + opSocketStr = "localhost1234;5678" + _, err = core.ParseOperatorSocket(opSocketStr) assert.NotNil(t, err) assert.ErrorContains(t, err, "invalid host address format") } func TestGetV1DispersalSocket(t *testing.T) { - operatorSocket := core.OperatorSocket("localhost:1234;5678;9999;1025") + operatorSocket, err := core.ParseOperatorSocket("localhost:1234;5678;9999;1025") socket := operatorSocket.GetV1DispersalSocket() + assert.NoError(t, err) assert.Equal(t, "localhost:1234", socket) - operatorSocket = core.OperatorSocket("localhost:1234;5678") + operatorSocket, err = core.ParseOperatorSocket("localhost:1234;5678") socket = operatorSocket.GetV1DispersalSocket() + assert.NoError(t, err) assert.Equal(t, "localhost:1234", socket) - operatorSocket = core.OperatorSocket("localhost:1234;5678;") + operatorSocket, err = core.ParseOperatorSocket("localhost:1234;5678;") socket = operatorSocket.GetV1DispersalSocket() + assert.NotNil(t, err) assert.Equal(t, "", socket) - operatorSocket = core.OperatorSocket("localhost:1234") + operatorSocket, err = core.ParseOperatorSocket("localhost:1234") + assert.NotNil(t, err) socket = operatorSocket.GetV1DispersalSocket() assert.Equal(t, "", socket) } func TestGetV1RetrievalSocket(t *testing.T) { // Valid v1/v2 socket - operatorSocket := core.OperatorSocket("localhost:1234;5678;9999;10001") + operatorSocket, err := core.ParseOperatorSocket("localhost:1234;5678;9999;10001") + assert.NoError(t, err) socket := operatorSocket.GetV1RetrievalSocket() assert.Equal(t, "localhost:5678", socket) // Valid v1 socket - operatorSocket = core.OperatorSocket("localhost:1234;5678") + operatorSocket, err = core.ParseOperatorSocket("localhost:1234;5678") socket = operatorSocket.GetV1RetrievalSocket() + assert.NoError(t, err) assert.Equal(t, "localhost:5678", socket) // Invalid socket testcases - operatorSocket = core.OperatorSocket("localhost:1234;5678;9999;10001;") + operatorSocket, err = core.ParseOperatorSocket("localhost:1234;5678;9999;10001;") + assert.NotNil(t, err) socket = operatorSocket.GetV1RetrievalSocket() assert.Equal(t, "", socket) - operatorSocket = core.OperatorSocket("localhost:1234;5678;") + operatorSocket, err = core.ParseOperatorSocket("localhost:1234;5678;") + assert.NotNil(t, err) socket = operatorSocket.GetV1RetrievalSocket() assert.Equal(t, "", socket) - operatorSocket = core.OperatorSocket("localhost:;1234;5678;") + operatorSocket, err = core.ParseOperatorSocket("localhost:;1234;5678;") + assert.NotNil(t, err) socket = operatorSocket.GetV1RetrievalSocket() assert.Equal(t, "", socket) - operatorSocket = core.OperatorSocket("localhost:1234;:;5678;") + operatorSocket, err = core.ParseOperatorSocket("localhost:1234;:;5678;") + assert.NotNil(t, err) socket = operatorSocket.GetV1RetrievalSocket() assert.Equal(t, "", socket) - operatorSocket = core.OperatorSocket("localhost:;;;") + operatorSocket, err = core.ParseOperatorSocket("localhost:;;;") + assert.NotNil(t, err) socket = operatorSocket.GetV1RetrievalSocket() assert.Equal(t, "", socket) - operatorSocket = core.OperatorSocket("localhost:1234") + operatorSocket, err = core.ParseOperatorSocket("localhost:1234") + assert.NotNil(t, err) socket = operatorSocket.GetV1RetrievalSocket() assert.Equal(t, "", socket) } func TestGetV2RetrievalSocket(t *testing.T) { // Valid v1/v2 socket - operatorSocket := core.OperatorSocket("localhost:1234;5678;9999;10001") + operatorSocket, err := core.ParseOperatorSocket("localhost:1234;5678;9999;10001") + assert.NoError(t, err) socket := operatorSocket.GetV2RetrievalSocket() assert.Equal(t, "localhost:10001", socket) - // Invalid v2 socket - operatorSocket = core.OperatorSocket("localhost:1234;5678") + operatorSocket, err = core.ParseOperatorSocket("localhost:1234;5678") + assert.NoError(t, err) socket = operatorSocket.GetV2RetrievalSocket() assert.Equal(t, "", socket) // Invalid socket testcases - operatorSocket = core.OperatorSocket("localhost:1234;5678;9999;10001;") + operatorSocket, err = core.ParseOperatorSocket("localhost:1234;5678;9999;10001;") + assert.NotNil(t, err) socket = operatorSocket.GetV2RetrievalSocket() assert.Equal(t, "", socket) - operatorSocket = core.OperatorSocket("localhost:1234;5678;") + operatorSocket, err = core.ParseOperatorSocket("localhost:1234;5678;") + assert.NotNil(t, err) socket = operatorSocket.GetV2RetrievalSocket() assert.Equal(t, "", socket) - operatorSocket = core.OperatorSocket("localhost:;1234;5678;") + operatorSocket, err = core.ParseOperatorSocket("localhost:;1234;5678;") + assert.NotNil(t, err) socket = operatorSocket.GetV2RetrievalSocket() assert.Equal(t, "", socket) - operatorSocket = core.OperatorSocket("localhost:1234;:;5678;") + operatorSocket, err = core.ParseOperatorSocket("localhost:1234;:;5678;") + assert.NotNil(t, err) socket = operatorSocket.GetV2RetrievalSocket() assert.Equal(t, "", socket) - operatorSocket = core.OperatorSocket("localhost:;;;") + operatorSocket, err = core.ParseOperatorSocket("localhost:;;;") + assert.NotNil(t, err) socket = operatorSocket.GetV2RetrievalSocket() assert.Equal(t, "", socket) - operatorSocket = core.OperatorSocket("localhost:1234") + operatorSocket, err = core.ParseOperatorSocket("localhost:1234") + assert.NotNil(t, err) socket = operatorSocket.GetV2RetrievalSocket() assert.Equal(t, "", socket) } diff --git a/core/state.go b/core/state.go index 3e9ac699c6..80f05b0241 100644 --- a/core/state.go +++ b/core/state.go @@ -13,78 +13,78 @@ import ( // Operators -// OperatorSocket is formatted as "host:dispersalPort;retrievalPort;v2DispersalPort" -type OperatorSocket string +// OperatorSocket is formatted as "host:dispersalPort;retrievalPort;v2DispersalPort;v2RetrievalPort" +type OperatorSocket struct { + host string + v1DispersalPort string + v1RetrievalPort string + v2DispersalPort string + v2RetrievalPort string +} func (s OperatorSocket) String() string { - return string(s) + if s.v2DispersalPort == "" && s.v2RetrievalPort == "" { + return fmt.Sprintf("%s:%s;%s", s.host, s.v1DispersalPort, s.v2RetrievalPort) + } + return fmt.Sprintf("%s:%s;%s;%s;%s", s.host, s.v1DispersalPort, s.v1RetrievalPort, s.v2DispersalPort, s.v2RetrievalPort) } -func MakeOperatorSocket(nodeIP, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort string) OperatorSocket { - //TODO: Add config checks for invalid v1/v2 configs -- for v1, both v2 ports must be empty and for v2, both ports must be valid, reject any other combinations. - if v2DispersalPort == "" && v2RetrievalPort == "" { - return OperatorSocket(fmt.Sprintf("%s:%s;%s", nodeIP, dispersalPort, retrievalPort)) +func NewOperatorSocket(nodeIP, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort string) OperatorSocket { + return OperatorSocket{ + host: nodeIP, + v1DispersalPort: dispersalPort, + v1RetrievalPort: retrievalPort, + v2DispersalPort: v2DispersalPort, + v2RetrievalPort: v2RetrievalPort, } - return OperatorSocket(fmt.Sprintf("%s:%s;%s;%s;%s", nodeIP, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort)) } type StakeAmount = *big.Int -func ParseOperatorSocket(socket string) (host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort string, err error) { +func ParseOperatorSocket(opSocketStr string) (opSocket OperatorSocket, err error) { - s := strings.Split(socket, ";") + operatorSocket := OperatorSocket{} + s := strings.Split(opSocketStr, ";") - host, v1DispersalPort, err = net.SplitHostPort(s[0]) + host, v1DispersalPort, err := net.SplitHostPort(s[0]) if err != nil { - err = fmt.Errorf("invalid host address format in %s: it must specify valid IP or host name (ex. 0.0.0.0:32004;32005;32006;32007)", socket) - + err = fmt.Errorf("invalid host address format in %s: it must specify valid IP or host name (ex. 0.0.0.0:32004;32005;32006;32007)", opSocketStr) return } if _, err = net.LookupHost(host); err != nil { - //Invalid host - host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err = - "", "", "", "", "", - fmt.Errorf("invalid host address format in %s: it must specify valid IP or host name (ex. 0.0.0.0:32004;32005;32006;32007)", socket) + err = fmt.Errorf("invalid host address format in %s: it must specify valid IP or host name (ex. 0.0.0.0:32004;32005;32006;32007)", opSocketStr) return } if err = ValidatePort(v1DispersalPort); err != nil { - host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err = - "", "", "", "", "", - fmt.Errorf("invalid v1 dispersal port format in %s: it must specify valid v1 dispersal port (ex. 0.0.0.0:32004;32005;32006;32007)", socket) + err = fmt.Errorf("invalid v1 dispersal port format in %s: it must specify valid v1 dispersal port (ex. 0.0.0.0:32004;32005;32006;32007)", opSocketStr) return } switch len(s) { case 4: - v2DispersalPort = s[2] - if err = ValidatePort(v2DispersalPort); err != nil { - host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err = - "", "", "", "", "", - fmt.Errorf("invalid v2 dispersal port format in %s: it must specify valid v2 dispersal port (ex. 0.0.0.0:32004;32005;32006;32007)", socket) + if err = ValidatePort(s[2]); err != nil { + err = fmt.Errorf("invalid v2 dispersal port format in %s: it must specify valid v2 dispersal port (ex. 0.0.0.0:32004;32005;32006;32007)", opSocketStr) return } - v2RetrievalPort = s[3] - if err = ValidatePort(v2RetrievalPort); err != nil { - host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err = - "", "", "", "", "", - fmt.Errorf("invalid v2 retrieval port format in %s: it must specify valid v2 retrieval port (ex. 0.0.0.0:32004;32005;32006;32007)", socket) + if err = ValidatePort(s[3]); err != nil { + err = fmt.Errorf("invalid v2 retrieval port format in %s: it must specify valid v2 retrieval port (ex. 0.0.0.0:32004;32005;32006;32007)", opSocketStr) return } + operatorSocket.v2DispersalPort = s[2] + operatorSocket.v2RetrievalPort = s[3] fallthrough case 2: - // V1 Parsing - v1RetrievalPort = s[1] - if err = ValidatePort(v1RetrievalPort); err != nil { - host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err = - "", "", "", "", "", - fmt.Errorf("invalid v1 retrieval port format in %s: it must specify valid v1 retrieval port (ex. 0.0.0.0:32004;32005;32006;32007)", socket) + operatorSocket.host = host + operatorSocket.v1DispersalPort = v1DispersalPort + if err = ValidatePort(s[1]); err != nil { + err = fmt.Errorf("invalid v1 retrieval port format in %s: it must specify valid v1 retrieval port (ex. 0.0.0.0:32004;32005;32006;32007)", opSocketStr) + return } - return + operatorSocket.v1RetrievalPort = s[1] + return operatorSocket, nil default: - host, v1DispersalPort, v1RetrievalPort, v2DispersalPort, v2RetrievalPort, err = - "", "", "", "", "", - fmt.Errorf("invalid socket address format %s: it must specify v1 dispersal/retrieval ports, or v2 dispersal/retrieval ports (ex. 0.0.0.0:32004;32005;32006;32007)", socket) + err = fmt.Errorf("invalid opSocketStr address format %s: it must specify v1 dispersal/retrieval ports, or v2 dispersal/retrieval ports (ex. 0.0.0.0:32004;32005;32006;32007)", opSocketStr) return } } diff --git a/disperser/batcher/grpc/dispatcher.go b/disperser/batcher/grpc/dispatcher.go index 1a472fb029..cda8027718 100644 --- a/disperser/batcher/grpc/dispatcher.go +++ b/disperser/batcher/grpc/dispatcher.go @@ -120,13 +120,18 @@ func (c *dispatcher) sendAllChunks(ctx context.Context, state *core.IndexedOpera func (c *dispatcher) sendChunks(ctx context.Context, blobs []*core.EncodedBlobMessage, batchHeader *core.BatchHeader, op *core.IndexedOperatorInfo) (*core.Signature, error) { // TODO Add secure Grpc + operatorSocket, err := core.ParseOperatorSocket(op.Socket) + if err != nil { + c.logger.Warn("Disperser failed to parse socket", "socket", op.Socket, "err", err) + return nil, err + } conn, err := grpc.NewClient( - core.OperatorSocket(op.Socket).GetV1DispersalSocket(), + operatorSocket.GetV1DispersalSocket(), grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { - c.logger.Warn("Disperser cannot connect to operator dispersal socket", "dispersal_socket", core.OperatorSocket(op.Socket).GetV1DispersalSocket(), "err", err) + c.logger.Warn("Disperser cannot connect to operator dispersal socket", "dispersal_socket", operatorSocket.GetV1DispersalSocket(), "err", err) return nil, err } defer conn.Close() diff --git a/disperser/cmd/dataapi/docs/docs.go b/disperser/cmd/dataapi/docs/docs.go index ab01604ce9..91ec59a8ca 100644 --- a/disperser/cmd/dataapi/docs/docs.go +++ b/disperser/cmd/dataapi/docs/docs.go @@ -12,7 +12,7 @@ const docTemplate = `{ "contact": {}, "version": "{{.Version}}" }, - "host": "{{.Host}}", + "host": "{{.host}}", "basePath": "{{.BasePath}}", "paths": {} }` diff --git a/disperser/common/semver/semver.go b/disperser/common/semver/semver.go index b3720361b1..fe954d9727 100644 --- a/disperser/common/semver/semver.go +++ b/disperser/common/semver/semver.go @@ -28,7 +28,10 @@ func ScanOperators(operators map[core.OperatorID]*core.IndexedOperatorInfo, oper operatorChan := make(chan core.OperatorID, len(operators)) worker := func() { for operatorId := range operatorChan { - operatorSocket := core.OperatorSocket(operators[operatorId].Socket) + operatorSocket, err := core.ParseOperatorSocket(operators[operatorId].Socket) + if err != nil { + logger.Warnf("Failed to parse operator socket: %v", err) + } var socket string if useRetrievalSocket { socket = operatorSocket.GetV1RetrievalSocket() diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go index 5d0e88fb38..8ce84cc236 100644 --- a/disperser/controller/dispatcher.go +++ b/disperser/controller/dispatcher.go @@ -161,7 +161,8 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage, for opID, op := range state.IndexedOperators { opID := opID op := op - host, _, _, v2DispersalPort, _, err := core.ParseOperatorSocket(op.Socket) + + operatorSocket, err := core.ParseOperatorSocket(op.Socket) if err != nil { d.logger.Warn("failed to parse operator socket, check if the socket format is correct", "operator", opID.Hex(), "socket", op.Socket, "err", err) sigChan <- core.SigningMessage{ @@ -174,6 +175,9 @@ func (d *Dispatcher) HandleBatch(ctx context.Context) (chan core.SigningMessage, continue } + host := operatorSocket.GetHost() + v2DispersalPort := operatorSocket.GetV2DispersalSocket() + client, err := d.nodeClientManager.GetClient(host, v2DispersalPort) if err != nil { d.logger.Warn("failed to get node client; node may not be reachable", "operator", opID.Hex(), "host", host, "v2DispersalPort", v2DispersalPort, "err", err) diff --git a/disperser/dataapi/docs/v1/V1_docs.go b/disperser/dataapi/docs/v1/V1_docs.go index f12eef0b3e..183e73e36c 100644 --- a/disperser/dataapi/docs/v1/V1_docs.go +++ b/disperser/dataapi/docs/v1/V1_docs.go @@ -12,7 +12,7 @@ const docTemplateV1 = `{ "contact": {}, "version": "{{.Version}}" }, - "host": "{{.Host}}", + "host": "{{.host}}", "basePath": "{{.BasePath}}", "paths": { "/feed/batches/{batch_header_hash}/blobs": { diff --git a/disperser/dataapi/docs/v2/V2_docs.go b/disperser/dataapi/docs/v2/V2_docs.go index 533b45a5f6..ee9094115e 100644 --- a/disperser/dataapi/docs/v2/V2_docs.go +++ b/disperser/dataapi/docs/v2/V2_docs.go @@ -12,7 +12,7 @@ const docTemplateV2 = `{ "contact": {}, "version": "{{.Version}}" }, - "host": "{{.Host}}", + "host": "{{.host}}", "basePath": "{{.BasePath}}", "paths": { "/batches/feed": { diff --git a/disperser/dataapi/operator_handler.go b/disperser/dataapi/operator_handler.go index 70fb4d116d..4b0db4a9b8 100644 --- a/disperser/dataapi/operator_handler.go +++ b/disperser/dataapi/operator_handler.go @@ -95,7 +95,11 @@ func (oh *OperatorHandler) ProbeV2OperatorPorts(ctx context.Context, operatorId return &OperatorPortCheckResponse{}, err } - operatorSocket := core.OperatorSocket(operatorInfo.Socket) + operatorSocket, err := core.ParseOperatorSocket(operatorInfo.Socket) + if err != nil { + oh.logger.Warn("failed to parse operator socket", "operatorId", operatorId, "error", err) + return &OperatorPortCheckResponse{}, err + } retrievalOnline, retrievalStatus := false, "v2 retrieval port closed or unreachable" retrievalSocket := operatorSocket.GetV2RetrievalSocket() @@ -144,7 +148,12 @@ func (oh *OperatorHandler) ProbeV1OperatorPorts(ctx context.Context, operatorId return &OperatorPortCheckResponse{}, err } - operatorSocket := core.OperatorSocket(operatorInfo.Socket) + operatorSocket, err := core.ParseOperatorSocket(operatorInfo.Socket) + if err != nil { + oh.logger.Warn("failed to parse operator socket", "operatorId", operatorId, "error", err) + return &OperatorPortCheckResponse{}, err + } + retrievalSocket := operatorSocket.GetV1RetrievalSocket() retrievalPortOpen := checkIsOperatorPortOpen(retrievalSocket, 3, oh.logger) retrievalOnline, retrievalStatus := false, "v1 retrieval port closed or unreachable" diff --git a/disperser/dataapi/queried_operators_handlers.go b/disperser/dataapi/queried_operators_handlers.go index b282f0ff10..a7a28450cf 100644 --- a/disperser/dataapi/queried_operators_handlers.go +++ b/disperser/dataapi/queried_operators_handlers.go @@ -198,7 +198,12 @@ func checkIsOnlineAndProcessOperator(operatorStatus OperatorOnlineStatus, operat var isOnline bool var socket string if operatorStatus.IndexedOperatorInfo != nil { - socket = core.OperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetV1RetrievalSocket() + operatorSocket, err := core.ParseOperatorSocket(operatorStatus.IndexedOperatorInfo.Socket) + if err != nil { + logger.Warn("Failed to parse operator socket", "err", err) + } + socket = operatorSocket.GetV1RetrievalSocket() + //socket = core.ParseOperatorSocket(operatorStatus.IndexedOperatorInfo.Socket).GetV1RetrievalSocket() isOnline = checkIsOperatorPortOpen(socket, 10, logger) } diff --git a/inabox/deploy/deploy.go b/inabox/deploy/deploy.go index 3e0c2a0e48..6439b23e20 100644 --- a/inabox/deploy/deploy.go +++ b/inabox/deploy/deploy.go @@ -365,7 +365,7 @@ func (env *Config) StopAnvil() { func (env *Config) RunNodePluginBinary(operation string, operator OperatorVars) { changeDirectory(filepath.Join(env.rootPath, "inabox")) - socket := string(core.MakeOperatorSocket(operator.NODE_HOSTNAME, operator.NODE_DISPERSAL_PORT, operator.NODE_RETRIEVAL_PORT, operator.NODE_V2_DISPERSAL_PORT, operator.NODE_V2_RETRIEVAL_PORT)) + socket := core.NewOperatorSocket(operator.NODE_HOSTNAME, operator.NODE_DISPERSAL_PORT, operator.NODE_RETRIEVAL_PORT, operator.NODE_V2_DISPERSAL_PORT, operator.NODE_V2_RETRIEVAL_PORT) envVars := []string{ "NODE_OPERATION=" + operation, @@ -373,7 +373,7 @@ func (env *Config) RunNodePluginBinary(operation string, operator OperatorVars) "NODE_BLS_KEY_FILE=" + operator.NODE_BLS_KEY_FILE, "NODE_ECDSA_KEY_PASSWORD=" + operator.NODE_ECDSA_KEY_PASSWORD, "NODE_BLS_KEY_PASSWORD=" + operator.NODE_BLS_KEY_PASSWORD, - "NODE_SOCKET=" + socket, + "NODE_SOCKET=" + socket.String(), "NODE_QUORUM_ID_LIST=" + operator.NODE_QUORUM_ID_LIST, "NODE_CHAIN_RPC=" + operator.NODE_CHAIN_RPC, "NODE_BLS_OPERATOR_STATE_RETRIVER=" + operator.NODE_BLS_OPERATOR_STATE_RETRIVER, diff --git a/node/node.go b/node/node.go index e35c00b11d..1b8feeeda1 100644 --- a/node/node.go +++ b/node/node.go @@ -294,7 +294,7 @@ func (n *Node) Start(ctx context.Context) error { } // Build the socket based on the hostname/IP provided in the CLI - socket := string(core.MakeOperatorSocket(n.Config.Hostname, n.Config.DispersalPort, n.Config.RetrievalPort, n.Config.V2DispersalPort, n.Config.V2RetrievalPort)) + socket := core.NewOperatorSocket(n.Config.Hostname, n.Config.DispersalPort, n.Config.RetrievalPort, n.Config.V2DispersalPort, n.Config.V2RetrievalPort) var operator *Operator if n.Config.RegisterNodeAtStart { n.Logger.Info("Registering node on chain with the following parameters:", "operatorId", @@ -306,7 +306,7 @@ func (n *Node) Start(ctx context.Context) error { } operator = &Operator{ Address: crypto.PubkeyToAddress(privateKey.PublicKey).Hex(), - Socket: socket, + Socket: socket.String(), Timeout: 10 * time.Second, PrivKey: privateKey, Signer: n.BLSSigner, @@ -325,7 +325,7 @@ func (n *Node) Start(ctx context.Context) error { if err != nil { n.Logger.Warnf("failed to get operator socket: %w", err) } - if registeredSocket != socket { + if registeredSocket != socket.String() { n.Logger.Warnf("registered socket %s does not match expected socket %s", registeredSocket, socket) } @@ -347,7 +347,7 @@ func (n *Node) Start(ctx context.Context) error { } } - n.CurrentSocket = socket + n.CurrentSocket = socket.String() // Start the Node IP updater only if the PUBLIC_IP_PROVIDER is greater than 0. if n.Config.PubIPCheckInterval > 0 && n.Config.EnableV1 && n.Config.EnableV2 { go n.checkRegisteredNodeIpOnChain(ctx) diff --git a/node/plugin/cmd/main.go b/node/plugin/cmd/main.go index f1619d7200..552d59d778 100644 --- a/node/plugin/cmd/main.go +++ b/node/plugin/cmd/main.go @@ -135,12 +135,17 @@ func pluginOps(ctx *cli.Context) { return } - _, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort, err := core.ParseOperatorSocket(config.Socket) + opSocket, err := core.ParseOperatorSocket(config.Socket) if err != nil { log.Printf("Error: failed to parse operator socket: %v", err) return } + dispersalPort := opSocket.GetV1DispersalSocket() + retrievalPort := opSocket.GetV1RetrievalSocket() + v2DispersalPort := opSocket.GetV2DispersalSocket() + v2RetrievalPort := opSocket.GetV2RetrievalSocket() + socket := config.Socket if isLocalhost(socket) { pubIPProvider := pubip.ProviderOrDefault(logger, config.PubIPProvider) diff --git a/node/utils.go b/node/utils.go index 394ba4115b..beb4b4eeec 100644 --- a/node/utils.go +++ b/node/utils.go @@ -126,7 +126,7 @@ func SocketAddress(ctx context.Context, provider pubip.Provider, dispersalPort, if err != nil { return "", fmt.Errorf("failed to get public ip address from IP provider: %w", err) } - socket := core.MakeOperatorSocket(ip, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort) + socket := core.NewOperatorSocket(ip, dispersalPort, retrievalPort, v2DispersalPort, v2RetrievalPort) return socket.String(), nil } diff --git a/test/integration_test.go b/test/integration_test.go index 049f86d53a..09a298e837 100644 --- a/test/integration_test.go +++ b/test/integration_test.go @@ -387,7 +387,7 @@ func mustMakeOperators(t *testing.T, cst *coremock.ChainDataMock, logger logging tx.On("GetBlockStaleMeasure").Return(nil) tx.On("GetStoreDurationBlocks").Return(nil) tx.On("OperatorIDToAddress").Return(gethcommon.Address{1}, nil) - socket := core.MakeOperatorSocket(config.Hostname, config.DispersalPort, config.RetrievalPort, config.V2DispersalPort, config.V2RetrievalPort) + socket := core.NewOperatorSocket(config.Hostname, config.DispersalPort, config.RetrievalPort, config.V2DispersalPort, config.V2RetrievalPort) tx.On("GetOperatorSocket", mock.Anything, mock.Anything).Return(socket.String(), nil) noopMetrics := metrics.NewNoopMetrics()