Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add RPC events and middleware data for better monitoring #118

Merged
merged 3 commits into from
Sep 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions packages/execution/src/common/networks.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
import { Network } from '@ethersproject/networks';
import { Networkish } from '../interfaces/networkish';
import { ConnectionInfo } from '@ethersproject/web';

const IP_V4_REGEX = new RegExp(
/^(?<domain>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})(?::(?<port>\d+))?/i,
);

const DOMAIN_REGEX = new RegExp(
/^(?<protocol>https?:\/\/)(?=(?<fqdn>[^:/]+))(?:(?<service>www|ww\d|cdn|ftp|mail|pop\d?|ns\d?|git)\.)?(?:(?<subdomain>[^:/]+)\.)*(?<domain>[^:/]+\.[a-z0-9]+)(?::(?<port>\d+))?(?<path>\/[^?]*)?(?:\?(?<query>[^#]*))?(?:#(?<hash>.*))?/i,
);

export const networksEqual = (
networkA: Network,
Expand All @@ -22,3 +31,22 @@ export const networksChainsEqual = (
networkA: Network,
networkB: Networkish,
): boolean => networkA.chainId === getNetworkChain(networkB);

export const getConnectionFQDN = (
connectionInfo: ConnectionInfo | string,
): string => {
const urlLike =
typeof connectionInfo === 'string' ? connectionInfo : connectionInfo.url;

const ipGroups = urlLike.match(IP_V4_REGEX)?.groups;

if (ipGroups) {
/* istanbul ignore next */
return ipGroups.domain ?? '';
}

const groups = urlLike.match(DOMAIN_REGEX)?.groups;

/* istanbul ignore next */
return groups?.fqdn ?? '';
};
62 changes: 62 additions & 0 deletions packages/execution/src/events/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { SimpleFallbackJsonRpcBatchProvider } from '../provider/simple-fallback-json-rpc-batch-provider';
import { AllProvidersFailedError } from '../error';
import {
ExtendedJsonRpcBatchProvider,
JsonRpcRequest,
JsonRpcResponse,
} from '../provider/extended-json-rpc-batch-provider';

export type FallbackProviderRequestFailedAllEvent = {
action: 'fallback-provider:request:failed:all';
provider: SimpleFallbackJsonRpcBatchProvider;
error: AllProvidersFailedError;
};

export type FallbackProviderRequestNonRetryableErrorEvent = {
action: 'fallback-provider:request:non-retryable-error';
provider: SimpleFallbackJsonRpcBatchProvider;
error: Error | unknown;
};

export type FallbackProviderRequestEvent = {
action: 'fallback-provider:request';
provider: SimpleFallbackJsonRpcBatchProvider;
activeFallbackProviderIndex: number;
fallbackProvidersCount: number;
domain: string;
retryAttempt: number;
};

export type ProviderResponseBatchedErrorEvent = {
action: 'provider:response-batched:error';
error: Error;
request: JsonRpcRequest[];
provider: ExtendedJsonRpcBatchProvider;
domain: string;
};

export type ProviderResponseBatchedEvent = {
action: 'provider:response-batched';
request: JsonRpcRequest[];
response: JsonRpcResponse[] | JsonRpcResponse;
provider: ExtendedJsonRpcBatchProvider;
domain: string;
};

export type ProviderRequestBatchedEvent = {
action: 'provider:request-batched';
request: JsonRpcRequest[];
provider: ExtendedJsonRpcBatchProvider;
domain: string;
};

export type ProviderEvents =
| ProviderRequestBatchedEvent
| ProviderResponseBatchedEvent
| ProviderResponseBatchedErrorEvent;

export type FallbackProviderEvents =
| ProviderEvents
| FallbackProviderRequestEvent
| FallbackProviderRequestNonRetryableErrorEvent
| FallbackProviderRequestFailedAllEvent;
1 change: 1 addition & 0 deletions packages/execution/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ export * from './interfaces/module.options';
export * from './interfaces/non-empty-array';
export * from './ethers/fee-history';
export * from './error';
export * from './events';
64 changes: 52 additions & 12 deletions packages/execution/src/provider/extended-json-rpc-batch-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,25 @@ import { FeeHistory, getFeeHistory } from '../ethers/fee-history';
import { ErrorCode } from '../error/codes/error-codes';
import { TraceConfig, TraceResult } from '../interfaces/debug-traces';
import { getDebugTraceBlockByHash } from '../ethers/debug-trace-block-by-hash';
import { getConnectionFQDN } from '../common/networks';
import { EventEmitter } from 'events';
import {
ProviderEvents,
ProviderRequestBatchedEvent,
ProviderResponseBatchedErrorEvent,
ProviderResponseBatchedEvent,
} from '../events';

// this will help with autocomplete
export interface ExtendedJsonRpcBatchProviderEventEmitter
extends NodeJS.EventEmitter {
on(eventName: 'rpc', listener: (event: ProviderEvents) => void): this;
once(eventName: 'rpc', listener: (event: ProviderEvents) => void): this;
addListener(
eventName: 'rpc',
listener: (event: ProviderEvents) => void,
): this;
}

export interface RequestPolicy {
jsonRpcMaxBatchSize: number;
Expand Down Expand Up @@ -104,6 +123,8 @@ export class ExtendedJsonRpcBatchProvider extends JsonRpcProvider {
protected _concurrencyLimiter: LimitFunction;
protected _tickCounter = 0;
protected _fetchMiddlewareService: MiddlewareService<Promise<any>>;
protected _domain: string;
protected _eventEmitter: ExtendedJsonRpcBatchProviderEventEmitter;

public constructor(
url: ConnectionInfo | string,
Expand All @@ -112,6 +133,8 @@ export class ExtendedJsonRpcBatchProvider extends JsonRpcProvider {
fetchMiddlewares: MiddlewareCallback<Promise<any>>[] = [],
) {
super(url, network);
this._eventEmitter = new EventEmitter();
this._domain = getConnectionFQDN(url);
this._requestPolicy = requestPolicy ?? {
jsonRpcMaxBatchSize: 200,
maxConcurrentRequests: 5,
Expand Down Expand Up @@ -152,28 +175,33 @@ export class ExtendedJsonRpcBatchProvider extends JsonRpcProvider {

const batchRequest = batch.map((intent) => intent.request);

this.emit('debug', {
action: 'requestBatch',
const event: ProviderRequestBatchedEvent = {
action: 'provider:request-batched',
request: deepCopy(batchRequest),
provider: this,
});
domain: this._domain,
};
this._eventEmitter.emit('rpc', event);

this._concurrencyLimiter(() => {
return this._fetchMiddlewareService.go(
() => this.fetchJson(this.connection, JSON.stringify(batchRequest)),
{
provider: this,
domain: this._domain,
},
);
})
.then(
(batchResult: JsonRpcResponse[] | JsonRpcResponse) => {
this.emit('debug', {
action: 'response',
const event: ProviderResponseBatchedEvent = {
action: 'provider:response-batched',
request: deepCopy(batchRequest),
response: deepCopy(batchResult),
provider: this,
});
domain: this._domain,
};
this._eventEmitter.emit('rpc', event);

if (!Array.isArray(batchResult)) {
const errMessage = 'Unexpected batch result.';
Expand Down Expand Up @@ -216,12 +244,14 @@ export class ExtendedJsonRpcBatchProvider extends JsonRpcProvider {
});
},
(error: Error) => {
this.emit('debug', {
action: 'response',
const event: ProviderResponseBatchedErrorEvent = {
action: 'provider:response-batched:error',
error: error,
request: deepCopy(batchRequest),
provider: this,
});
domain: this._domain,
};
this._eventEmitter.emit('rpc', event);

batch.forEach((inflightRequest) => {
inflightRequest.reject(error);
Expand All @@ -230,12 +260,14 @@ export class ExtendedJsonRpcBatchProvider extends JsonRpcProvider {
)
.catch((error: Error) => {
// catch errors happening in the 'then' callback
this.emit('debug', {
action: 'response',
const event: ProviderResponseBatchedErrorEvent = {
action: 'provider:response-batched:error',
error: error,
request: deepCopy(batchRequest),
provider: this,
});
domain: this._domain,
};
this._eventEmitter.emit('rpc', event);

batch.forEach((inflightRequest) => {
inflightRequest.reject(error);
Expand Down Expand Up @@ -299,6 +331,14 @@ export class ExtendedJsonRpcBatchProvider extends JsonRpcProvider {
this._fetchMiddlewareService.use(callback);
}

public get domain(): string {
return this._domain;
}

public get eventEmitter() {
return this._eventEmitter;
}

public send(method: string, params: Array<unknown>): Promise<unknown> {
const request: JsonRpcRequest = {
method: method,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ import { AllProvidersFailedError } from '../error/all-providers-failed.error';
import { FeeHistory, getFeeHistory } from '../ethers/fee-history';
import { TraceConfig, TraceResult } from '../interfaces/debug-traces';
import { getDebugTraceBlockByHash } from '../ethers/debug-trace-block-by-hash';
import { EventEmitter } from 'events';
import {
FallbackProviderEvents,
FallbackProviderRequestEvent,
FallbackProviderRequestFailedAllEvent,
FallbackProviderRequestNonRetryableErrorEvent,
} from '../events';

/**
* EIP-1898 support
Expand Down Expand Up @@ -62,6 +69,20 @@ declare module '@ethersproject/providers' {
}
}

// this will help with autocomplete
export interface SimpleFallbackJsonRpcBatchProviderEventEmitter
extends NodeJS.EventEmitter {
on(eventName: 'rpc', listener: (event: FallbackProviderEvents) => void): this;
once(
eventName: 'rpc',
listener: (event: FallbackProviderEvents) => void,
): this;
addListener(
eventName: 'rpc',
listener: (event: FallbackProviderEvents) => void,
): this;
}

@Injectable()
export class SimpleFallbackJsonRpcBatchProvider extends BaseProvider {
protected config: SimpleFallbackProviderConfig;
Expand All @@ -73,12 +94,14 @@ export class SimpleFallbackJsonRpcBatchProvider extends BaseProvider {
// it is crucial not to mix these two errors
protected lastPerformError: Error | null | unknown = null; // last error for 'perform' operations, is batch-oriented
protected lastError: Error | null | unknown = null; // last error for whole provider
protected _eventEmitter: SimpleFallbackJsonRpcBatchProviderEventEmitter;

public constructor(
config: SimpleFallbackProviderConfig,
logger: LoggerService,
) {
super(config.network);
this._eventEmitter = new EventEmitter();
this.config = {
maxRetries: 3,
minBackoffMs: 500,
Expand Down Expand Up @@ -113,6 +136,12 @@ export class SimpleFallbackJsonRpcBatchProvider extends BaseProvider {
config.requestPolicy,
config.fetchMiddlewares ?? [],
);

// re-emitting events from fallback-providers
provider.eventEmitter.on('rpc', (event) => {
this._eventEmitter.emit('rpc', event);
});

return {
network: null,
provider,
Expand Down Expand Up @@ -245,16 +274,37 @@ export class SimpleFallbackJsonRpcBatchProvider extends BaseProvider {
// maximum number of switching is limited to total fallback provider count
while (attempt < this.fallbackProviders.length) {
try {
let performRetryAttempt = 0;
attempt++;
// awaiting is extremely important here
// without it, the error will not be caught in current try-catch scope
return await retry(() =>
this.provider.provider.perform(method, params),
);
return await retry(() => {
const provider = this.provider;

const event: FallbackProviderRequestEvent = {
action: 'fallback-provider:request',
provider: this,
activeFallbackProviderIndex: this.activeFallbackProviderIndex,
fallbackProvidersCount: this.fallbackProviders.length,
domain: provider.provider.domain,
retryAttempt: performRetryAttempt,
};
this._eventEmitter.emit('rpc', event);

performRetryAttempt++;
return provider.provider.perform(method, params);
});
} catch (e) {
this.lastError = e;

// checking that error should not be retried on another provider
if (this.isNonRetryableError(e)) {
const event: FallbackProviderRequestNonRetryableErrorEvent = {
action: 'fallback-provider:request:non-retryable-error',
provider: this,
error: e,
};
this._eventEmitter.emit('rpc', event);
throw e;
}

Expand All @@ -279,6 +329,14 @@ export class SimpleFallbackJsonRpcBatchProvider extends BaseProvider {
'All attempts to do ETH1 RPC request failed',
);
allProvidersFailedError.cause = this.lastError;

const event: FallbackProviderRequestFailedAllEvent = {
action: 'fallback-provider:request:failed:all',
provider: this,
error: allProvidersFailedError,
};
this._eventEmitter.emit('rpc', event);

throw allProvidersFailedError;
}

Expand Down Expand Up @@ -382,4 +440,8 @@ export class SimpleFallbackJsonRpcBatchProvider extends BaseProvider {
public get activeProviderIndex() {
return this.activeFallbackProviderIndex;
}

public get eventEmitter() {
return this._eventEmitter;
}
}
18 changes: 17 additions & 1 deletion packages/execution/test/fallback-provider.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import { MiddlewareCallback } from '@lido-nestjs/middleware';
import { Network } from '@ethersproject/networks';
import { nonRetryableErrors } from '../src/common/errors';
import { ErrorCode, Logger } from '@ethersproject/logger';
import { AllProvidersFailedError } from '../src';
import { AllProvidersFailedError, FallbackProviderEvents } from '../src';

export type MockedExtendedJsonRpcBatchProvider =
ExtendedJsonRpcBatchProvider & {
Expand Down Expand Up @@ -797,5 +797,21 @@ describe('Execution module. ', () => {
// second 'getBlock' fetch call to second provider that does not fail
expect(mockedFallbackProviderFetch[1]).toBeCalledTimes(2);
});

test('should emit `fallback-provider:request` events', async () => {
await createMocks(2);

let retryAttempt = NaN;
mockedProvider.eventEmitter.on('rpc', (event: FallbackProviderEvents) => {
if (event.action === 'fallback-provider:request') {
retryAttempt = event.retryAttempt;
}
});

const block = await mockedProvider.getBlock(42);

expect(retryAttempt).toBe(0);
expect(block.hash).toBe(fixtures.eth_getBlockByNumber.default.hash);
});
});
});
Loading
Loading