Skip to content

Commit

Permalink
Merge pull request #38 from medishen/dev/v2
Browse files Browse the repository at this point in the history
feat(events): Two new methods were added, called request and channel,…
  • Loading branch information
m-mdy-m authored Feb 17, 2025
2 parents 174a108 + 305f41a commit 31b0cb1
Show file tree
Hide file tree
Showing 16 changed files with 524 additions and 181 deletions.
42 changes: 42 additions & 0 deletions packages/events/core/correlation-id-factory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { CorrelationId } from '../types';

export class CorrelationIdFactory {
private readonly namespace: string;
private sequence: number = 0;
private lastTimestamp: number = 0;

constructor(namespace: string = 'gland') {
this.namespace = namespace;
}

create(): CorrelationId {
const timestamp = Date.now();
const random = Math.random().toString(36).substring(2, 15);
const sequence = this.getSequenceNumber(timestamp);

return this.formatId({
timestamp,
sequence,
random,
node: this.namespace,
}) as CorrelationId;
}

private getSequenceNumber(timestamp: number): number {
if (timestamp === this.lastTimestamp) {
this.sequence = (this.sequence + 1) & 0xfff; // 12-bit sequence
} else {
this.sequence = 0;
this.lastTimestamp = timestamp;
}
return this.sequence;
}

private formatId(parts: { timestamp: number; sequence: number; random: string; node: string }): string {
return [parts.timestamp.toString(16).padStart(10, '0'), parts.sequence.toString(16).padStart(3, '0'), parts.random, parts.node].join('-');
}

static validate(id: string): id is CorrelationId {
return /^[0-9a-f]{10}-[0-9a-f]{3}-[a-z0-9]{13}-[a-z]+$/.test(id);
}
}
129 changes: 105 additions & 24 deletions packages/events/core/event-manager.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,32 @@
import { EVENTS_METADATA } from '../constant';
import { ErrorHook } from '../hooks';
import { EventChannel } from '../interface';
import { BroadcastStrategy, ImmediateStrategy, QueueStrategy } from '../strategies';
import { Event, EventStrategyType, Listener, QualifiedEvent } from '../types';
import { EventMapper } from '../utils/event-mapper';
import { CorrelationIdFactory } from './correlation-id-factory';
import { EventBus } from './event-bus';
import { ContextFactory, EventContext } from './event-context';
import { EventRegistry } from './event-registry';
import 'reflect-metadata';
import { EventQueue } from './event.queue';
import { EventType } from '@gland/common';

export class EventManager {
private bus: EventBus;
private context: EventContext;
private eventRegistry: EventRegistry;
private registry: EventRegistry;
private queues = new Map<string, EventQueue>();
private correlationIdFactory: CorrelationIdFactory;
private channels = new Map<string, EventChannel<any, any>>();
private readonly MAX_SIZE: number = 1000;
constructor(strategy?: EventStrategyType) {
this.eventRegistry = EventRegistry.getInstance();
this.correlationIdFactory = new CorrelationIdFactory();

this.registry = EventRegistry.getInstance();
this.context = new ContextFactory().create();
this.bus = new EventBus(this.eventRegistry, this.createStrategy(strategy), this.context);
this.bus.addHook(new ErrorHook(this.context));
this.bus = new EventBus(this.registry, this.createStrategy(strategy), this.context);
this.bus.addHook(new ErrorHook(this.context, this.correlationIdFactory));
Reflect.defineMetadata(EVENTS_METADATA.EVENT_MANAGER, this, EventManager);
Reflect.defineMetadata(EVENTS_METADATA.CONTEXT, this, EventContext);
}
Expand All @@ -31,35 +41,106 @@ export class EventManager {
return new ImmediateStrategy();
}
}
async publish<T extends string, D>(qualified: QualifiedEvent<T>, data?: D): Promise<void> {
const { phase, type } = EventMapper.parseQualifiedType(qualified);

const event: Event<typeof type, typeof phase, D> = {
type,
phase,
data: data ?? ({} as D),
lifecycle: {
startedAt: new Date(),
async request<T extends string, D = any, R = any>(qualified: QualifiedEvent<T>, data?: D): Promise<R> {
const event = EventMapper.createEvent(qualified, data);
const listeners = this.registry.getListeners(event.type);

const results = await Promise.all(listeners.map(async (listener) => listener(event)));

const mergedResults = results.reduce((acc, result) => {
return { ...acc, ...result };
}, {});
return mergedResults;
}

channel<T extends string, D = any, R = any>(type: QualifiedEvent<T>): EventChannel<D, R> {
if (!this.channels.has(type)) {
this.channels.set(type, this.createChannel(type));
}
return this.channels.get(type)!;
}

private createChannel<T, R>(type: QualifiedEvent): EventChannel<T, R> {
return {
emit: (data) => this.emit(type, data),
request: (data) => this.request(type, data),
respond: (handler: (data: T) => Promise<R> | R) => {
return this.on(type, async (event) => {
try {
return await handler(event.data);
} catch (error) {
this.emit(`${event.type}:error`, {
correlationId: event.correlationId,
error,
});
}
});
},
};
await this.bus.emit(event);
}

subscribe<T extends string>(qualified: QualifiedEvent<T>, listener: Listener): () => void {
const { phase, type } = EventMapper.parseQualifiedType(qualified);
this.eventRegistry.register(type, listener);
async emit<T extends string, D>(qualified: QualifiedEvent<T>, data?: D): Promise<void> {
const event = EventMapper.createEvent(qualified, data);

if (this.registry.hasListeners(event.type)) {
await this.bus.emit(event);
} else {
if (this.registry.hasEverHadListeners(event.type)) {
console.warn(`[EventManager] No active listeners for "${event.type}", but previous listeners existed. Not queuing.`);
return;
}
console.warn(`[EventManager] No listeners for "${event.type}", queueing event.`);
this.queueEvent(event);
}
}

return () => this.eventRegistry.unregister(type, listener);
private queueEvent<T extends EventType, D>(event: Event<T, D>): void {
const queue = this.getOrCreateQueue(event.type);
queue.enqueue(event);
}
unsubscribe<T extends string, D>(qualified: QualifiedEvent<T>, listener: Listener): void {
const { phase, type } = EventMapper.parseQualifiedType(qualified);
this.eventRegistry.unregister(type, listener);
private getOrCreateQueue<T extends EventType>(type: T): EventQueue<T> {
if (!this.queues.has(type)) {
this.queues.set(type, new EventQueue<T>(this.MAX_SIZE));
}
return this.queues.get(type) as EventQueue<T>;
}

batchSubscribe<T extends string, D>(listeners: { qualified: QualifiedEvent<T>; listener: Listener }[]): (() => void)[] {
return listeners.map(({ qualified, listener }) => this.subscribe(qualified, listener));
on<T extends string, R>(qualified: QualifiedEvent<T>, listener: Listener<R>): () => void {
const { type } = EventMapper.parseQualifiedEvent(qualified);
// Mark that this event type has had listeners
this.registry.markHasListeners(type);

this.registry.register(type, listener);

const unsubscribe = () => {
this.off(type, listener);
this.cleanupQueue(type);
};

const queue = this.queues.get(type);
if (queue) {
console.log(`[EventManager] Processing queued events for "${type}"`);
queue.process(async (event) => {
await this.bus.emit(event);
});
this.queues.delete(type);
}
return unsubscribe;
}
off<T extends string, R>(qualified: QualifiedEvent<T>, listener: Listener<R>): void {
const { type } = EventMapper.parseQualifiedEvent(qualified);
this.registry.unregister(type, listener);
}
batchUnsubscribe<T extends string, D>(listeners: { qualified: QualifiedEvent<T>; listener: Listener }[]): void {
listeners.forEach(({ qualified, listener }) => this.unsubscribe(qualified, listener));

private cleanupQueue<T extends EventType>(type: T): void {
if (!this.registry.hasListeners(type)) {
const queue = this.queues.get(type);
if (queue) {
console.log(`[EventManager] Cleaning up queue for "${type}"`);
queue.clear();
this.queues.delete(type);
}
}
}
}
10 changes: 10 additions & 0 deletions packages/events/core/event-registry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { IEventType, Listener } from '../types';
export class EventRegistry {
private static instance: EventRegistry;
private listeners = new Map<IEventType, Listener<any>[]>();
private hasHadListeners = new Set<string>();

private constructor() {}

Expand All @@ -18,6 +19,7 @@ export class EventRegistry {
listeners.push(listener);

this.listeners.set(type, listeners);
this.hasHadListeners.add(type);
}

unregister(type: IEventType, listener: Listener): void {
Expand All @@ -32,4 +34,12 @@ export class EventRegistry {
hasListeners(event: IEventType): boolean {
return !!this.listeners.get(event)?.length;
}

hasEverHadListeners(type: string): boolean {
return this.hasHadListeners.has(type);
}

markHasListeners(type: string): void {
this.hasHadListeners.add(type);
}
}
105 changes: 105 additions & 0 deletions packages/events/core/event.queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
import { EventType } from '@gland/common';
import { Event } from '../types';

interface QueueNode<T extends EventType, D> {
event: Event<T, D>;
next?: QueueNode<T, D>;
prev?: QueueNode<T, D>;
}

export class EventQueue<T extends EventType = EventType, D = unknown> {
private readonly maxSize: number;
private readonly map = new Map<string, QueueNode<T, D>>();
private head?: QueueNode<T, D>;
private tail?: QueueNode<T, D>;
private size = 0;

constructor(maxSize: number = 1000) {
this.maxSize = maxSize;
}

enqueue(event: Event<T, D>): void {
const node: QueueNode<T, D> = { event };
const key = this.getEventKey(event);

if (this.map.has(key)) {
this.moveToFront(key);
return;
}

this.map.set(key, node);
this.addToFront(node);
this.ensureSize();
}

dequeue(): Event<T, D> | undefined {
if (!this.tail) return undefined;

const node = this.tail;
this.removeNode(node);
return node.event;
}

process(callback: (event: Event<T, D>) => void | Promise<void>): void {
let current = this.head;
while (current) {
callback(current.event);
current = current.next;
}
this.clear();
}

clear(): void {
this.map.clear();
this.head = undefined;
this.tail = undefined;
this.size = 0;
}

private getEventKey(event: Event<T, D>): string {
return `${event.type}:${event.correlationId || event.timestamp}`;
}

private addToFront(node: QueueNode<T, D>): void {
if (!this.head) {
this.head = this.tail = node;
} else {
node.next = this.head;
this.head.prev = node;
this.head = node;
}
this.size++;
}

private moveToFront(key: string): void {
const node = this.map.get(key);
if (!node || node === this.head) return;

// Remove from current position
if (node.prev) node.prev.next = node.next;
if (node.next) node.next.prev = node.prev;
if (node === this.tail) this.tail = node.prev;

// Add to head
node.next = this.head;
node.prev = undefined;
if (this.head) this.head.prev = node;
this.head = node;
}

private removeNode(node: QueueNode<T, D>): void {
if (node.prev) node.prev.next = node.next;
if (node.next) node.next.prev = node.prev;
if (node === this.head) this.head = node.next;
if (node === this.tail) this.tail = node.prev;

this.map.delete(this.getEventKey(node.event));
this.size--;
}

private ensureSize(): void {
while (this.size > this.maxSize && this.tail) {
this.removeNode(this.tail);
}
}
}
23 changes: 11 additions & 12 deletions packages/events/decorators/emit-event.decorator.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
import 'reflect-metadata';
import { EVENTS_METADATA } from '../constant';
import { Constructor, determineDecoratorType, getConstructor } from '@gland/common';
import { Constructor, determineDecoratorType, getConstructor, isNil, isObject } from '@gland/common';
import { QualifiedEvent } from '../types';
import { EventManager } from '../core';
import { EmitHandlers } from './handlers';
import { EventEmitClassOptions, EventEmitMethodOptions } from '../interface';
type EmitOptions<T, D> = T extends Constructor<infer _> ? EventEmitClassOptions<D> : EventEmitMethodOptions<D>;

function EmitEvent<Q extends string, T, D = any>(qualified: QualifiedEvent<Q>, options?: EmitOptions<T, D>): MethodDecorator & ClassDecorator {
return function (target: any, propertyKey?: string | symbol, descriptor?: PropertyDescriptor | number) {
import { EventEmitMethodOptions } from '../interface';
function EmitEvent<Q extends string, T, D = any>(qualified: QualifiedEvent<Q>, options?: EventEmitMethodOptions<D>): MethodDecorator & ClassDecorator {
return function (target: any, _?: string | symbol, descriptor?: PropertyDescriptor | number) {
const eventManager: EventManager = Reflect.getMetadata(EVENTS_METADATA.EVENT_MANAGER, EventManager);
if (!eventManager) {
throw new Error('EventManager must be initialized before using @EmitEvent decorator');
Expand All @@ -20,17 +18,16 @@ function EmitEvent<Q extends string, T, D = any>(qualified: QualifiedEvent<Q>, o
target: constructor,
eventManager,
qualified,
options: options as EventEmitClassOptions<D>,
});
return target;

case 'method':
if (typeof descriptor === 'object' && descriptor !== null) {
if (isObject(descriptor) && !isNil(descriptor)) {
EmitHandlers.methodHandler({
descriptor,
eventManager,
qualified,
options: options as EventEmitMethodOptions<D>,
options: options,
});
} else {
throw new Error('@EmitEvent can only be used on methods with a valid PropertyDescriptor.');
Expand All @@ -42,9 +39,11 @@ function EmitEvent<Q extends string, T, D = any>(qualified: QualifiedEvent<Q>, o
}
};
}
export function EmitMethod<Q extends string, D, T extends Function>(qualified: QualifiedEvent<Q>, options?: EmitOptions<T, D>) {
// method-level
export function Emit<Q extends string, D, T extends Function>(qualified: QualifiedEvent<Q>, options?: EventEmitMethodOptions<D>) {
return EmitEvent<Q, T, D>(qualified, options);
}
export function EmitClass<Q extends string, D, T extends Constructor>(qualified: QualifiedEvent<Q>, options?: EmitOptions<T, D>) {
return EmitEvent<Q, T, D>(qualified, options);
// class-level
export function Emits<Q extends string, D, T extends Constructor>(qualified: QualifiedEvent<Q>) {
return EmitEvent<Q, T, D>(qualified);
}
Loading

0 comments on commit 31b0cb1

Please sign in to comment.