-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathprocess.js
141 lines (117 loc) · 3.21 KB
/
process.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
const { EventEmitter } = require('events')
const Promise = require('any-promise')
const co = require('co').wrap
const pump = require('pump')
const through = require('through2')
const merge = require('merge2')
const duplexify = require('duplexify')
const extend = require('xtend/mutable')
const reorder = require('./sort-transform')
const { MAX_INT } = require('./utils')
const AsyncEmitter = require('./async-emitter')
const createGates = require('./gates')
module.exports = function processMultiqueue ({ multiqueue, worker }) {
const streams = {}
const source = multiqueue.createReadStream({ live: true })
const gates = createGates()
const mainGate = createGate()
const splitter = through.obj(function (data, enc, cb) {
const { queue } = data
if (!streams[queue]) {
streams[queue] = createSortingStream(queue)
pump(
streams[queue],
createGate(queue),
createWorkerStream(queue),
function (err) {
if (err) api.emit('error', err)
}
)
}
streams[queue].write(data)
cb()
})
const work = pump(
source,
mainGate,
splitter
)
function createGate (queue) {
return through.obj(function (data, enc, cb) {
if (gates.isOpen(queue)) return cb(null, data)
gates.awaitOpen(queue).then(() => cb(null, data))
})
}
function createSortingStream (queue) {
const getCheckpoint = multiqueue.queue(queue).checkpoint()
let checkpoint
let sortStream
// TODO:
//
// make this and createWorkerStream more efficient
// currently it creates too many promises (at least one per item!)
const ensureCheckpoint = through.obj({ highWaterMark: MAX_INT }, co(function* (data, enc, cb) {
if (typeof checkpoint === 'undefined') {
checkpoint = yield getCheckpoint
sortStream = reorder({
getPosition: data => data.seq,
start: checkpoint + 1
})
duplex.setReadable(sortStream)
}
sortStream.write(data)
cb(null, data)
}))
const duplex = duplexify(null, null, { objectMode: true })
duplex.setWritable(ensureCheckpoint)
return duplex
}
function createWorkerStream (queue) {
return through.obj({ highWaterMark: 0 }, co(function* (data, enc, cb) {
const { key, value } = data
try {
if (!gates.isOpen(queue)) {
yield gates.awaitOpen(queue)
}
const maybePromise = worker({ queue, value })
if (isPromise(maybePromise)) yield maybePromise
yield multiqueue.queue(queue).dequeue()
} catch (err) {
return cb(err)
}
api.emitAsync('processed', data)
cb()
}))
}
function start (queue) {
if (stopped) {
throw new Error('am stopped. You might want to use pause/resume instead of start/stop')
}
gates.open(queue)
return api
}
function pause (queue) {
gates.close(queue)
return api
}
function stop (queue) {
pause(queue)
if (!queue) {
work.end()
stopped = true
}
return api
}
let stopped
const api = new AsyncEmitter()
extend(api, {
start,
resume: start,
pause,
stop
})
return api
}
function isPromise (obj) {
return obj && typeof obj.then === 'function'
}