Source: example_modules/jouleexamples/example_reader.py

     1  #!/usr/bin/env python3
     2
     3  from joule import ReaderModule
     4  from joule.utilities import time_now
     5  import asyncio
     6  import numpy as np
     7
     8
     9  class ExampleReader(ReaderModule):
    10      "Example reader: generates random values"
    11
    12      async def run(self, parsed_args, output):
    13          while True:
    14              value = np.random.rand()  # data from sensor
    15              await output.write(np.array([[time_now(), value]]))
    16              await asyncio.sleep(1)
    17
    18
    19  def main():
    20      r = ExampleReader()
    21      r.start()
    22
    23
    24  if __name__ == "__main__":
    25      main()

Reader modules should extend the base joule.ReaderModule class. The child class must implement the joule.ReaderModule.run() coroutine, which should perform the following steps in a loop:

1. Read data from the input
2. Timestamp data with Unix microseconds
3. Insert data into the output stream
4. Sleep to create the data rate

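In the example reader, line 14 (the np.random.rand() call) reads data from the input, here a random number generator standing in for a sensor. Line 15 timestamps the value with time_now() and writes it to the output stream as a one-row array. Line 16 sleeps for one second, giving a 1 Hz sample rate. Note that the asyncio.sleep coroutine is used instead of the time.sleep function, which would block the event loop.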
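
To see why asyncio.sleep matters, the sketch below runs the same four-step loop next to a second coroutine on one event loop. It does not use Joule: FakePipe, reader_loop, and heartbeat are hypothetical stand-ins, the timestamp helper assumes that Unix microseconds can be taken as time.time() * 1e6 truncated to an integer, and the delays are shortened so the demo finishes quickly.

```python
import asyncio
import random
import time


def unix_microseconds():
    # Assumption: Unix time in microseconds, as an integer
    return int(time.time() * 1e6)


class FakePipe:
    """Hypothetical stand-in for an output pipe; it only records rows."""

    def __init__(self):
        self.rows = []

    async def write(self, row):
        self.rows.append(row)


async def reader_loop(output, events, samples=3, delay=0.05):
    for _ in range(samples):             # a real module would loop forever
        value = random.random()          # 1. read data from the input
        ts = unix_microseconds()         # 2. timestamp it
        await output.write((ts, value))  # 3. insert it into the output stream
        events.append("write")
        await asyncio.sleep(delay)       # 4. sleep, yielding to other tasks


async def heartbeat(events, beats=3, delay=0.05):
    # A second task sharing the event loop with the reader
    for _ in range(beats):
        events.append("beat")
        await asyncio.sleep(delay)


async def demo():
    pipe, events = FakePipe(), []
    await asyncio.gather(reader_loop(pipe, events), heartbeat(events))
    return pipe, events


if __name__ == "__main__":
    pipe, events = asyncio.run(demo())
    print(events)
    print(len(pipe.rows), "rows written")
```

The heartbeat entries appear between the writes because each asyncio.sleep hands control back to the event loop. Replacing it with time.sleep would hold the loop for the whole delay, and the heartbeat could not run until the reader had finished.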

Note

The loop structure shown above should only be used for low bandwidth data sources. For higher bandwidth data, either enable pipe caching or write the data in chunks. In both cases, keep the write frequency at 1 Hz or lower to reduce inter-process communication and network overhead.
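
The idea behind caching can be sketched without Joule: buffer rows as they arrive and pass them on only once enough have accumulated, so that many small writes become a few large ones. CachingPipe and its sink callback below are hypothetical names used for illustration; this is not Joule's pipe implementation.

```python
import asyncio


class CachingPipe:
    """Hypothetical pipe wrapper: buffers rows and forwards them in batches."""

    def __init__(self, sink, cache_rows):
        self.sink = sink              # called once per batch with a list of rows
        self.cache_rows = cache_rows  # rows to collect before forwarding
        self._buffer = []

    async def write(self, rows):
        self._buffer.extend(rows)
        if len(self._buffer) >= self.cache_rows:
            await self.flush()

    async def flush(self):
        if self._buffer:
            self.sink(self._buffer)
            self._buffer = []         # start a fresh list for the next batch


async def demo():
    batches = []
    pipe = CachingPipe(batches.append, cache_rows=100)
    # 250 single-row writes, as a naive high-rate reader would issue them
    for i in range(250):
        await pipe.write([(i, float(i))])
    await pipe.flush()  # forward whatever is left before shutting down
    return batches


if __name__ == "__main__":
    batches = asyncio.run(demo())
    print([len(b) for b in batches])
```

In this sketch the 250 calls to write() reach the sink as only three batches, which is the kind of reduction in transfers that the note above is after.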

Source: example_modules/jouleexamples/high_bandwidth_reader.py

    #!/usr/bin/env python3

    from joule import ReaderModule
    from joule.utilities import time_now
    import asyncio
    import numpy as np


    class HighBandwidthReader(ReaderModule):
        """ Produce a 1Hz ramp sampled at [rate] Hz """

        def custom_args(self, parser):
            grp = parser.add_argument_group("module",
                                            "module specific arguments")
            grp.add_argument("--rate", type=float,
                             required=True,
                             help="sample rate in Hz")

        async def run(self, parsed_args, output):
            start_ts = time_now()
            # write one chunk per period (in seconds)
            period = 1
            # np.linspace requires an integer number of samples
            samples_per_period = int(np.round(parsed_args.rate * period))
            while True:
                # timestamps are in microseconds
                end_ts = start_ts + period * 1e6
                ts = np.linspace(start_ts, end_ts,
                                 samples_per_period, endpoint=False)
                vals = np.linspace(0, 33, samples_per_period)
                start_ts = end_ts
                # one row per sample: [timestamp, value]
                chunk = np.hstack((ts[:, None], vals[:, None]))
                await output.write(chunk)
                await asyncio.sleep(period)


    def main():
        r = HighBandwidthReader()
        r.start()


    if __name__ == "__main__":
        main()

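
The timestamp arithmetic in this loop is worth checking by hand. The sketch below pulls it into a hypothetical helper, make_chunk, and checks that with endpoint=False consecutive chunks tile the timeline without repeating or skipping a sample time. It converts the sample count to an int, since recent NumPy versions reject a float count in linspace, and it uses a fixed start timestamp instead of time_now() so that the result is reproducible.

```python
import numpy as np


def make_chunk(start_ts, rate, period=1):
    """Return (chunk, end_ts) for one period of a ramp sampled at `rate` Hz.

    start_ts is in Unix microseconds and period is in seconds.
    """
    n = int(round(rate * period))  # linspace needs an integer count
    end_ts = start_ts + period * 1e6
    # endpoint=False leaves end_ts for the first sample of the next chunk
    ts = np.linspace(start_ts, end_ts, n, endpoint=False)
    vals = np.linspace(0, 33, n)
    # one row per sample: [timestamp, value]
    return np.hstack((ts[:, None], vals[:, None])), end_ts


if __name__ == "__main__":
    start = 1_600_000_000_000_000  # arbitrary fixed timestamp (microseconds)
    first, next_start = make_chunk(start, rate=4)
    second, _ = make_chunk(next_start, rate=4)
    both = np.vstack((first, second))
    print(both.shape)
    print(np.diff(both[:, 0]))  # spacing between consecutive timestamps
```

At 4 Hz every timestamp difference, including the one across the chunk boundary, is the same 250000 microseconds. With the default endpoint=True, the last timestamp of one chunk would coincide with the first timestamp of the next.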
Source: example_modules/jouleexamples/intermittent_reader.py

    #!/usr/bin/env python3

    from joule import ReaderModule
    from joule.utilities import time_now
    import asyncio
    import numpy as np
    import logging

    ERROR_PROBABILITY = 0.25


    class IntermittentReader(ReaderModule):
        """ Like HighBandwidth reader with random data interruptions """

        def custom_args(self, parser):
            grp = parser.add_argument_group("module",
                                            "module specific arguments")
            grp.add_argument("--rate", type=float,
                             required=True,
                             help="sample rate in Hz")

        async def run(self, parsed_args, output):
            start_ts = time_now()
            period = 1
            # np.linspace requires an integer number of samples
            samples_per_period = int(np.round(parsed_args.rate * period))
            while True:
                try:
                    # timestamps are in microseconds
                    end_ts = start_ts + period * 1e6
                    ts = np.linspace(start_ts, end_ts,
                                     samples_per_period, endpoint=False)
                    vals = np.linspace(0, 33, samples_per_period)
                    start_ts = end_ts
                    chunk = np.hstack((ts[:, None], vals[:, None]))
                    # simulate an error
                    if np.random.rand() < ERROR_PROBABILITY:
                        raise ValueError
                    await output.write(chunk)
                except ValueError:
                    # drop this chunk and mark a break in the stream
                    logging.error("simulated data interruption")
                    await output.close_interval()
                await asyncio.sleep(period)


    def main():
        r = IntermittentReader()
        r.start()


    if __name__ == "__main__":
        main()
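
The effect of close_interval() on the stream can be pictured with a stand-in pipe that keeps each run of contiguous writes in its own list. RecordingPipe and the scripted failure list are hypothetical, and the sketch assumes, as the method name suggests, that closing an interval marks a break after which later writes begin a new interval.

```python
import asyncio


class RecordingPipe:
    """Hypothetical stand-in that groups written chunks into intervals."""

    def __init__(self):
        self.intervals = [[]]

    async def write(self, chunk):
        self.intervals[-1].append(chunk)

    async def close_interval(self):
        # Start a new interval only if the current one holds data
        if self.intervals[-1]:
            self.intervals.append([])


async def run_reader(output, failures):
    """Write one chunk per step; `failures` scripts which steps fail."""
    for step, fails in enumerate(failures):
        try:
            if fails:
                raise ValueError("simulated data interruption")
            await output.write(step)
        except ValueError:
            # same pattern as the reader: skip the chunk, close the interval
            await output.close_interval()


async def demo():
    pipe = RecordingPipe()
    await run_reader(pipe, [False, False, True, False, True, True, False])
    return [interval for interval in pipe.intervals if interval]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Steps 0 and 1 land in one interval, step 3 in a second, and step 6 in a third. The failed steps contribute no data, and back-to-back failures do not create empty intervals in this model.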