From 24235489fe7b243b66ab832220693d2fc5faf22b Mon Sep 17 00:00:00 2001 From: MattiasF Date: Wed, 28 Dec 2022 02:12:25 +0100 Subject: [PATCH 01/13] Added functiong for automatically splitting parameters when added to groups, added simple tests to check write function for split groups and corrected related bugs --- c3d/c3d.py | 120 +++++++++++++++++--- test/test_software_examples_write_read.py | 129 ++++++++++++++++++++-- 2 files changed, 222 insertions(+), 27 deletions(-) diff --git a/c3d/c3d.py b/c3d/c3d.py index 0f5ec8d..90a19f7 100644 --- a/c3d/c3d.py +++ b/c3d/c3d.py @@ -477,17 +477,18 @@ def encode_events(self, events): event_disp_flags = np.zeros(18, dtype=np.uint8) event_labels = np.empty(18, dtype=object) label_bytes = bytearray(18 * 4) - for i, (time, label) in enumerate(events): - if i > 17: + write_count = 0 # Initiate counter in-case events is an empty iterator + for time, label in events: + if write_count == 18: # Don't raise Error, header events are rarely used. warnings.warn('Maximum of 18 events can be encoded in the header, skipping remaining events.') break - event_timings[i] = time - event_labels[i] = label - label_bytes[i * 4:(i + 1) * 4] = label.encode('utf-8') + event_timings[write_count] = time + event_labels[write_count] = label + label_bytes[write_count * 4:(write_count + 1) * 4] = label.encode('utf-8') + write_count += 1 - write_count = min(i + 1, 18) event_disp_flags[:write_count] = 1 # Update event headers in self @@ -557,6 +558,7 @@ def num_elements(self): e = 1 for d in self.dimensions: e *= d + assert e == np.prod(self.dimensions), f"{self.dimensions}, {e}, {np.prod(self.dimensions)}" return e @property @@ -613,6 +615,7 @@ def read(self, handle): self.bytes = b'' if self.total_bytes: self.bytes = handle.read(self.total_bytes) + desc_size, = struct.unpack('B', handle.read(1)) self.desc = desc_size and self._dtypes.decode_string(handle.read(desc_size)) or '' @@ -1881,11 +1884,48 @@ def to_writer(self, conversion): class GroupEditable(Decorator): ''' Group instance decorator providing convenience functions for Writer editing. ''' - def __init__(self, group): + def __init__(self, group: Group): super(GroupEditable, self).__init__(group) def __contains__(self, key): return key in self._decoratee + + @property + def group(self) -> Group: + return self._decoratee + + def add_param(self, name, desc='', bytes_per_element=1, bytes=b'', dimensions=None): + """ Decorate the raw Group.add_param() function to split the inputs if the leading dimension is > 255. + """ + # Dimension must fit in a 8 bit unsigned int + if len(dimensions) == 0 or dimensions[-1] < 255: + # Forward + self.group.add_param(name, + desc=desc, + bytes_per_element=bytes_per_element, + bytes=bytes, + dimensions=dimensions) + return + + # Split the parameter into partial group parameters + num_param = int((dimensions[-1] - 1) / 255) + 1 # ceil(dim / 255) + elem_per_dim = np.prod(dimensions[:-1]) + + for index in range(num_param): + name_param = name if index == 0 else "%s%i" % (name, index + 1) + # Determine the byte array for the partial parameter + first_byte = elem_per_dim * 255 * index + last_byte = first_byte + elem_per_dim * 255 + bytes_param = bytes[first_byte:last_byte] + # Determine shape + dimensions_param = dimensions.copy() + dimensions_param[-1] = min(255, dimensions_param[-1] - index * 255) + self.group.add_param(name_param, + desc=desc, + bytes_per_element=bytes_per_element, + bytes=bytes_param, + dimensions=dimensions_param) + # # Add decorator functions (throws on overwrite) # @@ -1913,7 +1953,7 @@ def add_array(self, name, desc, data, dtype=None): ''' if not isinstance(data, np.ndarray): if dtype is not None: - raise ValueError('Must specify dtype when passning non-numpy array type.') + raise ValueError('Must specify dtype when passing non-numpy array type.') data = np.array(data, dtype=dtype) elif dtype is None: dtype = data.dtype @@ -1927,17 +1967,33 @@ def add_array(self, name, desc, data, dtype=None): def add_str(self, name, desc, data, *dimensions): ''' Add a string parameter. ''' + if len(dimensions) == 0: + if not is_iterable(data): + raise ValueError("Expected bytes or strings, was %s" % str(type(data))) + if isinstance(data, str): + # Single string entry + dimensions = (len(data), ) + else: + # List of string entries + label_str, label_max_size = Writer.pack_labels(data) + dimensions = (label_max_size, len(data)) + data = label_str + elif not isinstance(data, str): + raise ValueError("Expected input to be an encodable string matching the dimension input") + self.add_param(name, desc=desc, bytes_per_element=-1, bytes=data.encode('utf-8'), dimensions=list(dimensions)) - def add_empty_array(self, name, desc, bpe): + def add_empty_array(self, name, desc, bytes_per_element=0): ''' Add an empty parameter block. ''' - self.add_param(name, desc=desc, - bytes_per_element=bpe, dimensions=[0]) + self.add_param(name, + desc=desc, + bytes_per_element=bytes_per_element, + dimensions=[0]) # # Set decorator functions (overwrites) @@ -2064,6 +2120,7 @@ def from_reader(reader, conversion=None): is_header_only = conversion == 'copy_header' is_meta_copy = conversion == 'copy_metadata' is_meta_only = is_header_only or is_meta_copy + is_consume = conversion == 'convert' or conversion is None is_shallow_copy = conversion == 'shallow_copy' or is_header_only is_deep_copy = conversion == 'copy' or is_meta_copy @@ -2093,7 +2150,7 @@ def from_reader(reader, conversion=None): # Reformat header events writer._header.encode_events(writer._header.events) - # Transfer a minimal set parameters + # Transfer a minimal parameter set writer.set_start_frame(reader.first_frame) writer.set_point_labels(reader.point_labels) writer.set_analog_labels(reader.analog_labels) @@ -2224,6 +2281,29 @@ def add_frames(self, frames, index=None): @staticmethod def pack_labels(labels): + ''' Static method used to pack and pad the set of `labels` strings before + passing the output into a `c3d.group.Group.add_str`. + Parameters + ---------- + labels : iterable + List of strings to pack and pad into a single string suitable for encoding in a Parameter entry. + Example + ------- + >>> labels = ['RFT1', 'RFT2', 'RFT3', 'LFT1', 'LFT2', 'LFT3'] + >>> param_str, label_max_size = Writer.pack_labels(labels) + >>> writer.point_group.add_str('LABELS', + 'Point labels.', + label_str, + label_max_size, + len(labels)) + Returns + ------- + param_str : str + String containing `labels` packed into a single variable where + each string is padded to match the longest `labels` string. + label_max_size : int + Number of bytes associated with the longest `label` string, all strings are padded to this length. + ''' labels = np.ravel(labels) # Get longest label name label_max_size = 0 @@ -2234,14 +2314,20 @@ def pack_labels(labels): def set_point_labels(self, labels): ''' Set point data labels. ''' - label_str, label_max_size = Writer.pack_labels(labels) - self.point_group.add_str('LABELS', 'Point labels.', label_str, label_max_size, len(labels)) + grp = self.point_group + if labels is None: + grp.add_empty_array('LABELS', 'Point labels.') + else: + grp.add_str('LABELS', 'Point labels.', labels) def set_analog_labels(self, labels): ''' Set analog data labels. ''' - label_str, label_max_size = Writer.pack_labels(labels) - self.analog_group.add_str('LABELS', 'Analog labels.', label_str, label_max_size, len(labels)) + grp = self.analog_group + if labels is None: + grp.add_empty_array('LABELS', 'Analog labels.') + else: + grp.add_str('LABELS', 'Analog labels.', labels) def set_analog_general_scale(self, value): ''' Set ANALOG:GEN_SCALE factor (uniform analog scale factor). @@ -2389,7 +2475,7 @@ def write(self, handle): self.get('POINT:DATA_START').bytes = struct.pack(' 0: + for _ in range(frames): + writer.add_frames((np.random.randn(len(labels), 5), ())) + + writer.set_point_labels(labels) + writer.set_analog_labels(None) + return writer + +class GeneratedExamples(Base): + + def test_error_writing_no_frames(self): + """ Verify no frames generates a runtime error (illegal to write empty file). + """ + writer = c3d.Writer(point_rate=200) + #for _ in range(1): + # writer.add_frames((np.random.randn(24, 5), ())) + #writer.set_point_labels(['RFT1', 'RFT2', 'RFT3', 'RFT4', 'LFT1', 'LFT2', 'LFT3', 'LFT4', + # 'RSK1', 'RSK2', 'RSK3', 'RSK4', 'LSK1', 'LSK2', 'LSK3', 'LSK4', + # 'RTH1', 'RTH2', 'RTH3', 'RTH4', 'LTH1', 'LTH2', 'LTH3', 'LTH4' + # ]) + writer.set_point_labels(None) + writer.set_analog_labels(None) + + tmp_path = os.path.join(TEMP, 'no-frames.c3d') + + try: + with open(tmp_path, 'wb') as h: + writer.write(h) + assert False, "Expected RuntimeError writing empty file." + except RuntimeError as e: + pass # RuntimeError writing empty file + + def test_writing_single_frame(self): + """ Verify writing a file with a single frame. + """ + labels = ['RFT1', 'RFT2', 'RFT3', 'RFT4', 'LFT1', 'LFT2', 'LFT3', 'LFT4', + 'RSK1', 'RSK2', 'RSK3', 'RSK4', 'LSK1', 'LSK2', 'LSK3', 'LSK4', + 'RTH1', 'RTH2', 'RTH3', 'RTH4', 'LTH1', 'LTH2', 'LTH3', 'LTH4' + ] + writer = create_dummy_writer(labels) + + tmp_path = os.path.join(TEMP, 'single-frame.c3d') + with open(tmp_path, 'wb') as h: + writer.write(h) + + with open(tmp_path, 'rb') as handle: + B = c3d.Reader(handle) + + verify.equal_headers("test_writing_single_frame", writer, B, "Original", "WriteRead", True, True) + + for a, b in zip(labels, B.get('POINT.LABELS').string_array): + assert a == b, "Label missmatch" + + + def test_write_long_param(self): + writer = create_dummy_writer() + grp = writer.add_group(66, "UnittestGroup", "Generated for unittest purposes") + + num_param = 10 + data = [] + for index in range(255 * num_param): + value = "Str:" + format(index, '08d') + assert len(value) == 12, "Unittest is invalid, expected string with length 12" + data.append(value) + + grp.add_str("LongString", "String spanning %i parameters." % num_param, data) + + tmp_path = os.path.join(TEMP, 'long_parameter.c3d') + with open(tmp_path, 'wb') as h: + writer.write(h) + + with open(tmp_path, 'rb') as handle: + B = c3d.Reader(handle) + + verify.equal_headers("test_write_long_param", writer, B, "Original", "WriteRead", True, True) + + for index in range(0, num_param): + postfix = "" if index == 0 else str(index + 1) + param_name = "UnittestGroup.LongString" + postfix + agrp = writer.get(param_name) + bgrp = B.get(param_name) + assert np.array_equal(agrp.string_array, bgrp.string_array), "Expected string data to match" + + class Sample00(Base): ZIP = 'sample00.zip' zip_files = \ [ - ('Advanced Realtime Tracking GmbH', ['arthuman-sample.c3d', 'arthuman-sample-fingers.c3d']), - ('Codamotion', ['codamotion_gaitwands_19970212.c3d', 'codamotion_gaitwands_20150204.c3d']), - ('Cometa Systems', ['EMG Data Cometa.c3d']), - ('Innovative Sports Training', ['Gait with EMG.c3d', 'Static Pose.c3d']), - ('Motion Analysis Corporation', ['Sample_Jump2.c3d', 'Walk1.c3d']), - ('NexGen Ergonomics', ['test1.c3d']), + #('Advanced Realtime Tracking GmbH', ['arthuman-sample.c3d', 'arthuman-sample-fingers.c3d']), + #('Codamotion', ['codamotion_gaitwands_19970212.c3d', 'codamotion_gaitwands_20150204.c3d']), + #('Cometa Systems', ['EMG Data Cometa.c3d']), + #('Innovative Sports Training', ['Gait with EMG.c3d', 'Static Pose.c3d']), + #('Motion Analysis Corporation', ['Sample_Jump2.c3d', 'Walk1.c3d']), + #('NexGen Ergonomics', ['test1.c3d']), # Vicon files are weird, uses non-standard encodings. Walking01.c3d contain nan values. ('Vicon Motion Systems', ['pyCGM2 lower limb CGM24 Walking01.c3d', 'TableTennis.c3d']), ] - def test_read_write_examples(self): - ''' Compare write ouput to original read + def test_read_write_copy_examples(self): + ''' Compare data copied through the `Writer` class to data in the original file ''' print('----------------------------') @@ -62,6 +158,19 @@ def test_read_write_examples(self): print('... OK') print('DONE') + def test_read_write_shallow_copy_examples(self): + ''' Compare shallow copied data written by the `Writer` class to data in the original file + ''' + + print('----------------------------') + print('Shallow-copy') + print('----------------------------') + folder, files = self.zip_files[-1] + print('{} | Validating...'.format(folder)) + for file in files: + verify_read_write(self.ZIP, '{}/{}'.format(folder, file), cpy_mode='shallow_copy') + print('... OK') + print('DONE') class Sample01(Base): From 5fe8d95fb8aa8fff147d9e40afaacd9eb6a24cf1 Mon Sep 17 00:00:00 2001 From: MattiasF Date: Wed, 28 Dec 2022 12:52:13 +0100 Subject: [PATCH 02/13] Write only analog --- c3d/c3d.py | 36 +++++++++++---------- test/test_software_examples_write_read.py | 39 ++++++++++++++++++----- 2 files changed, 50 insertions(+), 25 deletions(-) diff --git a/c3d/c3d.py b/c3d/c3d.py index 90a19f7..c639f16 100644 --- a/c3d/c3d.py +++ b/c3d/c3d.py @@ -1162,19 +1162,19 @@ def group_listed(self): def _check_metadata(self): ''' Ensure that the metadata in our file is self-consistent. ''' assert self._header.point_count == self.point_used, ( - 'inconsistent point count! {} header != {} POINT:USED'.format( + 'Inconsistent point count, {} header != {} POINT:USED'.format( self._header.point_count, self.point_used, )) assert self._header.scale_factor == self.point_scale, ( - 'inconsistent scale factor! {} header != {} POINT:SCALE'.format( + 'Inconsistent scale factor, {} header != {} POINT:SCALE'.format( self._header.scale_factor, self.point_scale, )) assert self._header.frame_rate == self.point_rate, ( - 'inconsistent frame rate! {} header != {} POINT:RATE'.format( + 'Inconsistent frame rate, {} header != {} POINT:RATE'.format( self._header.frame_rate, self.point_rate, )) @@ -1184,7 +1184,7 @@ def _check_metadata(self): else: ratio = 0 assert self._header.analog_per_frame == ratio, ( - 'inconsistent analog rate! {} header != {} analog-fps / {} point-fps'.format( + 'Inconsistent analog rate, {} header != {} analog-fps / {} point-fps'.format( self._header.analog_per_frame, self.analog_rate, self.point_rate, @@ -1192,7 +1192,7 @@ def _check_metadata(self): count = self.analog_used * self._header.analog_per_frame assert self._header.analog_count == count, ( - 'inconsistent analog count! {} header != {} analog used * {} per-frame'.format( + 'Inconsistent analog count, {} header != {} analog used * {} per-frame'.format( self._header.analog_count, self.analog_used, self._header.analog_per_frame, @@ -1201,7 +1201,7 @@ def _check_metadata(self): try: start = self.get_uint16('POINT:DATA_START') if self._header.data_block != start: - warnings.warn('inconsistent data block! {} header != {} POINT:DATA_START'.format( + warnings.warn('Inconsistent data block, {} header != {} POINT:DATA_START'.format( self._header.data_block, start)) except AttributeError: warnings.warn('''no pointer available in POINT:DATA_START indicating the start of the data block, using @@ -2360,11 +2360,11 @@ def set_analog_offsets(self, values): ''' if is_iterable(values): data = np.array([v for v in values], dtype=np.int16) - self.analog_group.set_array('OFFSET', 'Analog channel offsets', data) elif values is None: - self.analog_group.set_empty_array('OFFSET', 'Analog channel offsets', 2) + data = np.zeros((self.analog_used, ), dtype=np.int16) else: raise ValueError('Expected iterable containing analog data offsets.') + self.analog_group.set_array('OFFSET', 'Analog channel offsets', data) def set_start_frame(self, frame=1): ''' Set the 'TRIAL:ACTUAL_START_FIELD' parameter and header.first_frame entry. @@ -2434,6 +2434,7 @@ def write(self, handle): # POINT group group = self.point_group + self._header.point_count = np.uint16(ppf) group.set('USED', 'Number of point samples', 2, '= UINT16_MAX: @@ -2454,6 +2455,7 @@ def write(self, handle): # ANALOG group group = self.analog_group + self._header.analog_count = np.uint16(np.prod(np.shape(analog))) group.set('USED', 'Analog channel count', 2, '= 0.0 - raw[~valid, 3] = -1 - raw[valid, :3] = points[valid, :3] / point_scale - raw[valid, 3] = np.bitwise_or(np.rint(points[valid, 3] / scale_mag).astype(np.uint8), - (points[valid, 4].astype(np.uint16) << 8), - dtype=np.uint16) + if self.point_used: + # If not empty + valid = points[:, 3] >= 0.0 + raw[~valid, 3] = -1 + raw[valid, :3] = points[valid, :3] / point_scale + raw[valid, 3] = np.bitwise_or(np.rint(points[valid, 3] / scale_mag).astype(np.uint8), + (points[valid, 4].astype(np.uint16) << 8), + dtype=np.uint16) # Transform analog data analog = analog * analog_scales_inv + analog_offsets @@ -2553,4 +2554,5 @@ def _write_frames(self, handle): analog = analog.astype(point_dtype) handle.write(raw.tobytes()) handle.write(analog.tobytes()) + self._pad_block(handle) diff --git a/test/test_software_examples_write_read.py b/test/test_software_examples_write_read.py index fd062ee..2de958e 100644 --- a/test/test_software_examples_write_read.py +++ b/test/test_software_examples_write_read.py @@ -59,12 +59,6 @@ def test_error_writing_no_frames(self): """ Verify no frames generates a runtime error (illegal to write empty file). """ writer = c3d.Writer(point_rate=200) - #for _ in range(1): - # writer.add_frames((np.random.randn(24, 5), ())) - #writer.set_point_labels(['RFT1', 'RFT2', 'RFT3', 'RFT4', 'LFT1', 'LFT2', 'LFT3', 'LFT4', - # 'RSK1', 'RSK2', 'RSK3', 'RSK4', 'LSK1', 'LSK2', 'LSK3', 'LSK4', - # 'RTH1', 'RTH2', 'RTH3', 'RTH4', 'LTH1', 'LTH2', 'LTH3', 'LTH4' - # ]) writer.set_point_labels(None) writer.set_analog_labels(None) @@ -77,7 +71,7 @@ def test_error_writing_no_frames(self): except RuntimeError as e: pass # RuntimeError writing empty file - def test_writing_single_frame(self): + def test_writing_single_point_frame(self): """ Verify writing a file with a single frame. """ labels = ['RFT1', 'RFT2', 'RFT3', 'RFT4', 'LFT1', 'LFT2', 'LFT3', 'LFT4', @@ -93,10 +87,39 @@ def test_writing_single_frame(self): with open(tmp_path, 'rb') as handle: B = c3d.Reader(handle) - verify.equal_headers("test_writing_single_frame", writer, B, "Original", "WriteRead", True, True) + verify.equal_headers("test_writing_single_point_frame", writer, B, "Original", "WriteRead", True, True) for a, b in zip(labels, B.get('POINT.LABELS').string_array): assert a == b, "Label missmatch" + + def test_writing_analog_frames(self): + """ Verify writing a file with a single frame. + """ + labels = ['RFT1', 'RFT2', 'RFT3', 'RFT4', 'LFT1', 'LFT2', 'LFT3', 'LFT4', + 'RSK1', 'RSK2', 'RSK3', 'RSK4', 'LSK1', 'LSK2', 'LSK3', 'LSK4', + 'RTH1', 'RTH2', 'RTH3', 'RTH4', 'LTH1', 'LTH2', 'LTH3', 'LTH4' + ] + writer = create_dummy_writer(labels) + writer = c3d.Writer(point_rate=12, analog_rate=36) + + frames = 1 + for _ in range(frames): + writer.add_frames(((), np.random.randn(len(labels), writer.analog_per_frame),)) + + writer.set_point_labels(None) + writer.set_analog_labels(labels) + + tmp_path = os.path.join(TEMP, 'single-frame.c3d') + with open(tmp_path, 'wb') as h: + writer.write(h) + + with open(tmp_path, 'rb') as handle: + B = c3d.Reader(handle) + + verify.equal_headers("test_writing_single_point_frame", writer, B, "Original", "WriteRead", True, True) + + for a, b in zip(labels, B.get('ANALOG.LABELS').string_array): + assert a == b, "Label missmatch" def test_write_long_param(self): From 0385568b295659d7f3967358f060d095595f402d Mon Sep 17 00:00:00 2001 From: MattiasF Date: Wed, 28 Dec 2022 13:09:40 +0100 Subject: [PATCH 03/13] Write only analog --- c3d/c3d.py | 16 +++++++++++++++- test/test_software_examples_write_read.py | 17 +++++++++++------ 2 files changed, 26 insertions(+), 7 deletions(-) diff --git a/c3d/c3d.py b/c3d/c3d.py index c639f16..84ae009 100644 --- a/c3d/c3d.py +++ b/c3d/c3d.py @@ -1433,7 +1433,10 @@ def point_used(self): @property def analog_used(self): - ''' Number of analog measurements, or channels, for each analog data sample. + ''' Number of analog measured variables, or channels, within a analog data sample/frame. + + Does not account for the number of samples for each channel per point frame, + see 'analog_per_frame' to find the total number of samples recorded per frame. ''' try: return self.get_uint16('ANALOG:USED') @@ -2273,6 +2276,17 @@ def add_frames(self, frames, index=None): raise ValueError( 'Expected frame input to be sequence of point and analog pairs on form (-1, 2). ' + 'Input was of shape {}.'.format(str(sh))) + + point_shape = np.shape(frames[0][0]) + analog_shape = np.shape(frames[0][1]) + + if len(analog_shape) != 2 or analog_shape[0] != self.analog_used or analog_shape[1] != self.analog_per_frame: + raise ValueError("Expected analog frame to be a 2D array on form (analog_used, analog_per_frame), " + "was on form %s" % str(analog_shape)) + + if len(point_shape) != 2 or point_shape[0] != self.point_used or point_shape[1] != 5: + raise ValueError("Expected point frame to be a 2D array on form (analog_used, 5), " + "was on form %s" % str(point_shape)) if index is not None: self._frames[index:index] = frames diff --git a/test/test_software_examples_write_read.py b/test/test_software_examples_write_read.py index 2de958e..81c558e 100644 --- a/test/test_software_examples_write_read.py +++ b/test/test_software_examples_write_read.py @@ -80,7 +80,7 @@ def test_writing_single_point_frame(self): ] writer = create_dummy_writer(labels) - tmp_path = os.path.join(TEMP, 'single-frame.c3d') + tmp_path = os.path.join(TEMP, 'single-point-frame.c3d') with open(tmp_path, 'wb') as h: writer.write(h) @@ -99,17 +99,14 @@ def test_writing_analog_frames(self): 'RSK1', 'RSK2', 'RSK3', 'RSK4', 'LSK1', 'LSK2', 'LSK3', 'LSK4', 'RTH1', 'RTH2', 'RTH3', 'RTH4', 'LTH1', 'LTH2', 'LTH3', 'LTH4' ] - writer = create_dummy_writer(labels) writer = c3d.Writer(point_rate=12, analog_rate=36) - frames = 1 - for _ in range(frames): - writer.add_frames(((), np.random.randn(len(labels), writer.analog_per_frame),)) + writer.add_frames(((), np.random.randn(len(labels), writer.analog_per_frame),)) writer.set_point_labels(None) writer.set_analog_labels(labels) - tmp_path = os.path.join(TEMP, 'single-frame.c3d') + tmp_path = os.path.join(TEMP, 'single-analog-frame.c3d') with open(tmp_path, 'wb') as h: writer.write(h) @@ -121,6 +118,14 @@ def test_writing_analog_frames(self): for a, b in zip(labels, B.get('ANALOG.LABELS').string_array): assert a == b, "Label missmatch" + def test_write_invalid_analog_frame_count(self): + """ Verify writing a file with a single frame. + """ + writer = c3d.Writer(point_rate=12, analog_rate=36) + + with self.assertRaises(ValueError): + writer.add_frames(((), np.random.randn(14, writer.analog_per_frame - 1),)) + def test_write_long_param(self): writer = create_dummy_writer() From 84575163604c674e47b61e4e6a7b3ace7ea59027 Mon Sep 17 00:00:00 2001 From: MattiasF Date: Wed, 28 Dec 2022 14:09:50 +0100 Subject: [PATCH 04/13] Added header_only and convert tests --- c3d/c3d.py | 47 +++++++++---- test/test_software_examples_write_read.py | 81 ++++++++++++++++++++--- 2 files changed, 104 insertions(+), 24 deletions(-) diff --git a/c3d/c3d.py b/c3d/c3d.py index 84ae009..65b53ba 100644 --- a/c3d/c3d.py +++ b/c3d/c3d.py @@ -492,7 +492,7 @@ def encode_events(self, events): event_disp_flags[:write_count] = 1 # Update event headers in self - self.long_event_labels = 0x3039 # Magic number + self.long_event_labels = True # Bool used instead of magic number 0x3039 self.event_count = write_count # Update event block self.event_timings = event_timings[:write_count] @@ -2140,10 +2140,7 @@ def from_reader(reader, conversion=None): if is_consume: writer._header = reader._header - reader._header = None writer._groups = reader._groups - reader._groups = None - del reader elif is_deep_copy: writer._header = copy.deepcopy(reader._header) writer._groups = copy.deepcopy(reader._groups) @@ -2167,6 +2164,9 @@ def from_reader(reader, conversion=None): # Copy frames for (i, point, analog) in reader.read_frames(copy=True, camera_sum=False): writer.add_frames((point, analog)) + if is_consume: + del reader + return writer @property @@ -2277,16 +2277,35 @@ def add_frames(self, frames, index=None): 'Expected frame input to be sequence of point and analog pairs on form (-1, 2). ' + 'Input was of shape {}.'.format(str(sh))) - point_shape = np.shape(frames[0][0]) - analog_shape = np.shape(frames[0][1]) - - if len(analog_shape) != 2 or analog_shape[0] != self.analog_used or analog_shape[1] != self.analog_per_frame: - raise ValueError("Expected analog frame to be a 2D array on form (analog_used, analog_per_frame), " - "was on form %s" % str(analog_shape)) - - if len(point_shape) != 2 or point_shape[0] != self.point_used or point_shape[1] != 5: - raise ValueError("Expected point frame to be a 2D array on form (analog_used, 5), " - "was on form %s" % str(point_shape)) + analog_frame = frames[0][1] + point_frame = frames[0][0] + analog_shape = np.shape(analog_frame) + point_shape = np.shape(point_frame) + + # Verify frame rate matches for analog + if len(analog_frame) and analog_shape[1] != self.analog_per_frame: + raise ValueError("Expected analog frame to be a 2D array with the second " + "dimension matching the analog_frame_rate / point_frame_rate = " + "%i, was %i" % (self.analog_per_frame, analog_shape[1])) + + # Verify frames added matches existing + if len(self._frames): + if len(analog_frame): + # If there are a analog frame included, verify shape + analog_shape = np.shape(analog_frame) + if len(analog_shape) != 2 or analog_shape[0] != self.analog_used: + raise ValueError("Expected analog frame to be a 2D array on form (analog_used, analog_per_frame), " + "was on form %s" % str(analog_shape)) + + if len(point_frame): + # If there are a point frame included, verify shape + if len(point_shape) != 2 or point_shape[0] != self.point_used or point_shape[1] != 5: + raise ValueError("Expected point frame to be a 2D array on form " + "(%i, 5), was on form %s" % (self.point_used, str(point_shape))) + else: + # Define the count in the header + self._header.point_count = point_shape[0] + self._header.analog_count = analog_shape[0] if index is not None: self._frames[index:index] = frames diff --git a/test/test_software_examples_write_read.py b/test/test_software_examples_write_read.py index 81c558e..fd93eba 100644 --- a/test/test_software_examples_write_read.py +++ b/test/test_software_examples_write_read.py @@ -13,6 +13,7 @@ def verify_read_write(zip, file_path, proc_type='INTEL', real=True, cpy_mode='copy'): ''' Compare read write ouput to original read file. ''' + assert cpy_mode != 'copy_header', "Copy mode not supported for verification" A = c3d.Reader(Zipload._get(zip, file_path)) if proc_type != 'INTEL': @@ -31,8 +32,12 @@ def verify_read_write(zip, file_path, proc_type='INTEL', real=True, cpy_mode='co with open(tmp_path, 'rb') as handle: B = c3d.Reader(handle) - verify.equal_headers(test_id, A, B, aname, bname, real, real) - verify.data_is_equal(A, B, aname, bname) + if cpy_mode == 'convert': + # Compare writer instead + verify.equal_headers(test_id, writer, B, aname, bname, real, real) + else: + verify.equal_headers(test_id, A, B, aname, bname, real, real) + verify.data_is_equal(A, B, aname, bname) def create_dummy_writer(labels=None, frames=1): @@ -101,7 +106,8 @@ def test_writing_analog_frames(self): ] writer = c3d.Writer(point_rate=12, analog_rate=36) - writer.add_frames(((), np.random.randn(len(labels), writer.analog_per_frame),)) + for i in range(5): + writer.add_frames(((), np.random.randn(len(labels), writer.analog_per_frame),)) writer.set_point_labels(None) writer.set_analog_labels(labels) @@ -119,13 +125,38 @@ def test_writing_analog_frames(self): assert a == b, "Label missmatch" def test_write_invalid_analog_frame_count(self): - """ Verify writing a file with a single frame. + """ Verify error is thrown when passing a single analog frame with invalid frame rate. """ writer = c3d.Writer(point_rate=12, analog_rate=36) with self.assertRaises(ValueError): writer.add_frames(((), np.random.randn(14, writer.analog_per_frame - 1),)) + def test_write_mismatching_analog_frame_count(self): + """ Verify error is thrown when passing multiple analog frames with mismatching shapes. + """ + writer = c3d.Writer(point_rate=12, analog_rate=36) + + writer.add_frames(((), np.random.randn(14, writer.analog_per_frame),)) + with self.assertRaises(ValueError): + writer.add_frames(((), np.random.randn(13, writer.analog_per_frame),)) + with self.assertRaises(ValueError): + writer.add_frames(((), np.random.randn(13, writer.analog_per_frame - 1),)) + + def test_write_mismatching_frames(self): + """ Verify error is thrown when passing multiple frames with mismatching shapes. + """ + writer = c3d.Writer(point_rate=12, analog_rate=36) + + writer.add_frames((np.random.randn(6, 5), np.random.randn(14, writer.analog_per_frame),)) + with self.assertRaises(ValueError): + writer.add_frames((np.random.randn(222, 5), np.random.randn(14, writer.analog_per_frame),)) + with self.assertRaises(ValueError): + writer.add_frames((np.random.randn(5, 3), np.random.randn(14, writer.analog_per_frame),)) + with self.assertRaises(ValueError): + writer.add_frames((np.random.randn(6, 5), np.random.randn(13, writer.analog_per_frame),)) + with self.assertRaises(ValueError): + writer.add_frames((np.random.randn(6, 5), np.random.randn(13, writer.analog_per_frame - 1),)) def test_write_long_param(self): writer = create_dummy_writer() @@ -162,12 +193,12 @@ class Sample00(Base): ZIP = 'sample00.zip' zip_files = \ [ - #('Advanced Realtime Tracking GmbH', ['arthuman-sample.c3d', 'arthuman-sample-fingers.c3d']), - #('Codamotion', ['codamotion_gaitwands_19970212.c3d', 'codamotion_gaitwands_20150204.c3d']), - #('Cometa Systems', ['EMG Data Cometa.c3d']), - #('Innovative Sports Training', ['Gait with EMG.c3d', 'Static Pose.c3d']), - #('Motion Analysis Corporation', ['Sample_Jump2.c3d', 'Walk1.c3d']), - #('NexGen Ergonomics', ['test1.c3d']), + ('Advanced Realtime Tracking GmbH', ['arthuman-sample.c3d', 'arthuman-sample-fingers.c3d']), + ('Codamotion', ['codamotion_gaitwands_19970212.c3d', 'codamotion_gaitwands_20150204.c3d']), + ('Cometa Systems', ['EMG Data Cometa.c3d']), + ('Innovative Sports Training', ['Gait with EMG.c3d', 'Static Pose.c3d']), + ('Motion Analysis Corporation', ['Sample_Jump2.c3d', 'Walk1.c3d']), + ('NexGen Ergonomics', ['test1.c3d']), # Vicon files are weird, uses non-standard encodings. Walking01.c3d contain nan values. ('Vicon Motion Systems', ['pyCGM2 lower limb CGM24 Walking01.c3d', 'TableTennis.c3d']), ] @@ -199,6 +230,36 @@ def test_read_write_shallow_copy_examples(self): verify_read_write(self.ZIP, '{}/{}'.format(folder, file), cpy_mode='shallow_copy') print('... OK') print('DONE') + + def test_read_write_convert_examples(self): + ''' Compare data written by a 'convert' `Reader` to data in the original file + ''' + + print('----------------------------') + print('Convert') + print('----------------------------') + folder, files = self.zip_files[-1] + print('{} | Validating...'.format(folder)) + for file in files: + verify_read_write(self.ZIP, '{}/{}'.format(folder, file), cpy_mode='convert') + print('... OK') + print('DONE') + + def test_read_write_header_examples(self): + ''' Compare data written by a 'copy_header' only `Writer` to data in the original file + ''' + + print('----------------------------') + print('copy_header') + print('----------------------------') + folder, files = self.zip_files[-1] + print('{} | Validating...'.format(folder)) + for file in files: + A = c3d.Reader(Zipload._get(self.ZIP, '{}/{}'.format(folder, file))) + writer = A.to_writer('copy_header') + verify.equal_headers('test_read_write_header_examples', A, writer, 'Original', 'Writer Copy', True, True) + print('... OK') + print('DONE') class Sample01(Base): From 373d952804d06dba7f7a89308cf61d558ee15a4c Mon Sep 17 00:00:00 2001 From: MattiasF Date: Wed, 28 Dec 2022 14:28:51 +0100 Subject: [PATCH 05/13] Bool for enabling testing all zip examples, support passing empty list to set_labels, long_event_flag check correction, comments --- c3d/c3d.py | 18 ++++--- test/test_software_examples_write_read.py | 63 ++++++++++++++++++----- test/verify.py | 4 +- 3 files changed, 63 insertions(+), 22 deletions(-) diff --git a/c3d/c3d.py b/c3d/c3d.py index 65b53ba..9eb9740 100644 --- a/c3d/c3d.py +++ b/c3d/c3d.py @@ -451,7 +451,7 @@ def events(self): ''' return zip(self.event_timings[self.event_disp_flags], self.event_labels[self.event_disp_flags]) - def encode_events(self, events): + def encode_events(self, events, long_event_labels=True): ''' Encode event data in the event block. Parameters @@ -1500,11 +1500,17 @@ def analog_resolution(self): @property def point_labels(self): - return self.get('POINT:LABELS').string_array + grp = self.get('POINT:LABELS') + if grp is None: + return None + return grp.string_array @property def analog_labels(self): - return self.get('ANALOG:LABELS').string_array + grp = self.get('ANALOG:LABELS') + if grp is None: + return None + return grp.string_array @property def frame_count(self): @@ -2102,8 +2108,8 @@ def from_reader(reader, conversion=None): 'copy' - Reader objects will be deep copied. 'copy_metadata' - Similar to 'copy' but only copies metadata and not point and analog frame data. - 'copy_shallow' - Similar to 'copy' but group parameters are - not copied. + 'copy_shallow' - Similar to 'copy' but group parameters are not copied. + Usefull for stripping away parameter meta data. 'copy_header' - Similar to 'copy_shallow' but only the header is copied (frame data is not copied). @@ -2340,7 +2346,7 @@ def pack_labels(labels): labels = np.ravel(labels) # Get longest label name label_max_size = 0 - label_max_size = max(label_max_size, np.max([len(label) for label in labels])) + label_max_size = max(label_max_size, np.max([0] + [len(label) for label in labels])) label_str = ''.join(label.ljust(label_max_size) for label in labels) return label_str, label_max_size diff --git a/test/test_software_examples_write_read.py b/test/test_software_examples_write_read.py index fd93eba..ce08256 100644 --- a/test/test_software_examples_write_read.py +++ b/test/test_software_examples_write_read.py @@ -9,6 +9,9 @@ from test.base import Base from test.zipload import Zipload, TEMP +# Run all examples on all copy tests +MINIMAL_TEST = True + def verify_read_write(zip, file_path, proc_type='INTEL', real=True, cpy_mode='copy'): ''' Compare read write ouput to original read file. @@ -224,10 +227,14 @@ def test_read_write_shallow_copy_examples(self): print('----------------------------') print('Shallow-copy') print('----------------------------') - folder, files = self.zip_files[-1] - print('{} | Validating...'.format(folder)) - for file in files: - verify_read_write(self.ZIP, '{}/{}'.format(folder, file), cpy_mode='shallow_copy') + if MINIMAL_TEST: + zip_files = [self.zip_files[-1]] + else: + zip_files = self.zip_files + for folder, files in zip_files: + print('{} | Validating...'.format(folder)) + for file in files: + verify_read_write(self.ZIP, '{}/{}'.format(folder, file), cpy_mode='shallow_copy') print('... OK') print('DONE') @@ -238,10 +245,14 @@ def test_read_write_convert_examples(self): print('----------------------------') print('Convert') print('----------------------------') - folder, files = self.zip_files[-1] - print('{} | Validating...'.format(folder)) - for file in files: - verify_read_write(self.ZIP, '{}/{}'.format(folder, file), cpy_mode='convert') + if MINIMAL_TEST: + zip_files = [self.zip_files[-1]] + else: + zip_files = self.zip_files + for folder, files in zip_files: + print('{} | Validating...'.format(folder)) + for file in files: + verify_read_write(self.ZIP, '{}/{}'.format(folder, file), cpy_mode='convert') print('... OK') print('DONE') @@ -252,12 +263,36 @@ def test_read_write_header_examples(self): print('----------------------------') print('copy_header') print('----------------------------') - folder, files = self.zip_files[-1] - print('{} | Validating...'.format(folder)) - for file in files: - A = c3d.Reader(Zipload._get(self.ZIP, '{}/{}'.format(folder, file))) - writer = A.to_writer('copy_header') - verify.equal_headers('test_read_write_header_examples', A, writer, 'Original', 'Writer Copy', True, True) + if MINIMAL_TEST: + zip_files = [self.zip_files[-1]] + else: + zip_files = self.zip_files + for folder, files in zip_files: + print('{} | Validating...'.format(folder)) + for file in files: + A = c3d.Reader(Zipload._get(self.ZIP, '{}/{}'.format(folder, file))) + writer = A.to_writer('copy_header') + verify.equal_headers('test_read_write_header_examples', A, writer, 'Original', 'Writer Copy', True, True) + print('... OK') + print('DONE') + + def test_read_write_copy_metadata_examples(self): + ''' Compare data written by a 'copy_metadata' only `Writer` to data in the original file + ''' + + print('----------------------------') + print('copy_metadata') + print('----------------------------') + if MINIMAL_TEST: + zip_files = [self.zip_files[-1]] + else: + zip_files = self.zip_files + for folder, files in zip_files: + print('{} | Validating...'.format(folder)) + for file in files: + A = c3d.Reader(Zipload._get(self.ZIP, '{}/{}'.format(folder, file))) + writer = A.to_writer('copy_metadata') + verify.equal_headers('test_read_write_copy_metadata_examples', A, writer, 'Original', 'Writer Copy', True, True) print('... OK') print('DONE') diff --git a/test/verify.py b/test/verify.py index 17e6d05..dfe63e4 100644 --- a/test/verify.py +++ b/test/verify.py @@ -213,8 +213,8 @@ def equal_headers(test_label, areader, breader, alabel, blabel, areal, breal): assert aheader.max_gap == bheader.max_gap, \ '{}, max_gap: {} {}, {} {}'.format( test_label, alabel, aheader.max_gap, blabel, bheader.max_gap) - assert aheader.long_event_labels == bheader.long_event_labels, \ - '{}, long_event_labels: {} {}, {} {}'.format( + assert isinstance(aheader.long_event_labels, bool) and isinstance(bheader.long_event_labels, bool), \ + '{}, expected long_event_labels to be boolean: {} {}, {} {}'.format( test_label, alabel, aheader.long_event_labels, blabel, bheader.long_event_labels) event_mismatch = max(0, bheader.event_count - aheader.event_count) From a3bf40efabab7fa6efdc19d89a434c2395a1800b Mon Sep 17 00:00:00 2001 From: MattiasF Date: Wed, 28 Dec 2022 16:02:06 +0100 Subject: [PATCH 06/13] Check invalid frame input --- c3d/c3d.py | 46 +++++++++++++++++------ test/test_software_examples_write_read.py | 30 +++++++++++++++ 2 files changed, 64 insertions(+), 12 deletions(-) diff --git a/c3d/c3d.py b/c3d/c3d.py index 9eb9740..af0bbe9 100644 --- a/c3d/c3d.py +++ b/c3d/c3d.py @@ -451,7 +451,7 @@ def events(self): ''' return zip(self.event_timings[self.event_disp_flags], self.event_labels[self.event_disp_flags]) - def encode_events(self, events, long_event_labels=True): + def encode_events(self, events): ''' Encode event data in the event block. Parameters @@ -2266,27 +2266,49 @@ def add_frames(self, frames, index=None): Parameters ---------- - frames : Single or sequence of (point, analog) pairs + frames : Single or sequence of (point, analog) pairs. A sequence or frame of frame data to add to the writer. index : int or None Insert the frame or sequence at the index (the first sequence frame will be inserted at give index). Note that the index should be relative to 0 rather then the frame number provided by read_frames()! ''' - sh = np.array(frames, dtype=object).shape - # Single frame - if len(sh) != 2: + def get_depth(data): + """ Find the number of nested arrays. + """ + if is_iterable(data): + try: + return get_depth(next(iter(data))) + 1 + except StopIteration as e: + return 1 # Empty iterable + return 0 + + if len(frames) > 1: + depth = max(get_depth(frames[0]), get_depth(frames[1])) + 1 + else: + try: + depth = max(get_depth(frames[0][0]), get_depth(frames[0][1])) + 2 + except IndexError as e: + raise ValueError( + 'Expected frame input to be sequence of point and analog pairs on form (-1, 2). ' + 'Input was of shape {}.'.format(str(np.array(frames, dtype=object).shape))) + + if depth == 3: + # Single frame frames = [frames] - sh = np.shape(frames) - # Sequence of invalid shape - if sh[1] != 2: + elif depth < 3 or depth > 4: raise ValueError( - 'Expected frame input to be sequence of point and analog pairs on form (-1, 2). ' + - 'Input was of shape {}.'.format(str(sh))) + 'Expected frame input to be sequence of point and analog data frame pairs on form (-1, 2). ' + 'Input was of shape {}.'.format(str(np.array(frames, dtype=object).shape))) + + # Ensure data frames are numpy arrays + frames = [ + (np.array(point), np.array(analog), ) for point, analog in frames + ] - analog_frame = frames[0][1] point_frame = frames[0][0] - analog_shape = np.shape(analog_frame) + analog_frame = frames[0][1] point_shape = np.shape(point_frame) + analog_shape = np.shape(analog_frame) # Verify frame rate matches for analog if len(analog_frame) and analog_shape[1] != self.analog_per_frame: diff --git a/test/test_software_examples_write_read.py b/test/test_software_examples_write_read.py index ce08256..fdfd663 100644 --- a/test/test_software_examples_write_read.py +++ b/test/test_software_examples_write_read.py @@ -79,6 +79,36 @@ def test_error_writing_no_frames(self): except RuntimeError as e: pass # RuntimeError writing empty file + def test_error_adding_invalid_frames(self): + """ Verify no frames generates a runtime error (illegal to write empty file). + """ + writer = c3d.Writer(point_rate=200) + writer.set_point_labels(None) + writer.set_analog_labels(None) + + with self.assertRaises(ValueError): + writer.add_frames(((), ()),) + + with self.assertRaises(ValueError): + # Invalid, to few dims + writer.add_frames(np.random.randn(3, 5)) + + with self.assertRaises(ValueError): + # Invalid, expect first dim to contain 2 elements + writer.add_frames(np.random.randn(3, 13, 5)) + + with self.assertRaises(ValueError): + # Mismatch due to invalid second dim + writer.add_frames(np.random.randn(4, 3, 27, 5)) + + with self.assertRaises(ValueError): + # Raise due to analog rate mismatch (invalid 4th dim) + writer.add_frames(np.random.randn(4, 2, 17, 5)) + + with self.assertRaises(ValueError): + # Raise due to extra dimensions + writer.add_frames(np.random.randn(5, 4, 2, 7, 5)) + def test_writing_single_point_frame(self): """ Verify writing a file with a single frame. """ From 03f82733745ea3894b0a2a47e7efb6c6f105df9f Mon Sep 17 00:00:00 2001 From: MattiasF Date: Wed, 28 Dec 2022 16:29:55 +0100 Subject: [PATCH 07/13] Corrected last np.array warning message, added get_depth to avoid numpy object array conversion and added tests for odd shaped inputs --- c3d/c3d.py | 17 +++++------ test/test_software_examples_write_read.py | 37 +++++++++++++++++++++-- test/verify.py | 2 +- 3 files changed, 44 insertions(+), 12 deletions(-) diff --git a/c3d/c3d.py b/c3d/c3d.py index af0bbe9..88c9f40 100644 --- a/c3d/c3d.py +++ b/c3d/c3d.py @@ -2282,15 +2282,14 @@ def get_depth(data): return 1 # Empty iterable return 0 - if len(frames) > 1: - depth = max(get_depth(frames[0]), get_depth(frames[1])) + 1 - else: - try: - depth = max(get_depth(frames[0][0]), get_depth(frames[0][1])) + 2 - except IndexError as e: - raise ValueError( - 'Expected frame input to be sequence of point and analog pairs on form (-1, 2). ' - 'Input was of shape {}.'.format(str(np.array(frames, dtype=object).shape))) + # Attempt find for multi frame arguments + try: + depth = max(get_depth(frames[0][0]), get_depth(frames[0][1])) + 2 + except IndexError as e: + depth = 0 + if len(frames) == 2: + # Attempt find for single frame argument + depth = max(get_depth(frames[0]), get_depth(frames[1])) + 1 if depth == 3: # Single frame diff --git a/test/test_software_examples_write_read.py b/test/test_software_examples_write_read.py index fdfd663..40c08d5 100644 --- a/test/test_software_examples_write_read.py +++ b/test/test_software_examples_write_read.py @@ -53,9 +53,14 @@ def create_dummy_writer(labels=None, frames=1): 'RTH1', 'RTH2', 'RTH3', 'RTH4', 'LTH1', 'LTH2', 'LTH3', 'LTH4' ] - if frames > 0: + if frames == 1: for _ in range(frames): writer.add_frames((np.random.randn(len(labels), 5), ())) + elif frames > 1: + new_frames = [] + for __ in range(5): + new_frames.append((np.random.randn(len(labels), 5), ())) + writer.add_frames(new_frames) writer.set_point_labels(labels) writer.set_analog_labels(None) @@ -129,6 +134,19 @@ def test_writing_single_point_frame(self): for a, b in zip(labels, B.get('POINT.LABELS').string_array): assert a == b, "Label missmatch" + + def test_writing_multiple_point_frame(self): + """ Verify writing a file with a single frame. + """ + writer = create_dummy_writer(frames=10) + + tmp_path = os.path.join(TEMP, 'single-point-frame.c3d') + with open(tmp_path, 'wb') as h: + writer.write(h) + + with open(tmp_path, 'rb') as handle: + B = c3d.Reader(handle) + verify.equal_headers("test_writing_multiple_point_frame", writer, B, "Original", "WriteRead", True, True) def test_writing_analog_frames(self): """ Verify writing a file with a single frame. @@ -139,9 +157,21 @@ def test_writing_analog_frames(self): ] writer = c3d.Writer(point_rate=12, analog_rate=36) - for i in range(5): + # Single frame input + for __ in range(5): writer.add_frames(((), np.random.randn(len(labels), writer.analog_per_frame),)) + # Twin frame input + new_frames = [((), np.random.randn(len(labels), writer.analog_per_frame),), + ((), np.random.randn(len(labels), writer.analog_per_frame),)] + writer.add_frames(new_frames) + + # Multi frame input + new_frames = [] + for __ in range(5): + new_frames.append(((), np.random.randn(len(labels), writer.analog_per_frame),)) + writer.add_frames(new_frames) + writer.set_point_labels(None) writer.set_analog_labels(labels) @@ -154,6 +184,9 @@ def test_writing_analog_frames(self): verify.equal_headers("test_writing_single_point_frame", writer, B, "Original", "WriteRead", True, True) + assert B.analog_sample_count == 12 * writer.analog_per_frame, "Expected {} samples was {}".format( + B.analog_sample_count, 12 * writer.analog_per_frame) + for a, b in zip(labels, B.get('ANALOG.LABELS').string_array): assert a == b, "Label missmatch" diff --git a/test/verify.py b/test/verify.py index dfe63e4..c543cad 100644 --- a/test/verify.py +++ b/test/verify.py @@ -130,7 +130,7 @@ def check_zipfile(file_path): print('----------------------------') print(type(self)) print('----------------------------') - if len(np.shape(self.zip_files)) == 1: + if len(np.array(self.zip_files, dtype=object).shape) == 1: for file in self.zip_files: check_zipfile(file) else: From aa0eb1816fce5fc61fb8cb2f419759e2e9d7c38e Mon Sep 17 00:00:00 2001 From: MattiasF Date: Wed, 28 Dec 2022 16:32:56 +0100 Subject: [PATCH 08/13] Verify point frame count --- test/test_software_examples_write_read.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/test/test_software_examples_write_read.py b/test/test_software_examples_write_read.py index 40c08d5..2728deb 100644 --- a/test/test_software_examples_write_read.py +++ b/test/test_software_examples_write_read.py @@ -58,7 +58,7 @@ def create_dummy_writer(labels=None, frames=1): writer.add_frames((np.random.randn(len(labels), 5), ())) elif frames > 1: new_frames = [] - for __ in range(5): + for __ in range(frames): new_frames.append((np.random.randn(len(labels), 5), ())) writer.add_frames(new_frames) @@ -138,7 +138,8 @@ def test_writing_single_point_frame(self): def test_writing_multiple_point_frame(self): """ Verify writing a file with a single frame. """ - writer = create_dummy_writer(frames=10) + num_frames = 11 + writer = create_dummy_writer(frames=num_frames) tmp_path = os.path.join(TEMP, 'single-point-frame.c3d') with open(tmp_path, 'wb') as h: @@ -146,6 +147,7 @@ def test_writing_multiple_point_frame(self): with open(tmp_path, 'rb') as handle: B = c3d.Reader(handle) + assert B.frame_count == num_frames, "Expected {} point frames was {}".format(num_frames, B.frame_count) verify.equal_headers("test_writing_multiple_point_frame", writer, B, "Original", "WriteRead", True, True) def test_writing_analog_frames(self): From a46d31f58bddc675551905324dd3ea789f75cf14 Mon Sep 17 00:00:00 2001 From: MattiasF Date: Wed, 28 Dec 2022 17:32:06 +0100 Subject: [PATCH 09/13] Write multiple parameter blocks --- c3d/c3d.py | 26 +++++++++---- test/test_software_examples_write_read.py | 47 +++++++++++++++++++++-- 2 files changed, 61 insertions(+), 12 deletions(-) diff --git a/c3d/c3d.py b/c3d/c3d.py index 88c9f40..91d1ba9 100644 --- a/c3d/c3d.py +++ b/c3d/c3d.py @@ -985,7 +985,9 @@ def add_param(self, name, **kwargs): name = name.upper() if name in self._params: raise KeyError('Parameter already exists with key {}'.format(name)) - self._params[name] = Param(name, self._dtypes, **kwargs) + param = Param(name, self._dtypes, **kwargs) + self._params[name] = param + return param def remove_param(self, name): '''Remove the specified parameter. @@ -1679,6 +1681,10 @@ def seek_param_section_header(): # read the remaining bytes in the parameter section. bytes = self._handle.read(endbyte - self._handle.tell()) else: + if offset_to_next - 2 < -1: + raise ValueError( + "Corrupt file with invalid offset written to file. Attempted to read parameter " + "with name {} and {} bytes".format(name, offset_to_next - 2)) bytes = self._handle.read(offset_to_next - 2) buf = io.BytesIO(bytes) @@ -1927,13 +1933,13 @@ def add_param(self, name, desc='', bytes_per_element=1, bytes=b'', dimensions=No last_byte = first_byte + elem_per_dim * 255 bytes_param = bytes[first_byte:last_byte] # Determine shape - dimensions_param = dimensions.copy() + dimensions_param = list(dimensions) # Create a list copy (assignable) dimensions_param[-1] = min(255, dimensions_param[-1] - index * 255) - self.group.add_param(name_param, - desc=desc, - bytes_per_element=bytes_per_element, - bytes=bytes_param, - dimensions=dimensions_param) + param = self.group.add_param(name_param, + desc=desc, + bytes_per_element=bytes_per_element, + bytes=bytes_param, + dimensions=dimensions_param) # # Add decorator functions (throws on overwrite) @@ -1970,7 +1976,7 @@ def add_array(self, name, desc, data, dtype=None): self.add_param(name, desc=desc, bytes_per_element=dtype.itemsize, - bytes=data, + bytes=data.flatten(), dimensions=data.shape) def add_str(self, name, desc, data, *dimensions): @@ -2562,6 +2568,10 @@ def _write_metadata(self, handle): assert handle.tell() == 512 # Groups + num_param_blocks = self.parameter_blocks() + if num_param_blocks > 255: + raise RuntimeError("To much data stored in parameter blocks. Maximum number of blocks is 255, " + "current file contains {} blocks".format(num_param_blocks)) handle.write(struct.pack( 'BBBB', 0, 0, self.parameter_blocks(), PROCESSOR_INTEL)) for group_id, group in self.group_listed(): diff --git a/test/test_software_examples_write_read.py b/test/test_software_examples_write_read.py index 2728deb..0f25396 100644 --- a/test/test_software_examples_write_read.py +++ b/test/test_software_examples_write_read.py @@ -226,7 +226,7 @@ def test_write_mismatching_frames(self): with self.assertRaises(ValueError): writer.add_frames((np.random.randn(6, 5), np.random.randn(13, writer.analog_per_frame - 1),)) - def test_write_long_param(self): + def test_write_long_str_param(self): writer = create_dummy_writer() grp = writer.add_group(66, "UnittestGroup", "Generated for unittest purposes") @@ -239,22 +239,61 @@ def test_write_long_param(self): grp.add_str("LongString", "String spanning %i parameters." % num_param, data) - tmp_path = os.path.join(TEMP, 'long_parameter.c3d') + tmp_path = os.path.join(TEMP, 'long_str_parameter.c3d') with open(tmp_path, 'wb') as h: writer.write(h) with open(tmp_path, 'rb') as handle: B = c3d.Reader(handle) - verify.equal_headers("test_write_long_param", writer, B, "Original", "WriteRead", True, True) + verify.equal_headers("test_write_long_str_param", writer, B, "Original", "WriteRead", True, True) + str_index = 0 for index in range(0, num_param): postfix = "" if index == 0 else str(index + 1) param_name = "UnittestGroup.LongString" + postfix agrp = writer.get(param_name) bgrp = B.get(param_name) - assert np.array_equal(agrp.string_array, bgrp.string_array), "Expected string data to match" + # Verify string parameter matches initial input and between read/write + read_data = bgrp.string_array + assert np.array_equal(agrp.string_array, read_data), "Expected string data to match" + assert np.array_equal(data[str_index:str_index + len(read_data)], read_data), "Expected string data to match" + str_index += len(read_data) + + def test_write_long_float_param(self): + writer = create_dummy_writer() + grp = writer.add_group(66, "UnittestGroup", "Generated for unittest purposes") + + num_param = 10 + + data = np.random.randn(2, 2, 255 * num_param).astype(np.float32) + grp.add_array("LongFltArray", "Float array spanning {} parameters.".format(num_param), data) + # Fortran to C order for comparison + data = data.reshape(data.shape[::-1]) + + tmp_path = os.path.join(TEMP, 'long_flt_parameter.c3d') + with open(tmp_path, 'wb') as h: + writer.write(h) + + with open(tmp_path, 'rb') as handle: + B = c3d.Reader(handle) + + verify.equal_headers("test_write_long_float_param", writer, B, "Original", "WriteRead", True, True) + + prm_index = 0 + for index in range(0, num_param): + postfix = "" if index == 0 else str(index + 1) + param_name = "UnittestGroup.LongFltArray" + postfix + agrp = writer.get(param_name) + bgrp = B.get(param_name) + + # Verify string parameter matches initial input and between read/write + read_data = bgrp.float32_array + input_data = data[prm_index:prm_index + len(read_data)] + assert np.array_equal(agrp.float32_array, read_data), "Expected array to match between Reader and Writer" + assert np.array_equal(input_data, read_data), "Expected array data to match the input data" + prm_index += len(read_data) class Sample00(Base): From 9f8d7079e765fc1d31c87df25bd5bd4c79264c2c Mon Sep 17 00:00:00 2001 From: MattiasF Date: Wed, 28 Dec 2022 18:01:48 +0100 Subject: [PATCH 10/13] Changed % -> .format --- c3d/c3d.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/c3d/c3d.py b/c3d/c3d.py index 91d1ba9..e88dd1f 100644 --- a/c3d/c3d.py +++ b/c3d/c3d.py @@ -1927,7 +1927,7 @@ def add_param(self, name, desc='', bytes_per_element=1, bytes=b'', dimensions=No elem_per_dim = np.prod(dimensions[:-1]) for index in range(num_param): - name_param = name if index == 0 else "%s%i" % (name, index + 1) + name_param = name if index == 0 else "{}{}".format(name, index + 1) # Determine the byte array for the partial parameter first_byte = elem_per_dim * 255 * index last_byte = first_byte + elem_per_dim * 255 @@ -1984,7 +1984,7 @@ def add_str(self, name, desc, data, *dimensions): ''' if len(dimensions) == 0: if not is_iterable(data): - raise ValueError("Expected bytes or strings, was %s" % str(type(data))) + raise ValueError("Expected bytes or strings, was {}".format(str(type(data)))) if isinstance(data, str): # Single string entry dimensions = (len(data), ) @@ -2318,8 +2318,8 @@ def get_depth(data): # Verify frame rate matches for analog if len(analog_frame) and analog_shape[1] != self.analog_per_frame: raise ValueError("Expected analog frame to be a 2D array with the second " - "dimension matching the analog_frame_rate / point_frame_rate = " - "%i, was %i" % (self.analog_per_frame, analog_shape[1])) + "dimension matching the analog_frame_rate / point_frame_rate = " + + "{}, was {}".format(self.analog_per_frame, analog_shape[1])) # Verify frames added matches existing if len(self._frames): @@ -2328,13 +2328,13 @@ def get_depth(data): analog_shape = np.shape(analog_frame) if len(analog_shape) != 2 or analog_shape[0] != self.analog_used: raise ValueError("Expected analog frame to be a 2D array on form (analog_used, analog_per_frame), " - "was on form %s" % str(analog_shape)) + "was on form {}".format(str(analog_shape))) if len(point_frame): # If there are a point frame included, verify shape if len(point_shape) != 2 or point_shape[0] != self.point_used or point_shape[1] != 5: - raise ValueError("Expected point frame to be a 2D array on form " - "(%i, 5), was on form %s" % (self.point_used, str(point_shape))) + raise ValueError("Expected point frame to be a 2D array on form " + + "({}, 5), was on form {}".format(self.point_used, str(point_shape))) else: # Define the count in the header self._header.point_count = point_shape[0] From f2aba6c92eac84ec8731282a52badd4cade4c0fe Mon Sep 17 00:00:00 2001 From: MattiasF Date: Thu, 29 Dec 2022 22:55:08 +0100 Subject: [PATCH 11/13] bytes => bytes_data, add_param properly take only bytes input, cleanup --- c3d/c3d.py | 63 ++++++++++++----------- test/test_software_examples_write_read.py | 1 + 2 files changed, 33 insertions(+), 31 deletions(-) diff --git a/c3d/c3d.py b/c3d/c3d.py index e88dd1f..16b8233 100644 --- a/c3d/c3d.py +++ b/c3d/c3d.py @@ -525,7 +525,7 @@ class Param(object): column-major order. For arrays of strings, the dimensions here will be the number of columns (length of each string) followed by the number of rows (number of strings). - bytes : str + bytes_data : str Raw data for this parameter. handle : File handle positioned at the first byte of a .c3d parameter description. @@ -537,7 +537,7 @@ def __init__(self, desc='', bytes_per_element=1, dimensions=None, - bytes=b'', + bytes_data=b'', handle=None): '''Set up a new parameter, only the name is required.''' self.name = name @@ -545,9 +545,11 @@ def __init__(self, self.desc = desc self.bytes_per_element = bytes_per_element self.dimensions = dimensions or [] - self.bytes = bytes + self.bytes_data = bytes_data if handle: self.read(handle) + elif not isinstance(bytes_data, bytes): + raise ValueError("Expected `bytes_data` input to be a bytes object.") def __repr__(self): return ''.format(self.desc) @@ -555,11 +557,7 @@ def __repr__(self): @property def num_elements(self): '''Return the number of elements in this parameter's array value.''' - e = 1 - for d in self.dimensions: - e *= d - assert e == np.prod(self.dimensions), f"{self.dimensions}, {e}, {np.prod(self.dimensions)}" - return e + return int(np.prod(self.dimensions)) @property def total_bytes(self): @@ -596,8 +594,8 @@ def write(self, group_id, handle): handle.write(struct.pack('b', self.bytes_per_element)) handle.write(struct.pack('B', len(self.dimensions))) handle.write(struct.pack('B' * len(self.dimensions), *self.dimensions)) - if self.bytes is not None and len(self.bytes) > 0: - handle.write(self.bytes) + if self.bytes_data is not None and len(self.bytes_data) > 0: + handle.write(self.bytes_data) desc = self.desc.encode('utf-8') handle.write(struct.pack('B', len(desc))) handle.write(desc) @@ -612,22 +610,22 @@ def read(self, handle): dims, = struct.unpack('B', handle.read(1)) self.dimensions = [struct.unpack('B', handle.read(1))[ 0] for _ in range(dims)] - self.bytes = b'' + self.bytes_data = b'' if self.total_bytes: - self.bytes = handle.read(self.total_bytes) + self.bytes_data = handle.read(self.total_bytes) desc_size, = struct.unpack('B', handle.read(1)) self.desc = desc_size and self._dtypes.decode_string(handle.read(desc_size)) or '' def _as(self, dtype): '''Unpack the raw bytes of this param as a single value of the given data type.''' - return np.frombuffer(self.bytes, count=1, dtype=dtype)[0] + return np.frombuffer(self.bytes_data, count=1, dtype=dtype)[0] def _as_array(self, dtype, copy=True): '''Unpack the raw bytes of this param as an array of the given data type.''' assert self.dimensions, \ '{}: cannot get value as {} array!'.format(self.name, dtype) - buffer_view = np.frombuffer(self.bytes, dtype=dtype) + buffer_view = np.frombuffer(self.bytes_data, dtype=dtype) # Reverse shape as the shape is defined in fortran format buffer_view = buffer_view.reshape(self.dimensions[::-1]) if copy: @@ -711,12 +709,12 @@ def float_value(self): @property def bytes_value(self): '''Get the param as a raw byte string.''' - return self.bytes + return self.bytes_data @property def string_value(self): '''Get the param as a unicode string.''' - return self._dtypes.decode_string(self.bytes) + return self._dtypes.decode_string(self.bytes_data) @property def int8_array(self): @@ -766,7 +764,7 @@ def float32_array(self): # _as_array but for DEC assert self.dimensions, \ '{}: cannot get value as {} array!'.format(self.name, self._dtypes.float32) - return DEC_to_IEEE_BYTES(self.bytes).reshape(self.dimensions[::-1]) # Reverse fortran format + return DEC_to_IEEE_BYTES(self.bytes_data).reshape(self.dimensions[::-1]) # Reverse fortran format else: # is_ieee or is_mips return self._as_array(self._dtypes.float32) @@ -832,7 +830,7 @@ def bytes_array(self): if len(self.dimensions) == 0: return np.array([]) elif len(self.dimensions) == 1: - return np.array(self.bytes) + return np.array(self.bytes_data) else: # Convert Fortran shape (data in memory is identical, shape is transposed) word_len = self.dimensions[0] @@ -843,7 +841,7 @@ def bytes_array(self): for i in np.ndindex(*dims): # Calculate byte offset as sum of each array index times the byte step of each dimension. off = np.sum(np.multiply(i, byte_steps)) - byte_arr[i] = self.bytes[off:off+word_len] + byte_arr[i] = self.bytes_data[off:off+word_len] return byte_arr @property @@ -1909,16 +1907,18 @@ def __contains__(self, key): def group(self) -> Group: return self._decoratee - def add_param(self, name, desc='', bytes_per_element=1, bytes=b'', dimensions=None): + def add_param(self, name, desc='', bytes_per_element=1, bytes_data=b'', dimensions=None): """ Decorate the raw Group.add_param() function to split the inputs if the leading dimension is > 255. """ + if not isinstance(bytes_data, bytes): + raise ValueError("Expected `bytes_data` to be a bytes object was of type {}".format(type(bytes_data))) # Dimension must fit in a 8 bit unsigned int if len(dimensions) == 0 or dimensions[-1] < 255: # Forward self.group.add_param(name, desc=desc, bytes_per_element=bytes_per_element, - bytes=bytes, + bytes_data=bytes_data, dimensions=dimensions) return @@ -1929,16 +1929,17 @@ def add_param(self, name, desc='', bytes_per_element=1, bytes=b'', dimensions=No for index in range(num_param): name_param = name if index == 0 else "{}{}".format(name, index + 1) # Determine the byte array for the partial parameter - first_byte = elem_per_dim * 255 * index - last_byte = first_byte + elem_per_dim * 255 - bytes_param = bytes[first_byte:last_byte] + abpe = abs(bytes_per_element) # Take absolute for strings (bpe == -1) + first_byte = elem_per_dim * 255 * index * abpe + last_byte = first_byte + elem_per_dim * 255 * abpe + bytes_param = bytes_data[first_byte:last_byte] # Determine shape dimensions_param = list(dimensions) # Create a list copy (assignable) dimensions_param[-1] = min(255, dimensions_param[-1] - index * 255) param = self.group.add_param(name_param, desc=desc, bytes_per_element=bytes_per_element, - bytes=bytes_param, + bytes_data=bytes_param, dimensions=dimensions_param) # @@ -1955,7 +1956,7 @@ def add(self, name, desc, bpe, format, data, *dimensions): self.add_param(name, desc=desc, bytes_per_element=bpe, - bytes=data, + bytes_data=data, dimensions=list(dimensions)) def add_array(self, name, desc, data, dtype=None): @@ -1976,7 +1977,7 @@ def add_array(self, name, desc, data, dtype=None): self.add_param(name, desc=desc, bytes_per_element=dtype.itemsize, - bytes=data.flatten(), + bytes_data=data.flatten().tobytes(), dimensions=data.shape) def add_str(self, name, desc, data, *dimensions): @@ -1999,7 +2000,7 @@ def add_str(self, name, desc, data, *dimensions): self.add_param(name, desc=desc, bytes_per_element=-1, - bytes=data.encode('utf-8'), + bytes_data=data.encode('utf-8'), dimensions=list(dimensions)) def add_empty_array(self, name, desc, bytes_per_element=0): @@ -2014,7 +2015,7 @@ def add_empty_array(self, name, desc, bytes_per_element=0): # Set decorator functions (overwrites) # def set(self, name, *args, **kwargs): - ''' Add or overwrite a parameter with 'bytes' package formated in accordance with 'format'. + ''' Add or overwrite a parameter with 'bytes_data' package formated in accordance with 'format'. ''' try: self.remove_param(name) @@ -2540,7 +2541,7 @@ def write(self, handle): # sync parameter information to header. blocks = self.parameter_blocks() - self.get('POINT:DATA_START').bytes = struct.pack(' Date: Thu, 29 Dec 2022 23:04:10 +0100 Subject: [PATCH 12/13] Corrected bytes=>bytes_data in tests --- test/test_c3d.py | 2 +- test/test_parameter_accessors.py | 2 +- test/test_parameter_bytes_conversion.py | 48 ++++++++++++------------- 3 files changed, 26 insertions(+), 26 deletions(-) diff --git a/test/test_c3d.py b/test/test_c3d.py index 1ae41e3..7b4b252 100644 --- a/test/test_c3d.py +++ b/test/test_c3d.py @@ -45,7 +45,7 @@ def test_paramsb(self): for p in g.param_values(): if len(p.dimensions) == 0: val = None - width = len(p.bytes) + width = len(p.bytes_data) if width == 2: val = p.int16_value elif width == 4: diff --git a/test/test_parameter_accessors.py b/test/test_parameter_accessors.py index 5238b83..111b32d 100644 --- a/test/test_parameter_accessors.py +++ b/test/test_parameter_accessors.py @@ -59,7 +59,7 @@ def verify_add_parameter(self, N): for i in range(1, N): test_name = 'TEST_ADD_PARAM_%i' % i arr = self.rnd.uniform(*self.flt_range, size=self.shape).astype(np.float32) - self.group.add_param(test_name, bytes_per_element=4, dimensions=arr.shape, bytes=arr.T.tobytes()) + self.group.add_param(test_name, bytes_per_element=4, dimensions=arr.shape, bytes_data=arr.T.tobytes()) assert self.group.get(test_name) is not None, 'Added group does not exist.' self.assert_group_items() diff --git a/test/test_parameter_bytes_conversion.py b/test/test_parameter_bytes_conversion.py index 14d02c4..0b09a93 100644 --- a/test/test_parameter_bytes_conversion.py +++ b/test/test_parameter_bytes_conversion.py @@ -54,7 +54,7 @@ def test_a_param_float32(self): for i in range(ParameterValueTest.TEST_ITERATIONS): value = np.float32(self.rnd.uniform(*ParameterValueTest.RANGE_32_BIT)) bytes = struct.pack(' 5) - P = c3d.Param('STRING_TEST', self.dtypes, bytes_per_element=-1, dimensions=shape, bytes=arr.T.tobytes()) + P = c3d.Param('STRING_TEST', self.dtypes, bytes_per_element=-1, dimensions=shape, bytes_data=arr.T.tobytes()) arr_out = P.string_array assert arr.T.shape == arr_out.shape, "Mismatch in 'string_array' converted shape. Was %s, expected %s" %\ @@ -323,7 +323,7 @@ def test_k_parse_random_string_array(self): # 4 dims for wlen in range(10): arr, shape = genRndByteArr(wlen, [7, 5, 3], wlen > 5) - P = c3d.Param('STRING_TEST', self.dtypes, bytes_per_element=-1, dimensions=shape, bytes=arr.T.tobytes()) + P = c3d.Param('STRING_TEST', self.dtypes, bytes_per_element=-1, dimensions=shape, bytes_data=arr.T.tobytes()) arr_out = P.string_array assert arr.T.shape == arr_out.shape, "Mismatch in 'string_array' converted shape. Was %s, expected %s" %\ @@ -335,7 +335,7 @@ def test_k_parse_random_string_array(self): # 5 dims for wlen in range(10): arr, shape = genRndByteArr(wlen, [7, 6, 5, 3], wlen > 5) - P = c3d.Param('STRING_TEST', self.dtypes, bytes_per_element=-1, dimensions=shape, bytes=arr.T.tobytes()) + P = c3d.Param('STRING_TEST', self.dtypes, bytes_per_element=-1, dimensions=shape, bytes_data=arr.T.tobytes()) arr_out = P.string_array assert arr.T.shape == arr_out.shape, "Mismatch in 'string_array' converted shape. Was %s, expected %s" %\ From 029b5664a769cddd2e8ddff78609ea69d37c8be5 Mon Sep 17 00:00:00 2001 From: MattiasF Date: Fri, 6 Jan 2023 15:07:17 +0100 Subject: [PATCH 13/13] Moved get_depth() and related tests to separate branch --- c3d/c3d.py | 67 +++-------------------- test/test_software_examples_write_read.py | 34 ------------ 2 files changed, 8 insertions(+), 93 deletions(-) diff --git a/c3d/c3d.py b/c3d/c3d.py index 16b8233..ea80051 100644 --- a/c3d/c3d.py +++ b/c3d/c3d.py @@ -2279,67 +2279,16 @@ def add_frames(self, frames, index=None): Insert the frame or sequence at the index (the first sequence frame will be inserted at give index). Note that the index should be relative to 0 rather then the frame number provided by read_frames()! ''' - def get_depth(data): - """ Find the number of nested arrays. - """ - if is_iterable(data): - try: - return get_depth(next(iter(data))) + 1 - except StopIteration as e: - return 1 # Empty iterable - return 0 - - # Attempt find for multi frame arguments - try: - depth = max(get_depth(frames[0][0]), get_depth(frames[0][1])) + 2 - except IndexError as e: - depth = 0 - if len(frames) == 2: - # Attempt find for single frame argument - depth = max(get_depth(frames[0]), get_depth(frames[1])) + 1 - - if depth == 3: - # Single frame + sh = np.array(frames, dtype=object).shape + # Single frame + if len(sh) != 2: frames = [frames] - elif depth < 3 or depth > 4: + sh = np.shape(frames) + # Sequence of invalid shape + if sh[1] != 2: raise ValueError( - 'Expected frame input to be sequence of point and analog data frame pairs on form (-1, 2). ' - 'Input was of shape {}.'.format(str(np.array(frames, dtype=object).shape))) - - # Ensure data frames are numpy arrays - frames = [ - (np.array(point), np.array(analog), ) for point, analog in frames - ] - - point_frame = frames[0][0] - analog_frame = frames[0][1] - point_shape = np.shape(point_frame) - analog_shape = np.shape(analog_frame) - - # Verify frame rate matches for analog - if len(analog_frame) and analog_shape[1] != self.analog_per_frame: - raise ValueError("Expected analog frame to be a 2D array with the second " - "dimension matching the analog_frame_rate / point_frame_rate = " + - "{}, was {}".format(self.analog_per_frame, analog_shape[1])) - - # Verify frames added matches existing - if len(self._frames): - if len(analog_frame): - # If there are a analog frame included, verify shape - analog_shape = np.shape(analog_frame) - if len(analog_shape) != 2 or analog_shape[0] != self.analog_used: - raise ValueError("Expected analog frame to be a 2D array on form (analog_used, analog_per_frame), " - "was on form {}".format(str(analog_shape))) - - if len(point_frame): - # If there are a point frame included, verify shape - if len(point_shape) != 2 or point_shape[0] != self.point_used or point_shape[1] != 5: - raise ValueError("Expected point frame to be a 2D array on form " + - "({}, 5), was on form {}".format(self.point_used, str(point_shape))) - else: - # Define the count in the header - self._header.point_count = point_shape[0] - self._header.analog_count = analog_shape[0] + 'Expected frame input to be sequence of point and analog pairs on form (-1, 2). ' + + 'Input was of shape {}.'.format(str(sh))) if index is not None: self._frames[index:index] = frames diff --git a/test/test_software_examples_write_read.py b/test/test_software_examples_write_read.py index 929f439..f457b5f 100644 --- a/test/test_software_examples_write_read.py +++ b/test/test_software_examples_write_read.py @@ -191,40 +191,6 @@ def test_writing_analog_frames(self): for a, b in zip(labels, B.get('ANALOG.LABELS').string_array): assert a == b, "Label missmatch" - - def test_write_invalid_analog_frame_count(self): - """ Verify error is thrown when passing a single analog frame with invalid frame rate. - """ - writer = c3d.Writer(point_rate=12, analog_rate=36) - - with self.assertRaises(ValueError): - writer.add_frames(((), np.random.randn(14, writer.analog_per_frame - 1),)) - - def test_write_mismatching_analog_frame_count(self): - """ Verify error is thrown when passing multiple analog frames with mismatching shapes. - """ - writer = c3d.Writer(point_rate=12, analog_rate=36) - - writer.add_frames(((), np.random.randn(14, writer.analog_per_frame),)) - with self.assertRaises(ValueError): - writer.add_frames(((), np.random.randn(13, writer.analog_per_frame),)) - with self.assertRaises(ValueError): - writer.add_frames(((), np.random.randn(13, writer.analog_per_frame - 1),)) - - def test_write_mismatching_frames(self): - """ Verify error is thrown when passing multiple frames with mismatching shapes. - """ - writer = c3d.Writer(point_rate=12, analog_rate=36) - - writer.add_frames((np.random.randn(6, 5), np.random.randn(14, writer.analog_per_frame),)) - with self.assertRaises(ValueError): - writer.add_frames((np.random.randn(222, 5), np.random.randn(14, writer.analog_per_frame),)) - with self.assertRaises(ValueError): - writer.add_frames((np.random.randn(5, 3), np.random.randn(14, writer.analog_per_frame),)) - with self.assertRaises(ValueError): - writer.add_frames((np.random.randn(6, 5), np.random.randn(13, writer.analog_per_frame),)) - with self.assertRaises(ValueError): - writer.add_frames((np.random.randn(6, 5), np.random.randn(13, writer.analog_per_frame - 1),)) def test_write_long_str_param(self): writer = create_dummy_writer()