Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/add support and example messenger provider using rust engine 379 #700

Closed
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions examples/src/message_producer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""
Message producer for non-HTTP interactions.

This modules implements a very basic message producer which could
send to an eventing system, such as Kafka, or a message queue.
"""

from __future__ import annotations

import enum
import json
from typing import Literal, NamedTuple


class FileSystemAction(enum.Enum):
"""
Represents a file system action.
"""
READ = "READ"
WRITE = "WRITE"

class FileSystemEvent(NamedTuple):
"""
Represents a file system event.
"""
action: Literal[FileSystemAction.READ, FileSystemAction.WRITE]
path: str
contents: str | None

class MockMessageQueue:
"""
A mock message queue.
"""
def __init__(self) -> None:
"""
Initialize the message queue.
"""
self.messages = []

def send(self, message: str) -> None:
"""
Send a message to the queue.

Args:
message: The message to send.
"""
self.messages.append(message)

class FileSystemMessageProducer:
"""
A message producer for file system events.
"""
def __init__(self) -> None:
"""
Initialize the message producer.
"""
self.queue = MockMessageQueue()

def send_to_queue(self, message: FileSystemEvent) -> None:
"""
Send a message to a message queue.

:param message: The message to send.
"""
self.queue.send(json.dumps({
"action": message.action.value,
"path": message.path,
"contents": message.contents,
}))


def send_write_event(self, filename: str, contents: str) -> None:
"""
Send a write event to a message queue.

Args:
filename: The name of the file.
contents: The contents of the file.
"""
message = FileSystemEvent(
action=FileSystemAction.WRITE,
path=filename,
contents=contents,
)
self.send_to_queue(message)

def send_read_event(self, filename: str) -> None:
"""
Send a read event to a message queue.

:param filename: The name of the file.
"""
message = FileSystemEvent(
action=FileSystemAction.READ,
path=filename,
contents=None,
)
self.send_to_queue(message)
257 changes: 257 additions & 0 deletions examples/tests/provider_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
"""
HTTP Server to route message requests to message producer function.
"""

from __future__ import annotations

import logging
import re
import signal
import socket
import subprocess
import sys
import time
from contextlib import closing, contextmanager
from importlib import import_module
from pathlib import Path
from threading import Thread
from typing import Generator, NoReturn, Tuple, Union

import requests

sys.path.append(str(Path(__file__).parent.parent.parent))

import flask
from yarl import URL

logger = logging.getLogger(__name__)


class Provider:
"""
Provider class to route message requests to message producer function.

Sets up three endpoints:
- /_test/ping: A simple ping endpoint for testing.
- /produce_message: Route message requests to the handler function.
- /set_provider_state: Set the provider state.

The specific `produce_message` and `set_provider_state` URLs can be configured
with the `produce_message_url` and `set_provider_state_url` arguments.
"""

def __init__( # noqa: PLR0913
self,
handler_module: str,
handler_function: str,
produce_message_url: str,
state_provider_module: str,
state_provider_function: str,
set_provider_state_url: str,
) -> None:
"""
Initialize the provider.

Args:
handler_module:
The name of the module containing the handler function.
handler_function:
The name of the handler function.
produce_message_url:
The URL to route message requests to the handler function.
state_provider_module:
The name of the module containing the state provider setup function.
state_provider_function:
The name of the state provider setup function.
set_provider_state_url:
The URL to set the provider state.
"""
self.app = flask.Flask("Provider")
self.handler_function = getattr(
import_module(handler_module), handler_function
)
self.produce_message_url = produce_message_url
self.set_provider_state_url = set_provider_state_url
if (state_provider_module):
self.state_provider_function = getattr(
import_module(state_provider_module),
state_provider_function
)

@self.app.get("/_test/ping")
def ping() -> str:
"""Simple ping endpoint for testing."""
return "pong"

@self.app.route(self.produce_message_url, methods=["POST"])
def produce_message() -> Union[str, Tuple[str, int]]:
"""
Route a message request to the handler function.

Returns:
The response from the handler function.
"""
try:
body, content_type = self.handler_function()
return flask.Response(
response=body,
status=200,
content_type=content_type,
direct_passthrough=True,
)
except Exception as e: # noqa: BLE001
return str(e), 500

@self.app.route(self.set_provider_state_url, methods=["POST"])
def set_provider_state() -> Tuple[str, int]:
"""
Calls the state provider function with the state provided in the request.

Returns:
A response indicating that the state has been set.
"""
if self.state_provider_function:
self.state_provider_function(flask.request.args["state"])
return "Provider state set", 200

def _find_free_port(self) -> int:
"""
Find a free port.

This is used to find a free port to host the API on when running locally. It
is allocated, and then released immediately so that it can be used by the
API.

Returns:
The port number.
"""
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
s.bind(("", 0))
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
return s.getsockname()[1]

def run(self) -> None:
"""
Start the provider.
"""
url = URL(f"http://localhost:{self._find_free_port()}")
sys.stderr.write(f"Starting provider on {url}\n")

self.app.run(
host=url.host,
port=url.port,
debug=True,
)


def start_provider(**kwargs: dict([str, str])) -> Generator[URL, None, None]: # noqa: C901
"""
Start the provider app.

Expects kwargs to to contain the following:
handler_module: Required. The name of the module containing
the handler function.
handler_function: Required. The name of the handler function.
produce_message_url: Optional. The URL to route message requests to
the handler function.
state_provider_module: Optional. The name of the module containing
the state provider setup function.
state_provider_function: Optional. The name of the state provider
setup function.
set_provider_state_url: Optional. The URL to set the provider state.
"""
process = subprocess.Popen(
[ # noqa: S603
sys.executable,
Path(__file__),
kwargs.pop("handler_module"),
kwargs.pop("handler_function"),
kwargs.pop("produce_message_url", "/produce_message"),
kwargs.pop("state_provider_module", ""),
kwargs.pop("state_provider_function", ""),
kwargs.pop("set_provider_state_url", "/set_provider_state"),
],
cwd=Path.cwd(),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
)

pattern = re.compile(r" \* Running on (?P<url>[^ ]+)")
while True:
if process.poll() is not None:
logger.error("Provider process exited with code %d", process.returncode)
logger.error(
"Provider stdout: %s", process.stdout.read() if process.stdout else ""
)
logger.error(
"Provider stderr: %s", process.stderr.read() if process.stderr else ""
)
msg = f"Provider process exited with code {process.returncode}"
raise RuntimeError(msg)
if (
process.stderr
and (line := process.stderr.readline())
and (match := pattern.match(line))
):
break
time.sleep(0.1)

url = URL(match.group("url"))
logger.debug("Provider started on %s", url)
for _ in range(50):
try:
response = requests.get(str(url / "_test" / "ping"), timeout=1)
assert response.text == "pong"
break
except (requests.RequestException, AssertionError):
time.sleep(0.1)
continue
else:
msg = "Failed to ping provider"
raise RuntimeError(msg)

def redirect() -> NoReturn:
while True:
if process.stdout:
while line := process.stdout.readline():
logger.debug("Provider stdout: %s", line.strip())
if process.stderr:
while line := process.stderr.readline():
logger.debug("Provider stderr: %s", line.strip())

thread = Thread(target=redirect, daemon=True)
thread.start()

try:
yield url
finally:
process.send_signal(signal.SIGINT)


start_provider_context = contextmanager(start_provider)

if __name__ == "__main__":
import sys

if len(sys.argv) < 5: # noqa: PLR2004
sys.stderr.write(
f"Usage: {sys.argv[0]} <state_provider_module> <state_provider_function> "
f"<handler_module> <handler_function>"
)
sys.exit(1)

handler_module = sys.argv[1]
handler_function = sys.argv[2]
produce_message_url = sys.argv[3]
state_provider_module = sys.argv[4]
state_provider_function = sys.argv[5]
set_provider_state_url = sys.argv[6]
Provider(
handler_module,
handler_function,
produce_message_url,
state_provider_module,
state_provider_function,
set_provider_state_url,
).run()
2 changes: 2 additions & 0 deletions examples/tests/test_01_provider_fastapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from __future__ import annotations

import time
from multiprocessing import Process
from typing import Any, Dict, Generator, Union
from unittest.mock import MagicMock
Expand Down Expand Up @@ -145,6 +146,7 @@ def test_against_broker(broker: URL, verifier: Verifier) -> None:

For an example of the consumer's contract, see the consumer's tests.
"""
time.sleep(2) # give the broker time to start
code, _ = verifier.verify_with_broker(
broker_url=str(broker),
# Despite the auth being set in the broker URL, we still need to pass
Expand Down
2 changes: 2 additions & 0 deletions examples/tests/test_01_provider_flask.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from __future__ import annotations

import time
from multiprocessing import Process
from typing import Any, Dict, Generator, Union
from unittest.mock import MagicMock
Expand Down Expand Up @@ -133,6 +134,7 @@ def test_against_broker(broker: URL, verifier: Verifier) -> None:

For an example of the consumer's contract, see the consumer's tests.
"""
time.sleep(2) # give the broker time to start
code, _ = verifier.verify_with_broker(
broker_url=str(broker),
# Despite the auth being set in the broker URL, we still need to pass
Expand Down
Loading
Loading