Source code for liveserial.log

"""Methods for grabbing data and logging it to CSV files.
"""
from liveserial import msg
[docs]class Logger(object): """Logs data points from the serial port to CSV. The arguments to this class constructor are also available as attributes on the class instance. Args: interval (int): how often (in milliseconds) to read data from the serial buffer. dataqs (list): of :class:`multiprocessing.Queue`; stores the data read in from the serial port. errorqs (list): of :class:`multiprocessing.Queue`; stores any error raised during serial port reading. livefeed (monitor.LiveDataFeed): feed for storing the latest data points obtained from the serial port. method (str): one of ['average', 'last'] specifies how to aggregate multiple data points in the buffer. logdir (str): directory to place log files in for the sensors. If `None`, then data will *not* be logged to disk. logfreq (int): how often (in seconds) to write the accumulated data points to CSV. plotting (bool): when False, the values being read off will be printed if logging is not enabled. config (ConfigParser): global sensor configuration parser; if the argument is a `str`, then the file path. aggregate (bool): when True, the logger should check sensor config for aggregate ports to be handled; otherwise, aggregate ports are ignored. Attributes: timer (threading.Timer): executes calls to the serial port reader to get the latest data and push it to the live feed. lastsave (float): timestamp indicating the last time the CSV file was appended to csvdata (dict): lists of sensor time and value readings, keyed by sensor identifier. config (ConfigParser or str): global sensor configuration parser. If `str`, then the config is loaded from the specified file path. aggregate (dict): keys are *aggregate* sensor names; values are functions that accept a dict of the latest physical sensor values, and return single, aggregated values for plotting. """ def __init__(self, interval, dataqs, livefeed, method="last", logdir=None, logfreq=10, plotting=False, config=None, aggregate=False): self.interval = interval #Our first business is to make sure that we have only a list of *unique* #data queues to be querying. self.dataqs = [] for dq in dataqs: if dq not in self.dataqs: self.dataqs.append(dq) self.livefeed = livefeed self.method = method from os import path, makedirs if logdir is not None: self.logdir = path.abspath(path.expanduser(logdir)) if not path.isdir(self.logdir): msg.info("Making new directory path '{}'.".format(self.logdir)) makedirs(self.logdir) else: self.logdir = None self.logfreq = logfreq self.lastsave = None self.csvdata = {} self.plotting = plotting self.timer = None self._timer_calls = 0 """Number of times that the timer has executed during the application run. """ self._cancel = False """When true, the main thread is trying to shut us down; don't start the timer again once it fires. """ self.config = config if aggregate: self.aggregate = self._sensor_aggregates() else: self.aggregate = None def _sensor_aggregates(self): """Returns the transformations for each aggregate sensor so that they can be quickly compiled each time the logger timer executes. """ from liveserial.config import sensors aggsensors = sensors(self.config, port="aggregate") result = {} for sensor, instance in aggsensors.items(): if (instance.transform is not None and instance.sensors is not None): def aggfun(aggdata): #We don't take the first index in the data element because #it is the time stamp, which must be common to all of them. v = [] t = None for s in instance.sensors: if s in aggdata: v.append(aggdata[s][1:]) if t is None: t = [aggdata[s][0]] for vals in zip(*v): t.append(instance.transform(vals)) return t result[sensor] = aggfun return result
[docs] def sensor_option(self, sensor, option, default=None, cast=None): """Returns the specified sensor option if available. Args: sensor (str): name of the sensor to return `option` for. option (str): option name. default: if the option is not configured, the default value to return. cast (function): if the raw value needs to be cast or transformed, the function to perform that transformation. """ from liveserial.config import sensors, Sensor s = sensors(self.config, sensor) if isinstance(s, Sensor): if hasattr(s, option): value = getattr(s, option) elif option in s.options: # pragma: no cover value = s.options[option] if cast is not None: # pragma: no cover return cast(value) else: return value else: # pragma: no cover return default
[docs] def ready(self, delay=None, wait=1.): """Returns True once we have accumulated a few timer calls of data. This ensures that we know how many sensors are running on the same COM port. Args: delay (float): fraction of a second to wait before checking if the data is there. """ if delay is not None: from time import sleep sleep(delay) havepts = False if len(self.csvdata) > 0: datapoints = sum([len(v) for v in self.csvdata.values()])/len(self.csvdata) havepts = datapoints > 2 return (self._timer_calls > wait/self.interval and (self.logdir is None or havepts))
[docs] def start(self): """Starts a new timer for the configured interval to gather data from the serial stream. """ if not self._cancel: from threading import Timer self.timer = Timer(self.interval, self._read_serial) self.timer.start()
[docs] def stop(self): """Stops the automatic collection and logging of data. Cleans up threads. """ self._cancel = True if self.timer is not None: self.timer.cancel() #Write whatever data is left over to CSV file. self._csv_append()
def _read_serial(self): """Reads the latest buffered serial information and places it onto the live feed for the application. """ self._timer_calls += 1 from liveserial.monitor import get_all_from_queue sensedata = {} havedata = False #Just iterate over the various queues we have and process their data. for dataq in self.dataqs: for qdata in get_all_from_queue(dataq): sensor = qdata[0] if sensor not in sensedata: sensedata[sensor] = [] sensedata[sensor].append(qdata[1:]) havedata = True # We average/discard the data in the queue to produce the single entry that # will be posted to the livefeed. if havedata: if self.aggregate: aggdata = {} for sensor, qdata in sensedata.items(): data = None if self.method == "average": #We use the last data point's time stamp as the authoritative #one for the averaged set. tstamp = qdata[-1][0] #For the values, we take a simple mean. from numpy import mean ldata = mean(qdata, axis=0) elif self.method == "last": ldata = qdata[-1] if ldata is not None: self.livefeed.add_data(sensor, ldata) if self.logdir is not None: if sensor not in self.csvdata: self.csvdata[sensor] = [] self.csvdata[sensor].extend(qdata) elif not self.plotting: # pragma: no cover print("{}: {}".format(sensor, ldata)) if self.aggregate: aggdata[sensor] = ldata #Now that we have processed all the physical sensors, let's see if #we have any aggregate sensors that need to be processed. if self.aggregate: for aggsense, aggfun in self.aggregate.items(): adata = aggfun(aggdata) self.livefeed.add_data(aggsense, adata) if self.logdir is not None: if aggsense not in self.csvdata: self.csvdata[aggsense] = [] self.csvdata[aggsense].append(adata) elif not self.plotting: # pragma: no cover print("{}: {}".format(aggsense, adata)) #Before we restart the timer again, see if we need to save the data to #CSV. self.save() self.start() def _csv_append(self): """Appends the new data points to the relevant CSV files for each of the sensor's whose data is being tracked. """ from os import path import csv for sensor in self.csvdata: if self.logdir is None: # pragma: no cover #This should never fire because of checks further up the chain. #it is here as as sanity check to keep the file system clean. continue #Let's figure out how many columns to post in the header. if len(self.csvdata[sensor]) > 0: #We subtract 1 because the time is handled separately. N = len(self.csvdata[sensor][0])-1 else: # pragma: no cover #No sense in writing to the file at this time; we have no #data to write! continue #Check if we have configuration settings available for this sensor. columns = None if self.config is not None: logids = self.sensor_option(sensor, "logging") #We have one issue with the column labeling. We have switched #the important value to be in position 1, whereas it could be #anywhere in the list. Fix the order of the columns. cols = self.sensor_option(sensor, "columns") if logids is None and cols is not None: # pragma: no cover #It is possible that the user will specify one or the other, #but I want to limit the number of concurrent streams for #the unit tests (especially for multi-port testing). logids = list(range(len(cols))) vindex = self.sensor_option(sensor, "value_index", [0]) #The other issue is that if the user limits the columns being #logged to include only a few columns, they will supply only a #few column labels. if (logids is not None and cols is not None and len(logids) == len(cols)): columns = {l: c for l, c in zip(logids, cols)} elif cols is not None: # pragma: no cover msg.warn("A different number of column headings than " "logging ids was specified in configuration.") else: logids = None vindex = [1] if logids is None: #The user didn't say what to log, so we log everything. logids = list(range(1, N+1)) logpath = path.join(self.logdir, "{}.csv".format(sensor)) from os import linesep if not path.isfile(logpath): with open(logpath, 'wb') as f: if columns is None: columns = {li: "Value {}".format(i+1) for i, li in enumerate(logids)} #Now, write the columns in the order specified in the #logindex. However, we must remember that until now, we have #kept the value in position 1, so we have to unmix that. strcols = [columns[li] for li in logids] header = "{}{}".format(','.join(["Time"] + strcols), linesep) f.write(header.encode("ASCII")) from six import PY2 mode = 'ab' if PY2 else 'a' kwds = {} if PY2 else {"newline": ''} with open(logpath, mode, **kwds) as f: #We log the full data stream from the sensor unless the logging is #limited by the configuration file. logids.insert(0, 0) writer = csv.writer(f) for idata in self.csvdata[sensor]: if idata is not None: #Sometimes, the aggregate data functions return None #because one of the sensors didn't have data ready when #the aggregation was performed. Just ignore those. writer.writerow([idata[li] for li in logids]) #Since we appended the most recent data points, just reset the #lists of points to be empty again. self.csvdata[sensor] = []
[docs] def save(self): """Saves the logger's buffered points to a CSV file. If the file exists, then the data points are appended. """ #We need to see if enough time has passed since the last from datetime import datetime from time import time if self.lastsave is not None: elapsed = (datetime.fromtimestamp(time()) - datetime.fromtimestamp(self.lastsave)).total_seconds() else: elapsed = self.logfreq + 1 if elapsed > self.logfreq: self._csv_append() self.lastsave = time()