# Processor
This class represents the nodes of the pipeline. Each `Processor` instance is tasked with performing an operation. An operation can be anything, like:

- Reading input data from a source (see `InputProcessor`).
- Processing, i.e. applying a coroutine to an input element.
- Providing outputs for the pipeline.
```
                 ┌---------------------[Processor]---------------------┐
 ┌-----------┐   |  ┌---------┐   ┌-------------┐          ┌---------┐ |
 | input_src |-->|  | input   |-->| input_queue |          | output  | |-->
 └-----------┘   |  | handler |   └------┬------┘          | handler | |
 ┌-----------┐   |  └---------┘          v                 └----^----┘ |
 | input_src |-->|        ┌----------------┐   ┌--------------┐ |      |
 └-----------┘   |        | processor_coro |-->| output_queue |-┘      |
                 |        └-------┬--------┘   └--------------┘        |
                 |          ┌-----┴-------┐                            |
                 |          | accumulator |                            |
                 |          └-------------┘                            |
                 └-----------------------------------------------------┘
```
Each node has the following attributes:

- **name**: The name string for that particular node.
- **id**: A unique identifier for that node.
- `_input_queue`: The input `Queue` for the node. This queue contains tuples of the form `(input1, input2, ...)`, with each input coming from the output of one of the input sources.
- `_output_queue`: The output `Queue` for the node. This queue contains the output of the `_processor_coro` coroutine.
- `_processor_coro`: The coroutine associated with this `Processor` instance. This is the coroutine that does the actual processing expected of that node. Its signature is:

  ```python
  async def coro(self: Processor, q_elt: tuple, *args, **kwargs):
      ...
      return something
  ```

  Here, `self` is the `Processor` instance `coro` is running on, and `q_elt` is the tuple popped off the `_input_queue`. `coro` can also be defined as a `@classmethod` to group related coroutines together, as in the demo. In that case, the signature becomes:

  ```python
  @classmethod
  async def coro(cls, self: Processor, q_elt: tuple, *args, **kwargs):
      ...
  ```
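To make the shape of such a coroutine concrete, here is a minimal, hypothetical example that runs standalone (with `self` unused) rather than on a real `Processor`:

```python
import asyncio

# Hypothetical processor-style coroutine following the documented
# signature: it receives the tuple popped off the input queue and
# returns the processed result. `self` is unused in this sketch.
async def double_coro(self, q_elt: tuple, *args, **kwargs):
    return tuple(2 * x for x in q_elt)

result = asyncio.run(double_coro(None, (1, 2, 3)))
print(result)  # (2, 4, 6)
```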
The signature is also different for instances of class `InputProcessor`, where `q_elt: tuple` is replaced by `q_out: asyncio.Queue`, a reference to the `output_queue`. The difference between `Processor._process` and `InputProcessor._process` lies in how they expect `self._processor_coro` to behave. This is evident in their function signatures:

```python
# Processor._processor_coro
async def coro(self, q_element, *args, **kwargs): ...

# InputProcessor._processor_coro
async def coro(self, output_queue, *args, **kwargs): ...
```

That is, `Processor._processor_coro` works with input data, while `InputProcessor._processor_coro` needs a reference to the `output_queue` to fill it up.
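As a hypothetical sketch of the `InputProcessor` style, here is a source coroutine that gets the output queue and fills it up itself, run standalone with `self` unused:

```python
import asyncio

# Hypothetical InputProcessor-style coroutine: instead of receiving an
# input element, it gets a reference to the output queue and fills it
# with three integers.
async def source_coro(self, output_queue: asyncio.Queue, *args, **kwargs):
    for i in range(3):
        await output_queue.put(i)

async def main():
    q = asyncio.Queue()
    await source_coro(None, q)
    return [q.get_nowait() for _ in range(q.qsize())]

items = asyncio.run(main())
print(items)  # [0, 1, 2]
```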
- **accumulator**: An empty list that can be used for persisting values across calls; no specific use is defined.
- `_input_srcs`: A `list` of `asyncio.Queue`s representing the input sources for that particular node. These queues are set up by the `Plumber` that is instrumenting the pipeline and are used for passing data between processors. `_input_srcs` is used by the `_input_handler` to populate the `_input_queue` of the instance.
- `_output_dests`: Similar to `_input_srcs`, this is a `list` of `asyncio.Queue`s representing the output destinations for that particular node. The `_output_handler` is responsible for populating these queues with copies of the output of `_process`.
- `env_vars`: A `dict` passed by the `Plumber` instrumenting the pipeline to each `Processor` node, containing "environment variables", i.e. variables from contexts that are not directly visible to the `Processor` member functions/variables.
```python
def __init__(self, name: str = None, input_queue: asyncio.Queue = None,
             output_queue: asyncio.Queue = None, coro=None,
             input_srcs: list = None, output_dests: list = None,
             env_vars: dict = None,
             *args, **kwargs):
    ...
```

The `__init__` constructor sets the values of the attributes based on the arguments provided and sets up 3 tasks:

- `_input_handler_task`: the `_input_handler` coroutine
- `_processor_task`: the `_process` coroutine
- `_output_handler_task`: the `_output_handler` coroutine
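A minimal sketch of that wiring, assuming a stripped-down node class (the handler bodies are stubbed out here; the real class does actual work in them):

```python
import asyncio

# Hypothetical MiniNode: each node schedules its three handler
# coroutines as tasks on the running event loop, mirroring the three
# tasks set up by Processor.__init__.
class MiniNode:
    def __init__(self):
        self._input_handler_task = asyncio.create_task(self._input_handler())
        self._processor_task = asyncio.create_task(self._process())
        self._output_handler_task = asyncio.create_task(self._output_handler())

    async def _input_handler(self): ...
    async def _process(self): ...
    async def _output_handler(self): ...

async def main():
    node = MiniNode()  # must be created while a loop is running
    await asyncio.gather(node._input_handler_task,
                         node._processor_task,
                         node._output_handler_task)
    return True

ok = asyncio.run(main())
print(ok)  # True
```

Note that `asyncio.create_task` requires a running event loop, so such a node has to be constructed from inside a coroutine.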
```python
async def _input_handler(self, input_src: list = None):
    ...
```

`_input_handler` waits on each of the queues in the `input_srcs` list and puts a tuple `(elt1, elt2, ...)` into the `input_queue` of the object, where each element belongs to one of the queues from `input_srcs` (in order of insertion). When `input_srcs` is an empty list or `None`, the `_input_handler_task` stops.
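This behaviour can be sketched with plain `asyncio.Queue`s (a simplified stand-in, not the library's implementation):

```python
import asyncio

# Sketch of the _input_handler behaviour: wait on every source queue
# and combine one element from each into a tuple on the input queue.
async def gather_inputs(input_srcs, input_queue):
    if not input_srcs:  # empty list or None: the handler stops
        return
    while True:
        elts = tuple([await src.get() for src in input_srcs])
        await input_queue.put(elts)

async def main():
    a, b = asyncio.Queue(), asyncio.Queue()
    inq = asyncio.Queue()
    await a.put(1)
    await b.put("x")
    task = asyncio.create_task(gather_inputs([a, b], inq))
    combined = await inq.get()
    task.cancel()
    return combined

combined = asyncio.run(main())
print(combined)  # (1, 'x')
```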
```python
async def _output_handler(self, output_dest: list = None):
    ...
```

`_output_handler` pops an element off the `output_queue` and puts the same element into each of the queues in `output_dests`. When `output_dests` is an empty list or `None`, the `_output_handler_task` stops.
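A simplified stand-in for this fan-out behaviour, again using plain `asyncio.Queue`s rather than the actual class:

```python
import asyncio

# Sketch of the _output_handler behaviour: pop an element off the
# output queue and put it into every destination queue.
async def fan_out(output_queue, output_dests):
    if not output_dests:  # empty list or None: the handler stops
        return
    while True:
        elt = await output_queue.get()
        for dest in output_dests:
            await dest.put(elt)

async def main():
    outq = asyncio.Queue()
    d1, d2 = asyncio.Queue(), asyncio.Queue()
    await outq.put("payload")
    task = asyncio.create_task(fan_out(outq, [d1, d2]))
    got = (await d1.get(), await d2.get())
    task.cancel()
    return got

got = asyncio.run(main())
print(got)  # ('payload', 'payload')
```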
```python
async def _process(self, *args, **kwargs):
    ...
```

`_processor_task` is the task responsible for the processing of input data in each node. `_process` waits on the `_input_queue` of the node and, for each element it pops off the queue, does the following:

- Creates a task `_processor_task(input_element, *args, **kwargs)` that takes in the element (input) and calls

  ```python
  await self._processor_coro(self, input_element, *args, **kwargs)
  ```

Note that:

- `*args, **kwargs` were passed in the `Processor.__init__` call
- `self._processor_coro` needs to return a value, which is put into the node's `output_queue`
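Putting the pieces together, here is an end-to-end sketch of the node semantics described above, using plain `asyncio.Queue`s and a hypothetical `process_node` loop instead of the actual `Processor` class (the real pipeline wires these stages up via a `Plumber`):

```python
import asyncio

# Simplified _process loop: pop an element off the input queue, run the
# processor coroutine on it, and put the return value on the output queue.
async def process_node(input_queue, output_queue, coro):
    while True:
        q_elt = await input_queue.get()
        output_queue.put_nowait(await coro(None, q_elt))

# Processor-style coroutine: squares the first element of the tuple.
async def square(self, q_elt, *args, **kwargs):
    return q_elt[0] ** 2

async def main():
    inq, outq = asyncio.Queue(), asyncio.Queue()
    task = asyncio.create_task(process_node(inq, outq, square))
    results = []
    for i in range(3):
        await inq.put((i,))
        results.append(await outq.get())
    task.cancel()
    return results

squares = asyncio.run(main())
print(squares)  # [0, 1, 4]
```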
Feel free to raise issues! I'm looking for people who can help me backpressure-test this 🚀