Source code for liveserial.simulator

"""Thread for simulating writes to a virtual port.
"""
import threading
from liveserial import msg
[docs]class ComSimulatorThread(threading.Thread): """Simulates a sine wave, masquerading as a separate COM port on the machine so that we can unit test the code against it. Args: port (str): name of the simulated port to write to. dataform (dict): keys are sensor ids; values are tuples of `type` that specifies how a row of simulated data will look when written to the COM port. sensors (list): of `str` giving sensor ids for which data will be randomly generated with equal probability between each sensor. If the sensor id is `None`, then no sensor key will be written to the stream. seed (int): random seed so the values are predictable. Attributes: alive (threading.Event): event for asynchronously handling the reads from the serial port. Examples: Write three random columns with data types `int`, `float` and `float` to `COM3` *without* any sensor identifying column. >>> from liveserial.simulator import ComSimulatorThread >>> cst = ComSimulatorThread("COM3", sensors=[None], ... dataform=[(int, float, float)]) >>> cst.start() Note that the writing happens in its own thread (:class:`ComSimulatorThread` inherits from :class:`threading.Thread`), so it will run indefinitely if the thread is not joined. A typical use case is: >>> import signal >>> def exit_handler(signal, frame): ... cst.join(1) >>> signal.signal(signal.SIGINT, exit_handler) >>> cst.start() This wires the signal interrupt request (usually raised by pressing ^C) to join the simulator thread. """ def __init__(self, port="lscom-w", sensors=["W", None, "K"], dataform=[(int, float), (float, float, float), (int, float)], seed=42): threading.Thread.__init__(self) self.dataform = {s: d for s, d in zip(sensors, dataform)} self.sensors = sensors from os import name if name == 'nt': # pragma: no cover self.port = port else: self.port = "/dev/tty.{}".format(port) from serial import Serial self.serial = Serial(self.port, 9600, dsrdtr=True, rtscts=True) self.seed = seed self.alive = threading.Event() self.alive.set()
[docs] def run(self): """Starts simulating the communication. This method should not be called directly. Instead, it is invoked automatically when :meth:`start` is called on this thread object. """ import random, time, math import os #Seed the random number generator so that it always produces the same #values for the random variables. random.seed(self.seed) while self.alive.isSet(): #Choose one of the sensors at random to generate data for. randsense = int(len(self.sensors)*random.random()) sensor = self.sensors[randsense] if sensor is not None: raw = [sensor] else: raw = [] for t in self.dataform[sensor]: if t is int: raw.append(random.randint(0, 100)) elif t is float: raw.append(random.randint(-1, 1) + random.random()) data = ' '.join([str(d) for d in raw]) + os.linesep #Usually people encode with UTF-8. However, we know that our data is #really simple and ASCII takes less space. x = self.serial.write(data.encode("ascii")) time.sleep(0.0025) if self.serial: self.serial.close()
[docs] def join(self, timeout=None, terminate=True): """Tells the thread simulating the COM port to clean up and return. Args: timeout (float): number of seconds (or fractions of seconds) to wait until returning. If `None`, then the operation will block until the thread terminates. See also :meth:`threading.Thread.join`. """ if terminate: self.alive.clear() self.serial.cancel_write() self.serial.flushOutput() threading.Thread.join(self, timeout)