From 6e5478ed3ba1a409af6af63b9dee1067d667561c Mon Sep 17 00:00:00 2001 From: Christian Barra Date: Sun, 12 Aug 2018 00:13:20 +0200 Subject: [PATCH] First attempt of moving to Redis --- .travis.yml | 2 +- Makefile | 6 +++--- rampante/connector.py | 41 +++++++++++++++++++++++------------------ requirements/common.txt | 2 +- setup.py | 2 +- 5 files changed, 29 insertions(+), 24 deletions(-) diff --git a/.travis.yml b/.travis.yml index 5fb4c44..63d5b81 100644 --- a/.travis.yml +++ b/.travis.yml @@ -16,7 +16,7 @@ services: - docker install: - - docker run -d --name streams -p 4222:4222 nats-streaming:0.7.0 + - docker run -d --name streams -p 6379:6379 redis:5.0-rc4-alpine - pip install -U attrs - pip install -r requirements/common.txt - pip install -r requirements/ci.txt diff --git a/Makefile b/Makefile index c0373f4..5ae0570 100644 --- a/Makefile +++ b/Makefile @@ -6,13 +6,13 @@ lint: @tox -e isort,flake8,mypy run-streams: - @docker run -d --name streams -p 4222:4222 nats-streaming:0.7.0 + @docker run -d --name streams -p 6379:6379 redis:5.0-rc4-alpine test: @docker network create test - @docker run -d --name streams --network test -p 4222:4222 nats-streaming:0.7.0 + @docker run -d --name streams --network test -p 6379:6379 redis:5.0-rc4-alpine @docker build -t $(SERVICE) -f Dockerfile.test . - @docker run --rm --network test -e STREAM_URI=nats://streams:4222 $(SERVICE) + @docker run --rm --network test -e STREAM_URI=nats://streams:6379 $(SERVICE) clean-docker: @docker rm -f streams diff --git a/rampante/connector.py b/rampante/connector.py index e4d05e1..3b32605 100644 --- a/rampante/connector.py +++ b/rampante/connector.py @@ -13,16 +13,30 @@ Dict, Optional, Union, + List ) import msgpack -from nats.aio.client import Client as NATS -from stan.aio.client import Client as STAN +import aioredis from tenacity import retry, wait_random_exponential log = logging.getLogger("rampante.connector") + +loop = asyncio.get_event_loop() + +async def go(): + redis = await aioredis.create_redis( + 'redis://localhost', loop=loop) + await redis.set('my-key', 'value') + val = await redis.get('my-key') + print(val) + redis.close() + await redis.wait_closed() +loop.run_until_complete(go()) + + class _Streaming(): """ @@ -30,10 +44,9 @@ class _Streaming(): """ def __init__(self) -> None: - self._nc: NATS = None - self._sc: STAN = None + self._redis = None self._status = False - self._subscription: Dict = {} + self._subscription: List = [] self.service_group: Optional[str] = None @retry(wait=wait_random_exponential(multiplier=1, max=10)) @@ -42,11 +55,7 @@ async def start(self, server: str, client_name: str, service_group: str, loop: a if self._status is False: loop = loop or asyncio.get_event_loop() self.service_group = service_group - self._nc = NATS() - await self._nc.connect(servers=[server], io_loop=loop) - # Start session with NATS Streaming cluster. - self._sc = STAN() - await self._sc.connect("test-cluster", client_name, nats=self._nc) + self._redis = await aioredis.create_redis(server, loop=loop) self._status = True log.info("Streaming connected.") else: @@ -55,16 +64,14 @@ async def start(self, server: str, client_name: str, service_group: str, loop: a async def publish(self, name: str, data: Dict) -> None: """Publish a message inside a queue.""" if self._status: - body = msgpack.packb(data) - await self._sc.publish(name, body) + await self._redis.xadd(name, body) log.info(f"Event {data} published inside {name}") else: raise RuntimeError("Streaming is not active.") async def subscribe(self, name: str, callback: Union[Callable, Awaitable]): """Subscribe to a given channel.""" - self._subscription[name] = await self._sc.subscribe( - name, queue=self.service_group, durable_name="durable", cb=callback) + self._subscription[name] = self._redis.xread([name], timeout=10) async def unsubscribe(self, name: str) -> None: """Unsubscribe from a given channel.""" @@ -74,10 +81,8 @@ async def unsubscribe(self, name: str) -> None: async def stop(self) -> None: """Close all connections.""" log.warning("Closing connections....") - for subsciption in self._subscription.values(): - await subsciption.unsubscribe() - await self._sc.close() - await self._nc.close() + self._redis.close() + await self._redis.wait_closed() self._status = False diff --git a/requirements/common.txt b/requirements/common.txt index c94480f..923ce35 100644 --- a/requirements/common.txt +++ b/requirements/common.txt @@ -1,3 +1,3 @@ -asyncio-nats-streaming==0.1.2 +git+git://github.com/aio-libs/aioredis@master#egg=aioredis msgpack-python==0.5.6 tenacity==4.12.0 diff --git a/setup.py b/setup.py index cf4883b..6f81c84 100644 --- a/setup.py +++ b/setup.py @@ -21,7 +21,7 @@ # What packages are required for this module to be executed? REQUIRED = [ "tenacity>=4.8.0", - "asyncio-nats-streaming>=0.1.2", + "aioredis>=1", "msgpack-python>=0.4.8" ]