forked from pytorch/pytorch
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpython_op_test.py
245 lines (203 loc) · 8.95 KB
/
python_op_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
from caffe2.python import core, workspace
from caffe2.python.core import CreatePythonOperator
import caffe2.python.hypothesis_test_util as hu
from hypothesis import given, settings
import hypothesis.strategies as st
import numpy as np
class CustomError(Exception):
pass
def SubFunctionThatThrowsCustomError():
raise CustomError("This is an intentional exception.")
def MainOpFunctionThatThrowsCustomError(inputs, _):
return SubFunctionThatThrowsCustomError()
def MainOpFunctionThatThrowsCustomErrorInBuilder(inputs, _):
raise CustomError("This is an intentional exception in builder.")
def op_builder(name, index, extra):
iterations = [0]
assert name == 'name'
assert index == 5
assert extra - 4.2 < 0.0001
def my_op(inputs, outputs):
assert inputs[0].data[0] == iterations[0]
assert name == 'name'
assert index == 5
assert extra - 4.2 < 0.0001
iterations[0] += 1
return my_op
class PythonOpTest(hu.HypothesisTestCase):
@given(x=hu.tensor())
def test_feed(self, x):
def f(inputs, _):
self.assertEqual(x.shape, inputs[0].shape)
self.assertEqual(type(inputs[0].shape), tuple)
self.assertEqual(type(inputs[0].data), np.ndarray)
np.testing.assert_almost_equal(x, inputs[0].data)
op = CreatePythonOperator(f, ["x"], [])
workspace.FeedBlob("x", x)
workspace.RunOperatorOnce(op)
def test_exception(self):
op = CreatePythonOperator(MainOpFunctionThatThrowsCustomError, [], [])
with self.assertRaisesRegex(CustomError, "This is an intentional exception."):
workspace.RunOperatorOnce(op)
def test_exception_builder(self):
op = CreatePythonOperator(MainOpFunctionThatThrowsCustomErrorInBuilder, [], [])
with self.assertRaisesRegex(CustomError, "This is an intentional exception in builder."):
workspace.RunOperatorOnce(op)
@given(x=hu.tensor())
def test_feed_with_helper_function(self, x):
def f(inputs, _):
self.assertEqual(x.shape, inputs[0].shape)
self.assertEqual(type(inputs[0].shape), tuple)
self.assertEqual(type(inputs[0].data), np.ndarray)
np.testing.assert_almost_equal(x, inputs[0].data)
net = core.Net("test")
net.Python(f)(["x"], [])
workspace.FeedBlob("x", x)
workspace.RunNetOnce(net)
def test_builder_tuple(self):
net = core.Net("builder_template")
iter_blob = 'iter'
net.Python((op_builder, ['name', 5], {'extra': 4.2}))([iter_blob], [])
net.Python((op_builder, ['name', 5], {'extra': 4.2}))([iter_blob], [])
for repeat in range(2):
# check that the builder will be called exactly once for each
# PythonOp constructor. Cloning the net will also trigger a call
# to the builder when the net is created.
cloned_net = net.Clone('builder_%d' % repeat)
workspace.FeedBlob(iter_blob, np.array([0]))
# Builder gets called once per python op in the line below
workspace.CreateNet(cloned_net)
for i in range(10):
workspace.FeedBlob(iter_blob, np.array([i]))
workspace.RunNet(cloned_net)
@given(x=hu.tensor())
def test_feed_with_gc(self, x):
def f(inputs, _):
self.assertEqual(x.shape, inputs[0].shape)
np.testing.assert_almost_equal(x, inputs[0].data)
op = CreatePythonOperator(f, ["x"], [])
workspace.FeedBlob("x", x)
workspace.RunOperatorOnce(op)
del f
workspace.FeedBlob("x", x)
workspace.RunOperatorOnce(op)
@given(x=hu.tensor())
def test_reshape(self, x):
def f(inputs, outputs):
outputs[0].reshape(inputs[0].shape)
self.assertEqual(x.shape, inputs[0].shape)
self.assertEqual(x.shape, outputs[0].shape)
outputs[0].data[...] = inputs[0].data
op = CreatePythonOperator(f, ["x"], ["y"])
workspace.FeedBlob("x", x)
workspace.RunOperatorOnce(op)
y = workspace.FetchBlob("y")
np.testing.assert_almost_equal(x, y)
@given(x=hu.tensor())
def test_workspace_manipulation(self, x):
"""
Verify that python op can manipulate workspace directly
"""
def f(inputs, outputs, ws):
fetched = ws.blobs['internal'].fetch()
np.testing.assert_almost_equal(fetched, x)
ws = workspace.C.Workspace()
net = core.Net("test")
net.GivenTensorFill([], ['internal'], values=x, shape=x.shape)
net.Python(f, pass_workspace=True)([], [])
ws.run(net)
@given(x=hu.tensor())
def test_caught_exception_doesnt_terminate(self, x):
def f(inputs, outputs):
try:
raise Exception("Exception in handler")
except Exception:
pass
op = CreatePythonOperator(f, ["x"], ["y"])
workspace.FeedBlob("x", x)
workspace.RunOperatorOnce(op)
@given(x=hu.tensor(),
n=st.integers(min_value=1, max_value=20),
w=st.integers(min_value=1, max_value=20))
@settings(deadline=1000)
def test_multithreaded_evaluation(self, x, n, w):
def f(inputs, outputs):
outputs[0].reshape(inputs[0].shape)
outputs[0].data[...] = inputs[0].data
ops = [CreatePythonOperator(f, ["x"], [str(i)]) for i in range(n)]
net = core.Net("net")
net.Proto().op.extend(ops)
net.Proto().type = "dag"
net.Proto().num_workers = w
iters = 100
plan = core.Plan("plan")
plan.AddStep(core.ExecutionStep("test-step", net, iters))
workspace.FeedBlob("x", x)
workspace.RunPlan(plan.Proto().SerializeToString())
for i in range(n):
y = workspace.FetchBlob(str(i))
np.testing.assert_almost_equal(x, y)
@given(x=hu.tensor(), in_place=st.booleans(), **hu.gcs)
@settings(deadline=10000)
def test_gradient(self, x, in_place, gc, dc):
def f(inputs, outputs):
outputs[0].reshape(inputs[0].shape)
outputs[0].data[...] = inputs[0].data * 2
def grad_f(inputs, outputs):
# Ordering is [inputs, outputs, grad_outputs]
grad_output = inputs[2]
grad_input = outputs[0]
grad_input.reshape(grad_output.shape)
grad_input.data[...] = grad_output.data * 2
op = CreatePythonOperator(
f, ["x"], ["x" if in_place else "y"], grad_f=grad_f)
self.assertGradientChecks(gc, op, [x], 0, [0])
self.assertDeviceChecks(dc, op, [x], [0])
@given(inputs=hu.tensors(n=2), **hu.gcs)
@settings(deadline=10000)
def test_gradient_multiple(self, inputs, gc, dc):
(x1, x2) = inputs
def f(inputs, outputs):
for idx in [0, 1]:
self.assertEqual(type(inputs[idx].shape), tuple)
outputs[idx].reshape(inputs[idx].shape)
outputs[idx].data[...] = inputs[idx].data * 2
def grad_f(inputs, outputs):
# Ordering is [inputs, outputs, grad_outputs]
self.assertEqual(len(inputs), 6)
self.assertEqual(len(outputs), 2)
for (grad_output_idx, grad_input_idx) in [(4, 0), (5, 1)]:
grad_output = inputs[grad_output_idx]
grad_input = outputs[grad_input_idx]
grad_input.reshape(grad_output.shape)
grad_input.data[...] = grad_output.data * 2
op = CreatePythonOperator(f, ["x1", "x2"], ["y1", "y2"], grad_f=grad_f)
for idx in [0, 1]:
self.assertGradientChecks(gc, op, [x1, x2], idx, [0, 1])
self.assertDeviceChecks(dc, op, [x1, x2], [0, 1])
@given(inputs=hu.tensors(n=3), **hu.gcs)
@settings(deadline=10000)
def test_gradient_multiple_with_indices(self, inputs, gc, dc):
(x1, x2, x3) = inputs
def f(inputs, outputs):
for idx in [0, 1, 2]:
self.assertEqual(type(inputs[idx].shape), tuple)
outputs[idx].reshape(inputs[idx].shape)
outputs[idx].data[...] = inputs[idx].data * 2
def grad_f(inputs, outputs):
# Ordering is [inputs, outputs, grad_outputs]
self.assertEqual(len(inputs), 8)
self.assertEqual(len(outputs), 1)
for (grad_output_idx, grad_input_idx) in [(6, 0)]:
grad_output = inputs[grad_output_idx]
grad_input = outputs[grad_input_idx]
grad_input.reshape(grad_output.shape)
grad_input.data[...] = grad_output.data * 2
op = CreatePythonOperator(
f, ["x1", "x2", "x3"], ["y1", "y2", "y3"],
grad_f=grad_f,
grad_output_indices=[0, 2], # Receive grad outputs for y1 and y3
grad_input_indices=[0] # Produce grad inputs for x1
)
self.assertGradientChecks(gc, op, [x1, x2, x3], 0, [0, 2])
self.assertDeviceChecks(dc, op, [x1, x2, x3], [0, 1, 2])