import numpy as np
import asyncio
import logging
from joule.models.pipes import Pipe
from joule.models.pipes.errors import PipeError, EmptyPipe
# Type alias for annotating parameters that accept an asyncio event loop.
Loop = asyncio.AbstractEventLoop
# Module-level logger shared across the joule package.
log = logging.getLogger('joule')
class LocalPipe(Pipe):
    """
    Pipe for intra-module communication.

    Args:
        layout: ``datatype_width``, for example ``float32_3`` for a three
            element float32 stream. See DataStream.layout
    Keyword Args:
        name: useful for debugging with multiple pipes
        close_cb: callback coroutine executed when pipe closes
        debug: enable to log pipe usage events
    """

    def __init__(self, layout: str, name: str = None,
                 close_cb=None, debug: bool = False, stream=None, write_limit=0):
        super().__init__(name=name, layout=layout, stream=stream)
        # tunable constants
        self.TIMEOUT_INTERVAL = 0.5  # polling period (s) while waiting for data
        self.debug = debug
        self._interval_break = False
        self.closed = False
        self.read_buffer = np.empty((0,), dtype=self.dtype)
        self.close_cb = close_cb
        self._last_read = False  # flag to indicate pipe only has previously read data (see input_pipe.py)
        self._reread = False
        # initialize buffer and queue (write_limit == 0 means an unbounded queue)
        self.queue = asyncio.Queue(maxsize=write_limit)
        self.queued_rows = 0
        self.last_index = 0
        self.direction = Pipe.DIRECTION.TWOWAY
        # caching
        self._caching = False
        self._cache_index = 0
        self._cache = None

    async def read(self, flatten=False) -> np.ndarray:
        """
        Return all available data, blocking until at least one block has
        been written or the pipe is closed.

        Raises:
            PipeError: if the pipe has failed, or reread was requested with
                no buffered data
            EmptyPipe: if the pipe is closed and all data has been consumed
        """
        if self._failed:
            await self.close()
            raise PipeError('pipe failed')
        # if reread is set just return the old data
        if self._reread:
            self._reread = False
            if len(self.read_buffer) == 0:
                raise PipeError("No data left to reread")
            return self._format_data(self.read_buffer, flatten)
        self._interval_break = False
        # NOTE: a fast path that returned unconsumed buffer data whenever the
        # queue was empty has been removed, otherwise the writer can be starved
        # and never close the pipe
        # wait for at least one block
        while self.queue.empty():
            # if the queue is empty and the pipe is closed, stop waiting
            if self.queue.empty() and self.closed:
                self._last_read = True  # from now on the is_empty flag will be set
                # but an error will only be generated if all the remaining data is consumed
                break  # return unconsumed data
            await asyncio.sleep(self.TIMEOUT_INTERVAL)
        data_block = self._read(flatten)
        # NOTE: There is a chance read will return an empty array-> if the producer simply closes the existing
        # interval but all of the data is already consumed. This happens typically when a module fails and has to
        # be restarted, then the inserter pipe will have no data (probably already has been read), but the terminating
        # worker adds in an interval closing block [None] to the pipe. But if the producer also closes the pipe there
        # is no reason to pass back empty data so raise an EmptyPipe exception instead
        if len(data_block) == 0 and self.closed:
            raise EmptyPipe()
        return data_block

    def read_nowait(self, flatten=False):
        """
        Same as read but this is not a coroutine. This should only be used for unit testing.

        Args:
            flatten:

        Returns:
            numpy.ndarray

        >>> data = pipe.read_nowait()
        [1, 2, 3]
        """
        if self._failed:
            raise PipeError('pipe failed')
        # if reread is set just return the old data
        if self._reread:
            self._reread = False
            if len(self.read_buffer) == 0:
                raise PipeError("No data left to reread")
            return self._format_data(self.read_buffer, flatten)
        # if the queue is empty and we have old data, just return the old data
        if self.queue.empty() and len(self.read_buffer) > 0:
            return self._format_data(self.read_buffer, flatten)
        # if the buffer is empty and the queue is empty and the pipe is closed
        if self.queue.empty() and len(self.read_buffer) == 0 and self.closed:
            raise EmptyPipe()
        # do not wait for new data, return an empty array if nothing else is available
        return self._read(flatten)

    def _read(self, flatten=False):
        # combine all queued blocks together in a single array with the previous data
        # this cannot be interrupted, relies on the total size of data written to the pipe
        start = 0
        end = len(self.read_buffer)
        buffer = np.empty((self.queued_rows + end,), self.dtype)
        if self.debug:
            print("[%s:read] initialized %d row buffer" % (self.name, len(buffer)))
            print("[%s:read] adding %d rows of unconsumed data" % (self.name, len(self.read_buffer)))
        buffer[start:end] = self.read_buffer
        start = end
        while not self.queue.empty():
            block = self.queue.get_nowait()
            if block is None:
                # a None block marks the end of an interval: stop here so the
                # caller sees end_of_interval after this read
                self._interval_break = True
                break
            end = start + len(block)
            buffer[start:end] = block
            start = end
            self.queued_rows -= len(block)
        self.read_buffer = buffer[:end]
        if self.debug:
            print("[%s:read] returning %d rows" % (self.name, len(self.read_buffer)))
        return self._format_data(self.read_buffer, flatten)

    def reread_last(self):
        """Make the next read return the previously read data again."""
        if len(self.read_buffer) == 0:
            raise PipeError("No data left to reread")
        self._reread = True

    @property
    def end_of_interval(self):
        # True when the most recent read stopped at an interval boundary
        return self._interval_break

    def is_empty(self):
        """Return True if the pipe has no more data to read."""
        if self._last_read:
            return True
        if self.queue.empty() and len(self.read_buffer) == 0 and self.closed:
            return True
        return False

    def consume(self, num_rows):
        """
        Discard num_rows from the front of the read buffer.

        Raises:
            PipeError: if num_rows is negative or exceeds the buffered rows
        """
        if num_rows == 0:
            return
        if num_rows < 0:
            raise PipeError("consume called with negative offset: %d" % num_rows)
        if num_rows > len(self.read_buffer):
            raise PipeError("cannot consume %d rows: only %d available"
                            % (num_rows, len(self.read_buffer)))
        if self.debug:
            print("[%s:read] consumed %d rows" % (self.name, num_rows))
        self.read_buffer = self.read_buffer[num_rows:]

    def consume_all(self):
        """Consume the entire read buffer."""
        return self.consume(len(self.read_buffer))

    async def write(self, data: np.ndarray):
        """
        Write a block of data into the pipe, forwarding it to any subscribers.

        Raises:
            PipeError: if the pipe has failed or is closed
        """
        if self._failed:
            await self.close()
            raise PipeError('pipe failed')
        if self.closed:
            raise PipeError("Cannot write to a closed pipe")
        if not self._validate_data(data):
            return
        # convert into a structured array
        sarray = self._apply_dtype(data)
        if self._caching:
            # accumulate rows into the cache, flushing whenever it fills
            for row in sarray:
                self._cache[self._cache_index] = row
                self._cache_index += 1
                if self._cache_index >= len(self._cache):
                    await self.flush_cache()
        else:
            await self._write(sarray)

    async def _write(self, sarray):
        # send data to subscribers
        for pipe in self.subscribers:
            await pipe.write(sarray)
        # if the queue size is infinite do not wait
        if self.queue.maxsize <= 0:
            self.queue.put_nowait(sarray)
        else:
            # wait until a slot is available
            await self.queue.put(sarray)
        self.queued_rows += len(sarray)
        # yield control so a waiting reader gets a chance to run
        await asyncio.sleep(0)
        if self.debug:
            print("[%s:write] queueing block with [%d] rows" % (self.name, len(sarray)))

    def write_nowait(self, data):
        """
        Same as write but this is not a coroutine. This should only be used
        for unit testing; subscribers must also be LocalPipes.
        """
        if self._failed:
            raise PipeError('pipe failed')
        if self.closed:
            raise PipeError("Cannot write to a closed pipe")
        if not self._validate_data(data):
            return
        # convert into a structured array
        sarray = self._apply_dtype(data)
        # send data to subscribers
        for pipe in self.subscribers:
            if type(pipe) is LocalPipe:
                p: LocalPipe = pipe  # to appease type checker
                p.write_nowait(sarray)
            else:
                raise PipeError("cannot write_nowait to subscriber [%s]" % pipe.name)
        self.queue.put_nowait(sarray)
        self.queued_rows += len(sarray)
        if self.debug:
            print("[%s:write] queueing block with [%d] rows" % (self.name, len(sarray)))

    def enable_cache(self, lines: int):
        """Batch writes into blocks of [lines] rows before queueing them."""
        self._caching = True
        self._cache = np.empty(lines, self.dtype)
        self._cache_index = 0

    async def flush_cache(self):
        """Queue any cached rows and reset the cache."""
        if self.closed:
            raise PipeError("Cannot write to a closed pipe")
        if self._cache_index > 0:
            await self._write(self._cache[:self._cache_index])
            self._cache_index = 0
            self._cache = np.empty(len(self._cache), self.dtype)

    async def close_interval(self):
        """
        Mark an interval boundary in the stream (queues a None block).

        Raises:
            PipeError: if the pipe has failed or is closed
        """
        if self._failed:
            raise PipeError('pipe failed')
        if self.closed:
            raise PipeError("Cannot write to a closed pipe")
        if self.debug:
            print("[%s:write] closing interval" % self.name)
        if self._caching:
            await self.flush_cache()
        await self.queue.put(None)
        # close intervals in any subscribers
        for pipe in self.subscribers:
            await pipe.close_interval()

    def close_interval_nowait(self):
        """
        Same as close_interval but this is not a coroutine. This should only be used for
        unit testing
        """
        if self._failed:
            raise PipeError('pipe failed')
        if self.debug:
            print("[%s:write] closing interval" % self.name)
        self.queue.put_nowait(None)
        # close intervals in any subscribers
        for pipe in self.subscribers:
            pipe.close_interval_nowait()

    def change_layout(self, layout: str):
        # switch the pipe to a new layout, discarding all buffered state
        self._layout = layout
        self.read_buffer = np.empty((0,), dtype=self.dtype)
        self.queued_rows = 0
        self.last_index = 0
        # caching
        if self._caching:
            self._cache = np.empty(len(self._cache), self.dtype)
            self._cache_index = 0

    async def close(self):
        """Close the pipe, flushing the cache and closing any subscribers."""
        if self.closed:
            return
        if self._caching:
            await self.flush_cache()
        # close any subscribers
        for pipe in self.subscribers:
            await pipe.close()
        self.closed = True
        if self.close_cb is not None:
            await self.close_cb()

    def close_nowait(self):
        """
        Same as close but this is not a coroutine. This should only be used for
        unit testing
        """
        if len(self.subscribers) > 0:
            raise PipeError("cannot close_nowait subscribers, use async")
        if self.close_cb is not None:
            raise PipeError("close_cb cannot be executed, use async")
        self.closed = True