Source code for pylablib.core.gui.qt.thread.callsync

from . import threadprop
from .synchronizing import QThreadNotifier, QMultiThreadNotifier
from ....utils import general, funcargparse

import threading, time, collections

# NOTE(review): presumably the package-local modules this module depends on (read by some dependency/reload helper) -- confirm against the code that consumes it
_depends_local=[".synchronizing"]

class QCallResultSynchronizer(QThreadNotifier):
    """
    Notifier which carries the outcome of a scheduled call.

    The notified value is a 2-tuple ``(kind, value)``, where ``kind`` is ``"result"``, ``"exception"``, ``"skip"``, or ``"fail"``.
    """

    def get_progress(self):
        """
        Get the current state of the call.

        Return ``"waiting"`` (no outcome has been set yet), ``"done"`` (call returned a result),
        ``"fail"`` (call failed, probably due to thread being stopped), ``"skip"`` (call was skipped),
        or ``"exception"`` (call raised an exception).
        """
        outcome = self.value  # single attribute read; everything below works on this snapshot
        if outcome is None:
            return "waiting"
        kind = outcome[0]
        return "done" if kind == "result" else kind

    def skipped(self):
        """Check if the call was skipped"""
        return self.get_progress() == "skip"

    def failed(self):
        """Check if the call failed"""
        return self.get_progress() == "fail"

    def get_value_sync(self, timeout=None, default=None, error_on_fail=True, error_on_skip=True, pass_exception=True):
        """
        Wait (with the given `timeout`) for the call outcome and translate it into a return value or an exception.

        If the call produced a result, return it.
        If the call raised an exception, re-raise it in the caller thread when ``pass_exception==True``; otherwise, return `default`.
        If the call was skipped (e.g., due to full call queue), raise :exc:`.qt.thread.threadprop.SkippedCallError` when ``error_on_skip==True``;
        otherwise, return `default`.
        If the call failed (usually, the controlled thread is stopped before it executed the call),
        raise :exc:`.qt.thread.threadprop.NoControllerThreadError` when ``error_on_fail==True``; otherwise, return `default`.
        If no outcome was received at all, raise :exc:`.qt.thread.threadprop.TimeoutThreadError` when ``error_on_fail==True``;
        otherwise, return `default`.
        """
        outcome = QThreadNotifier.get_value_sync(self, timeout=timeout)
        if outcome is None:
            # the base notifier delivered nothing
            if error_on_fail:
                raise threadprop.TimeoutThreadError
            return default
        kind, payload = outcome
        if kind == "result":
            return payload
        if kind == "exception":
            if pass_exception:
                raise payload
            return default
        if kind == "skip":
            if error_on_skip:
                raise threadprop.SkippedCallError()
            return default
        if kind == "fail":
            if error_on_fail:
                raise threadprop.NoControllerThreadError("failed executing remote call: controller is stopped")
            return default
        raise ValueError("unrecognized return value kind: {}".format(kind))
class QDummyResultSynchronizer(object):
    """Dummy result synchronizer for calls which don't require result synchronization (e.g., signals)"""

    def notify(self, value):
        """Accept the call outcome and discard it"""
        return None
class QScheduledCall(object):
    """
    Object representing a scheduled remote call.

    Args:
        func: callable to be invoked in the destination thread
        args: arguments to be passed to `func`
        kwargs: keyword arguments to be passed to `func` (copied, so later changes to :attr:`kwargs` do not affect the supplied dictionary)
        result_synchronizer: result synchronizer object; can be ``None`` (create new :class:`QCallResultSynchronizer`),
            ``"async"`` (no result synchronization), or an object with a ``notify(value)`` method
            (e.g., a :class:`QCallResultSynchronizer`).
    """
    TCallback = collections.namedtuple("TCallback", ["func", "pass_result", "call_on_fail"])

    def __init__(self, func, args=None, kwargs=None, result_synchronizer=None):
        object.__init__(self)
        self.func = func
        self.args = args or []
        # copy: schedulers may add entries to ``self.kwargs``, which must not leak into the caller's dictionary
        self.kwargs = dict(kwargs) if kwargs else {}
        if result_synchronizer == "async":
            result_synchronizer = QDummyResultSynchronizer()
        elif result_synchronizer is None:
            result_synchronizer = QCallResultSynchronizer()
        self.result_synchronizer = result_synchronizer
        self.callbacks = []
        self._notified = [0]  # hack to avoid use of locks ([0] is False, [] is True, use .pop() to atomically check and change)

    def _check_notified(self):
        """Return ``True`` if the call outcome has already been claimed; otherwise claim it and return ``False``"""
        try:
            self._notified.pop()
            return False
        except IndexError:
            return True

    def _run_callbacks(self, res):
        """Execute the registered callbacks for the outcome tuple `res` (``(kind, value)``)"""
        succeeded = res[0] == "result"
        for c in self.callbacks:
            if c.call_on_fail or succeeded:
                if c.pass_result:
                    c.func(res[1] if succeeded else None)
                else:
                    c.func()

    def __call__(self):
        """
        Execute the call, run the callbacks, and notify the result synchronizer.

        Does nothing if the call has already been executed, failed, or skipped.
        An exception raised by `func` is passed to the synchronizer as ``("exception", e)`` and re-raised.
        """
        if self._check_notified():
            return
        res = ("fail", None)  # stays in effect if `func` is interrupted by a non-``Exception`` error
        try:
            res = ("result", self.func(*self.args, **self.kwargs))
        except Exception as e:
            res = ("exception", e)
            raise
        finally:
            try:
                self._run_callbacks(res)
            finally:
                # notify even if a callback raised; otherwise a thread waiting on the synchronizer would never be released
                self.result_synchronizer.notify(res)

    def add_callback(self, callback, pass_result=True, call_on_fail=False, position=None):
        """
        Set the callback to be executed after the main call is done.

        If ``pass_result==True``, pass function result to the callback (or ``None`` if the call did not succeed);
        otherwise, call it with no arguments.
        If ``call_on_fail==True``, call it even if the original call raised an exception;
        otherwise, call it only if the original call returned successfully.
        `position` specifies callback position in the call list (by default, end of the list).
        """
        cb = self.TCallback(callback, pass_result, call_on_fail)
        if position is None:
            self.callbacks.append(cb)
        else:
            self.callbacks.insert(position, cb)

    def fail(self):
        """Notify that the call is failed (invoked by the destination thread); callbacks are not executed"""
        if self._check_notified():
            return
        self.result_synchronizer.notify(("fail", None))

    def skip(self):
        """Notify that the call is skipped (invoked by the destination thread); callbacks are not executed"""
        if self._check_notified():
            return
        self.result_synchronizer.notify(("skip", None))
### Call schedulers ###

# Default call info record; holds a single ``call_time`` field
TDefaultCallInfo = collections.namedtuple("TDefaultCallInfo", ("call_time",))
class QScheduler(object):
    """
    Generic call scheduler.

    External scheduling routines rely on two methods: :meth:`build_call`, which wraps a function into a :class:`QScheduledCall`
    with appropriate parameters, and :meth:`schedule`, which takes such a call and schedules it.
    The :meth:`schedule` method should return ``True`` if the scheduling was successful (at least, for now), and ``False`` otherwise.

    Args:
        call_info_argname: if not ``None``, supplies a name of a keyword argument via which call info
            (generated by :meth:`build_call_info`) is passed on function call
    """

    def __init__(self, call_info_argname=None):
        object.__init__(self)
        self.call_info_argname = call_info_argname

    def build_call_info(self):
        """Build call info tuple (containing the current time) which can be passed to scheduled calls"""
        now = time.time()
        return TDefaultCallInfo(now)

    def build_call(self, func, args=None, kwargs=None, callback=None, pass_result=True, callback_on_fail=True, sync_result=True):
        """
        Build :class:`QScheduledCall` for subsequent scheduling.

        Args:
            func: function to be called
            args: arguments to be passed to `func`
            kwargs: keyword arguments to be passed to `func`
            callback: optional callback to be called when `func` is done
            pass_result (bool): if ``True``, pass `func` result as a single argument to the callback; otherwise, give no arguments
            callback_on_fail (bool): passed to :meth:`QScheduledCall.add_callback` as ``call_on_fail``: if ``True``, the callback is
                executed even if the call did not succeed (if it requires an argument, ``None`` is supplied);
                otherwise, it is only executed if the call was successful
            sync_result: if ``True``, the call has a default result synchronizer; otherwise, no synchronization is made.
        """
        synchronizer_kind = "async" if not sync_result else None
        new_call = QScheduledCall(func, args, kwargs, result_synchronizer=synchronizer_kind)
        argname = self.call_info_argname
        if argname:
            new_call.kwargs[argname] = self.build_call_info()
        if callback is not None:
            new_call.add_callback(callback, pass_result=pass_result, call_on_fail=callback_on_fail)
        return new_call

    def schedule(self, call):
        """Schedule the call (the base implementation schedules nothing and returns ``False``)"""
        return False

    def clear(self):
        """Clear the scheduler (the base implementation does nothing)"""
        return None
class QDirectCallScheduler(QScheduler):
    """
    Simplest call scheduler: directly executes the calls on scheduling

    Args:
        call_info_argname: if not ``None``, supplies a name of a keyword argument via which call info
            (generated by :meth:`QScheduler.build_call_info`) is passed on function call
    """

    def build_call(self, func, args=None, kwargs=None, callback=None, pass_result=True, callback_on_fail=True, sync_result=False):
        """Same as :meth:`QScheduler.build_call`, but with `sync_result` being ``False`` by default"""
        options = dict(
            args=args,
            kwargs=kwargs,
            callback=callback,
            pass_result=pass_result,
            callback_on_fail=callback_on_fail,
            sync_result=sync_result,
        )
        return QScheduler.build_call(self, func, **options)

    def schedule(self, call):
        """Execute `call` right away and return ``True``"""
        call()
        return True
class QQueueScheduler(QScheduler):
    """
    Call scheduler with a builtin call queue.

    Supports placing the calls and retrieving them (from the destination thread).
    Has ability to skip some calls if, e.g., the queue is too full. Whether the call should be skipped is determined by :meth:`can_schedule`
    (should be overloaded in subclasses).
    Used as a default command scheduler.

    Args:
        on_full_queue: action to be taken if the call can't be scheduled (i.e., :meth:`can_schedule` returns ``False``); can be
            ``"skip_current"`` (skip the call which is being scheduled; default),
            ``"skip_newest"`` (skip the most recent call; place the current),
            ``"skip_oldest"`` (skip the oldest call in the queue; place the current),
            ``"wait"`` (wait until the call can be scheduled, which is checked after every call removal from the queue; place the call),
            or ``"call"`` (execute the call directly in the calling thread; should be used with caution).
        call_info_argname: if not ``None``, supplies a name of a keyword argument via which call info
            (generated by :meth:`QScheduler.build_call_info`) is passed on function call

    Methods to overload:
        ``can_schedule``: check if the call can be scheduled
        ``call_added``: called when a new call has been added to the queue
        ``call_popped``: called when a call has been removed from the queue (either for execution, or for skipping)
    """

    def __init__(self, on_full_queue="skip_current", call_info_argname=None):
        QScheduler.__init__(self, call_info_argname=call_info_argname)
        self.call_queue = collections.deque()
        funcargparse.check_parameter_range(on_full_queue, "on_full_queue", {"skip_current", "skip_newest", "skip_oldest", "call", "wait"})
        self.on_full_queue = on_full_queue
        self.lock = threading.Lock()
        # the notifier is only needed in the "wait" mode, where schedulers wait for a call to be removed
        self.call_popped_notifier = QMultiThreadNotifier() if on_full_queue == "wait" else None
        self.working = True

    def can_schedule(self, call):
        """Check if the call can be scheduled"""
        return True

    def call_added(self, call):
        """Called whenever `call` has been added to the queue"""
        pass

    def call_popped(self, call, head):
        """
        Called whenever `call` has been removed from the queue

        `head` determines whether the call has been removed from the queue head, or from the queue tail.
        """
        pass

    def _add_call(self, call):
        """Append `call` to the queue tail and invoke the :meth:`call_added` hook"""
        self.call_queue.append(call)
        self.call_added(call)

    def _pop_call(self, head=False):
        """
        Remove a call from the queue head (``head==True``) or tail and return it.

        If the queue is empty, return ``None``.
        """
        # only the queue access is guarded: an ``IndexError`` raised by the hooks below must not be mistaken
        # for an empty queue (that would silently lose the call which has already been removed)
        try:
            call = self.call_queue.popleft() if head else self.call_queue.pop()
        except IndexError:
            return None
        if self.call_popped_notifier is not None:
            self.call_popped_notifier.notify()
        self.call_popped(call, head)
        return call

    def schedule(self, call):
        """
        Schedule a call.

        Return ``True`` if the call has been placed into the queue (or executed directly in the ``"call"`` mode), and ``False`` otherwise.
        If the scheduler is closed (see :meth:`clear`), the call is marked as failed.
        """
        if not self.working:
            call.fail()
            return False
        if self.on_full_queue == "wait":
            while True:
                with self.lock:
                    if self.can_schedule(call):
                        self._add_call(call)
                        return True
                    closed = not self.working
                    if not closed:
                        # NOTE(review): presumably returns the current notification counter without blocking -- confirm against QMultiThreadNotifier
                        wait_n = self.call_popped_notifier.wait(-1)
                if closed:
                    # the scheduler was closed while waiting: fail the call (as for a closed scheduler above) instead of dropping it
                    call.fail()
                    return False
                self.call_popped_notifier.wait(wait_n)
        scheduled = True
        skipped_call = None
        with self.lock:
            if self.can_schedule(call):
                self._add_call(call)
            elif self.on_full_queue == "skip_newest":
                skipped_call = self._pop_call()
                self._add_call(call)
            elif self.on_full_queue == "skip_oldest":
                skipped_call = self._pop_call(head=True)
                self._add_call(call)
            elif self.on_full_queue == "skip_current":
                skipped_call = call
                scheduled = False
            else:
                scheduled = False
        # notifications and direct calls are performed after the lock is released
        if skipped_call is not None:
            skipped_call.skip()
        if self.on_full_queue == "call" and not scheduled:
            call()
            scheduled = True
        return scheduled

    def pop_call(self):
        """
        Pop the call from the queue head.

        If the queue is empty, return ``None``
        """
        with self.lock:
            return self._pop_call(head=True)

    def has_calls(self):
        """Check if there are queued calls"""
        return bool(self.call_queue)

    def clear(self, close=True):
        """
        Clear the call queue.

        If ``close==True``, mark the queue as closed (any attempt to schedule more calls fails automatically) and fail all calls in the queue;
        otherwise, skip all calls currently in the queue.
        """
        if close:
            self.working = False
        with self.lock:
            all_calls = []
            # the first call is taken from the head, the remaining ones from the tail
            c = self._pop_call(head=True)
            while c is not None:
                all_calls.append(c)
                c = self._pop_call()
        for c in all_calls:
            if close:
                c.fail()
            else:
                c.skip()
        if self.call_popped_notifier is not None:
            self.call_popped_notifier.notify()
class QQueueLengthLimitScheduler(QQueueScheduler):
    """
    Queued call scheduler with a length limit.

    Args:
        max_len: maximal queue length; non-positive values are interpreted as no limit
        on_full_queue: action to be taken if the call can't be scheduled (the queue is full); can be
            ``"skip_current"`` (skip the call which is being scheduled),
            ``"skip_newest"`` (skip the most recent call; place the current),
            ``"skip_oldest"`` (skip the oldest call in the queue; place the current),
            ``"wait"`` (wait until the queue has space; place the call),
            or ``"call"`` (execute the call directly in the calling thread; should be used with caution).
        call_info_argname: if not ``None``, supplies a name of a keyword argument via which call info
            (generated by :meth:`QScheduler.build_call_info`) is passed on function call
    """

    def __init__(self, max_len=1, on_full_queue="skip_current", call_info_argname=None):
        QQueueScheduler.__init__(self, on_full_queue=on_full_queue, call_info_argname=call_info_argname)
        self.max_len = max_len

    def change_max_len(self, max_len):
        """Change maximal length of the call queue (doesn't affect already scheduled calls)"""
        self.max_len = max_len

    def get_current_len(self):
        """Get current number of calls in the queue"""
        return len(self.call_queue)

    def can_schedule(self, call):
        """Check if the queue is unlimited or still shorter than the maximal length"""
        if self.max_len <= 0:
            return True
        return len(self.call_queue) < self.max_len
class QQueueSizeLimitScheduler(QQueueScheduler):
    """
    Queued call scheduler with a generic size limit; similar to :class:`QQueueLengthLimitScheduler`, but more flexible
    and can implement more restrictions (e.g., queue length and arguments RAM size).

    Args:
        max_size: maximal total size of the arguments; can be either a single number, or a tuple (if several different size metrics are involved);
            non-positive values are interpreted as no limit
        size_calc: function that takes a single argument (call to be placed) and returns its size;
            can be either a single number, or a tuple (if several different size metrics are involved);
            by default, simply returns 1, which makes the scheduler behavior identical to :class:`QQueueLengthLimitScheduler`
        on_full_queue: action to be taken if the call can't be scheduled (the queue is full); can be
            ``"skip_current"`` (skip the call which is being scheduled),
            ``"skip_newest"`` (skip the most recent call; place the current),
            ``"skip_oldest"`` (skip the oldest call in the queue; place the current),
            ``"wait"`` (wait until the queue has space; place the call),
            or ``"call"`` (execute the call directly in the calling thread; should be used with caution).
        call_info_argname: if not ``None``, supplies a name of a keyword argument via which call info
            (generated by :meth:`QScheduler.build_call_info`) is passed on function call
    """

    def __init__(self, max_size=1, size_calc=None, on_full_queue="skip_current", call_info_argname=None):
        QQueueScheduler.__init__(self, on_full_queue=on_full_queue, call_info_argname=call_info_argname)
        self.max_size = funcargparse.as_sequence(max_size)
        # one list of per-call sizes for every size metric
        self.size_queues = tuple([] for _ in self.max_size)
        self.size_calc = size_calc

    def change_max_size(self, max_size):
        """Change size restrictions"""
        self.max_size = funcargparse.as_sequence(max_size)

    def get_current_size(self):
        """Get current size metrics (sum of the stored call sizes for every metric)"""
        return tuple(sum(sizes) for sizes in self.size_queues)

    def _get_size(self, call):
        """Get the size of `call` using `size_calc`, or 1 if it is not defined"""
        if self.size_calc is None:
            return 1
        return self.size_calc(call)

    def call_added(self, call):
        """Store the sizes of the added call at the end of the size lists"""
        call_sizes = funcargparse.as_sequence(self._get_size(call))
        for value, sizes in zip(call_sizes, self.size_queues):
            sizes.append(value)

    def call_popped(self, call, head):
        """Drop the stored sizes from the same end of the size lists from which the call has been removed"""
        index = 0 if head else -1
        for sizes in self.size_queues:
            sizes.pop(index)

    def can_schedule(self, call):
        """Check that no limited size metric has reached its maximal value"""
        limits = zip(self.max_size, self.size_queues)
        return not any(limit > 0 and sum(sizes) >= limit for limit, sizes in limits)
class QThreadCallScheduler(QScheduler):
    """
    Call scheduler via thread calls (:meth:`.QThreadController.call_in_thread_callback`)

    Args:
        thread: destination thread (by default, thread which creates the scheduler)
        tag: if supplied, send the call in a message with the given tag; otherwise, use the interrupt call (generally, higher priority method).
        priority: message priority (only when `tag` is not ``None``)
        call_info_argname: if not ``None``, supplies a name of a keyword argument via which call info
            (generated by :meth:`QScheduler.build_call_info`) is passed on function call
    """

    def __init__(self, thread=None, tag=None, priority=0, call_info_argname=None):
        QScheduler.__init__(self, call_info_argname=call_info_argname)
        if not thread:
            thread = threadprop.current_controller()
        self.thread = thread
        self.tag = tag
        self.priority = priority

    def schedule(self, call):
        """Place `call` into the destination thread with the stored tag and priority; return ``True``"""
        destination = self.thread
        destination._place_call(call, tag=self.tag, priority=self.priority)
        return True
class QSignalThreadCallScheduler(QThreadCallScheduler):
    """
    Extended call scheduler via thread calls, which can limit number of queued calls.

    Args:
        thread: destination thread (by default, thread which creates the scheduler)
        limit_queue: call queue limit (non-positive numbers are interpreted as no limit)
        tag: if supplied, send the call in a message with the given tag; otherwise, use the interrupt call (generally, higher priority method).
        priority: message priority (only when `tag` is not ``None``)
        call_info_argname: if not ``None``, supplies a name of a keyword argument via which call info
            (generated by :meth:`QScheduler.build_call_info`) is passed on function call
    """

    def __init__(self, thread=None, limit_queue=1, tag=None, priority=0, call_info_argname=None):
        QThreadCallScheduler.__init__(self, thread, tag=tag, priority=priority, call_info_argname=call_info_argname)
        self.limit_queue = limit_queue
        self.queue_cnt = 0  # number of scheduled calls which have not been executed yet
        self.queue_cnt_lock = threading.Lock()

    def _call_done(self):
        """Callback executed after a scheduled call: decrease the pending calls counter"""
        with self.queue_cnt_lock:
            self.queue_cnt -= 1

    def schedule(self, call):
        """
        Schedule the call if the number of pending calls is below the limit.

        Return ``True`` if the call has been passed to the destination thread; otherwise, skip the call and return ``False``.
        """
        # the limit check and the counter increment are done under the same lock,
        # so that several threads scheduling simultaneously can not exceed the limit
        with self.queue_cnt_lock:
            accepted = self.limit_queue <= 0 or self.queue_cnt < self.limit_queue
            if accepted:
                self.queue_cnt += 1
        if not accepted:
            call.skip()
            return False
        # position 0: the counter is updated before any other callback is executed
        call.add_callback(self._call_done, pass_result=False, call_on_fail=True, position=0)
        return QThreadCallScheduler.schedule(self, call)