"""joule.models.pipes.pipe -- base Pipe class and layout helpers."""

import enum
from typing import TYPE_CHECKING, List
import numpy as np
import logging

from joule.models.pipes.errors import PipeError

if TYPE_CHECKING:  # pragma: no cover
    from joule.models import (Module, Stream)

log = logging.getLogger('joule')


class Pipe:
    """
    This encapsulates streams and connects to modules.

    .. note::

        There are many different kinds of pipes
    """

    class DIRECTION(enum.Enum):
        # data flow direction relative to the attached module
        INPUT = enum.auto()
        OUTPUT = enum.auto()
        TWOWAY = enum.auto()

    def __init__(self, name=None, direction=None, module=None,
                 stream=None, layout=None):
        """
        :param name: optional name for the pipe, useful for debugging
        :param direction: data flow
        :type direction: DIRECTION
        :param module: module attached to this pipe (if any)
        :param stream: optional stream
        :param layout: the data layout, e.g. ``float32_4``
        """
        self.name: str = name
        self.direction: Pipe.DIRECTION = direction
        self.module: 'Module' = module
        self.stream: 'Stream' = stream
        self._layout = layout
        self.closed = False
        self.subscribers: List['Pipe'] = []

    def enable_cache(self, lines: int):
        """
        Turn on caching for pipe writes. Data is only transmitted once the
        cache is full. This improves system performance especially if
        :meth:`write` is called rapidly with short arrays. Once enabled,
        caching cannot be disabled.

        Args:
            lines: cache size
        """
        if self.direction == Pipe.DIRECTION.INPUT:
            raise PipeError("cannot control cache on input pipes")
        raise PipeError("abstract method must be implemented by child")

    async def flush_cache(self):
        """
        Force a pipe flush even if the cache is not full. Raises an error
        if caching is not enabled.
        """
        if self.direction == Pipe.DIRECTION.INPUT:
            raise PipeError("cannot control cache on input pipes")
        raise PipeError("abstract method must be implemented by child")

    async def read(self, flatten=False):
        """
        Read stream data. By default this method returns a structured array
        with ``timestamp`` and ``data`` fields. This method is a coroutine.

        Args:
            flatten: return an unstructured array (flat 2D matrix) with
              timestamps in the first column

        Returns:
            numpy.ndarray

        >>> data = await pipe.read()
        [1, 2, 3]
        >>> data = await pipe.read(flatten=True)
        # the same data is returned again
        [1,2,3]
        >>> pipe.consume(len(data))
        # next call to read will return only new data
        """
        if self.direction == Pipe.DIRECTION.OUTPUT:
            raise PipeError("cannot read from an output pipe")
        raise PipeError("abstract method must be implemented by child")

    def consume(self, num_rows):
        """
        Flush data from the read buffer. The next call to :meth:`read` will
        return any unflushed data followed by new incoming data.

        Args:
            num_rows: number of rows to flush from the read buffer
        """
        if self.direction == Pipe.DIRECTION.OUTPUT:
            raise PipeError("cannot consume from an output pipe")
        raise PipeError("abstract method must be implemented by child")

    @property
    def end_of_interval(self):
        # overridden by subclasses that track interval boundaries
        return False

    async def write(self, data):
        """
        Write timestamped data to the pipe. Timestamps must be monotonically
        increasing and should not overlap with existing stream data in the
        database. This method is a coroutine.

        Args:
            data (numpy.ndarray): May be a structured array with
              ``timestamp`` and ``data`` fields or an unstructured array
              with timestamps in the first column.

        >>> await pipe.write([[1000, 2, 3],[1001, 3, 4]])
        """
        if self.direction == Pipe.DIRECTION.INPUT:
            raise PipeError("cannot write to an input pipe")
        raise PipeError("abstract method must be implemented by child")

    async def close_interval(self):
        """
        Signal a break in the data stream. This should be used to indicate
        missing data. Data returned from :meth:`read` will be chunked by
        interval boundaries.
        """
        if self.direction == Pipe.DIRECTION.INPUT:
            raise PipeError("cannot write to an input pipe")
        raise PipeError("abstract method must be implemented by child")

    def subscribe(self, pipe):
        # fan out data to [pipe]; returns a callable that removes the
        # subscription when invoked
        if self.direction == Pipe.DIRECTION.INPUT:
            raise PipeError("cannot subscribe to an input pipe")
        self.subscribers.append(pipe)

        def unsubscribe():
            i = self.subscribers.index(pipe)
            del self.subscribers[i]
        return unsubscribe

    async def close(self):
        """
        Close the pipe. This also closes any subscribers. If ``close_cb``
        is defined it will be executed before the subscribers are closed.
        """
        # close the pipe, optionally implemented by the child
        pass  # pragma: no cover

    @property
    def layout(self):
        # the attached stream's layout takes precedence over the
        # explicitly configured one
        if self.stream is not None:
            return self.stream.layout
        return self._layout

    @property
    def dtype(self) -> np.dtype:
        return compute_dtype(self.layout)

    def _apply_dtype(self, data: np.ndarray) -> np.ndarray:
        """convert [data] to the pipe's [dtype]"""
        if data.ndim == 1:
            # already a structured array, just verify its data type
            if data.dtype != self.dtype:
                raise PipeError("wrong dtype for 1D (structured) array" +
                                "[%s] != req type [%s]" % (data.dtype,
                                                           self.dtype))
            return data
        elif data.ndim == 2:
            # convert to a structured array
            sarray = np.zeros(data.shape[0], dtype=self.dtype)
            try:
                sarray['timestamp'] = data[:, 0]
                # need the squeeze in case sarray['data'] is 1 dimensional
                sarray['data'] = np.squeeze(data[:, 1:])
                return sarray
            except (IndexError, ValueError):
                raise PipeError("wrong number of fields for this data type")
        else:
            raise PipeError("wrong number of dimensions in array")

    @staticmethod
    def _format_data(data, flatten):
        # optionally flatten a structured array into a 2D matrix with
        # timestamps in the first column
        if flatten:
            return np.c_[data['timestamp'][:, None], data['data']]
        else:
            return data

    @staticmethod
    def _validate_data(data):
        # reject anything that is not an ndarray; empty writes are logged
        # and reported as False rather than raising
        if type(data) is not np.ndarray:
            raise PipeError("invalid data type must be a structured array or 2D matrix")
        # check for valid data type
        try:
            if (len(data) == 0) or len(data[0]) == 0:
                log.info("pipe write called with no data")
                return False
        except TypeError:
            raise PipeError("invalid data type must be a structured array or 2D matrix")
        return True

    def __repr__(self):
        msg = "<Pipe(name='%s', direction=" % self.name
        # BUGFIX: TWOWAY pipes were previously reported as OUTPUT; an
        # unset direction still falls through to OUTPUT as before
        if self.direction == Pipe.DIRECTION.INPUT:
            msg += 'INPUT'
        elif self.direction == Pipe.DIRECTION.TWOWAY:
            msg += 'TWOWAY'
        else:
            msg += 'OUTPUT'
        msg += ", module="
        if self.module is not None:
            msg += self.module.name
        else:
            msg += "[None]"
        msg += ", stream="
        if self.stream is not None:
            msg += self.stream.name
        else:
            msg += "[None]"
        # BUGFIX: close the paren opened in "<Pipe(..."
        msg += ')>'
        return msg
def interval_token(layout):
    # single sentinel row marking an interval boundary: timestamp 0 and
    # all-zero data for the given layout
    nelem = int(layout.split('_')[1])
    token = np.zeros(1, dtype=compute_dtype(layout))
    token['timestamp'] = 0
    token['data'] = np.zeros(nelem)
    return token


def find_interval_token(raw: bytes, layout):
    """
    Locate an interval token within [raw].

    Returns:
        (start, end) byte offsets of the first token in [raw], or
        None if no token is present.
    """
    # BUGFIX: ndarray.tostring() was deprecated and removed in NumPy
    # (>=1.23); tobytes() is the byte-identical replacement
    token = interval_token(layout).tobytes()
    index = raw.find(token)
    if index == -1:
        return None
    return index, index + len(token)


def compute_dtype(layout: str) -> np.dtype:
    """
    Build the numpy dtype for a layout string such as ``float32_4``:
    a little-endian int64 ``timestamp`` field plus a ``data`` field of
    the given element type and count.

    Raises:
        ValueError: if the layout string is malformed
    """
    try:
        ltype = layout.split('_')[0]
        lcount = int(layout.split('_')[1])
        # map the layout prefix to a little-endian numpy type code,
        # converting the bit width to a byte width
        if ltype.startswith('int'):
            atype = '<i' + str(int(ltype[3:]) // 8)
        elif ltype.startswith('uint'):
            atype = '<u' + str(int(ltype[4:]) // 8)
        elif ltype.startswith('float'):
            atype = '<f' + str(int(ltype[5:]) // 8)
        else:
            raise ValueError()
        return np.dtype([('timestamp', '<i8'),
                         ('data', atype, lcount)])
    except (IndexError, ValueError):
        raise ValueError("bad layout: [%s]" % layout)