Source code for pylablib.core.mthread.sync_primitives

from . import threadprop, notifier
from ..utils import general, observer_pool, py3

import threading

_depends_local=["..utils.general","..utils.observer_pool"]




_defualt_value_sync_tag="sync.notify.value"
[docs]class ValueSynchronizer(notifier.ISkippableNotifier): _uid_gen=general.UIDGenerator(thread_safe=True) def __init__(self, sync=True, note=None, receiver=None): notifier.ISkippableNotifier.__init__(self,skippable=True) self._receiver=receiver self._uid=self._uid_gen() if sync is True: sync=_defualt_value_sync_tag self._sync=sync if isinstance(note,py3.textstring): def note_func(value): try: threadprop.as_controller(self._receiver).add_new_message(note,value,receive_sync="none",schedule_sync="wait_event",on_broken="ignore") except (threadprop.NotRunningThreadError,threadprop.NoControllerThreadError): pass else: note_func=note self._note_func=note_func self._notify_lock=threading.Lock() self._waiting="init" self._notifying="init" self._blocking_sync=(self._sync=="event") if self._sync and (self._blocking_sync or (receiver is threadprop.no_thread_controller) or (receiver is None)): self._sync_event=threading.Event() else: self._sync_event=None self._value=None def _pre_wait(self, *args, **kwargs): self._receiver=self._receiver or threadprop.current_controller() return True def _do_wait(self, timeout=None): if self._sync: if self._blocking_sync or (self._receiver is threadprop.no_thread_controller): return self._sync_event.wait(timeout=timeout) else: if threadprop.current_controller() is not self._receiver: raise RuntimeError("only receiver can wait for the synchronizer") msg=self._receiver.wait_for_message([self._sync],timeout=timeout,filt=(lambda msg: msg.value==self._uid),discard_on_timeout=True) return (msg is not None) else: return True def _pre_notify(self, value=None): self._value=value def _post_notify(self, value=None): if self._note_func: self._note_func(value) def _do_notify(self, value=None): if self._sync: if self._blocking_sync or (self._receiver is threadprop.no_thread_controller): self._sync_event.set() else: try: dest=threadprop.as_controller(self._receiver) dest.add_new_message(self._sync,self._uid,receive_sync="none",schedule_sync="wait_event",on_broken="ignore") except (threadprop.NotRunningThreadError,threadprop.NoControllerThreadError): pass
[docs] def wait(self, timeout=None): return notifier.ISkippableNotifier.wait(self,timeout)
[docs] def notify(self, value=None): return notifier.ISkippableNotifier.notify(self,value)
[docs] def get_value(self): with self._notify_lock: if self._notifying=="init": raise RuntimeError("value hasn't been set") return self._value
[docs] def uid(self): return self._uid
[docs]class SyncCall(object): def __init__(self, func, args=None, kwargs=None, sync=True, note=None): object.__init__(self) self.func=func self.args=args or [] self.kwargs=kwargs or {} self.synchronizer=ValueSynchronizer(sync=sync,note=note,receiver=threadprop.current_controller()) def __call__(self): res=self.func(*self.args,**self.kwargs) self.synchronizer.notify(res)
[docs] def value(self, sync=True, timeout=None, default=None): if sync: if self.synchronizer.wait(timeout): return self.synchronizer.get_value() else: return default else: return self.synchronizer
[docs] def wait(self, timeout=None): return self.synchronizer.wait(timeout)
[docs] def done(self): return self.synchronizer.done_wait()
[docs]class BasicSynchronizer(object): _sync_tag="sync.notify" _uid_gen=general.UIDGenerator(thread_safe=True) def __init__(self, receiver=None, blocking_sync=False): object.__init__(self) self._receiver=receiver self._uid=self._uid_gen() self._notify_lock=threading.Lock() self._waiting=False self._notified=False self._blocking_sync=blocking_sync if self._blocking_sync or (receiver is threadprop.no_thread_controller) or (receiver is None): self._sync_event=threading.Event() else: self._sync_event=None def _do_wait(self, timeout=None): if (self._receiver is threadprop.no_thread_controller) or self._blocking_sync: return self._sync_event.wait(timeout=timeout) else: msg=self._receiver.wait_for_message([self._sync_tag],timeout=timeout,filt=(lambda msg: msg.value==self._uid),discard_on_timeout=True) return (msg is not None) def _do_notify(self): if (self._receiver is threadprop.no_thread_controller) or self._blocking_sync: self._sync_event.set() else: try: dest=threadprop.as_controller(self._receiver) dest.add_new_message(self._sync_tag,self._uid,receive_sync="none",schedule_sync="wait_event",on_broken="ignore") except threadprop.NoControllerThreadError: pass
[docs] def wait(self, timeout=None): with self._notify_lock: if self._notified or self._waiting: return True self._waiting=True self._receiver=self._receiver or threadprop.current_controller() return self._do_wait(timeout)
[docs] def notify(self): with self._notify_lock: if self._notified: return self._notified=True if not self._waiting: return self._do_notify()
[docs] def uid(self): return self._uid
[docs]class PendingSynchronizerPool(object): def __init__(self, wait_func, notify_func, blocking_sync=False, recursive_calls=False): self._wait_func=wait_func self._notify_func=notify_func self._blocking_sync=blocking_sync self._pending={} self._state_lock=threading.RLock() if recursive_calls else threading.Lock()
[docs] def pending_num(self): with self._state_lock: return len(self._pending)
def _extract_pending(self, uid=None): if len(self._pending)==0: return None if uid is None: return self._pending.popitem()[1] else: return self._pending.pop(uid,None) def _extract_and_notify(self, uid=None): with self._state_lock: syncher=self._extract_pending(uid) if syncher: syncher.notify() return True return False def _create_synchronizer(self, blocking_sync=None): if blocking_sync is None: return BasicSynchronizer(blocking_sync=self._blocking_sync) else: return BasicSynchronizer(blocking_sync=blocking_sync)
[docs] def wait(self, blocking=True, timeout=None, blocking_sync=None, *args, **vargs): with self._state_lock: if self._wait_func(*args,**vargs): return True elif not blocking: return False else: syncher=self._create_synchronizer(blocking_sync) self._pending[syncher.uid()]=syncher raised_error=True try: syncher.wait(timeout) raised_error=False finally: notified=not self._extract_and_notify(syncher.uid()) if notified and raised_error: # has been notified in notify, but raised an error in parallel self._extract_and_notify() return notified
[docs] def notify(self, *args, **vargs): with self._state_lock: n=self._notify_func(*args,**vargs) if not n: return False if n<0: n=len(self._pending) synchers=[self._extract_pending() for _ in range(n)] for s in synchers: if s: s.notify() return True
[docs]class ResourceSynchronizerPool(object): def __init__(self, grab_func, free_func, blocking_sync=False, recursive_calls=False): self._grab_func=grab_func self._free_func=free_func self._blocking_sync=blocking_sync self._pending={} self._state_lock=threading.RLock() if recursive_calls else threading.Lock()
[docs] def pending_num(self): with self._state_lock: return len(self._pending)
def _extract_pending(self, uid=None): if len(self._pending)==0: return None if uid is None: return self._pending.popitem()[1] else: return self._pending.pop(uid,None) def _extract_and_notify(self, uid=None): with self._state_lock: syncher=self._extract_pending(uid) if syncher: syncher.notify() return True return False def _create_synchronizer(self, blocking_sync=None): if blocking_sync is None: return BasicSynchronizer(blocking_sync=self._blocking_sync) else: return BasicSynchronizer(blocking_sync=blocking_sync)
[docs] def grab(self, blocking=True, timeout=None, blocking_sync=None, *args, **vargs): countdown=general.Countdown(timeout) while True: if countdown.passed(): blocking=False with self._state_lock: res=self._grab_func(*args,**vargs) if res: return res elif not blocking: return False else: syncher=self._create_synchronizer(blocking_sync) self._pending[syncher.uid()]=syncher raised_error=True try: if not syncher.wait(countdown.time_left()): blocking=False raised_error=False finally: notified=not self._extract_and_notify(syncher.uid()) if notified and raised_error: # has been notified in notify, but raised an error in parallel self._extract_and_notify()
[docs] def free(self, *args, **vargs): with self._state_lock: n=self._free_func(*args,**vargs) if not n: return False if n<0: n=len(self._pending) synchers=[self._extract_pending() for _ in range(n)] for s in synchers: if s is not None: s.notify() return True
[docs]class ISyncObject(object): def __init__(self): object.__init__(self) def __enter__(self): self.acquire() return self def __exit__(self, *args): try: self.release() except RuntimeError: pass return False
[docs] def acquire(self, blocking=True, timeout=None, blocking_sync=None): raise NotImplementedError("ISyncObject.acquire")
[docs] def release(self): raise NotImplementedError("ISyncObject.release")
[docs]class IResourceSyncObject(ISyncObject): def __init__(self, blocking_sync=False): ISyncObject.__init__(self) self._resource=ResourceSynchronizerPool(self._try_acquire,self._try_release,blocking_sync=blocking_sync) def _try_acquire(self): raise NotImplementedError("IResourceSyncObject._try_acquire") def _try_release(self): raise NotImplementedError("IResourceSyncObject._try_release")
[docs] def acquire(self, blocking=True, timeout=None, blocking_sync=None): return self._resource.grab(blocking=blocking,timeout=timeout,blocking_sync=blocking_sync)
[docs] def release(self): return self._resource.free()
[docs]class Lock(IResourceSyncObject): def __init__(self, blocking_sync=False): IResourceSyncObject.__init__(self,blocking_sync=blocking_sync) self._locked=False def _try_acquire(self): if self._locked: return False self._locked=True return True def _try_release(self): if not self._locked: raise RuntimeError("attempting to release a free lock") self._locked=False return 1
[docs]class RLock(IResourceSyncObject): def __init__(self, blocking_sync=False): IResourceSyncObject.__init__(self,blocking_sync=blocking_sync) self._locked=False self._locked_cnt=0 self._locked_owner=None def _try_acquire(self): cth=threading.current_thread() if not self._locked: self._locked=True self._locked_cnt=1 self._locked_owner=cth return True elif self._locked_owner==cth: self._locked_cnt=self._locked_cnt+1 return True return False def _try_release(self): cth=threading.current_thread() if not self._locked: raise RuntimeError("attempting to release a free lock") if cth is not self._locked_owner: raise RuntimeError("attempting to release a lock from a non-owner thread") self._locked_cnt=self._locked_cnt-1 if self._locked_cnt>0: return 0 self._locked=False self._locked_owner=None return 1
[docs] def recursion_depth(self): return self._locked_cnt
[docs] def full_release(self): n=0 while not self.release(): n=n+1 return n+1
[docs] def full_acquire(self, n, blocking=True, timeout=None): for _ in range(n): self.acquire(blocking=blocking,timeout=timeout)
[docs]class Semaphore(IResourceSyncObject): def __init__(self, value, upper_bound=None, blocking_sync=False): IResourceSyncObject.__init__(self,blocking_sync=blocking_sync) self._value=value self._upper_bound=upper_bound def _try_acquire(self): if self._value>0: self._value=self._value-1 return True return False def _try_release(self): if self._upper_bound is not None and self._upper_bound<=self._value: raise ValueError("increasing semaphore above upper bound") self._value=self._value+1 return 1
[docs]class Event(object): def __init__(self, flag=False, blocking_sync=False): object.__init__(self) self._event_state=PendingSynchronizerPool(self._get_flag,self._set_flag,blocking_sync=blocking_sync) self._flag=flag def _get_flag(self): return self._flag def _set_flag(self, flag): self._flag=flag return -1 if self._flag else 0
[docs] def wait(self, blocking=True, timeout=None, blocking_sync=None): return self._event_state.wait(blocking=blocking,timeout=timeout,blocking_sync=blocking_sync)
[docs] def set(self): self._event_state.notify(flag=True)
[docs] def clear(self): self._event_state.notify(flag=False)
[docs]class VersionEvent(object): def __init__(self, blocking_sync=False): object.__init__(self) self._event_state=ResourceSynchronizerPool(self._check_version,self._update_version,blocking_sync=blocking_sync) self._version=0 def _check_version(self, version): return self._version>=version def _update_version(self): self._version=self._version+1 return -1
[docs] def current_version(self): with self._event_state._state_lock: return self._version
[docs] def wait(self, version=None, blocking=True, timeout=None, blocking_sync=None): if version is None: version=self.current_version()+1 return self._event_state.grab(version=version,blocking=blocking,timeout=timeout,blocking_sync=blocking_sync)
[docs] def update(self): self._event_state.free()
[docs]class TaskSet(object): def __init__(self, blocking_sync=None): object.__init__(self) self._event_state=PendingSynchronizerPool(self._tasks_done,self._tasks_change,blocking_sync=blocking_sync) self._tasks_num=0 def _tasks_done(self): return self._tasks_num==0 def _tasks_change(self, delta=0, value=None): new_val=self._tasks_num+delta if value is None else value if new_val<0: raise ValueError("decreasing number of tasks below zero") self._tasks_num=new_val return 0 if new_val>0 else -1
[docs] def wait(self, blocking=True, timeout=None, blocking_sync=None): return self._event_state.wait(blocking=blocking,timeout=timeout,blocking_sync=blocking_sync)
[docs] def set(self, value): self._event_state.notify(value=value)
[docs] def get(self): return self._tasks_num
[docs] def done(self, number=1): self._event_state.notify(delta=-number)
[docs] def add(self, number=1): self._event_state.notify(delta=number)
[docs]class Condition(ISyncObject): _waiting_uid_gen=general.UIDGenerator(thread_safe=True) def __init__(self, lock=None, blocking_sync=False): ISyncObject.__init__(self) lock=lock or RLock() self._condition_lock=lock self._owner_lock=threading.Lock() self._lock_owner=None self._waiting_synch=PendingSynchronizerPool(lambda: False, lambda n: n, blocking_sync=blocking_sync)
[docs] def acquire(self, blocking=True, timeout=None, blocking_sync=None): if self._condition_lock.acquire(blocking,timeout,blocking_sync=blocking_sync): with self._owner_lock: self._lock_owner=threading.current_thread() return True return False
[docs] def release(self): with self._owner_lock: if self._condition_lock.release(): self._lock_owner=None
def _acq_cond_lock(self, n): if isinstance(self._condition_lock,RLock): self._condition_lock.full_acquire(n) else: self._condition_lock.acquire() with self._owner_lock: assert self._lock_owner is None # if the lock was acquired, there should be no owner self._lock_owner=threading.current_thread() def _rel_cond_lock(self): with self._owner_lock: self._lock_owner=None if isinstance(self._condition_lock,RLock): return self._condition_lock.full_release() else: self._condition_lock.release() def _check_owner(self): cth=threading.current_thread() with self._owner_lock: if self._lock_owner is not cth: raise RuntimeError("method can only be called by the owner of the lock")
[docs] def wait(self, timeout=None, blocking_sync=None): self._check_owner() n=self._rel_cond_lock() result=self._waiting_synch.wait(timeout=timeout,blocking_sync=blocking_sync) self._acq_cond_lock(n) return result
[docs] def notify(self, n=1): self._check_owner() self._waiting_synch.notify(n=n)
[docs] def notify_all(self): self._check_owner() self._waiting_synch.notify(n=-1)
[docs]class BrokenBarrierError(Exception): def __init__(self, msg=None): msg=msg or "trying to wait for a broken barrier" Exception.__init__(self, msg)
[docs]class Barrier(object): def __init__(self, parties, blocking_sync=None): object.__init__(self) self._barrier_state=PendingSynchronizerPool(self._start_waiting,self._stop_waiting,blocking_sync=blocking_sync,recursive_calls=True) self._parties=parties self._waiting=0 self._broken=False self._state_lock=threading.Lock() self._sync_state=None self._res_idx=0 def _start_waiting(self): self._waiting=self._waiting+1 if self._waiting==self._parties: self._barrier_state.notify() return self._waiting==self._parties def _stop_waiting(self): self._waiting=0 return -1
[docs] class SyncState(object): def __init__(self): object.__init__(self) self.event=threading.Event() self.idx=0 self.notified=0
[docs] def wait(self, blocking=True, timeout=None, blocking_sync=None): if self._broken: raise BrokenBarrierError() with self._state_lock: if self._sync_state is None or self._sync_state.idx==self._parties: self._sync_state=self.SyncState() sync_state=self._sync_state idx=sync_state.idx sync_state.idx=sync_state.idx+1 try: res=False # so that it's set in the finally block res=self._barrier_state.wait(blocking=blocking,timeout=timeout,blocking_sync=blocking_sync) finally: if not res: self._broken=True self._barrier_state.notify() sync_state.event.set() if self._broken: sync_state.event.set() raise BrokenBarrierError() with self._state_lock: sync_state.notified=sync_state.notified+1 if sync_state.notified==self._parties: sync_state.event.set() sync_state.event.wait() if self._broken: raise BrokenBarrierError() return idx
[docs]class QueueEmptyError(Exception): def __init__(self, msg=None): msg=msg or "queue empty" Exception.__init__(self, msg)
[docs]class QueueFullError(Exception): def __init__(self, msg=None): msg=msg or "queue full" Exception.__init__(self, msg)
[docs]class Queue(object): def __init__(self, max_size=None, blocking_sync=False): object.__init__(self) self._queue=[] self._max_size=max_size self._space_res=ResourceSynchronizerPool(self._grab_space, lambda: 1, blocking_sync=blocking_sync) self._item_res=ResourceSynchronizerPool(self._grab_item, lambda: 1, blocking_sync=blocking_sync)
[docs] def qsize(self): return len(self._queue)
[docs] def empty(self): return len(self._queue)==0
[docs] def full(self): return self._max_size is not None and len(self._queue)>=self._max_size
def _grab_item(self): if len(self._queue)>0: res=self._queue.pop(0) return (res,) else: return False def _grab_space(self, item): if not self.full(): self._queue.append(item) return True return False
[docs] def get(self, blocking=True, timeout=None, blocking_sync=None): item=self._item_res.grab(blocking,timeout,blocking_sync=blocking_sync) if item: self._space_res.free() return item[0] else: raise QueueEmptyError()
[docs] def put(self, item, blocking=True, timeout=None, blocking_sync=None): if self._space_res.grab(blocking,timeout,blocking_sync=blocking_sync,item=item): self._item_res.free() else: raise QueueFullError()
[docs]class ThreadObserverPool(observer_pool.ObserverPool):
[docs] def add_observer(self, callback, name=None, filt=None, priority=0, attr=None, cacheable=False): """ Add the observer callback. Same as :meth:`.observer_pool.ObserverPool.add_observer`, but callback can be a string, in which case it's interpreted as sending a message to a thread with the given name. """ if isinstance(callback,py3.textstring): controller=threadprop.current_controller(require_controller=True) callback=controller.add_new_message return observer_pool.ObserverPool.add_observer(self,callback,name=name,filt=filt,priority=priority,attr=attr,cacheable=cacheable)
[docs]class StateUIDGenerator(object): def __init__(self, thread_safe=True): self._value=0 if thread_safe: self._lock=threading.Lock() else: self._lock=general.DummyResource() self._enabled=True self._enabled_evt=Event(True)
[docs] def disable(self): with self._lock: self._enabled_evt.clear() self._enabled=False
[docs] def enable(self): with self._lock: self._enabled=True self._enabled_evt.set()
[docs] def is_enabled(self): with self._lock: return self._enabled
[docs] def wait_enabled(self, timeout=None): return self._enabled_evt.wait(blocking=(timeout!=0),timeout=timeout)
[docs] def up(self, enable=True): with self._lock: self._value=self._value+1 if enable: self._enabled=True self._enabled_evt.set() return self._value
[docs] def check(self, value, allow_disabled=False): with self._lock: if not (self._enabled or allow_disabled): return False return value is None or self._value==value
def __call__(self, timeout=None): timed_out=(timeout!=0) while True: with self._lock: if self._enabled: return self._value if timed_out: return -1 timed_out=not self.wait_enabled(timeout)