Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

First attempt of moving to Redis #96

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ services:
- docker

install:
- docker run -d --name streams -p 4222:4222 nats-streaming:0.7.0
- docker run -d --name streams -p 6379:6379 redis:5.0-rc4-alpine
- pip install -U attrs
- pip install -r requirements/common.txt
- pip install -r requirements/ci.txt
Expand Down
6 changes: 3 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ lint:
@tox -e isort,flake8,mypy

run-streams:
@docker run -d --name streams -p 4222:4222 nats-streaming:0.7.0
@docker run -d --name streams -p 6379:6379 redis:5.0-rc4-alpine

test:
@docker network create test
@docker run -d --name streams --network test -p 4222:4222 nats-streaming:0.7.0
@docker run -d --name streams --network test -p 6379:6379 redis:5.0-rc4-alpine
@docker build -t $(SERVICE) -f Dockerfile.test .
@docker run --rm --network test -e STREAM_URI=nats://streams:4222 $(SERVICE)
@docker run --rm --network test -e STREAM_URI=nats://streams:6379 $(SERVICE)

clean-docker:
@docker rm -f streams
Expand Down
41 changes: 23 additions & 18 deletions rampante/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,27 +13,40 @@
Dict,
Optional,
Union,
List
)

import msgpack
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN
import aioredis
from tenacity import retry, wait_random_exponential

log = logging.getLogger("rampante.connector")



loop = asyncio.get_event_loop()

async def go():
redis = await aioredis.create_redis(
'redis://localhost', loop=loop)
await redis.set('my-key', 'value')
val = await redis.get('my-key')
print(val)
redis.close()
await redis.wait_closed()
loop.run_until_complete(go())


class _Streaming():
"""

await streaming.start(server=queue_uri, client_name="service-spawner", loop=loop)

"""
def __init__(self) -> None:
self._nc: NATS = None
self._sc: STAN = None
self._redis = None
self._status = False
self._subscription: Dict = {}
self._subscription: List = []
self.service_group: Optional[str] = None

@retry(wait=wait_random_exponential(multiplier=1, max=10))
Expand All @@ -42,11 +55,7 @@ async def start(self, server: str, client_name: str, service_group: str, loop: a
if self._status is False:
loop = loop or asyncio.get_event_loop()
self.service_group = service_group
self._nc = NATS()
await self._nc.connect(servers=[server], io_loop=loop)
# Start session with NATS Streaming cluster.
self._sc = STAN()
await self._sc.connect("test-cluster", client_name, nats=self._nc)
self._redis = await aioredis.create_redis(server, loop=loop)
self._status = True
log.info("Streaming connected.")
else:
Expand All @@ -55,16 +64,14 @@ async def start(self, server: str, client_name: str, service_group: str, loop: a
async def publish(self, name: str, data: Dict) -> None:
"""Publish a message inside a queue."""
if self._status:
body = msgpack.packb(data)
await self._sc.publish(name, body)
await self._redis.xadd(name, body)
log.info(f"Event {data} published inside {name}")
else:
raise RuntimeError("Streaming is not active.")

async def subscribe(self, name: str, callback: Union[Callable, Awaitable]):
"""Subscribe to a given channel."""
self._subscription[name] = await self._sc.subscribe(
name, queue=self.service_group, durable_name="durable", cb=callback)
self._subscription[name] = self._redis.xread([name], timeout=10)

async def unsubscribe(self, name: str) -> None:
"""Unsubscribe from a given channel."""
Expand All @@ -74,10 +81,8 @@ async def unsubscribe(self, name: str) -> None:
async def stop(self) -> None:
"""Close all connections."""
log.warning("Closing connections....")
for subsciption in self._subscription.values():
await subsciption.unsubscribe()
await self._sc.close()
await self._nc.close()
self._redis.close()
await self._redis.wait_closed()
self._status = False


Expand Down
2 changes: 1 addition & 1 deletion requirements/common.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
asyncio-nats-streaming==0.1.2
git+git://github.com/aio-libs/aioredis@master#egg=aioredis
msgpack-python==0.5.6
tenacity==4.12.0
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
# What packages are required for this module to be executed?
REQUIRED = [
"tenacity>=4.8.0",
"asyncio-nats-streaming>=0.1.2",
"aioredis>=1",
"msgpack-python>=0.4.8"
]

Expand Down