Skip to content

Commit

Permalink
Merge pull request #46 from aio-libs/fix_asyncio_queue_pool
Browse files Browse the repository at this point in the history
Fix pool concurrency
  • Loading branch information
argaen authored May 24, 2017
2 parents 84199f1 + ff4dbc1 commit cd902d6
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 32 deletions.
49 changes: 18 additions & 31 deletions aiomcache/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ def __init__(self, host, port, *, minsize, maxsize, loop=None):
self._minsize = minsize
self._maxsize = maxsize
self._loop = loop
self._pool = asyncio.Queue(maxsize, loop=loop)
self._pool = asyncio.Queue(loop=loop)
self._in_use = set()
self._size = 0

@asyncio.coroutine
def clear(self):
Expand All @@ -28,7 +27,6 @@ def clear(self):
self._do_close(conn)

def _do_close(self, conn):
self._size -= 1
conn.reader.feed_eof()
conn.writer.close()

Expand All @@ -39,29 +37,20 @@ def acquire(self):
:return: ``tuple`` (reader, writer)
"""
while self._size < self._minsize:
while self.size() == 0 or self.size() < self._minsize:
_conn = yield from self._create_new_conn()

# Could not create new connection
if _conn is None:
break
yield from self._pool.put(_conn)
self._pool.put_nowait(_conn)

conn = None
while not conn:
if not self._pool.empty():
_conn = yield from self._pool.get()
if _conn.reader.at_eof() or _conn.reader.exception():
self._do_close(_conn)
conn = None
else:
conn = _conn

if conn is None:
_conn = yield from self._pool.get()
if _conn.reader.at_eof() or _conn.reader.exception():
self._do_close(_conn)
conn = yield from self._create_new_conn()

# Give up control
yield from asyncio.sleep(0, loop=self._loop)
else:
conn = _conn

self._in_use.add(conn)
return conn
Expand All @@ -72,26 +61,24 @@ def release(self, conn):
:param conn: ``namedtuple`` (reader, writer)
"""
self._in_use.remove(conn)

if conn.reader.at_eof() or conn.reader.exception():
self._do_close(conn)
else:
# This should never fail because poolsize=maxsize
self._pool.put_nowait(conn)

@asyncio.coroutine
def _create_new_conn(self):
if self._size < self._maxsize:
self._size += 1
try:
reader, writer = yield from asyncio.open_connection(
self._host, self._port, loop=self._loop)
except:
self._size -= 1
raise
return _connection(reader, writer)
if self.size() < self._maxsize:
reader, writer = yield from asyncio.open_connection(
self._host, self._port, loop=self._loop)
if self.size() < self._maxsize:
return _connection(reader, writer)
else:
reader.feed_eof()
writer.close()
return None
else:
return None

def size(self):
return self._size
return self._pool.qsize() + len(self._in_use)
36 changes: 35 additions & 1 deletion tests/pool_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import random
import asyncio
import pytest
from aiomcache.pool import MemcachePool, _connection
from aiomcache.client import acquire


def test_pool_creation(mcache_params, loop):
Expand Down Expand Up @@ -80,9 +82,41 @@ def acquire_wait_release():
assert pool._pool.qsize() == 0
pool.release(conn)

yield from asyncio.gather(*([acquire_wait_release()] * 3), loop=loop)
yield from asyncio.gather(*([acquire_wait_release()] * 50), loop=loop)
assert pool.size() == 1
assert len(pool._in_use) == 0
assert pool._pool.qsize() == 1


@pytest.mark.run_loop
def test_acquire_task_cancellation(
mcache_params, loop):

class Client:
def __init__(self, pool_size=4):
self._pool = MemcachePool(
minsize=pool_size, maxsize=pool_size,
loop=loop, **mcache_params)

@acquire
@asyncio.coroutine
def acquire_wait_release(self, conn):
assert self._pool.size() <= pool_size
yield from asyncio.sleep(random.uniform(0.01, 0.02), loop=loop)
return "foo"

pool_size = 4
client = Client(pool_size=pool_size)
tasks = [
asyncio.wait_for(
client.acquire_wait_release(),
random.uniform(1, 2), loop=loop) for x in range(1000)
]
results = yield from asyncio.gather(
*tasks, loop=loop, return_exceptions=True)
assert client._pool.size() <= pool_size
assert len(client._pool._in_use) == 0
assert "foo" in results


@pytest.mark.run_loop
Expand Down

0 comments on commit cd902d6

Please sign in to comment.