Source code for liveserial.livemon

#!/usr/bin/python
from liveserial import msg
def examples():
    """Prints examples of using the script to the console using colored output.
    """
    script = "LIVE-SERIAL Real-time Serial Port Plotter/Logger"
    explain = ("For any device connected to a serial port (including USB), "
               "it is useful to see and log the data from the port in "
               "real-time (for example in response to human-interaction. "
               "This script sets up live monitoring of a serial port on a "
               "separate thread and plots the data as it is received. Logging "
               "can also be enabled in real-time.")
    contents = [(("Start plotting the data from the COM3 serial port."), 
                 "livemon.py COM3",
                 "If '-logdir' is not specified, *no* data will be logged."),
                (("Plot *and* log the data from /dev/tty."),
                 "livemon.py /dev/tty -logdir ~/sensordata", ""),
                ("Log the data, but don't generate a live plot.",
                 "livemon.py COM3 -noplot -logdir C:\\sensordata\\", ""),
                ("List the available serial ports on the system.",
                 "livemon.py list", ""),
                ("Use the config file `custom.cfg` to log and plot the data. "
                 "Plot any sensors listed in that file.",
                 "livemon.py auto -config custom.cfg",
                 "If your file is called `sensors.cfg`, you can just use "
                 "`livemon.py auto`.")]
    required = ("REQUIRED: working serial port.")
    output = ("RETURNS: plot window; for logging-only mode, the data being "
              "logged is also periodically printed to stdout.")
    details = ("The plotting uses `matplotlib` with the default configured "
               "backend. If you want a different backend, set the rc config "
               "for `matplotlib` using online documentation. However, many "
               "backends don't play well with the animation (depending on OS "
               "type and version, etc., etc.; so use carefully.")
    outputfmt = ("")

    msg.example(script, explain, contents, required, output, outputfmt, details)

def _list_serial(ports=None):
    """Lists all of the available serial ports on the local machine. If `port`
    is specified, then returns True if port is in the list.

    Args:
    ports (list): name(s) of ports to check existence for.

    Returns:
    bool: specifying whether the given port is in the list, or
    list: of available ports on this machine.
    """
    from liveserial.monitor import enumerate_serial_ports
    available = enumerate_serial_ports()
    if ports is not None:
        result = True
        for port in ports:
            if port not in available:
                msg.err("Port '{}' is not valid.".format(port))
                result = False
        return result
    else:
        return available

def _list_config_ports(configfile):
    """Lists all the ports explicitly mentioned in the config file.
    """
    try:
        from configparser import ConfigParser
    except ImportError: # pragma: no cover
        #Renaming of modules to lower case in python 3.
        from ConfigParser import ConfigParser

    config = ConfigParser()
    config.readfp(open(configfile))
    
    from fnmatch import fnmatch
    result = []
    for section in config.sections():
        if fnmatch(section, "sensor.*"):
            if config.has_option(section, "port"):
                port = config.get(section, "port")
                if port not in result:
                    result.append(port)
        elif fnmatch(section, "port.*"):
            port = section[5:]
            if port not in result: # pragma: no cover
                result.append(port)

    return result

script_options = {
    "port": {"nargs": "+",
             "help": "Name of the port(s) to plot/log."},
    "-noplot": {"action": "store_true",
                "help": "Don't plot the data; only log it."},
    "-autoconf": {"action": "store_true",
                  "help": ("Runs in automatic mode by loading a "
                           "configuration file in the current directory "
                           "called 'sensors.cfg'.")},
    "-config": {"help": ("Specify a configuration file to get sensor "
                         "setup information from.")},
    "-logdir": {"help": ("Path to the directory where sensor data will "
                         "be logged.")},
    "-baudrate": {"type": int, "default": 9600,
                  "help": ("Rate at which information is transferred in a "
                           "communication channel (in bits/second).")},
    "-stopbits": {"type": float, "default": 1., "choices": [1, 1.5, 2],
                  "help": "Serial communication parameter."},
    "-parity": {"type": str, "default": 'N',
                "choices": ['N', 'E', 'O', 'M', 'S'],
                "help": "Serial communication parameter."},
    "-timeout": {"type": float, "default": 0.01,
                 "help": ("The timeout used for reading the COM port. If "
                          "this value is low, the thread will return data "
                          "in finer grained chunks, with more accurate "
                          "timestamps, but it will also consume more CPU.")},
    "-refresh": {"type": int, "default": 100,
                 "help": ("How often (in milliseconds) to plot new data "
                          "obtained from the serial port.")},
    "-buffertime": {"type": float, "default": 25,
                    "help": ("How often (in milliseconds) to query buffered data "
                             "obtained from the serial port.")},
    "-logfreq": {"type": float, "default": 10,
                 "help": ("How often (in *seconds*) to save the buffered "
                          "data points to CSV.")},
    "-method": dict(default="average", choices=["last", "average"],
                    help=("Specifies how buffered data is aggregated each "
                          "time it is read from the serial port.")),
    "-listen": dict(action="store_true",
                    help=("Prints the raw output from the serial port "
                          "instead of plotting and logging it. Useful "
                          "for debugging port connection issues.")),
    "-virtual": dict(action="store_true",
                     help=("Specifies that the port being connected to is "
                           "virtual (e.g., with `socat`), which changes the "
                           "parameters for connection.")),
    "-sensors": dict(nargs="+", default="all",
                     help="Filter the list of sensors being logged/plotted."),
    "-maxpts": dict(type=int, default=100,
                    help=("Maximum number of values to keep in the plot "
                          "for each sensor")),
    "-window": dict(type=float, default=20.,
                    help="Width of window in time units."),
    "-wait": dict(type=float, default=1.,
                  help=("Number of seconds to wait before failing because "
                        "no data is present on the stream for plotting."))
    }
"""dict: default command-line arguments and their
    :meth:`argparse.ArgumentParser.add_argument` keyword arguments.
"""

def _parser_options():
    """Parses the options and arguments from the command line."""
    #We have two options: get some of the details from the config file,
    import argparse
    from liveserial import base
    pdescr = "Real-time serial port plotter/logger."
    parser = argparse.ArgumentParser(parents=[base.bparser], description=pdescr)
    for arg, options in script_options.items():
        parser.add_argument(arg, **options)
        
    args = base.exhandler(examples, parser)
    if args is None:
        return

    #Handle the automatic configuration file setup.
    if args["autoconf"] or (args["port"]==["auto"] and not args["config"]):
        from os import path
        args["config"] = path.abspath("sensors.cfg")
        msg.info("Using {} as auto-selected config path".format(args["config"]))

    if args["config"]:
        #Try and override any of the other options with those of the config
        #file, if it exists.
        from os import path
        args["config"] = path.abspath(path.expanduser(args["config"]))
        from liveserial.config import script
        args.update(script(args["config"]))
        msg.info("Using script args from config {}.".format(args["config"]))
        
    if args["port"] == ["list"]:
        msg.okay("Available Serial Ports")
        for port in _list_serial():
            msg.info("  {}".format(port))
        return None
    elif args["port"] == ["auto"]:
        #We load all the ports present in the config file and assume that the
        #user meant all of those.
        args["port"] = _list_config_ports(args["config"])
    else:
        if not _list_serial(args["port"]):
            return None

    #Convert the units for the buffer and refresh times.
    args["refresh"] /= 1000.
    args["buffertime"] /= 1000.
    
    if args["noplot"] and not args["logdir"]: # pragma: no cover
        msg.warn("Data will only be logged if `-logdir` is specified.", -1)

    return args

def _get_com(args):
    """Gets a list of configured COM ports for serial communication.
    """
    from liveserial.monitor import ComMonitorThread as CMT
    from multiprocessing import Queue
    dataq, errorq = Queue(), Queue()
    result = []
    msg.info("Starting setup of ports {}.".format(args["port"]), 2)
    if args["config"]:
        for port in args["port"]:
            if port.lower() != "aggregate":
                #The aggregate port name is just a shortcut so that we can plot
                #transforms between multiple sensor streams. It doesn't actually
                #represent a physical port that will be monitored.
                com = CMT.from_config(args["config"], port, dataq, errorq,
                                      args["listen"], args["sensors"])
                result.append(com)                               
    else:
        for port in args["port"]:
            com = CMT(dataq, errorq, port, args["baudrate"],
                      args["stopbits"], args["parity"], args["timeout"],
                      args["listen"], args["virtual"])
            result.append(com)
    return result

def _com_start(coms):
    """Starts the serial port communication thread using the specified object.

    Args:
        coms (list): of :class:`monitor.ComMonitorThread` instances to start
          running.
    """
    from liveserial.monitor import get_item_from_queue
    for i, com in enumerate(coms):
        com.start()
        msg.okay("COM monitoring thread {} started.".format(i), 2)

        #Even though the thread is going, it doesn't mean that everything is working
        #the way we hope. Check the first item. We have to sleep to give it time to
        #initialize.
        com.join(0.05, terminate=False)
        com_error = get_item_from_queue(com.error_q)
        if com_error is not None: # pragma: no cover
            #We can't easily simulate failure on the serial port to test this message.
            msg.err("monitor thread error--{}".format(com_error))
            coms[i] = None
    
[docs]def run(args, runtime=None, testmode=False): """Starts monitoring of the serial data in a separate thread. Starts the plotting or logging based on command-line args. Args: args (dict): result of :func:`_parser_options` call that parses the command-line arguments. Could also be :data:`script_options` to use all default values. runtime (float): how many seconds to run for before terminating. testmode (bool): when True, the plotting backend is changed so that no window is produced; plotting functions are altered slightly to make testing possible. """ from liveserial.base import set_testmode set_testmode(testmode) #When args is None, it means that examples or help or equivalent wants to #cancel the execution of the script. if args is None: return #Get the serial port for communications; this also tests that we are getting #data. vardir = {} coms = _get_com(args) #The data feed keeps track of the latest, aggregated data selected from the #buffer in the com thread. from liveserial.monitor import LiveDataFeed feed = LiveDataFeed() #Logging queries the com thread buffer to get data, and then aggregates it #and puts it on the live data feed. Optionally, the data is also #periodically saved to CSV. from liveserial.log import Logger dataqs = [c.data_q for c in coms] #The logger prints values to screen if it isn't running in plotting #mode. Plotting mode means that the plot window is present, or that #'-listen' was specified. plotting = args["listen"] or (not args["noplot"]) logger = Logger(args["buffertime"], dataqs, feed, args["method"], args["logdir"], args["logfreq"], plotting, args["config"], aggregate="aggregate" in args["port"]) import signal def exit_handler(signal, frame): # pragma: no cover """Cleans up the serial communication and logging. """ msg.warn("SIGINT >> cleaning up threads.", -1) logger.stop() for com in coms: com.join(1) if plotter is not None: plotter.stop() print("") exit(0) #Matplotlib's cleanup code for animations is lousy--it doesn't #work. I tried calling the private _stop() in a relevant scope and #it still let the application hang. signal.signal(signal.SIGINT, exit_handler) #Add the local variables to the dictionary. This dict is passed in by unit #tests usually, which want to investigate the values in each object. if vardir is not None: vardir["feed"] = feed vardir["com"] = coms vardir["logger"] = logger #Now that we actually have a way to quit the infinite loop, we can start the #data acquisition process. plotter = None _com_start(coms) if any([com is None for com in coms]): # pragma: no cover msg.err("One of the COM threads didn't initialize properly.") msg.info("COM Threads that were Okay") for com in coms: if com is not None: msg.std(com.port) return #Wait until we have some data before the logger gets put to work with it. if not args["listen"]: from time import sleep tries = 0 while (not all([not com.data_q.empty() for com in coms]) and tries < 10): # pragma: no cover #If we don't need this delay, it shouldn't trigger unit test #problems. sleep(0.05) tries += 1 logger.start() #The plotter looks at the live feed data to plot the latest aggregated #points. if not args["noplot"] and not args["listen"]: tries = 0 maxtries = args["wait"]/0.05 while not logger.ready(0.05, args["wait"]) and tries <= maxtries: tries += 1 from liveserial.plotting import Plotter import matplotlib.pyplot as plt plotter = Plotter(feed, args["refresh"], args["maxpts"], args["window"], testmode, logger) if vardir is not None: vardir["plotter"] = plotter if not testmode: # pragma: no cover plt.show() _runtime = 0 while all([com.is_alive() for com in coms]): #Here is the tricky bit; we want to join the threads for a second to #keep the main thread busy while everything is going on. More than a #second causes delay when the user sends SIGINT to stop the whole #process. We instead join on every thread for fractions of a second. for com in coms: com.join(1./len(coms), terminate=False) _runtime += 1. if runtime is not None: if _runtime >= runtime: logger.stop() for com in coms: com.join(1) if plotter is not None: plotter.stop() return vardir
if __name__ == '__main__': # pragma: no cover run(_parser_options())