Source code for pylablib.core.gui.qt.thread.signal_pool

from ....utils import observer_pool, py3
from . import callsync

import collections


def _as_name_list(lst):
    if lst is None:
        return None
    elif isinstance(lst,py3.textstring):
        return [lst]
    return lst
TSignal=collections.namedtuple("TSignal",["src","tag","value"])
[docs]class SignalPool(object): """ Signal dispatcher (somewhat similar in functionality to Qt signals). Manages dispatching signals between sources and destinations (callback functions). Each signal has defined source, destination (both can also be ``"all"`` or ``"any"``, see methods descriptions for details), tag and value. Any thread can send a signal or subscribe for a signal with given filters (source, destination, tag, additional filters). If a signal is emitted, it is checked against filters for all subscribers, and the passing ones are then called. """ def __init__(self): object.__init__(self) self._pool=observer_pool.ObserverPool() self._schedulers={}
[docs] def subscribe_nonsync(self, callback, srcs="any", dsts="any", tags=None, filt=None, priority=0, scheduler=None, id=None): """ Subscribe asynchronous callback to a signal. If signal is sent, `callback` is called from the sending thread (not subscribed thread). Therefore, should be used with care. In Qt, analogous to making signal connection with a direct call. Args: callback: callback function, which takes 3 arguments: signal source, signal tag, signal value. src(str or [str]): signal source or list of sources (controller names) to filter the subscription; can be ``"any"`` (any source) or ``"all"`` (only signals specifically having ``"all"`` as a source). src(str or [str]): signal destination or list of destinations (controller names) to filter the subscription; can be ``"any"`` (any destination) or ``"all"`` (only source specifically having ``"all"`` as a destination). tags: signal tag or list of tags to filter the subscription (any tag by default). filt(callable): additional filter function which takes 4 arguments: signal source, signal destination, signal tag, signal value, and checks whether signal passes the requirements. priority(int): subscription priority (higher priority subscribers are called first). scheduler: if defined, signal call gets scheduled using this scheduler instead of being called directly (which is the default behavior) id(int): subscription ID (by default, generate a new unique name). Returns: subscription ID, which can be used to unsubscribe later. """ srcs=_as_name_list(srcs) dsts=_as_name_list(dsts) tags=_as_name_list(tags) src_any="any" in srcs dst_any="any" in dsts def full_filt(tag, value): src,dst,tag=tag if (tags is not None) and (tag is not None) and (tag not in tags): return False if (not src_any) and (src!="all") and (src not in srcs): return False if (not dst_any) and (dst!="all") and (dst not in dsts): return False return filt(src,dst,tag,value) if (filt is not None) else True if scheduler is not None: _orig_callback=callback def schedule_call(*args, **kwargs): call=scheduler.build_call(_orig_callback,args,kwargs,sync_result=False) scheduler.schedule(call) callback=schedule_call id=self._pool.add_observer(callback,name=id,filt=full_filt,priority=priority,cacheable=(filt is None)) if scheduler is not None: self._schedulers[id]=scheduler return id
[docs] def subscribe(self, callback, srcs="any", dsts="any", tags=None, filt=None, priority=0, limit_queue=1, dest_controller=None, call_tag=None, add_call_info=False, id=None): """ Subscribe synchronous callback to a signal. If signal is sent, `callback` is called from the `dest_controller` thread (by default, thread which is calling this function) via the thread call mechanism (:meth:`.QThreadController.call_in_thread_callback`). In Qt, analogous to making signal connection with a queued call. Args: callback: callback function, which takes 3 arguments: signal source, signal tag, signal value. srcs(str or [str]): signal source name or list of source names to filter the subscription; can be ``"any"`` (any source) or ``"all"`` (only signals specifically having ``"all"`` as a source). dsts(str or [str]): signal destination name or list of destination names to filter the subscription; can be ``"any"`` (any destination) or ``"all"`` (only source specifically having ``"all"`` as a destination). tags: signal tag or list of tags to filter the subscription (any tag by default). filt(callable): additional filter function which takes 4 arguments: signal source, signal destination, signal tag, signal value, and checks whether signal passes the requirements. priority(int): subscription priority (higher priority subscribers are called first). limit_queue(int): limits the maximal number of scheduled calls (if the signal is sent while at least `limit_queue` callbacks are already in queue to be executed, ignore it) 0 or negative value means no limit (not recommended, as it can unrestrictedly bloat the queue) call_tag(str or None): tag used for the synchronized call; by default, use the interrupt call (which is the default of ``call_in_thread``). add_call_info(bool): if ``True``, add a fourth argument containing a call information (tuple with a single element, a timestamps of the call). id(int): subscription ID (by default, generate a new unique name). Returns: subscription ID, which can be used to unsubscribe later. """ scheduler=callsync.QSignalThreadCallScheduler(thread=dest_controller,limit_queue=limit_queue, tag=call_tag,call_info_argname="call_info" if add_call_info else None) return self.subscribe_nonsync(callback,srcs=srcs,dsts=dsts,tags=tags,filt=filt,priority=priority,scheduler=scheduler,id=id)
[docs] def unsubscribe(self, id): """Unsubscribe from a subscription with a given ID.""" self._pool.remove_observer(id) if id in self._schedulers: scheduler=self._schedulers.pop(id) scheduler.clear()
[docs] def signal(self, src, dst="any", tag=None, value=None): """ Send a signal. Args: src(str): signal source; can be a name, ``"all"`` (will pass all subscribers' source filters), or ``"any"`` (will only be passed to subscribers specifically subscribed to signal with ``"any"`` source). dst(str): signal destination; can be a name, ``"all"`` (will pass all subscribers' destination filters), or ``"any"`` (will only be passed to subscribers specifically subscribed to signal with ``"any"`` destination). tag(str): signal tag. value: signal value. """ to_call=self._pool.find_observers(TSignal(src,dst,tag),value) for _,obs in to_call: obs.callback(src,tag,value)