-
Notifications
You must be signed in to change notification settings - Fork 37
/
Copy pathsimDistributedQueue.py
137 lines (115 loc) · 3.94 KB
/
simDistributedQueue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import time
import random
import threading
# TODO: get fifo working
printLock = threading.Lock()
def message(s):
if False:
with printLock:
print(s)
class Node(threading.Thread):
def __init__(self, id, nodes):
threading.Thread.__init__(self)
self.id = id
self.queueLock = threading.Condition()
self.queryQueue = {}
self.queryDone = []
self.nodes = nodes
def addQuery(self, queryID, text):
message('N%d: add query Q%s' % (self.id, queryID))
with self.queueLock:
if queryID in self.queryQueue:
if self.queryQueue[queryID][1] == 0:
raise RuntimeError('duplicated query ID %s' % queryID)
else:
x = self.queryQueue.pop(queryID)
self.queryDone.append((queryID, text, x[2]))
else:
self.queryQueue[queryID] = [text, 0]
self.queueLock.notify()
def takeNextQuery(self):
with self.queueLock:
while True:
for queryID, l in self.queryQueue.items():
text, state = l
if state == 0:
# 2 means "I am taking this query"
l[1] = 2
return queryID
self.queueLock.wait()
def takeQuery(self, queryID, remoteNodeID):
with self.queueLock:
l = self.queryQueue.get(queryID)
if l is not None:
if l[1] in (0, 1) or remoteNodeID < self.id:
# Resolve race condition by always yielding to the lower numbered node ID
self.queryDone.append((queryID, l[0], remoteNodeID))
del self.queryQueue[queryID]
return True
else:
if remoteNodeID == self.id:
raise RuntimeError('duplicated node ID %s' % remoteNodeID);
message('N%d: reject take query Q%d' % (self.id, queryID))
return False
else:
# It's a new query, that hasn't yet been enqueued here:
message('N%d: take brand new query Q%d' % (self.id, queryID))
self.queryQueue[queryID] = [None, 1, remoteNodeID]
return True
def finishedQuery(self, queryID):
with self.queueLock:
x = self.queryQueue.pop(queryID)
self.queryDone.append((queryID, x[0], self.id))
message('N%d: finishQuery Q%s; now %s done' % (self.id, queryID, len(self.queryDone)))
def run(self):
while True:
# Find a query we can try to take:
queryID = self.takeNextQuery()
message('N%d: try to take Q%s' % (self.id, queryID))
# Ask the other nodes if it's OK if we take this one:
for node in self.nodes:
if node != self:
if not node.takeQuery(queryID, self.id):
message('N%d: failed to take Q%d' % (self.id, queryID))
break
else:
# OK we execute the query!
message('N%s: run query Q%s' % (self.id, queryID))
sleepTimeUS = random.randint(0, 1000)
time.sleep(sleepTimeUS/1000000.)
self.finishedQuery(queryID)
def main():
numNodes = 2
nodes = []
for nodeID in range(numNodes):
message('start node %s' % nodeID)
node = Node(nodeID, nodes)
nodes.append(node)
node.start()
for queryID in range(1000):
for node in nodes:
node.addQuery(queryID, 'text')
sleepTimeUS = random.randint(0, 1000)
time.sleep(sleepTimeUS/1000000.)
while True:
for node in nodes:
if len(node.queryQueue) > 0:
time.sleep(0.1)
message('N%s still has %d queries in its queue...' % (node.id, len(node.queryQueue)))
with node.queueLock:
for queryID, l in node.queryQueue.items():
message(' query Q%d: flag %s' % (queryID, l[1]))
break
else:
break
seen = set()
for node in nodes:
message('N%d finished %d queries' % (node.id, len(node.queryDone)))
for queryID, queryText, remoteNodeID in node.queryDone:
if remoteNodeID == node.id:
if queryID in seen:
raise RuntimeError('query Q%s ran more than once' % queryID)
seen.add(queryID)
print('done!')
if __name__ == '__main__':
main()