#!/usr/bin/python
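"""Map a per-sample processing function over sharded tar files in parallel.

The file named by -p/--process must define process_sample(record); it is
called once for each sample read by htpar.tarrecords and should return a
dict containing "__key__" (the key the result is written under), or None to
skip the sample.

Input and output paths may use shard notation (an illustrative name such as
"data-@0100.tar" expands to data-0000.tar ... data-0099.tar).  Shard pairs
are processed in parallel on the dask.distributed cluster given by -d/--dask;
with the default of None, dask starts a local cluster.  The -k/--keys option
names the htpar key function used for splitting tar file names into record
and sample keys (default "base_plus_ext").
"""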
import argparse
import sys
import os
import os.path
import re
import StringIO
import tarfile
import warnings
import time
import imp
import tempfile
from dask.distributed import Client
from contextlib import closing
import htpar

def process_paths(iname, oname, source, keys):
    """Process a tar input file to a tar output file."""
    assert callable(keys)
    # Materialize the processing source on this worker and import it as a module.
    with tempfile.NamedTemporaryFile(suffix=".py") as stream:
        stream.write(source)
        stream.flush()
        # print stream.name
        pmod = imp.load_source("processing_mod", stream.name)
    count = 0
    storage = htpar.storage
    istream = storage.open_read(iname)
    assert istream is not None, iname
    reader = htpar.tarrecords(istream, keys=keys)
    ostream = storage.open_write(oname)
    assert ostream is not None, oname
    writer = htpar.TarRecords(ostream)
    # Copy records from input to output, transforming each one with process_sample.
    for record in reader:
        # if count%10 == 0: print count
        result = pmod.process_sample(record)
        if result is None:
            continue
        if "__key__" not in result:
            warnings.warn("no __key__ in record")
            continue
        writer.write(result["__key__"], result)
        count += 1
    del istream
    del ostream
    del pmod
    return (iname, oname)

def split_sharded_path(path):
    """Split a path with shard notation into a format string and shard count.

    For example, "data-@0100.tar" yields ("data-%04d.tar", 100); a path
    without shard notation is returned unchanged with a count of None.
    """
    match = re.search(r"^(.*)@([0-9]+)(.*)", path)
    if not match:
        return path, None
    prefix = match.group(1)
    num = int(match.group(2))
    suffix = match.group(3)
    fmt = "%%0%dd" % len(match.group(2))
    return prefix + fmt + suffix, num

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Mapping sharded tar files.")
    parser.add_argument("-p", "--process", default="process_sample.py")
    parser.add_argument("-d", "--dask", default=None)
    parser.add_argument("-k", "--keys", default="base_plus_ext")
    parser.add_argument("inputs")
    parser.add_argument("outputs")
    args = parser.parse_args()
    ## Function used for splitting tar file names into record key and sample keys.
    keys = htpar.get_keyfun(args.keys)
    assert callable(keys)
    ## Load the record processing code.
    with open(args.process) as stream:
        source = stream.read()
        pmod = imp.load_source("processing_mod", stream.name)
    assert "process_sample" in dir(pmod), \
        "{} does not define a process_sample function".format(args.process)
    ## Compute the set of paths to iterate over.
    ifmt, ninputs = split_sharded_path(args.inputs)
    ofmt, noutputs = split_sharded_path(args.outputs)
    assert ninputs == noutputs, "inputs and outputs must have the same shard count"
    if ninputs is None:
        inputs = [ifmt]
        outputs = [ofmt]
    else:
        inputs = [ifmt % i for i in range(ninputs)]
        outputs = [ofmt % i for i in range(noutputs)]
    ## Map over the path pairs in parallel.
    client = Client(args.dask)
    print client
    result = client.map(process_paths, inputs, outputs,
                        [source]*len(inputs),
                        [keys]*len(inputs))
    ## Wait for results.
    print client.gather(result)
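
## Example invocation (a sketch; the scheduler address, shard counts, and file
## names below are illustrative, not part of this repository):
##
##   htmapper -d tcp://scheduler:8786 -p process_sample.py \
##       'images-@0100.tar' 'processed-@0100.tar'
##
## where process_sample.py at a minimum defines something like:
##
##   def process_sample(record):
##       # Transform the sample here; returning it unchanged just copies shards.
##       return record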