Source code for pylablib.core.mthread.message_queue

from . import tag_queue, message, threadprop
from ..utils import general


[docs]class MessageQueue(object): """ Mostly a wrapper around :class:`.tag_queue.TaggedQueue`, with several routines specifically for dealing with messages. Args: owner: owner thread controller. """ def __init__(self, owner): object.__init__(self) self.queue=tag_queue.TaggedQueue() self.owner=owner
[docs] def limit_length(self, tag, length): """ Set length limit for a given tag. """ self.queue.limit_length(tag,length)
_message_sync_tag="sync.message"
[docs] @staticmethod def build_notifier(uid, sync, event_type, notification_controller): """ Create a notifier for a message with an ID `uid`. If `sync` is a tuple ``(tag, value)``, it specifies the notifier message parameters (see :func:`.message.build_notifier`). Otherwise, they are determined by the `uid`. Called from the sender thread. """ if isinstance(sync,tuple): tag,value=sync else: tag="{0}.{1}".format(MessageQueue._message_sync_tag,event_type) value=uid return message.build_notifier(tag,value,sync,notification_controller)
# Adding messages
[docs] @staticmethod def build_message(tag, value=None, priority=0, schedule_sync="wait", receive_sync="none", sender=None): """ Create a message. `tag` and `value` determine the message contents, `priority` is its priority for scheduling. `schedule_sync` and `receive_sync` specify synchronizers for scheduling and receiving this message (see :func:`.message.build_notifier`). `sender` is the sender thread controller (current controller by default). Called from a sender thread. """ sender=sender or threadprop.current_controller() msg=message.Message(tag,value,priority=priority,sender=sender) msg.schedule_sync=MessageQueue.build_notifier(msg.uid,schedule_sync,"schedule",sender) msg.receive_sync=MessageQueue.build_notifier(msg.uid,receive_sync,"receive",sender) return msg
[docs] def add_message(self, msg, sync=True): """ Add the message to the queue. If `sync` is ``True``, do the synchronization (wait for receiving and scheduling) after sending the message. Called from a sender thread. """ self.queue.put(msg) if sync: msg.sync() return msg
# Receiving messages def _read_process_loop(self, filt, interrupt_check, timeout, discard_on_timeout=False, discard_filt=None): countdown=general.Countdown(timeout) while True: msg=self.queue.get(filt,timeout=countdown.time_left(),discard_on_timeout=discard_on_timeout,discard_filt=discard_filt) if msg is not None: if not interrupt_check(msg): return msg else: return None
[docs] def exhaust_messages(self, filt, interrupt_check): """ Read and return (instantaneously) all available messages which satisfy the filter `filt`. `interrupt_check` is an interrupt filter function, which pre-processes the message and return ``True`` if it was an interrupt (in which case it's omitted in the output). Called from the receiver thread. """ recv_msg=[] while True: new_msg=self._read_process_loop(filt,interrupt_check,0) if new_msg is None: return recv_msg else: recv_msg.append(new_msg) return recv_msg
[docs] def wait_for_message(self, filt, interrupt_check, timeout=None, exhaust=False, discard_on_timeout=False, discard_filt=None): """ Wait for a message satisfying the filter `filt`. `interrupt_check` is an interrupt filter function, which pre-processes the message and return ``True`` if it was an interrupt (in which case the waiting continues). if `exhaust` is ``True``, returns list of all messages satisfying `filt`, if several of them are available immediately. if `discard_on_timeout` is ``True`` and the wait timed out, mark the message for discarding using `discard_filt` (see :meth:`.tag_queue.TaggedQueue.get`). Called from the receiver thread. """ if exhaust: msg=self.exhaust_messages(filt,interrupt_check) if len(msg)>0: return msg msg=self._read_process_loop(filt,interrupt_check,timeout,discard_on_timeout=discard_on_timeout,discard_filt=discard_filt) if exhaust: return [] if msg is None else [msg] else: return msg
# Clearing
[docs] def clear(self, notify_all=True, ignore_exceptions=True, mark_broken=True): """ Clear the queue. See :meth:`.tag_queue.TaggedQueue.clear` """ self.queue.clear(notify_all,ignore_exceptions,mark_broken)
[docs] def broken(self): """ Check if the queue is broken. See :meth:`.tag_queue.TaggedQueue.broken` """ return self.queue.broken()
[docs] def fix(self): """ Fix broken queue. See :meth:`.tag_queue.TaggedQueue.fix` """ return self.queue.fix()
### Send message to a thread ###
[docs]def send_message(dest, tag, value=None, priority=0, schedule_sync="wait", receive_sync="none", sync=True, on_broken="error"): """ Send a message to the thread `dest` from the current thread. See :meth:`MessageQueue.add_message`. """ try: dest=threadprop.as_controller(dest) return dest.send_message(tag,value,priority,schedule_sync,receive_sync,sync=sync,on_broken=on_broken) except threadprop.NoControllerThreadError as e: return threadprop.on_error(on_broken,e)
### Receive messages (only for threads with controllers) ###
[docs]def exhaust_messages(tags=None, filt=None): """ Exhaust messages for the current thread. See :meth:`MessageQueue.exhaust_messages`. """ return threadprop.current_controller(True).exhaust_messages(tags=tags,filt=filt)
[docs]def wait_for_message(tags=None, timeout=None, filt=None, exhaust=False, discard_on_timeout=False): """ Wait for a message for the current thread. See :meth:`MessageQueue.wait_for_message`. """ return threadprop.current_controller(True).wait_for_message(tags=tags,timeout=timeout,filt=filt,exhaust=exhaust,discard_on_timeout=discard_on_timeout)