Source tracking in accumulate module.

This commit is contained in:
filmfreedom-org 2021-06-14 14:48:26 -05:00
parent 43a7adb379
commit 9db1245ab3
10 changed files with 657 additions and 43 deletions

View File

@ -102,6 +102,81 @@ import yaml
wordre = re.compile(r'([A-Z]+[a-z]*|[a-z]+|[0-9]+)') wordre = re.compile(r'([A-Z]+[a-z]*|[a-z]+|[0-9]+)')
def merge_slices(slices):
"""
Given a list of slice objects, merge into minimum list of new slices to cover same elements.
The idea is to catch contiguous or overlapping slices and reduce them to a single slice.
Arguments:
slices (list(slice)): List of slices to be merged.
"""
if isinstance(slices, slice):
slices = [slices]
slices = list(slices)
ordered = sorted(slices, key = lambda a: a.start)
merged = []
while ordered:
s = ordered.pop(0)
while ordered and ordered[0].start <= s.stop:
r = ordered.pop(0)
s = slice(s.start, max(s.stop,r.stop))
merged.append(s)
return tuple(merged)
def update_slices(old_slices, new):
if isinstance(old_slices, slice):
old_slices = [old_slices]
new_slices = []
for old in old_slices:
if (old.start < new.start <= old.stop) and (new.stop >= old.stop):
# Leading overlap Old: |-----|
# New: |-----|
new_slices.append(slice(old.start, new.start))
elif (old.start <= new.stop < old.stop) and (new.start <= old.start):
# Trailing overlap Old: |-----|
# New: |-----|
new_slices.append(slice(new.stop, old.stop))
elif (new.start <= old.start) and (new.stop >= old.stop):
# Contains Old: |--|
# New: |------|
pass
elif (new.start > old.stop) or (new.stop < old.start):
# No overlap Old: |---|
# New: |---|
new_slices.append(old)
elif (old.start < new.start) and (new.stop < old.stop):
# Split Old: |-------|
# New: |--|
new_slices.append(slice(old.start,new.start))
new_slices.append(slice(new.stop, old.stop))
if len(new_slices)==1:
new_slices = new_slices[0]
elif len(new_slices)==0:
new_slices = None
else:
new_slices = tuple(new_slices)
return new_slices
def listable(val):
"""
Can val be coerced to UnionList?
"""
return ((isinstance(val, collections.abc.Sequence) or
isinstance(val, collections.abc.Set))
and not
(type(val) in (bytes, str)) )
def dictable(val):
"""
Can val be coerced to RecursiveDict?
"""
return isinstance(val, collections.abc.Mapping)
class OrderedSet(collections.abc.Set): class OrderedSet(collections.abc.Set):
""" """
List-based set from Python documentation example. List-based set from Python documentation example.
@ -144,13 +219,59 @@ class UnionList(list):
files, which may or may not contain repetitions for different uses, but files, which may or may not contain repetitions for different uses, but
also makes accumulation idempotent (running the union twice will not also makes accumulation idempotent (running the union twice will not
increase the size of the result, because no new values will be found). increase the size of the result, because no new values will be found).
Attributes:
source: A dictionary mapping source objects to slice objects
according to which union (or original definition) they
come from.
""" """
def union(self, other): def __init__(self, data, source=None, override=True):
self.source = {}
super().__init__(data)
if hasattr(data, 'source') and not override:
self.source = data.source.copy()
if source is not None and None in self.source:
self.source[source] = self.source[None]
del self.source[None]
else:
self.source[source] = slice(0,len(self))
# if source is None and hasattr(data, 'source'):
# self.source = data.source.copy()
# else:
# self.source[source] = slice(0,len(self))
def __repr__(self):
return "UnionList(%s)" % super().__repr__()
def __getitem__(self, query):
if isinstance(query, int) or isinstance(query, slice):
return super().__getitem__(query)
elif isinstance(query, tuple):
result = []
for element in query:
result.extend(super().__getitem__(element))
return result
elif query in self.source:
return self[self.source[query]]
else:
raise ValueError("No source %s, " % repr(query) +
"not a direct int, slice, or tuple of same.")
def union(self, other, source=None):
""" """
Returns a combination of the current list with unique new options added. Returns a combination of the current list with unique new options added.
Arguments: Arguments:
other (list): The other list from which new options will be taken. other (list):
The other list from which new options will be taken.
source(hashable):
A provided object identifying the source of the new
information (can be any type -- will be stored in
the 'source' dictionary, along with the slice to
which it applies).
Returns: Returns:
A list with the original options and any unique new options from the A list with the original options and any unique new options from the
@ -160,9 +281,68 @@ class UnionList(list):
in the original list will be unharmed. in the original list will be unharmed.
""" """
combined = UnionList(self) combined = UnionList(self)
combined.source = {}
old_len = len(combined)
# This is the actual union operation
j = old_len
new_elements = []
for element in other: for element in other:
if element not in self: if element not in self:
combined.append(element) new_elements.append(element)
combined.extend(new_elements)
combined.source = self.source.copy()
if source is None and hasattr(other, 'source'):
# Other is a UnionList and may have complex source information
for j, element in enumerate(new_elements):
for src in other.source:
if src not in self.source:
combined.source[src] = []
elif isinstance(self.source[src], slice):
combined.source[src] = [self.source[src]]
elif isinstance(self.source[src], tuple):
combined.source[src] = list(self.source[src])
if element in other[other.source[src]]:
combined.source[src].append(slice(old_len,old_len+j+1))
for src in combined.source:
combined.source[src] = merge_slices(combined.source[src])
if len(combined.source[src]) == 0:
del combined.source[src]
elif len(combined.source[src]) == 1:
combined.source[src] = combined.source[src][0]
else:
# Source-naive list, only explicitly provided source:
new_slice = slice(old_len, len(combined))
for src in self.source:
upd = update_slices(self.source[src], new_slice)
if upd:
combined.source[src] = upd
if source in self.source:
# If a source is used twice, we have to merge it
# into the existing slices for that source
if isinstance(self.source[source], slice):
new_slices = (self.source[source], new_slice)
elif isinstance(self.source[source], collections.Sequence):
new_slices = self.source[source] + (new_slice,)
new_slices = tuple(merge_slices(new_slices))
if len(new_slices) == 1:
combined.source[source] = new_slices[0]
else:
combined.source[source] = tuple(new_slices)
else:
combined.source[source] = new_slice
return combined return combined
class RecursiveDict(collections.OrderedDict): class RecursiveDict(collections.OrderedDict):
@ -173,14 +353,22 @@ class RecursiveDict(collections.OrderedDict):
as UnionLists and applying the union operation to combine them as UnionLists and applying the union operation to combine them
(when the replacement value is also a list). (when the replacement value is also a list).
""" """
def __init__(self, data=None, source=None, active_source=None):
self.active_source = active_source
self.source = {}
super().__init__()
if isinstance(data, collections.abc.Mapping):
self.update(data, source=source)
def clear(self): def clear(self):
""" """
Clear the dictionary to an empty state. Clear the dictionary to an empty state.
""" """
for key in self: for key in self:
del self[key] del self[key]
self.source = {}
def update(self, mapping): def update(self, other, source=None):
""" """
Load information from another dictionary / mapping object. Load information from another dictionary / mapping object.
@ -206,27 +394,42 @@ class RecursiveDict(collections.OrderedDict):
There are issues that can happen if a dictionary value is replaced There are issues that can happen if a dictionary value is replaced
with a list or a scalar in the update source. with a list or a scalar in the update source.
""" """
for key in mapping: if source is None and hasattr(other, 'source'):
def get_source(key):
return other.source[key]
else:
def get_source(key):
return source
for key in other:
if key in self: if key in self:
if (isinstance(self[key], collections.abc.Mapping) and old = self[key]
isinstance(mapping[key], collections.abc.Mapping)): new = other[key]
# Subdictionary
newvalue = RecursiveDict(self[key])
newvalue.update(RecursiveDict(mapping[key]))
self[key] = newvalue
elif ((isinstance(self[key], collections.abc.MutableSequence) or if dictable(old) and dictable(new):
isinstance(self[key], collections.abc.Set)) and old.update(RecursiveDict(new), source=get_source(key))
(isinstance(mapping[key], collections.abc.MutableSequence) or
isinstance(mapping[key], collections.abc.Set))):
# Sublist
self[key] = UnionList(self[key]).union(UnionList(mapping[key]))
elif listable(old) and listable(new):
self.__setitem__(key, old.union(new), source=self.source[key])
#self.__setitem__(key, old.union(UnionList(new)),
# source=self.source[key])
# self.__setitem__(key, old.union(UnionList(new),
# source=get_source(key)),
# source=self.source[key])
else: # scalar else: # scalar
self[key] = mapping[key] self.__setitem__(key, other[key], source=get_source(key))
else: # new key else: # new key
self[key] = mapping[key] self.__setitem__(key, other[key], source=get_source(key))
def copy(self):
copy = RecursiveDict()
for key in self:
copy[key] = self[key]
for key in self.source:
copy.source[key] = self.source[key]
return copy
def get_data(self): def get_data(self):
""" """
@ -242,13 +445,16 @@ class RecursiveDict(collections.OrderedDict):
new[key]=self[key] new[key]=self[key]
return new return new
def __setitem__(self, key, value): def __setitem__(self, key, value, source=None):
if isinstance(value, collections.abc.Mapping): if not source:
super().__setitem__(key, RecursiveDict(value)) source = self.active_source
elif isinstance(value, collections.abc.MutableSequence): self.source[key] = source
super().__setitem__(key, UnionList(value))
if dictable(value):
super().__setitem__(key, RecursiveDict(value, source=source))
elif listable(value):
super().__setitem__(key, UnionList(value, source=source, override=False))
else: else:
super().__setitem__(key,value) super().__setitem__(key,value)
@ -268,11 +474,11 @@ class RecursiveDict(collections.OrderedDict):
s = s + ')' s = s + ')'
return s return s
def from_yaml(self, yaml_string): def from_yaml(self, yaml_string, source=None):
""" """
Initialize dictionary from YAML contained in a string. Initialize dictionary from YAML contained in a string.
""" """
self.update(yaml.safe_load(yaml_string)) self.update(yaml.safe_load(yaml_string), source=source)
return self return self
def from_yaml_file(self, path): def from_yaml_file(self, path):
@ -280,7 +486,7 @@ class RecursiveDict(collections.OrderedDict):
Initialize dictionary from a separate YAML file on disk. Initialize dictionary from a separate YAML file on disk.
""" """
with open(path, 'rt') as yamlfile: with open(path, 'rt') as yamlfile:
self.update(yaml.safe_load(yamlfile)) self.update(yaml.safe_load(yamlfile), source=path)
return self return self
def to_yaml(self): def to_yaml(self):
@ -413,7 +619,7 @@ def combine_yaml(yaml_paths):
data = RecursiveDict() data = RecursiveDict()
for path in yaml_paths: for path in yaml_paths:
with open(path, 'rt') as yaml_file: with open(path, 'rt') as yaml_file:
data.update(yaml.safe_load(yaml_file)) data.update(yaml.safe_load(yaml_file), source=path)
return data return data
def get_project_data(filepath): def get_project_data(filepath):

View File

@ -242,7 +242,7 @@ class FileContext(NameContext):
'scene': 0} 'scene': 0}
# Defaults # Defaults
self.provided_data = RecursiveDict(DEFAULT_YAML) self.provided_data = RecursiveDict(DEFAULT_YAML, source='default')
self.abx_fields = DEFAULT_YAML['abx'] self.abx_fields = DEFAULT_YAML['abx']
def clear_notes(self): def clear_notes(self):
@ -280,7 +280,7 @@ class FileContext(NameContext):
# Data from YAML Files # Data from YAML Files
#self._collect_yaml_data() #self._collect_yaml_data()
self.provided_data = RecursiveDict(DEFAULT_YAML) self.provided_data = RecursiveDict(DEFAULT_YAML, source='default')
kitcat_root, kitcat_data, abx_data = accumulate.get_project_data(self.filepath) kitcat_root, kitcat_data, abx_data = accumulate.get_project_data(self.filepath)
self.root = kitcat_root self.root = kitcat_root

View File

@ -13,3 +13,7 @@ testdict:
b: 3 b: 3
d: 3 d: 3
e: 4 e: 4
D:
- b: 2
c: 3

30
testdata/myproject/Library/Library.yaml vendored Normal file
View File

@ -0,0 +1,30 @@
---
project_unit:
code: library
name: Library
from_folders: 3
project_schema:
- rank: library
delimiter: '-'
type: string
maxlength: 32
- rank: dept
type:
graphics: Graphic Art (2D)
models: Models (3D)
sound: Sound Effects
music: Music and Cues
voice: Voice Lines
stock: Assembled Stock Footage Elements
- rank: category
type: string
maxlength: 32
- rank: subcat
type: string
maxlength: 32

View File

@ -18,3 +18,9 @@ testdict:
c: 2 c: 2
d: 3 d: 3
D:
- a: 1
b: 2
- a: 2
b: 3

View File

@ -33,7 +33,9 @@ It also provides convenient conversions to YAML or JSON serializations of
the data for output to text files or text blocks. the data for output to text files or text blocks.
""" """
import unittest, os import unittest, os, collections
import yaml
# This is the most ridiculous work-around, but it seems to be necessary to # This is the most ridiculous work-around, but it seems to be necessary to
# get Python 3 to import the modules for testing # get Python 3 to import the modules for testing
@ -43,21 +45,60 @@ sys.path.append(os.path.normpath(os.path.join(__file__, '..', '..')))
from abx import accumulate from abx import accumulate
class AccumulationTests(unittest.TestCase): class SourceTracking_UnionList(unittest.TestCase):
""" """
Test combination operations give correct results. Test that source-tracking of UnionList elements is working.
""" """
def setUp(self): def test_merge_slices(self):
# AFAIK, I don't need any setup slices = [slice(0,3), slice(2,4), slice(3,5), slice(6,10),
# I'm only putting this in in case I need to add something to it. slice(8,9), slice(10,12), slice(14,16)]
pass
def tearDown(self): self.assertEqual(accumulate.merge_slices(slices),
# AFAIK, I don't need any teardown (slice(0,5), slice(6,12), slice(14,16)))
pass
def test_union_list_union(self): def test_update_slices_two_overlaps(self):
old = [slice(0,3), slice(5,7)]
new = slice(2,6)
self.assertEqual(accumulate.update_slices(
old, new),
(slice(0,2), slice(6,7)))
def test_update_slices_single_overlap_leading(self):
self.assertEqual(accumulate.update_slices(
slice(0,4), slice(3,7)),
slice(0,3))
def test_update_slices_single_overlap_trailing(self):
self.assertEqual(accumulate.update_slices(
slice(3,7), slice(0,4)),
slice(4,7))
def test_update_slices_contains(self):
self.assertEqual(accumulate.update_slices(
slice(2,4), slice(2,7)),
None)
def test_update_slices_split(self):
self.assertEqual(accumulate.update_slices(
slice(0,8), slice(3,5)),
(slice(0,3), slice(5,8)))
def test_union_list_def_no_source(self):
A = accumulate.UnionList([1,2,3])
self.assertEqual(len(A), 3)
self.assertEqual(A.source[None], slice(0,3))
def test_union_list_def_with_source(self):
A = accumulate.UnionList([1,2,3], source='MySource')
self.assertEqual(len(A), 3)
self.assertEqual(A.source['MySource'], slice(0,3))
self.assertDictEqual(A.source, {'MySource':slice(0,3)})
def test_union_list_union_no_source(self):
# I start out with a UnionList containing a simple list: # I start out with a UnionList containing a simple list:
A = accumulate.UnionList([1,2,3]) A = accumulate.UnionList([1,2,3])
@ -69,6 +110,179 @@ class AccumulationTests(unittest.TestCase):
# the new elements that don't repeat existing elements: # the new elements that don't repeat existing elements:
self.assertEqual(C, [1,2,3,4,5]) self.assertEqual(C, [1,2,3,4,5])
def test_union_list_union_no_source_to_source(self):
A = accumulate.UnionList([1,2,3])
C = A.union([3,4,5], source='New')
self.assertDictEqual(C.source,
{None:slice(0,3), 'New':slice(3,5)})
self.assertListEqual(C[C.source['New']], [4,5])
def test_union_list_union_source_to_source(self):
A = accumulate.UnionList([1,2,3], source='Old')
C = A.union([3,4,5], source='New')
self.assertDictEqual(C.source,
{'Old':slice(0,3), 'New':slice(3,5)})
self.assertListEqual(C[C.source['New']], [4,5])
def test_union_list_union_w_union_list(self):
A = accumulate.UnionList([1,2,3], source='A')
B = accumulate.UnionList([3,4,5], source='B')
C = A.union(B)
D = B.union(A)
self.assertListEqual(C, [1,2,3,4,5])
self.assertDictEqual(C.source,
{'A':slice(0,3), 'B':slice(3,5)})
self.assertListEqual(D, [3,4,5,1,2])
self.assertDictEqual(D.source,
{'B':slice(0,3), 'A':slice(3,5)})
def test_union_list_union_source_to_source_twice(self):
A = accumulate.UnionList([1,2,3], source='Original')
B = A.union([3,4,5], source = 'Old')
C = B.union([6,4,8], source = 'New')
self.assertListEqual(C, [1,2,3,4,5,6,8])
self.assertDictEqual(C.source,
{'Original':slice(0,3), 'Old':slice(3,5), 'New':slice(5,7)})
self.assertListEqual(C[C.source['Old']], [4,5])
def test_union_list_union_source_to_no_source(self):
A = accumulate.UnionList([1,2,3], source='Original')
B = A.union([3,4,5])
C = B.union([6,4,8])
self.assertListEqual(C, [1,2,3,4,5,6,8])
self.assertDictEqual(C.source,
{'Original':slice(0,3), None:slice(3,7)})
self.assertListEqual(C[C.source[None]], [4,5,6,8])
def test_union_list_union_noncontiguous_same_source(self):
A = accumulate.UnionList([1,2,3], source='Original')
B = A.union([3,4,5], source='A')
C = B.union([4,6,8], source='B')
D = C.union([6,12,18], source='A')
self.assertListEqual(D, [1,2,3,4,5,6,8,12,18])
self.assertDictEqual(D.source,
{'Original':slice(0,3),
'B':slice(5,7),
'A':(slice(3,5),slice(7,9)) })
self.assertListEqual(D[D.source['A']], [4,5,12,18])
def test_union_list_syntax_sweet(self):
A = accumulate.UnionList([1,2,3], source='Original')
B = A.union([3,4,5], source='A')
C = B.union([4,6,8], source='B')
D = C.union([6,12,18], source='A')
self.assertListEqual(D['Original'], [1,2,3])
self.assertListEqual(D['B'], [6,8])
self.assertListEqual(D['A'], [4,5,12,18])
def test_unionlist_sourced_union_wo_source(self):
L = accumulate.UnionList([1,2,3], source='s1')
M = accumulate.UnionList([4], source='s2')
N = L.union(M)
self.assertDictEqual(N.source,
{'s1': slice(0, 3, None), 's2': slice(3, 4, None)})
def test_unionlist_source_union_w_source(self):
L = accumulate.UnionList([1,2,3], source='s1')
M = accumulate.UnionList([4], source='s2')
N = L.union(M, source='s2')
self.assertDictEqual(N.source,
{'s1': slice(0, 3, None), 's2': slice(3, 4, None)})
def test_unionlist_override_source(self):
L = accumulate.UnionList([1,2,3], source='s1')
M = accumulate.UnionList(L, source='s2')
self.assertDictEqual(M.source,
{'s2':slice(0,3)})
def test_unionlist_default_source(self):
L = accumulate.UnionList([1,2,3], source='s1')
M = accumulate.UnionList(L, source='s2', override=False)
N = accumulate.UnionList([1,2,3], source='s2', override=False)
self.assertDictEqual(M.source,
{'s1':slice(0,3)})
self.assertDictEqual(N.source,
{'s2':slice(0,3)})
class SourceTracking_RecursiveDict(unittest.TestCase):
"""
Test source-tracking of keys in RecursiveDict.
"""
def test_recursive_dict_def_no_source(self):
A = accumulate.RecursiveDict({'a':1, 'b':2, 'c':3})
self.assertEqual(A.source['a'], None)
self.assertEqual(A.source['b'], None)
self.assertEqual(A.source['c'], None)
def test_recursive_dict_def_source(self):
A = accumulate.RecursiveDict({'a':1, 'b':2, 'c':3}, source='Old')
self.assertEqual(A.source['a'], 'Old')
self.assertEqual(A.source['b'], 'Old')
self.assertEqual(A.source['c'], 'Old')
def test_recursive_dict_update_sourced_w_sourced(self):
A = accumulate.RecursiveDict({'a':1, 'b':2, 'c':3}, source='Old')
A.update({'d':4, 'e':5}, source='New')
self.assertEqual(A.source['a'], 'Old')
self.assertEqual(A.source['b'], 'Old')
self.assertEqual(A.source['c'], 'Old')
self.assertEqual(A.source['d'], 'New')
self.assertEqual(A.source['e'], 'New')
def test_copy_recursive_dict(self):
A = accumulate.RecursiveDict({'a':1, 'b':2, 'c':3}, source='Old')
B = A.copy()
B.update({'d':4}, source='New')
self.assertDictEqual(A, {'a':1, 'b':2, 'c':3})
self.assertEqual(A.source['a'], 'Old')
self.assertDictEqual(B, {'a':1, 'b':2, 'c':3, 'd':4})
self.assertEqual(B.source['d'], 'New')
def test_recursive_dict_source_inheritance(self):
A = accumulate.RecursiveDict({'a':1}, source='A')
B = accumulate.RecursiveDict({'b':2}, source='B')
C = accumulate.RecursiveDict({'c':3}, source='C')
B.update(C)
A.update(B)
self.assertEqual(A.source, {'a':'A', 'b':'B', 'c':'C'})
def test_unionlist_in_recursivedict_sourced_union(self):
Q = accumulate.RecursiveDict({'A':[1,2,3]}, source='s1')
R = accumulate.RecursiveDict({'A':[4]}, source='s2')
Q.update(R)
self.assertDictEqual(Q.source,
{'A':'s1'})
self.assertDictEqual(Q['A'].source,
{'s1': slice(0, 3, None), 's2': slice(3, 4, None)})
class AccumulationTests(unittest.TestCase):
"""
Test combination operations give correct results.
"""
TESTDATA = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', 'testdata'))
MYPROJ_ABX_YAML = os.path.join(TESTDATA, 'myproject/abx.yaml')
PILOT_ABX_YAML = os.path.join(TESTDATA, 'myproject/Episodes/A.001-Pilot/abx.yaml')
def setUp(self):
with open(self.MYPROJ_ABX_YAML, 'rt') as myprog_f:
self.myproj_abx = yaml.safe_load(myprog_f)
with open(self.PILOT_ABX_YAML, 'rt') as pilot_f:
self.pilot_abx = yaml.safe_load(pilot_f)
def test_subdictionary_updates_instead_of_being_replaced(self): def test_subdictionary_updates_instead_of_being_replaced(self):
# I start out with a dictionary that contains a subdictionary # I start out with a dictionary that contains a subdictionary
# with some data, under key 'A': # with some data, under key 'A':
@ -117,6 +331,88 @@ class AccumulationTests(unittest.TestCase):
self.assertEqual(first_recursive['L'], [1,2,3,4,5,6]) self.assertEqual(first_recursive['L'], [1,2,3,4,5,6])
def test_recursive_dict_load_w_source(self):
A =accumulate.RecursiveDict(self.myproj_abx, source='myproj', active_source='live')
print("A.get_data() = ", A.get_data())
print("A.source = ", A.source)
self.assertEqual(sorted(list(A.keys())), ['abx', 'testdict', 'testscalar'])
self.assertEqual(sorted(list(A['testdict'].keys())), ['A', 'B', 'C', 'D'])
self.assertEqual(sorted(list(A['testdict']['A'])), ['item1', 'item2', 'item3'])
self.assertDictEqual(A.source,
{'abx':'myproj', 'testdict':'myproj', 'testscalar':'myproj'})
self.assertDictEqual(A['testdict'].source, {
'A':'myproj', 'B':'myproj', 'C':'myproj', 'D':'myproj'})
self.assertDictEqual(A['testdict']['A'].source, {'myproj':slice(0,3)})
self.assertDictEqual(A['testdict']['C'].source, {'a':'myproj', 'b':'myproj',
'c':'myproj', 'd':'myproj'})
self.assertDictEqual(A['testdict'].get_data(),
{'A':['item1', 'item2', 'item3'],
'B':1,
'C':{'a':1, 'b':1, 'c':2, 'd':3},
'D':[
{'a':1, 'b':2},
{'a':2, 'b':3}
]})
def test_recursive_dict_load_and_update_w_source(self):
A = accumulate.RecursiveDict(self.myproj_abx, source='myproj', active_source='live')
B = accumulate.RecursiveDict(self.pilot_abx, source='pilot')
A.update(B)
self.assertEqual(sorted(list(A.keys())), ['abx', 'testdict', 'testscalar'])
self.assertEqual(sorted(list(A['testdict'].keys())), ['A', 'B', 'C', 'D'])
self.assertEqual(sorted(list(A['testdict']['A'])), ['item1', 'item2', 'item3', 'item4'])
self.assertDictEqual(A.source,
{'abx':'myproj', 'testdict':'myproj', 'testscalar':'pilot'})
self.assertDictEqual(A['testdict'].source, {
'A':'myproj', 'B':'pilot', 'C':'myproj', 'D':'myproj'})
self.assertDictEqual(A['testdict']['A'].source,
{'myproj':slice(0,3), 'pilot':slice(3,4)})
def test_recursive_dict_update_w_source(self):
A = accumulate.RecursiveDict({'a':1}, source='A')
B = accumulate.RecursiveDict({'b':2}, source='B')
C = accumulate.RecursiveDict({'c':3}, source='C')
B.update(C)
A.update(B)
self.assertDictEqual(A.get_data(), {'a':1, 'b':2, 'c':3})
self.assertDictEqual(A.source, {'a':'A', 'b':'B', 'c':'C'})
def test_recursive_dict_update_w_source_override_source(self):
A = accumulate.RecursiveDict({'a':1}, source='A')
B = accumulate.RecursiveDict({'b':2}, source='B')
C = accumulate.RecursiveDict({'c':3}, source='C')
B.update(C, source='D')
self.assertDictEqual(B.source, {'b':'B', 'c':'D'})
A.update(B, source='E')
self.assertDictEqual(A.source, {'a':'A', 'b':'E', 'c':'E'})
def test_recursive_dict_and_union_list_correct_instances(self):
A = accumulate.UnionList([1,2,3])
B = accumulate.RecursiveDict({'A':'a', 'B':'b'})
self.assertTrue(isinstance(A, collections.abc.MutableSequence))
self.assertTrue(isinstance(B, collections.abc.Mapping))
class CollectYaml(unittest.TestCase): class CollectYaml(unittest.TestCase):
TESTDATA = os.path.abspath( TESTDATA = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', 'testdata')) os.path.join(os.path.dirname(__file__), '..', 'testdata'))
@ -213,11 +509,19 @@ class CollectYaml(unittest.TestCase):
testdata = accumulate.combine_yaml(abx_files) testdata = accumulate.combine_yaml(abx_files)
print("\ntestdata.get_data() = ", testdata.get_data())
print("testdata['testdict'].source = ", testdata['testdict'].source)
self.assertEqual(testdata['testscalar'], 'loweryaml') self.assertEqual(testdata['testscalar'], 'loweryaml')
self.assertEqual( self.assertEqual(
list(testdata['testdict']['A']), list(testdata['testdict']['A']),
['item1', 'item2', 'item3', 'item4']) ['item1', 'item2', 'item3', 'item4'])
self.assertEqual(
testdata.source['testdict'], abx_files[0])
self.assertEqual(
testdata['testdict'].source['A'], abx_files[0])
def test_collecting_yaml_from_empty_dir(self): def test_collecting_yaml_from_empty_dir(self):
files = accumulate.collect_yaml_files( files = accumulate.collect_yaml_files(
os.path.join(self.TESTDATA, 'empty/'), os.path.join(self.TESTDATA, 'empty/'),

View File

@ -0,0 +1,41 @@
# tests_myproject_library.py
"""
Tests that use the 'myproject' test article and its 'library' to test features of loading
project files.
"""
import unittest, os
import yaml
import sys
print("__file__ = ", __file__)
sys.path.append(os.path.normpath(os.path.join(__file__, '..', '..')))
from abx import file_context
class TestLoadingSchemaHierarchies(unittest.TestCase):
TESTDATA = os.path.abspath(
os.path.join(os.path.dirname(__file__), '..', 'testdata'))
TESTPROJECTYAML = os.path.join(TESTDATA, 'myproject', 'myproject.yaml')
TESTPATH = os.path.join(TESTDATA, 'myproject/Episodes/' +
'A.001-Pilot/Seq/LP-LastPoint/' +
'A.001-LP-1-BeginningOfEnd-anim.txt')
TESTLIBPATH = os.path.join(TESTDATA, 'myproject/Library/' +
'models/props/MyProp-By-me_here-prop.blend')
def test_load_std_schema_from_shotfile(self):
# Probably duplicates test_file_context
fc = file_context.FileContext(self.TESTPATH)
print("\n")
print( fc.schemas)
self.assertEqual(fc.schemas,
None)

View File

@ -16,6 +16,8 @@ sys.path.append(os.path.normpath(os.path.join(__file__, '..', '..')))
from abx import name_schema from abx import name_schema
from abx import ranks as ranks_mod
class FileContext_NameSchema_Interface_Tests(unittest.TestCase): class FileContext_NameSchema_Interface_Tests(unittest.TestCase):
""" """
Test the interfaces presented by FieldSchema. Test the interfaces presented by FieldSchema.
@ -96,5 +98,26 @@ class FileContext_NameSchema_Interface_Tests(unittest.TestCase):
self.assertEqual(schema_chain[5].rank, 'camera') self.assertEqual(schema_chain[5].rank, 'camera')
self.assertEqual(schema_chain[5].codetype[1], ('c2', 'c2', 'c2')) self.assertEqual(schema_chain[5].codetype[1], ('c2', 'c2', 'c2'))
def test_FieldSchema_Branch_load_from_project_yaml(self):
with open(self.TESTPROJECTYAML, 'rt') as yaml_file:
data = yaml.safe_load(yaml_file)
schema_dicts = data['project_schema']
ranks = [s['rank'] for s in schema_dicts]
branch = ranks_mod.Branch(
ranks_mod.Trunk,
data['project_unit'][-1]['code'],
1,
ranks)
print("\nbranch = ", branch)
print("\nbranch.rank('project') = ", repr(branch.rank('project')))
self.assertTrue(False)