-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathhtishard
executable file
·100 lines (81 loc) · 2.64 KB
/
htishard
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/usr/bin/python
# Driver script: reader tasks pull records with htpar.tarrecords and push them
# onto dask.distributed queues; writer tasks drain the queues into
# htpar.TarRecords outputs, one per output shard.
# NOTE: this is Python 2 code (print statements, reload(), buffer).
from time import sleep
from random import random
import os, sys
from socket import gethostname
from dask.distributed import Queue
import htpar
# reload() is the Python 2 builtin that re-executes an already imported module.
# NOTE(review): presumably left over from interactive development -- confirm
# before removing.
reload(htpar)
from random import randint
from distributed import Client, fire_and_forget
from collections import Counter
import pprint
# Connect to the dask scheduler at a fixed, hard-coded address; every
# client.submit / client.gather call below goes through this client.
client = Client("192.168.4.6:8786")
# client = Client()
print client
def procno(shard, nprocs, nshards):
    """Return the index of the process responsible for output shard ``shard``.

    The result is ``floor(shard * nprocs / nshards)``, so for shard numbers
    ``0 .. nshards - 1`` each process index gets a contiguous run of shards.
    """
    scaled = shard * nprocs
    return scaled // nshards
def read_fun(queues, src, nprocs, nshards):
    """Feed every record of one input source into the writer queues.

    Records are read with ``htpar.tarrecords(src)`` and assigned to output
    shards round-robin by their position in the stream
    (``position % nshards``).  Each record is tagged with that shard number
    under ``"__shard__"`` and put on ``queues[procno(shard, nprocs, nshards)]``.

    Returns ``src`` once the source is exhausted.
    """

    def as_plain(value):
        # ``buffer`` values (a Python 2 builtin type) are converted to ``str``
        # before queueing; everything else passes through unchanged.
        # NOTE(review): presumably needed so the record can be sent through
        # the queue -- confirm.
        if isinstance(value, buffer):
            return str(value)
        return value

    for position, record in enumerate(htpar.tarrecords(src)):
        shard = position % nshards
        record["__shard__"] = shard
        target = procno(shard, nprocs, nshards)
        payload = {key: as_plain(value) for key, value in record.items()}
        queues[target].put(payload)
    return src
def write_fun(destinations, queue):
    """Drain ``queue`` into one ``htpar.TarRecords`` stream per output shard.

    Args:
        destinations: mapping of shard number to the URL that shard is
            written to.
        queue: object with a blocking ``get()``.  Each item is either a
            record (a mapping carrying ``"__shard__"`` and ``"__key__"``)
            or an ``int`` shard number, which asks for that shard's stream
            to be closed.

    Returns:
        ``"OK"`` once every shard in ``destinations`` has been closed.

    Raises:
        KeyError: a close request names a shard that is not open here.
        ValueError: a record has no ``"__shard__"`` entry, or names a shard
            that is not open here.
    """
    streams = {}
    try:
        for shard, url in destinations.items():
            print("%s %s" % (shard, url))
            streams[shard] = htpar.TarRecords(url)
        # The loop ends only when every shard has received its close request.
        while streams:
            record = queue.get()
            if isinstance(record, int):
                # An int is the "close this shard" sentinel sent by the driver.
                print("closing %s" % (record,))
                if record not in streams:
                    raise KeyError(
                        "close request for shard %r, which is not open" % (record,)
                    )
                streams.pop(record).close()
                continue
            shard = record.get("__shard__")
            # Explicit checks instead of ``assert`` so they survive ``python -O``.
            if shard is None:
                raise ValueError("record has no '__shard__' entry")
            stream = streams.get(shard)
            if stream is None:
                raise ValueError(
                    "record addressed to shard %r, which is not open" % (shard,)
                )
            stream.write(record["__key__"], record)
    finally:
        # Empty on the normal path; on an error this releases the streams
        # that would otherwise be left open.
        for stream in streams.values():
            stream.close()
    print("done %s" % (destinations,))
    return "OK"
# Input side.
# NOTE(review): htpar.path_shards presumably expands the "@000006" pattern
# into the individual shard URLs -- confirm in htpar.
inputs = "http://192.168.4.6/data/minio/ocr/linegen-lo-train-@000006.tgz"
inputs = htpar.path_shards(inputs)
# Number of writer tasks; each one gets its own queue below.
nprocs = 2
outputs = "http://192.168.4.6/data/temp/_test-@000010.tgz"
shards = list(htpar.path_shards(outputs))
nshards = len(shards)
print "nprocs", nprocs, "nshards", nshards
# dests[p] maps shard number -> output URL for the shards that writer p owns
# (ownership is decided by procno).
dests = [{} for i in range(nprocs)]
for i, s in enumerate(shards):
dests[procno(i, nprocs, nshards)][i] = s
# Counts how many writers ended up with each number of shards.
print "distribution:", Counter([len(x.keys()) for x in dests])
print "starting output processes"
queues = [None] * nprocs
results = [None] * nprocs
# Submit one write_fun task per writer, handing it its destinations and a
# fresh Queue.
for i in range(nprocs):
queues[i] = Queue()
results[i] = client.submit(write_fun, dests[i], queues[i])
print i
pprint.pprint(dests[i])
print "starting input processes"
iresults = []
# Submit one read_fun task per input source; every reader receives the full
# list of writer queues and routes records itself.
for i, src in enumerate(inputs):
iresults.append(client.submit(read_fun, queues, src, nprocs, nshards))
# gather() waits for all readers, so every record has been queued before the
# close requests below are sent.
print "iresults", client.gather(iresults)
print "finishing output processes"
# Put each shard number, as a plain int, on the queue of the writer that owns
# it; write_fun treats an int item as the request to close that shard.
for i in range(nshards):
queues[procno(i, nprocs, len(shards))].put(i)
# Each writer returns "OK"; the Counter summarises the returned values.
print "results", Counter(client.gather(results))