-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathtxwebtest.py
152 lines (123 loc) · 5.36 KB
/
txwebtest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
from cStringIO import StringIO
from twisted.internet import defer
from twisted.web.server import Site, NOT_DONE_YET
from twisted.web.test.test_web import DummyRequest
from twisted.web.http_headers import Headers
from urlparse import parse_qs
class TestClient(object):
def __init__(self, resource):
self.site = Site(resource)
def get(self, path_with_params, status=None):
return self.request(path_with_params, method='GET', status=status)
def post(self, path_with_params, content=None, content_type=None, status=None):
return self.request(path_with_params, method='POST', content=content,
content_type=content_type, status=status)
def delete(self, path_with_params, status=None):
return self.request(path_with_params, method='DELETE', status=status)
def patch(self, path_with_params, content=None, content_type=None, status=None):
return self.request(path_with_params, method='PATCH', content=content,
content_type=content_type, status=None)
def put(self, path_with_params, content=None, content_type=None, status=None):
return self.request(path_with_params, method='PUT', content=content,
content_type=content_type, status=status)
def head(self, path_with_params, status=None):
return self.request(path_with_params, method='HEAD', status=status)
def options(self, path_with_params, status=None):
return self.request(path_with_params, method='OPTIONS', status=status)
@defer.inlineCallbacks
def request(self, req_or_path_with_params, status=None, **request_kwargs):
if isinstance(req_or_path_with_params, TestRequest):
req = req_or_path_with_params
else:
path, params = self.parse_url(req_or_path_with_params)
req = TestRequest(path, params=params, **request_kwargs)
resp = yield self._handle_request(req)
if status is not None:
assert resp.status_code == status, resp.status_code
defer.returnValue(resp)
def _handle_request(self, request):
''' Resolves a test request. '''
finished = request.notifyFinish()
def extract_response(none):
return TestResponse(request.responseCode or 200, request.responseHeaders,
''.join(request.written))
def _render(resource):
result = resource.render(request)
if isinstance(result, str):
request.write(result)
request.finish()
return finished
elif result is NOT_DONE_YET:
return finished
else:
raise ValueError("Unexpected return value: %r" % (result,))
resource_handler = self.site.getResourceFor(request)
d = _render(resource_handler)
d.addCallback(extract_response)
return d
def parse_url(self, path_with_params):
parts = path_with_params.split('?')
if len(parts) == 2:
path, param_str = parts
params = parse_qs(param_str)
else:
path, = parts
params = {}
return path, params
class TestRequest(DummyRequest):
''' DummyRequest just isn't good enough for klein. '''
def __init__(self, path, method, content_type=None, params=None, content=None, headers=None):
super(TestRequest, self).__init__(path.split('/'))
self._finishedDeferreds = []
self.requestHeaders = Headers(headers or {})
for k, v in (headers or {}).items():
self.responseHeaders.addRawHeader(k.lower(), v)
self.method = method
self.path = path
self.code = None
self.received_headers = {}
self.uri = path
self.redirect_url = None
self.postpath = path.split('/')
if not self.postpath[0]:
self.postpath.pop(0)
self.args = params or {}
# To simulate a real request with query string params (the encoding is usually resulting in string literals)
for k, v in self.args.items():
# expecting "v" to always be of type "list"
self.args[k] = [str(item) for item in v]
self.content = StringIO(content or '')
if content:
self.content_type = content_type or 'application/x-www-form-urlencoded'
if self.content_type == 'application/x-www-form-urlencoded':
if isinstance(content, dict):
self.args.update(content)
else:
self.args.update(parse_qs(content))
else:
self.content_type = None
def getRequestHostname(self, *args, **kwargs):
return 'localhost'
def isSecure(self):
return False
class TestResponse(object):
def __init__(self, status_code, headers, body):
self.status_code = status_code
self.headers = headers
self.body = body
def get_header(self, header):
values = self.headers.getRawHeaders(header.lower())
if values:
return values[0]
return None
@property
def text(self):
return self.body
try:
# Klein uses zope to restrict what types it accepts, for some reason.
from klein.interfaces import IKleinRequest
from klein.app import KleinRequest
from twisted.python.components import registerAdapter
registerAdapter(KleinRequest, TestRequest, IKleinRequest)
except ImportError:
pass