Processor
master: de0c926
This class represents the nodes of the pipeline. Each Processor
instance is tasked with performing an operation.
An operation can be any of the following -
- Reading input data from a source (see InputProcessor).
- Processing, i.e. applying a coroutine to an input element.
- Providing outputs for the pipeline.
                       ┌---------------------------[Processor]---------------------------┐
┌-----------┐     ┌----┴----┐                                                       ┌----┴----┐
| input_src | --> |         |      ┌-------------┐            ┌--------------┐      |         |
└-----------┘     |  input  | ---> | input_queue |            | output_queue | ---> | output  |
                  | handler |      └-------------┘            └--------------┘      | handler |
┌-----------┐     |         |             |     ┌----------------┐    ^             |         |
| input_src | --> |         |             └---> | processor_coro | ---┘             |         |
└-----------┘     └----┬----┘                   └----------------┘ ┌-------------┐  └----┬----┘
                       |                                           | accumulator |       |
                       └-------------------------------------------└-------------┘-------┘
Contains the name string for that particular node
A unique identifier for that node.
The input Queue for the node. This queue contains tuples of the form (input1, input2, ...), with each input coming from the output of one of the input sources.
The output Queue for the node. This queue contains the output of the _processor_coro coroutine.
The coroutine associated with this Processor instance. This is the coroutine that does the actual processing expected of that node.
The function signature of this coroutine is -
async def coro(self:Processor, q_elt:tuple, *args, **kwargs):
    ...
    return something  # placeholder for whatever the coroutine produces
self is the Processor instance coro is running on, and q_elt is the tuple popped off the _input_queue.
coro can also be defined as a @classmethod to group related coroutines together, as in the demo, in which case the signature becomes -
@classmethod
async def coro(cls, self:Processor, q_elt:tuple, *args, **kwargs):
    ...
The signature is also different for instances of the InputProcessor class, where q_elt:tuple ~> q_out:asyncio.Queue, i.e. the q_elt parameter is replaced by q_out, which is a reference to _output_queue.
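The two coroutine shapes described above can be sketched as follows. This is only an illustration: the Processor class here is a bare stand-in so the snippet runs on its own, and double, StringOps and shout are hypothetical names that do not come from the library.

```python
import asyncio


class Processor:
    """Stand-in for the real Processor class, only so this sketch runs alone."""

    def __init__(self, name: str):
        self.name = name


# Plain coroutine: receives the Processor instance and the tuple from _input_queue.
async def double(self: Processor, q_elt: tuple, *args, **kwargs):
    (value,) = q_elt  # one input source assumed, so the tuple has one element
    return value * 2


class StringOps:
    """Hypothetical grouping of related coroutines as classmethods."""

    @classmethod
    async def shout(cls, self: Processor, q_elt: tuple, *args, **kwargs):
        (text,) = q_elt
        return f"{self.name}: {text.upper()}"


async def demo():
    node = Processor("node-1")
    # Called by hand here; in a pipeline the Processor would make these calls.
    return await double(node, (21,)), await StringOps.shout(node, ("hi",))
```

Note that in the @classmethod form the Processor instance is still passed explicitly as the second argument, after cls.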
An empty list that can be used for persisting values across calls. No specific use is defined.
A list of asyncio.Queues representing the input sources for that particular node. These queues are set up by the Plumber that is instrumenting the pipeline and are used for passing data between processors.
_input_srcs is used by the _input_handler to populate the _input_queue of the instance.
Similar to _input_srcs, this is a list of asyncio.Queues representing the output destinations for that particular node.
The _output_handler is responsible for populating these queues with copies of the output of _process.
This is a dict passed by the Plumber instrumenting the pipeline to each Processor node, containing "environment variables", i.e. variables from contexts that are not directly visible to the Processor member functions/variables.
Method Signature
def __init__(self, name:str = None, input_queue:asyncio.Queue = None,
             output_queue:asyncio.Queue = None, coro = None,
             input_srcs:list = None, output_dests:list = None,
             env_vars:dict = None,
             *args, **kwargs):
    ...
The __init__ constructor sets the values of the attributes based on the arguments provided and sets up 3 tasks -
- _input_handler_task ~> _input_handler coroutine
- _processor_task ~> _processor coroutine
- _output_handler_task ~> _output_handler coroutine
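One way a constructor could set up three such tasks is sketched below. MiniProcessor is a hypothetical stand-in with placeholder coroutine bodies, and the use of asyncio.create_task is an assumption; the source does not say how the real Processor schedules its tasks.

```python
import asyncio


class MiniProcessor:
    """Hypothetical stand-in; not the library's Processor."""

    def __init__(self, name: str = None):
        self.name = name
        # asyncio.create_task needs a running event loop, so this class
        # has to be instantiated from inside a coroutine.
        self._input_handler_task = asyncio.create_task(self._input_handler())
        self._processor_task = asyncio.create_task(self._processor())
        self._output_handler_task = asyncio.create_task(self._output_handler())

    # Placeholder bodies: each one just reports that it ran.
    async def _input_handler(self):
        return "input_handler done"

    async def _processor(self):
        return "processor done"

    async def _output_handler(self):
        return "output_handler done"


async def demo():
    node = MiniProcessor("node-1")
    # gather returns results in the order the tasks are listed.
    return await asyncio.gather(
        node._input_handler_task,
        node._processor_task,
        node._output_handler_task,
    )
```

Running asyncio.run(demo()) drives all three tasks to completion; in a real pipeline the handlers would loop instead of returning immediately.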
Method Signature
async def _input_handler(self, input_src:list=None):
    ...
_input_handler waits on each of the queues in the input_srcs list and puts a tuple (elt1, elt2, ...) into the input_queue of the object, where each element comes from one of the queues in input_srcs (in order of insertion).
When input_srcs is an empty list or None, the _input_handler_task stops.
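The behaviour described above could look roughly like this. It is a free-standing sketch rather than the library's method: input_handler and demo are hypothetical names, and awaiting the sources one after another in list order is an assumption about how "waits on each of the queues" is implemented.

```python
import asyncio


async def input_handler(input_srcs, input_queue):
    """Sketch: take one element from each source and enqueue them as a tuple."""
    if not input_srcs:  # empty list or None: nothing to wait on, so stop
        return
    while True:
        # Await the sources in list order, so elts[i] comes from input_srcs[i].
        elts = tuple([await src.get() for src in input_srcs])
        await input_queue.put(elts)


async def demo():
    a, b, merged = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    task = asyncio.create_task(input_handler([a, b], merged))
    for i in range(2):
        await a.put(f"a{i}")
        await b.put(f"b{i}")
    out = [await merged.get(), await merged.get()]
    task.cancel()  # the loop in this sketch never ends on its own
    return out
```

Because every tuple needs one element from each source, a slow source holds back the whole node in this sketch.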
Method Signature
async def _output_handler(self, output_dest:list=None):
    ...
_output_handler pops an element off the output_queue and puts the same in each of the queues in output_dests.
When output_dests is an empty list or None, the _output_handler_task stops.
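A minimal fan-out sketch of this handler follows. output_handler and demo are hypothetical names, and copy.deepcopy is an assumption: the source only says the destinations receive copies, not how those copies are made.

```python
import asyncio
import copy


async def output_handler(output_queue, output_dests):
    """Sketch: forward every output element to all destination queues."""
    if not output_dests:  # empty list or None: nowhere to send, so stop
        return
    while True:
        elt = await output_queue.get()
        for dest in output_dests:
            # Assumed: each destination gets its own deep copy, so one
            # consumer mutating the element cannot affect another.
            await dest.put(copy.deepcopy(elt))


async def demo():
    out_q, d1, d2 = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    task = asyncio.create_task(output_handler(out_q, [d1, d2]))
    await out_q.put([1, 2])
    first, second = await d1.get(), await d2.get()
    task.cancel()  # the loop in this sketch never ends on its own
    return first, second
```

If the destination queues were bounded, the awaited put calls would make this handler wait for the slowest consumer, which is one place where backpressure would show up.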
Feel free to raise issues!
I'm looking for people who can help me backpressure test this 🚀