Plumber
An instance of this class is responsible for actually creating the pipeline using Processor objects and the provided input graph.
It sets up a series of producer-consumer relationships, using asyncio.Queue objects, between the different Processor nodes as specified in the input graph.
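For readers new to the pattern, here is a minimal, standard-library-only sketch of a single producer-consumer link over an asyncio.Queue. The coroutine names and the None end-of-stream sentinel are assumptions made for illustration. This is not how Processor is implemented.

```python
import asyncio


async def producer(out_q: asyncio.Queue) -> None:
    # Push a few items downstream, then a None sentinel to signal end-of-stream.
    for i in range(3):
        await out_q.put(i)
    await out_q.put(None)


async def consumer(in_q: asyncio.Queue, results: list) -> None:
    # Pull items until the sentinel arrives, transforming each one.
    while (item := await in_q.get()) is not None:
        results.append(item * 10)


async def main() -> list:
    q: asyncio.Queue = asyncio.Queue()  # created inside the running loop
    results: list = []
    await asyncio.gather(producer(q), consumer(q, results))
    return results


if __name__ == "__main__":
    print(asyncio.run(main()))  # [0, 10, 20]
```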
The __liason_q_graph attribute is set by the Plumber._parse_input_graph method and holds the liaison queue graph for the specified input_d. Liaison queues are the asyncio.Queue objects that establish the producer-consumer relationships between the nodes of the graph. The structure is modeled on an adjacency matrix: entry [i][j] holds the queue that carries node i's output to node j, or None if there is no edge from i to j.
As an example, a dummy input_d and its resulting liason_q_graph are shown below:
input_d = \
{
    'nodes': {
        'n1': {'coro': ..., 'args': {...}},
        'n2': {'coro': ...},
        'n3': {'coro': ...},
        'n4': {'coro': ..., 'properties': {'aggregate_inputs': False}},
    },
    'graph': {
        'n1': ('n2', ),
        'n2': ('n3', 'n4'),
        'n3': ('n4', ),
        'n4': None,
    },
}
self.__liason_q_graph = \
    [[None, <asyncio.Queue at 0xadd1>, None, None],
     [None, None, <asyncio.Queue at 0xadd2>, <asyncio.Queue at 0xadd3>],
     [None, None, None, <asyncio.Queue at 0xadd3>],
     [None, None, None, None]]
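The matrix is indexed by node position. Row i lists the queues node i writes to, and column j lists the queues node j reads from. The sketch below recovers both views from the example above. The helpers output_queues and input_queues are hypothetical, and strings stand in for the asyncio.Queue objects. Column entries are deduplicated because n4's two incoming edges share the same queue.

```python
from typing import List, Optional

# Stand-in for the example matrix above; strings replace the Queue objects.
nodes = ["n1", "n2", "n3", "n4"]
liason_q_graph: List[List[Optional[str]]] = [
    [None, "q1", None, None],
    [None, None, "q2", "q3"],
    [None, None, None, "q3"],
    [None, None, None, None],
]


def output_queues(g: List[List[Optional[str]]], i: int) -> List[str]:
    """Queues that node i writes to: the non-None entries of row i."""
    return [q for q in g[i] if q is not None]


def input_queues(g: List[List[Optional[str]]], j: int) -> List[str]:
    """Distinct queues that node j reads from: non-None entries of column j."""
    seen: List[str] = []
    for row in g:
        q = row[j]
        if q is not None and q not in seen:
            seen.append(q)
    return seen


if __name__ == "__main__":
    for idx, name in enumerate(nodes):
        print(name, "reads", input_queues(liason_q_graph, idx),
              "writes", output_queues(liason_q_graph, idx))
```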
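Note that when aggregate_inputs is set to False, the inputs to that node are treated as mutually independent. All of its incoming edges feed a single shared queue, which is why the n2 -> n4 and n3 -> n4 entries above are the same queue (0xadd3), and each upstream output arrives as a separate item. With aggregate_inputs set to True, each item is instead a tuple holding one output from every upstream node. For example, in the graph above, the input queue of node n4 would look like this: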
# with aggregate_inputs: False    with aggregate_inputs: True
┌─────────────┐                   ┌────────────────────────┐
│ (output n2) │                   │ (output n2, output n3) │
├─────────────┤                   ├────────────────────────┤
│ (output n3) │                   │ (output n2, output n3) │
├─────────────┤                   └────────────────────────┘
│ (output n3) │
├─────────────┤
│ (output n2) │
└─────────────┘
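The two behaviors can be imitated with plain asyncio.Queue objects. This is an illustrative sketch, not the library's implementation, and the helper names are hypothetical. It follows what the diagram suggests:

- aggregate_inputs: False means a single queue shared by all upstream nodes.
- aggregate_inputs: True means one queue per incoming edge, with one item from each combined into a tuple.

```python
import asyncio


async def emit(q: asyncio.Queue, label: str, n: int) -> None:
    # An upstream node emitting n labelled outputs; sleep(0) lets producers interleave.
    for i in range(n):
        await q.put(f"{label}-{i}")
        await asyncio.sleep(0)


async def independent_inputs(n: int = 2) -> list:
    # aggregate_inputs: False -> both upstream nodes share one queue,
    # so the downstream node receives each output as its own item.
    shared: asyncio.Queue = asyncio.Queue()
    await asyncio.gather(emit(shared, "n2", n), emit(shared, "n3", n))
    return [shared.get_nowait() for _ in range(shared.qsize())]


async def aggregated_inputs(n: int = 2) -> list:
    # aggregate_inputs: True -> one queue per incoming edge; the downstream
    # node takes one item from each queue and receives them as a tuple.
    from_n2: asyncio.Queue = asyncio.Queue()
    from_n3: asyncio.Queue = asyncio.Queue()
    await asyncio.gather(emit(from_n2, "n2", n), emit(from_n3, "n3", n))
    batches = []
    for _ in range(n):
        batches.append((await from_n2.get(), await from_n3.get()))
    return batches


if __name__ == "__main__":
    print(asyncio.run(independent_inputs()))  # four separate items
    print(asyncio.run(aggregated_inputs()))   # [('n2-0', 'n3-0'), ('n2-1', 'n3-1')]
```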
A list of Processor instances corresponding to each node.
Function that takes input_d['nodes']['<node_name>']['coro'] as input and returns the appropriate coroutine as output. This is the coro_map passed to the constructor.
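A coro_map can be as simple as a lookup table. In this sketch the string keys and the coroutine functions (double, negate) are hypothetical. The document only says that coro_map turns each node's 'coro' value into a coroutine. It does not say whether Plumber expects a coroutine function or a coroutine object, so this version returns the function and leaves calling it to the caller.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


# Hypothetical coroutine functions a pipeline node might run.
async def double(x: int) -> int:
    return 2 * x


async def negate(x: int) -> int:
    return -x


# Hypothetical registry: 'coro' values from input_d are looked up by name.
_REGISTRY: Dict[str, Callable[..., Awaitable[Any]]] = {
    "double": double,
    "negate": negate,
}


def coro_map(coro_key: str) -> Callable[..., Awaitable[Any]]:
    """Resolve input_d['nodes'][<node>]['coro'] to a coroutine function."""
    try:
        return _REGISTRY[coro_key]
    except KeyError:
        raise ValueError(f"unknown coro: {coro_key!r}") from None


if __name__ == "__main__":
    fn = coro_map("double")
    print(asyncio.run(fn(21)))  # 42
```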
A dictionary (env_vars) containing variables that the caller of Plumber might want to pass to the Processor instances. This dict is set as the Processor._env_vars attribute of every node in the graph.
The input data structure (input_d) containing the node descriptions and the graph connecting them:
input_d = \
{
    'nodes': {...},
    'graph': {...},
}
Each item in the nodes dict is in turn a dict describing the characteristics of that particular node. The following characteristics can be set for each node:
- coro ~> The value of this key is fed as input to coro_map to look up the function associated with that particular node.
- args ~> This maps directly to the arguments of the Processor._processor coroutine.
- properties ~> This dict is used to manipulate the creation of the liason_q_graph. The only property currently supported is aggregate_inputs.
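The per-node keys above lend themselves to an up-front sanity check before any queues are built. The validate_input_d helper below is hypothetical and not part of Plumber. It assumes the following:

- 'coro' is required.
- 'args' and 'properties' are optional dicts.
- aggregate_inputs is the only recognized property.
- Every node named in 'graph' must also appear in 'nodes'.

```python
from typing import Any, Dict, List

ALLOWED_NODE_KEYS = {"coro", "args", "properties"}
ALLOWED_PROPERTIES = {"aggregate_inputs"}


def validate_input_d(input_d: Dict[str, Any]) -> List[str]:
    """Return a list of problems found in input_d (empty if none)."""
    problems: List[str] = []
    nodes = input_d.get("nodes", {})
    graph = input_d.get("graph", {})
    for name, spec in nodes.items():
        if "coro" not in spec:
            problems.append(f"{name}: missing 'coro'")
        for key in sorted(spec.keys() - ALLOWED_NODE_KEYS):
            problems.append(f"{name}: unknown key {key!r}")
        if not isinstance(spec.get("args", {}), dict):
            problems.append(f"{name}: 'args' must be a dict")
        for prop in sorted(set(spec.get("properties", {})) - ALLOWED_PROPERTIES):
            problems.append(f"{name}: unsupported property {prop!r}")
    for src, dsts in graph.items():
        # Every node named in the graph (as source or target) must be declared;
        # a value of None marks a node with no outgoing edges.
        for n in (src, *(dsts or ())):
            if n not in nodes:
                problems.append(f"graph: undeclared node {n!r}")
    return problems


if __name__ == "__main__":
    bad = {
        "nodes": {"n1": {"coro": "double"},
                  "n2": {"args": {}, "properties": {"buffer": 3}}},
        "graph": {"n1": ("n2", "n3"), "n2": None},
    }
    for problem in validate_input_d(bad):
        print(problem)
```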
def __init__(self, input_d:dict=None, coro_map=None, env_vars:dict=None):
...
The Plumber class constructor takes three arguments:
- input_d ~> The input data structure containing node info and the graph for the pipeline.
- coro_map ~> Function that maps the input_d['nodes']['<node_name>']['coro'] values specified in input_d to actual coroutines.
- env_vars ~> Dictionary for passing local-scope variables to the Processor instances.
The __init__ method also calls _parse_input_graph to create the liason_q_graph for the given input.
def create_pipeline(self):
Calls _create_pipeline with __input_d['nodes'] and __liason_q_graph as the arguments.
def _create_pipeline(self, nodes_d:dict=None, liason_g:list=None):
Given the liaison queue graph, this method instantiates the Processor / InputProcessor classes with the coroutine objects returned by __coro_map(nodes_d['<node_name>']['coro']) and the arguments provided by nodes_d['<node_name>']['args'].
The env_vars dict is also passed to each Processor / InputProcessor instance.
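The following is a compressed sketch of what _create_pipeline does. None of its details come from the library itself, and it rests on several simplifying assumptions:

- MiniProcessor is a hypothetical stand-in for Processor / InputProcessor.
- A node with no incoming queue is treated as a source whose coroutine returns an iterable of items.
- None serves as an end-of-stream sentinel.
- The order of nodes_d matches the matrix indices.
- All incoming edges of a node share one queue, as in the aggregate_inputs: False layout.

```python
import asyncio
from typing import List

SENTINEL = None  # hypothetical end-of-stream marker, so node coroutines must not return None


class MiniProcessor:
    """Hypothetical stand-in for Processor / InputProcessor."""

    def __init__(self, coro, args, in_q, n_inputs, out_qs, env_vars):
        self.coro, self.args = coro, args
        self.in_q, self.n_inputs = in_q, n_inputs
        self.out_qs = out_qs
        self._env_vars = env_vars  # the same dict is handed to every node

    async def _send(self, item) -> None:
        # Fan the item out to every downstream queue.
        for q in self.out_qs:
            await q.put(item)

    async def run(self) -> None:
        if self.in_q is None:
            # Source node: its coroutine returns the items to emit.
            for item in await self.coro(**self.args):
                await self._send(item)
        else:
            # Process items until every incoming edge has sent its sentinel.
            done = 0
            while done < self.n_inputs:
                item = await self.in_q.get()
                if item is SENTINEL:
                    done += 1
                else:
                    await self._send(await self.coro(item, **self.args))
        await self._send(SENTINEL)


def create_pipeline(nodes_d, liason_g, coro_map, env_vars) -> List[MiniProcessor]:
    procs = []
    for j, name in enumerate(nodes_d):
        incoming = [row[j] for row in liason_g if row[j] is not None]
        in_q = incoming[0] if incoming else None  # assumes one shared input queue
        out_qs = [q for q in liason_g[j] if q is not None]
        spec = nodes_d[name]
        procs.append(MiniProcessor(coro_map(spec["coro"]), spec.get("args", {}),
                                   in_q, len(incoming), out_qs, env_vars))
    return procs


async def demo() -> list:
    results: list = []

    async def source(items):
        return items

    async def double(x):
        return 2 * x

    async def inc(x):
        return x + 1

    async def record(x):
        results.append(x)
        return x

    table = {"source": source, "double": double, "inc": inc, "record": record}
    nodes_d = {
        "n1": {"coro": "source", "args": {"items": [1, 2, 3]}},
        "n2": {"coro": "double"},
        "n3": {"coro": "inc"},
        "n4": {"coro": "record"},
    }
    q12, q23, q_n4 = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    liason_g = [
        [None, q12, None, None],
        [None, None, q23, q_n4],
        [None, None, None, q_n4],  # n2 -> n4 and n3 -> n4 share one queue
        [None, None, None, None],
    ]
    procs = create_pipeline(nodes_d, liason_g, table.__getitem__, {"debug": False})
    await asyncio.gather(*(p.run() for p in procs))
    return results


if __name__ == "__main__":
    print(sorted(asyncio.run(demo())))  # [2, 3, 4, 5, 6, 7]
```

The demo uses the four-node shape from the input_d example. It delivers the values 2, 4 and 6 (from n2) and 3, 5 and 7 (from n3) to n4, in an order that depends on scheduling.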
def _parse_input_graph(self, input_d:dict):
This method parses the input_d dict and builds the __liason_q_graph. The properties key of each input_d['nodes']['<node_name>'] is used here to manipulate the graph creation process.
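One possible way to build the matrix is sketched below. parse_input_graph is a hypothetical free-function version of the method. The sketch assumes aggregate_inputs defaults to True when a node sets no properties; the document does not state the default. A node with aggregate_inputs: False gets a single queue shared by all its incoming edges, and otherwise every edge gets its own queue.

```python
import asyncio
from typing import Any, Dict, List, Optional

QueueGraph = List[List[Optional[asyncio.Queue]]]


def parse_input_graph(input_d: Dict[str, Any]) -> QueueGraph:
    """Build g where g[i][j] is the queue carrying node i's output to node j."""
    names = list(input_d["nodes"])
    index = {name: k for k, name in enumerate(names)}
    g: QueueGraph = [[None] * len(names) for _ in names]
    shared: Dict[str, asyncio.Queue] = {}  # one queue per non-aggregating node

    for src, dsts in input_d["graph"].items():
        for dst in dsts or ():  # None marks a node with no outgoing edges
            props = input_d["nodes"][dst].get("properties", {})
            if props.get("aggregate_inputs", True):  # assumed default
                q = asyncio.Queue()  # a separate queue for every incoming edge
            else:
                if dst not in shared:
                    shared[dst] = asyncio.Queue()
                q = shared[dst]  # all incoming edges share this queue
            g[index[src]][index[dst]] = q
    return g


if __name__ == "__main__":
    example = {
        "nodes": {
            "n1": {"coro": ...}, "n2": {"coro": ...}, "n3": {"coro": ...},
            "n4": {"coro": ..., "properties": {"aggregate_inputs": False}},
        },
        "graph": {"n1": ("n2",), "n2": ("n3", "n4"), "n3": ("n4",), "n4": None},
    }

    async def build() -> QueueGraph:
        # Create the queues inside a running loop (this matters before Python 3.10).
        return parse_input_graph(example)

    g = asyncio.run(build())
    print(g[1][3] is g[2][3])  # True: n2 -> n4 and n3 -> n4 share a queue
```

Before Python 3.10, an asyncio.Queue binds to the current event loop when it is created. For that reason the usage example builds the graph from inside a coroutine.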
Feel free to raise issues!
I'm looking for people who can help me backpressure test this 🚀