diff --git a/server/mock/mock_relay.go b/server/mock/mock_relay.go index a93462fc..1e812807 100644 --- a/server/mock/mock_relay.go +++ b/server/mock/mock_relay.go @@ -158,12 +158,21 @@ func (m *Relay) handleRegisterValidator(w http.ResponseWriter, req *http.Request // defaultHandleRegisterValidator returns the default handler for handleRegisterValidator func (m *Relay) defaultHandleRegisterValidator(w http.ResponseWriter, req *http.Request) { - payload := []builderApiV1.SignedValidatorRegistration{} - decoder := json.NewDecoder(req.Body) - decoder.DisallowUnknownFields() - if err := decoder.Decode(&payload); err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return + reqContentType := req.Header.Get("Content-Type") + if reqContentType == "" || reqContentType == "application/json" { + var payload []builderApiV1.SignedValidatorRegistration + decoder := json.NewDecoder(req.Body) + decoder.DisallowUnknownFields() + if err := decoder.Decode(&payload); err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + } else if reqContentType == "application/octet-stream" { + // TODO(jtraglia): Handle this when a SignedValidatorRegistrationList type exists. + // See: https://github.com/attestantio/go-builder-client/pull/38 + _ = reqContentType + } else { + panic("invalid content type: " + reqContentType) } w.Header().Set("Content-Type", "application/json") diff --git a/server/register_validator.go b/server/register_validator.go new file mode 100644 index 00000000..6a570bd8 --- /dev/null +++ b/server/register_validator.go @@ -0,0 +1,100 @@ +package server + +import ( + "bytes" + "context" + "fmt" + "net/http" + "net/url" + + "github.com/flashbots/mev-boost/server/params" + "github.com/flashbots/mev-boost/server/types" + "github.com/sirupsen/logrus" +) + +func (m *BoostService) registerValidator(log *logrus.Entry, regBytes []byte, header http.Header) error { + respErrCh := make(chan error, len(m.relays)) + + // Forward request to each relay + for _, relay := range m.relays { + go func(relay types.RelayEntry) { + // Get the URL for this relay + requestURL := relay.GetURI(params.PathRegisterValidator) + log := log.WithField("url", requestURL) + + // Build the new request + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, requestURL, bytes.NewReader(regBytes)) + if err != nil { + log.WithError(err).Warn("error creating new request") + respErrCh <- err + return + } + + // Extend the request header with our values + for key, values := range header { + req.Header[key] = values + } + + // Send the request + resp, err := m.httpClientRegVal.Do(req) + if err != nil { + log.WithError(err).Warn("error calling registerValidator on relay") + respErrCh <- err + return + } + resp.Body.Close() + + // Check if response is successful + if resp.StatusCode == http.StatusOK { + respErrCh <- nil + } else { + respErrCh <- fmt.Errorf("%w: %d", errHTTPErrorResponse, resp.StatusCode) + } + }(relay) + } + + // Return OK if any relay responds OK + for range m.relays { + respErr := <-respErrCh + if respErr == nil { + // Goroutines are independent, so if there are a lot of configured + // relays and the first one responds OK, this will continue to send + // validator registrations to the other relays. + return nil + } + } + + // None of the relays responded OK + return errNoSuccessfulRelayResponse +} + +func (m *BoostService) sendValidatorRegistrationsToRelayMonitors(log *logrus.Entry, regBytes []byte, header http.Header) { + // Forward request to each relay monitor + for _, relayMonitor := range m.relayMonitors { + go func(relayMonitor *url.URL) { + // Get the URL for this relay monitor + requestURL := types.GetURI(relayMonitor, params.PathRegisterValidator) + log := log.WithField("url", requestURL) + + // Build the new request + req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, requestURL, bytes.NewReader(regBytes)) + if err != nil { + log.WithError(err).Warn("error creating new request") + return + } + + // Extend the request header with our values + for key, values := range header { + req.Header[key] = values + } + + // Send the request + resp, err := m.httpClientRegVal.Do(req) + if err != nil { + log.WithError(err).Warn("error calling registerValidator on relay monitor") + return + } + resp.Body.Close() + }(relayMonitor) + } +} diff --git a/server/register_validator_test.go b/server/register_validator_test.go new file mode 100644 index 00000000..5ba6accf --- /dev/null +++ b/server/register_validator_test.go @@ -0,0 +1,298 @@ +// register_validator_test.go +package server + +import ( + "bytes" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + builderApiV1 "github.com/attestantio/go-builder-client/api/v1" + "github.com/flashbots/mev-boost/server/mock" + "github.com/flashbots/mev-boost/server/params" + "github.com/flashbots/mev-boost/server/types" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" +) + +// TestHandleRegisterValidator_EmptyList verifies that a valid registration returns status ok +func TestHandleRegisterValidator_EmptyList(t *testing.T) { + relay := mock.NewRelay(t) + defer relay.Server.Close() + + m := &BoostService{ + relays: []types.RelayEntry{relay.RelayEntry}, + httpClientRegVal: *http.DefaultClient, + log: logrus.NewEntry(logrus.New()), + } + + reqBody := bytes.NewBufferString("[]") + req := httptest.NewRequest(http.MethodPost, "https://example.com"+params.PathRegisterValidator, reqBody) + req.Header.Set("Content-Type", "application/json") + + rr := httptest.NewRecorder() + m.handleRegisterValidator(rr, req) + + require.Equal(t, http.StatusOK, rr.Code, "expected status ok") + + count := relay.GetRequestCount(params.PathRegisterValidator) + require.Equal(t, 1, count) +} + +// TestHandleRegisterValidator_NotEmptyList verifies that a non-empty list returns status ok +func TestHandleRegisterValidator_NotEmptyList(t *testing.T) { + relay := mock.NewRelay(t) + defer relay.Server.Close() + + m := &BoostService{ + relays: []types.RelayEntry{relay.RelayEntry}, + httpClientRegVal: *http.DefaultClient, + log: logrus.NewEntry(logrus.New()), + } + + validatorRegistrations := []builderApiV1.SignedValidatorRegistration{ + { + Message: &builderApiV1.ValidatorRegistration{ + Timestamp: time.Unix(1, 0), + }, + }, + { + Message: &builderApiV1.ValidatorRegistration{ + Timestamp: time.Unix(2, 0), + }, + }, + } + + encodedValidatorRegistrations, err := json.Marshal(validatorRegistrations) + require.NoError(t, err) + + reqBody := bytes.NewBuffer(encodedValidatorRegistrations) + req := httptest.NewRequest(http.MethodPost, "https://example.com"+params.PathRegisterValidator, reqBody) + req.Header.Set("Content-Type", "application/json") + + rr := httptest.NewRecorder() + m.handleRegisterValidator(rr, req) + + require.Equal(t, http.StatusOK, rr.Code, "expected status ok") + + count := relay.GetRequestCount(params.PathRegisterValidator) + require.Equal(t, 1, count) +} + +// TestHandleRegisterValidator_InvalidJSON verifies that an invalid registration returns bad gateway +func TestHandleRegisterValidator_InvalidJSON(t *testing.T) { + relay := mock.NewRelay(t) + defer relay.Server.Close() + + m := &BoostService{ + relays: []types.RelayEntry{relay.RelayEntry}, + httpClientRegVal: *http.DefaultClient, + log: logrus.NewEntry(logrus.New()), + } + + reqBody := bytes.NewBufferString("invalid json") + req := httptest.NewRequest(http.MethodPost, "https://example.com"+params.PathRegisterValidator, reqBody) + req.Header.Set("Content-Type", "application/json") + + rr := httptest.NewRecorder() + m.handleRegisterValidator(rr, req) + + require.Equal(t, http.StatusBadGateway, rr.Code) + + count := relay.GetRequestCount(params.PathRegisterValidator) + require.Equal(t, 1, count) +} + +// TestHandleRegisterValidator_ValidSSZ verifies that a valid registration returns status ok +func TestHandleRegisterValidator_ValidSSZ(t *testing.T) { + relay := mock.NewRelay(t) + defer relay.Server.Close() + + m := &BoostService{ + relays: []types.RelayEntry{relay.RelayEntry}, + httpClientRegVal: *http.DefaultClient, + log: logrus.NewEntry(logrus.New()), + } + + validatorRegistrations := []builderApiV1.SignedValidatorRegistration{ + { + Message: &builderApiV1.ValidatorRegistration{ + Timestamp: time.Unix(1, 0), + }, + }, + { + Message: &builderApiV1.ValidatorRegistration{ + Timestamp: time.Unix(2, 0), + }, + }, + } + + // TODO(jtraglia): Use SSZ here when a SignedValidatorRegistrationList type exists. + // See: https://github.com/attestantio/go-builder-client/pull/38 + encodedValidatorRegistrations, err := json.Marshal(validatorRegistrations) + require.NoError(t, err) + + reqBody := bytes.NewBuffer(encodedValidatorRegistrations) + req := httptest.NewRequest(http.MethodPost, "https://example.com"+params.PathRegisterValidator, reqBody) + req.Header.Set("Content-Type", "application/octet-stream") + + rr := httptest.NewRecorder() + m.handleRegisterValidator(rr, req) + + require.Equal(t, http.StatusOK, rr.Code) + + count := relay.GetRequestCount(params.PathRegisterValidator) + require.Equal(t, 1, count) +} + +// TestHandleRegisterValidator_InvalidSSZ verifies that an invalid registration returns bad gateway +func TestHandleRegisterValidator_InvalidSSZ(t *testing.T) { + relay := mock.NewRelay(t) + defer relay.Server.Close() + + m := &BoostService{ + relays: []types.RelayEntry{relay.RelayEntry}, + httpClientRegVal: *http.DefaultClient, + log: logrus.NewEntry(logrus.New()), + } + + reqBody := bytes.NewBufferString("invalid ssz") + req := httptest.NewRequest(http.MethodPost, "https://example.com"+params.PathRegisterValidator, reqBody) + req.Header.Set("Content-Type", "application/octet-stream") + + rr := httptest.NewRecorder() + m.handleRegisterValidator(rr, req) + + // TODO(jtraglia): Enable this when a SignedValidatorRegistrationList type exists. + // See: https://github.com/attestantio/go-builder-client/pull/38 + // require.Equal(t, http.StatusBadGateway, rr.Code) + + count := relay.GetRequestCount(params.PathRegisterValidator) + require.Equal(t, 1, count) +} + +// TestHandleRegisterValidator_MultipleRelaysOneSuccess verifies that if one relay succeeds the response is ok +func TestHandleRegisterValidator_MultipleRelaysOneSuccess(t *testing.T) { + badRelay := mock.NewRelay(t) + defer badRelay.Server.Close() + badRelay.OverrideHandleRegisterValidator(func(w http.ResponseWriter, _ *http.Request) { + http.Error(w, "simulated failure", http.StatusInternalServerError) + }) + + relaySuccess := mock.NewRelay(t) + defer relaySuccess.Server.Close() + + m := &BoostService{ + relays: []types.RelayEntry{badRelay.RelayEntry, relaySuccess.RelayEntry}, + httpClientRegVal: *http.DefaultClient, + log: logrus.NewEntry(logrus.New()), + } + + reqBody := bytes.NewBufferString("[]") + req := httptest.NewRequest(http.MethodPost, "https://example.com"+params.PathRegisterValidator, reqBody) + req.Header.Set("Content-Type", "application/json") + + rr := httptest.NewRecorder() + m.handleRegisterValidator(rr, req) + + require.Equal(t, http.StatusOK, rr.Code) + + countBadRelay := badRelay.GetRequestCount(params.PathRegisterValidator) + require.Equal(t, 1, countBadRelay) + countSuccess := relaySuccess.GetRequestCount(params.PathRegisterValidator) + require.Equal(t, 1, countSuccess) +} + +// TestHandleRegisterValidator_AllFail verifies that if all relays fail the response is bad gateway +func TestHandleRegisterValidator_AllFail(t *testing.T) { + badRelay1 := mock.NewRelay(t) + defer badRelay1.Server.Close() + badRelay1.OverrideHandleRegisterValidator(func(w http.ResponseWriter, _ *http.Request) { + http.Error(w, "simulated failure 1", http.StatusInternalServerError) + }) + + badRelay2 := mock.NewRelay(t) + defer badRelay2.Server.Close() + badRelay2.OverrideHandleRegisterValidator(func(w http.ResponseWriter, _ *http.Request) { + http.Error(w, "simulated failure 2", http.StatusInternalServerError) + }) + + m := &BoostService{ + relays: []types.RelayEntry{badRelay1.RelayEntry, badRelay2.RelayEntry}, + httpClientRegVal: *http.DefaultClient, + log: logrus.NewEntry(logrus.New()), + } + + reqBody := bytes.NewBufferString("[]") + req := httptest.NewRequest(http.MethodPost, "https://example.com"+params.PathRegisterValidator, reqBody) + req.Header.Set("Content-Type", "application/json") + + rr := httptest.NewRecorder() + m.handleRegisterValidator(rr, req) + + require.Equal(t, http.StatusBadGateway, rr.Code) + + countBadRelay1 := badRelay1.GetRequestCount(params.PathRegisterValidator) + require.Equal(t, 1, countBadRelay1) + countBadRelay2 := badRelay2.GetRequestCount(params.PathRegisterValidator) + require.Equal(t, 1, countBadRelay2) +} + +// TestHandleRegisterValidator_RelayNetworkError verifies that a network error results in bad gateway +func TestHandleRegisterValidator_RelayNetworkError(t *testing.T) { + relay := mock.NewRelay(t) + relay.Server.Close() // simulate network error + + m := &BoostService{ + relays: []types.RelayEntry{relay.RelayEntry}, + httpClientRegVal: *http.DefaultClient, + log: logrus.NewEntry(logrus.New()), + } + + reqBody := bytes.NewBufferString("[]") + req := httptest.NewRequest(http.MethodPost, "https://example.com"+params.PathRegisterValidator, reqBody) + req.Header.Set("Content-Type", "application/json") + + rr := httptest.NewRecorder() + m.handleRegisterValidator(rr, req) + + require.Equal(t, http.StatusBadGateway, rr.Code) +} + +// TestHandleRegisterValidator_HeaderPropagation verifies that headers from the request are forwarded +func TestHandleRegisterValidator_HeaderPropagation(t *testing.T) { + relay := mock.NewRelay(t) + defer relay.Server.Close() + + headerChan := make(chan http.Header, 1) + relay.OverrideHandleRegisterValidator(func(w http.ResponseWriter, req *http.Request) { + headerChan <- req.Header + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + }) + + m := &BoostService{ + relays: []types.RelayEntry{relay.RelayEntry}, + httpClientRegVal: *http.DefaultClient, + log: logrus.NewEntry(logrus.New()), + } + + reqBody := bytes.NewBufferString("[]") + req := httptest.NewRequest(http.MethodPost, "https://example.com"+params.PathRegisterValidator, reqBody) + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Custom-Header", "custom-value") + + rr := httptest.NewRecorder() + m.handleRegisterValidator(rr, req) + + require.Equal(t, http.StatusOK, rr.Code) + + select { + case capturedHeader := <-headerChan: + require.Equal(t, "custom-value", capturedHeader.Get("X-Custom-Header")) + case <-time.After(2 * time.Second): + t.Fatal("timed out waiting for header capture") + } +} diff --git a/server/service.go b/server/service.go index 1e041cd9..38d0c410 100644 --- a/server/service.go +++ b/server/service.go @@ -16,7 +16,6 @@ import ( "time" builderApi "github.com/attestantio/go-builder-client/api" - builderApiV1 "github.com/attestantio/go-builder-client/api/v1" eth2ApiV1Bellatrix "github.com/attestantio/go-eth2-client/api/v1/bellatrix" eth2ApiV1Capella "github.com/attestantio/go-eth2-client/api/v1/capella" eth2ApiV1Deneb "github.com/attestantio/go-eth2-client/api/v1/deneb" @@ -209,22 +208,6 @@ func (m *BoostService) startBidCacheCleanupTask() { } } -func (m *BoostService) sendValidatorRegistrationsToRelayMonitors(payload []builderApiV1.SignedValidatorRegistration) { - log := m.log.WithField("method", "sendValidatorRegistrationsToRelayMonitors").WithField("numRegistrations", len(payload)) - for _, relayMonitor := range m.relayMonitors { - go func(relayMonitor *url.URL) { - url := types.GetURI(relayMonitor, params.PathRegisterValidator) - log = log.WithField("url", url) - _, err := SendHTTPRequest(context.Background(), m.httpClientRegVal, http.MethodPost, url, "", nil, payload, nil) - if err != nil { - log.WithError(err).Warn("error calling registerValidator on relay monitor") - return - } - log.Debug("sent validator registrations to relay monitor") - }(relayMonitor) - } -} - func (m *BoostService) handleRoot(w http.ResponseWriter, _ *http.Request) { m.respondOK(w, nilResponse) } @@ -240,54 +223,42 @@ func (m *BoostService) handleStatus(w http.ResponseWriter, _ *http.Request) { } } -// handleRegisterValidator returns StatusOK if at least one relay returns StatusOK, else StatusBadGateway +// handleRegisterValidator returns StatusOK if at least one relay returns StatusOK, else StatusBadGateway. +// This forwards the message from the node to relays with minimal overhead. The registrations will maintain their +// original encoding (SSZ or JSON) from the node. func (m *BoostService) handleRegisterValidator(w http.ResponseWriter, req *http.Request) { log := m.log.WithField("method", "registerValidator") - log.Debug("registerValidator") - - payload := []builderApiV1.SignedValidatorRegistration{} - if err := DecodeJSON(req.Body, &payload); err != nil { - m.respondError(w, http.StatusBadRequest, err.Error()) - return - } + log.Debug("handling request") + // Get the user agent ua := UserAgent(req.Header.Get("User-Agent")) - log = log.WithFields(logrus.Fields{ - "numRegistrations": len(payload), - "ua": ua, - }) + log = log.WithFields(logrus.Fields{"ua": ua}) - // Add request headers - headers := map[string]string{ - HeaderStartTimeUnixMS: fmt.Sprintf("%d", time.Now().UTC().UnixMilli()), - } + // Additional header fields + header := req.Header + header.Set("User-Agent", wrapUserAgent(ua)) + header.Set(HeaderStartTimeUnixMS, fmt.Sprintf("%d", time.Now().UTC().UnixMilli())) - relayRespCh := make(chan error, len(m.relays)) - - for _, relay := range m.relays { - go func(relay types.RelayEntry) { - url := relay.GetURI(params.PathRegisterValidator) - log := log.WithField("url", url) - - _, err := SendHTTPRequest(context.Background(), m.httpClientRegVal, http.MethodPost, url, ua, headers, payload, nil) - if err != nil { - log.WithError(err).Warn("error calling registerValidator on relay") - } - relayRespCh <- err - }(relay) + // Read the validator registrations + regBytes, err := io.ReadAll(req.Body) + if err != nil { + m.respondError(w, http.StatusInternalServerError, err.Error()) + return } + req.Body.Close() - go m.sendValidatorRegistrationsToRelayMonitors(payload) + // Send the registrations to relay monitors, if configured + go m.sendValidatorRegistrationsToRelayMonitors(log, regBytes, header) - for i := 0; i < len(m.relays); i++ { - respErr := <-relayRespCh - if respErr == nil { - m.respondOK(w, nilResponse) - return - } + // Send the registrations to each relay + err = m.registerValidator(log, regBytes, header) + if err == nil { + // One of the relays responded OK + m.respondOK(w, nilResponse) + } else { + // None of the relays responded OK + m.respondError(w, http.StatusBadGateway, err.Error()) } - - m.respondError(w, http.StatusBadGateway, errNoSuccessfulRelayResponse.Error()) } // handleGetHeader requests bids from the relays diff --git a/server/utils.go b/server/utils.go index 97dcb81e..70ca747c 100644 --- a/server/utils.go +++ b/server/utils.go @@ -268,3 +268,7 @@ func getPayloadResponseIsEmpty(payload *builderApi.VersionedSubmitBlindedBlockRe } return false } + +func wrapUserAgent(ua UserAgent) string { + return strings.TrimSpace(fmt.Sprintf("mev-boost/%s %s", config.Version, ua)) +}