Source code for liveserial.logging

"""Methods for grabbing data and logging it to CSV files.
"""
class Logger(object):
    """Logs data points from the serial port to CSV.

    The arguments to this class constructor are also available as attributes
    on the class instance.

    Args:
        interval (int): how often to read data from the serial buffer.
          NOTE(review): this was documented as milliseconds, but the value is
          handed unchanged to `threading.Timer`, which interprets its interval
          in seconds -- confirm which unit callers actually supply.
        dataq (Queue.Queue): stores the data read in from the serial port.
        errorq (Queue.Queue): stores any error raised during serial port
          reading. It is only stored on the instance; this class never reads
          from it.
        livefeed (monitor.LiveDataFeed): feed for storing the latest data
          points obtained from the serial port.
        method (str): one of ['average', 'last'] specifies how to aggregate
          multiple data points in the buffer. Any other value causes the
          buffered points to be discarded.
        logdir (str): directory to place log files in for the sensors. If
          `None`, then data will *not* be logged to disk. The directory is
          created on the first write if it does not exist yet.
        logfreq (int): how often (in seconds) to write the accumulated data
          points to CSV.
        plotting (bool): when False, the values being read off will be printed
          if logging is not enabled.

    Attributes:
        timer (threading.Timer): executes calls to the serial port reader to
          get the latest data and push it to the live feed.
        lastsave (float): timestamp indicating the last time the CSV file was
          appended to.
        csvdata (dict): lists of sensor time and value readings, keyed by
          sensor identifier.
    """
    def __init__(self, interval, dataq, errorq, livefeed,
                 method="last", logdir=None, logfreq=10, plotting=False):
        self.interval = interval
        self.dataq = dataq
        self.errorq = errorq
        self.livefeed = livefeed
        self.method = method
        self.logdir = logdir
        self.logfreq = logfreq
        self.lastsave = None
        self.csvdata = {}
        self.plotting = plotting

        self.timer = None
        # Number of times that the timer has executed during the application
        # run.
        self._timer_calls = 0
        # When true, the main thread is trying to shut us down; don't start
        # the timer again once it fires.
        self._cancel = False

    def ready(self):
        """Returns True once we have accumulated a few timer calls of data.

        This ensures that we know how many sensors are running on the same COM
        port. The timer must have fired more than 5 times and, when logging to
        disk is enabled, the sensors must hold more than 2 buffered points
        each on average.

        Returns:
            bool: whether enough data has been gathered.
        """
        havepts = False
        if self.csvdata:
            total = sum(len(points) for points in self.csvdata.values())
            # Same test as `total / len(csvdata) > 2` with true division, but
            # written with integers so that Python 2's floor division cannot
            # change the outcome.
            havepts = total > 2 * len(self.csvdata)
        return self._timer_calls > 5 and (self.logdir is None or havepts)

    def start(self):
        """Starts a new timer for the configured interval to gather data from
        the serial stream. Does nothing once :meth:`stop` has been called.
        """
        if not self._cancel:
            from threading import Timer
            self.timer = Timer(self.interval, self._read_serial)
            self.timer.start()

    def stop(self):
        """Stops the automatic collection and logging of data."""
        self._cancel = True
        if self.timer is not None:
            self.timer.cancel()
        # Write whatever data is left over to CSV file.
        self._csv_append()

    def _aggregate(self, points):
        """Reduces the buffered `(timestamp, value)` points of one sensor to a
        single entry according to `self.method`.

        Args:
            points (list): non-empty list of `(timestamp, value)` tuples.

        Returns:
            The aggregated entry, or `None` if `self.method` is not
            recognized.
        """
        if self.method == "average":
            # numpy is only needed (and therefore only imported) when
            # averaging is requested.
            from numpy import mean
            # We use the last data point's time stamp as the authoritative
            # one for the averaged set; for the values we take a simple mean.
            return (points[-1][0], mean([value for _, value in points]))
        if self.method == "last":
            return points[-1]
        return None

    def _read_serial(self):
        """Reads the latest buffered serial information and places it onto the
        live feed for the application.

        Each sensor's buffered points are aggregated into one entry, which is
        posted to the live feed and then either buffered for CSV logging (when
        `logdir` is set) or printed (when not plotting). Finally the CSV save
        check runs and the timer is re-armed.
        """
        self._timer_calls += 1
        from liveserial.monitor import get_all_from_queue

        # Group everything currently in the queue by sensor.
        sensedata = {}
        for sensor, timestamp, qdata in get_all_from_queue(self.dataq):
            sensedata.setdefault(sensor, []).append((timestamp, qdata))

        # We average/discard the data in the queue to produce the single
        # entry that will be posted to the livefeed.
        for sensor, points in sensedata.items():
            data = self._aggregate(points)
            if data is None:
                continue

            self.livefeed.add_data(sensor, data)
            if self.logdir is not None:
                self.csvdata.setdefault(sensor, []).append(data)
            elif not self.plotting:
                print("{0: <20f} {1: <20f}".format(*data))

        # Before we restart the timer again, see if we need to save the data
        # to CSV.
        self.save()
        self.start()

    def _csv_append(self):
        """Appends the new data points to the relevant CSV files for each of
        the sensors whose data is being tracked.

        One file named `<sensor>.csv` is kept per sensor inside `logdir`; a
        `Time,Value` header is written when the file is first created. Does
        nothing when `logdir` is `None`.
        """
        if self.logdir is None:
            return

        import csv
        from os import makedirs, path

        # Iterate over a snapshot of the keys: stop() may flush from the main
        # thread while the timer thread is adding a new sensor key.
        pending = [s for s in list(self.csvdata) if self.csvdata[s]]
        if not pending:
            return

        # A missing log directory would otherwise raise inside the timer
        # thread before the timer is re-armed, silently ending collection.
        if not path.isdir(self.logdir):
            makedirs(self.logdir)

        for sensor in pending:
            logpath = path.join(self.logdir, "{}.csv".format(sensor))
            if not path.isfile(logpath):
                with open(logpath, 'w') as f:
                    f.write("Time,Value\n")

            with open(logpath, 'a') as f:
                # Use the same "\n" terminator as the header line; the csv
                # default of "\r\n" gives mixed line endings in the file (and
                # doubled carriage returns in text mode on Windows).
                writer = csv.writer(f, lineterminator="\n")
                writer.writerows(self.csvdata[sensor])

            # Since we appended the most recent data points, just reset the
            # list of points to be empty again.
            self.csvdata[sensor] = []

    def save(self):
        """Saves the logger's buffered points to a CSV file. If the file
        exists, then the data points are appended.

        Nothing is written unless more than `logfreq` seconds have passed
        since the previous save; the first call always saves.
        """
        from time import time
        # Compare raw timestamps. Going through naive local datetimes makes
        # the elapsed time jump by an hour across a daylight-saving change.
        if self.lastsave is None or time() - self.lastsave > self.logfreq:
            self._csv_append()
            self.lastsave = time()