Skip to content

Commit

Permalink
add: p queue
Browse files Browse the repository at this point in the history
  • Loading branch information
thutasann committed Feb 27, 2025
1 parent e3d35d1 commit b0ca446
Show file tree
Hide file tree
Showing 2 changed files with 86 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import { PQueue, Task } from '.';

function got(url: string): Promise<string> {
return new Promise((resolve) => {
setTimeout(() => {
resolve(`Fetched ${url}`);
}, 1000);
});
}

async function getUnicornTask(): Promise<Task<string>> {
return () =>
new Promise((resolve) => {
setTimeout(() => {
resolve('Unicorn task completed');
}, 1500);
});
}

const queue = new PQueue({ concurrency: 1 });

(async () => {
await queue.add(() => got('https://sindresorhus.com'));
console.log('Done: sindresorhus.com');
})();

(async () => {
await queue.add(() => got('https://avajs.dev'));
console.log('Done: avajs.dev');
})();

(async () => {
const task = await getUnicornTask();
await queue.add(task);
console.log('Done: Unicorn task');
})();
50 changes: 50 additions & 0 deletions data_structures/src/KeyPatterns/Queue/typescript/p-queue/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
export type Task<T> = () => Promise<T>;

export class PQueue<T = any> {
private concurrency: number;
private queue: Task<T>[] = [];
private activeCount = 0;

constructor({ concurrency = 1 } = {}) {
if (concurrency < 1) {
throw new Error('Concurrency must be atleast 1');
}
this.concurrency = concurrency;
}

private next(): void {
if (this.queue.length === 0 || this.activeCount >= this.concurrency) {
return;
}

const task = this.queue.shift()!;
this.activeCount++;

task()
.then(() => {
this.activeCount--;
this.next();
})
.catch((err) => {
console.error('Task error: ', err);
this.activeCount--;
this.next();
});
}

add(task: Task<T>): Promise<T> {
return new Promise<T>((resolve, reject) => {
const runTask = async () => {
try {
const result = await task();
resolve(result);
} catch (error) {
reject(error);
}
};

this.queue.push(runTask as unknown as Task<T>);
this.next();
});
}
}

0 comments on commit b0ca446

Please sign in to comment.