Serial Port Simulation

For unit testing, we setup virtual serial ports. On UNIX-based systems, we can use socat to create the virtual ports. Once the linked, virtual ports exist, we write random data to them (with fixed random seed) using a simulator thread. The simulator is defined in liveserial.simulator.ComSimulatorThread and an entry script for creating a local simulator is included in simport.py.

Thread for simulating writes to a virtual port.

class liveserial.simulator.ComSimulatorThread(port='lscom-w', sensors=['W', None, 'K'], dataform=[(<type 'int'>, <type 'float'>), (<type 'float'>, <type 'float'>, <type 'float'>), (<type 'int'>, <type 'float'>)], seed=42)[source]

Simulates a sine wave, masquerading as a separate COM port on the machine so that we can unit test the code against it.

Parameters:
  • port (str) – name of the simulated port to write to.
  • dataform (dict) – keys are sensor ids; values are tuples of type that specifies how a row of simulated data will look when written to the COM port.
  • sensors (list) – of str giving sensor ids for which data will be randomly generated with equal probability between each sensor. If the sensor id is None, then no sensor key will be written to the stream.
  • seed (int) – random seed so the values are predictable.
alive

threading.Event – event for asynchronously handling the reads from the serial port.

Examples

Write three random columns with data types int, float and float to COM3 without any sensor identifying column.

>>> from liveserial.simulator import ComSimulatorThread
>>> cst = ComSimulatorThread("COM3", sensors=[None],
    ... dataform=[(int, float, float)])
>>> cst.start()

Note that the writing happens in its own thread (ComSimulatorThread inherits from threading.Thread), so it will run indefinitely if the thread is not joined. A typical use case is:

>>> import signal
>>> def exit_handler(signal, frame):
    ... cst.join(1)
>>> signal.signal(signal.SIGINT, exit_handler)
>>> cst.start()

This wires the signal interrupt request (usually raised by pressing ^C) to join the simulator thread.

join(timeout=None, terminate=True)[source]

Tells the thread simulating the COM port to clean up and return.

Parameters:timeout (float) – number of seconds (or fractions of seconds) to wait until returning. If None, then the operation will block until the thread terminates. See also threading.Thread.join().
run()[source]

Starts simulating the communication. This method should not be called directly. Instead, it is invoked automatically when start() is called on this thread object.