import asyncio
import numpy as np
import logging
import argparse
import sys
from joule.client.base_module import BaseModule
from joule.models.pipes import Pipe
from joule import errors
[docs]
class ReaderModule(BaseModule):
"""
Inherit from this class and implement a :meth:`run` coroutine to create a Joule reader module.
Other methods documented below may be implemented as desired.
"""
[docs]
async def setup(self, parsed_args, app, output):
"""
Configure the module, executes before :meth:`run`
Args:
parsed_args:
app:
output:
"""
pass
[docs]
async def run(self, parsed_args: argparse.Namespace, output: Pipe):
"""
This method must be implemented. It should run in a loop, if it returns the module
stops.
Args:
parsed_args: command line arguments, configure with :meth:`custom_args`
output: pipe connection to the output data stream
.. code-block:: python
class ModuleDemo(ReaderModule):
def run(self, parsed_args, output):
while(not self.stop_requested):
data = self.read_sensor()
await output.write(data)
def self.read_sensor(self) -> np.ndarray:
# custom logic specific to the reader
#... other module code
"""
assert False, "implement in child class" # pragma: no cover
async def run_as_task(self, parsed_args, app):
# check if we should use stdout (no fd's and no configs)
if (parsed_args.pipes == "unset" and
parsed_args.module_config == "unset"):
output = StdoutPipe()
else:
try:
(pipes_in, pipes_out) = await self._build_pipes(parsed_args)
except errors.ApiError as e:
logging.error(str(e))
return asyncio.create_task(asyncio.sleep(0))
if len(pipes_out.keys()) != 1:
logging.error("Reader module must have a single output")
return asyncio.create_task(asyncio.sleep(0))
output = list(pipes_out.values())[0]
await self.setup(parsed_args, app, output)
return asyncio.create_task(self.run(parsed_args, output))
class StdoutPipe:
@staticmethod
async def write(data: np.ndarray):
# check if this is a structured array, if so flatten it
if data.ndim == 1:
data = np.c_[data['timestamp'][:, None], data['data']]
for row in data:
ts = row[0]
vals = row[1:]
print("%d %s" % (ts, " ".join([repr(x) for x in vals])))
async def close_interval(self):
print("--- interval gap ---", file=sys.stderr)
async def close(self):
pass