Skip to content

Commit

Permalink
Merge pull request #35 from medishen/dev/v2
Browse files Browse the repository at this point in the history
feat(events/core): implement event-driven system (EDS) with SEP architecture
  • Loading branch information
0xii00 authored Feb 14, 2025
2 parents bbd1020 + 0be4871 commit 042be2d
Show file tree
Hide file tree
Showing 38 changed files with 811 additions and 0 deletions.
6 changes: 6 additions & 0 deletions packages/events/constant.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
export const EVENTS_METADATA = {
EMIT_EVENT: 'events:emit',
EVENT_BUS: 'events:event_bus',
EVENT_MANAGER: 'events:event_maanger',
CONTEXT: 'events:context',
};
49 changes: 49 additions & 0 deletions packages/events/core/event-bus.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import { EVENTS_METADATA } from '../constant';
import { EventHook, EventStrategy } from '../interface';
import { ErrorStrategy, EventPipeline, PhaseManager } from '../pipeline';
import { Event } from '../types';
import { EventContext } from './event-context';
import { EventRegistry } from './event-registry';
import 'reflect-metadata';
export class EventBus {
private hooks: EventHook[] = [];
private pipeline: EventPipeline;

constructor(private registry: EventRegistry, private strategy: EventStrategy, private context: EventContext) {
Reflect.defineMetadata(EVENTS_METADATA.EVENT_BUS, this, this);
this.pipeline = this.createPipeline();
}

addHook(hook: EventHook): void {
this.hooks.push(hook);
}

private createPipeline(): EventPipeline {
const phaseManager = new PhaseManager(this.registry, this.strategy);
const errorStrategy = new ErrorStrategy(this.context);
return new EventPipeline(phaseManager, errorStrategy, this.context);
}
async emit<E extends Event>(event: E): Promise<void> {
this.context.set('ctx:bus', this, true);
this.context.set('ctx:registry', this.registry, true);
this.context.set('ctx:processed', false);
this.context.set('ctx:current_phase', event.phase);

try {
await this.pipeline.process(event);
} catch (error) {
for (const listener of this.registry.getListeners(event.type)) {
for (const hook of this.hooks) {
await hook.onError?.(error, event, listener);
}
}
} finally {
// Cleanup
this.context.set('ctx:processed', true);
this.cleanupOnceListeners(event);
}
}
private cleanupOnceListeners(event: Event<any>) {
this.registry.getListeners(event.type).forEach((l) => this.registry.unregister(event.type, l));
}
}
42 changes: 42 additions & 0 deletions packages/events/core/event-context.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
import { EventContextFactory } from '../interface';
import { CtxKey, CtxVal } from '../types';

export interface EventContextData {
[key: string | symbol]: CtxVal;
}
export class EventContext {
private data: EventContextData = {};

get<T extends CtxVal>(key: CtxKey): T | undefined {
return this.data[key] as T | undefined;
}

set<T extends CtxVal>(key: CtxKey, value: T, immutable = false): void {
this.data[key] = value;
}
has(key: CtxKey): boolean {
return key in this.data;
}

delete(key: CtxKey): boolean {
return delete this.data[key];
}

static create(): EventContext {
return new EventContext();
}

saveState(): EventContextData {
return { ...this.data };
}

restoreState(state: EventContextData): void {
this.data = { ...state };
}
}

export class ContextFactory implements EventContextFactory {
create(): EventContext {
return EventContext.create();
}
}
65 changes: 65 additions & 0 deletions packages/events/core/event-manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { EVENTS_METADATA } from '../constant';
import { ErrorHook } from '../hooks';
import { BroadcastStrategy, ImmediateStrategy, QueueStrategy } from '../strategies';
import { Event, EventStrategyType, Listener, QualifiedEvent } from '../types';
import { EventMapper } from '../utils/event-mapper';
import { EventBus } from './event-bus';
import { ContextFactory, EventContext } from './event-context';
import { EventRegistry } from './event-registry';
import 'reflect-metadata';

export class EventManager {
private bus: EventBus;
private context: EventContext;
private eventRegistry: EventRegistry;
constructor(strategy?: EventStrategyType) {
this.eventRegistry = EventRegistry.getInstance();
this.context = new ContextFactory().create();
this.bus = new EventBus(this.eventRegistry, this.createStrategy(strategy), this.context);
this.bus.addHook(new ErrorHook(this.context));
Reflect.defineMetadata(EVENTS_METADATA.EVENT_MANAGER, this, EventManager);
Reflect.defineMetadata(EVENTS_METADATA.CONTEXT, this, EventContext);
}
private createStrategy(strategy?: EventStrategyType) {
switch (strategy) {
case 'broadcast':
return new BroadcastStrategy();
case 'queue':
return new QueueStrategy();
case 'immediate':
default:
return new ImmediateStrategy();
}
}
async publish<T extends QualifiedEvent, D>(qualified: T, data: D): Promise<void> {
const { phase, type } = EventMapper.parseQualifiedType(qualified);

const event: Event<typeof type, typeof phase, D> = {
type,
phase,
data,
lifecycle: {
startedAt: new Date(),
},
};
await this.bus.emit(event);
}

subscribe<T extends QualifiedEvent>(qualified: T, listener: Listener): () => void {
const { phase, type } = EventMapper.parseQualifiedType(qualified);
this.eventRegistry.register(type, listener);

return () => this.eventRegistry.unregister(type, listener);
}
unsubscribe<T extends QualifiedEvent, D>(qualified: T, listener: Listener): void {
const { phase, type } = EventMapper.parseQualifiedType(qualified);
this.eventRegistry.unregister(type, listener);
}

batchSubscribe<T extends QualifiedEvent, D>(listeners: { qualified: T; listener: Listener }[]): (() => void)[] {
return listeners.map(({ qualified, listener }) => this.subscribe(qualified, listener));
}
batchUnsubscribe<T extends QualifiedEvent, D>(listeners: { qualified: T; listener: Listener }[]): void {
listeners.forEach(({ qualified, listener }) => this.unsubscribe(qualified, listener));
}
}
35 changes: 35 additions & 0 deletions packages/events/core/event-registry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { IEventType, Listener } from '../types';

export class EventRegistry {
private static instance: EventRegistry;
private listeners = new Map<IEventType, Listener<any>[]>();

private constructor() {}

static getInstance(): EventRegistry {
if (!EventRegistry.instance) {
EventRegistry.instance = new EventRegistry();
}
return EventRegistry.instance;
}

register(type: IEventType, listener: Listener): void {
const listeners = this.listeners.get(type) ?? [];
listeners.push(listener);

this.listeners.set(type, listeners);
}

unregister(type: IEventType, listener: Listener): void {
const listeners = this.listeners.get(type)?.filter((l) => l !== listener);
if (listeners) this.listeners.set(type, listeners);
}

getListeners(type: IEventType): Listener[] {
return this.listeners.get(type) ?? [];
}

hasListeners(event: IEventType): boolean {
return !!this.listeners.get(event)?.length;
}
}
4 changes: 4 additions & 0 deletions packages/events/core/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export * from './event-bus';
export * from './event-context';
export * from './event-manager';
export * from './event-registry';
43 changes: 43 additions & 0 deletions packages/events/decorators/emit-event.decorator.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import 'reflect-metadata';
import { EVENTS_METADATA } from '../constant';
import { Constructor, determineDecoratorType, getConstructor } from '@gland/common';
import { QualifiedEvent } from '../types';
import { EventManager } from '../core';
import { EmitHandlers } from './handlers';
import { EventEmitClassOptions, EventEmitMethodOptions } from '../interface';
type EmitOptions<T, D> = T extends Constructor<infer _> ? EventEmitClassOptions<D> : EventEmitMethodOptions;

function EmitEvent<T, D = any>(qualified: QualifiedEvent, options?: EmitOptions<T, D>): MethodDecorator & ClassDecorator {
return function (target: any, propertyKey?: string | symbol, descriptor?: PropertyDescriptor | number) {
const eventManager: EventManager = Reflect.getMetadata(EVENTS_METADATA.EVENT_MANAGER, EventManager);
if (!eventManager) {
throw new Error('EventManager must be initialized before using @EmitEvent decorator');
}
const constructor = getConstructor(target);
switch (determineDecoratorType(arguments)) {
case 'class':
EmitHandlers.classHandler({
data: (options as any).data,
eventManager,
qualified,
});
return target;

case 'method':
if (typeof descriptor === 'object' && descriptor !== null) {
} else {
throw new Error('@EmitEvent can only be used on methods with a valid PropertyDescriptor.');
}
break;

default:
throw new Error('Invalid usage of @EmitEvent decorator.');
}
};
}
export function EmitMethod<D, T extends Function>(qualified: QualifiedEvent, options?: EmitOptions<T, D>) {
return EmitEvent(qualified, options);
}
export function EmitClass<D, T extends Constructor>(qualified: QualifiedEvent, options?: EmitOptions<T, D>) {
return EmitEvent(qualified, options);
}
26 changes: 26 additions & 0 deletions packages/events/decorators/handlers/emit-event.handlers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { QualifiedEvent } from '@gland/events/types';
import { EventManager } from '../../core/event-manager';
import { EventEmitMethodOptions } from '../../interface';
export namespace EmitHandlers {
export function classHandler<D>({ data, eventManager, qualified }: { eventManager: EventManager; qualified: QualifiedEvent; data: D }) {
eventManager.publish(qualified, data);
}
export function methodHandler<D>({
data,
descriptor,
eventManager,
qualified,
options,
}: {
data: D;
qualified: QualifiedEvent;
eventManager: EventManager;
descriptor: PropertyDescriptor;
options?: EventEmitMethodOptions;
}) {
const originalMethod = descriptor.value;
descriptor.value = async function (...args: any[]) {
let payload = data;
};
}
}
2 changes: 2 additions & 0 deletions packages/events/decorators/handlers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './on-event.handlers';
export * from './emit-event.handlers';
107 changes: 107 additions & 0 deletions packages/events/decorators/handlers/on-event.handlers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
import { EventOnClassOptions, EventOnMethodOptions } from '../../interface';
import { EventManager } from '../../core';
import { Event, IEventType, QualifiedEvent } from '../../types';

export namespace OnHandlers {
/**
* Registers all methods on the target class's prototype as event handlers for a given qualified event,
* but only for methods that have not already been subscribed.
*
* This function iterates over all properties of the target's prototype. For each property that is a function,
* it checks for a custom flag (`__sub__`) on the function. If the flag is not set, the method is bound to the
* target's prototype and subscribed to the event manager using the provided qualified event. This ensures
* that methods which have already been registered (and thus have the `__sub__` flag set to true) are not subscribed again.
*
* @param target - The class whose prototype methods will be registered as event handlers.
* @param qualified - The qualified event identifier (e.g., "server:start:pre") to which the methods will subscribe.
* @param eventManager - The instance of EventManager that will handle the subscription of event handlers.
*
* @remarks
* Methods that have been already subscribed should have the `__sub__` property set to `true` to avoid duplicate subscriptions.
*/
export function classDecorator(target: any, qualified: QualifiedEvent, eventManager: EventManager, options?: EventOnClassOptions) {
const prototype = target.prototype;
// 🚨 Validate that only one of 'pick' or 'omit' is used
if (options?.pick && options?.omit) {
throw new Error("Invalid decorator usage: You cannot use both 'pick' and 'omit' together.");
}
// Handle inheritance first
if (options?.inherit) {
const parent = Object.getPrototypeOf(target.prototype).constructor;
if (parent !== Object) {
classDecorator(parent, qualified, eventManager, options);
}
}

// Normalize pick/omit to arrays
const pickArray = options?.pick ? (Array.isArray(options.pick) ? options.pick : [options.pick]) : [];

const omitArray = options?.omit ? (Array.isArray(options.omit) ? options.omit : [options.omit]) : [];

Reflect.ownKeys(prototype).forEach((propertyKey) => {
if (typeof propertyKey !== 'string' && typeof propertyKey !== 'symbol') return;
if (propertyKey === 'constructor') return;

const handler = prototype[propertyKey];
if (typeof handler !== 'function' || handler.__sub__) {
return;
}

const methodName = propertyKey.toString();
const isExplicitlyPicked = pickArray.length > 0 ? pickArray.includes(methodName) : true;

const isExplicitlyOmitted = omitArray.includes(methodName);

if (isExplicitlyPicked && !isExplicitlyOmitted) {
eventManager.subscribe(qualified, handler.bind(prototype));
handler.__sub__ = true;
}
});
}

export function methodDecorator<T extends IEventType>(
target: any,
propertyKey: string | symbol,
descriptor: PropertyDescriptor,
qualified: QualifiedEvent,
eventManager: EventManager,
options?: EventOnMethodOptions<T>,
) {
const originalMethod = descriptor.value;

descriptor.value = async function (event: Event<T>) {
try {
let transformedEvent = event;

if (options?.transform) {
transformedEvent = options.transform(structuredClone(event));
}

if (options?.retry) {
let attempt = 0;
while (true) {
try {
return await originalMethod.call(this, transformedEvent);
} catch (error) {
// attempt++;
if (options.retry.delay > 0) {
await new Promise((resolve) => setTimeout(resolve, options.retry!.delay));
}

if (attempt > options.retry.max) {
throw error;
}
}
}
}

return originalMethod.call(this, transformedEvent);
} catch (error) {
throw error;
}
};

descriptor.value.__sub__ = true;
eventManager.subscribe(qualified, descriptor.value.bind(target));
}
}
2 changes: 2 additions & 0 deletions packages/events/decorators/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './emit-event.decorator';
export * from './on-event.decorator';
Loading

0 comments on commit 042be2d

Please sign in to comment.