from typing import Optional, Union, Dict, List
from .session import BaseSession
from .folder_type import Folder
from joule import errors
[docs]
class DataStream:
"""
API DataStream model. See :ref:`sec-node-data-stream-actions` for details on using the API to
manipulate data streams. Streams are locked if they are active or statically configured. When
creating a stream manually, omit the ID and status attributes (**is_\***, **active**, and **locked**),
these are set by the Joule server.
Parameters:
name (str): stream name, must be unique in the parent
description (str): optional field
datatype (str): element datatype
keep_us (int): store the last N microseconds of data (-1 to keep all and 0 to keep none)
is_configured: is the stream statically configured with a \*.conf file
is_source: is the stream an active data source
is_destination: is the stream an active data destination
active (bool): is the stream a source or destination
locked (bool): is the stream active or configured
decimate (bool): is the stream data decimated for visualization
elements (List[Element]): list of the stream elements
"""
def __init__(self,
name: str = "", description: str = "",
datatype: str = "float32", keep_us: int = -1,
elements: List['Element'] = None):
self._id = None
self.name = name
self.description = description
self.datatype = datatype
self.keep_us = keep_us # -1 = KEEP ALL
self.is_configured = False
self.is_source = False
self.is_destination = False
self.locked = False
self.active = False
self.decimate = True
if elements is None:
elements = []
self.elements = elements
def __repr__(self):
return "<joule.api.DataStream id=%r name=%r description=%r datatype=%r is_configured=%r is_source=%r is_destination=%r locked=%r decimate=%r>" % (
self._id, self.name, self.description,
self.datatype, self.is_configured,
self.is_source, self.is_destination,
self.locked, self.decimate)
@property
def id(self) -> int:
if self._id is None:
raise errors.ApiError("this is a local model with no ID. See API docs")
return self._id
@id.setter
def id(self, value: int):
self._id = value
@property
def layout(self):
return self.datatype.lower() + '_' + str(len(self.elements))
def to_json(self) -> Dict:
return {
"id": self._id,
"name": self.name,
"description": self.description,
"is_configured": self.is_configured,
"is_source": self.is_source,
"is_destination": self.is_destination,
"datatype": self.datatype,
"keep_us": self.keep_us,
"locked": self.locked,
"active": self.active,
"decimate": self.decimate,
"elements": [e.to_json() for e in self.elements]
}
def from_json(json) -> DataStream:
my_stream = DataStream()
my_stream.id = json['id']
my_stream.name = json['name']
my_stream.description = json['description']
my_stream.datatype = json['datatype']
my_stream.decimate = json['decimate']
my_stream.keep_us = json['keep_us']
my_stream.is_configured = json['is_configured']
my_stream.is_source = json['is_source']
my_stream.is_destination = json['is_destination']
my_stream.locked = json['locked']
my_stream.active = json['active']
my_stream.elements = [elem_from_json(item) for item in json['elements']]
return my_stream
[docs]
class Element:
"""
API Element model. Streams have one or more elements. See :ref:`sec-streams` for details on the stream data model.
Parameters:
id (int): unique numeric ID assigned by Joule
index (int): column position in the data array (0 = first element)
name (str): element name
units (str): unit of measurement, may be any string
plottable (bool): should the element be visible in the Lumen plotting interface
display_type [continous|discrete|event]: plot type, defaults to continuous
offset (float): offset data visualization by ``y=(x-offset)*scale_factor``
scale_factor (float): scale data visualation with above equation
default_max (float): fix auto scale max (set to None to fit plotted data)
default_min (float): fix auto scale min (set to None to fit plotted data)
"""
def __init__(self, name: str = "", units: str = "",
plottable: bool = True,
display_type: str = 'continuous'):
self.id = None
self.index = None
self.name = name
self.units = units
self.plottable = plottable
self.display_type = display_type
self.offset = 0
self.scale_factor = 1.0
self.default_max = None
self.default_min = None
def __repr__(self):
return "<joule.api.Element id=%r index=%r, name=%r units=%r plottable=%r display_type=%r>" % (
self.id, self.index, self.name,
self.units, self.plottable,
self.display_type)
def to_json(self) -> Dict:
return {
'id': self.id,
'index': self.index,
'name': self.name,
'units': self.units,
'plottable': self.plottable,
'display_type': self.display_type,
'offset': self.offset,
'scale_factor': self.scale_factor,
'default_max': self.default_max,
'default_min': self.default_min
}
def elem_from_json(json) -> Element:
my_elem = Element()
my_elem.id = json['id']
my_elem.index = json['index']
my_elem.name = json['name']
my_elem.units = json['units']
my_elem.plottable = json['plottable']
my_elem.display_type = json['display_type']
my_elem.offset = json['offset']
my_elem.scale_factor = json['scale_factor']
my_elem.default_min = json['default_min']
my_elem.default_max = json['default_max']
return my_elem
[docs]
class DataStreamInfo:
"""
API DataStreamInfo model. Received from :meth:`Node.data_stream_info` and should not be created directly.
.. warning::
Rows and Bytes values are approximate
Parameters:
start (int): timestamp in UNIX microseconds of the first data element
end (int): timestamp in UNIX microsseconds of the last data element
rows (int): approximate rows of data in the stream
bytes (int): approximate size of the data on disk
total_time (int): data duration in microseconds (start-end)
"""
def __init__(self, start: Optional[int], end: Optional[int], rows: int,
total_time: int = 0, bytes: int = 0):
self.start = start
self.end = end
self.rows = rows
self.bytes = bytes
self.total_time = total_time
def __repr__(self):
return "<joule.api.DataStreamInfo start=%r end=%r rows=%r, total_time=%r>" % (
self.start, self.end, self.rows, self.total_time)
def info_from_json(json) -> DataStreamInfo:
if json is not None:
return DataStreamInfo(json['start'],
json['end'],
json['rows'],
json['total_time'],
json['bytes'])
else:
return DataStreamInfo(None,
None,
0,
0,
0)
async def data_stream_delete(session: BaseSession,
stream: Union[DataStream, str, int]):
data = {}
if type(stream) is DataStream:
data["id"] = stream.id
elif type(stream) is int:
data["id"] = stream
elif type(stream) is str:
data["path"] = stream
else:
raise errors.ApiError("Invalid stream datatype. Must be DataStream, Path, or ID")
await session.delete("/stream.json", data)
async def data_stream_create(session: BaseSession,
stream: DataStream, folder: Union[Folder, str, int]) -> DataStream:
data = {"stream": stream.to_json()}
if type(folder) is Folder:
data["dest_id"] = folder.id
elif type(folder) is int:
data["dest_id"] = folder
elif type(folder) is str:
data["dest_path"] = folder
else:
raise errors.ApiError("Invalid folder datatype. Must be Folder, Path, or ID")
resp = await session.post("/stream.json", json=data)
return from_json(resp)
async def data_stream_info(session: BaseSession,
stream: Union[DataStream, str, int]) -> DataStreamInfo:
data = {}
if type(stream) is DataStream:
data["id"] = stream.id
elif type(stream) is int:
data["id"] = stream
elif type(stream) is str:
data["path"] = stream
else:
raise errors.ApiError("Invalid stream datatype. Must be DataStream, Path, or ID")
resp = await session.get("/stream.json", data)
return info_from_json(resp['data_info'])
async def data_stream_get(session: BaseSession,
stream: Union[DataStream, str, int]) -> DataStream:
data = {}
if type(stream) is DataStream:
data["id"] = stream.id
elif type(stream) is int:
data["id"] = stream
elif type(stream) is str:
data["path"] = stream
else:
raise errors.ApiError("Invalid stream datatype. Must be DataStream, Path, or ID")
data["no-info"]=''
resp = await session.get("/stream.json", data)
return from_json(resp)
async def data_stream_update(session: BaseSession,
stream: DataStream) -> None:
await session.put("/stream.json", {"id": stream.id,
"stream": stream.to_json()})
async def data_stream_move(session: BaseSession,
source: Union[DataStream, str, int],
destination: Union[Folder, str, int]) -> None:
data = {}
if type(source) is DataStream:
data["src_id"] = source.id
elif type(source) is int:
data["src_id"] = source
elif type(source) is str:
data["src_path"] = source
else:
raise errors.ApiError("Invalid source datatype. Must be DataStream, Path, or ID")
if type(destination) is Folder:
data["dest_id"] = destination.id
elif type(destination) is int:
data["dest_id"] = destination
elif type(destination) is str:
data["dest_path"] = destination
else:
raise errors.ApiError("Invalid destination datatype. Must be Folder, Path, or ID")
await session.put("/stream/move.json", data)
async def data_stream_annotation_delete(session: BaseSession,
stream: Union[DataStream, str, int],
start: Optional[int] = None,
end: Optional[int] = None):
data = {}
if start is not None:
data["start"] = start
if end is not None:
data["end"] = end
if type(stream) is DataStream:
data["stream_id"] = stream.id
elif type(stream) is int:
data["stream_id"] = stream
elif type(stream) is str:
data["stream_path"] = stream
else:
raise errors.ApiError("Invalid source datatype. Must be DataStream, Path, or ID")
await session.delete("/stream/annotations.json", params=data)