diff --git a/packages/events/constant.ts b/packages/events/constant.ts new file mode 100644 index 0000000..e4469ac --- /dev/null +++ b/packages/events/constant.ts @@ -0,0 +1,6 @@ +export const EVENTS_METADATA = { + EMIT_EVENT: 'events:emit', + EVENT_BUS: 'events:event_bus', + EVENT_MANAGER: 'events:event_maanger', + CONTEXT: 'events:context', +}; diff --git a/packages/events/core/event-bus.ts b/packages/events/core/event-bus.ts new file mode 100644 index 0000000..f32353a --- /dev/null +++ b/packages/events/core/event-bus.ts @@ -0,0 +1,49 @@ +import { EVENTS_METADATA } from '../constant'; +import { EventHook, EventStrategy } from '../interface'; +import { ErrorStrategy, EventPipeline, PhaseManager } from '../pipeline'; +import { Event } from '../types'; +import { EventContext } from './event-context'; +import { EventRegistry } from './event-registry'; +import 'reflect-metadata'; +export class EventBus { + private hooks: EventHook[] = []; + private pipeline: EventPipeline; + + constructor(private registry: EventRegistry, private strategy: EventStrategy, private context: EventContext) { + Reflect.defineMetadata(EVENTS_METADATA.EVENT_BUS, this, this); + this.pipeline = this.createPipeline(); + } + + addHook(hook: EventHook): void { + this.hooks.push(hook); + } + + private createPipeline(): EventPipeline { + const phaseManager = new PhaseManager(this.registry, this.strategy); + const errorStrategy = new ErrorStrategy(this.context); + return new EventPipeline(phaseManager, errorStrategy, this.context); + } + async emit(event: E): Promise { + this.context.set('ctx:bus', this, true); + this.context.set('ctx:registry', this.registry, true); + this.context.set('ctx:processed', false); + this.context.set('ctx:current_phase', event.phase); + + try { + await this.pipeline.process(event); + } catch (error) { + for (const listener of this.registry.getListeners(event.type)) { + for (const hook of this.hooks) { + await hook.onError?.(error, event, listener); + } + } + } finally { + // Cleanup + this.context.set('ctx:processed', true); + this.cleanupOnceListeners(event); + } + } + private cleanupOnceListeners(event: Event) { + this.registry.getListeners(event.type).forEach((l) => this.registry.unregister(event.type, l)); + } +} diff --git a/packages/events/core/event-context.ts b/packages/events/core/event-context.ts new file mode 100644 index 0000000..99cedfa --- /dev/null +++ b/packages/events/core/event-context.ts @@ -0,0 +1,42 @@ +import { EventContextFactory } from '../interface'; +import { CtxKey, CtxVal } from '../types'; + +export interface EventContextData { + [key: string | symbol]: CtxVal; +} +export class EventContext { + private data: EventContextData = {}; + + get(key: CtxKey): T | undefined { + return this.data[key] as T | undefined; + } + + set(key: CtxKey, value: T, immutable = false): void { + this.data[key] = value; + } + has(key: CtxKey): boolean { + return key in this.data; + } + + delete(key: CtxKey): boolean { + return delete this.data[key]; + } + + static create(): EventContext { + return new EventContext(); + } + + saveState(): EventContextData { + return { ...this.data }; + } + + restoreState(state: EventContextData): void { + this.data = { ...state }; + } +} + +export class ContextFactory implements EventContextFactory { + create(): EventContext { + return EventContext.create(); + } +} diff --git a/packages/events/core/event-manager.ts b/packages/events/core/event-manager.ts new file mode 100644 index 0000000..e77df25 --- /dev/null +++ b/packages/events/core/event-manager.ts @@ -0,0 +1,65 @@ +import { EVENTS_METADATA } from '../constant'; +import { ErrorHook } from '../hooks'; +import { BroadcastStrategy, ImmediateStrategy, QueueStrategy } from '../strategies'; +import { Event, EventStrategyType, Listener, QualifiedEvent } from '../types'; +import { EventMapper } from '../utils/event-mapper'; +import { EventBus } from './event-bus'; +import { ContextFactory, EventContext } from './event-context'; +import { EventRegistry } from './event-registry'; +import 'reflect-metadata'; + +export class EventManager { + private bus: EventBus; + private context: EventContext; + private eventRegistry: EventRegistry; + constructor(strategy?: EventStrategyType) { + this.eventRegistry = EventRegistry.getInstance(); + this.context = new ContextFactory().create(); + this.bus = new EventBus(this.eventRegistry, this.createStrategy(strategy), this.context); + this.bus.addHook(new ErrorHook(this.context)); + Reflect.defineMetadata(EVENTS_METADATA.EVENT_MANAGER, this, EventManager); + Reflect.defineMetadata(EVENTS_METADATA.CONTEXT, this, EventContext); + } + private createStrategy(strategy?: EventStrategyType) { + switch (strategy) { + case 'broadcast': + return new BroadcastStrategy(); + case 'queue': + return new QueueStrategy(); + case 'immediate': + default: + return new ImmediateStrategy(); + } + } + async publish(qualified: T, data: D): Promise { + const { phase, type } = EventMapper.parseQualifiedType(qualified); + + const event: Event = { + type, + phase, + data, + lifecycle: { + startedAt: new Date(), + }, + }; + await this.bus.emit(event); + } + + subscribe(qualified: T, listener: Listener): () => void { + const { phase, type } = EventMapper.parseQualifiedType(qualified); + this.eventRegistry.register(type, listener); + + return () => this.eventRegistry.unregister(type, listener); + } + unsubscribe(qualified: T, listener: Listener): void { + const { phase, type } = EventMapper.parseQualifiedType(qualified); + this.eventRegistry.unregister(type, listener); + } + + batchSubscribe(listeners: { qualified: T; listener: Listener }[]): (() => void)[] { + return listeners.map(({ qualified, listener }) => this.subscribe(qualified, listener)); + } + batchUnsubscribe(listeners: { qualified: T; listener: Listener }[]): void { + listeners.forEach(({ qualified, listener }) => this.unsubscribe(qualified, listener)); + } +} diff --git a/packages/events/core/event-registry.ts b/packages/events/core/event-registry.ts new file mode 100644 index 0000000..2cfdf68 --- /dev/null +++ b/packages/events/core/event-registry.ts @@ -0,0 +1,35 @@ +import { IEventType, Listener } from '../types'; + +export class EventRegistry { + private static instance: EventRegistry; + private listeners = new Map[]>(); + + private constructor() {} + + static getInstance(): EventRegistry { + if (!EventRegistry.instance) { + EventRegistry.instance = new EventRegistry(); + } + return EventRegistry.instance; + } + + register(type: IEventType, listener: Listener): void { + const listeners = this.listeners.get(type) ?? []; + listeners.push(listener); + + this.listeners.set(type, listeners); + } + + unregister(type: IEventType, listener: Listener): void { + const listeners = this.listeners.get(type)?.filter((l) => l !== listener); + if (listeners) this.listeners.set(type, listeners); + } + + getListeners(type: IEventType): Listener[] { + return this.listeners.get(type) ?? []; + } + + hasListeners(event: IEventType): boolean { + return !!this.listeners.get(event)?.length; + } +} diff --git a/packages/events/core/index.ts b/packages/events/core/index.ts new file mode 100644 index 0000000..3603dbe --- /dev/null +++ b/packages/events/core/index.ts @@ -0,0 +1,4 @@ +export * from './event-bus'; +export * from './event-context'; +export * from './event-manager'; +export * from './event-registry'; diff --git a/packages/events/decorators/emit-event.decorator.ts b/packages/events/decorators/emit-event.decorator.ts new file mode 100644 index 0000000..ab1b8c6 --- /dev/null +++ b/packages/events/decorators/emit-event.decorator.ts @@ -0,0 +1,43 @@ +import 'reflect-metadata'; +import { EVENTS_METADATA } from '../constant'; +import { Constructor, determineDecoratorType, getConstructor } from '@gland/common'; +import { QualifiedEvent } from '../types'; +import { EventManager } from '../core'; +import { EmitHandlers } from './handlers'; +import { EventEmitClassOptions, EventEmitMethodOptions } from '../interface'; +type EmitOptions = T extends Constructor ? EventEmitClassOptions : EventEmitMethodOptions; + +function EmitEvent(qualified: QualifiedEvent, options?: EmitOptions): MethodDecorator & ClassDecorator { + return function (target: any, propertyKey?: string | symbol, descriptor?: PropertyDescriptor | number) { + const eventManager: EventManager = Reflect.getMetadata(EVENTS_METADATA.EVENT_MANAGER, EventManager); + if (!eventManager) { + throw new Error('EventManager must be initialized before using @EmitEvent decorator'); + } + const constructor = getConstructor(target); + switch (determineDecoratorType(arguments)) { + case 'class': + EmitHandlers.classHandler({ + data: (options as any).data, + eventManager, + qualified, + }); + return target; + + case 'method': + if (typeof descriptor === 'object' && descriptor !== null) { + } else { + throw new Error('@EmitEvent can only be used on methods with a valid PropertyDescriptor.'); + } + break; + + default: + throw new Error('Invalid usage of @EmitEvent decorator.'); + } + }; +} +export function EmitMethod(qualified: QualifiedEvent, options?: EmitOptions) { + return EmitEvent(qualified, options); +} +export function EmitClass(qualified: QualifiedEvent, options?: EmitOptions) { + return EmitEvent(qualified, options); +} diff --git a/packages/events/decorators/handlers/emit-event.handlers.ts b/packages/events/decorators/handlers/emit-event.handlers.ts new file mode 100644 index 0000000..a6cba3c --- /dev/null +++ b/packages/events/decorators/handlers/emit-event.handlers.ts @@ -0,0 +1,26 @@ +import { QualifiedEvent } from '@gland/events/types'; +import { EventManager } from '../../core/event-manager'; +import { EventEmitMethodOptions } from '../../interface'; +export namespace EmitHandlers { + export function classHandler({ data, eventManager, qualified }: { eventManager: EventManager; qualified: QualifiedEvent; data: D }) { + eventManager.publish(qualified, data); + } + export function methodHandler({ + data, + descriptor, + eventManager, + qualified, + options, + }: { + data: D; + qualified: QualifiedEvent; + eventManager: EventManager; + descriptor: PropertyDescriptor; + options?: EventEmitMethodOptions; + }) { + const originalMethod = descriptor.value; + descriptor.value = async function (...args: any[]) { + let payload = data; + }; + } +} diff --git a/packages/events/decorators/handlers/index.ts b/packages/events/decorators/handlers/index.ts new file mode 100644 index 0000000..c37d4dd --- /dev/null +++ b/packages/events/decorators/handlers/index.ts @@ -0,0 +1,2 @@ +export * from './on-event.handlers'; +export * from './emit-event.handlers'; diff --git a/packages/events/decorators/handlers/on-event.handlers.ts b/packages/events/decorators/handlers/on-event.handlers.ts new file mode 100644 index 0000000..b835e91 --- /dev/null +++ b/packages/events/decorators/handlers/on-event.handlers.ts @@ -0,0 +1,107 @@ +import { EventOnClassOptions, EventOnMethodOptions } from '../../interface'; +import { EventManager } from '../../core'; +import { Event, IEventType, QualifiedEvent } from '../../types'; + +export namespace OnHandlers { + /** + * Registers all methods on the target class's prototype as event handlers for a given qualified event, + * but only for methods that have not already been subscribed. + * + * This function iterates over all properties of the target's prototype. For each property that is a function, + * it checks for a custom flag (`__sub__`) on the function. If the flag is not set, the method is bound to the + * target's prototype and subscribed to the event manager using the provided qualified event. This ensures + * that methods which have already been registered (and thus have the `__sub__` flag set to true) are not subscribed again. + * + * @param target - The class whose prototype methods will be registered as event handlers. + * @param qualified - The qualified event identifier (e.g., "server:start:pre") to which the methods will subscribe. + * @param eventManager - The instance of EventManager that will handle the subscription of event handlers. + * + * @remarks + * Methods that have been already subscribed should have the `__sub__` property set to `true` to avoid duplicate subscriptions. + */ + export function classDecorator(target: any, qualified: QualifiedEvent, eventManager: EventManager, options?: EventOnClassOptions) { + const prototype = target.prototype; + // 🚨 Validate that only one of 'pick' or 'omit' is used + if (options?.pick && options?.omit) { + throw new Error("Invalid decorator usage: You cannot use both 'pick' and 'omit' together."); + } + // Handle inheritance first + if (options?.inherit) { + const parent = Object.getPrototypeOf(target.prototype).constructor; + if (parent !== Object) { + classDecorator(parent, qualified, eventManager, options); + } + } + + // Normalize pick/omit to arrays + const pickArray = options?.pick ? (Array.isArray(options.pick) ? options.pick : [options.pick]) : []; + + const omitArray = options?.omit ? (Array.isArray(options.omit) ? options.omit : [options.omit]) : []; + + Reflect.ownKeys(prototype).forEach((propertyKey) => { + if (typeof propertyKey !== 'string' && typeof propertyKey !== 'symbol') return; + if (propertyKey === 'constructor') return; + + const handler = prototype[propertyKey]; + if (typeof handler !== 'function' || handler.__sub__) { + return; + } + + const methodName = propertyKey.toString(); + const isExplicitlyPicked = pickArray.length > 0 ? pickArray.includes(methodName) : true; + + const isExplicitlyOmitted = omitArray.includes(methodName); + + if (isExplicitlyPicked && !isExplicitlyOmitted) { + eventManager.subscribe(qualified, handler.bind(prototype)); + handler.__sub__ = true; + } + }); + } + + export function methodDecorator( + target: any, + propertyKey: string | symbol, + descriptor: PropertyDescriptor, + qualified: QualifiedEvent, + eventManager: EventManager, + options?: EventOnMethodOptions, + ) { + const originalMethod = descriptor.value; + + descriptor.value = async function (event: Event) { + try { + let transformedEvent = event; + + if (options?.transform) { + transformedEvent = options.transform(structuredClone(event)); + } + + if (options?.retry) { + let attempt = 0; + while (true) { + try { + return await originalMethod.call(this, transformedEvent); + } catch (error) { + // attempt++; + if (options.retry.delay > 0) { + await new Promise((resolve) => setTimeout(resolve, options.retry!.delay)); + } + + if (attempt > options.retry.max) { + throw error; + } + } + } + } + + return originalMethod.call(this, transformedEvent); + } catch (error) { + throw error; + } + }; + + descriptor.value.__sub__ = true; + eventManager.subscribe(qualified, descriptor.value.bind(target)); + } +} diff --git a/packages/events/decorators/index.ts b/packages/events/decorators/index.ts new file mode 100644 index 0000000..d2658b3 --- /dev/null +++ b/packages/events/decorators/index.ts @@ -0,0 +1,2 @@ +export * from './emit-event.decorator'; +export * from './on-event.decorator'; diff --git a/packages/events/decorators/on-event.decorator.ts b/packages/events/decorators/on-event.decorator.ts new file mode 100644 index 0000000..7a3d453 --- /dev/null +++ b/packages/events/decorators/on-event.decorator.ts @@ -0,0 +1,43 @@ +import 'reflect-metadata'; +import { EVENTS_METADATA } from '../constant'; +import { Constructor, determineDecoratorType, getConstructor } from '@gland/common'; +import { QualifiedEvent } from '../types'; +import { EventManager } from '../core'; +import { OnHandlers } from './handlers'; +import { EventOnClassOptions, EventOnMethodOptions } from '../interface'; +type EventOptions = T extends Constructor ? EventOnClassOptions : EventOnMethodOptions; + +function OnEvent(event: QualifiedEvent, options?: EventOptions): MethodDecorator & ClassDecorator { + return function (target: any, propertyKey?: string | symbol, descriptor?: PropertyDescriptor) { + const eventManager = Reflect.getMetadata(EVENTS_METADATA.EVENT_MANAGER, EventManager); + if (!eventManager) { + throw new Error('EventManager must be initialized before using @OnEvent decorator'); + } + const constructor = getConstructor(target); + + switch (determineDecoratorType(arguments)) { + case 'class': + OnHandlers.classDecorator(constructor, event, eventManager, options as EventOnClassOptions); + return target; + case 'method': + if (typeof descriptor === 'object' && descriptor !== null) { + OnHandlers.methodDecorator(constructor, propertyKey!, descriptor, event, eventManager, options as EventOnMethodOptions); + } else { + throw new Error('@OnEvent can only be used on methods with a valid PropertyDescriptor.'); + } + break; + default: + throw new Error('Invalid usage of @OnEvent decorator.'); + } + }; +} + +// Method-specific decorator +function OnMethod(event: QualifiedEvent, options?: EventOptions): MethodDecorator { + return OnEvent(event, options); +} +// Class-specific decorator +function OnClass(event: QualifiedEvent, options?: EventOptions): ClassDecorator { + return OnEvent(event, options); +} +export { OnClass, OnMethod }; diff --git a/packages/events/enums/context.enum.ts b/packages/events/enums/context.enum.ts new file mode 100644 index 0000000..46dcf49 --- /dev/null +++ b/packages/events/enums/context.enum.ts @@ -0,0 +1,8 @@ +export enum CtxKeys { + BUS = 'ctx:bus', + REGISTRY = 'ctx:registry', + IS_PROCESSED = 'ctx:processed', + CURRENT_PHASE = 'ctx:current_phase', + ERROR_PHASE = 'ctx:error_phase', + LAST_ERROR = 'ctx:last_error', +} diff --git a/packages/events/enums/event-phase.enum.ts b/packages/events/enums/event-phase.enum.ts new file mode 100644 index 0000000..c260f42 --- /dev/null +++ b/packages/events/enums/event-phase.enum.ts @@ -0,0 +1,21 @@ +export enum EventPhase { + PRE = 'pre', + + MAIN = 'main', + + POST = 'post', + + ERROR = 'error', + + ROLLBACK = 'rollback', + + FALLBACK = 'fallback', + + RETRY = 'retry', + + AUDIT = 'audit', + + NOTIFY = 'notify', + + VALIDATION = 'validation', +} diff --git a/packages/events/enums/event-type.enum.ts b/packages/events/enums/event-type.enum.ts new file mode 100644 index 0000000..aac22c5 --- /dev/null +++ b/packages/events/enums/event-type.enum.ts @@ -0,0 +1,35 @@ +export enum EventType { + SERVER_START = 'server:start', + SERVER_READY = 'server:ready', + SERVER_STOP = 'server:stop', + SERVER_ERROR = 'server:error', + SERVER_RESTART = 'server:restart', + SERVER_HEALTH_CHECK = 'server:health-check', + + REQUEST_START = 'request:start', + REQUEST_END = 'request:end', + REQUEST_ERROR = 'request:error', + REQUEST_TIMEOUT = 'request:timeout', + REQUEST_ABORTED = 'request:aborted', + + RESPONSE_START = 'response:start', + RESPONSE_END = 'response:end', + RESPONSE_ERROR = 'response:error', + + ROUTE_MATCHED = 'route:matched', + ROUTE_NOT_FOUND = 'route:not-found', + ROUTE_ERROR = 'route:error', + ROUTE_GUARD_CHECK = 'route:guard-check', + ROUTE_MIDDLEWARE_EXECUTION = 'route:middleware-execution', + ROUTE_HANDLER_EXECUTION = 'route:handler-execution', + + APP_BOOTSTRAP = 'app:bootstrap', + APP_READY = 'app:ready', + APP_SHUTDOWN = 'app:shutdown', + + WEBSOCKET_CONNECTED = 'websocket:connected', + WEBSOCKET_DISCONNECTED = 'websocket:disconnected', + WEBSOCKET_MESSAGE = 'websocket:message', + + ERROR = 'error', +} diff --git a/packages/events/enums/index.ts b/packages/events/enums/index.ts new file mode 100644 index 0000000..0acea3b --- /dev/null +++ b/packages/events/enums/index.ts @@ -0,0 +1,3 @@ +export * from './context.enum'; +export * from './event-type.enum'; +export * from './event-phase.enum'; diff --git a/packages/events/hooks/error.hook.ts b/packages/events/hooks/error.hook.ts new file mode 100644 index 0000000..035f79f --- /dev/null +++ b/packages/events/hooks/error.hook.ts @@ -0,0 +1,25 @@ +import { EventContext } from '../core'; +import { EventBus } from '../core/event-bus'; +import { EventPhase, EventType } from '../enums'; +import { EventHook } from '../interface'; +import { Event, Listener } from '../types'; +export class ErrorHook implements EventHook { + name = 'error'; + constructor(private context: EventContext) {} + async onError(error: Error, event: E, listener: Listener) { + const errorEvent: Event = { + type: EventType.ERROR, + phase: EventPhase.MAIN, + data: { + originalEvent: event, + listener: listener.name, + }, + isFailure: true, + isSuccess: false, + lifecycle: {}, + error: error, + }; + + this.context.get('ctx:bus')!.emit(errorEvent); + } +} diff --git a/packages/events/hooks/index.ts b/packages/events/hooks/index.ts new file mode 100644 index 0000000..bbc6e11 --- /dev/null +++ b/packages/events/hooks/index.ts @@ -0,0 +1 @@ +export * from './error.hook'; diff --git a/packages/events/index.ts b/packages/events/index.ts new file mode 100644 index 0000000..22f9fbe --- /dev/null +++ b/packages/events/index.ts @@ -0,0 +1,4 @@ +export * from './types'; +export * from './core'; +export * from './decorators'; +export * from './strategies'; diff --git a/packages/events/interface/decorators-options.interface.ts b/packages/events/interface/decorators-options.interface.ts new file mode 100644 index 0000000..3c8f368 --- /dev/null +++ b/packages/events/interface/decorators-options.interface.ts @@ -0,0 +1,45 @@ +import { Event, IEventType } from '../types'; +// On Decorators Options +export interface EventOnMethodOptions { + /** + * A transformation function to modify the event before the handler is called. + */ + transform?: (event: Event) => Event; + + /** + * Automatically retries the handler if it fails. + * - max: Maximum retry attempts. + * - delay: Time (ms) between retries. + */ + retry?: { max: number; delay: number }; +} +export interface EventOnClassOptions { + pick?: string[] | string; + + omit?: string[] | string; + + /** + * If true, the class will inherit all the events of its parent. + */ + inherit?: boolean; +} + +// Emit Decorators Options +export interface EventEmitMethodOptions { + + + /** + * Automatically retries the event publication if the decorated method fails. + * - max: Maximum retry attempts. + * - delay: Time (ms) between retries. + */ + retry?: { max: number; delay: number }; +} +export interface EventEmitClassOptions { + data:D + + /** + * If true, the class will inherit all the events of its parent. + */ + inherit?: boolean; +} diff --git a/packages/events/interface/event-strategy.interface.ts b/packages/events/interface/event-strategy.interface.ts new file mode 100644 index 0000000..110c3bc --- /dev/null +++ b/packages/events/interface/event-strategy.interface.ts @@ -0,0 +1,5 @@ +import { Event, Listener } from '../types'; + +export interface EventStrategy { + execute(event: E, listeners: Listener[]): Promise; +} diff --git a/packages/events/interface/event.interface.ts b/packages/events/interface/event.interface.ts new file mode 100644 index 0000000..d829bf8 --- /dev/null +++ b/packages/events/interface/event.interface.ts @@ -0,0 +1,5 @@ +import { EventContext } from '../core/event-context'; + +export interface EventContextFactory { + create(): EventContext; +} diff --git a/packages/events/interface/hook.interface.ts b/packages/events/interface/hook.interface.ts new file mode 100644 index 0000000..52fb3a0 --- /dev/null +++ b/packages/events/interface/hook.interface.ts @@ -0,0 +1,6 @@ +import { Event, Listener } from '../types'; +export interface EventHook { + name: string; + + onError?: (error: Error, event: Event, listener: Listener) => Promise | void; +} diff --git a/packages/events/interface/index.ts b/packages/events/interface/index.ts new file mode 100644 index 0000000..2c94d58 --- /dev/null +++ b/packages/events/interface/index.ts @@ -0,0 +1,4 @@ +export * from './event-strategy.interface'; +export * from './decorators-options.interface'; +export * from './event.interface'; +export * from './hook.interface'; diff --git a/packages/events/package.json b/packages/events/package.json new file mode 100644 index 0000000..a74de27 --- /dev/null +++ b/packages/events/package.json @@ -0,0 +1,19 @@ +{ + "name": "@gland/events", + "version": "1.0.0", + "author": "Mahdi", + "license": "MIT", + "engines": { + "node": ">= 20" + }, + "scripts": { + "build": "tsc" + }, + "dependencies": { + "@gland/common": "workspace:*", + "@gland/cache":"workspace:*" + }, + "devDependencies": { + "typescript": "^5.5.4" + } +} diff --git a/packages/events/pipeline/error-strategy.ts b/packages/events/pipeline/error-strategy.ts new file mode 100644 index 0000000..0ea47eb --- /dev/null +++ b/packages/events/pipeline/error-strategy.ts @@ -0,0 +1,15 @@ +import { EventContext } from '../core/event-context'; +import { EventPhase } from '../enums'; +import { Event } from '../types'; + +export class ErrorStrategy { + constructor(private context: EventContext) {} + + async handleError(error: Error, event: Event): Promise { + this.context.set('ctx:last_error', error); + this.context.set('ctx:error_phase', event.phase); + event.phase = EventPhase.ERROR; + event.phase = EventPhase.FALLBACK; + event.error = error; + } +} diff --git a/packages/events/pipeline/index.ts b/packages/events/pipeline/index.ts new file mode 100644 index 0000000..feecc4d --- /dev/null +++ b/packages/events/pipeline/index.ts @@ -0,0 +1,3 @@ +export * from './error-strategy'; +export * from './phase-manager'; +export * from './pipeline'; diff --git a/packages/events/pipeline/phase-manager.ts b/packages/events/pipeline/phase-manager.ts new file mode 100644 index 0000000..1f4ee4e --- /dev/null +++ b/packages/events/pipeline/phase-manager.ts @@ -0,0 +1,29 @@ +import { EventRegistry } from '../core/event-registry'; +import { EventStrategy } from '../interface'; +import { Event } from '../types'; + +export class PhaseManager { + constructor(private registry: EventRegistry, private strategy: EventStrategy) {} + + async executePhase(event: Event): Promise { + const listeners = this.registry.getListeners(event.type); + + try { + const start = event.lifecycle?.startedAt; + const end = new Date(); + event.lifecycle!.finishedAt = end; + event.lifecycle!.durationMs = end.getTime() - (start?.getTime() || end.getTime()); + event.isSuccess = true; + event.isFailure = false; + await this.strategy.execute(event, listeners); + } catch (error) { + event.isFailure = true; + event.isSuccess = false; + throw error; + } + } + + async transition(event: E): Promise { + await this.executePhase(event); + } +} diff --git a/packages/events/pipeline/pipeline.ts b/packages/events/pipeline/pipeline.ts new file mode 100644 index 0000000..7961ec3 --- /dev/null +++ b/packages/events/pipeline/pipeline.ts @@ -0,0 +1,17 @@ +import { PhaseManager } from './phase-manager'; +import { ErrorStrategy } from './error-strategy'; +import { Event } from '../types'; +import { EventContext } from '../core'; + +export class EventPipeline { + constructor(private phaseManager: PhaseManager, private errorStrategy: ErrorStrategy, private context: EventContext) {} + + async process(event: E): Promise { + try { + this.context.set('ctx:current_phase', event.phase); + await this.phaseManager.transition(event); + } catch (error) { + await this.errorStrategy.handleError(error, event); + } + } +} diff --git a/packages/events/strategies/broadcast.ts b/packages/events/strategies/broadcast.ts new file mode 100644 index 0000000..e53187f --- /dev/null +++ b/packages/events/strategies/broadcast.ts @@ -0,0 +1,8 @@ +import { EventStrategy } from '../interface'; +import { Event, Listener } from '../types'; + +export class BroadcastStrategy implements EventStrategy { + async execute(event: E, listeners: Listener[]): Promise { + await Promise.all(listeners.map((listener) => listener(event))); + } +} diff --git a/packages/events/strategies/immediate.ts b/packages/events/strategies/immediate.ts new file mode 100644 index 0000000..200bad9 --- /dev/null +++ b/packages/events/strategies/immediate.ts @@ -0,0 +1,10 @@ +import { EventStrategy } from '../interface'; +import { Event, Listener } from '../types'; + +export class ImmediateStrategy implements EventStrategy { + async execute(event: T, listeners: Listener[]): Promise { + for (const listener of listeners) { + await listener(event); + } + } +} diff --git a/packages/events/strategies/index.ts b/packages/events/strategies/index.ts new file mode 100644 index 0000000..e119c6c --- /dev/null +++ b/packages/events/strategies/index.ts @@ -0,0 +1,3 @@ +export * from './immediate'; +export * from './queue'; +export * from './broadcast'; diff --git a/packages/events/strategies/queue.ts b/packages/events/strategies/queue.ts new file mode 100644 index 0000000..3ce8cb7 --- /dev/null +++ b/packages/events/strategies/queue.ts @@ -0,0 +1,13 @@ +import { EventStrategy } from '../interface'; +import { Event, Listener } from '../types'; + +export class QueueStrategy implements EventStrategy { + private queue: Promise = Promise.resolve(); + + async execute(event: E, listeners: Listener[]): Promise { + for (const listener of listeners) { + this.queue = this.queue.then(() => listener(event)); + } + await this.queue; + } +} diff --git a/packages/events/tsconfig.json b/packages/events/tsconfig.json new file mode 100644 index 0000000..9260ed5 --- /dev/null +++ b/packages/events/tsconfig.json @@ -0,0 +1,19 @@ +{ + "extends": "../../tsconfig.json", + + "compilerOptions": { + "rootDir": ".", + "outDir": "." + }, + "files": [], + "include": ["**/*.ts"], + "exclude": ["node_modules"], + "references": [ + { + "path": "../common" + }, + { + "path": "../cache" + } + ] +} diff --git a/packages/events/types/context-keys.types.ts b/packages/events/types/context-keys.types.ts new file mode 100644 index 0000000..640daa6 --- /dev/null +++ b/packages/events/types/context-keys.types.ts @@ -0,0 +1,4 @@ +import { Primitive } from '@gland/common'; +import { CtxKeys } from '../enums'; +export type CtxKey = `${(typeof CtxKeys)[keyof typeof CtxKeys]}`; +export type CtxVal = T | Primitive | object | Array | Map | Set; diff --git a/packages/events/types/event.type.ts b/packages/events/types/event.type.ts new file mode 100644 index 0000000..bb73ce5 --- /dev/null +++ b/packages/events/types/event.type.ts @@ -0,0 +1,46 @@ +import { EventPhase, EventType } from '../enums'; +export type IEventPhase = (typeof EventPhase)[keyof typeof EventPhase]; + +export type IEventType = (typeof EventType)[keyof typeof EventType]; + +export type QualifiedEvent = `${IEventType}` | `${IEventType}:${IEventPhase}`; +/** + * Represents an event in Gland's event-driven system. + * @template T - The event type (e.g., "server:start"). + * @template P - The event phase (e.g., "pre" or "main"). + * @template D - The payload data type. + */ +export type Event = { + /** The event type (e.g., "server:start"). */ + type: T; + + /** The event phase (e.g., "pre" or "main"). */ + phase: P; + + /** The payload data associated with the event. */ + data: D; + + /** Event lifecycle timestamps */ + lifecycle?: { + /** When event processing started */ + startedAt?: Date; + /** When event processing finished */ + finishedAt?: Date; + /** Total processing duration */ + durationMs?: number; + }; + /** The error object if the event failed processing. */ + error?: Error; + + /** A flag indicating whether the event is a success event (e.g., for success notifications). */ + isSuccess?: boolean; + + /** A flag indicating whether the event is a failure event (e.g., for failure notifications). */ + isFailure?: boolean; +}; +export type Listener = (event: T) => void | Promise; + +export type EventFlow = { + [P in IEventPhase]: Event; +}; +export type EventStrategyType = 'queue' | 'broadcast' | 'immediate'; diff --git a/packages/events/types/index.ts b/packages/events/types/index.ts new file mode 100644 index 0000000..9095a38 --- /dev/null +++ b/packages/events/types/index.ts @@ -0,0 +1,2 @@ +export * from './event.type'; +export * from './context-keys.types'; diff --git a/packages/events/utils/event-mapper.ts b/packages/events/utils/event-mapper.ts new file mode 100644 index 0000000..a287136 --- /dev/null +++ b/packages/events/utils/event-mapper.ts @@ -0,0 +1,37 @@ +import { EventPhase } from '../enums'; +import { QualifiedEvent, IEventType, IEventPhase, Event } from '../types'; + +/** + * Utility class for event transformation and type mapping + */ +export class EventMapper { + static parseQualifiedType(qualified: QualifiedEvent): { + type: IEventType; + phase: IEventPhase; + } { + const parts = qualified.split(':'); + const possiblePhase = parts[parts.length - 1]; + if (Object.values(EventPhase).includes(possiblePhase as EventPhase)) { + return { + type: parts.slice(0, -1).join(':') as IEventType, + phase: possiblePhase as IEventPhase, + }; + } + return { + type: qualified as IEventType, + phase: EventPhase.MAIN, + }; + } + static createQualifiedEvent(event: E): QualifiedEvent { + return `${event.type}:${event.phase}`; + } + + static registryQualified(qualified: QualifiedEvent): QualifiedEvent { + const { phase, type } = this.parseQualifiedType(qualified); + const event = { + phase, + type, + }; + return this.createQualifiedEvent(event as any); + } +}