Source code for joule.client.composite_module

import asyncio
import argparse
from typing import Dict
from joule.models.pipes.pipe import Pipe as Pipe

from . import base_module
from joule.errors import ConfigurationError
#from joule.models import ConfigurationError


[docs]class CompositeModule(base_module.BaseModule):
[docs] async def setup(self, parsed_args: argparse.Namespace, inputs: Dict[str, Pipe], outputs: Dict[str, Pipe], loop: asyncio.AbstractEventLoop): """ This method must be implemented Args: parsed_args: parsed command line arguments inputs: pipe connections to input streams. Keys are the names specified in the module configuration file outputs: pipe connections ot output streams. Keys are the names specified in the module configuration loop: the current event loop Returns: array of coroutine objects """ assert False, "implement in child class" # pragma: no cover
def run_as_task(self, parsed_args, app, loop): coro = self._build_pipes(parsed_args, loop) (pipes_in, pipes_out) = loop.run_until_complete(coro) coro = self.setup(parsed_args, pipes_in, pipes_out, loop) tasks = loop.run_until_complete(coro) return asyncio.gather(*tasks)