-
Notifications
You must be signed in to change notification settings - Fork 23
/
Copy pathsum.ts
79 lines (66 loc) · 1.99 KB
/
sum.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import type {
EagerCollection,
Mapper,
Resource,
Values,
} from "@skipruntime/core";
import { runService } from "@skipruntime/server";
/**
 * Platform selection handed to `runService`.
 *
 * Driven by the `SKIP_PLATFORM` environment variable: the exact string
 * `"native"` selects `"native"`; any other value, or an unset variable,
 * falls back to `"wasm"`.
 */
const platform: "wasm" | "native" =
  process.env["SKIP_PLATFORM"] === "native" ? "native" : "wasm";
/**
 * Mapper that tags the single value stored under each key with a fixed
 * index, emitting `[index, value]` pairs under the same key.
 */
class AddIndex implements Mapper<string, number, string, [number, number]> {
  /** @param index - Tag placed in the first slot of every emitted pair. */
  constructor(private index: number) {}

  /**
   * Emits exactly one entry for `key`: the pair `[index, value]`, where
   * `value` is whatever `values.getUnique()` returns.
   *
   * @param key - Key being mapped; re-emitted unchanged.
   * @param values - Values currently associated with `key`.
   * @returns A one-element list holding `[key, [index, value]]`.
   */
  mapEntry(
    key: string,
    values: Values<number>,
  ): Iterable<[string, [number, number]]> {
    const tagged: [number, number] = [this.index, values.getUnique()];
    const entry: [string, [number, number]] = [key, tagged];
    return [entry];
  }
}
/**
 * Mapper that collapses all numeric values stored under a key into their
 * sum, emitted under the same key.
 */
class Plus implements Mapper<string, number, string, number> {
  /**
   * @param key - Key being mapped; re-emitted unchanged.
   * @param values - Values currently associated with `key`.
   * @returns A one-element list holding `[key, total]`; `total` is `0` when
   *   `values.toArray()` is empty.
   */
  mapEntry(key: string, values: Values<number>): Iterable<[string, number]> {
    let total = 0;
    for (const value of values.toArray()) {
      total = total + value;
    }
    return [[key, total]];
  }
}
/**
 * Mapper that folds the `[index, value]` pairs stored under a key into a
 * single number: pairs are ordered by `index`, the first value seeds the
 * result, and every later value is subtracted from it.
 */
class Minus implements Mapper<string, [number, number], string, number> {
  /**
   * @param key - Key being mapped; re-emitted unchanged.
   * @param values - `[index, value]` pairs currently associated with `key`.
   * @returns A one-element list holding `[key, result]`; `result` is `0`
   *   when there are no pairs.
   */
  mapEntry(
    key: string,
    values: Values<[number, number]>,
  ): Iterable<[string, number]> {
    // Copy before sorting so the array handed back by `toArray()` is never
    // mutated, and compare numerically: the default `sort()` compares string
    // renderings, which mis-orders multi-digit indices ("10,5" < "2,20").
    const ordered = [...values.toArray()].sort(
      (a, b) => a[0] - b[0] || a[1] - b[1],
    );

    // NOTE(review): when a single pair is present its value is returned
    // as-is regardless of its index, so a key that only has an index-1 pair
    // yields `+value` rather than `-value`. Kept unchanged; confirm intent.
    let result: number | null = null;
    for (const [, value] of ordered) {
      result = result === null ? value : result - value;
    }
    return [[key, result ?? 0]];
  }
}
/**
 * The two named input collections used throughout this file; each maps
 * string keys to numeric values.
 */
type Collections = {
// First input; `Sub` tags its values with index 0.
input1: EagerCollection<string, number>;
// Second input; `Sub` tags its values with index 1.
input2: EagerCollection<string, number>;
};
/**
 * Resource built by merging `input1` with `input2` and mapping the merged
 * collection through `Plus`.
 */
class Add implements Resource<Collections> {
  /**
   * @param cs - The service's input collections.
   * @returns The merged inputs mapped through `Plus`.
   */
  instantiate(cs: Collections): EagerCollection<string, number> {
    const { input1, input2 } = cs;
    const combined = input1.merge(input2);
    return combined.map(Plus);
  }
}
/**
 * Resource built by tagging `input1` values with index 0 and `input2`
 * values with index 1 (via `AddIndex`), merging the two tagged collections,
 * and mapping the result through `Minus`.
 */
class Sub implements Resource<Collections> {
  /**
   * @param cs - The service's input collections.
   * @returns The merged, index-tagged inputs mapped through `Minus`.
   */
  instantiate(cs: Collections): EagerCollection<string, number> {
    const taggedFirst = cs.input1.map(AddIndex, 0);
    const taggedSecond = cs.input2.map(AddIndex, 1);
    return taggedFirst.merge(taggedSecond).map(Minus);
  }
}
// Service description handed to `runService`: both inputs start empty, two
// resources (`add`, `sub`) are exposed, and the graph passes the input
// collections straight through to the resources.
const service = {
  initialData: { input1: [], input2: [] },
  resources: { add: Add, sub: Sub },
  createGraph(inputs: Collections): Collections {
    return inputs;
  },
};

// Start the service on the selected platform, with the control port on 3588
// and the streaming port on 3587. Top-level `await` blocks module evaluation
// until `runService` resolves.
const server = await runService(service, {
  control_port: 3588,
  streaming_port: 3587,
  platform,
});
/**
 * Closes the running `server`.
 *
 * @returns A promise that settles once `server.close()` has settled.
 */
async function shutdown(): Promise<void> {
  await server.close();
}
// Close the server when the process receives SIGTERM or SIGINT. The handler
// is wrapped so the emitter gets a plain `void` callback instead of one that
// returns a promise; `void` marks the call as intentionally not awaited.
for (const sig of ["SIGTERM", "SIGINT"] as const) {
  process.on(sig, () => {
    void shutdown();
  });
}