Serial Port Simulation¶
For unit testing, we setup virtual serial ports. On UNIX-based systems, we can
use socat to create the virtual ports. Once the linked, virtual ports exist,
we write random data to them (with fixed random seed) using a simulator
thread. The simulator is defined in
liveserial.simulator.ComSimulatorThread
and an entry script for
creating a local simulator is included in simport.py.
Thread for simulating writes to a virtual port.
-
class
liveserial.simulator.
ComSimulatorThread
(port='lscom-w', sensors=['W', None, 'K'], dataform=[(<type 'int'>, <type 'float'>), (<type 'float'>, <type 'float'>, <type 'float'>), (<type 'int'>, <type 'float'>)], seed=42)[source]¶ Simulates a sine wave, masquerading as a separate COM port on the machine so that we can unit test the code against it.
Parameters: - port (str) – name of the simulated port to write to.
- dataform (dict) – keys are sensor ids; values are tuples of type that specifies how a row of simulated data will look when written to the COM port.
- sensors (list) – of str giving sensor ids for which data will be randomly generated with equal probability between each sensor. If the sensor id is None, then no sensor key will be written to the stream.
- seed (int) – random seed so the values are predictable.
-
alive
¶ threading.Event – event for asynchronously handling the reads from the serial port.
Examples
Write three random columns with data types int, float and float to COM3 without any sensor identifying column.
>>> from liveserial.simulator import ComSimulatorThread >>> cst = ComSimulatorThread("COM3", sensors=[None], ... dataform=[(int, float, float)]) >>> cst.start()
Note that the writing happens in its own thread (
ComSimulatorThread
inherits fromthreading.Thread
), so it will run indefinitely if the thread is not joined. A typical use case is:>>> import signal >>> def exit_handler(signal, frame): ... cst.join(1) >>> signal.signal(signal.SIGINT, exit_handler) >>> cst.start()
This wires the signal interrupt request (usually raised by pressing ^C) to join the simulator thread.
-
join
(timeout=None, terminate=True)[source]¶ Tells the thread simulating the COM port to clean up and return.
Parameters: timeout (float) – number of seconds (or fractions of seconds) to wait until returning. If None, then the operation will block until the thread terminates. See also threading.Thread.join()
.