# Source code for pylablib.core.utils.general

"""
Collection of small utilities.
"""

from builtins import input
from future.utils import viewitems, viewvalues

import time
import threading
import os, signal
from . import functions

def set_props(obj, prop_names, props):
    """
    Set several attributes of `obj` at once.

    Attribute names are taken from the `prop_names` list, and the corresponding
    values from the `props` list (pairing stops at the shorter of the two).
    """
    for name, value in zip(prop_names, props):
        setattr(obj, name, value)
def get_props(obj, prop_names):
    """
    Get several attributes of `obj` at once.

    Attribute names are taken from the `prop_names` list; the corresponding
    values are returned as a list in the same order.
    """
    values = []
    for name in prop_names:
        values.append(getattr(obj, name))
    return values
def try_method_wrapper(func, method_name=None, inherit_signature=True):
    """
    Decorator that makes the function attempt to call the first argument's method instead of `func`.

    Before calling the function, try and call a method of the first argument named `method_name`
    (`func` name by default). If the method exists, call it instead of the wrapped function.
    If ``inherit_signature==True``, completely copy the signature of the wrapped method
    (name, args list, docstring, etc.).
    """
    if method_name is None:
        method_name=func.__name__
    def wrapped(*args, **kwargs):
        if args:
            self,args=args[0],args[1:]
            if "self" in kwargs:
                raise TypeError("{}() got multiple values for keyword argument 'self'".format(func.__name__))
        elif "self" in kwargs:
            self=kwargs.pop("self")
        else:
            # bug fix: a call with no arguments previously died with NameError on `self`,
            # and the error message text was garbled ("reqiqres jeyword")
            raise TypeError("{}() requires keyword argument 'self'".format(func.__name__))
        # bug fix: look the method up separately, so that AttributeError raised *inside*
        # the method body is not silently converted into a fallback call to `func`
        try:
            method=getattr(self,method_name)
        except AttributeError:
            return func(self,*args,**kwargs)
        return method(*args,**kwargs)
    if inherit_signature:
        wrapped=functions.getargsfrom(func)(wrapped)
    else:
        wrapped.__doc__=func.__doc__
    return wrapped
### Predicates ###
def to_predicate(x):
    """
    Turn `x` into a predicate.

    A callable `x` is used as-is (called with a single argument, truthy result means "passes").
    A container `x` yields a membership predicate (an argument passes if it is contained in `x`).
    Anything else raises :exc:`ValueError`.
    """
    if hasattr(x, "__call__"):
        return x
    if hasattr(x, "__contains__"):
        def contained(element):
            return element in x
        return contained
    raise ValueError("can't build predicate for {0}".format(x))
### Container routines ###
def map_container(value, func):
    """
    Map values in the container.

    `value` can be a ``tuple``, a ``list`` or a ``dict`` (mapping is applied to the values);
    raises :exc:`ValueError` if it's something else.
    """
    if isinstance(value,tuple):
        return tuple(func(v) for v in value)
    if isinstance(value,list):
        return [func(v) for v in value]
    if isinstance(value,dict):
        return {k: func(v) for k, v in value.items()}
    # bug fix: the original message was never formatted, so the offending value was not shown
    raise ValueError("value {} is not a container".format(value))
def recursive_map(value, func):
    """
    Map a container recursively.

    `value` can be a ``tuple``, a ``list`` or a ``dict`` (mapping is applied to the values);
    any other value is passed to `func` directly.
    """
    if isinstance(value, dict):
        return {k: recursive_map(v, func) for k, v in value.items()}
    if isinstance(value, (tuple, list)):
        mapped = [recursive_map(v, func) for v in value]
        return tuple(mapped) if isinstance(value, tuple) else mapped
    return func(value)
### Dictionary routines ###
def any_item(d):
    """Return an arbitrary ``(key, value)`` tuple contained in the dictionary (works both in Python 2 and 3)"""
    iterator = iter(d.items())
    return next(iterator)
def merge_dicts(*dicts):
    """
    Combine several ``dict`` objects into one.

    ``None`` arguments are skipped. On key collisions, later arguments win.
    """
    combined = {}
    for part in dicts:
        if part is None:
            continue
        combined.update(part)
    return combined
def filter_dict(pred, d, exclude=False):
    """
    Filter the dictionary `d` by its keys using a predicate.

    `pred` can be a callable or a container (in which case the predicate is true if a key is in the container);
    ``pred=None`` matches no keys. If ``exclude==True``, the predicate is inverted.
    """
    if pred is None:
        return d.copy() if exclude else {}
    pred = to_predicate(pred)
    result = {}
    for key, value in d.items():
        if bool(pred(key)) != exclude:
            result[key] = value
    return result
def map_dict_keys(func, d):
    """Return a copy of the dictionary `d` with every key transformed by `func`"""
    return {func(k): v for k, v in d.items()}
def map_dict_values(func, d):
    """Return a copy of the dictionary `d` with every value transformed by `func`"""
    return {k: func(v) for k, v in d.items()}
def to_dict(d, default=None):
    """
    Convert a ``dict`` or a ``list`` of pairs or single keys (or mixed) into a ``dict``.

    If a list element is single, `default` value is used.
    """
    if d is None:
        return {}
    if isinstance(d,dict):
        return d
    res={}
    for e in d:
        if isinstance(e,(list,tuple)):
            # bug fix: the original indexed the whole container (d[0], d[1])
            # instead of the current element, so the result was always wrong
            res[e[0]]=e[1]
        else:
            # single key: pair it with the default value
            res[e]=default
    return res
def to_pairs_list(d, default=None):
    """
    Convert a ``dict`` or a ``list`` of pairs or single keys (or mixed) into a ``list`` of pairs.

    If a list element is single, `default` value is used for its pair.
    When converting a ``list`` into a ``list``, the order is preserved.
    """
    if d is None:
        return []
    source = d.items() if isinstance(d, dict) else d
    pairs = []
    for entry in source:
        if isinstance(entry, (list, tuple)):
            pairs.append((entry[0], entry[1]))
        else:
            pairs.append((entry, default))
    return pairs
def invert_dict(d, kmap=None):
    """
    Invert dictionary (switch keys and values).

    If `kmap` is supplied, it's a function mapping dictionary values into
    inverted dictionary keys (identity by default).
    """
    if kmap:
        return {kmap(v): k for k, v in d.items()}
    return {v: k for k, v in d.items()}
### List routines ###
def flatten_list(l):
    """
    Flatten a nested ``list``/``tuple`` structure into a flat sequence of elements (generator).
    """
    for element in l:
        if isinstance(element, (list, tuple)):
            for nested in flatten_list(element):
                yield nested
        else:
            yield element
def partition_list(pred, l):
    """
    Split the list `l` into two parts based on the predicate.

    Return a tuple ``(passed, failed)`` of elements for which the predicate
    is respectively true and false.
    """
    pred = to_predicate(pred)
    passed, failed = [], []
    for element in l:
        (passed if pred(element) else failed).append(element)
    return passed, failed
def split_in_groups(key_func, l, continuous=True, max_group_size=None):
    """
    Split the list `l` into groups according to the `key_func`.

    Go over the list and group the elements with the same key value together.
    If ``continuous==False``, group all elements with the same key together regardless of where they are in the list;
    otherwise, group only continuous sequences of the elements with the same key together
    (an element with a different key in the middle will result in two groups).
    If ``continuous==True`` and `max_group_size` is not ``None``, it determines the maximal size of a group;
    larger groups are split into separate groups.
    """
    if continuous:
        if len(l)==0:
            return []
        groups=[]
        g=[l[0]]
        key=key_func(l[0])
        for e in l[1:]:
            ek=key_func(e)
            # start a new group on a key change or when the current group is full
            if ek!=key or (max_group_size is not None and len(g)>=max_group_size):
                key=ek
                groups.append(g)
                g=[]
            g.append(e)
        groups.append(g)
        return groups
    groups={}
    for e in l:
        # bug fix: `groups.get(k,[]).append(e)` appended to a temporary list that was
        # never stored, so this branch always returned an empty result;
        # setdefault actually inserts the list into the dict
        groups.setdefault(key_func(e),[]).append(e)
    return list(groups.values())
def sort_set_by_list(s, l, keep_duplicates=True):
    """
    Convert the set `s` into a list ordered by the list `l`.

    Elements of `s` which are not in `l` are omitted.
    If ``keep_duplicates==True``, keep duplicate occurrences in `l` in the result;
    otherwise, only keep the first occurrence.
    """
    if keep_duplicates:
        return [e for e in l if e in s]
    remaining = set(s)
    ordered = []
    for e in l:
        if e in remaining:
            ordered.append(e)
            remaining.discard(e)
    return ordered
def compare_lists(l1, l2, sort_lists=False, keep_duplicates=True):
    """
    Return three lists ``(l1 and l2, l1-l2, l2-l1)``.

    If ``sort_lists==True``, sort the first two lists by `l1`, and the last one by `l2`;
    otherwise, the order is undefined. In the sorted case `keep_duplicates` determines
    if duplicate elements show up in the result.
    """
    s1, s2 = set(l1), set(l2)
    both = s1 & s2
    only1 = s1 - s2
    only2 = s2 - s1
    if sort_lists:
        return (sort_set_by_list(both, l1, keep_duplicates),
                sort_set_by_list(only1, l1, keep_duplicates),
                sort_set_by_list(only2, l2, keep_duplicates))
    return list(both), list(only1), list(only2)
### Dummy resource ###
class DummyResource(object):
    """
    Object that acts as a resource (has ``__enter__`` and ``__exit__`` methods), but doesn't do anything.

    Analog of::

        @contextlib.contextmanager
        def dummy_resource():
            yield
    """
    def __enter__(self):
        return None
    def __exit__(self, exc_type, exc_value, exc_tb):
        # never suppress exceptions
        return False
### Errors handling / retrying ###
class RetryOnException(object):
    """
    Wrapper for repeating the same block of code several times if an exception occurs.

    Useful for filesystem or communication operations, where retrying a failed operation is a valid option.

    Args:
        tries (int): Determines how many times the chunk of code will execute before re-raising the exception;
            ``None`` (default) means no limit.
        exceptions (Exception or list): A single exception class or a ``list`` of exception classes which are going to be silenced.

    Example::

        for t in RetryOnException(tries,exceptions):
            with t:
                ... do stuff ...

    is analogue of::

        for i in range(tries):
            try:
                ... do stuff ...
            except exceptions:
                if i==tries-1:
                    raise
    """
    def __init__(self, tries=None, exceptions=None):
        object.__init__(self)
        self.tries=tries
        # normalize a single exception class into a tuple; default to all exceptions
        if isinstance(exceptions,type) and issubclass(exceptions,Exception):
            exceptions=(exceptions,)
        self.exceptions=exceptions or (Exception,)
    class ExceptionCatcher(object):
        """Context manager for a single attempt; silences listed exceptions unless this is the last allowed try."""
        def __init__(self, retrier, try_number):
            object.__init__(self)
            self.silent=(retrier.tries is None) or (try_number+1<retrier.tries)
            self.try_number=try_number
            self.retrier=retrier
        def __enter__(self):
            return self
        def __exit__(self, etype, error, etrace):
            # remember the outcome so the caller can inspect it (e.g. check for success)
            self.etype=etype
            self.error=error
            self.etrace=etrace
            if etype is None:
                return True
            if not self.silent:
                return False
            # suppress only the exception types the retrier was configured with
            return any(isinstance(error,et) for et in self.retrier.exceptions)
        def reraise(self):
            """Re-raise the last caught exception."""
            raise self.error
    def __iter__(self):
        attempt=0
        while True:
            yield self.ExceptionCatcher(self,attempt)
            attempt+=1
def retry_wait(func, try_times=1, delay=0., exceptions=None):
    """
    Try calling `func` (with no arguments) at most `try_times` as long as it keeps raising an exception.

    If `exceptions` is not ``None``, it specifies which exception types should be silenced.
    If an exception has been raised, wait `delay` seconds before retrying.
    """
    retrier = RetryOnException(try_times, exceptions)
    for attempt in retrier:
        with attempt:
            return func()
        # this point is reached only after a silenced failure
        if delay > 0:
            time.sleep(delay)
class SilenceException(object):
    """
    Context which silences exceptions raised in a block of code.

    Args:
        exceptions (Exception or list): A single exception class or a list of exception classes which are going to be silenced.
        on_exception (callable): A callback to be invoked if an exception occurs.
        reraise (bool): Defines if the exception is re-raised after the callback has been invoked.

    A simple bit of syntax sugar. The code::

        with SilenceException(exceptions,on_exception,reraise):
            ... do stuff ...

    is exactly analogous to::

        try:
            ... do stuff ...
        except exceptions:
            on_exception()
            if reraise:
                raise
    """
    def __init__(self, exceptions=None, on_exception=None, reraise=False):
        object.__init__(self)
        self.on_exception=on_exception
        self.reraise=reraise
        # normalize a single exception class into a tuple; default to all exceptions
        if isinstance(exceptions,type) and issubclass(exceptions,Exception):
            exceptions=(exceptions,)
        self.exceptions=exceptions or (Exception,)
    def __enter__(self):
        return self
    def __exit__(self, etype, error, etrace):
        if etype is None:
            return True
        for et in self.exceptions:
            if isinstance(error,et):
                # bug fix: the callback's return value used to be returned as the suppression
                # flag, so a callback returning None caused an unintended re-raise; per the
                # documented contract, invoke the callback, then suppress unless `reraise` is set
                if self.on_exception:
                    self.on_exception()
                return not self.reraise
        # not a silenced exception type: let it propagate
        return False
### Process handling ###
def full_exit(code=signal.SIGTERM):
    """
    Terminate the current process and all of its threads.

    Doesn't perform any cleanup or resource release; should only be used
    if the process is irrevocably damaged.
    """
    pid = os.getpid()
    os.kill(pid, code)
    # in case the signal was handled or ignored, force-exit without cleanup
    os._exit(code)
### Tree routines ###
def _topological_order_dfs(graph, start, path=None, visited=None, order=None, priority=None):
    """
    Depth-first post-order traversal used by :func:`topological_order`.

    `path` is the set of nodes on the current DFS path (for loop detection),
    `visited` the set of fully processed nodes, `order` the accumulated result
    list, and `priority` an optional ``{node: rank}`` mapping controlling the
    order in which children are visited.
    """
    # bug fix: `set(start)` iterated over `start` (crashing for non-iterable nodes
    # and producing a wrong set for strings); the path must contain the node itself
    path={start} if path is None else path
    order=[] if order is None else order
    visited=set() if visited is None else visited
    children=graph.get(start,[])
    if priority is not None:
        children=sorted(children,key=lambda c: priority.get(c,None))
    for child in children:
        if child in path:
            raise ValueError("graph contains loop; topological order is impossible")
        if child in visited:
            continue
        path.add(child)
        # bug fix: `priority` was dropped in the recursive call, so only the
        # top-level children were visited in the requested order
        _topological_order_dfs(graph,child,path,visited,order,priority)
        path.remove(child)
    order.append(start)
    visited.add(start)
    return order
def topological_order(graph, visit_order=None):
    """
    Get a topological order of a graph.

    Return a list of nodes where each node is listed after its children.
    If `visit_order` is not ``None``, it is a list specifying the node visiting order
    (nodes earlier in the list are visited first); otherwise, the visit order is undefined.

    `graph` is a dictionary ``{node: [children]}``.
    If the graph contains loops, raise :exc:`ValueError`.
    """
    order = []
    visited = set()
    if visit_order is None:
        pending = set(graph)
        while pending:
            _topological_order_dfs(graph, pending.pop(), visited=visited, order=order)
            pending.difference_update(visited)
    else:
        known = set(visit_order)
        # visit the explicitly ordered nodes first, then any remaining graph nodes
        pending = list(visit_order) + [n for n in graph if n not in known]
        priority = {node: idx for idx, node in enumerate(pending)}
        while pending:
            _topological_order_dfs(graph, pending.pop(0), visited=visited, order=order, priority=priority)
            pending = [n for n in pending if n not in visited]
    return order
### UID generator ###
class UIDGenerator(object):
    """
    Generator of unique numeric IDs.

    Args:
        thread_safe (bool): If ``True``, use a lock to ensure that simultaneous calls
            from different threads are handled properly.
    """
    def __init__(self, thread_safe=False):
        self._value = 0
        self._lock = threading.Lock() if thread_safe else DummyResource()
    def __call__(self, inc=True):
        """
        Return a new unique numeric ID.

        If ``inc==False``, don't increase the internal counter
        (the next call will return the same ID).
        """
        with self._lock:
            if inc:
                self._value += 1
            return self._value
class NamedUIDGenerator(object):
    """
    Generator of unique string IDs based on a name.

    Args:
        name_template (str): Format string with two parameters (name and numeric ID)
            used to generate string IDs.
        thread_safe (bool): If ``True``, use a lock to ensure that simultaneous calls
            from different threads are handled properly.
    """
    def __init__(self, name_template="{0}{1:03d}", thread_safe=False):
        self._uids = {}
        self._name_template = name_template
        self._lock = threading.Lock() if thread_safe else DummyResource()
    def __call__(self, name):
        """
        Return a new unique string ID with the given `name`.
        """
        with self._lock:
            # each name gets its own numeric counter, created on first use
            gen = self._uids.setdefault(name, UIDGenerator(thread_safe=False))
            uid = gen()
        return self._name_template.format(name, uid)
### Skipped calling wrapper ###
def call_every(func, times=1, cooldown=0., default=None):
    """
    Wrap `func` such that calls to it are forwarded only under certain conditions.

    If ``times>1``, then `func` is called only after at least `times` calls to the wrapped function.
    If ``cooldown>0``, then `func` is called only after at least `cooldown` seconds have passed since the last call.
    If both conditions are specified, they should be satisfied simultaneously.
    `default` specifies the return value if `func` wasn't called.
    """
    state = [times, -cooldown]  # [calls skipped since the last forward, time of the last forward]
    @functions.getargsfrom(func)
    def wrapped(*args, **kwargs):
        now = time.time()
        forward = (state[0] >= times-1) and (now > state[1]+cooldown)
        if not forward:
            state[0] += 1
            return default
        state[0] = 0
        state[1] = now
        return func(*args, **kwargs)
    return wrapped
def call_limit(func, times=1, cooldown=0., limit=None, default=None):
    """
    Wrap `func` such that calls to it are forwarded only under certain conditions.

    If ``times>1``, then `func` is called after at least `times` calls to the wrapped function.
    If ``cooldown>0``, then `func` is called after at least `cooldown` seconds passed since the last call.
    If ``limit is not None``, then `func` is called only the first `limit` times.
    If several conditions are specified, they should be satisfied simultaneously.
    `default` specifies return value if `func` wasn't called.
    Returned function also has an added method ``reset``, which resets the internal call and time counters.
    """
    # state = [misses since the last forwarded call, last forwarded call time, number of forwarded calls];
    # bug fix: the initial (and reset) state used to be [times,0,-cooldown], i.e. the
    # last-call-time and call-counter slots were swapped relative to how the wrapper
    # reads them, which broke `limit` accounting for nonzero `cooldown`
    state=[times,-cooldown,0]
    @functions.getargsfrom(func)
    def wrapped(*args, **kwargs):
        curr_t=time.time()
        if (state[0]>=times-1) and (curr_t>state[1]+cooldown) and (limit is None or state[2]<limit):
            state[:]=0,curr_t,state[2]+1
            res=func(*args,**kwargs)
        else:
            state[0]+=1
            res=default
        return res
    def reset():
        """Reset the internal call and time counters."""
        state[:]=times,-cooldown,0
    wrapped.reset=reset
    return wrapped
### Docstring inheritance ###
def doc_inherit(parent):
    """
    Wrapper for inheriting docstrings from parent classes.

    Takes parent class as an argument and replaces the docstring of the wrapped function
    by the docstring of the same-named function from the parent class (if available).
    """
    _missing = object()
    def wrapper(func):
        member = getattr(parent, func.__name__, _missing)
        if member is not _missing:
            func.__doc__ = member.__doc__
        return func
    return wrapper
### Countdown ###
class Countdown(object):
    """
    Object for convenient handling of timeouts and countdowns with interrupts.

    Args:
        timeout (float): Countdown timeout; if ``None``, assumed to be infinite.
    """
    def __init__(self, timeout):
        self.timeout = timeout
        self.reset()
    def reset(self):
        """Restart the countdown from the current moment."""
        self.start = time.time()
        self.end = None if self.timeout is None else self.start + self.timeout
    def time_left(self, bound_below=True):
        """
        Return the amount of time left. For an infinite timeout, return ``None``.

        If ``bound_below==True``, instead of negative time return zero.
        """
        if self.timeout is None or self.timeout == 0:
            return self.timeout
        remaining = self.end - time.time()
        return max(remaining, 0.) if bound_below else remaining
    def time_passed(self):
        """Return the amount of time passed since the countdown start/reset."""
        return time.time() - self.start
    def passed(self):
        """Check if the timeout has passed."""
        if self.timeout is None:
            return False
        if self.timeout == 0:
            return True
        return self.end <= time.time()
class Timer(object):
    """
    Object for keeping time of repeating tasks.

    Args:
        period (float): Timer period.
        skip_first (bool): If ``True``, the first tick is scheduled one period after creation;
            otherwise, the timer ticks immediately.
    """
    def __init__(self, period, skip_first=False):
        self.period=period
        self.reset(skip_first=skip_first)
    def change_period(self, period, method="current"):
        """
        Change the timer period.

        `method` specifies the changing method. Could be ``"current"`` (change the period of the ongoing tick),
        ``"next"`` (change the period starting from the next tick), ``"reset_skip"`` (reset the timer and skip the first tick)
        or ``"reset_noskip"`` (reset the timer and don't skip the first tick).
        """
        if method not in {"current","next","reset_skip","reset_noskip"}:
            raise ValueError("unrecognized changing method: {}".format(method))
        old_period=self.period
        self.period=period
        if method=="reset_noskip":
            self.reset(skip_first=False)
        elif method=="reset_skip":
            self.reset(skip_first=True)
        elif method=="current":
            # shift the already-scheduled tick so the ongoing interval has the new duration
            self.next+=(period-old_period)
        # "next": nothing else to do; the new period takes effect after the scheduled tick
    def reset(self, skip_first=False):
        """
        Reset the timer.

        If ``skip_first==False``, timer ticks immediately; otherwise, it starts ticking only after one period.
        """
        start=time.time()
        self.next=start+self.period if skip_first else start
    def time_left(self, t=None, bound_below=True):
        """
        Return the amount of time left before the next tick.

        If `t` is not ``None``, use it as the current time instead of ``time.time()``.
        If ``bound_below==True``, instead of negative time return zero.
        """
        t=time.time() if t is None else t  # bug fix: 't or time.time()' ignored a legitimate t==0
        dtime=self.next-t
        if bound_below:
            dtime=max(dtime,0.)
        return dtime
    def passed(self, t=None):
        """
        Return the number of ticks passed.

        If `t` is not ``None``, use it as the current time instead of ``time.time()``.
        If timer period is zero, always return 1.
        """
        t=time.time() if t is None else t  # bug fix: same t==0 issue as in time_left
        return int((t-self.next)//self.period)+1 if self.period>0 else 1
    def acknowledge(self, n=None, nmin=0):
        """
        Acknowledge the timer tick.

        `n` specifies the number of ticks to acknowledge (by default, all passed);
        at least `nmin` ticks are always acknowledged.
        Return the number of actually acknowledged ticks (0 if the timer hasn't ticked since the last acknowledgement).
        """
        npassed=max(self.passed(),nmin)
        nack=npassed if n is None else min(npassed,n)
        # advance the schedule by the acknowledged ticks
        self.next+=self.period*nack
        return nack
### Stream redirection ###
class StreamFileLogger(object):
    """
    Stream logger that replaces a standard output stream (usually stdout or stderr) and logs its output into a file.

    Args:
        path: path to the destination logfile (or a list of paths). The file is always appended.
        stream: an optional output stream into which the output will be duplicated;
            usually, the original stream which is being replaced
        lock: a thread lock object, which is used for any file writing operation;
            necessary if replacing standard streams (such as ``sys.stdout`` or ``sys.stderr``)
            in a multithreading environment.
        autoflush: if ``True``, flush after any write operation into `stream`

    It is also possible to subclass the file and overload :meth:`write_header` method
    to write a header before the first file write operation during the execution.

    The intended use is to log stdout or stderr streams::

        import sys, threading
        sys.stderr = StreamFileLogger("error_log.txt", stream=sys.stderr, lock=threading.Lock())
    """
    def __init__(self, path, stream=None, lock=None, autoflush=False):
        object.__init__(self)
        self.paths=path if isinstance(path,list) else [path]  # normalize to a list of destination paths
        self.stream=stream
        self.header_done=False  # becomes True once the header has been written
        self.lock=lock or DummyResource()  # no-op lock by default
        self.autoflush=autoflush
    def write_header(self, f):
        """Write header to file stream `f` (no-op by default; can be overloaded in subclasses)"""
        pass
    def add_path(self, path):
        """Add another logging path to the list"""
        with self.lock:
            if path not in self.paths:
                self.paths.append(path)
    def remove_path(self, path):
        """Remove logging path from the list; return ``True`` if it was present and removed"""
        with self.lock:
            if path in self.paths:
                del self.paths[self.paths.index(path)]
                return True
            else:
                return False
    def write(self, s):
        # Append the string to every destination file, retrying each file up to 5 times
        # on IOError with a 0.1s pause between attempts; then duplicate into `stream`.
        with self.lock:
            for p in self.paths:
                try:
                    for t in RetryOnException(5,exceptions=IOError):
                        with t:
                            with open(p,"a") as f:
                                if not self.header_done:
                                    self.write_header(f)
                                    self.header_done=True
                                f.write(s)
                            break  # written successfully: stop retrying this path
                        time.sleep(0.1)  # the attempt failed and was silenced: wait before retrying
                except IOError:
                    pass  # all retries failed: best-effort logging, skip this path
            if self.stream is not None:
                self.stream.write(s)
                if self.autoflush:
                    self.stream.flush()
    def flush(self):
        with self.lock:
            if self.stream is not None:
                self.stream.flush()
### Debugging ### @functions.delaydef def setbp(): try: try: from traitlets.config.configurable import MultipleInstanceError except ImportError: from IPython.config.configurable import MultipleInstanceError try: import ipdb return ipdb.set_trace except (ImportError, MultipleInstanceError): from IPython.core.debugger import Tracer return Tracer() except Exception: import pdb return pdb.set_trace ### Misc ###
def wait_for_keypress(message="Waiting..."):
    """Display `message` and block until the user presses Enter."""
    prompt = message
    input(prompt)