Source code for pylablib.core.utils.thread

"""
Simple threading routines.

A more extensive threading library is contained in the core.mthread package.
"""

import threading
try:
    from Queue import Queue, Empty
except ImportError:
    from queue import Queue, Empty


[docs]class PeriodicThread(object): """ A thread that runs in an infinite loop (until externally stopped) and executes its task with a given periodicity. To use, it needs to be inherited, with the subclass redefining :meth:`execute` or :meth:`process_message` method. """ def __init__(self): object.__init__(self) self.running=False self.paused=False self.message_queue=Queue(1) self.ack_queue=Queue(1)
[docs] def execute(self): """ Perform a single iteration of the loop. To be overridden in subclasses. """ pass
[docs] def process_message(self, msg): """ Process a message sent from the parent thread. To be overridden in subclasses. """ pass
[docs] def loop(self, period, sync): """ Main loop methods. Called automatically in a new thread when :meth:`start` is invoked. """ self.running=True if sync: self.ack_queue.put("start") try: while True: try: msg,sync=self.message_queue.get(timeout=period if not self.paused else None) if sync: self.ack_queue.put(msg) except Empty: msg=None if msg=="pause": self.paused=True elif msg=="resume": self.paused=False elif msg=="stop": break elif msg is not None: self.process_message(msg) if not self.paused: self.execute() finally: self.running=False self.paused=False
[docs] def wait_for_execution(self): """Synchronize with the thread (wait until the current iteration is executed).""" self.send_message(None,sync=True)
[docs] def send_message(self, msg, sync=True): """ Send a message to the thread. The message is processed by the thread in the :meth:`process_message` method (by default does nothing). If ``sync==True``, wait until the thread received (not necessarily processed) the message. """ if self.running: self.message_queue.put((msg,sync)) if sync: ack_msg=self.ack_queue.get() if ack_msg!=msg: raise RuntimeError("wrong acknowledgment '{0}' for message '{1}'".format(ack_msg,msg)) else: raise RuntimeError("thread is not running")
[docs] def start(self, period, sync=True): """ Start the thread. `period` specifies the job execution period, in seconds. If ``sync==True``, wait until the thread is started. """ if self.running: raise RuntimeError("thread is already running") threading.Thread(target=self.loop,args=(period,sync)).start() if sync: ack_msg=self.ack_queue.get() if ack_msg!="start": raise RuntimeError("wrong acknowledgment '{0}' for message 'start'".format(ack_msg))
[docs] def stop(self, sync=True): """ Stop the thread. If ``sync==True``, wait until the thread is stopped. """ self.send_message("stop",sync=sync)
[docs] def pause(self, sync=True): """ Pause the thread execution. If ``sync==True``, wait until the thread is paused. """ self.send_message("pause",sync=sync)
[docs] def resume(self, sync=True): """ Resume the thread execution. If ``sync==True``, wait until the thread is resumed. """ self.send_message("resume",sync=sync)
[docs] def is_looping(self): """Check if the thread is actively executing (not paused).""" return self.running and not self.paused
[docs] def is_running(self): """Check if the thread is running (possibly paused).""" return self.running