This repository has been archived by the owner on Feb 22, 2022. It is now read-only.
-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnanoasgi.py
171 lines (137 loc) · 4.78 KB
/
nanoasgi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import json
import re
from collections import defaultdict
from collections.abc import Mapping
from functools import partial
from typing import List, NamedTuple, Tuple, Union
from urllib.parse import parse_qsl
class MultiDict(Mapping):
def __init__(self, items):
self._data = defaultdict(list)
self._len = 0
for k, v in items:
self._data[self._transform_key(k.decode())].append(v.decode())
self._len += 1
@staticmethod
def _transform_key(key):
return key
def get_list(self, key):
return self._data[self._transform_key(key)]
def __getitem__(self, key):
try:
return self.get_list(key)[0]
except IndexError:
raise KeyError(key)
def __iter__(self):
return (k for k in self._data for _ in self._data[k])
def __len__(self):
return self._len
def __repr__(self):
return f'<{self.__class__.__name__}: {self._data!r}>'
class CaselessMultiDict(MultiDict):
@staticmethod
def _transform_key(key):
return key.lower()
class Request(NamedTuple):
path: str
method: str
headers: CaselessMultiDict
query: MultiDict
body: bytes
@property
def text(self) -> str:
return self.body.decode()
@property
def json(self) -> object:
return json.loads(self.body)
class Response(NamedTuple):
data: Union[bytes, str, list, dict, None] = None
status: int = 200
headers: List[Tuple[str, str]] = []
@property
def body(self) -> bytes:
if self.data is None:
return b''
elif isinstance(self.data, bytes):
return self.data
elif isinstance(self.data, str):
return self.data.encode()
elif isinstance(self.data, (dict, list)):
return json.dumps(self.data).encode()
else:
raise TypeError(type(self.data))
class App:
def __init__(self):
self._routes = []
self._listeners = {}
def route(self, method, path):
return partial(self._add_route, path, method)
def on(self, event):
return partial(self._add_event_listener, event)
def _add_route(self, path, method, handler):
param_re = r'{([a-zA-Z_][a-zA-Z0-9_]*)}'
path_re = re.sub(param_re, r'(?P<\1>\\w+)', path)
self._routes.append((re.compile(path_re), method, handler))
return handler
def _add_event_listener(self, event, listener):
self._listeners[event] = listener
return listener
async def __call__(self, scope, receive, send):
if scope['type'] == 'http':
await self.http_handler(scope, receive, send)
elif scope['type'] == 'lifespan':
await self.lifespan_handler(scope, receive, send)
async def http_handler(self, scope, receive, send):
response = await self._handle_request(scope, receive)
await send({
'type': 'http.response.start',
'status': response.status,
'headers': [(k.encode(), v.encode()) for k, v in response.headers],
})
await send({
'type': 'http.response.body',
'body': response.body,
})
async def _handle_request(self, scope, receive):
match = self._match(scope['path'])
if match is None:
return Response(status=404)
method, handler, params = match
if method != scope['method']:
return Response(status=405)
request = Request(
path=scope['path'],
method=scope['method'],
headers=CaselessMultiDict(scope['headers']),
query=MultiDict(parse_qsl(scope['query_string'])),
body=await self._read_request_body(receive),
)
return await handler(request, **params)
def _match(self, request_path):
for path, method, handler in self._routes:
m = path.match(request_path)
if m is not None:
return method, handler, m.groupdict()
@staticmethod
async def _read_request_body(receive):
body = bytearray()
while True:
msg = await receive()
body += msg['body']
if not msg.get('more_body'):
break
return bytes(body)
async def lifespan_handler(self, scope, receive, send):
while True:
msg = await receive()
msg_type = msg['type'].split('.')[-1]
listener = self._listeners.get(msg_type)
if listener is not None:
try:
await listener()
except Exception:
await send({'type': f'lifespan.{msg_type}.failed'})
raise
await send({'type': f'lifespan.{msg_type}.complete'})
if msg_type == 'shutdown':
break