"""Class for monitoring a COM port using pyserial on a separate thread so that
that the main UI thread is not blocked or slowed. Code adapted from
https://github.com/mba7/SerialPort-RealTime-Data-Plotter/blob/master/com_monitor.py
"""
import threading, serial
try:
from Queue import Empty
except ImportError: # pragma: no cover
#Why couldn't they have called it queue from the start in py2?
from queue import Empty
from liveserial import msg
rxdelims = {}
"""dict: keys are port names; values are compiled regex objects. We wanted to
put these in the thread object, but then it can't be pickled, which makes issues
for the multiprocessing. Instead, we just have to do a dict lookup in the local
:meth:`Thread.run()`.
"""
[docs]class ComMonitorThread(threading.Thread):
""" A thread for monitoring a COM port. The COM port is
opened when the thread is started.
Args:
data_q (multiprocessing.Queue):
Queue for received data. Items in the queue are
(data, timestamp) pairs, where data is a binary
string representing the received data, and timestamp
is the time elapsed from the thread's start (in
seconds).
error_q (multiprocessing.Queue):
Queue for error messages. In particular, if the
serial port fails to open for some reason, an error
is placed into this queue.
port (str):
The COM port to open. Must be recognized by the
system.
port_baud (int):
Rate at which information is transferred in a communication channel
(in bits/second).
stopbits (float): Serial communication parameter; one of (1, 1.5, 2).
parity: (str): Serial communication parameter; one of ['N', 'E', 'O', 'M', 'S'].
port_timeout (float):
The timeout used for reading the COM port. If this
value is low, the thread will return data in finer
grained chunks, with more accurate timestamps, but
it will also consume more CPU.
listener (bool): specifies that this COMThread is a listener, which prints
out the raw stream in real-time, but doesn't analyze it.
infer_limit (int): number of raw lines to consider before reaching consensus
on the format of the data (when inferring). If None, then a format
inferrer is not created.
virtual (bool): when True, additional serial port parameters are set so that
the monitor can work with `socat` or other virtual ports.
encoding (str): encoding type used to decode the byte stream from the serial
port.
delimiter (str): regex describing the sequence of characters used to
separate columns of values in a single line from the serial port.
Attributes:
alive (threading.Event): event for asynchronously handling the reads from
the serial port.
serial_arg (dict): arguments used to contstruct the :class:`serial.Serial`.
serial_port (serial.Serial): serial instance for communication.
sensors (dict): keys are sensor names; values are
:class:`liveserial.config.Sensor` instances used to
parse raw lines read from the serial port.
inferrer (FormatInferrer): for inferring the format in the absence of
configured sensor structure.
"""
def __init__(self, data_q, error_q, port, port_baud,
port_stopbits=serial.STOPBITS_ONE,
port_parity=serial.PARITY_NONE, port_timeout = 0.01,
listener=False, virtual=False, infer_limit=15,
encoding="UTF-8", delimiter=r"\s"):
threading.Thread.__init__(self)
self.port = port
self.serial_port = None
self.serial_arg = dict(port = port,
baudrate = int(port_baud),
stopbits = float(port_stopbits),
parity = port_parity,
timeout = float(port_timeout))
if virtual:
msg.std("Running in virtual serial port mode.", 2)
self.serial_arg["dsrdtr"] = True
self.serial_arg["rtscts"] = True
self.data_q = data_q
self.error_q = error_q
self.listener = listener
self.encoding = encoding
import re
self.delimiter = delimiter
global rxdelims
rxdelims[port] = re.compile(delimiter)
self.sensors = {}
self._manual_sensors = False
"""bool: when True, the sensors list was constructed manually using a
configuration file; otherwise, it was inferred from the first few lines
of data from the serial port.
"""
if infer_limit is not None:
self.inferrer = FormatInferrer(infer_limit)
else: # pragma: no cover
self.inferrer = None
self.alive = threading.Event()
self.alive.set()
@staticmethod
[docs] def from_port(port, port_baud=9600, virtual=False):
"""Returns a COMMonitor instance for the specified port using the
default configurati of port parameters and with inferrence for the structure
of the data.
Args:
port (str):
The COM port to open. Must be recognized by the
system.
port_baud (int):
Rate at which information is transferred in a communication channel
(in bits/second).
"""
from multiprocessing import Queue
dataq = Queue()
errorq = Queue()
return ComMonitorThread(dataq, errorq, port, port_baud, virtual=virtual)
@staticmethod
[docs] def from_config(config, port, dataq=None, errorq=None, listener=False,
sfilter=None):
"""Returns a COMMonitor instance from the specified configuration
parser.
Args:
config (ConfigParser): instance from which to extract the sensor list
and port information. `str` is also allowed, in which case it
should be the path to the config file to load.
port (str): name of the port to configure for.
data_q (multiprocessing.Queue):
Queue for received data. Items in the queue are
(data, timestamp) pairs, where data is a binary
string representing the received data, and timestamp
is the time elapsed from the thread's start (in
seconds).
error_q (multiprocessing.Queue):
Queue for error messages. In particular, if the
serial port fails to open for some reason, an error
is placed into this queue.
listener (bool): specifies that this COMThread is a listener, which prints
out the raw stream in real-time, but doesn't analyze it.
sfilter (list): of sensor names that should be *included* in the
monitor. By default, all sensors in the config are included that
match the port.
Returns:
ComMonitorThread: instance created using the configuration
parameters.
"""
from multiprocessing import Queue
#We allow the user to choose to use a common data and error queue
#between all the threads. If they don't specify one, then we will just
#create a new one.
#This method is usually called from the entry script, which allows all
#the ports to share the same multiprocessing queues by default.
if dataq is None: # pragma: no cover
dataq = Queue()
if errorq is None: # pragma: no cover
errorq = Queue()
from liveserial.config import ports
params = ports(config, port)
vtext = "{}: using {} as config-set serial parameters."
msg.std(vtext.format(port, params))
result = ComMonitorThread(dataq, errorq, port, listener=listener,
**params)
from liveserial.config import sensors, Sensor
sdict = sensors(config, port=port, monitor=result)
for sensor, instance in sdict.items():
if (sfilter is None or sfilter == "all") or sensor in sfilter:
result.add_sensor(sensor, instance)
return result
[docs] def add_sensor(self, name, sensor):
"""Adds a sensor to the COM monitors active list. Sensors in the active
list have their data parsed and pushed to the data queue. If no sensors
are added to the list, the monitor will try to infer the sensor
configuration using the first few raw data samples.
Args:
sensor (liveserial.config.Sensor): instance with parsed
configuration values.
"""
self._manual_sensors = True
self.sensors[name] = sensor
[docs] def run(self):
"""Starts the COM monitoring thread. If an existing serial connection is
open, it will be closed and a new one will be created. The monitoring
will continue indefinitely until :meth:`join` is called.
"""
try:
if self.serial_port: # pragma: no cover
self.serial_port.close()
self.serial_port = serial.Serial(**self.serial_arg)
msg.info("Serial port communication enabled.", 2)
except serial.SerialException as e: # pragma: no cover
self.error_q.put(e.strerror)
return
from time import time
start = time()
lastlisten = None
decoderr = 0 #Number of types the decode has failed.
rxsplit = rxdelims[self.port]
while self.alive.isSet():
line = self.serial_port.readline()
if self.listener:
if lastlisten is None or time() - lastlisten > 0.05:
print(line)
lastlisten = time()
#This is crazy to me... The if statement above and its contents
#are covered by the unit tests, but coverage claims that this
#continue statement is not...
continue # pragma: no cover
try:
raw = line.decode(self.encoding).strip()
except: # pragma: no cover
#It is too much hassle to get our port data simulator to simulate garbage
#for now. TODO: update the simulator to randomly spew out garbage.
if len(line) > 0:
decoderr += 1
if decoderr < 3:
emsg = "Couldn't decode line {} using {}."
msg.warn(emsg.format(line, self.encoding), -1)
raw = rxsplit.split(raw)
if len(raw) == 0: # pragma: no cover
#No data read from the stream.
continue
vals, sensor = None, None
if self._manual_sensors:
for osensor in self.sensors.values():
vals, skey = osensor.parse(raw), osensor.key
sensor = osensor.name
#The parsing will return None if the raw line does not apply
#to its configuration. The moment we found the one that does
#apply, there is not sense in still checking.
if vals is not None:
break
else:
#We try to infer the structure of the data from the raw line.
if self.inferrer is not None:
vals, sensor = self.inferrer.parse(raw)
if vals is not None and len(vals) > 0:
if sensor is None:
#We use the id of the com thread (which corresponds
#one-to-one with the serial ports) as the sensor key; that
#way, multiple inferred, null-key sensors from different
#ports can still be differentiated in the logs.
sensor = id(self)
timestamp = time() - start
self.data_q.put((sensor, timestamp) + tuple(vals))
#else:
# There must have been a communication glitch; just ignore
# this data point.
# clean up
if self.serial_port:
self.serial_port.close()
[docs] def join(self, timeout=None, terminate=True):
"""Tells the thread monitoring the COM port to clean up and return.
Args:
timeout (float): number of seconds (or fractions of seconds) to wait
until returning. If `None`, then the operation will
block until the thread terminates. See also
:meth:`threading.Thread.join`.
terminate (bool): when True, the data collection is told to stop before
trying to join the underlying thread; otherwise, the thread will
keep processing data until join is called with terminate=True.
"""
if terminate:
self.alive.clear()
threading.Thread.join(self, timeout)
[docs]def enumerate_serial_ports():
"""Scans for available serial ports.
Returns:
list: of `str` with the availables port names.
"""
from os import name
from glob import glob
if name == 'nt': # pragma: no cover
#We don't have CI setup for windows at the moment.
from os import waitpid, path
from subprocess import Popen, PIPE
cmd = 'powershell -c "[System.IO.Ports.SerialPort]::getportnames()"'
pps = Popen(cmd, stdout=PIPE, stderr=PIPE)
output, error = pps.communicate()
if len(error) > 0:
msg.error(''.join([s.decode("UTF-8") for s in error]))
result = []
for oline in output.decode("UTF-8").strip().split():
if len(oline) > 0:
result.append(oline)
return result
else:
return glob('/dev/tty.*')
[docs]def get_all_from_queue(Q):
"""Generator to yield one after the others all items currently in the queue Q,
without any waiting.
Args:
Q (Queue.Queue): queue to empty items from.
"""
msg.info("Retrieving entire queue.", 3)
try:
while True:
yield Q.get_nowait()
except Empty:
raise StopIteration
[docs]def get_item_from_queue(Q, timeout=0.01):
""" Attempts to retrieve an item from the queue Q. If Q is
empty, None is returned.
Args:
Q (Queue.Queue): queue to get an item from.
timeout (float):
Blocks for 'timeout' seconds in case the queue is empty,
so don't use this method for speedy retrieval of multiple
items (use get_all_from_queue for that).
"""
try:
item = Q.get(True, 0.01)
except Empty:
return None
return item
[docs]class LiveDataFeed(object):
"""A simple "live data feed" abstraction that allows a reader to read the most
recent data and find out whether it was updated since the last read.
Attributes:
has_new_data (dict): A boolean attribute telling the reader whether the
data was updated since the last read; keyed by sensor identifier.
cur_data (dict): most recent data point placed on the feed; keyed by the
sensor identifier.
"""
def __init__(self):
self.cur_data = {}
self.has_new_data = {}
[docs] def add_data(self, sensor, data):
"""Add new data to the feed.
Args:
sensor (str): sensor identifier for the data point.
"""
self.cur_data[sensor] = data
self.has_new_data[sensor] = True
[docs] def read_data(self, sensor):
"""Returns the most recent data."""
self.has_new_data[sensor] = False
return self.cur_data[sensor]