
Processor

Ahaan Dabholkar edited this page May 23, 2021 · 19 revisions

class Processor

master : de0c926

This class represents the nodes of the pipeline. Each Processor instance performs one operation, for example:

  1. Reading input data from a source (see InputProcessor).
  2. Processing, i.e. applying a coroutine to an input element.
  3. Providing outputs for the pipeline.
                           ┌-----------------------[Processor]---------------------------┐     
 ┌-----------┐       ┌-----└---┐                                                    ┌----┘----┐
 | input_src |  -->  |         |      ┌-------------┐         ┌--------------┐      |         |
 └-----------┘       |  input  | ---> | input_queue |         | output_queue | ---> | output  |
                     | handler |      └-------------┘         └--------------┘      | handler |
 ┌-----------┐       |         |        |     ┌----------------┐    ^               |         |
 | input_src |  -->  |         |        └---> | processor_coro | ---┘               |         |
 └-----------┘       └----┐----┘              └----------------┘    ┌-------------┐ └----┐----┘
                          |                                         | accumulator |      |     
                          └-----------------------------------------└-------------┘------┘     

Attributes

_name : str

The name of the node.

_uuid : str

A unique identifier for that node.

_input_queue : asyncio.Queue

The input queue for the node. It holds tuples of the form (input1, input2, ...), where each input comes from the output of one of the node's input sources.

_output_queue : asyncio.Queue

The output queue for the node. It holds the results produced by the _processor_coro coroutine.

_processor_coro : coroutine object

The coroutine associated with this Processor instance. It does the actual processing expected of that node. Its signature is:

async def coro(self: Processor, q_elt: tuple, *args, **kwargs):
    ...
    return ...  # the result that ends up on _output_queue

self is the Processor instance that coro is running on, and q_elt is the tuple popped off the _input_queue. coro can also be defined as a @classmethod to group related coroutines together, as in the demo. In that case, the signature becomes:

@classmethod
async def coro(cls, self: Processor, q_elt: tuple, *args, **kwargs):
    ...
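A small sketch of both forms, to make the parameter order concrete. The coroutine names add and mul and the grouping class Arith are hypothetical; None is passed in place of the Processor instance only because these coroutines never use it.

```python
import asyncio


async def add(self, q_elt, *args, **kwargs):
    # Plain form: self is the running Processor, q_elt the tuple popped
    # off its _input_queue; the return value is the node's output.
    return sum(q_elt)


class Arith:
    """Hypothetical group of related processor coroutines."""

    @classmethod
    async def mul(cls, self, q_elt, *args, **kwargs):
        # Classmethod form: cls comes first, then the Processor instance.
        result = 1
        for x in q_elt:
            result *= x
        return result


async def main():
    # None stands in for the Processor; neither coroutine uses it.
    return await add(None, (2, 3)), await Arith.mul(None, (2, 3, 4))


print(asyncio.run(main()))  # (5, 24)
```

Grouping coroutines as classmethods only changes how they are looked up (Arith.mul); the Processor still receives them as ordinary awaitable callables.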

The signature also differs for instances of class InputProcessor: the q_elt:tuple parameter is replaced by q_out:asyncio.Queue, which is a reference to the node's _output_queue.
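A minimal sketch of an InputProcessor-style coroutine, assuming it simply pushes items into q_out rather than returning a value. The name read_items and its items keyword are hypothetical stand-ins for a real data source.

```python
import asyncio


async def read_items(self, q_out, *args, items=(), **kwargs):
    # InputProcessor form: instead of a q_elt tuple, the coroutine gets
    # q_out, a reference to the node's own _output_queue, and fills it.
    for item in items:
        await q_out.put(item)


async def main():
    q = asyncio.Queue()
    await read_items(None, q, items=["a", "b", "c"])
    # Drain whatever the coroutine pushed, in FIFO order.
    return [q.get_nowait() for _ in range(q.qsize())]


print(asyncio.run(main()))  # ['a', 'b', 'c']
```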

_output_accumulator : list

An initially empty list that can be used to persist values across calls. No specific use is defined for it.
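Since no specific use is defined, the following is only one possible pattern: a processor coroutine that keeps every value it has seen in _output_accumulator and emits a running total. The coroutine running_total is hypothetical, and a SimpleNamespace stands in for a Processor instance.

```python
import asyncio
from types import SimpleNamespace


async def running_total(self, q_elt, *args, **kwargs):
    # Hypothetical use: remember each value across calls, emit the sum so far.
    self._output_accumulator.append(q_elt[0])
    return sum(self._output_accumulator)


async def main():
    node = SimpleNamespace(_output_accumulator=[])  # stand-in for a Processor
    return [await running_total(node, (x,)) for x in (1, 2, 3)]


print(asyncio.run(main()))  # [1, 3, 6]
```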

_input_srcs : list

A list of asyncio.Queues representing the input sources for that particular node. These queues are set up by the Plumber that is instrumenting the pipeline and are used for passing data between processors.

_input_srcs is used by the _input_handler to populate the _input_queue of the instance.

_output_dests : list

Similar to _input_srcs, this is a list of asyncio.Queues representing the output destinations for that particular node. The _output_handler is responsible for populating these queues with copies of the output of _processor_coro.
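The two lists are linked through shared queues. The sketch below assumes one queue per pipeline edge, appended to the upstream node's _output_dests and the downstream node's _input_srcs; the helper connect is hypothetical and not the Plumber's actual API.

```python
import asyncio
from types import SimpleNamespace


def connect(upstream, downstream):
    """Hypothetical Plumber-style helper: one queue per pipeline edge.

    The same queue object is an output destination of the upstream node
    and an input source of the downstream node.
    """
    edge = asyncio.Queue()
    upstream._output_dests.append(edge)
    downstream._input_srcs.append(edge)
    return edge


def make_node():
    # Stand-in for a Processor: only the two list attributes are needed here.
    return SimpleNamespace(_input_srcs=[], _output_dests=[])


async def main():
    a, b, c = make_node(), make_node(), make_node()
    connect(a, c)  # a -> c
    connect(b, c)  # b -> c: c now has two input sources (fan-in)
    return len(a._output_dests), len(c._input_srcs), a._output_dests[0] is c._input_srcs[0]


print(asyncio.run(main()))  # (1, 2, True)
```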

_env_vars : dict

This is a dict passed by the Plumber instrumenting the pipeline to each Processor node, containing "environment variables" i.e. variables from contexts that are not directly visible to the Processor member functions/variables.
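A sketch of a processor coroutine reading a setting from _env_vars. The key "factor" and the coroutine scale are hypothetical; which entries the dict actually contains is up to the Plumber.

```python
import asyncio
from types import SimpleNamespace


async def scale(self, q_elt, *args, **kwargs):
    # "factor" is a hypothetical key; fall back to 1 if the Plumber did not set it.
    factor = self._env_vars.get("factor", 1)
    return q_elt[0] * factor


async def main():
    node = SimpleNamespace(_env_vars={"factor": 10})  # stand-in for a Processor
    bare = SimpleNamespace(_env_vars={})
    return await scale(node, (5,)), await scale(bare, (5,))


print(asyncio.run(main()))  # (50, 5)
```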


Methods

__init__(...)

Method Signature

def __init__(self, name:str = None, input_queue:asyncio.Queue = None,
             output_queue:asyncio.Queue = None, coro = None,
             input_srcs:list = None, output_dests:list = None,
             env_vars:dict = None, 
             *args, **kwargs):
    ...

The __init__ constructor sets the attributes from the arguments provided and creates three tasks:

  1. _input_handler_task ~> _input_handler coroutine
  2. _processor_task ~> _processor coroutine
  3. _output_handler_task ~> _output_handler coroutine
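A stripped-down sketch of the task setup, assuming the tasks are created with asyncio.create_task. MiniProcessor is hypothetical and wires up only _processor_task; the loop inside _processor (pop a tuple, await the coroutine, push the result) is an assumption consistent with the attribute descriptions above.

```python
import asyncio


class MiniProcessor:
    """Hypothetical, stripped-down stand-in for Processor.

    Only _processor_task is created; _input_handler_task and
    _output_handler_task would be created the same way.
    """

    def __init__(self, name=None, coro=None, input_queue=None, output_queue=None):
        self._name = name
        self._processor_coro = coro
        self._input_queue = input_queue if input_queue is not None else asyncio.Queue()
        self._output_queue = output_queue if output_queue is not None else asyncio.Queue()
        # asyncio.create_task() needs a running event loop, so this
        # sketch must be instantiated from inside a coroutine.
        self._processor_task = asyncio.create_task(self._processor())

    async def _processor(self):
        # Pop a tuple, apply the node's coroutine, push the result.
        while True:
            q_elt = await self._input_queue.get()
            result = await self._processor_coro(self, q_elt)
            await self._output_queue.put(result)
            self._input_queue.task_done()


async def main():
    async def double(self, q_elt):
        return 2 * q_elt[0]

    node = MiniProcessor(name="doubler", coro=double)
    await node._input_queue.put((21,))
    result = await node._output_queue.get()
    node._processor_task.cancel()  # the loop runs forever; stop it explicitly
    return result


print(asyncio.run(main()))  # 42
```

Because the tasks start in the constructor, a node begins consuming its queues as soon as it exists; the caller only has to keep the event loop running.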

_input_handler(...)

Method Signature

async def _input_handler(self, input_src:list=None):
    ...

_input_handler waits on each of the queues in the input_srcs list and puts a tuple (elt1, elt2, ...) into the object's input_queue, where each element is taken from one of the queues in input_srcs, in the order in which those queues appear in the list.

When input_srcs is an empty list or None, the _input_handler_task stops.
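A standalone sketch of this gathering loop. The function input_handler is hypothetical (a free function rather than the real method), and it awaits the sources one after another, which yields the same ordered tuple as waiting on them concurrently.

```python
import asyncio


async def input_handler(input_srcs, input_queue):
    """Hypothetical standalone version of _input_handler."""
    if not input_srcs:  # empty list or None: return immediately
        return
    while True:
        # Take one element from every source, keeping the list order.
        elts = []
        for src in input_srcs:
            elts.append(await src.get())
        await input_queue.put(tuple(elts))


async def main():
    a, b, merged = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    for x in (1, 2):
        a.put_nowait(x)
    for y in ("x", "y"):
        b.put_nowait(y)
    task = asyncio.create_task(input_handler([a, b], merged))
    out = [await merged.get(), await merged.get()]
    task.cancel()
    return out


print(asyncio.run(main()))  # [(1, 'x'), (2, 'y')]
```

Note that a tuple is only emitted once every source has produced an element, so a slow source holds back the whole node.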

_output_handler(...)

Method Signature

async def _output_handler(self, output_dest:list=None):
    ...

_output_handler pops an element off the output_queue and puts the same in each of the queues in output_dests.

When output_dests is an empty list or None, the _output_handler_task stops.
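A standalone sketch of the fan-out. The function output_handler is hypothetical, and the use of copy.deepcopy (so downstream nodes cannot mutate each other's data) is an assumption; the real handler may share the same object.

```python
import asyncio
import copy


async def output_handler(output_queue, output_dests):
    """Hypothetical standalone version of _output_handler (fan-out)."""
    if not output_dests:  # empty list or None: return immediately
        return
    while True:
        elt = await output_queue.get()
        for dest in output_dests:
            # One independent copy per destination (deep copy is an assumption).
            await dest.put(copy.deepcopy(elt))
        output_queue.task_done()


async def main():
    out_q, d1, d2 = asyncio.Queue(), asyncio.Queue(), asyncio.Queue()
    task = asyncio.create_task(output_handler(out_q, [d1, d2]))
    await out_q.put([1, 2])
    r1, r2 = await d1.get(), await d2.get()
    task.cancel()
    return r1, r2, r1 is r2


print(asyncio.run(main()))  # ([1, 2], [1, 2], False)
```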
