Source code for pylablib.core.gui.qt.thread.synchronizing

from . import threadprop
from ....mthread import notifier
from ....utils import general

import threading, time, collections


class QThreadNotifier(notifier.ISkippableNotifier):
    """
    One-shot wait/notify synchronizer for controlled Qt threads.

    Built on top of :class:`.notifier.ISkippableNotifier`: the waiting side calls
    :meth:`.ISkippableNotifier.wait` (which waits in the thread controller message loop
    until notified or until the timeout expires), and the notifying side calls
    :meth:`.ISkippableNotifier.notify`. Each of the two can only be called once,
    and raises an error on a repeated call.

    A value can be passed along with the notification; it is stored in the ``value`` attribute
    and can be read using :meth:`get_value` or :meth:`get_value_sync`.

    Args:
        skippable (bool): if ``True``, allows for skippable wait events
            (if :meth:`.ISkippableNotifier.notify` is called before :meth:`.ISkippableNotifier.wait`,
            neither of the actual wait/notify methods is called).
    """
    # shared generator of unique sync IDs, so that every wait gets its own sync message ID
    _uid_gen=general.UIDGenerator(thread_safe=True)
    # tag of the sync message exchanged through the thread controller
    _notify_tag="#sync.notifier"

    def __init__(self, skippable=True):
        notifier.ISkippableNotifier.__init__(self,skippable=skippable)
        self._uid=None  # assigned in _pre_wait
        self.value=None  # assigned in _pre_notify

    def _pre_wait(self, *args, **kwargs):
        # remember the controller of the waiting thread first (this fails if there is none),
        # and only then allocate the sync ID
        self._controller=threadprop.current_controller(require_controller=True)
        self._uid=self._uid_gen()
        return True

    def _do_wait(self, timeout=None):
        # return True if the sync message arrived and False if the wait timed out
        try:
            self._controller.wait_for_sync(self._notify_tag,self._uid,timeout=timeout)
        except threadprop.TimeoutThreadError:
            return False
        else:
            return True

    def _pre_notify(self, value=None):
        # store the passed value before the waiting thread is notified
        self.value=value

    def _do_notify(self, *args, **kwargs):
        # send the sync message to the controller stored in _pre_wait
        self._controller.send_sync(self._notify_tag,self._uid)
        return True

    def get_value(self):
        """Get the value passed by the notifier (doesn't check if it has been passed already)"""
        return self.value

    def get_value_sync(self, timeout=None):
        """Wait (with the given `timeout`) for the value passed by the notifier"""
        waited=self.done_wait()
        if not waited:
            self.wait(timeout=timeout)
        return self.get_value()
class QMultiThreadNotifier(object):
    """
    Wait-notify thread synchronizer that can be used for multiple threads and called multiple times.

    Performs similar function to conditional variables.
    The synchronizer has an internal counter which is increased by 1 every time it is notified.
    The wait functions have an option to wait until the counter reaches the specific counter value
    (usually, 1 above the last wait call).
    """
    def __init__(self):
        self._lock=threading.Lock()  # protects _cnt and _notifiers
        self._cnt=0  # number of notify() calls so far
        self._notifiers={}  # {state: [notifiers waiting for the counter to reach state]}

    def _discard_notifier(self, state, n):
        """Remove notifier `n` from the waiting list of `state`, if it is still registered there."""
        with self._lock:
            waiting=self._notifiers.get(state)
            if waiting is None:  # already popped by notify()
                return
            if n in waiting:
                waiting.remove(n)
            if not waiting:
                del self._notifiers[state]

    def wait(self, state=1, timeout=None):
        """
        Wait until notifier counter is equal to at least `state`.

        Return the counter state plus 1, which is the next smallest value resulting in waiting.
        If the wait had to block, the returned value is based on the counter
        at the moment of the notification which ended the wait.

        Raises:
            threadprop.TimeoutThreadError: if `timeout` expired before the counter reached `state`.
        """
        with self._lock:
            if self._cnt>=state:
                return self._cnt+1
            n=QThreadNotifier()
            self._notifiers.setdefault(state,[]).append(n)
        # wait outside of the lock, so that notify() can proceed
        success=False
        try:
            success=n.wait(timeout=timeout)
        finally:
            if not success:
                # timeout or an exception: do not leave the abandoned notifier registered
                self._discard_notifier(state,n)
        if success:
            # the notifier value is the counter at the notification;
            # add 1 to agree with the non-blocking branch above
            return n.get_value()+1
        raise threadprop.TimeoutThreadError("synchronizer timed out")

    def wait_until(self, condition, timeout=None):
        """
        Wait until `condition` is met.

        `condition` is a function which is called (in the waiting thread) every time the synchronizer is notified.
        If it return non-``False``, the waiting is complete and its result is returned.
        The `timeout` covers the whole call; :meth:`wait` raises the timeout error if it expires.
        """
        ctd=general.Countdown(timeout)
        cnt=1
        while True:
            res=condition()
            if res:
                return res
            cnt=self.wait(cnt,timeout=ctd.time_left())

    def notify(self):
        """Notify all waiting threads"""
        with self._lock:
            self._cnt+=1
            cnt=self._cnt
            # collect all notifiers whose target state has been reached
            reached=[k for k in self._notifiers if k<=cnt]
            notifiers=[]
            for k in reached:
                notifiers.extend(self._notifiers.pop(k))
        # notify outside of the lock
        for n in notifiers:
            n.notify(cnt)