-
Notifications
You must be signed in to change notification settings - Fork 141
/
Copy pathhttp_proxy.py
65 lines (52 loc) · 1.81 KB
/
http_proxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
"""Http Proxy to be used as provider url in verifier."""
import warnings
from fastapi import FastAPI, status, Request, HTTPException
import uvicorn as uvicorn
import logging
log = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG)
app = FastAPI()
PROXY_PORT = 1234
UVICORN_LOGGING_LEVEL = "error"
items = {
"states": None
}
def _match_states(payload):
"""Match states in payload against stored message handlers."""
log.debug(f'Find handler from payload: {payload}')
handlers = items["states"]
states = handlers['messageHandlers']
log.debug(f'Setup states: {handlers}')
provider_states = payload['providerStates']
for state in provider_states:
matching_state = state['name']
if matching_state in states:
return states[matching_state]
raise HTTPException(status_code=500, detail='No matched handler.')
@app.post("/")
async def root(request: Request):
"""Match states with provided message handlers."""
payload = await request.json()
message = _match_states(payload)
return {'contents': message}
@app.get('/ping', status_code=status.HTTP_200_OK)
def ping():
"""Check whether the server is available before setting up states."""
return {"ping": "pong"}
@app.post("/setup", status_code=status.HTTP_201_CREATED)
async def setup(request: Request):
"""Endpoint to setup states.
Use localstack to store payload.
"""
payload = await request.json()
items["states"] = payload
return items["states"]
def run_proxy():
"""Rub HTTP Proxy."""
warnings.warn(
"This class will be deprecated Pact Python v3 "
"(see pact-foundation/pact-python#396)",
PendingDeprecationWarning,
stacklevel=2,
)
uvicorn.run("pact.http_proxy:app", port=PROXY_PORT, log_level=UVICORN_LOGGING_LEVEL)