diff --git a/abx/accumulate.py b/abx/accumulate.py index 4cf8a2f..b06ba93 100644 --- a/abx/accumulate.py +++ b/abx/accumulate.py @@ -102,6 +102,81 @@ import yaml wordre = re.compile(r'([A-Z]+[a-z]*|[a-z]+|[0-9]+)') +def merge_slices(slices): + """ + Given a list of slice objects, merge into minimum list of new slices to cover same elements. + + The idea is to catch contiguous or overlapping slices and reduce them to a single slice. + + Arguments: + slices (list(slice)): List of slices to be merged. + """ + if isinstance(slices, slice): + slices = [slices] + slices = list(slices) + ordered = sorted(slices, key = lambda a: a.start) + merged = [] + while ordered: + s = ordered.pop(0) + while ordered and ordered[0].start <= s.stop: + r = ordered.pop(0) + s = slice(s.start, max(s.stop,r.stop)) + merged.append(s) + return tuple(merged) + +def update_slices(old_slices, new): + if isinstance(old_slices, slice): + old_slices = [old_slices] + + new_slices = [] + for old in old_slices: + if (old.start < new.start <= old.stop) and (new.stop >= old.stop): + # Leading overlap Old: |-----| + # New: |-----| + new_slices.append(slice(old.start, new.start)) + elif (old.start <= new.stop < old.stop) and (new.start <= old.start): + # Trailing overlap Old: |-----| + # New: |-----| + new_slices.append(slice(new.stop, old.stop)) + elif (new.start <= old.start) and (new.stop >= old.stop): + # Contains Old: |--| + # New: |------| + pass + elif (new.start > old.stop) or (new.stop < old.start): + # No overlap Old: |---| + # New: |---| + new_slices.append(old) + elif (old.start < new.start) and (new.stop < old.stop): + # Split Old: |-------| + # New: |--| + new_slices.append(slice(old.start,new.start)) + new_slices.append(slice(new.stop, old.stop)) + + if len(new_slices)==1: + new_slices = new_slices[0] + elif len(new_slices)==0: + new_slices = None + else: + new_slices = tuple(new_slices) + + return new_slices + +def listable(val): + """ + Can val be coerced to UnionList? + """ + return ((isinstance(val, collections.abc.Sequence) or + isinstance(val, collections.abc.Set)) + and not + (type(val) in (bytes, str)) ) + +def dictable(val): + """ + Can val be coerced to RecursiveDict? + """ + return isinstance(val, collections.abc.Mapping) + + class OrderedSet(collections.abc.Set): """ List-based set from Python documentation example. @@ -144,13 +219,59 @@ class UnionList(list): files, which may or may not contain repetitions for different uses, but also makes accumulation idempotent (running the union twice will not increase the size of the result, because no new values will be found). + + Attributes: + source: A dictionary mapping source objects to slice objects + according to which union (or original definition) they + come from. """ - def union(self, other): + def __init__(self, data, source=None, override=True): + self.source = {} + super().__init__(data) + + if hasattr(data, 'source') and not override: + self.source = data.source.copy() + if source is not None and None in self.source: + self.source[source] = self.source[None] + del self.source[None] + else: + self.source[source] = slice(0,len(self)) + + # if source is None and hasattr(data, 'source'): + # self.source = data.source.copy() + # else: + # self.source[source] = slice(0,len(self)) + + def __repr__(self): + return "UnionList(%s)" % super().__repr__() + + def __getitem__(self, query): + if isinstance(query, int) or isinstance(query, slice): + return super().__getitem__(query) + elif isinstance(query, tuple): + result = [] + for element in query: + result.extend(super().__getitem__(element)) + return result + elif query in self.source: + return self[self.source[query]] + else: + raise ValueError("No source %s, " % repr(query) + + "not a direct int, slice, or tuple of same.") + + def union(self, other, source=None): """ Returns a combination of the current list with unique new options added. Arguments: - other (list): The other list from which new options will be taken. + other (list): + The other list from which new options will be taken. + + source(hashable): + A provided object identifying the source of the new + information (can be any type -- will be stored in + the 'source' dictionary, along with the slice to + which it applies). Returns: A list with the original options and any unique new options from the @@ -160,9 +281,68 @@ class UnionList(list): in the original list will be unharmed. """ combined = UnionList(self) + combined.source = {} + + old_len = len(combined) + + # This is the actual union operation + j = old_len + new_elements = [] for element in other: if element not in self: - combined.append(element) + new_elements.append(element) + + combined.extend(new_elements) + + combined.source = self.source.copy() + + if source is None and hasattr(other, 'source'): + # Other is a UnionList and may have complex source information + for j, element in enumerate(new_elements): + for src in other.source: + if src not in self.source: + combined.source[src] = [] + elif isinstance(self.source[src], slice): + combined.source[src] = [self.source[src]] + elif isinstance(self.source[src], tuple): + combined.source[src] = list(self.source[src]) + if element in other[other.source[src]]: + combined.source[src].append(slice(old_len,old_len+j+1)) + + for src in combined.source: + combined.source[src] = merge_slices(combined.source[src]) + if len(combined.source[src]) == 0: + del combined.source[src] + elif len(combined.source[src]) == 1: + combined.source[src] = combined.source[src][0] + + else: + # Source-naive list, only explicitly provided source: + new_slice = slice(old_len, len(combined)) + + for src in self.source: + upd = update_slices(self.source[src], new_slice) + if upd: + combined.source[src] = upd + + if source in self.source: + # If a source is used twice, we have to merge it + # into the existing slices for that source + if isinstance(self.source[source], slice): + new_slices = (self.source[source], new_slice) + + elif isinstance(self.source[source], collections.Sequence): + new_slices = self.source[source] + (new_slice,) + + new_slices = tuple(merge_slices(new_slices)) + + if len(new_slices) == 1: + combined.source[source] = new_slices[0] + else: + combined.source[source] = tuple(new_slices) + else: + combined.source[source] = new_slice + return combined class RecursiveDict(collections.OrderedDict): @@ -173,14 +353,22 @@ class RecursiveDict(collections.OrderedDict): as UnionLists and applying the union operation to combine them (when the replacement value is also a list). """ + def __init__(self, data=None, source=None, active_source=None): + self.active_source = active_source + self.source = {} + super().__init__() + if isinstance(data, collections.abc.Mapping): + self.update(data, source=source) + def clear(self): """ Clear the dictionary to an empty state. """ for key in self: del self[key] + self.source = {} - def update(self, mapping): + def update(self, other, source=None): """ Load information from another dictionary / mapping object. @@ -206,27 +394,42 @@ class RecursiveDict(collections.OrderedDict): There are issues that can happen if a dictionary value is replaced with a list or a scalar in the update source. """ - for key in mapping: + if source is None and hasattr(other, 'source'): + def get_source(key): + return other.source[key] + else: + def get_source(key): + return source + + for key in other: if key in self: - if (isinstance(self[key], collections.abc.Mapping) and - isinstance(mapping[key], collections.abc.Mapping)): - # Subdictionary - newvalue = RecursiveDict(self[key]) - newvalue.update(RecursiveDict(mapping[key])) - self[key] = newvalue - - elif ((isinstance(self[key], collections.abc.MutableSequence) or - isinstance(self[key], collections.abc.Set)) and - (isinstance(mapping[key], collections.abc.MutableSequence) or - isinstance(mapping[key], collections.abc.Set))): - # Sublist - self[key] = UnionList(self[key]).union(UnionList(mapping[key])) + old = self[key] + new = other[key] + + if dictable(old) and dictable(new): + old.update(RecursiveDict(new), source=get_source(key)) + elif listable(old) and listable(new): + self.__setitem__(key, old.union(new), source=self.source[key]) + #self.__setitem__(key, old.union(UnionList(new)), + # source=self.source[key]) + + # self.__setitem__(key, old.union(UnionList(new), + # source=get_source(key)), + # source=self.source[key]) else: # scalar - self[key] = mapping[key] + self.__setitem__(key, other[key], source=get_source(key)) else: # new key - self[key] = mapping[key] + self.__setitem__(key, other[key], source=get_source(key)) + + def copy(self): + copy = RecursiveDict() + for key in self: + copy[key] = self[key] + for key in self.source: + copy.source[key] = self.source[key] + return copy def get_data(self): """ @@ -242,13 +445,16 @@ class RecursiveDict(collections.OrderedDict): new[key]=self[key] return new - def __setitem__(self, key, value): - if isinstance(value, collections.abc.Mapping): - super().__setitem__(key, RecursiveDict(value)) + def __setitem__(self, key, value, source=None): + if not source: + source = self.active_source - elif isinstance(value, collections.abc.MutableSequence): - super().__setitem__(key, UnionList(value)) + self.source[key] = source + if dictable(value): + super().__setitem__(key, RecursiveDict(value, source=source)) + elif listable(value): + super().__setitem__(key, UnionList(value, source=source, override=False)) else: super().__setitem__(key,value) @@ -268,11 +474,11 @@ class RecursiveDict(collections.OrderedDict): s = s + ')' return s - def from_yaml(self, yaml_string): + def from_yaml(self, yaml_string, source=None): """ Initialize dictionary from YAML contained in a string. """ - self.update(yaml.safe_load(yaml_string)) + self.update(yaml.safe_load(yaml_string), source=source) return self def from_yaml_file(self, path): @@ -280,7 +486,7 @@ class RecursiveDict(collections.OrderedDict): Initialize dictionary from a separate YAML file on disk. """ with open(path, 'rt') as yamlfile: - self.update(yaml.safe_load(yamlfile)) + self.update(yaml.safe_load(yamlfile), source=path) return self def to_yaml(self): @@ -413,7 +619,7 @@ def combine_yaml(yaml_paths): data = RecursiveDict() for path in yaml_paths: with open(path, 'rt') as yaml_file: - data.update(yaml.safe_load(yaml_file)) + data.update(yaml.safe_load(yaml_file), source=path) return data def get_project_data(filepath): diff --git a/abx/file_context.py b/abx/file_context.py index af5caa7..2388d2c 100644 --- a/abx/file_context.py +++ b/abx/file_context.py @@ -242,7 +242,7 @@ class FileContext(NameContext): 'scene': 0} # Defaults - self.provided_data = RecursiveDict(DEFAULT_YAML) + self.provided_data = RecursiveDict(DEFAULT_YAML, source='default') self.abx_fields = DEFAULT_YAML['abx'] def clear_notes(self): @@ -280,7 +280,7 @@ class FileContext(NameContext): # Data from YAML Files #self._collect_yaml_data() - self.provided_data = RecursiveDict(DEFAULT_YAML) + self.provided_data = RecursiveDict(DEFAULT_YAML, source='default') kitcat_root, kitcat_data, abx_data = accumulate.get_project_data(self.filepath) self.root = kitcat_root diff --git a/testdata/myproject/Episodes/A.001-Pilot/abx.yaml b/testdata/myproject/Episodes/A.001-Pilot/abx.yaml index 0882a0e..27d3830 100644 --- a/testdata/myproject/Episodes/A.001-Pilot/abx.yaml +++ b/testdata/myproject/Episodes/A.001-Pilot/abx.yaml @@ -13,3 +13,7 @@ testdict: b: 3 d: 3 e: 4 + + D: + - b: 2 + c: 3 \ No newline at end of file diff --git a/testdata/myproject/Library/Library.yaml b/testdata/myproject/Library/Library.yaml new file mode 100644 index 0000000..0905254 --- /dev/null +++ b/testdata/myproject/Library/Library.yaml @@ -0,0 +1,30 @@ +--- +project_unit: + code: library + name: Library + from_folders: 3 + +project_schema: + - rank: library + delimiter: '-' + type: string + maxlength: 32 + + - rank: dept + type: + graphics: Graphic Art (2D) + models: Models (3D) + sound: Sound Effects + music: Music and Cues + voice: Voice Lines + stock: Assembled Stock Footage Elements + + - rank: category + type: string + maxlength: 32 + + - rank: subcat + type: string + maxlength: 32 + + \ No newline at end of file diff --git a/testdata/myproject/Library/models/props/MyProp-By-me_here-prop.blend b/testdata/myproject/Library/models/props/MyProp-By-me_here-prop.blend new file mode 100644 index 0000000..55ca3ec Binary files /dev/null and b/testdata/myproject/Library/models/props/MyProp-By-me_here-prop.blend differ diff --git a/testdata/myproject/abx.yaml b/testdata/myproject/abx.yaml index ef4d8f6..9d41aee 100644 --- a/testdata/myproject/abx.yaml +++ b/testdata/myproject/abx.yaml @@ -18,3 +18,9 @@ testdict: c: 2 d: 3 + D: + - a: 1 + b: 2 + - a: 2 + b: 3 + \ No newline at end of file diff --git a/tests/__pycache__/test_accumulate.cpython-35.pyc b/tests/__pycache__/test_accumulate.cpython-35.pyc index 16ba3cc..66c8ca0 100644 Binary files a/tests/__pycache__/test_accumulate.cpython-35.pyc and b/tests/__pycache__/test_accumulate.cpython-35.pyc differ diff --git a/tests/test_accumulate.py b/tests/test_accumulate.py index dec16a5..5c7e962 100644 --- a/tests/test_accumulate.py +++ b/tests/test_accumulate.py @@ -33,7 +33,9 @@ It also provides convenient conversions to YAML or JSON serializations of the data for output to text files or text blocks. """ -import unittest, os +import unittest, os, collections + +import yaml # This is the most ridiculous work-around, but it seems to be necessary to # get Python 3 to import the modules for testing @@ -43,21 +45,60 @@ sys.path.append(os.path.normpath(os.path.join(__file__, '..', '..'))) from abx import accumulate -class AccumulationTests(unittest.TestCase): +class SourceTracking_UnionList(unittest.TestCase): """ - Test combination operations give correct results. + Test that source-tracking of UnionList elements is working. """ + + def test_merge_slices(self): + slices = [slice(0,3), slice(2,4), slice(3,5), slice(6,10), + slice(8,9), slice(10,12), slice(14,16)] + + self.assertEqual(accumulate.merge_slices(slices), + (slice(0,5), slice(6,12), slice(14,16))) + + def test_update_slices_two_overlaps(self): + old = [slice(0,3), slice(5,7)] + new = slice(2,6) + self.assertEqual(accumulate.update_slices( + old, new), + (slice(0,2), slice(6,7))) + + def test_update_slices_single_overlap_leading(self): + self.assertEqual(accumulate.update_slices( + slice(0,4), slice(3,7)), + slice(0,3)) + + def test_update_slices_single_overlap_trailing(self): + self.assertEqual(accumulate.update_slices( + slice(3,7), slice(0,4)), + slice(4,7)) + + def test_update_slices_contains(self): + self.assertEqual(accumulate.update_slices( + slice(2,4), slice(2,7)), + None) - def setUp(self): - # AFAIK, I don't need any setup - # I'm only putting this in in case I need to add something to it. - pass + def test_update_slices_split(self): + self.assertEqual(accumulate.update_slices( + slice(0,8), slice(3,5)), + (slice(0,3), slice(5,8))) + - def tearDown(self): - # AFAIK, I don't need any teardown - pass - - def test_union_list_union(self): + def test_union_list_def_no_source(self): + A = accumulate.UnionList([1,2,3]) + + self.assertEqual(len(A), 3) + self.assertEqual(A.source[None], slice(0,3)) + + def test_union_list_def_with_source(self): + A = accumulate.UnionList([1,2,3], source='MySource') + + self.assertEqual(len(A), 3) + self.assertEqual(A.source['MySource'], slice(0,3)) + self.assertDictEqual(A.source, {'MySource':slice(0,3)}) + + def test_union_list_union_no_source(self): # I start out with a UnionList containing a simple list: A = accumulate.UnionList([1,2,3]) @@ -68,6 +109,179 @@ class AccumulationTests(unittest.TestCase): # The new list should have the original elements plus the # the new elements that don't repeat existing elements: self.assertEqual(C, [1,2,3,4,5]) + + def test_union_list_union_no_source_to_source(self): + A = accumulate.UnionList([1,2,3]) + C = A.union([3,4,5], source='New') + self.assertDictEqual(C.source, + {None:slice(0,3), 'New':slice(3,5)}) + self.assertListEqual(C[C.source['New']], [4,5]) + + def test_union_list_union_source_to_source(self): + A = accumulate.UnionList([1,2,3], source='Old') + C = A.union([3,4,5], source='New') + self.assertDictEqual(C.source, + {'Old':slice(0,3), 'New':slice(3,5)}) + self.assertListEqual(C[C.source['New']], [4,5]) + + def test_union_list_union_w_union_list(self): + A = accumulate.UnionList([1,2,3], source='A') + B = accumulate.UnionList([3,4,5], source='B') + + C = A.union(B) + D = B.union(A) + + self.assertListEqual(C, [1,2,3,4,5]) + self.assertDictEqual(C.source, + {'A':slice(0,3), 'B':slice(3,5)}) + + self.assertListEqual(D, [3,4,5,1,2]) + self.assertDictEqual(D.source, + {'B':slice(0,3), 'A':slice(3,5)}) + + def test_union_list_union_source_to_source_twice(self): + A = accumulate.UnionList([1,2,3], source='Original') + B = A.union([3,4,5], source = 'Old') + C = B.union([6,4,8], source = 'New') + self.assertListEqual(C, [1,2,3,4,5,6,8]) + self.assertDictEqual(C.source, + {'Original':slice(0,3), 'Old':slice(3,5), 'New':slice(5,7)}) + self.assertListEqual(C[C.source['Old']], [4,5]) + + + def test_union_list_union_source_to_no_source(self): + A = accumulate.UnionList([1,2,3], source='Original') + B = A.union([3,4,5]) + C = B.union([6,4,8]) + self.assertListEqual(C, [1,2,3,4,5,6,8]) + self.assertDictEqual(C.source, + {'Original':slice(0,3), None:slice(3,7)}) + self.assertListEqual(C[C.source[None]], [4,5,6,8]) + + def test_union_list_union_noncontiguous_same_source(self): + A = accumulate.UnionList([1,2,3], source='Original') + B = A.union([3,4,5], source='A') + C = B.union([4,6,8], source='B') + D = C.union([6,12,18], source='A') + self.assertListEqual(D, [1,2,3,4,5,6,8,12,18]) + self.assertDictEqual(D.source, + {'Original':slice(0,3), + 'B':slice(5,7), + 'A':(slice(3,5),slice(7,9)) }) + self.assertListEqual(D[D.source['A']], [4,5,12,18]) + + def test_union_list_syntax_sweet(self): + A = accumulate.UnionList([1,2,3], source='Original') + B = A.union([3,4,5], source='A') + C = B.union([4,6,8], source='B') + D = C.union([6,12,18], source='A') + self.assertListEqual(D['Original'], [1,2,3]) + self.assertListEqual(D['B'], [6,8]) + self.assertListEqual(D['A'], [4,5,12,18]) + + + def test_unionlist_sourced_union_wo_source(self): + L = accumulate.UnionList([1,2,3], source='s1') + M = accumulate.UnionList([4], source='s2') + N = L.union(M) + self.assertDictEqual(N.source, + {'s1': slice(0, 3, None), 's2': slice(3, 4, None)}) + + def test_unionlist_source_union_w_source(self): + L = accumulate.UnionList([1,2,3], source='s1') + M = accumulate.UnionList([4], source='s2') + N = L.union(M, source='s2') + self.assertDictEqual(N.source, + {'s1': slice(0, 3, None), 's2': slice(3, 4, None)}) + + def test_unionlist_override_source(self): + L = accumulate.UnionList([1,2,3], source='s1') + M = accumulate.UnionList(L, source='s2') + self.assertDictEqual(M.source, + {'s2':slice(0,3)}) + + def test_unionlist_default_source(self): + L = accumulate.UnionList([1,2,3], source='s1') + M = accumulate.UnionList(L, source='s2', override=False) + N = accumulate.UnionList([1,2,3], source='s2', override=False) + self.assertDictEqual(M.source, + {'s1':slice(0,3)}) + self.assertDictEqual(N.source, + {'s2':slice(0,3)}) + + + + +class SourceTracking_RecursiveDict(unittest.TestCase): + """ + Test source-tracking of keys in RecursiveDict. + """ + def test_recursive_dict_def_no_source(self): + A = accumulate.RecursiveDict({'a':1, 'b':2, 'c':3}) + self.assertEqual(A.source['a'], None) + self.assertEqual(A.source['b'], None) + self.assertEqual(A.source['c'], None) + + def test_recursive_dict_def_source(self): + A = accumulate.RecursiveDict({'a':1, 'b':2, 'c':3}, source='Old') + self.assertEqual(A.source['a'], 'Old') + self.assertEqual(A.source['b'], 'Old') + self.assertEqual(A.source['c'], 'Old') + + def test_recursive_dict_update_sourced_w_sourced(self): + A = accumulate.RecursiveDict({'a':1, 'b':2, 'c':3}, source='Old') + A.update({'d':4, 'e':5}, source='New') + + self.assertEqual(A.source['a'], 'Old') + self.assertEqual(A.source['b'], 'Old') + self.assertEqual(A.source['c'], 'Old') + self.assertEqual(A.source['d'], 'New') + self.assertEqual(A.source['e'], 'New') + + def test_copy_recursive_dict(self): + A = accumulate.RecursiveDict({'a':1, 'b':2, 'c':3}, source='Old') + B = A.copy() + B.update({'d':4}, source='New') + + self.assertDictEqual(A, {'a':1, 'b':2, 'c':3}) + self.assertEqual(A.source['a'], 'Old') + self.assertDictEqual(B, {'a':1, 'b':2, 'c':3, 'd':4}) + self.assertEqual(B.source['d'], 'New') + + def test_recursive_dict_source_inheritance(self): + A = accumulate.RecursiveDict({'a':1}, source='A') + B = accumulate.RecursiveDict({'b':2}, source='B') + C = accumulate.RecursiveDict({'c':3}, source='C') + B.update(C) + A.update(B) + + self.assertEqual(A.source, {'a':'A', 'b':'B', 'c':'C'}) + + def test_unionlist_in_recursivedict_sourced_union(self): + Q = accumulate.RecursiveDict({'A':[1,2,3]}, source='s1') + R = accumulate.RecursiveDict({'A':[4]}, source='s2') + Q.update(R) + self.assertDictEqual(Q.source, + {'A':'s1'}) + self.assertDictEqual(Q['A'].source, + {'s1': slice(0, 3, None), 's2': slice(3, 4, None)}) + + +class AccumulationTests(unittest.TestCase): + """ + Test combination operations give correct results. + """ + TESTDATA = os.path.abspath( + os.path.join(os.path.dirname(__file__), '..', 'testdata')) + + MYPROJ_ABX_YAML = os.path.join(TESTDATA, 'myproject/abx.yaml') + PILOT_ABX_YAML = os.path.join(TESTDATA, 'myproject/Episodes/A.001-Pilot/abx.yaml') + + def setUp(self): + with open(self.MYPROJ_ABX_YAML, 'rt') as myprog_f: + self.myproj_abx = yaml.safe_load(myprog_f) + with open(self.PILOT_ABX_YAML, 'rt') as pilot_f: + self.pilot_abx = yaml.safe_load(pilot_f) def test_subdictionary_updates_instead_of_being_replaced(self): # I start out with a dictionary that contains a subdictionary @@ -117,6 +331,88 @@ class AccumulationTests(unittest.TestCase): self.assertEqual(first_recursive['L'], [1,2,3,4,5,6]) + def test_recursive_dict_load_w_source(self): + A =accumulate.RecursiveDict(self.myproj_abx, source='myproj', active_source='live') + + print("A.get_data() = ", A.get_data()) + print("A.source = ", A.source) + + self.assertEqual(sorted(list(A.keys())), ['abx', 'testdict', 'testscalar']) + + self.assertEqual(sorted(list(A['testdict'].keys())), ['A', 'B', 'C', 'D']) + self.assertEqual(sorted(list(A['testdict']['A'])), ['item1', 'item2', 'item3']) + + self.assertDictEqual(A.source, + {'abx':'myproj', 'testdict':'myproj', 'testscalar':'myproj'}) + + + self.assertDictEqual(A['testdict'].source, { + 'A':'myproj', 'B':'myproj', 'C':'myproj', 'D':'myproj'}) + + self.assertDictEqual(A['testdict']['A'].source, {'myproj':slice(0,3)}) + self.assertDictEqual(A['testdict']['C'].source, {'a':'myproj', 'b':'myproj', + 'c':'myproj', 'd':'myproj'}) + self.assertDictEqual(A['testdict'].get_data(), + {'A':['item1', 'item2', 'item3'], + 'B':1, + 'C':{'a':1, 'b':1, 'c':2, 'd':3}, + 'D':[ + {'a':1, 'b':2}, + {'a':2, 'b':3} + ]}) + + + def test_recursive_dict_load_and_update_w_source(self): + A = accumulate.RecursiveDict(self.myproj_abx, source='myproj', active_source='live') + B = accumulate.RecursiveDict(self.pilot_abx, source='pilot') + + A.update(B) + + self.assertEqual(sorted(list(A.keys())), ['abx', 'testdict', 'testscalar']) + + self.assertEqual(sorted(list(A['testdict'].keys())), ['A', 'B', 'C', 'D']) + self.assertEqual(sorted(list(A['testdict']['A'])), ['item1', 'item2', 'item3', 'item4']) + + self.assertDictEqual(A.source, + {'abx':'myproj', 'testdict':'myproj', 'testscalar':'pilot'}) + + self.assertDictEqual(A['testdict'].source, { + 'A':'myproj', 'B':'pilot', 'C':'myproj', 'D':'myproj'}) + + self.assertDictEqual(A['testdict']['A'].source, + {'myproj':slice(0,3), 'pilot':slice(3,4)}) + + + + + def test_recursive_dict_update_w_source(self): + A = accumulate.RecursiveDict({'a':1}, source='A') + B = accumulate.RecursiveDict({'b':2}, source='B') + C = accumulate.RecursiveDict({'c':3}, source='C') + B.update(C) + A.update(B) + self.assertDictEqual(A.get_data(), {'a':1, 'b':2, 'c':3}) + self.assertDictEqual(A.source, {'a':'A', 'b':'B', 'c':'C'}) + + + def test_recursive_dict_update_w_source_override_source(self): + A = accumulate.RecursiveDict({'a':1}, source='A') + B = accumulate.RecursiveDict({'b':2}, source='B') + C = accumulate.RecursiveDict({'c':3}, source='C') + B.update(C, source='D') + self.assertDictEqual(B.source, {'b':'B', 'c':'D'}) + A.update(B, source='E') + self.assertDictEqual(A.source, {'a':'A', 'b':'E', 'c':'E'}) + + + def test_recursive_dict_and_union_list_correct_instances(self): + A = accumulate.UnionList([1,2,3]) + B = accumulate.RecursiveDict({'A':'a', 'B':'b'}) + + self.assertTrue(isinstance(A, collections.abc.MutableSequence)) + self.assertTrue(isinstance(B, collections.abc.Mapping)) + + class CollectYaml(unittest.TestCase): TESTDATA = os.path.abspath( os.path.join(os.path.dirname(__file__), '..', 'testdata')) @@ -213,11 +509,19 @@ class CollectYaml(unittest.TestCase): testdata = accumulate.combine_yaml(abx_files) + print("\ntestdata.get_data() = ", testdata.get_data()) + print("testdata['testdict'].source = ", testdata['testdict'].source) + self.assertEqual(testdata['testscalar'], 'loweryaml') self.assertEqual( list(testdata['testdict']['A']), ['item1', 'item2', 'item3', 'item4']) + self.assertEqual( + testdata.source['testdict'], abx_files[0]) + self.assertEqual( + testdata['testdict'].source['A'], abx_files[0]) + def test_collecting_yaml_from_empty_dir(self): files = accumulate.collect_yaml_files( os.path.join(self.TESTDATA, 'empty/'), diff --git a/tests/test_myproject_library.py b/tests/test_myproject_library.py new file mode 100644 index 0000000..49e7910 --- /dev/null +++ b/tests/test_myproject_library.py @@ -0,0 +1,41 @@ +# tests_myproject_library.py +""" +Tests that use the 'myproject' test article and its 'library' to test features of loading +project files. +""" + +import unittest, os +import yaml + +import sys +print("__file__ = ", __file__) +sys.path.append(os.path.normpath(os.path.join(__file__, '..', '..'))) + +from abx import file_context + + +class TestLoadingSchemaHierarchies(unittest.TestCase): + TESTDATA = os.path.abspath( + os.path.join(os.path.dirname(__file__), '..', 'testdata')) + + TESTPROJECTYAML = os.path.join(TESTDATA, 'myproject', 'myproject.yaml') + + TESTPATH = os.path.join(TESTDATA, 'myproject/Episodes/' + + 'A.001-Pilot/Seq/LP-LastPoint/' + + 'A.001-LP-1-BeginningOfEnd-anim.txt') + + TESTLIBPATH = os.path.join(TESTDATA, 'myproject/Library/' + + 'models/props/MyProp-By-me_here-prop.blend') + + def test_load_std_schema_from_shotfile(self): + # Probably duplicates test_file_context + fc = file_context.FileContext(self.TESTPATH) + print("\n") + print( fc.schemas) + self.assertEqual(fc.schemas, + None) + + + + + \ No newline at end of file diff --git a/tests/test_name_schema.py b/tests/test_name_schema.py index a67b5b7..7708181 100644 --- a/tests/test_name_schema.py +++ b/tests/test_name_schema.py @@ -16,6 +16,8 @@ sys.path.append(os.path.normpath(os.path.join(__file__, '..', '..'))) from abx import name_schema +from abx import ranks as ranks_mod + class FileContext_NameSchema_Interface_Tests(unittest.TestCase): """ Test the interfaces presented by FieldSchema. @@ -96,5 +98,26 @@ class FileContext_NameSchema_Interface_Tests(unittest.TestCase): self.assertEqual(schema_chain[5].rank, 'camera') self.assertEqual(schema_chain[5].codetype[1], ('c2', 'c2', 'c2')) + def test_FieldSchema_Branch_load_from_project_yaml(self): + with open(self.TESTPROJECTYAML, 'rt') as yaml_file: + data = yaml.safe_load(yaml_file) + schema_dicts = data['project_schema'] + + ranks = [s['rank'] for s in schema_dicts] + + branch = ranks_mod.Branch( + ranks_mod.Trunk, + data['project_unit'][-1]['code'], + 1, + ranks) + + print("\nbranch = ", branch) + + print("\nbranch.rank('project') = ", repr(branch.rank('project'))) + + self.assertTrue(False) + + + \ No newline at end of file