diff --git a/README.md b/README.md index d90135d..457ea8e 100644 --- a/README.md +++ b/README.md @@ -16,4 +16,58 @@ neon_users_service: ``` `module` defines the backend to use and a config key matching that backend -will specify the kwargs passed to the initialization of that module. \ No newline at end of file +will specify the kwargs passed to the initialization of that module. + +## MQ Integration +The `mq_connector` module provides an MQ entrypoint to services and is the +primary method of interaction with this service. Valid requests are detailed +below. Responses will always follow the form: + +```yaml +success: False +error: +``` + +```yaml +success: True +user: +``` + +### Create +Create a new user by sending a request with the following parameters: +```yaml +operation: create +username: +password: +user: +``` + +### Read +Read an existing user. If `password` is not supplied, then the returned User +object will have the `password_hash` and `tokens` config redacted. +```yaml +operation: read +username: +password: +``` + +### Update +Update an existing user. If a `password` is supplied, it will replace the +user's current password. If no `password` is supplied and `user.password_hash` +is updated, the database entry will be updated with that new value. + +```yaml +operation: update +username: +password: +user: +``` + +### Delete +Delete an existing user. This requires that the supplied `user` object matches +an entry in the database exactly for validation. +```yaml +operation: delete +username: +user: +``` diff --git a/neon_users_service/mq_connector.py b/neon_users_service/mq_connector.py new file mode 100644 index 0000000..b2c062f --- /dev/null +++ b/neon_users_service/mq_connector.py @@ -0,0 +1,110 @@ +from typing import Optional + +import pika.channel +from ovos_config.config import Configuration +from ovos_utils import LOG + +from neon_mq_connector.connector import MQConnector +from neon_mq_connector.utils.network_utils import b64_to_dict, dict_to_b64 +from neon_users_service.models import MQRequest, User + +from neon_users_service.service import NeonUsersService + + +class NeonUsersConnector(MQConnector): + def __init__(self, config: Optional[dict], service_name: str = "neon_users_service"): + MQConnector.__init__(self, config, service_name) + self.vhost = '/neon_users' + module_config = (config or Configuration()).get('neon_users_service', + {}) + self.service = NeonUsersService(module_config) + + def parse_mq_request(self, mq_req: dict) -> dict: + mq_req = MQRequest(**mq_req) + + # Ensure supplied `user` object is consistent with request params + if mq_req.user and mq_req.username != mq_req.user.username: + return {"success": False, + "error": f"Supplied username ({mq_req.username}) " + f"Does not match user object " + f"({mq_req.user.username})"} + + if mq_req.operaion == "create": + if not mq_req.password: + return {"success": False, + "error": "Empty password provided"} + if not mq_req.user: + mq_req.user = User(username=mq_req.username, + password_hash=mq_req.password) + mq_req.user.password_hash = mq_req.password + try: + user = self.service.create_user(mq_req.user) + except Exception as e: + return {"success": False, "error": repr(e)} + elif mq_req.operation == "read": + try: + if mq_req.password: + user = self.service.read_authenticated_user(mq_req.username, + mq_req.password) + else: + user = self.service.read_unauthenticated_user( + mq_req.username) + except Exception as e: + return {"success": False, "error": repr(e)} + elif mq_req.operation == "update": + try: + if mq_req.password: + mq_req.user.password_hash = mq_req.password + user = self.service.update_user(mq_req.user) + except Exception as e: + return {"success": False, "error": repr(e)} + elif mq_req.operation == "delete": + try: + user = self.service.delete_user(mq_req.user) + except Exception as e: + return {"success": False, "error": repr(e)} + else: + raise RuntimeError(f"Invalid operation requested: " + f"{mq_req.operation}") + return {"success": True, "user": user.model_dump()} + + def handle_request(self, + channel: pika.channel.Channel, + method: pika.spec.Basic.Deliver, + _: pika.spec.BasicProperties, + body: bytes): + """ + Handles input MQ request objects. + @param channel: MQ channel object (pika.channel.Channel) + @param method: MQ return method (pika.spec.Basic.Deliver) + @param _: MQ properties (pika.spec.BasicProperties) + @param body: request body (bytes) + """ + message_id = None + try: + if not isinstance(body, bytes): + raise TypeError(f'Invalid body received, expected bytes string;' + f' got: {type(body)}') + request = b64_to_dict(body) + message_id = request.get("message_id") + response = self.parse_mq_request(request) + data = dict_to_b64(response) + + # queue declare is idempotent, just making sure queue exists + channel.queue_declare(queue='neon_users_output') + + channel.basic_publish( + exchange='', + routing_key=request.get('routing_key', + 'neon_users_output'), + body=data, + properties=pika.BasicProperties(expiration='1000') + ) + channel.basic_ack(method.delivery_tag) + except Exception as e: + LOG.exception(f"message_id={message_id}: {e}") + + def pre_run(self, **kwargs): + self.register_consumer("neon_users_consumer", self.vhost, + "neon_users_input", self.handle_request, + auto_ack=False)