pylablib.core.mthread package

Submodules

pylablib.core.mthread.controller module

class pylablib.core.mthread.controller.IThreadController(name=None)[source]

Bases: object

Generic thread controller.

Deals with correctly initializing and destroying the message queue, processing standard messages, and synchronizing with other threads.

Parameters:name (str) – thread name (can be used to, e.g., get the controller from a different thread).
limit_queue_length(tag, length)[source]

Set length limit for a given tag.

process_interrupt(msg)[source]

Process interrupt message (automatically called for all messages with tag starting with "interrupt").

Automatically called by the controller; to be overridden in subclasses.

process_message(_)[source]

Instant message processing.

If return value is True, the message is assumed to be processed internally (i.e., it doesn’t get explicitly received).

Automatically called by the controller; to be overridden in subclasses.

exhaust_messages(tags=None, filt=None)[source]

Read and return (instantaneously) all available messages which satisfy the filter filt.

Called from the controlled thread.

wait_for_message(tags=None, timeout=None, filt=None, exhaust=False, discard_on_timeout=False)[source]

Wait for a message with given tags and satisfying the filter filt.

if exhaust is True, returns list of all messages satisfying filt, if several of them are available immediately. if discard_on_timeout is True and the wait timed out, mark the message for discarding using filt (see tag_queue.TaggedQueue.get()).

Called from the controlled thread.

check_interrupt()[source]

Check for interrupt messages.

Useful to insert in the middle of computationally-heavy code with no synchronization, to respond to interrupts from other threads (e.g., stopping requests).

Called from the controlled thread.

sleep(delay)[source]

Sleep while still receiving interrupts.

Called from the controlled thread.

run()[source]

Body of the thread.

Automatically called by the controller; to be overridden in subclasses.

finalize()[source]

Finalize the thread execution (regardless of the stopping reason).

Automatically called by the controller; to be overridden in subclasses.

add_message(msg, sync=True, on_broken='error')[source]

Add the message to the queue.

If sync is True, do the synchronization (wait for receiving and scheduling) after sending the message. on_broken decides what happens if thethread is stopped or hasn’t started yet (see threadprop.on_error()).

Called from any thread.

add_new_message(tag, value=None, priority=0, schedule_sync='wait', receive_sync='none', sync=True, timeout=None, on_broken='error')[source]

Create a new message, add it to the thread’s queue, and return it.

If sync is True, do the synchronization (wait for receiving and scheduling) after sending the message. on_broken decides what happens if thethread is stopped or hasn’t started yet (see threadprop.on_error()).

Called from any thread.

send_message(tag, value=None, priority=0, schedule_sync='wait', receive_sync='none', sync=True, timeout=None, on_broken='error')

Create a new message, add it to the thread’s queue, and return it.

If sync is True, do the synchronization (wait for receiving and scheduling) after sending the message. on_broken decides what happens if thethread is stopped or hasn’t started yet (see threadprop.on_error()).

Called from any thread.

start(as_dependent=False, as_daemon=False)[source]

Start the controller.

if as_dependent is True, the new thread becomes dependent on the caller thread (it stops when the caller thread stops). if as_deamon is True, the new thread becomes a daemon (if only daemon threads are running, they get stopped).

start_continuing(stop_after_run=True)[source]

Start the current controller in the current non-controlled thread.

If stop_after_run is True, the controller is stopped after the run() function is done; otherwise, th controller continues (e.g., run() can be empty, which means that this function simply initializes the controller).

interrupt(subclass, value, sync=True, priority=0, timeout=None, on_broken='error')[source]

Send an interrupt with the given subclass and value.

If sync is True, wait until the interrupt is received (with the given timeout). on_broken decides what happens if thethread is stopped or hasn’t started yet (see threadprop.on_error()).

Called from any thread.

call_from_thread(func, args=None, kwargs=None, sync_recv=True, sync_done=False, as_interrupt=False, priority=0, on_broken='error')[source]

Call a function func with the arguments (args and kwargs) in in this controller thread.

Called from any thread.

stop(sync=True)[source]

Stop the thread.

If called from the current thread, stop self. If called from a different thread, send a stop interrupt. In this case, if sync is True, wait until the thread received the message.

sync(point='interrupt', timeout=None, on_broken='error')[source]

Synchronize with the thread.

point determines where the synchronization happens. Can be either "interrupt" (sync on any interrupt), "start" (synchronize with the thread after its start), or "stop" (synchronize with the thread after its stop).

Called from a non-controlled thread.

add_dependent_thread(dependent=None)[source]

Add a dependent thread (caller’s thread by default) to this controller.

A dependent thread is automatically stopped after this thread is stopped.

Called from a non-controlled thread.

set_as_dependent()[source]

Set this thread as a dependent for the caller thread.

A dependent thread is automatically stopped after the caller thread is stopped.

Called from a non-controlled thread.

current_stage()[source]

Return current stage of the process.

Can have following values:
  • "created": thread is created, but not started
  • "starting": thread is starting, but is not running yet (notifying waiting threads)
  • "running": thread is executing its run code
  • "stopping": thread has done running and is currently stopping (notifying waiting threads, cleaning up dependent threads and daemons)
  • "cleaning": cleaning the message queue; communication is impossible at this point
  • "broken": thread is finished executing

Called from any thread.

passed_stage(stage)[source]

Check if the thread passed the given stage.

For stage description, see current_stage().

Called from any thread.

running()[source]

Check if the thread is running,

Called from any thread.

is_daemon()[source]

Check if the thread is daemon,

Called from any thread.

pylablib.core.mthread.controller.wait_for_thread_name(name)[source]

Wait until a thread with the given name starts.

class pylablib.core.mthread.controller.SimpleThreadController(name, job, cleanup=None, args=None, kwargs=None, self_as_arg=False)[source]

Bases: pylablib.core.mthread.controller.IThreadController

Simple thread.

Runs a single task, with a possible cleanup after the end.

Parameters:
  • name (str) – thread name (can be used to, e.g., get the controller from a different thread).
  • job (callable) – function to be executed in the thread.
  • cleanup (callable) – if not None, function to be called when the thread is stopped (regardless of the stopping reason).
  • args (list) – arguments for job and cleanup functions.
  • kwargs (dict) – keyword arguments for job and cleanup functions.
  • self_as_arg (bool) – if True, pass this controller as a first argument to the job and cleanup functions.
run()[source]

Body of the thread.

Automatically called by the controller; to be overridden in subclasses.

finalize()[source]

Finalize the thread execution (regardless of the stopping reason).

Automatically called by the controller; to be overridden in subclasses.

class pylablib.core.mthread.controller.ServiceThreadController(name, reply, setup=None, cleanup=None, args=None, kwargs=None, stopped_recipient_action='ignore')[source]

Bases: pylablib.core.mthread.controller.IThreadController

Service thread.

Receives and processes messages, and replies using a reply function.

Parameters:
  • name (str) – thread name (can be used to, e.g., get the controller from a different thread).
  • reply (callable) – message processing function; if it returns a tuple, interpret it as tag and value for a reply message.
  • setup (callable) – if not None, function to be called when the thread is starting.
  • cleanup (callable) – if not None, function to be called when the thread is stopped (regardless of the stopping reason).
  • args (list) – arguments for reply, startup and cleanup functions.
  • kwargs (dict) – keyword arguments for reply, startup and cleanup functions.
  • stopped_recipient_action (str) – action to take if the reply recipient has stopped; can be "error" (raise an error), "stop" (stop the thread; similar to th previous) or "ignore" (ignore and continue).
process_request(tag, value)[source]
run()[source]

Body of the thread.

Automatically called by the controller; to be overridden in subclasses.

finalize()[source]

Finalize the thread execution (regardless of the stopping reason).

Automatically called by the controller; to be overridden in subclasses.

class pylablib.core.mthread.controller.RepeatingThreadController(name, job, delay=0, setup=None, cleanup=None, args=None, kwargs=None, self_as_arg=False)[source]

Bases: pylablib.core.mthread.controller.IThreadController

Recurring task thread.

Periodically repeats a single function.

Parameters:
  • name (str) – thread name (can be used to, e.g., get the controller from a different thread).
  • job (callable) – periodically called function.
  • delay (float) – calling period.
  • setup (callable) – if not None, function to be called when the thread is starting.
  • cleanup (callable) – if not None, function to be called when the thread is stopped (regardless of the stopping reason).
  • args (list) – arguments for job, startup and cleanup functions.
  • kwargs (dict) – keyword arguments for job, startup and cleanup functions.
  • self_as_arg (bool) – if True, pass this controller as a first argument to the job and cleanup functions.
execute()[source]
process_message(msg)[source]

Instant message processing.

If return value is True, the message is assumed to be processed internally (i.e., it doesn’t get explicitly received).

Automatically called by the controller; to be overridden in subclasses.

run()[source]

Body of the thread.

Automatically called by the controller; to be overridden in subclasses.

finalize()[source]

Finalize the thread execution (regardless of the stopping reason).

Automatically called by the controller; to be overridden in subclasses.

control(value, sync=True, priority=0)[source]

Send a control signal to the thread.

If sync is True, wait until the signal is received before continuing.

Called from a non-controlled thread.

pause(do_pause=True, sync=True)[source]

Pause or resume the thread (depending on do_pause value).

resume(sync=True)[source]

Resume the thread execution if it’s paused.

trigger(sync=True)[source]

Trigger an execution cycle immediately (without waiting for the required delay).

The execution is only performed if the thread is not paused.

single(sync=True)[source]

Trigger a single execution cycle and pause afterwards.

sync(point='waiting')[source]

Synchronize with the thread.

point determines where the synchronization happens. Can be either "interrupt" (sync on any interrupt), "start" (synchronize with the thread after its start), or "stop" (synchronize with the thread after its stop).

Called from a non-controlled thread.

set_delay(delay)[source]

Set the repetition delay.

start(as_dependent=False, as_daemon=False, paused=False, skip_first=None)[source]

Start the thread.

if as_dependent is True, the new thread becomes dependent on the caller thread (it stops when the caller thread stops). if as_deamon is True, the new thread becomes a daemon (if only daemon threads are running, they get stopped). If paused is True, the thread starts in a paused state (but it will still execute the first cycle, unless skip_first is True). If skip_first is True, skip the first cycle execution (by default True if paused==True and False otherwise).

class pylablib.core.mthread.controller.MultiRepeatingThreadController(name, setup=None, cleanup=None, args=None, kwargs=None, self_as_arg=False)[source]

Bases: pylablib.core.mthread.controller.IThreadController

add_job(name, job, period)[source]
run()[source]

Body of the thread.

Automatically called by the controller; to be overridden in subclasses.

finalize()[source]

Finalize the thread execution (regardless of the stopping reason).

Automatically called by the controller; to be overridden in subclasses.

class pylablib.core.mthread.controller.TimerThreadController(period, callback, setup=None, cleanup=None, name=None)[source]

Bases: pylablib.core.mthread.controller.RepeatingThreadController

Timer thread.

Simplified version of the RepeatingThreadController. Doesn’t require a name, starts as a dependent and a daemon by default.

Parameters:
  • period (float) – calling period.
  • callback (callable) – periodically called function.
  • setup (callable) – if not None, function to be called when the thread is starting.
  • cleanup (callable) – if not None, function to be called when the thread is stopped (regardless of the stopping reason).
  • name (str) – thread name (can be used to, e.g., get the controller from a different thread). By default, a unique identifier.
start(as_dependent=True, as_daemon=True, skip_first=None, single=False)[source]

Start the thread.

if as_dependent is True, the new thread becomes dependent on the caller thread (it stops when the caller thread stops). if as_deamon is True, the new thread becomes a daemon (if only daemon threads are running, they get stopped). If skip_first is True, skip the first cycle execution. If single is True, start in a single mode (only execute once). In combination with skip_first=True, performs one callback function call after the period delay.

pylablib.core.mthread.controller.timer_message_notifier(period, tag='timer', listener=None, queue_limit=None, name=None)[source]

Build a timer notifier thread.

This thread (TimerThreadController) sends notification messages to a listener thread (caller thread by default) with a given period. tag specifies the message tag. If queue_limit is not None, sets the limit to how many notification messages can be in the queue at a given time.

pylablib.core.mthread.message module

class pylablib.core.mthread.message.CallNotifier(wait=None, notify=None, skippable=False)[source]

Bases: pylablib.core.mthread.notifier.ISkippableNotifier

Wrapper for notifier.ISkippableNotifier, with external functions provided for _do_wait and _do_notify methods.

Parameters:
pylablib.core.mthread.message.build_notifier(note_tag, note_value, sync, notification_controller)[source]

Build a notifier object.

sync can be:
  • a callable object, in which case it is called as a notifier (waiting is absent);

  • a tuple (note_tag, note_value), in which case it is interpreted as a "message" notifier with the corresponding tags (see below),

    while the note_tag and note_value arguments are ignored;

  • a string, in which case it determines a synchronization primitive type. Possible types are:
    • "none": ‘dummy’ synchronizer (no waiting, return immediately);
    • "wait_event": standard wait-notify pattern (wait() call waits until the notify() is called from a different thread).
      Waiting is implemented using the standard python threading.Event primitive (completely synchronous, can’t be interrupted; should be used carefully);
    • "wait": standard wait-notify pattern (wait() call waits until the notify() is called from a different thread).
      Waiting is implemented using the thread message queue; (synchronous, but still responds to interrupts);
    • "message": send notifying message, but don’t do any waiting (asynchronous)

    note_tag and note_value arguments are used for "wait" and "message" synchronizers

notification_controller is a thread controller for the thread to be waiting/notified using this primitive. If it’s a no_thread_controller, sync types are coerced: "wait" is interpreted as "wait_even", and "message" is interpreted as "none".

class pylablib.core.mthread.message.Message(tag='', value=None, priority=0, sender=None, schedule_sync=None, receive_sync=None)[source]

Bases: object

A message object.

Parameters:
  • tags (str) – message tag (used for control in the message queue).
  • value – message value (if appropriate).
  • priority (int) – message priority (standard is 0).
  • sender – sender controller.
  • schedule_sync (CallNotifier) – object which is notified when the message is scheduled (None means no notifier).
  • receive_sync (CallNotifier) – object which is notified when the message is received (None means no notifier).
scheduled()[source]

Notify the message of being scheduled.

Called internally by the message queue.

received()[source]

Notify the message of being received.

Called internally by the message queue.

sync()[source]

Wait until this message is scheduled and received by the target thread.

pylablib.core.mthread.message_queue module

class pylablib.core.mthread.message_queue.MessageQueue(owner)[source]

Bases: object

Mostly a wrapper around tag_queue.TaggedQueue, with several routines specifically for dealing with messages.

Parameters:owner – owner thread controller.
limit_length(tag, length)[source]

Set length limit for a given tag.

static build_notifier(uid, sync, event_type, notification_controller)[source]

Create a notifier for a message with an ID uid.

If sync is a tuple (tag, value), it specifies the notifier message parameters (see message.build_notifier()). Otherwise, they are determined by the uid.

Called from the sender thread.

static build_message(tag, value=None, priority=0, schedule_sync='wait', receive_sync='none', sender=None)[source]

Create a message.

tag and value determine the message contents, priority is its priority for scheduling. schedule_sync and receive_sync specify synchronizers for scheduling and receiving this message (see message.build_notifier()). sender is the sender thread controller (current controller by default).

Called from a sender thread.

add_message(msg, sync=True)[source]

Add the message to the queue.

If sync is True, do the synchronization (wait for receiving and scheduling) after sending the message.

Called from a sender thread.

exhaust_messages(filt, interrupt_check)[source]

Read and return (instantaneously) all available messages which satisfy the filter filt.

interrupt_check is an interrupt filter function, which pre-processes the message and return True if it was an interrupt (in which case it’s omitted in the output).

Called from the receiver thread.

wait_for_message(filt, interrupt_check, timeout=None, exhaust=False, discard_on_timeout=False, discard_filt=None)[source]

Wait for a message satisfying the filter filt.

interrupt_check is an interrupt filter function, which pre-processes the message and return True if it was an interrupt (in which case the waiting continues). if exhaust is True, returns list of all messages satisfying filt, if several of them are available immediately. if discard_on_timeout is True and the wait timed out, mark the message for discarding using discard_filt (see tag_queue.TaggedQueue.get()).

Called from the receiver thread.

clear(notify_all=True, ignore_exceptions=True, mark_broken=True)[source]

Clear the queue.

See tag_queue.TaggedQueue.clear()

broken()[source]

Check if the queue is broken.

See tag_queue.TaggedQueue.broken()

fix()[source]

Fix broken queue.

See tag_queue.TaggedQueue.fix()

pylablib.core.mthread.message_queue.send_message(dest, tag, value=None, priority=0, schedule_sync='wait', receive_sync='none', sync=True, on_broken='error')[source]

Send a message to the thread dest from the current thread.

See MessageQueue.add_message().

pylablib.core.mthread.message_queue.exhaust_messages(tags=None, filt=None)[source]

Exhaust messages for the current thread.

See MessageQueue.exhaust_messages().

pylablib.core.mthread.message_queue.wait_for_message(tags=None, timeout=None, filt=None, exhaust=False, discard_on_timeout=False)[source]

Wait for a message for the current thread.

See MessageQueue.wait_for_message().

pylablib.core.mthread.notifier module

class pylablib.core.mthread.notifier.ISkippableNotifier(skippable=False)[source]

Bases: object

Generic skippable notifier.

The main methods are wait() (wait until the event happend) and notify() (notify that the event happend). Only calls underlying waiting and notifying methods once, duplicate calls are ignored.

Parameters:skippable (bool) – if True, allows for skippable wait events (if notify() is called before wait(), neither methods are actually called).
wait(*args, **kwargs)[source]

Wait for the notification.

Can only be called once per notifier lifetime. If the notifier allows skipping, and this method is called after notify(), return immediately.

notify(*args, **kwargs)[source]

Notify the waiting process.

Can only be called once per notifier lifetime. If the notifier allows skipping, and this method is called before wait(), return immediately.

waiting()[source]

Check if waiting is in progress.

done_wait()[source]

Check if waiting is done.

success_wait()[source]

Check if waiting is done successfully.

done_notify()[source]

Check if notifying is done.

waiting_state()[source]
notifying_state()[source]

pylablib.core.mthread.sync_primitives module

class pylablib.core.mthread.sync_primitives.ValueSynchronizer(sync=True, note=None, receiver=None)[source]

Bases: pylablib.core.mthread.notifier.ISkippableNotifier

wait(timeout=None)[source]

Wait for the notification.

Can only be called once per notifier lifetime. If the notifier allows skipping, and this method is called after notify(), return immediately.

notify(value=None)[source]

Notify the waiting process.

Can only be called once per notifier lifetime. If the notifier allows skipping, and this method is called before wait(), return immediately.

get_value()[source]
uid()[source]
class pylablib.core.mthread.sync_primitives.SyncCall(func, args=None, kwargs=None, sync=True, note=None)[source]

Bases: object

value(sync=True, timeout=None, default=None)[source]
wait(timeout=None)[source]
done()[source]
class pylablib.core.mthread.sync_primitives.BasicSynchronizer(receiver=None, blocking_sync=False)[source]

Bases: object

wait(timeout=None)[source]
notify()[source]
uid()[source]
class pylablib.core.mthread.sync_primitives.PendingSynchronizerPool(wait_func, notify_func, blocking_sync=False, recursive_calls=False)[source]

Bases: object

pending_num()[source]
wait(blocking=True, timeout=None, blocking_sync=None, *args, **vargs)[source]
notify(*args, **vargs)[source]
class pylablib.core.mthread.sync_primitives.ResourceSynchronizerPool(grab_func, free_func, blocking_sync=False, recursive_calls=False)[source]

Bases: object

pending_num()[source]
grab(blocking=True, timeout=None, blocking_sync=None, *args, **vargs)[source]
free(*args, **vargs)[source]
class pylablib.core.mthread.sync_primitives.ISyncObject[source]

Bases: object

acquire(blocking=True, timeout=None, blocking_sync=None)[source]
release()[source]
class pylablib.core.mthread.sync_primitives.IResourceSyncObject(blocking_sync=False)[source]

Bases: pylablib.core.mthread.sync_primitives.ISyncObject

acquire(blocking=True, timeout=None, blocking_sync=None)[source]
release()[source]
class pylablib.core.mthread.sync_primitives.Lock(blocking_sync=False)[source]

Bases: pylablib.core.mthread.sync_primitives.IResourceSyncObject

class pylablib.core.mthread.sync_primitives.RLock(blocking_sync=False)[source]

Bases: pylablib.core.mthread.sync_primitives.IResourceSyncObject

recursion_depth()[source]
full_release()[source]
full_acquire(n, blocking=True, timeout=None)[source]
class pylablib.core.mthread.sync_primitives.Semaphore(value, upper_bound=None, blocking_sync=False)[source]

Bases: pylablib.core.mthread.sync_primitives.IResourceSyncObject

class pylablib.core.mthread.sync_primitives.Event(flag=False, blocking_sync=False)[source]

Bases: object

wait(blocking=True, timeout=None, blocking_sync=None)[source]
set()[source]
clear()[source]
class pylablib.core.mthread.sync_primitives.VersionEvent(blocking_sync=False)[source]

Bases: object

current_version()[source]
wait(version=None, blocking=True, timeout=None, blocking_sync=None)[source]
update()[source]
class pylablib.core.mthread.sync_primitives.TaskSet(blocking_sync=None)[source]

Bases: object

wait(blocking=True, timeout=None, blocking_sync=None)[source]
set(value)[source]
get()[source]
done(number=1)[source]
add(number=1)[source]
class pylablib.core.mthread.sync_primitives.Condition(lock=None, blocking_sync=False)[source]

Bases: pylablib.core.mthread.sync_primitives.ISyncObject

acquire(blocking=True, timeout=None, blocking_sync=None)[source]
release()[source]
wait(timeout=None, blocking_sync=None)[source]
notify(n=1)[source]
notify_all()[source]
exception pylablib.core.mthread.sync_primitives.BrokenBarrierError(msg=None)[source]

Bases: Exception

class pylablib.core.mthread.sync_primitives.Barrier(parties, blocking_sync=None)[source]

Bases: object

class SyncState[source]

Bases: object

wait(blocking=True, timeout=None, blocking_sync=None)[source]
exception pylablib.core.mthread.sync_primitives.QueueEmptyError(msg=None)[source]

Bases: Exception

exception pylablib.core.mthread.sync_primitives.QueueFullError(msg=None)[source]

Bases: Exception

class pylablib.core.mthread.sync_primitives.Queue(max_size=None, blocking_sync=False)[source]

Bases: object

qsize()[source]
empty()[source]
full()[source]
get(blocking=True, timeout=None, blocking_sync=None)[source]
put(item, blocking=True, timeout=None, blocking_sync=None)[source]
class pylablib.core.mthread.sync_primitives.ThreadObserverPool(expand_tuple=True)[source]

Bases: pylablib.core.utils.observer_pool.ObserverPool

add_observer(callback, name=None, filt=None, priority=0, attr=None, cacheable=False)[source]

Add the observer callback.

Same as observer_pool.ObserverPool.add_observer(), but callback can be a string, in which case it’s interpreted as sending a message to a thread with the given name.

class pylablib.core.mthread.sync_primitives.StateUIDGenerator(thread_safe=True)[source]

Bases: object

disable()[source]
enable()[source]
is_enabled()[source]
wait_enabled(timeout=None)[source]
up(enable=True)[source]
check(value, allow_disabled=False)[source]

pylablib.core.mthread.tag_queue module

Tagged queue with priority and size limitation (set tag-wise).

exception pylablib.core.mthread.tag_queue.BrokenQueueError(msg=None)[source]

Bases: RuntimeError

An error signalizing that the queue is in a broken (shut down) state.

class pylablib.core.mthread.tag_queue.TaggedQueue[source]

Bases: object

Tagged queue with priority and size limitation (set tag-wise).

Supports multi-thread adding, but only single-thread extracting (receiving function is completely synchronous). Item is assumed to have tag and priority properties, and scheduled() and received() methods (callbacks).

Extraction and discarding is based on filter functions. These should be as simple as possible and have short determined runtime (no synchronization or locks) to prevent deadlocks. When item is scheduled (i.e., added and passes length limitation), item.scheduled() method is called; when item is extracted, item.received() method is called; under various circumstances, both of these methods may be called in either adding thread, or extracting thread. It is guaranteed that scheduled() is called before received(), and both methods are called eventually if the message is ever extracted or discarded.

limit_length(tag, length)[source]

Set length limit for a given tag.

put(item)[source]

Put an item in the queue.

This function doesn’t perform synchronization waits as long as scheduled() and received() methods of item don’t perfrom them.

get(filt=None, timeout=None, discard_on_timeout=False, discard_filt=None)[source]

Extract an item from the queue which satisfies filt filter function.

If timeout is not None, it determines the wait time. If it is passed before an item has been acquired, the return None. If discard_on_timeout is True and timeout is passed, marks discard_filt (same as filt by default) for discarding; this means that the next time a single message satisfying discard_filt is scheduled (either directly during put(), or from scheduling queue during get()), it is silently ‘received’ (notified of scheduling and receiving, but never explicitly passed to the destination thread).

remove(item, only_unscheduled=False)[source]

Remove the item if it is in the queue without calling its scheduled or received methods.

Return True if removal is successful, and False otherwise. If only_unscheduled==True, only remove item if it hasn’t been scheduled yet.

clear(notify_all=True, ignore_exceptions=True, mark_broken=True)[source]

Clear the queue.

If notify_all==True, behave as if all the messages are received or discarded; otherwise, just remove them silently (equivalent to remove() method). If ignore_exceptions==True, ignore exceptions on scheduling and receiving (e.g., if the notified thread is stopped); the queue is cleaned regardless. If mark_broken==True, mark the queue as broken (any subsequent calls to its methods raise BrokenQueueError).

broken()[source]

Check if the queue is broken.

fix()[source]

Fix broken queue.

Should be used carefully, since it can introduce problems with outside logic that relies on a broken queue staying broken.

pylablib.core.mthread.tag_queue.build_filter(tags=None, filt=None, uncond_tags=None, tag_separator=None)[source]

Build a filter function.

Parameters:
  • tags ([str]) – list of prefixes that match message tags.
  • filt (callable) – an additional filter function which needs to be satisfied (checking, e.g., message content to decide if it should be extracted).
  • uncond_tags ([str]) – works like tags, but independently of filt function (allows message even if filt returns False).
  • tag_separator (str) – a separator used to divide tag levels (usually '.' or '/'). If it’s not None, tags are matched only either exactly, or if they’re followed by tag separator (i.e., each tag level is treated as an indivisible word).

pylablib.core.mthread.threadprop module

exception pylablib.core.mthread.threadprop.ThreadError(msg=None)[source]

Bases: RuntimeError

Generic thread error.

exception pylablib.core.mthread.threadprop.NotRunningThreadError(msg=None)[source]

Bases: pylablib.core.mthread.threadprop.ThreadError

Thread error for a case of a missing or stopped thread.

exception pylablib.core.mthread.threadprop.NoControllerThreadError(msg=None)[source]

Bases: pylablib.core.mthread.threadprop.ThreadError

Thread error for a case of thread having no conrollers.

pylablib.core.mthread.threadprop.on_error(action, error_object=None)[source]

React to an error depending on the action.

action can be 'error' (raise error_object if it’s supplied, or ThreadError by default), 'stop' (raise an appropriate exception to stop the thread), 'return_error' (return error_object) or 'ignore' (do nothing).

exception pylablib.core.mthread.threadprop.InterruptException(msg=None)[source]

Bases: Exception

Generic interrupt exception (raised by some function to signal interrupts from other threads).

exception pylablib.core.mthread.threadprop.InterruptExceptionStop(msg=None)[source]

Bases: pylablib.core.mthread.threadprop.InterruptException

Interrupt exception denoting thread stop request.

class pylablib.core.mthread.threadprop.NoThreadController[source]

Bases: object

A ‘dummy’ thread controller implementing the most standard function for a thread without any explicitly created controller.

add_message(msg, sync=True, on_broken='error')[source]
add_new_message(tag, value=None, priority=0, schedule_sync='wait', receive_sync='none', sender=None, sync=True, on_broken='error')[source]
sleep(delay)[source]

Sleep for delay seconds.

stop()[source]

Stop the thread.

pylablib.core.mthread.threadprop.current_controller(require_controller=False)[source]

Return the controller of the current thread.

If the thread has no controller and require_controller==False, return a NoThreadController object; otherwise, raise NoControllerThreadError.

pylablib.core.mthread.threadprop.has_controller(thread)[source]

Check if the current thread has a controller.

pylablib.core.mthread.threadprop.all_controllers()[source]

Return a list of all the available thread controllers.

pylablib.core.mthread.threadprop.controller_by_name(name, require_controller=False)[source]

Return a controller for a given name.

If the controller is not found and require_controller==False, return a NoThreadController object; otherwise, raise NoControllerThreadError.

pylablib.core.mthread.threadprop.as_controller(ctrl, require_controller=False)[source]

Return a controller corresponding to ctrl.

ctrl can be None (return current thread controller), a thread name, or a thread controller instance. If the cooresponding controller doesn’t exist and require_controller==False, return a NoThreadController object; otherwise, raise NoControllerThreadError.

class pylablib.core.mthread.threadprop.NoMessageQueue(owner)[source]

Bases: object

pylablib.core.mthread.threadprop.current_message_queue(require_queue=False)[source]

Return a message queue corresponding to the current thread.

If the queue doesn’t exist and require_queue==False, return a NoMessageQueue object; otherwise, raise NoControllerThreadError.

pylablib.core.mthread.threadprop.sleep(delay)[source]

Sleep for delay seconds.

Behavior depends on whether the thread has a controller.

pylablib.core.mthread.threadprop.stop()[source]

Stop the current thread by raising an appropriate interrupt exception.

Behavior depends on whether the thread has a controller.

pylablib.core.mthread.threadprop.kill_thread(th, sync=True)[source]

Stop thread th (can be a thread controller, or the thread name).

If sync==True, return only after the thread is stopped. Ignore any errors if the thread is already stopped.

pylablib.core.mthread.threadprop.kill_all(sync=True, include_current=True)[source]

Stop all threads.

If sync==True, return only after all threads are stopped. If include_current==True, stop the current as well.

Module contents