# accumulate.py
"""
Data structures for accumulating tree-structured data from multiple sources.

Data is acquired from file and directory names and also from yaml files in the
tree. The yaml files are loaded in increasing priority from upper directories
to the local one, starting from the file nearest to the local directory that
contains a "project_root" key.

The files named for their parent directory are assumed to be KitCAT files
(i.e. "kitcat.yaml" and "<dirname>.yaml" are treated the same way). Only files
named "abx.yaml" are assumed to be configuration files specific to ABX.

We collect these by going up the file path, and then load them coming down. If
we find a "project_root" key, we ditch the previous data and start over. This
way any project files found above the project root will be ignored.

As a use case: if we were to store a new project inside of another project, the
new project's project_root would make it blind to the settings in the
containing project. Other directories in the parent project would still go to
the parent project's root. This avoids having the location the project is
stored affect the project data.

The overall structure is a dictionary. When updating with new data, any element
that is itself a dictionary is treated recursively (that is, it is updated with
directory data when another dictionary is provided for the same key).

If an element is a list, then the values from each newly loaded file which are
not already present are appended to the list (see UnionList, below).

If a scalar replaces a dictionary or list value in a more specific entry, then
it clobbers it and any updated information in it.

@author: Terry Hancock

@copyright: 2019 Anansi Spaceworks.

@license: GNU General Public License, version 2.0 or later. (Python code)

@contact: digitante@gmail.com

Demo:

>>> import accumulate
>>> T1 = accumulate.RecursiveDict(accumulate.TEST_DICT_1)
>>> T2 = accumulate.RecursiveDict(accumulate.TEST_DICT_2)
>>> import copy
>>> Ta = copy.deepcopy(T1)
>>> Tb = copy.deepcopy(T2)
>>> Ta
RecursiveDict({'A': 1, 'B': [1, 2, 3], 'C': {'a': 1, 'b': 2, 'c': 3}, 'D': {}, 'E': None, 'F': {'h': {'i': {'j': {'k': 'abcdefghijk'}}}}})
>>> Tb
RecursiveDict({'C': {'d': 4, 'e': 5, 'f': 6}, 'D': (1, 2, 3), 'B': [4, 5, 6], 'E': 0})
>>> Ta.update(T2)
>>> Ta
RecursiveDict({'A': 1, 'B': [1, 2, 3, 4, 5, 6], 'C': {'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6}, 'D': (1, 2, 3), 'E': 0, 'F': {'h': {'i': {'j': {'k': 'abcdefghijk'}}}}})
>>> Tb.update(T1)
>>> Tb
RecursiveDict({'C': {'d': 4, 'e': 5, 'f': 6, 'a': 1, 'b': 2, 'c': 3}, 'D': {}, 'B': [4, 5, 6, 1, 2, 3], 'E': None, 'A': 1, 'F': {'h': {'i': {'j': {'k': 'abcdefghijk'}}}}})
"""

import collections
import collections.abc
import os
import re

import yaml


TEST_DICT_1 = {
    'A': 1,
    'B': [1, 2, 3],
    'C': {'a': 1, 'b': 2, 'c': 3},
    'D': {},
    'E': None,
    'F': {'h': {'i': {'j': {'k': 'abcdefghijk'}}}},
}

TEST_DICT_2 = {
    'C': {'d': 4, 'e': 5, 'f': 6},
    'D': (1, 2, 3),
    'B': [4, 5, 6],
    'E': 0,
}

YAML_TEST = """
A: 1
B:
    - 4
    - 5
    - 6
    - 1
    - 2
    - 3
C:
    a: 1
    b: 2
    c: 3
    d: 4
    e: 5
    f: 6
D: (1, 2, 3)
E: 0
F:
    h:
        i:
            j:
                k: abcdefghijk
"""

# Splits a name into capitalized words, lowercase words and digit runs.
wordre = re.compile(r'([A-Z]+[a-z]*|[a-z]+|[0-9]+)')

# Value types that RecursiveDict.update() merges as a union instead of
# replacing outright.
_MERGEABLE = (collections.abc.MutableSequence, collections.abc.Set)


def _load_yaml_mapping(stream, source='<string>'):
    """
    Load a YAML document which is expected to hold a mapping at top level.

    @stream: YAML text or an open file, as accepted by yaml.safe_load().
    @source: Label used in the error message (typically the file path).

    Returns the loaded mapping. An empty document (which yaml.safe_load()
    reports as None) is returned as an empty dictionary, so that empty YAML
    files are harmless.

    Raises TypeError if the top-level element is anything but a mapping.
    """
    data = yaml.safe_load(stream)
    if data is None:
        return {}
    if not isinstance(data, collections.abc.Mapping):
        raise TypeError(
            "%s: expected a YAML mapping at top level, got %s"
            % (source, type(data).__name__))
    return data


class OrderedSet(collections.abc.Set):
    """
    List-based set from Python documentation example.

    Elements are kept in order of first appearance. The remaining set
    operators (|, &, comparisons, ...) come from the collections.abc.Set
    mixin methods.
    """
    def __init__(self, iterable=()):
        self.elements = lst = []
        for value in iterable:
            if value not in lst:
                lst.append(value)

    def __iter__(self):
        return iter(self.elements)

    def __contains__(self, value):
        return value in self.elements

    def __len__(self):
        return len(self.elements)

    def __repr__(self):
        return repr(list(self))

    def union(self, other):
        """Named equivalent of the | operator."""
        return self.__or__(other)

    def intersection(self, other):
        """Named equivalent of the & operator."""
        return self.__and__(other)


class UnionList(list):
    """
    Special list-based collection, which implements a "union" operator similar
    to the one defined for sets. It only adds options from the other list
    which are not already in the current list.

    Note that it is intentionally asymmetric. The initial list may repeat
    values and they will be kept, so it does not require the list to consist
    only of unique entries (unlike Set collections).

    This allows us to use this type for loading list-oriented data from data
    files, which may or may not contain repetitions for different uses, but
    also makes accumulation idempotent (running the union twice will not
    increase the size of the result, because no new values will be found).
    """
    def union(self, other):
        """
        Return a new UnionList: this list followed by the elements of
        'other' which do not occur in this list.

        Membership is tested against the original list only, so a value that
        is new to this list but repeated within 'other' is appended once per
        occurrence.
        """
        combined = UnionList(self)
        for element in other:
            if element not in self:
                combined.append(element)
        return combined


class RecursiveDict(collections.OrderedDict):
    """
    A dictionary which updates recursively, updating any values which are
    themselves dictionaries when the replacement value is a dictionary, rather
    than replacing them, and treating any values which are themselves lists
    as UnionLists and applying the union operation to combine them
    (when the replacement value is also a list).
    """
    def clear(self):
        """Remove all items."""
        # Deleting keys while iterating over the dictionary itself raises
        # "RuntimeError: OrderedDict mutated during iteration", so delegate
        # to the base class instead of looping.
        super().clear()

    def update(self, mapping):
        """
        Merge 'mapping' into this dictionary, in place.

        For each key in 'mapping':
          - if the key is new, the value is stored;
          - if both the existing and the new value are mappings, they are
            merged recursively;
          - if both are mutable sequences or sets, the result is the
            UnionList union (existing values first, then new ones);
          - otherwise the new value replaces the existing one.
        """
        for key in mapping:
            incoming = mapping[key]

            if key not in self:
                # New key
                self[key] = incoming
                continue

            current = self[key]
            if (isinstance(current, collections.abc.Mapping) and
                    isinstance(incoming, collections.abc.Mapping)):
                # Subdictionary
                merged = RecursiveDict(current)
                merged.update(RecursiveDict(incoming))
                self[key] = merged
            elif (isinstance(current, _MERGEABLE) and
                    isinstance(incoming, _MERGEABLE)):
                # Sublist
                self[key] = UnionList(current).union(UnionList(incoming))
            else:
                # Scalar (or mismatched types): new value clobbers the old
                self[key] = incoming

    def get_data(self):
        """
        Return the contents as plain data: nested RecursiveDict values become
        dict and UnionList values become list. Other values are passed
        through unchanged.
        """
        new = {}
        for key in self:
            value = self[key]
            if isinstance(value, RecursiveDict):
                new[key] = dict(value.get_data())
            elif isinstance(value, UnionList):
                new[key] = list(value)
            else:
                new[key] = value
        return new

    def __setitem__(self, key, value):
        # Normalize containers on the way in, so that nested data always
        # supports the recursive update / union operations.
        if isinstance(value, collections.abc.Mapping):
            super().__setitem__(key, RecursiveDict(value))
        elif isinstance(value, collections.abc.MutableSequence):
            super().__setitem__(key, UnionList(value))
        else:
            super().__setitem__(key, value)

    def __repr__(self, compact=False):
        """
        Dictionary-style representation, wrapped in the class name unless
        'compact' is true (used for nested dictionaries).
        """
        items = []
        for key in self:
            value = self[key]
            if isinstance(value, RecursiveDict):
                text = value.__repr__(compact=True)
            else:
                text = repr(value)
            items.append("'%s': %s" % (key, text))
        body = '{' + ', '.join(items) + '}'
        if compact:
            return body
        return '%s(%s)' % (self.__class__.__name__, body)

    def from_yaml(self, yaml_string):
        """
        Update from a string of YAML (an empty document is ignored).
        Returns self, so that calls can be chained.
        """
        self.update(_load_yaml_mapping(yaml_string))
        return self

    def from_yaml_file(self, path):
        """
        Update from the YAML file at 'path' (an empty file is ignored).
        Returns self, so that calls can be chained.
        """
        with open(path, 'rt') as yamlfile:
            self.update(_load_yaml_mapping(yamlfile, source=path))
        return self

    def to_yaml(self):
        """Return the plain-data contents dumped as a YAML string."""
        return yaml.dump(self.get_data())

    def to_yaml_file(self, path):
        """Write the plain-data contents to 'path' as YAML."""
        with open(path, 'wt') as yamlfile:
            yamlfile.write(yaml.dump(self.get_data()))


#--------
# Code for collecting the YAML files we need

def collect_yaml_files(path, stems, dirmatch=False, sidecar=False, root='/'):
    """
    Collect a list of file paths to YAML files.

    Does not attempt to read or interpret the files.

    @path: The starting point, typically the antecedent filename.
    @stems: File stem (or sequence of stems) we recognize (in priority order).
    @dirmatch: Also search for stems matching the containing directory name?
    @sidecar: Also search for stems matching the antecedent filename's stem?
    @root: Top level directory to consider (do not search above this).

    "Stem" means the name with any extension after "." removed (typically,
    the filetype).

    Returns the paths found, ordered from lowest priority (highest directory)
    to highest priority (most local; the sidecar file, if any, comes last).
    """
    yaml_paths = []
    if isinstance(stems, str):
        stems = (stems,)

    path = os.path.abspath(path)
    path, filename = os.path.split(path)

    if sidecar:
        filestem = os.path.splitext(filename)[0]
        sidecar_path = os.path.join(path, filestem + '.yaml')
        if os.path.isfile(sidecar_path):
            yaml_paths.append(sidecar_path)

    # Searching stops once the directory containing 'root' is reached.
    # Normalizing 'root' first makes a relative root or one with a trailing
    # separator behave like the plain absolute form.
    stop = os.path.dirname(os.path.abspath(root))

    while os.path.abspath(path) != stop:
        parent, base = os.path.split(path)
        if parent == path:
            # Reached the top of the filesystem without meeting 'root'
            # (e.g. 'path' is not below 'root'): stop rather than loop
            # forever.
            break
        path = parent

        if dirmatch:
            yaml_path = os.path.join(path, base, base + '.yaml')
            if os.path.isfile(yaml_path):
                yaml_paths.append(yaml_path)

        for stem in stems:
            yaml_path = os.path.join(path, base, stem + '.yaml')
            if os.path.isfile(yaml_path):
                yaml_paths.append(yaml_path)

    # Collected from the local directory upward; loading order is the
    # opposite (top-down).
    yaml_paths.reverse()
    return yaml_paths


def has_project_root(yaml_path):
    """
    Does the YAML file at 'yaml_path' have a top-level "project_root" key?

    Files which are empty or whose top-level element is not a mapping
    simply do not have one.
    """
    with open(yaml_path, 'rt') as yaml_file:
        data = yaml.safe_load(yaml_file)
    return isinstance(data, collections.abc.Mapping) and 'project_root' in data


def trim_to_project_root(yaml_paths):
    """
    Drop the paths which precede the last file containing a "project_root"
    key. If no file contains one, the list is returned unchanged.
    """
    for i in reversed(range(len(yaml_paths))):
        if has_project_root(yaml_paths[i]):
            return yaml_paths[i:]
    return yaml_paths


def get_project_root(yaml_paths):
    """
    Return the directory of the first file left after trimming to the
    project root, or '/' if 'yaml_paths' is empty.

    NOTE(review): when files exist but none declares "project_root", this
    returns the directory of the first (highest-level) YAML file rather than
    '/' -- confirm that this is the intended fallback.
    """
    trimmed = trim_to_project_root(yaml_paths)
    if trimmed:
        return os.path.dirname(trimmed[0])
    else:
        # No project root was found!
        return '/'


def combine_yaml(yaml_paths):
    """
    Load the YAML files in the order given and accumulate them into a
    single RecursiveDict (later files take priority). Empty files are
    ignored.
    """
    data = RecursiveDict()
    for path in yaml_paths:
        with open(path, 'rt') as yaml_file:
            data.update(_load_yaml_mapping(yaml_file, source=path))
    return data


def get_project_data(filepath):
    """
    Collect the project data which applies to 'filepath'.

    Returns a tuple (kitcat_root, kitcat_data, abx_data): the project root
    directory, the accumulated KitCAT data, and the accumulated data from
    "abx.yaml" files found at or below the project root.
    """
    # First, get the KitCAT data.
    kitcat_paths = collect_yaml_files(filepath,
        ('kitcat', 'project'), dirmatch=True, sidecar=True)

    kitcat_data = combine_yaml(trim_to_project_root(kitcat_paths))

    kitcat_root = get_project_root(kitcat_paths)

    abx_data = combine_yaml(collect_yaml_files(filepath,
        'abx', root=kitcat_root))

    return kitcat_root, kitcat_data, abx_data