Source code for joule.client.composite_module

import asyncio
import argparse
import logging
from typing import Dict

from joule.models.pipes.pipe import Pipe as Pipe
from joule import errors
from . import base_module


class CompositeModule(base_module.BaseModule):
    """Module whose work is a collection of awaitables run together.

    Child classes implement :meth:`setup`, which returns the awaitables
    that :meth:`run_as_task` combines with ``asyncio.gather``.
    """

    async def setup(self, parsed_args: argparse.Namespace,
                    inputs: Dict[str, Pipe],
                    outputs: Dict[str, Pipe]):
        """
        This method must be implemented

        Args:
            parsed_args: parsed command line arguments
            inputs: pipe connections to input streams. Keys are the names
                specified in the module configuration file
            outputs: pipe connections to output streams. Keys are the names
                specified in the module configuration file

        Returns:
            array of coroutine objects

        Raises:
            NotImplementedError: if the child class does not override
                this method
        """
        # Raise explicitly instead of ``assert False``: assert statements are
        # removed when Python runs with -O, which would let this method
        # silently return None.
        raise NotImplementedError("implement in child class")  # pragma: no cover

    async def run_as_task(self, parsed_args, app) -> asyncio.Future:
        """
        Build the module's pipes, call :meth:`setup` and gather its result.

        Args:
            parsed_args: parsed command line arguments, forwarded to
                ``_build_pipes`` and to :meth:`setup`
            app: not used by this implementation

        Returns:
            The future produced by ``asyncio.gather`` over the awaitables
            returned by :meth:`setup`. If building the pipes raises
            ``errors.ApiError``, the error is logged and a task wrapping
            ``asyncio.sleep(0)`` is returned instead.
        """
        try:
            (pipes_in, pipes_out) = await self._build_pipes(parsed_args)
        except errors.ApiError as e:
            logging.error(str(e))
            # Hand back a trivial awaitable so the caller still receives
            # something it can await.
            return asyncio.create_task(asyncio.sleep(0))
        tasks = await self.setup(parsed_args, pipes_in, pipes_out)
        # gather() returns a Future (not a Task) aggregating all awaitables.
        return asyncio.gather(*tasks)