Source code for pylablib.core.mthread.controller

from . import threadprop, message_queue, tag_queue, sync_primitives
from ..utils import funcargparse, general

import threading

_depends_local=["..utils.general"]
    
### Global thread inventory ###
# _running_threads_condition=sync_primitives.Condition(lock=sync_primitives.Lock(blocking_sync=True),blocking_sync=True)
# _running_threads_condition=sync_primitives.Condition(lock=threading.Lock(),blocking_sync=True)
_running_threads_condition=sync_primitives.Condition()
_running_threads={}
_thread_uids=general.NamedUIDGenerator(thread_safe=True)



[docs]class IThreadController(object): """ Generic thread controller. Deals with correctly initializing and destroying the message queue, processing standard messages, and synchronizing with other threads. Args: name(str): thread name (can be used to, e.g., get the controller from a different thread). """ def __init__(self, name=None): object.__init__(self) self.name=name or _thread_uids(type(self).__name__) self.message_queue=message_queue.MessageQueue(self) self.running_thread=None self.running_thread_lock=threading.RLock() self.start_event=sync_primitives.Event(False) self.stop_event=sync_primitives.Event(False) self._clear_on_stop=True self._stage="created" self._dependent_threads=[] self._daemon=False ### Message queue ###
[docs] def limit_queue_length(self, tag, length): """ Set length limit for a given tag. """ self.message_queue.limit_length(tag,length)
### Instant messages processing ### # These are overloaded methods, which shouldn't be called from the outside
[docs] def process_interrupt(self, msg): """ Process interrupt message (automatically called for all messages with tag starting with ``"interrupt"``). Automatically called by the controller; to be overridden in subclasses. """ if msg.tag=="interrupt.control": if msg.value=="stop": self._stop_self() elif msg.tag=="interrupt.execute": msg.value()
[docs] def process_message(self, _): """ Instant message processing. If return value is ``True``, the message is assumed to be processed internally (i.e., it doesn't get explicitly received). Automatically called by the controller; to be overridden in subclasses. """ return False
def _process_any_message(self, msg): if msg.tag.startswith("sync"): return False # always pass sync messages directly if msg.tag.startswith("interrupt"): self.process_interrupt(msg) return True # always silence interrupts if msg.tag=="execute": msg.value() return True return self.process_message(msg) ### Receiving messages ### def _build_message_filter(self, tags=None, filt=None): return tag_queue.build_filter(tags,filt,["interrupt"])
[docs] def exhaust_messages(self, tags=None, filt=None): """ Read and return (instantaneously) all available messages which satisfy the filter `filt`. Called from the controlled thread. """ filt=self._build_message_filter(tags,filt) return self.message_queue.exhaust_messages(filt,self._process_any_message)
[docs] def wait_for_message(self, tags=None, timeout=None, filt=None, exhaust=False, discard_on_timeout=False): """ Wait for a message with given `tags` and satisfying the filter `filt`. if `exhaust` is ``True``, returns list of all messages satisfying `filt`, if several of them are available immediately. if `discard_on_timeout` is ``True`` and the wait timed out, mark the message for discarding using `filt` (see :meth:`.tag_queue.TaggedQueue.get`). Called from the controlled thread. """ discard_filt=filt filt=self._build_message_filter(tags,filt) return self.message_queue.wait_for_message(filt,self._process_any_message,timeout,exhaust=exhaust,discard_on_timeout=discard_on_timeout,discard_filt=discard_filt)
[docs] def check_interrupt(self): """ Check for interrupt messages. Useful to insert in the middle of computationally-heavy code with no synchronization, to respond to interrupts from other threads (e.g., stopping requests). Called from the controlled thread. """ self.wait_for_message([],timeout=0)
[docs] def sleep(self, delay): """ Sleep while still receiving interrupts. Called from the controlled thread. """ self.wait_for_message([],timeout=delay)
### Thread function ### # These are overloaded methods, which shouldn't be called from the outside
[docs] def run(self): """ Body of the thread. Automatically called by the controller; to be overridden in subclasses. """ raise NotImplementedError("IThreadController.run")
[docs] def finalize(self): """ Finalize the thread execution (regardless of the stopping reason). Automatically called by the controller; to be overridden in subclasses. """ pass
# Event functions called at changes in state. def _on_restart(self): self.message_queue.fix() self.start_event.clear() self.stop_event.clear() def _on_start(self): with _running_threads_condition: if self.name in _running_threads: raise RuntimeError("thread with name {} is already running".format(self.name)) _running_threads[self.name]=self _running_threads_condition.notify_all() if _check_daemon_threads(): self._stop_self() self.start_event.set() def _on_stop(self): self.stop_event.set() with _running_threads_condition: del _running_threads[self.name] _running_threads_condition.notify_all() with self.running_thread_lock: for th in self._dependent_threads: threadprop.kill_thread(th,sync=True) _check_daemon_threads() def _run_full(self, stop_after=True): """ Full run routine (including state transitions and events). """ if self._stage=="broken": # reinitialize message queue self._stage="created" self._on_restart() try: self._stage="starting" self._on_start() self._stage="running" self.run() except threadprop.InterruptExceptionStop: stop_after=True finally: if stop_after: self.finalize() self._stage="stopping" self._on_stop() self._stage="cleaning" # at this point the thread can't use its queue if self._clear_on_stop: self.message_queue.clear() self.running_thread=None self._stage="broken" ### External calls ###
[docs] def add_message(self, msg, sync=True, on_broken="error"): """ Add the message to the queue. If `sync` is ``True``, do the synchronization (wait for receiving and scheduling) after sending the message. `on_broken` decides what happens if thethread is stopped or hasn't started yet (see :func:`.threadprop.on_error`). Called from any thread. """ try: return self.message_queue.add_message(msg,sync=sync) except tag_queue.BrokenQueueError as e: if self.passed_stage("stopping") or not self.passed_stage("created"): e=threadprop.NotRunningThreadError("can't send message to a non-running thread") return threadprop.on_error(on_broken,e)
[docs] def add_new_message(self, tag, value=None, priority=0, schedule_sync="wait", receive_sync="none", sync=True, timeout=None, on_broken="error"): """ Create a new message, add it to the thread's queue, and return it. If `sync` is ``True``, do the synchronization (wait for receiving and scheduling) after sending the message. `on_broken` decides what happens if thethread is stopped or hasn't started yet (see :func:`.threadprop.on_error`). Called from any thread. """ msg=self.message_queue.build_message(tag,value,priority,schedule_sync,receive_sync) return self.add_message(msg,sync=sync,on_broken=on_broken)# TODO: add timeout to messaging routines
send_message=add_new_message
[docs] def start(self, as_dependent=False, as_daemon=False): """ Start the controller. if `as_dependent` is ``True``, the new thread becomes dependent on the caller thread (it stops when the caller thread stops). if `as_deamon` is ``True``, the new thread becomes a daemon (if only daemon threads are running, they get stopped). """ with self.running_thread_lock: if self.running(): raise RuntimeError("current thread is already running") self.running_thread=threading.Thread(target=self._run_full,name=self.name) self.running_thread.thread_controller=self if as_dependent: self.set_as_dependent() self._daemon=as_daemon self.running_thread.start()
[docs] def start_continuing(self, stop_after_run=True): """ Start the current controller in the current non-controlled thread. If `stop_after_run` is ``True``, the controller is stopped after the :meth:`run` function is done; otherwise, th controller continues (e.g., :meth:`run` can be empty, which means that this function simply initializes the controller). """ with self.running_thread_lock: if self.running(): raise RuntimeError("current thread is already running") if threadprop.current_controller() is threadprop.no_thread_controller: self.running_thread=threading.current_thread() self.running_thread.thread_controller=self else: raise RuntimeError("current thread already has a controller") self._run_full(stop_after=stop_after_run)
[docs] def interrupt(self, subclass, value, sync=True, priority=0, timeout=None, on_broken="error"): """ Send an interrupt with the given `subclass` and `value`. If `sync` is ``True``, wait until the interrupt is received (with the given `timeout`). `on_broken` decides what happens if thethread is stopped or hasn't started yet (see :func:`.threadprop.on_error`). Called from any thread. """ tag="interrupt" if subclass is not None: tag="{0}.{1}".format(tag,subclass) receive_sync="wait" if sync else "none" self.add_new_message(tag,value,priority=priority,receive_sync=receive_sync,timeout=timeout,on_broken=on_broken)
def _ask_for_call(self, call, sync=True, as_interrupt=False, priority=0, on_broken="error"): tag="interrupt.execute" if as_interrupt else "execute" receive_sync="wait" if sync else "none" return self.add_new_message(tag,call,priority=priority,receive_sync=receive_sync,on_broken=on_broken)
[docs] def call_from_thread(self, func, args=None, kwargs=None, sync_recv=True, sync_done=False, as_interrupt=False, priority=0, on_broken="error"): """ Call a function `func` with the arguments (`args` and `kwargs`) in in this controller thread. Called from any thread. """ if sync_done: call=sync_primitives.SyncCall(func,args,kwargs) self._ask_for_call(call,sync=sync_recv,as_interrupt=as_interrupt,priority=priority,on_broken=on_broken) return call.value() else: args=args or [] kwargs=kwargs or {} def call(): func(*args,**kwargs) self._ask_for_call(call,sync=sync_recv,as_interrupt=as_interrupt,priority=priority,on_broken=on_broken)
def _stop_self(self): if self.passed_stage("created") and not self.passed_stage("running"): raise threadprop.InterruptExceptionStop()
[docs] def stop(self, sync=True): """ Stop the thread. If called from the current thread, stop self. If called from a different thread, send a stop interrupt. In this case, if `sync` is ``True``, wait until the thread received the message. """ if threadprop.current_controller() is self: self._stop_self() else: self.interrupt("control","stop",sync=sync,priority=10)
[docs] def sync(self, point="interrupt", timeout=None, on_broken="error"): """ Synchronize with the thread. `point` determines where the synchronization happens. Can be either ``"interrupt"`` (sync on any interrupt), ``"start"`` (synchronize with the thread after its start), or ``"stop"`` (synchronize with the thread after its stop). Called from a non-controlled thread. """ funcargparse.check_parameter_range(point,"point",{"interrupt","start","stop"}) if point=="interrupt": self.interrupt("control","sync",sync=True,timeout=timeout,on_broken=on_broken) elif point=="start": self.start_event.wait(timeout=timeout) elif point=="stop": self.stop_event.wait(timeout=timeout)
[docs] def add_dependent_thread(self, dependent=None): """ Add a dependent thread (caller's thread by default) to this controller. A dependent thread is automatically stopped after this thread is stopped. Called from a non-controlled thread. """ if dependent is self: return dependent=dependent or threadprop.current_controller() with self.running_thread_lock: if (dependent is not threadprop.no_thread_controller) and (dependent is not self): if not (dependent in self._dependent_threads): self._dependent_threads.append(dependent)
[docs] def set_as_dependent(self): """ Set this thread as a dependent for the caller thread. A dependent thread is automatically stopped after the caller thread is stopped. Called from a non-controlled thread. """ current_ctrl=threadprop.current_controller() if current_ctrl is not threadprop.no_thread_controller: current_ctrl.add_dependent_thread(self)
### Thread properties ### _stage_order={"created":0,"starting":1,"running":2,"stopping":3,"cleaning":4,"broken":5}
[docs] def current_stage(self): """ Return current stage of the process. Can have following values: - ``"created"``: thread is created, but not started - ``"starting"``: thread is starting, but is not running yet (notifying waiting threads) - ``"running"``: thread is executing its run code - ``"stopping"``: thread has done running and is currently stopping (notifying waiting threads, cleaning up dependent threads and daemons) - ``"cleaning"``: cleaning the message queue; communication is impossible at this point - ``"broken"``: thread is finished executing Called from any thread. """ return self._stage
[docs] def passed_stage(self, stage): """ Check if the thread passed the given `stage`. For stage description, see :meth:`current_stage`. Called from any thread. """ return self._stage_order[self._stage]>self._stage_order[stage]
[docs] def running(self): """ Check if the thread is running, Called from any thread. """ return self._stage=="running"
[docs] def is_daemon(self): """ Check if the thread is daemon, Called from any thread. """ return self._daemon
[docs]def wait_for_thread_name(name): # TODO: doesn't work? (thread owner problems in _running_threads_condition.wait()) """ Wait until a thread with the given name starts. """ with _running_threads_condition: while True: if name in _running_threads: return _running_threads[name] _running_threads_condition.wait()
def _check_daemon_threads(allow_non_controlled=True): """ Check all threads. If only daemon threads are left, kill them in sync way. """ with _running_threads_condition: all_daemon=all([d.is_daemon() for d in _running_threads.values()]) if allow_non_controlled: all_threads=threading.enumerate() has_non_controlled=any([not t.isDaemon() and not threadprop.has_controller(t) for t in all_threads]) all_daemon=all_daemon and not has_non_controlled if all_daemon: threadprop.kill_all(sync=True, include_current=False) return all_daemon
[docs]class SimpleThreadController(IThreadController): """ Simple thread. Runs a single task, with a possible cleanup after the end. Args: name(str): thread name (can be used to, e.g., get the controller from a different thread). job(callable): function to be executed in the thread. cleanup(callable): if not ``None``, function to be called when the thread is stopped (regardless of the stopping reason). args(list): arguments for `job` and `cleanup` functions. kwargs(dict): keyword arguments for `job` and `cleanup` functions. self_as_arg(bool): if ``True``, pass this controller as a first argument to the `job` and `cleanup` functions. """ def __init__(self, name, job, cleanup=None, args=None, kwargs=None, self_as_arg=False): IThreadController.__init__(self,name) self.job=job self.cleanup=cleanup self.args=args or [] if self_as_arg: self.args=[self]+self.args self.kwargs=kwargs or {}
[docs] def run(self): self.job(*self.args,**self.kwargs)
[docs] def finalize(self): if self.cleanup: self.cleanup(*self.args,**self.kwargs)
[docs]class ServiceThreadController(IThreadController): """ Service thread. Receives and processes messages, and replies using a ``reply`` function. Args: name(str): thread name (can be used to, e.g., get the controller from a different thread). reply(callable): message processing function; if it returns a tuple, interpret it as tag and value for a reply message. setup(callable): if not ``None``, function to be called when the thread is starting. cleanup(callable): if not ``None``, function to be called when the thread is stopped (regardless of the stopping reason). args(list): arguments for `reply`, `startup` and `cleanup` functions. kwargs(dict): keyword arguments for `reply`, `startup` and `cleanup` functions. stopped_recipient_action(str): action to take if the reply recipient has stopped; can be ``"error"`` (raise an error), ``"stop"`` (stop the thread; similar to th previous) or ``"ignore"`` (ignore and continue). """ def __init__(self, name, reply, setup=None, cleanup=None, args=None, kwargs=None, stopped_recipient_action="ignore"): funcargparse.check_parameter_range(stopped_recipient_action,"stopped_recipient_action",{"error","ignore","stop"}) IThreadController.__init__(self,name) self.reply=reply self.setup=setup self.cleanup=cleanup self.stopped_recipient_action=stopped_recipient_action self.args=args or [] self.kwargs=kwargs or {}
[docs] def process_request(self, tag, value): return self.reply(tag,value,*self.args,**self.kwargs)
[docs] def run(self): if self.setup: self.setup(*self.args,**self.kwargs) while True: msg=self.wait_for_message() reply=self.process_request(msg.tag,msg.value) if reply is not None: message_queue.send_message(msg.sender,reply[0],reply[1],on_broken=self.stopped_recipient_action)
[docs] def finalize(self): if self.cleanup: self.cleanup(*self.args,**self.kwargs)
[docs]class RepeatingThreadController(IThreadController): """ Recurring task thread. Periodically repeats a single function. Args: name(str): thread name (can be used to, e.g., get the controller from a different thread). job(callable): periodically called function. delay(float): calling period. setup(callable): if not ``None``, function to be called when the thread is starting. cleanup(callable): if not ``None``, function to be called when the thread is stopped (regardless of the stopping reason). args(list): arguments for `job`, `startup` and `cleanup` functions. kwargs(dict): keyword arguments for `job`, `startup` and `cleanup` functions. self_as_arg(bool): if ``True``, pass this controller as a first argument to the `job` and `cleanup` functions. """ def __init__(self, name, job, delay=0, setup=None, cleanup=None, args=None, kwargs=None, self_as_arg=False): IThreadController.__init__(self, name) self.job=job self.setup=setup self.cleanup=cleanup self.delay=delay self.paused=False self.single_shot=False self.args=args or [] if self_as_arg: self.args=[self]+self.args self.kwargs=kwargs or {}
[docs] def execute(self): self.job(*self.args,**self.kwargs)
[docs] def process_message(self, msg): if msg.tag=="control": if msg.value=="pause": self.paused=True elif msg.value=="resume": self.paused=False elif msg.value=="single": self.single_shot=True self.paused=True elif msg.value!="trigger": return True # ignore everything else, keep polling return False
[docs] def run(self): if self.setup: self.setup(*self.args,**self.kwargs) while True: countdown=general.Countdown(self.delay) if ((not self.paused) or (self.single_shot)) and not self.skip: self.execute() self.single_shot=False self.skip=False timeout=None if self.paused else countdown.time_left() self.wait_for_message(["control","execute"],timeout=timeout,exhaust=True)
[docs] def finalize(self): if self.cleanup: self.cleanup(*self.args,**self.kwargs)
##### External calls #####
[docs] def control(self, value, sync=True, priority=0): """ Send a control signal to the thread. If `sync` is ``True``, wait until the signal is received before continuing. Called from a non-controlled thread. """ receive_sync="wait" if sync else "none" self.add_new_message("control",value,priority=priority,receive_sync=receive_sync)
[docs] def pause(self, do_pause=True, sync=True): """ Pause or resume the thread (depending on `do_pause` value). """ if do_pause: self.control("pause",sync=sync) else: self.control("resume",sync=sync)
[docs] def resume(self, sync=True): """ Resume the thread execution if it's paused. """ self.pause(False,sync=sync)
[docs] def trigger(self, sync=True): """ Trigger an execution cycle immediately (without waiting for the required delay). The execution is only performed if the thread is not paused. """ self.control("trigger",sync=sync)
[docs] def single(self, sync=True): """ Trigger a single execution cycle and pause afterwards. """ self.control("single",sync=sync)
[docs] def sync(self, point="waiting"): funcargparse.check_parameter_range(point,"point",{"interrupt","start","stop","waiting"}) if point=="waiting": self.control("sync",sync=True) else: IThreadController.sync(self,point=point)
[docs] def set_delay(self, delay): """ Set the repetition delay. """ self.delay=delay
[docs] def start(self, as_dependent=False, as_daemon=False, paused=False, skip_first=None): """ Start the thread. if `as_dependent` is ``True``, the new thread becomes dependent on the caller thread (it stops when the caller thread stops). if `as_deamon` is ``True``, the new thread becomes a daemon (if only daemon threads are running, they get stopped). If `paused` is ``True``, the thread starts in a paused state (but it will still execute the first cycle, unless `skip_first` is ``True``). If `skip_first` is ``True``, skip the first cycle execution (by default ``True`` if ``paused==True`` and ``False`` otherwise). """ self.paused=paused self.skip=paused if (skip_first is None) else skip_first IThreadController.start(self,as_dependent=as_dependent,as_daemon=as_daemon)
[docs]class MultiRepeatingThreadController(IThreadController): _new_jobs_check_period=0.1 def __init__(self, name, setup=None, cleanup=None, args=None, kwargs=None, self_as_arg=False): IThreadController.__init__(self, name) self.setup=setup self.cleanup=cleanup self.paused=False self.single_shot=False self.args=args or [] if self_as_arg: self.args=[self]+self.args self.kwargs=kwargs or {} self.jobs={} self.timers={} self._jobs_list=[]
[docs] def add_job(self, name, job, period): if name in self.jobs: raise ValueError("job {} already exists".format(name)) self.jobs[name]=(job,period) self.timers[name]=general.Timer(period) self._jobs_list.append(name)
def _get_next_job(self): if not self._jobs_list: return None,None idx=None left=None for i,n in enumerate(self._jobs_list): t=self.timers[n] l=t.time_left() if l==0: idx,left=i,0 break elif (left is None) or (l<left): idx=i left=l n=self._jobs_list.pop(idx) self._jobs_list.append(n) return n,left
[docs] def run(self): if self.setup: self.setup(*self.args,**self.kwargs) while True: name,to=self._get_next_job() if name is None: self.sleep(self._new_jobs_check_period) else: self.sleep(to) job=self.jobs[name][0] self.timers[name].acknowledge(nmin=1) job(*self.args,**self.kwargs)
[docs] def finalize(self): if self.cleanup: self.cleanup(*self.args,**self.kwargs)
[docs]class TimerThreadController(RepeatingThreadController): """ Timer thread. Simplified version of the :class:`RepeatingThreadController`. Doesn't require a name, starts as a dependent and a daemon by default. Args: period(float): calling period. callback(callable): periodically called function. setup(callable): if not ``None``, function to be called when the thread is starting. cleanup(callable): if not ``None``, function to be called when the thread is stopped (regardless of the stopping reason). name(str): thread name (can be used to, e.g., get the controller from a different thread). By default, a unique identifier. """ def __init__(self, period, callback, setup=None, cleanup=None, name=None): name=name or _thread_uids("timer") RepeatingThreadController.__init__(self,name,callback,period,setup=setup,cleanup=cleanup)
[docs] def start(self, as_dependent=True, as_daemon=True, skip_first=None, single=False): """ Start the thread. if `as_dependent` is ``True``, the new thread becomes dependent on the caller thread (it stops when the caller thread stops). if `as_deamon` is ``True``, the new thread becomes a daemon (if only daemon threads are running, they get stopped). If `skip_first` is ``True``, skip the first cycle execution. If `single` is ``True``, start in a single mode (only execute once). In combination with ``skip_first=True``, performs one callback function call after the `period` delay. """ RepeatingThreadController.start(self,as_dependent=as_dependent,as_daemon=as_daemon,paused=single,skip_first=skip_first) if single: self.single()
[docs]def timer_message_notifier(period, tag="timer", listener=None, queue_limit=None, name=None): """ Build a timer notifier thread. This thread (:class:`TimerThreadController`) sends notification messages to a `listener` thread (caller thread by default) with a given `period`. `tag` specifies the message tag. If `queue_limit` is not ``None``, sets the limit to how many notification messages can be in the queue at a given time. """ listener=listener or threadprop.current_controller(require_controller=True) # if isinstance(listener, py3.textstring): # listener=wait_for_thread_name(listener) def callback(): message_queue.send_message(listener,tag,on_broken="stop") if queue_limit is not None: threadprop.as_controller(listener,require_controller=True).limit_queue_length(tag,queue_limit) return TimerThreadController(period,callback,name=name)