Source code for pylablib.core.utils.dictionary

"""
Tree-like multi-level dictionary with advanced indexing options.
"""

from functools import reduce
from future.utils import viewitems as viewitems_, viewvalues as viewvalues_

from . import funcargparse, general, strdump
import re
import collections
import pandas as pd

_depends_local=["..utils.strdump"]

[docs]def split_path(path, omit_empty=True, sep=None): """ Split generic path into individual path entries. Args: path: Generic path. Lists and tuples (possible nested) are flattened; strings are split according to separators; non-strings are converted into strings first. omit_empty (bool): Determines if empty entries are skipped. sep (str): If not ``None``, defines regex for path separators; default separator is ``'/'``. Returns: list: A list of individual entries. """ if not (isinstance(path, list) or isinstance(path, tuple)): path=[path] else: path=general.flatten_list(path) if sep is None: path=[e for t in path for e in str(t).split("/")] else: path=[e for t in path for e in re.split(sep,t)] if omit_empty: path=[p for p in path if p!=""] return path
[docs]def normalize_path_entry(entry, case_sensitive=True, case_normalization="lower"): """Normalize the case of the entry if it's not case-sensitive. Normalization is either either ``'lower'`` or ``'upper'``.""" funcargparse.check_parameter_range(case_normalization,"case_normalization",{"lower","upper"}) if not case_sensitive: if case_normalization=="lower": return entry.lower() else: return entry.upper() else: return entry
[docs]def normalize_path(path, omit_empty=True, case_sensitive=True, case_normalization="lower", sep=None, force=False): """ Split and normalize generic path into individual path entries. Args: path: Generic path. Lists and tuples (possible nested) are flattened; strings are split according to separators; non-strings are converted into strings first. omit_empty (bool): Determines if empty entries are skipped. case_sensitive (bool): If ``False``, entries case is normalized according to case_normalization. case_normalization (str): Normalization rules; either ``'lower'`` or ``'upper'``. sep (str): If not None, defines regex for path separators; default separator is ``'/'``. force (bool): If ``False``, treat lists as if they're already normalized. Returns: list: A list of individual normalized entries. """ if isinstance(path,list) and not force: return path funcargparse.check_parameter_range(case_normalization,"case_normalization",{"lower","upper"}) path=split_path(path,omit_empty,sep=sep) if not case_sensitive: if case_normalization=="lower": path=[p.lower() for p in path] else: path=[p.upper() for p in path] return path
[docs]def is_dictionary(obj, generic=False): """ Determine if the object is a dictionary. Args: obj: object generic (bool): if ``False``, passes only :class:`Dictionary` (or subclasses) objects; otherwise, passes any dictionary-like object. Returns: bool """ return Dictionary.is_dictionary(obj,generic=generic)
[docs]def as_dictionary(obj, case_sensitive=True, case_normalization="lower"): """ Convert object into :class:`Dictionary` with the given parameters. If object is already a :class:`Dictionary` (or its subclass), return unchanged, even if its parameters are different. """ return Dictionary.as_dictionary(obj,case_sensitive=case_sensitive,case_normalization=case_normalization)
[docs]def as_dict(obj, style="nested", copy=True): """ Convert object into standard `dict` with the given parameters. If object is already a `dict`, return unchanged, even if the parameters are different. """ if isinstance(obj,dict): return obj return Dictionary.as_dictionary(obj).as_dict(style=style,copy=copy)
[docs]class Dictionary(object): """ Multi-level dictionary. Access is done by path (all path elements are converted into strings and concatenated to form a single string path). If dictionary is not case-sensitive, all inserted and accessed paths are normalized to lower or upper case. Args: root (dict or Dictionary): Initial value. case_sensitive (bool): If ``False``, entries case is normalized according to `case_normalization`. case_normalization (str): Normalization rules; either ``'lower'`` or ``'upper'``. copy (bool): If ``True``, make copy of the supplied data; otherwise, just make it the root. Warning: If ``copy==False``, the root data is already assumed to be normalized. If it isn't, the behavior might be incorrect. """ def __init__(self, root=None, case_sensitive=True, case_normalization="lower", copy=True): object.__init__(self) self._case_sensitive=case_sensitive self._case_normalization=case_normalization if root is not None: if isinstance(root,pd.Series): root=dict(zip(root.index,root)) elif isinstance(root,pd.DataFrame): if root.shape[1]==1: root=dict(zip(root.index,root.iloc(axis=1)[0])) elif root.shape[1]==2: root=dict(zip(root.iloc(axis=1)[0],root.iloc(axis=1)[1])) else: raise ValueError("only accept 1- and 2-column arrays") root=Dictionary._get_root(root) if copy: self._data={} self.merge_branch(root) # automatically normalizes source else: self._data=root else: self._data={} def _make_similar_dict(self, root=None, copy=True): return Dictionary(root=root,copy=copy,case_sensitive=self._case_sensitive,case_normalization=self._case_normalization) def _normalize_path_entry(self, entry): return normalize_path_entry(entry,case_sensitive=self._case_sensitive,case_normalization=self._case_normalization) def _normalize_path(self, path): return normalize_path(path,omit_empty=True,case_sensitive=self._case_sensitive,case_normalization=self._case_normalization) @staticmethod def _is_branch(v): return isinstance(v,dict) @staticmethod def _get_root(source): if isinstance(source, Dictionary): return source._data elif Dictionary._is_branch(source): return source else: raise ValueError("source isn't a tree") @staticmethod def _is_empty(source): if isinstance(source, Dictionary): return not source._data elif Dictionary._is_branch(source): return not source else: return False
[docs] @staticmethod def is_dictionary(obj, generic=True): """ Determine if the object is a dictionary. Args: obj generic (bool): if False, passes only :class:`Dictionary` (or subclasses) objects; otherwise, passes any dictionary-like object. Returns: bool """ if generic: return isinstance(obj, Dictionary) or Dictionary._is_branch(obj) else: return isinstance(obj, Dictionary)
[docs] @staticmethod def as_dictionary(obj, case_sensitive=True, case_normalization="lower"): """ Convert object into :class:`Dictionary` with the given parameters. If object is already a :class:`Dictionary` (or its subclass), return unchanged, even if its parameters are different. """ if isinstance(obj,DictionaryPointer): return Dictionary(obj,copy=False) if isinstance(obj, Dictionary): return obj else: return Dictionary(obj,case_sensitive=case_sensitive,case_normalization=case_normalization)
def _get_valid_subpath(self, s_path): branch=self._data for i,p in enumerate(s_path): if self._is_branch(branch) and p in branch: branch=branch[p] else: break return s_path[:i] def _get_branch(self, s_path, append=False, overwrite_leaves=False): branch=self._data for p in s_path: if append: new_branch=branch.setdefault(p,{}) if not self._is_branch(new_branch): if overwrite_leaves: new_branch=branch[p]={} else: return None branch=new_branch elif p in branch: branch=branch[p] if not self._is_branch(branch): return None else: return None return branch def _attach_node(self, dest, key, value, branch_option="normalize"): """ Attach a node. branch_option decides what to do if the value is dictionary-like: just attach root, copy, or normalize all the keys attaching empty dictionary does nothing. """ try: value=Dictionary._get_root(value) if value: # adding empty dictionary doesn't change anything if branch_option=="attach": dest[key]=value else: branch={} self._insert_branch(value,branch,overwrite=True,normalize_paths=(branch_option=="normalize")) dest[key]=branch except ValueError: dest[key]=value
[docs] def add_entry(self, path, value, force=False, branch_option="normalize"): """ Add value to a given path (overwrite leaf value if necessary). Doesn't replace leaves with branches and vice-verse if ``force==False``. Args: path value force (bool): If ``True``, change leaf into a branch and vice-versa; otherwise, raises :exc:`ValueError` if the conversion is necessary. branch_option (str): Decides what to do if the value is dictionary-like: - ``'attach'`` -- just attach the root, - ``'copy'`` -- copy and attach, - ``'normalize'`` -- copy while normalizing all the keys according to the current rules. """ funcargparse.check_parameter_range(branch_option,"branch_option",{"attach","copy","normalize"}) if self._is_empty(value): if force: self.del_entry(path) return self path=self._normalize_path(path) if path==[]: raise KeyError("can't reassign root") if force: branch=self._get_branch(path[:-1],append=True,overwrite_leaves=True) else: branch=self._get_branch(path[:-1],append=True,overwrite_leaves=False) if branch is None: wrong_path="/".join(self._get_valid_subpath(path)) raise KeyError("can't replace the leaf '{0}' with a subtree; delete the leaf explicitly first, or use force=True".format(wrong_path)) if self._is_branch(branch.get(path[-1],None)): wrong_path="/".join(path) raise KeyError("can't replace the subtree '{0}' with a leaf; delete the subtree explicitly first, or use force=True".format(wrong_path)) self._attach_node(branch,path[-1],value,branch_option=branch_option) return self
def _get_entry(self, path): path=self._normalize_path(path) if path==[]: return self._data branch=self._get_branch(path[:-1],append=False) if branch and (path[-1] in branch): return branch[path[-1]] else: raise KeyError("unaccessible entry with path {0}".format(path))
[docs] def get_entry(self, path, as_pointer=False): """ Get entry at a given path Args: path as_pointer (bool): If ``True`` and entry is not a leaf, return :class:`Dictionary` or :class:`DictionaryPointer` """ value=self._get_entry(path) if self._is_branch(value): if as_pointer: return DictionaryPointer(self,path,case_sensitive=self._case_sensitive,case_normalization=self._case_normalization,copy=False) else: return self._make_similar_dict(value,copy=False) else: return value
[docs] def has_entry(self, path, kind="all"): """ Determine if the path is in the dictionary. `kind` determines which kind of path to consider and can be ``'leaf'``, ``'branch'`` or ``'all'``. """ funcargparse.check_parameter_range(kind,"kind",{"leaf","branch","all"}) try: v=self._get_entry(path) return (kind=="all") or (kind=="branch" and self._is_branch(v)) or (kind=="leaf" and not self._is_branch(v)) except KeyError: return False
[docs] def get_max_prefix(self, path, kind="all"): """ Find the longest prefix of `path` contained in the dictionary. Return tuple ``(prefix, rest)``, where both path entries are normalized according to the dictionary rules. `kind` determines which kind of path to consider and can be ``'leaf'``, ``'branch'`` or ``'all'``. If the longest prefix is of a different kind, return ``(None,None)``. """ funcargparse.check_parameter_range(kind,"kind",{"leaf","branch","all"}) s_path=self._normalize_path(path) if s_path==[]: if not self._data and kind!="branch": return ([],[]) if self._data and kind!="leaf": return ([],[]) return (None,None) branch=self._data for i,p in enumerate(s_path): if p in branch: branch=branch[p] if not self._is_branch(branch): return (None,None) if kind=="branch" else (s_path[:i+1],s_path[i+1:]) else: return (None,None) if kind=="leaf" else (s_path[:i],s_path[i:]) return (None,None) if kind=="leaf" else (s_path,[])
[docs] def del_entry(self, path): """ Delete entry from the dictionary. Return ``True`` if the path was present.""" path=self._normalize_path(path) if path==[]: return False branch=self._get_branch(path[:-1],append=False) if branch: try: del branch[path[-1]] return True except KeyError: pass return False
__getitem__=get_entry __setitem__=add_entry __contains__=has_entry __delitem__=del_entry def __len__(self): return len(self._data)
[docs] def size(self): """Return the total size of the dictionary (number of nodes).""" def _branch_size(branch): if self._is_branch(branch): return sum(_branch_size(v) for v in viewvalues_(branch)) else: return 1 return _branch_size(self._data)
[docs] def get(self, path, default=None): """ Analog of ``dict.get()``: ``D.get(k,d) -> D[k] if k in D else d``. """ try: return self.__getitem__(path) except KeyError: return default
[docs] def setdefault(self, path, default=None): """ Analog of ``dict.setdefault()``: ``D.setdefault(k,d) -> D.get(k,d)``, also sets ``D[k]=d`` if ``k is not in D``. """ try: return self.__getitem__(path) except KeyError: self.__setitem__(path, default) return default
[docs] def viewitems(self, ordered=False, leafs=False, path_kind="split", wrap_branches=True): """ Analog of ``dict.viewitems()``, by default iterating only over the immediate children of the root. Args: ordered (bool): If ``True``, loop over keys in alphabetic order. leafs (bool): If ``True``, loop over leaf nodes (i.e., behave as 'flat' dictionary); otherwise, loop over immediate children (i.e., behave as 'nested' dictionary) path_kind (str): either ``"split"`` (each path is a tuple of individual keys), or ``"joined"`` (each path is a single string) wrap_branches (bool): if ``True``, wrap sub-branches into :class:`DictionaryPointer` objects; otherwise, return them as nested built-in dictionaries """ if leafs: funcargparse.check_parameter_range(path_kind,"path_kind",{"split","joined"}) makep=tuple if path_kind=="split" else "/".join for p,v in self.iternodes(to_visit="leafs",ordered=ordered,include_path=True): yield makep(p),v else: items_=sorted(viewitems_(self._data)) if ordered else viewitems_(self._data) if wrap_branches: makev=lambda p,v: (self._fast_build_branch_pointer([p],v) if self._is_branch(v) else v) else: makev=lambda p,v: v for p,v in items_: yield p,makev(p,v)
iteritems=viewitems # for compatibility items=viewitems
[docs] def viewvalues(self, ordered=False, leafs=False, wrap_branches=True): """ Analog of ``dict.viewvalues()``, iterating only over the immediate children of the root. Args: ordered (bool): If ``True``, loop over keys in alphabetic order. leafs (bool): If ``True``, loop over leaf nodes (i.e., behave as 'flat' dictionary); otherwise, loop over immediate children (i.e., behave as 'nested' dictionary) wrap_branches (bool): if ``True``, wrap sub-branches into :class:`DictionaryPointer` objects; otherwise, return them as nested built-in dictionaries """ for _,v in self.items(ordered=ordered,leafs=leafs,wrap_branches=wrap_branches): yield v
itervalues=viewvalues values=viewvalues
[docs] def viewkeys(self, ordered=False): """ Analog of ``dict.viewkeys()``, iterating only over the immediate children of the root. Args: ordered (bool): If ``True``, loop over keys in alphabetic order. """ return sorted(self._data) if ordered else list(self._data)
iterkeys=viewkeys # for compatibility def __iter__(self): return self._data.__iter__() keys=viewkeys
[docs] def paths(self, ordered=False, topdown=False, path_kind="split"): """ Return list of all leaf paths. Args: ordered (bool): If ``True``, loop over paths in alphabetic order. topdown (bool): If ``True``, return node's leafs before its subtrees leafs. path_kind (str): either ``"split"`` (each path is a tuple of individual keys), or ``"joined"`` (each path is a single string) """ ps=[] funcargparse.check_parameter_range(path_kind,"path_kind",{"split","joined"}) makep=tuple if path_kind=="split" else "/".join for p,_ in self.iternodes(to_visit="leafs",ordered=ordered,topdown=topdown,include_path=True): ps.append(makep(p)) return ps
def _iterbranches(self, ordered=False, topdown=False): if topdown: yield self source=self._data path=self.get_path() if ordered: iter_range=sorted(viewitems_(source)) else: iter_range=viewitems_(source) for k,v in iter_range: if self._is_branch(v): ptr=self._fast_build_branch_pointer(path+[k],v) for b in ptr._iterbranches(ordered=ordered,topdown=topdown): yield b if not topdown: yield self
[docs] def iternodes(self, to_visit="leafs", ordered=False, include_path=False, topdown=False): """ Iterate over nodes. Args: to_visit (str): Can be ``'leafs'``, ``'branches'`` or ``'all'`` and determines which parts of the dictionary are visited. ordered (bool): If ``True``, loop over paths in alphabetic order. include_path (bool): Include in the return value. topdown (bool): If ``True``, visit node and its leafs before its subtrees leafs. Yield: Values for leafs and :class:`DictionaryPointer` for branches. If ``include_path==True``, yields tuple ``(path, value)``, where `path` is in the form of a normalized list. """ funcargparse.check_parameter_range(to_visit,"to_visit",{"branches","leafs","all"}) for br in self._iterbranches(ordered=ordered,topdown=topdown): path=br.get_path() if topdown and (to_visit in {"branches","all"}): yield (path,br) if include_path else br if to_visit in {"leafs","all"}: for k,v in br.viewitems(ordered=ordered,wrap_branches=False): if not self._is_branch(v): yield (path+[k],v) if include_path else v if (not topdown) and (to_visit in {"branches","all"}): yield (path,br) if include_path else br
nodes=iternodes def __str__(self): iterleafs=self.iternodes(ordered=True,to_visit="leafs",include_path=True) content="\n".join("'{0}': {1}".format("/".join(k),str(v)) for k,v in iterleafs) return "{0}({1})".format(type(self).__name__,content) __repr__=__str__ def _insert_branch(self, source, dest, overwrite=True, normalize_paths=True): for k,v in viewitems_(source): if normalize_paths: k=self._normalize_path(k) if len(k)>1: v=reduce((lambda d,sk: {sk:d}), [v]+k[:0:-1]) # build dict corresponding to {"k[1]/k[2]/.../k[-1]":v} k=k[0] else: k=self._normalize_path_entry(str(k)) try: v=self._get_root(v) is_branch=True except ValueError: is_branch=False if is_branch: if k in dest and not (self._is_branch(dest[k])): if overwrite: dest[k]={} else: continue dest.setdefault(k,{}) self._insert_branch(v,dest[k],overwrite=overwrite,normalize_paths=normalize_paths) else: if overwrite: dest[k]=v else: dest.setdefault(k,v)
[docs] def merge_branch(self, source, branch="", overwrite=True, normalize_paths=True): """ Attach source (:class:`dict` or other :class:`Dictionary`) to a given branch; source is automatically deep-copied. Args: source (dict or Dictionary) branch (tuple or str): Destination path. overwrite (bool): If ``True``, replaces the old entries with the new ones (it only matters for leaf assignments). normalize_paths (bool): If ``True`` and the dictionary isn't case sensitive, perform normalization if the `source`. """ source=Dictionary._get_root(source) if not source: return self path=self._normalize_path(branch) dest=self._get_branch(path,append=True,overwrite_leaves=overwrite) if dest is None: raise KeyError("can't replace the leaf '{0}' with a subtree; delete the leaf explicitly first, or use force=True".format("/".join(path))) self._insert_branch(source,dest,overwrite=overwrite,normalize_paths=normalize_paths) return self
update=merge_branch
[docs] def detach_branch(self, branch=""): """Remove branch from the current dictionary and return it as a separate :class:`Dictionary`.""" subtree=self[branch] del self[branch] return subtree
@staticmethod def _deep_copy(leaf): if Dictionary._is_branch(leaf): res={} for k,v in viewitems_(leaf): res[k]=Dictionary._deep_copy(v) else: res=leaf return res
[docs] def branch_copy(self, branch=""): """Get a copy of the branch as a :class:`Dictionary`.""" source=self._get_branch(self._normalize_path(branch),append=False) if source is None: raise KeyError("unaccessible entry with path {0}".format(branch)) return self._make_similar_dict(self._deep_copy(source),copy=False)
[docs] def copy(self): """Get a full copy the dictionary.""" return self.branch_copy()
[docs] def updated(self, source, branch="", overwrite=True, normalize_paths=True): """ Get a copy of the dictionary and attach a new branch to it. Parameters are the same as in the :meth:`Dictionary.merge_branch`. """ cpy=self.copy() cpy.merge_branch(source,branch=branch,overwrite=overwrite,normalize_paths=normalize_paths) return cpy
[docs] def as_dict(self, style="nested", copy=True): """ Convert into a :class:`dict` object. Args: style (str): Determines style of the returned : - ``'nested'`` -- subtrees are turned into nested dictionaries, - ``'flat'`` -- single dictionary is formed with full paths as keys. copy (bool): If ``False`` and ``style=='nested'``, return the root dictionary. """ if isinstance(self,dict): return self.copy() if copy else self funcargparse.check_parameter_range(style,"style",{"nested","flat"}) if style=="nested": return self.copy()._data if copy else self._data else: d={} for p,v in self.iternodes(to_visit="leafs",include_path=True): d["/".join(p)]=v return d
[docs] def as_pandas(self, index_key=True, as_series=True): """ Convert into a pandas DataFrame or Series object. Args: index_key (bool): If ``False``, create a 2-column table with the first column (``"key"``) containing string path and the second column (``"value"``) containing value; otherwise, move key to the table index. as_series (bool): If ``index_key==True`` and ``as_series==True``, convert the resulting DataFrame into 1D Series (the key is the index); otherwise, keep it as a single-column table """ data=[("/".join(p), v) for p,v in self.iternodes(to_visit="leafs",include_path=True,ordered=True)] table=pd.DataFrame(data,columns=["key","value"]) if index_key: table=table.set_index("key") if as_series: table=table["value"] return table
[docs] def get_path(self): return [] # for compatibility with pointer
[docs] def branch_pointer(self, branch=""): """ Get a :class:`DictionaryPointer` of a given branch. """ return DictionaryPointer(self,branch,case_sensitive=self._case_sensitive,case_normalization=self._case_normalization,copy=False)
def _fast_build_branch_pointer(self, norm_path, node): return DictionaryPointer._fast_build(self,norm_path,node,case_sensitive=self._case_sensitive,case_normalization=self._case_normalization,copy=False)
[docs] def map_self(self, func, to_visit="leafs", pass_path=False, topdown=False, branch_option="normalize"): """ Apply `func` to the nodes in the dictionary. Args: func (callable): Mapping function. Leafs are passed by value, branches (if visited) are passed as :class:`DictionaryPointer`. to_visit (str): Can be ``'leafs'``, ``'branches'`` or ``'all'`` and determines which parts of the dictionary passed to the map function. pass_path (bool): If ``True``, pass the node path (in the form of a normalized list) as a first argument to `func`. topdown (bool): If ``True``, visit node and its leafs before its subtrees leafs. branch_option (str): If the function returns a dict-like object, determines how to """ funcargparse.check_parameter_range(to_visit,"to_visit",{"branches","leafs","all"}) funcargparse.check_parameter_range(branch_option,"branch_option",{"attach","copy","normalize"}) visit_branches=to_visit in {"branches","all"} visit_leafs=to_visit in {"leafs","all"} for br in self._iterbranches(topdown=topdown): path=br.get_path() source=br._data for k,v in viewitems_(source): if self._is_branch(v): if visit_branches: ptr=self._fast_build_branch_pointer(path+[k],v) res=func(ptr.get_path(),ptr) if pass_path else func(ptr) if res is not ptr: self._attach_node(source,k,res,branch_option=branch_option) elif visit_leafs: res=func(path+[k],v) if pass_path else func(v) if res is not v: self._attach_node(source,k,res,branch_option=branch_option) return self
[docs] def filter_self(self, pred, to_visit="leafs", pass_path=False, topdown=False): """ Remove all the nodes from the dictionary for which `pred` returns ``False``. Args: pred (callable): Filter function. Leafs are passed to `pred` by value, branches (if visited) are passed as :class:`DictionaryPointer`. to_visit (str): Can be ``'leafs'``, ``'branches'`` or ``'all'`` and determines which parts of the dictionary passed to the predicate. pass_path (bool): If ``True``, pass the node path (in the form of a normalized list) as a first argument to `pred`. topdown (bool): If ``True``, visit node and its leafs before its subtrees leafs. """ funcargparse.check_parameter_range(to_visit,"to_visit",{"branches","leafs","all"}) visit_branches=to_visit in {"branches","all"} visit_leafs=to_visit in {"leafs","all"} for br in self._iterbranches(topdown=topdown): path=br.get_path() source=br._data for k,v in list(source.items()): keep=True if self._is_branch(v): if visit_branches: ptr=self._fast_build_branch_pointer(path+[k],v) keep=pred(ptr.get_path(),ptr) if pass_path else pred(ptr) elif visit_leafs: keep=pred(path+[k],v) if pass_path else pred(v) if not keep: del source[k] return self
[docs] class DictionaryDiff(collections.namedtuple("DictionaryDiff",["same","changed_from","changed_to","removed","added"])): # making Sphinx autodoc generate correct docstring """ Describes a difference between the two dictionaries. Attributes: same (:class:`Dictionary`): Contains the leafs which is the same. changed_from (:class:`Dictionary`): Contains the leafs from the first dictionary which have different values in the second dictionary. changed_to (:class:`Dictionary`): Contains the leafs from the second dictionary which have different values in the first dictionary. removed (:class:`Dictionary`): Contains the leafs from the first dictionary which are absent in the second dictionary. added (:class:`Dictionary`): Contains the leafs from the second dictionary which are absent in the first dictionary. """
[docs] def diff(self, other): """ Perform an element-wise comparison to another Dictionary. If the other Dictionary has a different case sensitivity, raise :exc:`ValueError`. Returns: :class:`Dictionary.DictionaryDiff` """ if self._case_sensitive!=other._case_sensitive: raise ValueError("can't compare dictionaries with different case sensitivity") self_paths=set(["/".join(p) for p in self.paths()]) if self._case_sensitive or self._case_normalization!=other._case_normalization: other_paths=other.paths() else: other_paths=[ tuple([self._normalize_path_entry(e) for e in p]) for p in other.paths() ] other_paths=set(["/".join(p) for p in other_paths]) same_paths=set.intersection(self_paths,other_paths) added=self._make_similar_dict() removed=self._make_similar_dict() for p in set.difference(self_paths,same_paths): removed[p]=self[p] for p in set.difference(other_paths,same_paths): added[p]=other[p] same=self._make_similar_dict() changed_from=self._make_similar_dict() changed_to=self._make_similar_dict() for p in same_paths: vs,vo=self[p],other[p] if vs==vo: same[p]=vs else: changed_from[p]=vs changed_to[p]=vo return self.DictionaryDiff(same,changed_from,changed_to,removed,added)
[docs] @staticmethod def diff_flatdict(first, second): """ Find the difference between flat :class:`dict` objects. Returns: :class:`Dictionary.DictionaryDiff` """ first_paths=set(first) second_paths=set(second) same_paths=first_paths&second_paths added=dict([ (k,second[k]) for k in (second_paths-same_paths) ]) removed=dict([ (k,first[k]) for k in (first_paths-same_paths) ]) same={} changed_from={} changed_to={} for p in same_paths: vf,vs=first[p],second[p] if vf==vs: same[p]=vf else: changed_from[p]=vf changed_to[p]=vs return Dictionary.DictionaryDiff(same,changed_from,changed_to,removed,added)
[docs] class DictionaryIntersection(collections.namedtuple("DictionaryIntersection",["common","individual"])): # making Sphinx autodoc generate correct docstring """ Describes the result of finding intersection of multiple dictionaries. Attributes: common (:class:`Dictionary`): Contains the intersection of all dictionaries. individual ([:class:`Dictionary`]): Contains list of difference from intersection for all dictionaries. """
[docs] @staticmethod def find_intersection(dicts, use_flatten=False): """ Find intersection of multiple dictionaries. Args: dicts ([:class:`Dictionary`]) use_flatten (bool): If ``True`` flatten all dictionaries before comparison (works faster for a large number of dictionaries). Returns: :class:`Dictionary.DictionaryIntersection` """ if len(dicts)==0: return Dictionary.DictionaryIntersection(Dictionary(),[]) if len(dicts)==1: return Dictionary.DictionaryIntersection(dicts[0],[Dictionary()]) if not use_flatten: common=dicts[0] for d in dicts[1:]: common=common.diff(d).same individual=[d.diff(common).removed for d in dicts] return Dictionary.DictionaryIntersection(common,individual) else: d0=dicts[0] for d in dicts[1:]: if d._case_sensitive!=d0._case_sensitive: raise ValueError("can't compare dictionaries with different case sensitivity") if not d0._case_sensitive: for d in dicts[1:]: if d._case_normalization!=d0._case_normalization: return Dictionary.find_intersection(dicts,use_flatten=False) fdicts=[d.as_dict("flat") for d in dicts] common=fdicts[0] for d in fdicts[1:]: common=Dictionary.diff_flatdict(common,d).same individual=[Dictionary.diff_flatdict(d,common).removed for d in fdicts] common=d0._make_similar_dict(common) individual=[d0._make_similar_dict(i) for i in individual] return Dictionary.DictionaryIntersection(common,individual)
def _add_dict(self, d1, d2): if self._is_branch(d1): for k,v in viewitems_(d2): if k in d1: self._add_dict(d1[k],v) else: d1[k]=v def _dfs_pattern(self, path, root, wildkey, wildpath, match_leaves, wrap_nodes=None): if wrap_nodes is None: wrap_nodes=not match_leaves res=(root,) if wrap_nodes else root if not path: return res, not (match_leaves and self._is_branch(root)) if not self._is_branch(root): return res, (len(path)==1 and path[0]==wildpath) if path[0]==wildkey: res={} for k,v in viewitems_(root): mv,succ=self._dfs_pattern(path[1:],v,wildkey,wildpath,match_leaves,wrap_nodes=wrap_nodes) if succ: res[k]=mv return res,bool(res) elif path[0]==wildpath: mvd,succd=self._dfs_pattern(path[1:],root,wildkey,wildpath,match_leaves,wrap_nodes=wrap_nodes) mvk={} for k,v in viewitems_(root): mv,succ=self._dfs_pattern(path,v,wildkey,wildpath,match_leaves,wrap_nodes=wrap_nodes) if succ: mvk[k]=mv if succd: if mvk: self._add_dict(mvd,mvk) return mvd,True else: return mvk,bool(mvk) elif path[0] in root: mv,succ=self._dfs_pattern(path[1:],root[path[0]],wildkey,wildpath,match_leaves,wrap_nodes=wrap_nodes) return ({path[0]:mv} if succ else None), succ else: return None,False
[docs] def get_matching_paths(self, pattern, wildkey="*", wildpath="**", only_leaves=True): """ Get all paths in the tree that match the provided pattern. Args: wildkey (str): Pattern symbol that matches any key. wildpath (str): Pattern symbol that matches any subpath (possibly empty). only_leaves (bool): If ``True``, only check leaf paths; otherwise, check subtree paths (i.e., incomplete leaf paths) as well. Basically, ``only_leaves=False`` is analogous to adding wildpath at the end of the pattern. """ s_path=self._normalize_path(pattern) dfs_tree,matched=self._dfs_pattern(s_path,self._data,wildkey,wildpath,match_leaves=only_leaves) if not matched: return [] def _get_paths(d): if self._is_branch(d): return [ [k]+p for k,v in viewitems_(d) for p in _get_paths(v)] else: return [[]] paths=_get_paths(dfs_tree) return paths
[docs] def get_matching_subtree(self, pattern, wildkey="*", wildpath="**", only_leaves=True): """ Get a subtree containing nodes with paths matching the provided pattern. Args: wildkey (str): Pattern symbol that matches any key. wildpath (str): Pattern symbol that matches any subpath (possibly empty). only_leaves (bool): If ``True``, only check leaf paths; otherwise, check subtree paths (i.e., incomplete leaf paths) as well. Basically, ``only_leaves=False`` is analogous to adding wildpath at the end of the pattern. """ s_path=self._normalize_path(pattern) if s_path[-1]==wildpath: return self.get_matching_subtree(s_path[:-1],wildkey,wildpath,only_leaves=False) dfs_tree,matched=self._dfs_pattern(s_path,self._data,wildkey,wildpath,match_leaves=only_leaves,wrap_nodes=False) if not matched: return self._make_similar_dict({},copy=False) return self._make_similar_dict(dfs_tree,copy=False)
### Conversion to and from a tuple ### ### Used in .strdump module (see that module for more info) ### def _dump_dictionary(d, dumpf): v=d.as_dict("nested") v=dumpf(v) return v,d._case_sensitive,d._case_normalization def _load_dictionary(v, loadf): d,case_sensitive,case_normalization=v return Dictionary(loadf(d),case_sensitive=case_sensitive,case_normalization=case_normalization,copy=False) strdump.dumper.add_class(Dictionary,_dump_dictionary,_load_dictionary,"dict",recursive=True)
[docs]class DictionaryPointer(Dictionary): """ Similar to :class:`Dictionary`, but can point at one of the branches instead of the full dictionary. Effect is mostly equivalent to prepending some path to all queries. Args: root (dict or Dictionary): Complete tree. pointer: Path to the pointer location. case_sensitive (bool): If ``False``, entries case is normalized according to `case_normalization`. case_normalization (str): Normalization rules; either ``'lower'`` or ``'upper'``. copy (bool): If ``True``, make copy of the supplied data; otherwise, just make it the root. Warning: If ``copy==False``, the root data is already assumed to be normalized. If it isn't, the behavior might be incorrect. """ def __init__(self, root=None, pointer=None, case_sensitive=True, case_normalization="lower", copy=True): Dictionary.__init__(self,root=root,case_sensitive=case_sensitive,case_normalization=case_normalization,copy=copy) self._root=self._data if len(pointer)==0: self._path=[] else: self.move(pointer) def __str__(self): iterleafs=self.iternodes(ordered=True,to_visit="leafs",include_path=True) path_length=len(self.get_path()) content="\n".join("'{0}': {1}".format("/".join(k[path_length:]),str(v)) for k,v in iterleafs) return "{0}(location = '{1}'; {2})".format(type(self).__name__,"/".join(self.get_path()),content) __repr__=__str__
[docs] def get_path(self): """ Return pointer path in the whole dictionary. """ return self._path
[docs] def move(self, path="", absolute=True): """ Move the pointer to a new path. Args: path absolute (bool): If ``True``, path is specified with respect to the root; otherwise, it's specified with respect to the current position (and can only go deeper). """ path=self._normalize_path(path) if not absolute: path=self._path+path self._path=path self._data=self._root self._data=self._get_branch(self._path)
@staticmethod def _fast_build(root, norm_path, node, case_sensitive=True, case_normalization="lower", copy=False): ptr=DictionaryPointer(root=root,pointer=[],case_sensitive=case_sensitive,case_normalization=case_normalization,copy=copy) ptr._data=node ptr._path=norm_path return ptr
[docs] def branch_pointer(self, branch=""): """ Get a :class:`DictionaryPointer` of a given branch. """ branch=self._path+self._normalize_path(branch) return DictionaryPointer(self._root,branch,case_sensitive=self._case_sensitive,case_normalization=self._case_normalization,copy=False)
[docs]class PrefixTree(Dictionary): """ Expansion of a :class:`Dictionary` designed to store data related to prefixes. Each branch node can have a leaf with a name given by wildcard (``'*'`` by default) or matchcard (``'.'`` by default). Wildcard assumes that the branch node path is a prefix; matchcard assumes exact match. These leafs are inspected when specific prefix tree functions (find_largest_prefix and find_all_prefixes) are used. Args: root (dict or Dictionary): Complete tree. case_sensitive (bool): If ``False``, entries case is normalized according to `case_normalization`. case_normalization (str): Normalization rules; either ``'lower'`` or ``'upper'``. wildcard (str): Symbol for a wildcard entry. matchcard (str): Symbol for a matchcard entry. copy (bool): If ``True``, make copy of the supplied data; otherwise, just make it the root. Warning: If ``copy==False``, the root data is already assumed to be normalized. If it isn't, the behavior might be incorrect. """ def __init__(self, root=None, case_sensitive=True, case_normalization="lower", wildcard="*", matchcard=".", copy=True): Dictionary.__init__(self,root,case_sensitive=case_sensitive,case_normalization=case_normalization,copy=copy) self._wildcard=wildcard self._matchcard=matchcard
[docs] def copy(self): """Get a full copy the prefix tree.""" return PrefixTree(self.branch_copy(),case_sensitive=self._case_sensitive,case_normalization=self._case_normalization, wildcard=self._wildcard,matchcard=self._matchcard,copy=False)
def _loop_over_prefixes(self, path, allow_nomatch_exact=True): s_path=self._normalize_path(path) l=len(s_path) branch=self._data for i,p in enumerate(s_path): if not self._is_branch(branch): return if self._wildcard in branch: yield i,branch[self._wildcard] if p in branch: branch=branch[p] else: return if not self._is_branch(branch): if allow_nomatch_exact: #yield None,branch yield l,branch else: if self._wildcard in branch: yield l,branch[self._wildcard] if self._matchcard in branch: yield l,branch[self._matchcard]
[docs] def find_largest_prefix(self, path, default=None, allow_nomatch_exact=True, return_path=False, return_subpath=False): """ Find the entry which is the largest prefix of a given path. Args: path default: Default value if the path isn't found. allow_nomatch_exact (bool): If ``True``, just element with the given path can be returned; otherwise, only elements stored under wildcards and matchcards are considered. return_path (bool): If ``True``, return path to the element (i.e., the largest prefix) instead of the element itself. return_subpath (bool): If ``True``, return tuple with a second element being part of the `path` left after subtraction of the prefix. """ s_path=self._normalize_path(path) cut_pos=0 data=default for l in self._loop_over_prefixes(s_path,allow_nomatch_exact=allow_nomatch_exact): cut_pos,data=l if return_subpath: return (s_path[:cut_pos] if return_path else data),s_path[cut_pos:] else: return (s_path[:cut_pos] if return_path else data)
[docs] def find_all_prefixes(self, path, allow_nomatch_exact=True, return_path=True, return_subpath=False): """ Find list of all the entries which are prefixes of a given path. Args: path default: Default value if the path isn't found. allow_nomatch_exact (bool): If ``True``, just element with the given path can be returned; otherwise, only elements stored under wildcards and matchcards are considered. return_path (bool): If ``True``, return path to the element (i.e., the largest prefix) instead of the element itself. return_subpath (bool): If ``True``, return tuple with a second element being part of the `path` left after subtraction of the prefix. """ s_path=self._normalize_path(path) pfxs=[] for l in self._loop_over_prefixes(s_path,allow_nomatch_exact=allow_nomatch_exact): cut_pos,data=l if return_subpath: pfxs.append( ((s_path[:cut_pos] if return_path else data),s_path[cut_pos:]) ) else: pfxs.append( (s_path[:cut_pos] if return_path else data) ) return pfxs
[docs]def combine_dictionaries(dicts, func, select="all", pass_missing=False): """ Combine several dictionaries element-wise (only for leafs) using a given function. Args: dicts(list or tuple): list of dictionaries (:class:`Dictionary` or ``dict``) to be combined func(callable): combination function. Takes a single argument, which is a list of elements to be combined. select(str): determins which keys are selected for the resulting dictionary. Can be either ``"all"`` (only keep keys which are present in all the dictionaries), or ``"any"`` (keep keys which are present in at least one dictionary). Only keys that point to leafs count; if a key points to a non-leaf branch in some dictionary, it is considered absent from this dictionary. pass_missing(bool): if ``select=="any"``, this parameter determines whether missing elements will be passed to `func` as ``None``, or omitted entirely. """ funcargparse.check_parameter_range(select,"select",["all","any"]) if not dicts: return Dictionary() dicts=[as_dictionary(d) for d in dicts] paths=set(dicts[0].paths()) if select=="all": paths=set([p for p in paths if all([d.has_entry(p,kind="leaf") for d in dicts]) ]) else: for d in dicts: paths.update(d.paths()) result=dicts[0]._make_similar_dict() for p in paths: if select=="any" and pass_missing: values=[d[p] for d in dicts if d.has_entry(p,"leaf")] else: values=[(d[p] if d.has_entry(p,"leaf") else None) for d in dicts] joined_value=func(values) result[p]=joined_value return result
## Simple substitute for Serializable ## ## Generate (local) objects hierarchy from Dictionary ## local here means that object is created based only on its immediate children, not on grand children or parents
[docs]class DictionaryNode(object): def __init__(self, **vargs): object.__init__(self) for name, value in viewitems_(vargs): setattr(self,name,value) def __str__(self): return str(self.__dict__) def __repr__(self): return "DictionaryNode({})".format(self)
def _default_object_generator(data, name=None): return DictionaryNode(**data)
[docs]def dict_to_object_local(data, name=None, object_generator=_default_object_generator): obj_dict={} for name,value in viewitems_(data): if Dictionary._is_branch(value): obj_dict[name]=dict_to_object_local(value,name=name,object_generator=object_generator) else: obj_dict[name]=value return object_generator(obj_dict,name)
[docs]class PrefixShortcutTree(object): """ Convenient storage for dictionary path shortcuts. Args: shortcuts (dict): Dictionary of shortcuts ``{shortcut: full_path}``. """ def __init__(self, shortcuts=None): self._tree=PrefixTree() if shortcuts: self.add_shortcuts(shortcuts)
[docs] def copy(self): """Return full copy.""" res=PrefixShortcutTree() res._tree=self._tree.copy() return res
[docs] def add_shortcut(self, source, dest, exact=False): """ Add a single shortcut. Args: source: Shortcut path. dest: expanded path corresponding to the shortcut. exact (bool): If ``True``, the shortcut works only for the exact path; otherwise, it works for any path with 'source' as a prefix. """ self._tree[source,"." if exact else "*"]=normalize_path(dest) return self
[docs] def add_shortcuts(self, shortcuts, exact=False): """ Add a dictionary of shortcuts ``{shortcut: full_path}``. Arguments are the same as in :meth:`PrefixShortcutTree.add_shortcut`. """ for s,d in viewitems_(shortcuts): self.add_shortcut(s,d,exact=exact) return self
[docs] def remove_shortcut(self, source): """Remove a shortcut from the tree""" pfx=self._tree.find_largest_prefix(source,return_path=True) del self._tree[pfx]
[docs] def updated(self, shortcuts, exact=False): """ Make a copy and add additional shortcuts. Arguments are the same as in :meth:`PrefixShortcutTree.add_shortcuts`. """ return self.copy().add_shortcuts(shortcuts,exact=exact)
def _find_shortcut(self, source): dest,subpath=self._tree.find_largest_prefix(source,return_subpath=True) if dest and subpath: dest=dest+subpath return dest def __call__(self, source, recursive=True): """ Find and expand shortcuts in the path. Args: source: Source path. recursive (bool): If ``True``, keep substituting shortcuts while possible; otherwise, do a single substitute. """ source=normalize_path(source) if recursive: while True: dest=self._find_shortcut(source) if dest is None: return source source=dest else: return self._find_shortcut(source) or source
### Indexing accessor decorator ###
[docs]class ItemAccessor(object): def __init__(self, getter=None, setter=None, deleter=None, normalize_names=True, path_separator="/", allow_incomplete_paths=False): object.__init__(self) self.getter=getter self.setter=setter self.deleter=deleter self.normalize_names=normalize_names or allow_incomplete_paths self.allow_incomplete_paths=allow_incomplete_paths self.path_separator=path_separator def _norm_name(self, name): if self.normalize_names: return "/".join(normalize_path(name,sep=self.path_separator)) return name
[docs] class ItemAccessorBranch(object): def __init__(self, src, branch): object.__init__(self) self.src=src self.branch=src._norm_name(branch)+src.path_separator def __getitem__(self, name): return self.src.__getitem__(self.branch+self.src._norm_name(name)) def __setitem__(self, name, value): return self.src.__setitem__(self.branch+self.src._norm_name(name),value) def __delitem__(self, name): return self.src.__delitem__(self.branch+self.src._norm_name(name))
def __getitem__(self, name): name=self._norm_name(name) try: if self.getter is not None: return self.getter(name) except KeyError: if not self.allow_incomplete_paths: raise if self.allow_incomplete_paths: return self.ItemAccessorBranch(self,name+self.path_separator) raise NotImplementedError("getter is not specified") def __setitem__(self, name, value): name=self._norm_name(name) if self.setter is not None: return self.setter(name, value) raise NotImplementedError("setter is not specified") def __delitem__(self, name): name=self._norm_name(name) if self.deleter is not None: return self.deleter(name) raise NotImplementedError("deleter is not specified")
[docs] def get(self, name, default=None): try: return self.__getitem__(name) except KeyError: return default