InputProcessor
This class represents the input nodes of the pipeline's graph. It is a subclass of Processor and overrides the _process member coroutine. The Plumber instantiates an object of this class when the input_srcs list is empty.
    async def _process(self, *args, **kwargs):
        await self._processor_coro(self, self._output_queue, *args, **kwargs)
The role of _process is a little different from that of Processor._process. In this case, it calls _processor_coro with the arguments self, self._output_queue, *args, and **kwargs.
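To make the call pattern concrete, here is a minimal sketch of what the coroutine receives. InputProcessorSketch is a hypothetical stand-in that carries only the two attributes named above; it is not the library's class, and record_args is an illustrative coroutine that simply reports its arguments.

```python
import asyncio


class InputProcessorSketch:
    # Hypothetical stand-in, NOT the library's InputProcessor: it only has
    # the two attributes that _process uses in the text above.
    def __init__(self, processor_coro):
        self._processor_coro = processor_coro
        self._output_queue = asyncio.Queue()

    async def _process(self, *args, **kwargs):
        # Same call as in the documentation: the node passes itself and its
        # output queue first, then forwards any extra arguments unchanged.
        await self._processor_coro(self, self._output_queue, *args, **kwargs)


async def record_args(self, q_out, *args, **kwargs):
    # Illustrative coroutine: puts a description of what it received on the queue.
    await q_out.put((type(self).__name__, args, kwargs))


def run_sketch():
    async def main():
        node = InputProcessorSketch(record_args)
        await node._process("a", 1, flag=True)
        return node._output_queue.get_nowait()

    return asyncio.run(main())


if __name__ == "__main__":
    print(run_sketch())
```

Because _processor_coro is stored as a plain attribute rather than a bound method, the node has to pass self explicitly, which is why the coroutine's first parameter is the processor itself.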
_processor_coro is expected to be a coroutine that fills the provided self._output_queue with input data. It can be implemented as follows:

    async def input_coro(self: Processor, q_out: asyncio.Queue, *args, **kwargs):
        while True:
            # get_input is a placeholder for any coroutine that fetches the next input
            inp = await get_input()
            await q_out.put(inp)
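For a version that can be run on its own, the sketch below keeps the same signature and swaps in assumptions for everything the text leaves open: read_next is a hypothetical input source, limit is an assumed keyword argument that lets the demo terminate, and None stands in for the processor instance normally passed as self.

```python
import asyncio


async def read_next(counter):
    # Hypothetical input source: yields to the event loop, then returns
    # the next integer from the counter.
    await asyncio.sleep(0)
    return next(counter)


async def input_coro(self, q_out: asyncio.Queue, *args, **kwargs):
    # Same signature as in the text. `limit` is an assumed keyword used only
    # so that this demo stops; a real input coroutine may loop forever.
    limit = kwargs.get("limit", 3)
    counter = iter(range(limit))
    for _ in range(limit):
        inp = await read_next(counter)
        await q_out.put(inp)


async def demo():
    q_out = asyncio.Queue()
    # None stands in for the processor instance that is normally passed as self.
    await input_coro(None, q_out, limit=3)
    return [q_out.get_nowait() for _ in range(q_out.qsize())]


def run_demo():
    return asyncio.run(demo())


if __name__ == "__main__":
    print(run_demo())
```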
Note: Bear in mind that, since _processor_coro is an async function, it must not make long CPU-bound or otherwise blocking calls, as these stall the event loop and cause the other tasks to suffer. Furthermore, an overeager _processor_coro can lead to backpressure issues down the line.
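One possible way to respect both points, sketched with the standard library only: run the blocking read in a thread pool via loop.run_in_executor, and use a bounded asyncio.Queue so that put() suspends the producer while the consumer catches up. Whether the Plumber creates bounded queues is not stated here, so maxsize=2, blocking_read, the consumer, and the None sentinel are all assumptions made for this demo.

```python
import asyncio
import time


def blocking_read(i):
    # Hypothetical blocking source, e.g. a slow file or device read.
    time.sleep(0.01)
    return i


async def input_coro(self, q_out, *args, **kwargs):
    loop = asyncio.get_running_loop()
    for i in range(kwargs.get("limit", 5)):
        # Run the blocking call in the default thread pool so the event
        # loop stays free to schedule other tasks.
        inp = await loop.run_in_executor(None, blocking_read, i)
        # On a bounded queue, put() waits while the queue is full, which
        # throttles an overeager producer.
        await q_out.put(inp)
    await q_out.put(None)  # demo-only sentinel: tells the consumer to stop


async def consumer(q_in, seen, max_depth):
    while True:
        # Track the deepest the queue ever gets, to show it stays bounded.
        max_depth[0] = max(max_depth[0], q_in.qsize())
        item = await q_in.get()
        if item is None:
            break
        seen.append(item)
        await asyncio.sleep(0.02)  # deliberately slower than the producer


async def demo():
    q = asyncio.Queue(maxsize=2)  # assumed bound, for illustration only
    seen, max_depth = [], [0]
    await asyncio.gather(
        input_coro(None, q, limit=5),
        consumer(q, seen, max_depth),
    )
    return seen, max_depth[0]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With an unbounded queue the same producer would keep enqueuing items regardless of how slowly they are consumed, which is the backpressure problem the note warns about.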
Feel free to raise issues!
I'm looking for people who can help me backpressure test this 🚀