Basic Reader

Source: example_modules/jouleexamples/example_reader.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
#!/usr/bin/env python3

from joule import ReaderModule
from joule.utilities import time_now
import asyncio
import numpy as np


class ExampleReader(ReaderModule):
    """Example reader: generates random values"""

    async def run(self, parsed_args, output):
        """Write one timestamped random sample per second, forever.

        Each pass draws a value from ``np.random.rand()`` (standing in for
        a sensor read), pairs it with ``time_now()`` in a 1x2 array, writes
        that row to ``output`` and then sleeps for one second using
        ``asyncio.sleep``.  ``parsed_args`` is not used.
        """
        pause_seconds = 1
        while True:
            reading = np.random.rand()  # data from sensor
            row = np.array([[time_now(), reading]])
            await output.write(row)
            await asyncio.sleep(pause_seconds)


def main():
    """Create an ExampleReader and call its start() method."""
    ExampleReader().start()


if __name__ == "__main__":
    # only run the module when this file is executed as a script
    main()

Reader modules should extend the base joule.ReaderModule class. The child class must implement the joule.ReaderModule.run() coroutine which should perform the following in a loop:

  1. Read data from the input
  2. Timestamp data with Unix microseconds
  3. Insert data into the output stream
  4. Sleep to create the data rate

In the run() loop, the call to np.random.rand() reads data from the input (a random number function). The output.write() call timestamps the data with time_now() and inserts it into the output stream. The asyncio.sleep(1) call sleeps for one second, creating a 1Hz sample rate. Note that the asyncio.sleep coroutine is used instead of the time.sleep function.

Note

The loop structure shown above should only be used for low-bandwidth data sources. For higher-bandwidth data, either pipe caching should be enabled or the data should be written in chunks as shown below. Write frequency should be 1Hz or lower to reduce inter-process communication and network overhead.

High Bandwidth Reader

Source: example_modules/jouleexamples/high_bandwidth_reader.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
#!/usr/bin/env python3

from joule import ReaderModule
from joule.utilities import time_now
import asyncio
import numpy as np


class HighBandwidthReader(ReaderModule):
    """ Produce a 1Hz ramp sampled at [rate] Hz """

    def custom_args(self, parser):
        """Register the module's command line arguments on ``parser``.

        Adds a required ``--rate`` option (float, sample rate in Hz) inside
        an argument group named "module".
        """
        grp = parser.add_argument_group("module",
                                        "module specific arguments")
        grp.add_argument("--rate", type=float,
                         required=True,
                         help="sample rate in Hz")

    async def run(self, parsed_args, output):
        """Write one chunk of ramp samples to ``output`` per period, forever.

        Each chunk is a 2-column array: evenly spaced timestamps covering
        one period (the end point is excluded so consecutive chunks do not
        repeat a timestamp) and a ramp from 0 to 33.

        Args:
            parsed_args: parsed command line arguments; ``rate`` is read.
            output: pipe-like object providing an awaitable ``write``.
        """
        start_ts = time_now()
        # one chunk is written per period (in seconds)
        period = 1
        # np.round returns a float, but np.linspace needs an integer count
        samples_per_period = int(np.round(parsed_args.rate * period))
        while True:
            # timestamps are in microseconds, hence the 1e6 scale factor
            end_ts = start_ts + period * 1e6
            ts = np.linspace(start_ts, end_ts,
                             samples_per_period, endpoint=False)
            vals = np.linspace(0, 33, samples_per_period)
            # the next chunk starts exactly where this one ended
            start_ts = end_ts
            chunk = np.hstack((ts[:, None], vals[:, None]))
            await output.write(chunk)
            await asyncio.sleep(period)


def main():
    """Create a HighBandwidthReader and call its start() method."""
    HighBandwidthReader().start()


if __name__ == "__main__":
    # only run the module when this file is executed as a script
    main()

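The custom_args() method registers module-specific command line arguments on the parser it is given. Here it creates an argument group named "module" and adds a required --rate argument (the sample rate in Hz, parsed as a float). The parsed arguments are passed to run() as parsed_args, so the sample rate is available there as parsed_args.rate.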

Intermittent Reader

This example shows how to handle sensor errors by creating intervals: when a read fails, the module calls output.close_interval() instead of writing the data.

Source: example_modules/jouleexamples/intermittent_reader.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#!/usr/bin/env python3

from joule import ReaderModule
from joule.utilities import time_now
import asyncio
import numpy as np
import logging

# Chance (0..1) that a given period raises the simulated error; compared
# against np.random.rand() once per loop iteration in IntermittentReader.run.
ERROR_PROBABILITY = 0.25


class IntermittentReader(ReaderModule):
    """ Like HighBandwidth reader with random data interruptions """

    def custom_args(self, parser):
        """Register the module's command line arguments on ``parser``.

        Adds a required ``--rate`` option (float, sample rate in Hz) inside
        an argument group named "module".
        """
        grp = parser.add_argument_group("module",
                                        "module specific arguments")
        grp.add_argument("--rate", type=float,
                         required=True,
                         help="sample rate in Hz")

    async def run(self, parsed_args, output):
        """Write ramp chunks to ``output``, randomly dropping some of them.

        Once per period a 2-column chunk (timestamps, ramp from 0 to 33) is
        built.  With probability ``ERROR_PROBABILITY`` a ``ValueError`` is
        raised instead of writing it; the handler logs the interruption and
        calls ``output.close_interval()``.  The start timestamp is advanced
        before the error check, so a dropped chunk leaves a one-period gap
        in the written timestamps.

        Args:
            parsed_args: parsed command line arguments; ``rate`` is read.
            output: pipe-like object providing awaitable ``write`` and
                ``close_interval``.
        """
        start_ts = time_now()
        # one chunk is attempted per period (in seconds)
        period = 1
        # np.round returns a float, but np.linspace needs an integer count
        samples_per_period = int(np.round(parsed_args.rate * period))
        while True:
            try:
                # timestamps are in microseconds, hence the 1e6 scale factor
                end_ts = start_ts + period * 1e6
                ts = np.linspace(start_ts, end_ts,
                                 samples_per_period, endpoint=False)
                vals = np.linspace(0, 33, samples_per_period)
                start_ts = end_ts
                chunk = np.hstack((ts[:, None], vals[:, None]))
                # simulate an error
                if np.random.rand() < ERROR_PROBABILITY:
                    raise ValueError
                await output.write(chunk)
            except ValueError:
                logging.error("simulated data interruption")
                await output.close_interval()
            await asyncio.sleep(period)


def main():
    """Create an IntermittentReader and call its start() method."""
    IntermittentReader().start()


if __name__ == "__main__":
    # only run the module when this file is executed as a script
    main()