diff --git a/CHANGELOG.md b/CHANGELOG.md index dae5664..9c2be7d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -41,4 +41,11 @@ A ready version to use (hopefully) - Fixed a bug where the leader would not send a `StorageState` notification to a follower when the follower was behind the leader. - Fixed bug of remaining remote peer not part of the grid due to follower ENDPOINT_STATE_NOTIFICATION contained a wrong endpoint. -- Changing follower behavior when falling out of the grid. Instead of trying to collect endpoints, it periodically sends JOIN_NOTIFICATION until a leader is not elected for the endpoint \ No newline at end of file +- Changed follower behavior when falling out of the grid. Instead of trying to collect endpoints, it periodically sends JOIN_NOTIFICATION until a leader is elected for the endpoint + +### 2.6.0 + + - Updated `HamokEmitter` so that a `metaData` property can be bound to subscriber peers per event. + * Added a `subscriptions` property to `HamokEmitter` to track the subscribers of each event. + * Added `HamokEmitterStats` to track the number of events emitted and received by the emitter. + - Simplified discovery and joining methods in `Hamok`. \ No newline at end of file diff --git a/README.md b/README.md index d4af464..64ba6f7 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,7 @@ yarn add hamok - [HamokEmitter](#hamokemitter) - [HamokRecord](#hamokrecord) - [User Manual](#user-manual) +- [NPM library](#npm-library) - [Important Notes](#important-notes) - [Contributing](#contributing) - [License](#license) @@ -129,7 +130,11 @@ HamokRecord is a feature that provides distributed storage for individual record ## User Manual -You can find detailed user manuals [here](https://balazskreith.github.io/hamok-ts/) +You can find detailed user manuals [here](https://balazskreith.github.io/hamok-ts/). + +## NPM library + +The Hamok library is available on [NPM](https://www.npmjs.com/package/hamok). ## Contributing diff --git a/docs/emitter.md b/docs/emitter.md index 02506db..da08330 100644 --- a/docs/emitter.md +++ b/docs/emitter.md @@ -10,6 +10,7 @@ - [Properties](#properties) - [Events](#events) - [Methods](#methods) + - [Subscriptions](#subscriptions) - [Examples](#examples) - [FAQ](#faq) @@ -131,6 +132,75 @@ emitter.unsubscribe("event", (data) => { emitter.close(); ``` +### Subscriptions + +Peers subscribe to events using the `subscribe` method. When an event is published, all subscribed peers receive the event. +You can observe the subscriptions by accessing the `subscriptions` property of the emitter. + +```typescript +// List the current subscribers of each event +for (const [event, peers] of emitter.subscriptions.entries()) { + console.log(`Event: ${event}`, [ ...peers.entries() ]); +} +``` + +Additionally, you can listen to the `added` and `removed` events to observe subscription changes. + +```typescript +// Handle a peer subscribing to an event +emitter.subscriptions.on('added', (event, peerId) => { + console.log(`Peer ${peerId} subscribed to event ${event}.`); +}); + +// Handle a peer unsubscribing from an event +emitter.subscriptions.on('removed', (event, peerId) => { + console.log(`Peer ${peerId} unsubscribed from event ${event}.`); +}); +``` + +#### Add Metadata to subscription + +Peers can subscribe to an event with metadata.
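+The metadata is passed as the optional third argument of `subscribe` (see the `subscribe(event, listener, metaData)` signature introduced in this release). A minimal sketch; the emitter and the `SubscriptionMetaData` type are set up in the example that follows:
+
+```typescript
+// Subscribe with metadata attached to this peer's subscription
+await emitter.subscribe('myEvent', (payload) => {
+  console.log('myEvent received', payload);
+}, {
+  userId: 'user-1',
+  timestamp: new Date(),
+});
+```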
+ +```typescript +type SubscriptionMetaData = { + userId: string; + timestamp: Date; // Time when the peer subscribed +} + +// Create the emitter with SubscriptionMetaData as the metadata (second) type parameter; +// MyEventMap stands for the emitter's event map type. +const emitter = hamok.createEmitter<MyEventMap, SubscriptionMetaData>({ + emitterId: "exampleEmitter", +}); + +// Handle a peer subscribing to an event +emitter.subscriptions.on('added', (event, peerId, metaData) => { + console.log(`Peer ${peerId} subscribed to event ${event}.`); + console.log(`User ID: ${metaData?.userId}`); + console.log(`Subscription Timestamp: ${metaData?.timestamp}`); +}); + +// Handle a peer unsubscribing from an event +emitter.subscriptions.on('removed', (event, peerId, metaData) => { + console.log(`Peer ${peerId} unsubscribed from event ${event}.`); + console.log(`User ID: ${metaData?.userId}`); + console.log(`Unsubscription Timestamp: ${new Date().toISOString()}`); +}); + +``` + +#### Update Metadata for a subscription + +A peer can update the metadata of its own existing subscription; the returned promise resolves to `true` when the update is applied. + +```typescript +// Update the metadata of the local peer's subscription to 'myEvent'. +// prevMetaData (optional) holds the previously stored metadata; when provided, +// the update is only applied if it matches the stored value. +const updated = await emitter.updateSubscriptionMetaData('myEvent', { + userId: 'user-1', + timestamp: new Date(), +}, prevMetaData); +``` + + ## Examples - [simple distributed emitter](https://github.com/balazskreith/hamok-ts/blob/main/examples/src/emitter-example.ts) diff --git a/examples/.gitignore b/examples/.gitignore index 5cb8347..71b628a 100644 --- a/examples/.gitignore +++ b/examples/.gitignore @@ -3,4 +3,5 @@ purgatory/ node_modules lib/ package-lock.json -# yarn.lock \ No newline at end of file +# yarn.lock +_mything.ts \ No newline at end of file diff --git a/examples/package.json b/examples/package.json index 243c191..600ee73 100644 --- a/examples/package.json +++ b/examples/package.json @@ -5,6 +5,7 @@ "main": "main.js", "scripts": { "format": "prettier --write \"src/**/*.ts\"", + "dev:my": "nodemon -x ts-node src/_mything.ts | pino-pretty", "dev:run-all": "nodemon -x ts-node src/run-all.ts | pino-pretty", "dev:readme": "nodemon -x ts-node src/common-readme-example.ts | pino-pretty", "dev:map:1": "nodemon -x ts-node src/map-insert-get-example.ts | pino-pretty", @@ -14,8 +15,10 @@ "dev:record:1": "nodemon -x ts-node src/record-insert-get-example.ts | pino-pretty", "dev:record:2": "nodemon -x ts-node src/record-events-example.ts | pino-pretty", "dev:record:3": "nodemon -x ts-node src/record-dynamic-creating-example.ts | pino-pretty", + "dev:record:4": "nodemon -x ts-node src/record-update-if.ts | pino-pretty", "dev:emitter:1": "nodemon -x ts-node src/emitter-example.ts | pino-pretty", "dev:emitter:2": "nodemon -x ts-node src/emitter-catchup-example.ts | pino-pretty", + "dev:emitter:3": "nodemon -x ts-node src/emitter-catchup-2-example.ts | pino-pretty", "dev:election:1": "nodemon -x ts-node src/reelection-example.ts | pino-pretty", "dev:queue:1": "nodemon -x ts-node src/queue-events-example.ts | pino-pretty", "dev:queue:2": "nodemon -x ts-node src/queue-push-pop-example.ts | pino-pretty", @@ -25,6 +28,7 @@ "dev:common:3": "nodemon -x ts-node src/common-join-leave-rejoin-example.ts | pino-pretty", "dev:common:4": "nodemon -x ts-node src/common-waiting-example.ts | pino-pretty", "dev:common:5": "nodemon -x ts-node src/common-join-leave-rejoin-example-2.ts | pino-pretty", + "dev:common:6": "nodemon -x ts-node src/common-rollout-example.ts | pino-pretty", "dev:redis:1": "nodemon -x ts-node src/redis-remote-map-example.ts | pino-pretty", "dev:redis:2": "nodemon -x ts-node src/redis-dynamic-record-example.ts | pino-pretty", "dev:redis:3": "nodemon -x ts-node src/redis-job-executing-example.ts | pino-pretty",
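Several of the new example scripts above (for instance `dev:common:6` and `dev:emitter:3`) rely on an `examples/src/utils/HamokMessageHub` helper that the examples import but that is not included in this diff. A minimal sketch of what such a hub could look like, assuming it only relays each registered instance's `message` events to the `accept()` method of every other registered instance (the same wiring the older examples do by hand with `server.on('message', other.accept.bind(other))`):

```typescript
import { Hamok } from 'hamok';

// The payload type of the 'message' event is whatever Hamok.accept() consumes.
type HamokMessage = Parameters<Hamok['accept']>[0];

// eslint-disable-next-line @typescript-eslint/no-explicit-any
type AnyHamok = Hamok<any>;

export class HamokMessageHub {
  private readonly relays = new Map<AnyHamok, (message: HamokMessage) => void>();

  // Register one or more Hamok instances and start relaying their messages.
  public add(...servers: AnyHamok[]): void {
    for (const server of servers) {
      if (this.relays.has(server)) continue;
      const relay = (message: HamokMessage) => {
        // deliver this instance's outgoing message to every other registered instance
        for (const peer of this.relays.keys()) {
          if (peer !== server) peer.accept(message);
        }
      };
      this.relays.set(server, relay);
      server.on('message', relay);
    }
  }

  // Stop relaying messages from (and to) the given instance.
  public remove(server: AnyHamok): void {
    const relay = this.relays.get(server);
    if (!relay) return;
    server.off('message', relay);
    this.relays.delete(server);
  }
}
```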
@@ -57,7 +61,7 @@ "dependencies": { "pino": "^9.3.2", "ioredis": "^5.4.1", - "hamok": "file:../" + "hamok": "2.6.1-7ab70c1.0" }, "devDependencies": { "@types/events": "^3.0.0", diff --git a/examples/src/common-3-peers.ts b/examples/src/common-3-peers.ts new file mode 100644 index 0000000..de9f90c --- /dev/null +++ b/examples/src/common-3-peers.ts @@ -0,0 +1,78 @@ + + +import { Hamok, setHamokLogLevel } from 'hamok'; +import * as pino from 'pino'; +import { HamokMessageHub } from './utils/HamokMessageHub'; + +const logger = pino.pino({ + name: 'common-join-example-2', + level: 'debug', +}); + +export async function run() { + + const servers = new Map(); + const messageHub = new HamokMessageHub(); + const addServer = (server: Hamok) => { + const leaderChangedListener = () => { + logger.debug('Server %s, State: %s remotePeers: %s', server.localPeerId, server.raft.state.stateName, [...server.remotePeerIds].join(', ')); + } + server.once('close', () => { + servers.delete(server.localPeerId); + messageHub.remove(server); + server.off('leader-changed', leaderChangedListener); + }) + servers.set(server.localPeerId, server); + messageHub.add(server); + server.on('leader-changed', leaderChangedListener); + } + addServer(new Hamok()); + addServer(new Hamok()); + addServer(new Hamok()); + + await Promise.all([...servers.values()].map(server => server.join())); + + + + for (let i = 0; i < 10; ++i) { + const newServer = new Hamok(); + const oldServer = servers.values().next().value; + addServer(newServer); + // by having the communication channel we assume we can inquery remote endpoints + + + const timer = setInterval(() => { + const messages: string[] = []; + for (const server of servers.values()) { + messages.push(`server (${server.localPeerId}, state: ${server.state}) remotePeers are ${[...server.remotePeerIds].join(', ')}`); + } + logger.debug('iteration: %d\n, %s', i, messages.join('\n')); + }, 1000) + + + await newServer.join(); + + const messages: string[] = []; + for (const server of servers.values()) { + messages.push(`server (${server.localPeerId}, state: ${server.state}) remotePeers are ${[...server.remotePeerIds].join(', ')}`); + } + logger.debug('iteration: %d\n, %s', i, messages.join('\n')); + + oldServer.close(); + + clearInterval(timer); + } + + + logger.info('Close'); + + for (const server of servers.values()) { + server.close(); + } +} + +if (require.main === module) { + logger.info('Running from module file'); + setHamokLogLevel('debug'); + run(); +} diff --git a/examples/src/common-join-leave-rejoin-example-2.ts b/examples/src/common-join-leave-rejoin-example-2.ts index 3e45318..923b64f 100644 --- a/examples/src/common-join-leave-rejoin-example-2.ts +++ b/examples/src/common-join-leave-rejoin-example-2.ts @@ -31,6 +31,7 @@ import { Hamok, setHamokLogLevel } from 'hamok'; import * as pino from 'pino'; +import { HamokMessageHub } from './utils/HamokMessageHub'; const logger = pino.pino({ name: 'common-join-example-2', @@ -39,61 +40,63 @@ const logger = pino.pino({ export async function run() { - const server_1 = new Hamok({ - onlyFollower: true, - }); - logger.info('server 1 is %s', server_1.localPeerId); - const server1Acceptor = server_1.accept.bind(server_1); + const servers = new Map(); + const messageHub = new HamokMessageHub(); + const addServer = (server: Hamok) => { + const leaderChangedListener = () => { + logger.debug('Server %s, State: %s remotePeers: %s', server.localPeerId, server.raft.state.stateName, [...server.remotePeerIds].join(', ')); + } + server.once('close', () 
=> { + servers.delete(server.localPeerId); + messageHub.remove(server); + server.off('leader-changed', leaderChangedListener); + }) + servers.set(server.localPeerId, server); + messageHub.add(server); + server.on('leader-changed', leaderChangedListener); + } + addServer(new Hamok()); + addServer(new Hamok()); - let server_1_joined = false; + await Promise.all([...servers.values()].map(server => server.join())); for (let i = 0; i < 10; ++i) { - const server_2 = new Hamok(); + const newServer = new Hamok(); + const oldServer = servers.values().next().value; + addServer(newServer); // by having the communication channel we assume we can inquery remote endpoints - logger.info('server 2 is %s', server_2.localPeerId); - - const server2Acceptor = server_2.accept.bind(server_2); - server_1.on('message', server2Acceptor); - server_2.on('message', server1Acceptor); const timer = setInterval(() => { - logger.debug('\ - \niteration: %d, \ - \nserver_1 (%s, state: %s) remotePeers are %s, \ - \nserver_2 (%s, state: %s) remotePeers are %s', - i, - server_1.localPeerId, - server_1.state, - [...server_1.remotePeerIds].join(', '), - server_2.localPeerId, - server_2.state, - [...server_2.remotePeerIds].join(', '), - ); + const messages: string[] = []; + for (const server of servers.values()) { + messages.push(`server (${server.localPeerId}, state: ${server.state}) remotePeers are ${[...server.remotePeerIds].join(', ')}`); + } + logger.debug('iteration: %d\n, %s', i, messages.join('\n')); }, 1000) - await Promise.all([ - server_1_joined ? Promise.resolve() : server_1.join(), - server_2.join(), - ]); - - logger.info('Server 1 and Server 2 joined'); + await newServer.join(); - server_2.close(); + const messages: string[] = []; + for (const server of servers.values()) { + messages.push(`server (${server.localPeerId}, state: ${server.state}) remotePeers are ${[...server.remotePeerIds].join(', ')}`); + } + logger.debug('iteration: %d\n, %s', i, messages.join('\n')); + + await new Promise(resolve => setTimeout(resolve, 10000)); - server_1.off('message', server2Acceptor); - server_2.off('message', server1Acceptor); + oldServer.close(); - server_1_joined = true; - server_1.raft.config.onlyFollower = false; clearInterval(timer); } logger.info('Close'); - server_1.close(); + for (const server of servers.values()) { + server.close(); + } } if (require.main === module) { diff --git a/examples/src/common-join-leave-rejoin-example.ts b/examples/src/common-join-leave-rejoin-example.ts index 66190e1..2d87194 100644 --- a/examples/src/common-join-leave-rejoin-example.ts +++ b/examples/src/common-join-leave-rejoin-example.ts @@ -105,6 +105,8 @@ export async function run() { logger.info('We remove server1Acceptor from server_2 and server_3 and see if rejoin event is triggered'); + logger.info('Waiting for rejoining event'); + await Promise.all([ new Promise(resolve => server_1.once('rejoining', () => (logger.info('Server_1 rejoin'), resolve()))), Promise.resolve(server_2.off('message', server1Acceptor)), @@ -115,6 +117,8 @@ export async function run() { logger.info('We add server1Acceptor to server_2 and server_3 and see if joined event is triggered'); + logger.info('Waiting for joined event'); + await Promise.all([ new Promise(resolve => server_1.once('joined', () => (logger.info('Server_1 joined'), resolve()))), Promise.resolve(server_2.on('message', server1Acceptor)), diff --git a/examples/src/common-rollout-example.ts b/examples/src/common-rollout-example.ts new file mode 100644 index 0000000..b6f5ec3 --- /dev/null +++ 
b/examples/src/common-rollout-example.ts @@ -0,0 +1,87 @@ + +import { Hamok, setHamokLogLevel } from 'hamok'; +import * as pino from 'pino'; +import { HamokMessageHub } from './utils/HamokMessageHub'; + +const logger = pino.pino({ + name: 'common-rollout-example', + level: 'debug', +}); + +export async function run() { + + const servers = new Map(); + const messageHub = new HamokMessageHub(); + const addServer = (server: Hamok) => { + const leaderChangedListener = () => { + logger.debug('Server %s, State: %s remotePeers: %s', server.localPeerId, server.raft.state.stateName, [...server.remotePeerIds].join(', ')); + } + server.once('close', () => { + logger.info('Server %s closed', server.localPeerId); + servers.delete(server.localPeerId); + messageHub.remove(server); + server.off('leader-changed', leaderChangedListener); + }) + servers.set(server.localPeerId, server); + messageHub.add(server); + server.on('leader-changed', leaderChangedListener); + logger.info('Server %s added', server.localPeerId); + } + addServer(new Hamok()); + addServer(new Hamok()); + addServer(new Hamok()); + + await Promise.all([...servers.values()].map(server => server.join())); + + for (let i = 0; i < 10; ++i) { + logger.info(` + ---------------- + Iteration %d + ---------------- + `, i); + + const oldServer = [...servers.values()].find(server => server.leader); + + if (!oldServer) { + logger.error('!!!!!!!! No leader found'); + break; + } + + const newServer = new Hamok(); + addServer(newServer); + + logger.info('!!!!!!!! Old server is %s, new server is %s', oldServer.localPeerId, newServer.localPeerId); + + await newServer.join(); + + logger.info('!!!!!!!! New server joined, let\'s stop old server %s', oldServer.localPeerId); + + await oldServer.leave(); + + logger.info('!!!!!!!! Old server left, make sure new server has a leader'); + + oldServer.close(); + + logger.info('!!!!!!!! Old server closed, wait 5s'); + + await new Promise(resolve => setTimeout(resolve, 5000)); + + logger.info('!!!!!!!! Wait until we have a leader'); + + await Promise.all([...servers.values()].map(server => server.waitUntilLeader())); + + + } + + logger.info('Close'); + + for (const server of servers.values()) { + server.close(); + } +} + +if (require.main === module) { + logger.info('Running from module file'); + setHamokLogLevel('debug'); + run(); +} diff --git a/examples/src/emitter-catchup-2-example.ts b/examples/src/emitter-catchup-2-example.ts new file mode 100644 index 0000000..88ab9a8 --- /dev/null +++ b/examples/src/emitter-catchup-2-example.ts @@ -0,0 +1,185 @@ +import { Hamok, HamokConfig, HamokEmitter, setHamokLogLevel } from 'hamok'; +import * as pino from 'pino'; +import { HamokMessageHub } from './utils/HamokMessageHub'; + +const logger = pino.pino({ + name: 'emitter-example', + level: 'debug', +}); + +type ExampleEventMap = { + 'simple-request': [requestId: string, { + 'param-1': number, + }], + 'complex-request': [requestId: string, { + 'param-1': number, + 'param-2': string, + }], + 'response': [requestId: string, payload?: unknown, error?: string], +} + +type PendingRequest = { + requestId: string, + timer: ReturnType, + resolve: (value: T) => void, + reject: (reason: string) => void, +} + +function createResponseHandler(hamok: Hamok<{requests: Map>}>) { + return (requestId: string, payload?: unknown, error?: string) => { + const pendingRequest = hamok.appData.requests.get(requestId); + + logger.debug('Response received by server (%s). requestId: %s, payload: %o, error: %o. 
do we have this request?: %s', hamok.localPeerId, requestId, payload, error, Boolean(pendingRequest)); + + if (pendingRequest) { + clearTimeout(pendingRequest.timer); + hamok.appData.requests.delete(requestId); + if (error) { + pendingRequest.reject(error); + } else { + pendingRequest.resolve(payload); + } + } + } +} + +function createRequest(requests: Map>, emitter: HamokEmitter, event: K, payload: ExampleEventMap[K][1]): Promise { + const requestId = Math.random().toString(36).substring(7); + + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + reject('Request timeout. event: ' + event + ', payload: ' + JSON.stringify(payload)); + }, 5000); + requests.set(requestId, { requestId, timer, resolve, reject }); + + logger.debug('Request sent by server (%s). requestId: %s, event: %s, payload: %o', emitter.connection.localPeerId, requestId, event, payload); + + emitter.notify(event, requestId as any, payload as any); + }); +} + + + +export async function run() { + + const server_1 = new Hamok<{ requests: Map> }>({ + peerId: 'server_1', + appData: { + requests: new Map(), + }, + onlyFollower: true, + }); + const server_2 = new Hamok<{ requests: Map> }>({ + peerId: 'server_2', + appData: { + requests: new Map(), + }, + }); + const server_3 = new Hamok<{ requests: Map> }>({ + peerId: 'server_3', + appData: { + requests: new Map(), + }, + onlyFollower: true, + }); + const messageHub = new HamokMessageHub(); + messageHub.add(server_1, server_2, server_3); + + await Promise.all([ + server_1.join(), + server_2.join(), + server_3.join(), + ]); + + logger.info('Servers are joined'); + + const emitter_1 = server_1.createEmitter({ + emitterId: 'my-distributed-emitter', + }); + const emitter_2 = server_2.createEmitter({ + emitterId: 'my-distributed-emitter', + }); + const emitter_3 = server_3.createEmitter({ + emitterId: 'my-distributed-emitter', + }); + + (emitter_1.subscriptions as any).on('debug', (log: any) => { + logger.debug('Log received by server_1: %o', log); + }) + emitter_1.subscriptions.on('added', (event, peerId, metaData) => { + logger.debug('On server_1 (%s) peer %s subscribed to event %s, metaData: %o. peers on event: %o', server_1.localPeerId, peerId, event, metaData, [...(emitter_1.subscriptions.getEventPeersMap(event) ?? [])]); + }); + emitter_2.subscriptions.on('added', (event, peerId, metaData) => { + logger.debug('On server_2 (%s) peer %s subscribed to event %s, metaData: %o. peers on event: %o', server_2.localPeerId, peerId, event, metaData, [...(emitter_2.subscriptions.getEventPeersMap(event) ?? [])]); + }); + emitter_3.subscriptions.on('added', (event, peerId, metaData) => { + logger.debug('On server_3 (%s) peer %s subscribed to event %s, metaData: %o. peers on event: %o', server_3.localPeerId, peerId, event, metaData, [...(emitter_3.subscriptions.getEventPeersMap(event) ?? [])]); + }); + emitter_1.subscriptions.on('removed', (event, peerId, metaData) => { + logger.debug('On server_1 (%s) peer %s unsubscribed from event %s, metaData: %o, peers on event: %o', server_1.localPeerId, peerId, event, metaData, [...(emitter_1.subscriptions.getEventPeersMap(event) ?? [])]); + }); + emitter_2.subscriptions.on('removed', (event, peerId, metaData) => { + logger.debug('On server_2 (%s) peer %s unsubscribed from event %s, metaData: %o, peers on event: %o', server_2.localPeerId, peerId, event, metaData, [...(emitter_2.subscriptions.getEventPeersMap(event) ?? 
[])]); + }); + emitter_3.subscriptions.on('removed', (event, peerId, metaData) => { + logger.debug('On server_3 (%s) peer %s unsubscribed from event %s, metaData: %o, peers on event: %o', server_3.localPeerId, peerId, event, metaData, [...(emitter_3.subscriptions.getEventPeersMap(event) ?? [])]); + }); + + + await emitter_1.subscribe('simple-request', (requestId, { 'param-1': param1 }) => { + logger.debug('Simple request received by server_1: %s, %s, %s', requestId, param1); + + emitter_1.notify('response', requestId, 10); + }); + + await emitter_2.subscribe('complex-request', (requestId, { 'param-1': param1, 'param-2': param2 }) => { + logger.debug('Complex request received by server_2: %s, %s, %s', requestId, param1, param2); + + if (10 < param1) { + emitter_2.notify('response', requestId, 20); + } + }); + + await emitter_1.subscribe('complex-request', (requestId, { 'param-1': param1, 'param-2': param2 }) => { + logger.debug('Complex request received by server_1: %s, %s, %s', requestId, param1, param2); + + if (param1 < 10) { + emitter_1.notify('response', requestId, 10); + } + }); + + await emitter_1.subscribe('response', createResponseHandler(server_1)); + await emitter_2.subscribe('response', createResponseHandler(server_2)); + await emitter_3.subscribe('response', createResponseHandler(server_3)); + + await emitter_1.ready; + await emitter_2.ready; + await emitter_3.ready; + + const request_1 = createRequest(server_1.appData.requests, emitter_1, 'simple-request', { 'param-1': 5 }) + const request_2 = createRequest(server_2.appData.requests, emitter_2, 'complex-request', { 'param-1': 9, 'param-2': 'test' }) + const request_3 = createRequest(server_2.appData.requests, emitter_2, 'complex-request', { 'param-1': 20, 'param-2': 'test' }) + + logger.info('Response for server : %d', await request_1); + logger.info('Response for server : %d', await request_2); + logger.info('Response for server : %d', await request_3); + + messageHub.remove(server_1); + + logger.info('We wait 10s') + await new Promise(resolve => setTimeout(resolve, 10000)); + + logger.info('We wait 10s') + await new Promise(resolve => setTimeout(resolve, 10000)); + + logger.info('we also add some listeners to server_2 to trigger the expiration of logs'); + + server_1.close(); + server_2.close(); +} + +if (require.main === module) { + logger.info('Running from module file'); + setHamokLogLevel('info'); + run(); +} diff --git a/examples/src/emitter-example.ts b/examples/src/emitter-example.ts index 088e427..e99bfa3 100644 --- a/examples/src/emitter-example.ts +++ b/examples/src/emitter-example.ts @@ -12,6 +12,10 @@ type ExampleEventMap = { 'event-2': [number, string], } +type ExampleSubscriberMetaData = { + some: string; +} + export async function run() { const server_1 = new Hamok(); @@ -27,24 +31,44 @@ export async function run() { logger.info('Servers are joined'); - const emitter_1 = await server_1.createEmitter({ + const emitter_1 = await server_1.createEmitter({ emitterId: 'my-distributed-emitter', }).ready; const emitter_2 = await server_2.createEmitter({ emitterId: 'my-distributed-emitter', }).ready; - - const listener = (number: number, string: string, boolean: boolean) => { + const eventListener = (number: number, string: string, boolean: boolean) => { logger.debug('Event-1 received by server_1: %s, %s, %s', number, string, boolean); }; - await emitter_1.subscribe('event-1', listener); + emitter_1.subscriptions.on('added', (event, peerId, metaData) => { + logger.debug('On server_1 (%s) peer %s subscribed to event %s, 
metaData: %o', server_1.localPeerId, peerId, event, metaData); + }); + emitter_1.subscriptions.on('removed', (event, peerId, metaData) => { + logger.debug('On server_1 (%s) peer %s unsubscribed from event %s, metaData: %o', server_1.localPeerId, peerId, event, metaData); + }); + emitter_1.subscriptions.on('updated', (event, peerId, newMetaData, oldMetaData) => { + logger.debug('On server_1 (%s) peer %s updated subscription to event %s, newMetaData: %o, oldMetaData: %o', server_1.localPeerId, peerId, event, newMetaData, oldMetaData); + }); + + await emitter_1.subscribe('event-1', eventListener); await emitter_2.subscribe('event-1', (number: number, string: string, boolean: boolean) => { logger.debug('Event-1 received by server_2: %s, %s, %s', number, string, boolean); - }); + }, { some: 'metadata' }); await emitter_2.subscribe('event-2', (number, string) => { logger.debug('Event-2 received by server_2: %s, %s', number, string); }); + const success1 = await emitter_2.updateSubscriptionMetaData('event-1', { some: 'new metadata' }, { some: 'metadat' }); + const success2 = await emitter_2.updateSubscriptionMetaData('event-1', { some: 'new metadata 2' }, { some: 'metadata' }); + const success3 = await emitter_2.updateSubscriptionMetaData('event-1', { some: 'new metadata 3' }); + try { + const success4 = await emitter_2.updateSubscriptionMetaData('event-2', { some: 'new metadata 4' }); + } catch (err) { + logger.debug('Update metadata failed: %s', err); + } + + logger.debug('Update metadata success: %s, %s, %s', success1, success2, success3); + logger.debug('Publishing event-1 from server_2'); await emitter_2.publish('event-1', 1, 'hello', true); @@ -52,7 +76,7 @@ export async function run() { await emitter_1.publish('event-2', 2, 'world'); logger.debug('Unsubscribing from event-1 on server_1'); - await emitter_1.unsubscribe('event-1', listener); + await emitter_1.unsubscribe('event-1', eventListener); logger.debug('Publishing event-1 from server_2'); await emitter_1.publish('event-1', 3, 'hello', false); diff --git a/examples/src/map-catchup-example.ts b/examples/src/map-catchup-example.ts index a2f735c..08fb38c 100644 --- a/examples/src/map-catchup-example.ts +++ b/examples/src/map-catchup-example.ts @@ -42,6 +42,11 @@ export async function run() { server_1.close(); server_2.close(); + + for (const [key, value] of storage_2) { + logger.debug('key: %s, value: %d', key, value); + } + } if (require.main === module) { diff --git a/examples/src/record-update-if.ts b/examples/src/record-update-if.ts new file mode 100644 index 0000000..1229708 --- /dev/null +++ b/examples/src/record-update-if.ts @@ -0,0 +1,74 @@ +import { Hamok, setHamokLogLevel } from 'hamok'; +import * as pino from 'pino'; + +const logger = pino.pino({ + name: 'record-update-if', + level: 'debug', +}); + +type MySharedConfig = { + foo: string; + bar: number; +} + +export async function run() { + const server_1 = new Hamok(); + const server_2 = new Hamok(); + + server_1.on('message', server_2.accept.bind(server_2)); + server_2.on('message', server_1.accept.bind(server_1)); + + server_1.addRemotePeerId(server_2.localPeerId); + server_2.addRemotePeerId(server_1.localPeerId); + + await Promise.all([ + server_1.join(), + server_2.join(), + ]); + + const storage_1 = server_1.createRecord({ + recordId: 'my-replicated-record', + }); + const storage_2 = server_2.createRecord({ + recordId: 'my-replicated-record', + }); + + logger.debug(`Inserting values into replicated record.`); + + const [ insertedByServer2, insertedByServer1 ] = await 
Promise.all([ + storage_1.insertInstance({ + bar: 1, + foo: 'inserted-by-server-1', + }), + storage_2.insertInstance({ + bar: 2, + foo: 'inserted-by-server-2', + }), + ]); + + logger.debug('Inserted values into replicated record. %o, %o', insertedByServer1, insertedByServer2); + + if (insertedByServer1 && !insertedByServer2) logger.debug('Server 1 inserted %o', insertedByServer1); + else if (insertedByServer2 && !insertedByServer1) logger.debug('Server 2 inserted %o', insertedByServer2); + else throw new Error('Both servers inserted the record or neither of them'); + + const [ updatedByServer1, updatedByServer2 ] = await Promise.all([ + storage_1.updateInstanceIf({bar: 5}, insertedByServer1 ?? insertedByServer2 ?? {}), + storage_2.updateInstanceIf({bar: 6}, insertedByServer1 ?? insertedByServer2 ?? {}) + ]); + + updatedByServer1 && logger.debug('Server 1 updated %o. instance: %o', updatedByServer1, storage_1.instance); + updatedByServer2 && logger.debug('Server 2 updated %o. instance: %o', updatedByServer2, storage_2.instance); + + await new Promise(resolve => setTimeout(resolve, 1000)); + + server_1.close(); + server_2.close(); +} + +if (require.main === module) { + logger.info('Running from module file'); + setHamokLogLevel('info'); + run(); +} + diff --git a/examples/src/run-all.ts b/examples/src/run-all.ts index 83c5545..5cebd01 100644 --- a/examples/src/run-all.ts +++ b/examples/src/run-all.ts @@ -8,7 +8,6 @@ import { run as recordEvents } from './record-events-example'; import { run as recordDynamicCreating } from './record-dynamic-creating-example'; import { run as emitterExample } from './emitter-example'; import { run as emitterCatchup } from './emitter-catchup-example'; -import { run as reelectionExample } from './common-reelection-example'; import { run as queueEvents } from './queue-events-example'; import { run as queuePushPop } from './queue-push-pop-example'; import { run as queueCatchingUp } from './queue-catching-up-example'; diff --git a/examples/yarn.lock b/examples/yarn.lock index e5d36ab..2cee771 100644 --- a/examples/yarn.lock +++ b/examples/yarn.lock @@ -1713,8 +1713,10 @@ graphemer@^1.4.0: resolved "https://registry.npmjs.org/graphemer/-/graphemer-1.4.0.tgz" integrity sha512-EtKwoO6kxCL9WO5xipiHTZlSzBm7WLT627TqC/uVRd0HKmq8NXyebnNYxDoBi7wt8eTWrUrKXCOVaFq9x1kgag== -"hamok@file:..": - version "2.2.0" +hamok@2.6.1-7ab70c1.0: + version "2.6.1-7ab70c1.0" + resolved "https://registry.yarnpkg.com/hamok/-/hamok-2.6.1-7ab70c1.0.tgz#84448b18f0324a278677856882ba994b484baf58" + integrity sha512-qf3OMnCyGcBx6/e+IaKvwNVYkibdNYTGy1aIAEIsQgxk2vzyEp8SMnGRUG5eZ6G5PkRUu9xL9JBDx8xkc5s6pg== dependencies: "@bufbuild/protobuf" "^1.10.0" pino "^9.3.2" diff --git a/ideabox.txt b/ideabox.txt new file mode 100644 index 0000000..a56eb1e --- /dev/null +++ b/ideabox.txt @@ -0,0 +1,5 @@ +RemoteMap will be dumper and access to the remoteMap without any going to the leader thing, the only thing is the lock! +and locks will be named so several can be ongoing at the same time with a consensus. + +RxProducer, RxConsumer - which can connect to a remote source (key-value sending, so like in kafka, but much simpler here). 
+Hamok comes in for acknowledgement and for consuming in a consumerGroup <- RxJS compatible, so backpressure can be handled \ No newline at end of file diff --git a/package.json b/package.json index 088e1f1..94ea0c1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "hamok", - "version": "2.5.1", + "version": "2.6.0", "description": "Lightweight Distributed Object Storage on RAFT consensus algorithm", "main": "lib/index.js", "types": "lib/index.d.ts", diff --git a/src/Hamok.ts b/src/Hamok.ts index b4d1eed..e6b7788 100644 --- a/src/Hamok.ts +++ b/src/Hamok.ts @@ -21,7 +21,6 @@ import { HamokQueue } from './collections/HamokQueue'; import { HamokEmitter, HamokEmitterEventMap } from './collections/HamokEmitter'; import { RaftLogs } from './raft/RaftLogs'; import { HamokRecord, HamokRecordObject } from './collections/HamokRecord'; -import { HelloNotification } from './messages/messagetypes/HelloNotification'; import { EndpointStatesNotification } from './messages/messagetypes/EndpointNotification'; import { JoinNotification } from './messages/messagetypes/JoinNotification'; import { RemoteMap } from './collections/RemoteMap'; @@ -29,8 +28,6 @@ import { HamokRemoteMap } from './collections/HamokRemoteMap'; const logger = createLogger('Hamok'); -type HamokHelloNotificationCustomRequestType = 'snapshot'; - export type HamokJoinProcessParams = { /** @@ -229,6 +226,12 @@ export type HamokEmitterBuilderConfig = Partial< */ payloadsCodec?: Map string, decode: (data: string) => unknown[] }>, + /** + * Optional. Indicates whether the emitter automatically cleans up subscriptions by unsubscribing + * remote peers that have gone offline from events. Only the leader endpoint does this. + */ + autoClean?: boolean, + } export type HamokFetchRemotePeersResponse = { @@ -247,11 +250,12 @@ export type HamokEventMap = { 'remote-peer-joined': [peerId: string], 'remote-peer-left': [peerId: string], 'leader-changed': [leaderId: string | undefined, prevLeader: string | undefined], - 'state-changed': [state: RaftStateName], + 'state-changed': [newState: RaftStateName, prevState: RaftStateName], commit: [commitIndex: number, message: HamokMessage], heartbeat: [], error: [error: Error], 'no-heartbeat-from': [remotePeerId: string], + 'unsynced-peer': [remotePeerId: string], // new events: joined:[], @@ -276,43 +280,37 @@ export class Hamok = Record | HamokMap | HamokQueue | HamokRemoteMap | HamokEmitter>(); - // eslint-disable-next-line @typescript-eslint/no-explicit-any - // public readonly records = new Map>(); - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - // public readonly maps = new Map>(); - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - // public readonly queues = new Map>(); - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - // public readonly emitters = new Map>(); - - // eslint-disable-next-line @typescript-eslint/no-explicit-any - // public readonly remoteMaps = new Map>(); private _closed = false; private _run = false; - private _joining?: Promise; + // private _joining?: Promise; private _raftTimer?: ReturnType; - private readonly _remoteStateRequests = new Map, responses: EndpointStatesNotification[] }>(); - private readonly _remoteHeartbeats = new Map>(); + // private readonly _remoteHeartbeats = new Map>(); + private readonly _remoteHeartbeats = new Map(); private readonly _codec = new HamokGridCodec(); public readonly grid: HamokGrid; - private _lookingForRemotePeers?: ReturnType; + // private _lookingForRemotePeers?: { + // waiters: (() => void)[], + // timer:
ReturnType, + // close: () => void, + // }; + private _joining?: { + promise: Promise, + aborted: boolean, + }; public constructor(providedConfig?: Partial>) { super(); this.setMaxListeners(Infinity); this._emitMessage = this._emitMessage.bind(this); this._acceptLeaderChanged = this._acceptLeaderChanged.bind(this); + this._sendJoinMsg = this._sendJoinMsg.bind(this); this._acceptCommit = this._acceptCommit.bind(this); - this._emitRemotePeerRemoved = this._emitRemotePeerRemoved.bind(this); + this._checkRemoteHeartbeats = this._checkRemoteHeartbeats.bind(this); this.removeRemotePeerId = this.removeRemotePeerId.bind(this); this.addRemotePeerId = this.addRemotePeerId.bind(this); - this._sendEndpointNotificationsToAll = this._sendEndpointNotificationsToAll.bind(this); - this._checkRemotePeers = this._checkRemotePeers.bind(this); + this.broadcastEndpointNotification = this.broadcastEndpointNotification.bind(this); const raftLogs = providedConfig?.raftLogs ?? new MemoryStoredRaftLogs({ expirationTimeInMs: providedConfig?.logEntriesExpirationTimeInMs ?? 300000, @@ -355,19 +353,17 @@ export class Hamok = Record { - this.off('no-heartbeat-from', this.removeRemotePeerId); this.off('commit', this._acceptCommit); this.off('leader-changed', this._acceptLeaderChanged); - this.off('remote-peer-left', this._emitRemotePeerRemoved); - - // we may add these events along the way - this.off('remote-peer-joined', this._sendEndpointNotificationsToAll); - this.off('remote-peer-left', this._sendEndpointNotificationsToAll); + this.off('unsynced-peer', this.removeRemotePeerId); + this.off('heartbeat', this._sendJoinMsg); + this.off('heartbeat', this._checkRemoteHeartbeats); }); - this.on('no-heartbeat-from', this.removeRemotePeerId); this.on('commit', this._acceptCommit); this.on('leader-changed', this._acceptLeaderChanged); - this.on('remote-peer-left', this._emitRemotePeerRemoved); + this.on('unsynced-peer', this.removeRemotePeerId); + this.on('heartbeat', this._sendJoinMsg); + this.on('heartbeat', this._checkRemoteHeartbeats); } public get appData(): AppData { @@ -387,7 +383,7 @@ export class Hamok = Record { - return (this._joining ?? this.waitUntilLeader()).then(() => this); + return (this._joining?.promise ?? 
this.waitUntilLeader()).then(() => this); } public get state(): RaftStateName { @@ -398,6 +394,10 @@ export class Hamok = Record { + return this._remoteHeartbeats.keys(); + } + public get closed() { return this._closed; } @@ -414,6 +414,8 @@ export class Hamok = Record = Record clearTimeout(timer)); + this._remoteHeartbeats.clear(); + + // notify all storages about the leader change + for (const collection of this.storages.values()) { + collection.connection.emit('leader-changed', leaderId); + } + + if (this.localPeerId === leaderId) { + // we are the leader, we should send the endpoint notification to all remote peers + return this.broadcastEndpointNotification(); + } + + if (leaderId === undefined) { if (this._closed || !this._run) return; + + logger.warn('%s detected that Leader is gone, clearing the remote peers', this.localPeerId); + // this._stopRaftEngine(); + [ ...this.remotePeerIds ].forEach((peerId) => this.removeRemotePeerId(peerId)); + this.join({ - // we retry indefinitely if we lost the connection - maxRetry: -1, - // fetchRemotePeerTimeoutInMs: 5000, + maxRetry: -1, }).catch((err) => { logger.error('Failed to rejoin the grid', err); }); - } else { - // if I am not the leader I need to fetch the remote peers or at least check who is alive and who is not to be sure we are in the network } } @@ -520,18 +556,25 @@ export class Hamok = Record { + public async waitUntilLeader(timeoutInMs?: number): Promise { if (this._closed) throw new Error('Cannot wait until leader on a closed Hamok instance'); if (this.raft.leaderId !== undefined) return; - return new Promise((resolve) => { + return new Promise((resolve, reject) => { const listener = () => { if (!this.raft.leaderId === undefined) return; this.off('leader-changed', listener); resolve(); }; + if (timeoutInMs) { + setTimeout(() => { + this.off('leader-changed', listener); + reject(new Error('Timeout waiting for leader')); + }, timeoutInMs); + } + this.on('leader-changed', listener); }); } @@ -714,7 +757,7 @@ export class Hamok = Record(options: HamokEmitterBuilderConfig): HamokEmitter { + public createEmitter = Record>(options: HamokEmitterBuilderConfig): HamokEmitter { if (this._closed) throw new Error('Cannot create emitter on a closed Hamok instance'); const connection = this._createStorageConnection( @@ -732,9 +775,10 @@ export class Hamok = Record( + const storage = new HamokEmitter( connection, options.payloadsCodec, + options.autoClean, ); connection.once('close', () => { @@ -759,7 +803,11 @@ export class Hamok = Record { if (this._closed) throw new Error('Cannot submit on a closed Hamok instance'); if (!this.raft.leaderId) { - throw new Error(`No leader is elected, cannot submit message type ${entry.type}`); + const error = new Error(`No leader is elected, cannot submit message type ${entry.type}`); + + if (!this.emit('error', error)) throw error; + + return; } entry.sourceId = this.localPeerId; @@ -891,7 +939,11 @@ export class Hamok = Record void 0); + if (this._joining) { + this._joining.aborted = true; + + await this._joining.promise.catch((err) => logger.warn(`${err}`)); + } this._run = false; this._stopRaftEngine(); @@ -908,26 +960,29 @@ export class Hamok = Record { if (this._closed) throw new Error('Cannot execute join on a closed hamok'); - if (this._joining) return this._joining; + if (this._joining) return this._joining.promise; if (this.raft.leaderId !== undefined) return logger.warn('Already joined the network as %s', this.localPeerId); - if (0 < this.remotePeerIds.size) { - // we can issue a fetchRequest and 
then wait for the leader in this case, but we don't have to issue a new join??? - } - try { if (this._run) { this.emit('rejoining'); } this._run = true; - this._joining = this._join({ - fetchRemotePeerTimeoutInMs: params?.fetchRemotePeerTimeoutInMs ?? 5000, - maxRetry: params?.maxRetry ?? 3, - }); + // let's synchronize the remote peers and the no heartbeat peers + this._remoteHeartbeats.clear(); - await this._joining; + this._joining = { + promise: this._join({ + fetchRemotePeerTimeoutInMs: params?.fetchRemotePeerTimeoutInMs ?? 5000, + maxRetry: params?.maxRetry ?? 3, + }), + aborted: false, + }; + await this._joining.promise; + + this.emit('joined'); } finally { this._joining = undefined; } @@ -936,135 +991,40 @@ export class Hamok = Record, retried = 0): Promise { if (this._closed) throw new Error('Cannot join the network on a closed hamok'); - if (!this._run) throw new Error('Cannot join the network while the hamok is not in running phase'); - - // we stop the engine while we are fetching and joining - this._stopRaftEngine(); - const { fetchRemotePeerTimeoutInMs, maxRetry, } = params ?? {}; - logger.debug('%s Joining the network. startAfterJoin: %s, fetchRemotePeerTimeoutInMs: %s, maxRetry: %s', - this.localPeerId, fetchRemotePeerTimeoutInMs, maxRetry - ); + if (this._joining?.aborted || this._closed) { + return Promise.reject(new Error('Joining process is aborted or the hamok is closed')); + } - // I have not added the remote peers to this hamok actually... :S - const { remotePeers, minNumberOfLogs, smallestCommitIndex } = await this.fetchRemotePeers( - fetchRemotePeerTimeoutInMs, - 'snapshot', - ); + // this will start a heartbeat timer, which will start sending join messages + if (!this._raftTimer) this._startRaftEngine(); - remotePeers.forEach((remotePeerId) => this.addRemotePeerId(remotePeerId)); + try { + await this.waitUntilLeader(fetchRemotePeerTimeoutInMs); - if (this.remotePeerIds.size < 1) { - if (0 <= maxRetry && maxRetry <= retried) throw new Error('No remote peers found'); + if (this._joining?.aborted || this._closed) { + return Promise.reject(new Error('Joining process is aborted or the hamok is closed')); + } + + } catch (err) { + if (this._joining?.aborted || this._closed) { + return Promise.reject(new Error('Joining process is aborted or the hamok is closed')); + } else if (0 < maxRetry && maxRetry <= retried) { + throw err; + } else if (0 < this.raft.remotePeers.size) { + logger.debug('%s Failed to join the network, but we have remote peers, so we will continue', this.localPeerId); + + return this._join(params, retried); + } logger.warn('%s No remote peers found, retrying %s/%s', this.localPeerId, retried, maxRetry < 0 ? '∞' : maxRetry); return this._join(params, retried + 1); } - - if (smallestCommitIndex && minNumberOfLogs) { - if (this.raft.logs.commitIndex < smallestCommitIndex) { - // we make a warn message only if it is not the first join - const loggerFn = (0 < this.raft.logs.commitIndex ? 
logger.warn : logger.info).bind(logger); - const newCommitIndex = smallestCommitIndex - minNumberOfLogs; - - loggerFn('%s Commit index of this peer (%d) is lower than the smallest commit index (%s) from remote peers resetting the logs', - this.localPeerId, - this.raft.logs.commitIndex, - newCommitIndex - ); - // this.raft.logs.reset(newCommitIndex); - } - } - - const joinMsg = this._codec.encodeJoinNotification(new JoinNotification(this.localPeerId)); - - // this will trigger the remote endpoint to add this endpoint - this._emitMessage(joinMsg); - - let leaderElected: () => void | undefined; - let noMoreRemotePeers: () => void | undefined; - - await new Promise( - (resolve, reject) => { - leaderElected = () => { - if (this.raft.leaderId === undefined) return; - - resolve(); - this.emit('joined'); - }; - noMoreRemotePeers = () => (this.remotePeerIds.size === 0 ? reject(new Error('Remote peers are gone while joining')) : void 0); - - this.on('leader-changed', leaderElected); - this.on('remote-peer-left', noMoreRemotePeers); - - // now we start the engine - this._startRaftEngine(); - }) - .catch((err) => { - if (this._closed) throw err; - logger.warn('Failed to join the network %o', err); - - return this._join(params, retried + 1); - }) - .finally(() => { - this.off('leader-changed', leaderElected); - this.off('remote-peer-left', noMoreRemotePeers); - }); - } - - public async fetchRemotePeers(timeout?: number, customRequest?: HamokHelloNotificationCustomRequestType): Promise { - const requestId = uuid(); - const helloNotification = new HelloNotification( - this.localPeerId, - undefined, - this.raft.leaderId, - customRequest, - requestId, - ); - const helloMsg = this._codec.encodeHelloNotification(helloNotification); - - return new Promise((resolve) => { - let smallestCommitIndex: number | undefined; - let minNumberOfLogs: number | undefined; - const remotePeerIds = new Set(); - const timer = setTimeout(() => { - for (const notification of this._remoteStateRequests.get(requestId)?.responses ?? []) { - // if remote notification is from itself, we skip it - if (notification.sourceEndpointId === this.localPeerId) continue; - - if (smallestCommitIndex === undefined || notification.commitIndex < smallestCommitIndex) { - smallestCommitIndex = notification.commitIndex; - } - if (minNumberOfLogs === undefined || notification.numberOfLogs < minNumberOfLogs) { - minNumberOfLogs = notification.numberOfLogs; - } - - // notification.activeEndpointIds?.forEach((remotePeerId) => (remotePeerId !== this.localPeerId ? remotePeerIds.add(remotePeerId) : void 0)); - if (notification.sourceEndpointId !== this.localPeerId) { - remotePeerIds.add(notification.sourceEndpointId); - } - } - - this._remoteStateRequests.delete(requestId); - resolve({ - remotePeers: [ ...remotePeerIds ], - minNumberOfLogs, - smallestCommitIndex, - }); - }, timeout ?? 
5000); - - this._remoteStateRequests.set(requestId, { - timer, - responses: [], - }); - - this.emit('message', helloMsg); - }); } private _startRaftEngine(): void { @@ -1128,72 +1088,100 @@ export class Hamok = Record = Record | string[] | string, @@ -1371,45 +1330,66 @@ export class Hamok = Record { - const joinMsg = this._codec.encodeJoinNotification(new JoinNotification(this.localPeerId)); - - // this will trigger the remote endpoint to add this endpoint - this._emitMessage(joinMsg); - }, Math.max(this.raft.config.followerMaxIdleInMs / 5, 100)); - } - private _acceptKeepAliveHamokMessage(message: HamokMessage) { + if (this.raft.leaderId !== this.raft.localPeerId) return; if (!message.sourceId || message.sourceId === this.localPeerId) return; const remotePeerId = message.sourceId; + + this._remoteHeartbeats.set(remotePeerId, Date.now()); - this._addNoHeartbeatTimer(remotePeerId); + // this._addNoHeartbeatTimer(remotePeerId); } - private _addNoHeartbeatTimer(remotePeerId: string) { - clearTimeout(this._remoteHeartbeats.get(remotePeerId)); - - logger.trace('%s Add no heartbeat timeout for %s', this.localPeerId, remotePeerId); + private _checkRemoteHeartbeats() { + if (!this.leader) { + return; + } else if (this._joining) { + // we don't do this when we are joining. essentially we should not have anything while we are joining + return; + } - const timer = setTimeout(() => { - this._remoteHeartbeats.delete(remotePeerId); + const now = Date.now(); - if (this._joining) { - return this._addNoHeartbeatTimer(remotePeerId); + for (const [ remotePeerId, lastUpdate ] of this._remoteHeartbeats) { + if (now - lastUpdate < this.raft.config.electionTimeoutInMs) continue; + + if (!this.raft.remotePeers.has(remotePeerId)) { + this._remoteHeartbeats.delete(remotePeerId); + + logger.debug('%s No heartbeat from %s, but it is not registered in remotePeers', this.localPeerId, remotePeerId); + continue; } + logger.info('%s No heartbeat from %s', this.localPeerId, remotePeerId); + this.emit('no-heartbeat-from', remotePeerId); - }, this.raft.config.electionTimeoutInMs); + this.removeRemotePeerId(remotePeerId); + } + + } + + // we want to delete this function as we will send join messages if there is no leader + // private _acceptStateChange(newState: RaftStateName, prevState: RaftStateName): void { + // if (prevState === 'follower' && newState === 'candidate') { + // const joinMsg = new JoinNotification(this.localPeerId); + + // this._emitMessage(this._codec.encodeJoinNotification(joinMsg)); + // } + // } + + private _sendJoinMsg() { + // if there is no remote peers we want to join to the grid + if (this.raft.remotePeers.size < 1) { + const message = this._codec.encodeJoinNotification(new JoinNotification(this.localPeerId)); + + return this._emitMessage(message); + } + // if we have remote peers, but we do not have a leader, then + // we need to keep send the join notification + if (this.raft.leaderId === undefined) { + const message = this._codec.encodeJoinNotification(new JoinNotification(this.localPeerId)); + + return this._emitMessage(message); + } - this._remoteHeartbeats.set(remotePeerId, timer); } } diff --git a/src/HamokGrid.ts b/src/HamokGrid.ts index 1ccd039..8f52aac 100644 --- a/src/HamokGrid.ts +++ b/src/HamokGrid.ts @@ -85,7 +85,7 @@ export class HamokGrid { this.sendMessage(options.message, remotePeers); } - const response = await pendingRequest; + const response = await pendingRequest.promise; return response; } catch (error) { diff --git a/src/HamokSnapshot.ts b/src/HamokSnapshot.ts index 
ae9c02d..d18866a 100644 --- a/src/HamokSnapshot.ts +++ b/src/HamokSnapshot.ts @@ -23,6 +23,10 @@ export type HamokQueueSnapshot = { export type HamokEmitterSnapshot = { emitterId: string; - events: string[]; - subscribers: string[][]; + subscriptions: { + event: string; + subscribers: { + peerId: string, metaData: Record | null + }[]; + }[], } diff --git a/src/collections/HamokConnection.ts b/src/collections/HamokConnection.ts index 9781d81..45155d6 100644 --- a/src/collections/HamokConnection.ts +++ b/src/collections/HamokConnection.ts @@ -250,7 +250,11 @@ export class HamokConnection extends EventEmitter { // message.type === HamokMessageType.INSERT_ENTRIES_REQUEST ? this.codec.valueCodec.decode(message.values[0]) : -1 // ); if (commitIndex <= this._appliedCommitIndex) { - return logger.warn('Received message with commit index %d is older or equal than the last applied commit index %d', commitIndex, this._appliedCommitIndex); + return logger.warn('Connection for id %s Received message with commit index %d is older or equal than the last applied commit index %d', + this.config.storageId, + commitIndex, + this._appliedCommitIndex + ); } // only in test purposes // if (this._appliedCommitIndex + 1 !== commitIndex) { diff --git a/src/collections/HamokEmitter.ts b/src/collections/HamokEmitter.ts index eec73e4..20d08cb 100644 --- a/src/collections/HamokEmitter.ts +++ b/src/collections/HamokEmitter.ts @@ -10,16 +10,34 @@ export interface HamokEmitterEventMap extends Record { // empty } -export class HamokEmitter { - private readonly _subscriptions = new Map>(); +type UpdatedMetaData> = { + prevMetaData?: M | null; + newMetaData: M; +} + +export type HamokEmitterStats = { + numberOfSubscriptions: number; + numberOfReceivedEventInvocations: number; + numberOfSentEventInvocations: number; +} + +export class HamokEmitter = Record> { + // private readonly _subscriptions = new Map>(); + public readonly subscriptions = new HamokEmitterSubscriptions(); private readonly _emitter = new EventEmitter(); private _initializing?: Promise; - private _removedPeerIdsBuffer: string[] = []; private _closed = false; + + public stats: HamokEmitterStats = { + numberOfSubscriptions: 0, + numberOfReceivedEventInvocations: 0, + numberOfSentEventInvocations: 0, + }; public constructor( public readonly connection: HamokConnection, - public readonly payloadsCodec?: Map string, decode: (data: string) => unknown[] }> + public readonly payloadsCodec?: Map string, decode: (data: string) => unknown[] }>, + public readonly autoClean?: boolean ) { this.connection .on('InsertEntriesRequest', (request) => { @@ -31,14 +49,38 @@ export class HamokEmitter { request ); } - for (const event of request.entries.keys()) { - let subscribedPeerIds = this._subscriptions.get(event); - - if (!subscribedPeerIds) { - subscribedPeerIds = new Set(); - this._subscriptions.set(event, subscribedPeerIds); + let responseEntries: Map | undefined; + + for (const [ event, serializedMetaData ] of request.entries.entries()) { + try { + if (this.subscriptions.hasPeerOnEvent(event as keyof T, request.sourceEndpointId)) { + const metaDataUpdate = JSON.parse(serializedMetaData) as UpdatedMetaData; + const updated = this.subscriptions.updatePeer( + event as keyof T, + request.sourceEndpointId, + metaDataUpdate.newMetaData, + metaDataUpdate.prevMetaData + ); + + if (!updated) { + if (!responseEntries) responseEntries = new Map(); + + responseEntries.set(event, 'not-updated'); + + continue; + } + } else { + // this is a new subscription + let metaData: M | null = 
null;
+
+          if (serializedMetaData !== 'null') metaData = JSON.parse(serializedMetaData);
+
+          this.subscriptions.addPeer(event, request.sourceEndpointId, metaData);
+        }
+      } catch (err) {
+        logger.error('Error while decoding the metadata for %s, %s, %o', this.id, event, `${err}`);
+        continue;
       }
-      subscribedPeerIds.add(request.sourceEndpointId);
       logger.debug('%s InsertEntriesRequest is received, %s is added to the subscription list for %s',
         this.connection.grid.localPeerId,
@@ -50,7 +92,7 @@ export class HamokEmitter {
       if (request.sourceEndpointId === this.connection.grid.localPeerId) {
         this.connection.respond(
           'InsertEntriesResponse',
-          request.createResponse(Collections.EMPTY_MAP),
+          request.createResponse(responseEntries ?? Collections.EMPTY_MAP),
           request.sourceEndpointId
         );
       }
@@ -58,16 +100,9 @@ export class HamokEmitter {
     .on('DeleteEntriesRequest', (request) => {
       const removedPeerIds = [ ...request.keys ];
 
-      for (const [ eventType, subscribedPeerIds ] of [ ...this._subscriptions.entries() ]) {
-        for (const removedPeerId of removedPeerIds) {
-          subscribedPeerIds.delete(removedPeerId);
-
-          if (subscribedPeerIds.size < 1) {
-            this._subscriptions.delete(eventType);
-          }
-        }
-      }
-      logger.info('DeleteEntriesRequest is received, %o is removed from the subscription list for %s', removedPeerIds, this.id);
+      removedPeerIds.forEach((peerId) => this.subscriptions.removePeerFromAllEvent(peerId));
+
+      logger.debug('DeleteEntriesRequest is received, %o is removed from the subscription list for %s', removedPeerIds, this.id);
 
       if (request.sourceEndpointId === this.connection.grid.localPeerId) {
         this.connection.respond(
@@ -86,23 +121,8 @@ export class HamokEmitter {
           request
         );
       }
-      for (const event of request.keys) {
-        const subscribedPeerIds = this._subscriptions.get(event);
-
-        if (!subscribedPeerIds) continue;
-
-        subscribedPeerIds.delete(request.sourceEndpointId);
-
-        if (subscribedPeerIds.size < 1) {
-          this._subscriptions.delete(event);
-        }
-        logger.debug('%s RemoveEntriesRequest is received, %s is removed from the subscription list for %s',
-          this.connection.grid.localPeerId,
-          request.sourceEndpointId,
-          event
-        );
-      }
+      this.subscriptions.removePeerFromAllEvent(request.sourceEndpointId);
 
       if (request.sourceEndpointId === this.connection.grid.localPeerId) {
         this.connection.respond(
@@ -115,12 +135,14 @@ export class HamokEmitter {
     })
     .on('UpdateEntriesRequest', (request) => {
       // this is for the events to emit
-
       for (const [ event, serializedPayload ] of request.entries) {
         try {
           const payloads = this.payloadsCodec?.get(event)?.decode(serializedPayload) ?? JSON.parse(serializedPayload);
 
           this._emitter.emit(event, ...payloads);
+
+          ++this.stats.numberOfReceivedEventInvocations;
+
         } catch (err) {
           logger.error('Error while decoding the payload for %s, %s, %o', this.id, event, `${err}`);
         }
@@ -140,6 +162,9 @@ export class HamokEmitter {
           const payloads = this.payloadsCodec?.get(event)?.decode(serializedPayload) ?? JSON.parse(serializedPayload);
 
           this._emitter.emit(event, ...payloads);
+
+          ++this.stats.numberOfReceivedEventInvocations;
+
         } catch (err) {
           logger.error('Error while decoding the payload for %s, %s, %o', this.id, event, `${err}`);
         }
@@ -155,59 +180,33 @@ export class HamokEmitter {
         );
       }
-      for (const event of this._subscriptions.keys()) {
-        const subscribedPeerIds = this._subscriptions.get(event);
-
-        if (!subscribedPeerIds) continue;
-
-        subscribedPeerIds.delete(request.sourceEndpointId);
-
-        if (subscribedPeerIds.size < 1) {
-          this._subscriptions.delete(event);
-        }
-
-        logger.debug('%s ClearEntriesNotification is received, %s is removed from the subscription list for %s',
-          this.connection.grid.localPeerId,
-          request.sourceEndpointId,
-          event
-        );
-      }
+      this.subscriptions.removePeerFromAllEvent(request.sourceEndpointId);
     })
     .on('remote-peer-removed', async (remotePeerId) => {
-      if (this.connection.grid.leaderId !== this.connection.localPeerId) {
-        return;
-      }
+      if (this.connection.grid.leaderId !== this.connection.localPeerId) return;
+      if (this.connection.localPeerId === remotePeerId) return;
+      if (!this.autoClean) return;
+
       for (let retried = 0; retried < 10; retried++) {
         try {
-          await this.connection.requestDeleteEntries(new Set([ remotePeerId ]));
+          await this.cleanup();
           break;
         } catch (err) {
           if (retried < 8) continue;
-          logger.warn('Error while requesting to remove endpoint %s, from subscriptions in emitter %s, error: %o', remotePeerId, this.id, err);
+          logger.error('Error while cleaning up subscriptions in emitter %s, error: %o', this.id, err);
           break;
         }
       }
     })
     .on('leader-changed', async (leaderId) => {
-      if (leaderId !== this.connection.grid.localPeerId) {
+      if (leaderId !== this.connection.localPeerId || !this.autoClean) {
         return;
       }
-      const removedPeerIds = new Set();
-      for (const [ , subscribedPeerIds ] of this._subscriptions) {
-        for (const subscribedPeerId of subscribedPeerIds) {
-          removedPeerIds.add(subscribedPeerId);
-        }
-      }
-      for (const remotePeerId of this.connection.grid.remotePeerIds) {
-        if (removedPeerIds.has(remotePeerId)) removedPeerIds.delete(remotePeerId);
-      }
-      if (0 < removedPeerIds.size) {
-        try {
-          await this.connection.requestDeleteEntries(removedPeerIds);
-        } catch (err) {
-          logger.warn('Error while requesting to remove endpoints %o, from subscriptions in emitter %s. error: %o', removedPeerIds, this.id, err);
-        }
+      try {
+        await this.cleanup();
+      } catch (err) {
+        logger.error('Error while cleaning up subscriptions in emitter %s, error: %o', this.id, err);
       }
     })
     .on('StorageHelloNotification', (notification) => {
@@ -222,7 +221,7 @@ export class HamokEmitter {
           notification.sourceEndpointId,
         );
       } catch (err) {
-        logger.error('Failed to send snapshot', err);
+        logger.error(`Failed to send snapshot: ${err}`);
       }
     })
     .on('remote-snapshot', (serializedSnapshot, done) => {
@@ -230,6 +229,8 @@
         const snapshot = JSON.parse(serializedSnapshot) as HamokEmitterSnapshot;
 
         this._import(snapshot);
+
+        this.subscriptions.emit('debug', `Imported snapshot from ${JSON.stringify(snapshot)}`);
       } catch (err) {
         logger.error(`Failed to import to emitter ${this.id}. Error: ${err}`);
       } finally {
@@ -239,6 +240,11 @@
       .once('close', () => this.close())
     ;
 
+    this.subscriptions
+      .on('added', () => (this.stats.numberOfSubscriptions = this.subscriptions.size))
+      .on('removed', () => (this.stats.numberOfSubscriptions = this.subscriptions.size))
+    ;
+
     logger.trace('Emitter %s is created', this.id);
 
     process.nextTick(() => (this._initializing = this._startInitializing()));
@@ -249,7 +255,7 @@
   }
 
   public get empty() {
-    return this._subscriptions.size < 1;
+    return this.subscriptions.size < 1;
   }
 
   public get ready(): Promise<void> {
@@ -266,6 +272,25 @@
     this.connection.close();
 
     this._emitter.removeAllListeners();
+    this.subscriptions.removeAllListeners();
+  }
+
+  /**
+   * This method is used to cleanup the subscriptions by removing the endpoints that are not in the grid anymore.
+   */
+  public async cleanup() {
+    const removedPeerIds = this.subscriptions.getAllPeerIds();
+
+    removedPeerIds.delete(this.connection.grid.localPeerId);
+
+    for (const remotePeerId of this.connection.grid.remotePeerIds) {
+      if (removedPeerIds.has(remotePeerId)) removedPeerIds.delete(remotePeerId);
+    }
+    if (0 < removedPeerIds.size) {
+      this.subscriptions.emit('debug', `Removing endpoints ${JSON.stringify(removedPeerIds)} from subscriptions in emitter ${this.id}`);
+
+      return this.connection.requestDeleteEntries(removedPeerIds);
+    }
   }
 
   public async hasSubscribers<K extends keyof T>(event: K, filterByLocalNode = false): Promise<boolean> {
@@ -275,14 +300,14 @@
 
     await this.connection.grid.waitUntilCommitHead();
 
-    const remotePeerIds = this._subscriptions.get(event);
+    const remotePeerIds = this.subscriptions.getPeerIds(event);
 
     if (!remotePeerIds) return false;
     else if (!filterByLocalNode) return true;
     else return remotePeerIds.has(this.connection.grid.localPeerId);
   }
 
-  public async subscribe<K extends keyof T>(event: K, listener: (...args: T[K]) => void): Promise<void> {
+  public async subscribe<K extends keyof T>(event: K, listener: (...args: T[K]) => void, metaData: M | null = null): Promise<void> {
     if (this._closed) throw new Error('Cannot subscribe on a closed emitter');
 
     await this._initializing;
@@ -291,9 +316,43 @@
     if (this._emitter.listenerCount(event as string)) {
       return (this._emitter.on(event as string, listener), void 0);
     }
+    let serializedMetaData: string;
+
+    if (metaData) {
+      try {
+        serializedMetaData = JSON.stringify(metaData);
+      } catch (err) {
+        logger.error('Error while serializing metadata for %s, %s, %o', this.id, event, `${err}`);
+        serializedMetaData = 'null';
+      }
+    } else serializedMetaData = 'null';
 
-    await this.connection.requestInsertEntries(new Map([ [ event as string, 'empty' ] ]));
     this._emitter.on(event as string, listener);
+    try {
+      await this.connection.requestInsertEntries(new Map([ [ event as string, serializedMetaData ] ]));
+    } catch (err) {
+      this._emitter.off(event as string, listener);
+      throw err;
+    }
+  }
+
+  public async updateSubscriptionMetaData<K extends keyof T>(event: K, newMetaData: M, prevMetaData?: M | null): Promise<boolean> {
+    if (this._closed) throw new Error('Cannot subscribe on a closed emitter');
+
+    await this._initializing;
+
+    // if we already have a listener, we don't need to subscribe in the raft
+    if (!this._emitter.listenerCount(event as string)) {
+      throw new Error('Cannot update a non-existing subscription');
+    }
+
+    const updatedMetaData: UpdatedMetaData = {
+      prevMetaData,
+      newMetaData,
+    };
+    const serializedMetaData = JSON.stringify(updatedMetaData);
+
+    return (await this.connection.requestInsertEntries(new Map([ [ event as string, serializedMetaData ] ]))).get(event as string) === undefined;
   }
 
   public async unsubscribe<K extends keyof T>(event: K, listener: (...args: T[K]) => void): Promise<void> {
@@ -321,7 +380,7 @@
 
     await this._initializing;
 
-    const remotePeerIds = this._subscriptions.get(event);
+    const remotePeerIds = this.subscriptions.getPeerIds(event);
 
     if (!remotePeerIds || remotePeerIds.size < 1) {
       return [];
@@ -346,13 +405,15 @@
       result.push(this.connection.grid.localPeerId);
     }
 
+    ++this.stats.numberOfSentEventInvocations;
+
     return result;
   }
 
   public notify<K extends keyof T>(event: K, ...args: T[K]): boolean {
     if (this._closed) throw new Error('Cannot publish on a closed emitter');
 
-    const remotePeerIds = this._subscriptions.get(event);
+    const remotePeerIds = this.subscriptions.getPeerIds(event);
 
     if (!remotePeerIds || remotePeerIds.size < 1) {
       return false;
@@ -375,24 +436,35 @@
       );
     }
 
+    ++this.stats.numberOfSentEventInvocations;
+
     return true;
   }
 
   public export(): HamokEmitterSnapshot {
     if (this._closed) throw new Error('Cannot export a closed emitter');
 
-    const events: string[] = [];
-    const subscribers: string[][] = [];
+    const subscriptions: HamokEmitterSnapshot['subscriptions'] = [];
 
-    for (const [ event, peerIds ] of this._subscriptions) {
-      events.push(event as string);
-      subscribers.push(Array.from(peerIds));
+    for (const [ event, peerMap ] of this.subscriptions.entries()) {
+      const subscribers: HamokEmitterSnapshot['subscriptions'][number]['subscribers'] = [];
+
+      for (const [ peerId, metaData ] of peerMap.entries()) {
+        subscribers.push({
+          peerId,
+          metaData,
+        });
+      }
+
+      subscriptions.push({
+        event: event as string,
+        subscribers,
+      });
     }
 
     return {
       emitterId: this.id,
-      events,
-      subscribers
+      subscriptions,
     };
   }
@@ -407,11 +479,10 @@
   }
 
   private _import(snapshot: HamokEmitterSnapshot): void {
-    for (let i = 0; i < snapshot.events.length; i++) {
-      const event = snapshot.events[i];
-      const peerIds = snapshot.subscribers[i] ?? [];
-
-      this._subscriptions.set(event, new Set(peerIds));
+    for (const subscription of snapshot.subscriptions) {
+      for (const { peerId, metaData } of subscription.subscribers) {
+        this.subscriptions.addPeer(subscription.event as keyof T, peerId, metaData as M | null);
+      }
     }
   }
@@ -426,4 +497,143 @@
     return this;
   }
+}
+
+type HamokSubscriptionsEmitterEventMap<M, EventMap extends Record<string, unknown[]> = Record<string, unknown[]>> = {
+  'added': [
+    event: keyof EventMap,
+    peerId: string,
+    metaData: M | null,
+  ],
+  'updated': [
+    event: keyof EventMap,
+    peerId: string,
+    newMetaData: M,
+    prevMetaData?: M | null,
+  ],
+  'removed': [
+    event: keyof EventMap,
+    peerId: string,
+    metaData: M | null,
+  ],
+  'debug': [
+    log: string,
+  ]
+}
+
+class HamokEmitterSubscriptions<M, EventMap extends Record<string, unknown[]> = Record<string, unknown[]>> extends EventEmitter<HamokSubscriptionsEmitterEventMap<M, EventMap>> {
+  private readonly _map = new Map<keyof EventMap, Map<string, M | null>>();
+
+  public hasEvent<K extends keyof EventMap>(event: K): boolean {
+    return this._map.has(event);
+  }
+
+  public addPeer<K extends keyof EventMap>(event: K, peerId: string, metaData: M | null = null): boolean {
+    let peersMap = this._map.get(event);
+
+    if (!peersMap) {
+      peersMap = new Map<string, M | null>();
+      this._map.set(event, peersMap);
+    } else if (peersMap.has(peerId)) return false;
+
+    peersMap.set(peerId, metaData);
+
+    this.emit('added', event, peerId, metaData);
+
+    return true;
+  }
+
+  public updatePeer<K extends keyof EventMap>(event: K, peerId: string, metaData: M, prevMetaData?: M | null): boolean {
+    const peersMap = this._map.get(event);
+    const currentMetaData = peersMap?.get(peerId);
+
+    if (!peersMap || currentMetaData === undefined) return false;
+
+    if (prevMetaData !== undefined) {
+      const serializedCurrentMetaData = JSON.stringify(currentMetaData);
+      const serializedPrevMetaData = JSON.stringify(prevMetaData);
+
+      if (serializedCurrentMetaData !== serializedPrevMetaData) return false;
+    }
+
+    peersMap.set(peerId, metaData);
+
+    this.emit('updated', event, peerId, metaData, currentMetaData);
+
+    return true;
+  }
+
+  public removePeer<K extends keyof EventMap>(event: K, peerId: string): boolean {
+    const peersMap = this._map.get(event);
+    const metaData = peersMap?.get(peerId);
+
+    if (!peersMap || !peersMap.delete(peerId)) return false;
+    if (peersMap.size < 1) {
+      this._map.delete(event);
+    }
+
+    this.emit('removed', event, peerId, metaData ?? null);
+
+    return true;
+  }
+
+  public removePeerFromAllEvent(peerId: string): boolean {
+    const events = [ ...this.events() ];
+    let removedAtLeastFromOneEvent = false;
+
+    for (const event of events) {
+      removedAtLeastFromOneEvent = this.removePeer(event, peerId) || removedAtLeastFromOneEvent;
+    }
+
+    return removedAtLeastFromOneEvent;
+  }
+
+  public getEventPeersMap<K extends keyof EventMap>(event: K): Map<string, M | null> | undefined {
+    return this._map.get(event);
+  }
+
+  public entries(): IterableIterator<[keyof EventMap, Map<string, M | null>]> {
+    return this._map.entries();
+  }
+
+  public events(): IterableIterator<keyof EventMap> {
+    return this._map.keys();
+  }
+
+  public hasPeerOnEvent<K extends keyof EventMap>(event: K, peerId: string): boolean {
+    const peersMap = this._map.get(event);
+
+    return peersMap ? peersMap.has(peerId) : false;
+  }
+
+  public getPeerIds<K extends keyof EventMap>(event: K): Set<string> | undefined {
+    const peersMap = this._map.get(event);
+
+    if (!peersMap) return;
+    else return new Set<string>([ ...peersMap.keys() ]);
+  }
+
+  public getAllPeerIds(): Set<string> {
+    const peerIds = new Set<string>();
+
+    for (const peersMap of this._map.values()) {
+      for (const peerId of peersMap.keys()) {
+        peerIds.add(peerId);
+      }
+    }
+
+    return peerIds;
+  }
+
+  public get [Symbol.toStringTag]() {
+    return 'HamokSubscriptions';
+  }
+
+  public get size() {
+    return this._map.size;
+  }
+
+  public get [Symbol.species]() {
+    return HamokEmitterSubscriptions;
+  }
+}
\ No newline at end of file
diff --git a/src/collections/HamokMap.ts b/src/collections/HamokMap.ts
index d1fe4db..80d2976 100644
--- a/src/collections/HamokMap.ts
+++ b/src/collections/HamokMap.ts
@@ -416,6 +416,14 @@ export class HamokMap extends EventEmitter {
     return this.baseMap[Symbol.iterator]();
   }
 
+  public entries(): IterableIterator<[K, V]> {
+    return this.baseMap.entries();
+  }
+
+  public values(): IterableIterator<V> {
+    return this.baseMap.values();
+  }
+
   /**
    * Exports the storage data
    */
diff --git a/src/collections/HamokRecord.ts b/src/collections/HamokRecord.ts
index a055c41..0a86555 100644
--- a/src/collections/HamokRecord.ts
+++ b/src/collections/HamokRecord.ts
@@ -6,6 +6,7 @@
 import { HamokRecordSnapshot } from '../HamokSnapshot';
 import { HamokCodec } from '../common/HamokCodec';
 
 const logger = createLogger('HamokMap');
+const UPDATE_IF_RESPONSE_KEY = 'update-if-response-key';
 
 export type HamokRecordObject = Record<string, unknown>;
@@ -136,43 +137,77 @@ export class HamokRecord extends EventEmitter {
       }
     })
     .on('UpdateEntriesRequest', (request) => {
-
+      if (!request.prevValue) return; // the other listener will handle this
       const updatedEntries: [keyof T, T[keyof T], T[keyof T]][] = [];
       const insertedEntries: [keyof T, T[keyof T]][] = [];
+      const prevValue = JSON.parse(request.prevValue);
+
+      logger.warn('UpdateEntriesRequest prevValue %o, prevValue: %o', [ ...request.entries ].map(([ k, v ]) => `key: ${k}, value: ${v}`).join(', '), prevValue);
 
-      if (request.prevValue !== undefined) {
-        // this is a conditional update
-        if (request.entries.size !== 1) {
-          // we let the request to timeout
-          return logger.trace('Conditional update request must have only one entry: %o', request);
+      // check
+      let ok = true;
+
+      for (const [ key, value ] of Object.entries(prevValue)) {
+        logger.warn('UpdateEntriesRequest prevValue %o, checking equality between: %s === %s', key, value, this._object[key as keyof T]);
+        // eslint-disable-next-line @typescript-eslint/no-explicit-any
+        if (this.equalValues(this._object[key] as any, value as any)) continue;
+        ok = false;
+        break;
+      }
+      logger.warn('Apply update %o', [ ...request.entries ].map(([ k, v ]) => `key: ${k}, value: ${v}`).join(', '));
+      if (!ok) {
+        // respond false
+        if (request.sourceEndpointId === this.connection.grid.localPeerId) {
+          // some special response because this is used in updateIf
+
+          this.connection.respond(
+            'UpdateEntriesResponse',
+            request.createResponse(Collections.mapOf([ UPDATE_IF_RESPONSE_KEY, 'false' ])),
+            request.sourceEndpointId
+          );
         }
-        const [ key, encodedNewValue ] = [ ...request.entries ][0];
-        const newValue = this._decodeValue(key, encodedNewValue);
-        const prevValue = this._decodeValue(key, request.prevValue);
+
+        return;
+      }
+      for (const [ key, encodedValue ] of request.entries) {
+        const newValue = this._decodeValue(key, encodedValue);
         const existingValue = this._object[key];
+
+        this._object[key as keyof T] = newValue;
+        if (existingValue) updatedEntries.push([ key, existingValue, newValue ]);
+        else insertedEntries.push([ key, newValue ]);
+
+      }
+      if (request.sourceEndpointId === this.connection.grid.localPeerId) {
+        // some special response because this is used in updateIf
+
+        this.connection.respond(
+          'UpdateEntriesResponse',
+          request.createResponse(Collections.mapOf([ UPDATE_IF_RESPONSE_KEY, 'true' ])),
+          request.sourceEndpointId
+        );
+      }
+      insertedEntries.forEach(([ key, value ]) => this.emit('insert', { key, value }));
+      updatedEntries.forEach(([ key, oldValue, newValue ]) => this.emit('update', { key, oldValue, newValue }));
+    })
+    .on('UpdateEntriesRequest', (request) => {
+      if (request.prevValue) return;
 
-        logger.trace('Conditional update request: %s, %s, %s, %s', key, newValue, existingValue, prevValue);
+      const updatedEntries: [keyof T, T[keyof T], T[keyof T]][] = [];
+      const insertedEntries: [keyof T, T[keyof T]][] = [];
 
-        if (existingValue && this.equalValues(existingValue, prevValue)) {
-          this._object[key as keyof T] = newValue;
-          updatedEntries.push([ key, existingValue, newValue ]);
-        }
-      } else {
-
-        for (const [ key, encodedValue ] of request.entries) {
-          const existingValue = this._object[key];
-          const decodedNewValue = this._decodeValue(key, encodedValue);
-
-          if (existingValue === undefined) {
-            insertedEntries.push([ key, this._decodeValue(key, encodedValue) ]);
-          } else {
-            updatedEntries.push([ key, existingValue, decodedNewValue ]);
-          }
-
-          this._object[key as keyof T] = decodedNewValue;
+      for (const [ key, encodedValue ] of request.entries) {
+        const existingValue = this._object[key];
+        const decodedNewValue = this._decodeValue(key, encodedValue);
+
+        if (existingValue === undefined) {
+          insertedEntries.push([ key, this._decodeValue(key, encodedValue) ]);
+        } else {
+          updatedEntries.push([ key, existingValue, decodedNewValue ]);
         }
-      }
+        this._object[key as keyof T] = decodedNewValue;
+      }
 
       if (request.sourceEndpointId === this.connection.grid.localPeerId) {
         this.connection.respond(
           'UpdateEntriesResponse',
@@ -342,18 +377,63 @@ export class HamokRecord extends EventEmitter {
     return this._decodeValue(key as string, respondedValue) as T[K];
   }
 
+  public async insertInstance(instance: Partial<T>): Promise<Partial<T> | undefined> {
+    if (this._closed) throw new Error(`Cannot set an entry on a closed storage (${this.id})`);
+
+    await this._initializing;
+
+    const entries = new Map<string, string>();
+
+    for (const [ key, value ] of Object.entries(instance)) {
+      entries.set(key, this._encodeValue(key as keyof T, value as T[keyof T]));
+    }
+
+    const respondedValue = (await this.connection.requestInsertEntries(entries));
+
+    if (!respondedValue || respondedValue.size < 1) return;
+
+    const respondedInstance: Partial<T> = {};
+
+    for (const [ key, value ] of respondedValue) {
+      respondedInstance[key as keyof T] = this._decodeValue(key, value);
+    }
+
+    return respondedInstance;
+  }
+
   public async updateIf<K extends keyof T>(key: K, value: T[K], oldValue: T[K]): Promise<boolean> {
     if (this._closed) throw new Error(`Cannot update an entry on a closed storage (${this.id})`);
 
     await this._initializing;
 
     logger.trace('%s UpdateIf: %s, %s, %s', this.connection.grid.localPeerId, key, value, oldValue);
 
+    const newValue: Partial<T> = {};
+    const prevValue: Partial<T> = {};
+
+    newValue[key] = value;
+    prevValue[key] = oldValue;
+
+    return this.updateInstanceIf(newValue, prevValue);
+  }
+
+  public async updateInstanceIf(newValue: Partial<T>, oldValue: Partial<T>): Promise<boolean> {
+    if (this._closed) throw new Error(`Cannot update an entry on a closed storage (${this.id})`);
+
+    await this._initializing;
+
+    const entries = new Map<string, string>();
+
+    for (const [ key, value ] of Object.entries(newValue)) {
+      entries.set(key, this._encodeValue(key as keyof T, value as T[keyof T]));
+    }
+
+    logger.trace('%s UpdateIf: %s, %s, %s', this.connection.grid.localPeerId, [ ...entries ].map(([ k, v ]) => `key: ${k}, value: ${v}`).join(','), oldValue);
 
     return (await this.connection.requestUpdateEntries(
-      Collections.mapOf([ key as string, this._encodeValue(key, value) ]),
+      entries,
       undefined,
-      this._encodeValue(key, oldValue)
-    )).get(key as string) !== undefined;
+      JSON.stringify(oldValue)
+    )).get(UPDATE_IF_RESPONSE_KEY) === 'true';
   }
 
   public async delete<K extends keyof T>(key: K): Promise<boolean> {
@@ -431,8 +511,3 @@ export class HamokRecord extends EventEmitter {
     return this._decodeValue(key as string, respondedValue) as T[K];
   }
 }
-
-// const record: HamokRecord<{ foo: number, bar: string[] }>;
-
-// record.changeNumBy('bar', 1);
-// record.removeFromList('bar', 'asd');
\ No newline at end of file
diff --git a/src/index.ts b/src/index.ts
index 9cb718b..4528399 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -21,7 +21,8 @@ export {
   HamokQueue
 } from './collections/HamokQueue';
 export {
-  HamokEmitter
+  HamokEmitter,
+  HamokEmitterStats,
 } from './collections/HamokEmitter';
 export {
   HamokConnection
diff --git a/src/messages/PendingRequest.ts b/src/messages/PendingRequest.ts
index 51cfbbb..43bab19 100644
--- a/src/messages/PendingRequest.ts
+++ b/src/messages/PendingRequest.ts
@@ -13,7 +13,7 @@ export type PendingRequestConfig = {
 
 export type PendingRequestState = 'pending' | 'resolved' | 'rejected';
 
-export class PendingRequest implements Promise<HamokMessage[]> {
+export class PendingRequest {
   public readonly responses = new Map();
 
   private _postponeTimeout = false;
@@ -95,17 +95,21 @@ export class PendingRequest implements Promise<HamokMessage[]> {
     return this._state;
   }
 
-  public then<TResult1 = HamokMessage[], TResult2 = never>(onfulfilled?: ((value: HamokMessage[]) => TResult1 | PromiseLike<TResult1>) | null, onrejected?: ((reason: string) => TResult2 | PromiseLike<TResult2>) | null): Promise<TResult1 | TResult2> {
-    return this._promise.then(onfulfilled, onrejected);
+  public get promise(): Promise<HamokMessage[]> {
+    return this._promise;
   }
+
+  // public then<TResult1 = HamokMessage[], TResult2 = never>(onfulfilled?: ((value: HamokMessage[]) => TResult1 | PromiseLike<TResult1>) | null, onrejected?: ((reason: string) => TResult2 | PromiseLike<TResult2>) | null): Promise<TResult1 | TResult2> {
+  //   return this._promise.then(onfulfilled, onrejected);
+  // }
 
-  public catch<TResult = never>(onrejected?: ((reason: string) => TResult | PromiseLike<TResult>) | null): Promise<HamokMessage[] | TResult> {
-    return this._promise.catch(onrejected);
-  }
+  // public catch<TResult = never>(onrejected?: ((reason: string) => TResult | PromiseLike<TResult>) | null): Promise<HamokMessage[] | TResult> {
+  //   return this._promise.catch(onrejected);
+  // }
 
-  public finally(onfinally?: (() => void) | null): Promise<HamokMessage[]> {
-    return this._promise.finally(onfinally);
-  }
+  // public finally(onfinally?: (() => void) | null): Promise<HamokMessage[]> {
+  //   return this._promise.finally(onfinally);
+  // }
 
   // private _resolve(): void {
   //   if (this._timer) {
diff --git a/src/messages/StorageCodec.ts b/src/messages/StorageCodec.ts
index 9a4e905..149018d 100644
--- a/src/messages/StorageCodec.ts
+++ b/src/messages/StorageCodec.ts
@@ -832,6 +832,7 @@ export class StorageCodec implements HamokCodec, Message> {
       requestId: request.requestId,
       keys,
       values,
+      // prevValue: request.prevValue !== undefined ? this.valueCodec.encode(request.prevValue) : undefined,
       prevValue: request.prevValue !== undefined ? this.valueCodec.encode(request.prevValue) : undefined,
     });
   }
diff --git a/src/raft/MemoryStoredRaftLogs.ts b/src/raft/MemoryStoredRaftLogs.ts
index b8b932c..afccb19 100644
--- a/src/raft/MemoryStoredRaftLogs.ts
+++ b/src/raft/MemoryStoredRaftLogs.ts
@@ -279,7 +279,7 @@ export class MemoryStoredRaftLogs extends EventEmitter implements RaftLogs {
     this._firstIndex = newCommitIndex;
     this._memoryEstimateBytesLength = 0;
 
-    logger.debug(`Logs are reset. new values: commitIndex: ${this._commitIndex}, nextIndex: ${this._nextIndex}, lastApplied: ${this._firstIndex}`);
+    logger.warn(`Logs are reset. new values: commitIndex: ${this._commitIndex}, nextIndex: ${this._nextIndex}, lastApplied: ${this._firstIndex}`);
   }
 
   public removeUntil(newFirstIndex: number): void {
diff --git a/src/raft/RaftEngine.ts b/src/raft/RaftEngine.ts
index 1e76f74..de34b1b 100644
--- a/src/raft/RaftEngine.ts
+++ b/src/raft/RaftEngine.ts
@@ -109,7 +109,7 @@ export class RaftEngine {
     prevState.close();
 
     this._state = newState;
-
+
     logger.debug(`%s State changed from ${prevState.stateName} to ${newState.stateName}`, this.localPeerId);
 
     if (prevState.stateName === 'candidate' && newState.stateName === 'follower') {
@@ -118,7 +118,7 @@ export class RaftEngine {
 
     newState.init?.();
 
-    this.events.emit('state-changed', newState.stateName);
+    this.events.emit('state-changed', newState.stateName, prevState.stateName);
 
     switch (newState.stateName) {
     case 'leader':
diff --git a/src/raft/RaftLeaderState.ts b/src/raft/RaftLeaderState.ts
index b4b8b74..2ab7208 100644
--- a/src/raft/RaftLeaderState.ts
+++ b/src/raft/RaftLeaderState.ts
@@ -33,7 +33,7 @@ export function createRaftLeaderState(context: RaftLeaderStateContext): RaftStat
    * until the follower does not respond normally.
    */
  const sentRequests = new Map();
-  const unsyncedRemotePeers = new Set<string>();
+  const unsyncedRemotePeers = new Map<string, number>();
  let follow: () => void = () => void 0;
  let closed = false;
  const appendEntriesRequestListener = (request: RaftAppendEntriesRequestChunk) => {
@@ -195,7 +195,10 @@ export function createRaftLeaderState(context: RaftLeaderStateContext): RaftStat
     logger.trace('%s Collected %d entries for peer %s', localPeerId, entries.length, peerId);
 
     if (peerNextIndex < logs.firstIndex) {
-      if (unsyncedRemotePeers.add(peerId)) {
+      const startedUnsynced = unsyncedRemotePeers.get(peerId);
+
+      if (!startedUnsynced) {
+        unsyncedRemotePeers.set(peerId, Date.now());
         logger.warn('%s Peer %s is unsynced, logs.nextIndex: %d, peerNextIndex: %d',
           localPeerId,
           peerId,
@@ -203,15 +206,22 @@ export function createRaftLeaderState(context: RaftLeaderStateContext): RaftStat
           peerNextIndex
         );
         // logger.warn(`Collected ${entries.length} entries, but peer ${peerId} should need ${logs.nextIndex - peerNextIndex}. logs.nextIndex: ${logs.nextIndex}, peerNextIndex: ${peerNextIndex}`);
+      } else if (30000 < now - startedUnsynced) {
+        // we should kick the peer out of the cluster
+
+        logger.warn('%s Peer %s is unsynced for a long time, we remove it from the cluster', localPeerId, peerId);
+        raftEngine.events.emit('unsynced-peer', peerId);
+        unsyncedRemotePeers.delete(peerId);
       }
     } else if (0 < unsyncedRemotePeers.size) {
-      unsyncedRemotePeers.delete(peerId);
-      logger.info('%s Peer %s is synced, logs.nextIndex: %d, peerNextIndex: %d',
-        localPeerId,
-        peerId,
-        logs.nextIndex,
-        peerNextIndex
-      );
+      if (unsyncedRemotePeers.delete(peerId)) {
+        logger.info('%s Peer %s is synced, logs.nextIndex: %d, peerNextIndex: %d',
+          localPeerId,
+          peerId,
+          logs.nextIndex,
+          peerNextIndex
+        );
+      }
     }
 
     let sentRequest = sentRequests.get(peerId);