120 changes: 86 additions & 34 deletions neo/rawio/blackrockrawio.py
@@ -323,13 +323,25 @@ def _parse_header(self):
self._nsx_basic_header = {}
self._nsx_ext_header = {}
self._nsx_data_header = {}
self._nsx_sampling_frequency = {}

# Read headers
for nsx_nb in self._avail_nsx:
spec_version = self._nsx_spec[nsx_nb] = self._extract_nsx_file_spec(nsx_nb)
# read nsx headers
nsx_header_reader = self._nsx_header_reader[spec_version]
self._nsx_basic_header[nsx_nb], self._nsx_ext_header[nsx_nb] = nsx_header_reader(nsx_nb)

@h-mayorquin (Contributor Author), Sep 8, 2025:

This is to better document how the sampling frequency of each stream is calculated according to the spec.

# The Blackrock spec defines period as the number of 1/30_000-second intervals between data points
# E.g. it is 1 for 30_000 Hz, 3 for 10_000 Hz, etc.
nsx_period = self._nsx_basic_header[nsx_nb]["period"]
Comment on lines +336 to +337
Contributor:

Is this 30_000 true for all versions? I vaguely remember that different versions had different resolutions but maybe I'm wrong?

And it seems like this was hardcoded the same way below before.

Contributor Author:

Yes, that's how the spec defines it, and the spec's revision history does not mention any change to that value.

Contributor:

Sounds good to me. I just couldn't remember.

Contributor:

Can confirm, the nominal base rate has always been 30 kHz, and this is the rate used by sample groups 5 and 6. Other sample groups are just sub-sampling this base rate (hopefully with anti-aliasing built in).
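
A minimal sketch (toy period values, not read from a file) of the period-to-rate relationship this thread confirms: the base clock is 30 kHz, and period counts base-clock ticks between samples.

    # Toy period values; in the reader these come from the NSx basic header.
    for period in (1, 3, 30):
        sampling_rate = 30_000.0 / period
        print(f"period={period} -> {sampling_rate} Hz")  # 30000.0, 10000.0, 1000.0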

sampling_rate = 30_000.0 / nsx_period
self._nsx_sampling_frequency[nsx_nb] = float(sampling_rate)


# Parse data packets
for nsx_nb in self._avail_nsx:

# The only way to know if it is the Precision Time Protocol variant of file spec 3.0
# is to check for nanosecond timestamp resolution.
is_ptp_variant = (
@@ -386,7 +398,8 @@ def _parse_header(self):
self._match_nsx_and_nev_segment_ids(nsx_nb)

self.nsx_datas = {}
self.sig_sampling_rates = {}
# Keep the public attribute for backward compatibility, but use the private one internally; we may deprecate this at some point
self.sig_sampling_rates = {nsx_number: self._nsx_sampling_frequency[nsx_number] for nsx_number in self.nsx_to_load}
if len(self.nsx_to_load) > 0:
for nsx_nb in self.nsx_to_load:
basic_header = self._nsx_basic_header[nsx_nb]
@@ -403,8 +416,7 @@ def _parse_header(self):
_data_reader_fun = self._nsx_data_reader[spec_version]
self.nsx_datas[nsx_nb] = _data_reader_fun(nsx_nb)

sr = float(self.main_sampling_rate / basic_header["period"])
self.sig_sampling_rates[nsx_nb] = sr
sr = self._nsx_sampling_frequency[nsx_nb]

if spec_version in ["2.2", "2.3", "3.0"]:
ext_header = self._nsx_ext_header[nsx_nb]
@@ -473,7 +485,7 @@ def _parse_header(self):
length = self.nsx_datas[nsx_nb][data_bl].shape[0]
if self._nsx_data_header[nsx_nb] is None:
t_start = 0.0
t_stop = max(t_stop, length / self.sig_sampling_rates[nsx_nb])
t_stop = max(t_stop, length / self._nsx_sampling_frequency[nsx_nb])
else:
timestamps = self._nsx_data_header[nsx_nb][data_bl]["timestamp"]
if hasattr(timestamps, "size") and timestamps.size == length:
@@ -482,7 +494,7 @@ def _parse_header(self):
t_stop = max(t_stop, timestamps[-1] / ts_res + sec_per_samp)
else:
t_start = timestamps / ts_res
t_stop = max(t_stop, t_start + length / self.sig_sampling_rates[nsx_nb])
t_stop = max(t_stop, t_start + length / self._nsx_sampling_frequency[nsx_nb])
Contributor:

Can someone explain why the length would lead to a value larger than t_stop itself? Maybe I need to read the whole reader to understand this, but if there is a two-second explanation, that would be great!

Contributor Author:

I think this is overly defensive programming to ensure that t_stop is at least 0; check the whole block:

                t_stop = 0.0
                for nsx_nb in self.nsx_to_load:
                    spec = self._nsx_spec[nsx_nb]
                    if "timestamp_resolution" in self._nsx_basic_header[nsx_nb].dtype.names:
                        ts_res = self._nsx_basic_header[nsx_nb]["timestamp_resolution"]
                    elif spec == "2.1":
                        ts_res = self._nsx_params[spec](nsx_nb)["timestamp_resolution"]
                    else:
                        ts_res = 30_000
                    period = self._nsx_basic_header[nsx_nb]["period"]
                    sec_per_samp = period / 30_000  # Maybe 30_000 should be ['sample_resolution']
                    length = self.nsx_datas[nsx_nb][data_bl].shape[0]
                    if self._nsx_data_header[nsx_nb] is None:
                        t_start = 0.0
                        t_stop = max(t_stop, length / self._nsx_sampling_frequency[nsx_nb])
                    else:
                        timestamps = self._nsx_data_header[nsx_nb][data_bl]["timestamp"]
                        if hasattr(timestamps, "size") and timestamps.size == length:
                            # FileSpec 3.0 with PTP -- use the per-sample timestamps
                            t_start = timestamps[0] / ts_res
                            t_stop = max(t_stop, timestamps[-1] / ts_res + sec_per_samp)
                        else:
                            t_start = timestamps / ts_res
                            t_stop = max(t_stop, t_start + length / self._nsx_sampling_frequency[nsx_nb])

Writing 0 directly would have been more readable than defining the variable, though.

Contributor:

Yeah, I agree, because we define t_stop in all branches, right? So we don't need to guard with an initial t_stop. That was definitely my confusion. We could always update that later; it's not crucial.

Contributor Author:

Let's do it now; we both already read and understood this code, and we think it could be improved.

Contributor Author:

Forget about it; the loop above is actually aligning the t_stops across the nsx segments.

So, yes, let's improve this later.
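
For reference, a minimal sketch (toy stream lengths and rates, not taken from the reader) of why the loop accumulates t_stop with max(): streams sampled at different rates can end at slightly different times, and the segment must span the latest one.

    # Hypothetical per-stream sample counts and sampling rates.
    lengths = {"ns2": 999, "ns6": 30_000}
    rates = {"ns2": 1_000.0, "ns6": 30_000.0}

    t_stop = 0.0
    for nsx in lengths:
        t_stop = max(t_stop, lengths[nsx] / rates[nsx])
    print(t_stop)  # 1.0 s: ns6 ends last, so the segment spans it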

self._sigs_t_starts[nsx_nb].append(t_start)

if self._avail_files["nev"]:
@@ -1041,43 +1053,83 @@ def _read_nsx_dataheader_spec_v30_ptp(
filesize = self._get_file_size(filename)

data_header = {}
index = 0

if offset is None:
# This is read as a uint32 numpy scalar from the header, so we convert it to a Python int
offset = int(self._nsx_basic_header[nsx_nb]["bytes_in_headers"])
header_size = int(self._nsx_basic_header[nsx_nb]["bytes_in_headers"])
else:
header_size = offset

ptp_dt = [
("reserved", "uint8"),
("timestamps", "uint64"),
("num_data_points", "uint32"),
("samples", "int16", self._nsx_basic_header[nsx_nb]["channel_count"]),
("samples", "int16", (self._nsx_basic_header[nsx_nb]["channel_count"],)),
Contributor Author:

This removes a warning from numpy; raw numbers as shapes are deprecated and will be removed:

FutureWarning: Passing (type, 1) or '1type' as a synonym of type is deprecated; in a future version of numpy, it will be understood as (type, (1,)) / '(1,)type'.
    npackets = int((filesize - offset) / np.dtype(ptp_dt).itemsize)

Contributor:

And based on our testing, this change is also backward compatible. Or should we test it over a greater range of numpy versions than our current skinny IO tests cover?
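
As a minimal sketch of the dtype change in question (hypothetical channel count): with a single channel, the bare integer form was historically read as a plain scalar and triggers the FutureWarning, while the tuple form explicitly requests a length-1 sub-array.

    import numpy as np

    # Hypothetical single-channel case, the one that triggers the warning;
    # the tuple form is the unambiguous, forward-compatible spelling.
    channel_count = 1
    dt = np.dtype([("samples", "int16", (channel_count,))])
    print(dt["samples"].shape)  # (1,): an explicit sub-array, not a bare scalar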

]
npackets = int((filesize - offset) / np.dtype(ptp_dt).itemsize)
struct_arr = np.memmap(filename, dtype=ptp_dt, shape=npackets, offset=offset, mode="r")
npackets = int((filesize - header_size) / np.dtype(ptp_dt).itemsize)
struct_arr = np.memmap(filename, dtype=ptp_dt, shape=npackets, offset=header_size, mode="r")

if not np.all(struct_arr["num_data_points"] == 1):
# some packets have more than 1 sample. Not actually ptp. Revert to non-ptp variant.
return self._read_nsx_dataheader_spec_v22_30(nsx_nb, filesize=filesize, offset=offset)

# It is still possible there was a data break and the file has multiple segments.
# We can no longer rely on the presence of a header indicating a new segment,
# so we look for timestamp differences greater than double the expected interval.
_period = self._nsx_basic_header[nsx_nb]["period"] # 30_000 ^-1 s per sample
_nominal_rate = 30_000 / _period # samples per sec; maybe 30_000 should be ["sample_resolution"]
_clock_rate = self._nsx_basic_header[nsx_nb]["timestamp_resolution"] # clocks per sec
clk_per_samp = _clock_rate / _nominal_rate # clk/sec / smp/sec = clk/smp
seg_thresh_clk = int(2 * clk_per_samp)
seg_starts = np.hstack((0, 1 + np.argwhere(np.diff(struct_arr["timestamps"]) > seg_thresh_clk).flatten()))
for seg_ix, seg_start_idx in enumerate(seg_starts):
if seg_ix < (len(seg_starts) - 1):
seg_stop_idx = seg_starts[seg_ix + 1]
else:
seg_stop_idx = len(struct_arr) - 1
seg_offset = offset + seg_start_idx * struct_arr.dtype.itemsize
num_data_pts = seg_stop_idx - seg_start_idx
return self._read_nsx_dataheader_spec_v22_30(nsx_nb, filesize=filesize, offset=header_size)


# Segment the data. At the moment, we segment where the data has gaps that are longer
# than twice the sampling period.
sampling_rate = self._nsx_sampling_frequency[nsx_nb]
segmentation_threshold = 2.0 / sampling_rate

# The raw timestamps are the indices of an ideal clock that ticks at `timestamp_resolution` times per second.
# We convert these indices to actual timestamps in seconds
raw_timestamps = struct_arr["timestamps"]
timestamps_sampling_rate = self._nsx_basic_header[nsx_nb]["timestamp_resolution"] # clocks per sec uint64 or uint32
timestamps_in_seconds = raw_timestamps / timestamps_sampling_rate
Contributor:

Could this division lead to any issues? uint/float should just be float, right?

Contributor Author:

Good point, let me double-check the casting rules to see if nasty surprises could appear.

Contributor Author:

Yes, I think the unsigned integers will be cast to float before dividing, so I don't expect any issue here. Can you think of something?

Contributor:

No, I didn't have anything in mind, but since we've dealt with so many random overflows lately, especially on the Neo side with the changes in numpy 2.0, I'm just scared of a blind spot in array math :/

Contributor Author:

Good, yes, I think this is safe
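
A minimal sketch (toy values) of the promotion rule settled above: NumPy's true division promotes unsigned integers to float64 before dividing, so the quotient itself cannot wrap around.

    import numpy as np

    raw_timestamps = np.array([0, 30_000, 2**63], dtype=np.uint64)
    timestamps_in_seconds = raw_timestamps / np.uint32(30_000)
    print(timestamps_in_seconds.dtype)  # float64
    # Caveat: uint64 values above 2**53 lose precision in float64,
    # but the division cannot overflow.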


time_differences = np.diff(timestamps_in_seconds)
gap_indices = np.argwhere(time_differences > segmentation_threshold).flatten()
segment_starts = np.hstack((0, 1 + gap_indices))

# Report gaps if any are found
if len(gap_indices) > 0:
import warnings
threshold_ms = segmentation_threshold * 1000

# Calculate all gap details in vectorized operations
gap_durations_seconds = time_differences[gap_indices]
gap_durations_ms = gap_durations_seconds * 1000
gap_positions_seconds = timestamps_in_seconds[gap_indices] - timestamps_in_seconds[0]

# Build gap detail lines all at once
gap_detail_lines = [
f"| {index:>15,} | {pos:>21.6f} | {dur:>21.3f} |\n"
for index, pos, dur in zip(gap_indices, gap_positions_seconds, gap_durations_ms)
]

segmentation_report_message = (
f"\nFound {len(gap_indices)} gaps for nsx {nsx_nb} where samples are farther apart than {threshold_ms:.3f} ms.\n"
f"Data will be segmented at these locations to create {len(segment_starts)} segments.\n\n"
"Gap Details:\n"
"+-----------------+-----------------------+-----------------------+\n"
"| Sample Index | Sample at | Gap Jump |\n"
"| | (Seconds) | (Milliseconds) |\n"
"+-----------------+-----------------------+-----------------------+\n"
+ ''.join(gap_detail_lines) +
"+-----------------+-----------------------+-----------------------+\n"
)
warnings.warn(segmentation_report_message)

# Calculate all segment boundaries and derived values in one operation
segment_boundaries = list(segment_starts) + [len(struct_arr) - 1]
segment_num_data_points = [segment_boundaries[i+1] - segment_boundaries[i] for i in range(len(segment_starts))]

size_of_data_block = struct_arr.dtype.itemsize
segment_offsets = [header_size + pos * size_of_data_block for pos in segment_starts]

num_segments = len(segment_starts)
for segment_index in range(num_segments):
seg_offset = segment_offsets[segment_index]
num_data_pts = segment_num_data_points[segment_index]
seg_struct_arr = np.memmap(filename, dtype=ptp_dt, shape=num_data_pts, offset=seg_offset, mode="r")
data_header[seg_ix] = {
data_header[segment_index] = {
"header": None,
"timestamp": seg_struct_arr["timestamps"], # Note, this is an array, not a scalar
"nb_data_points": num_data_pts,
@@ -1089,7 +1141,7 @@ def _read_nsx_data_spec_v21(self, nsx_nb):
"""
Extract nsx data from a 2.1 .nsx file
"""
filename = ".".join([self._filenames["nsx"], f"ns{nsx_nb}"])
filename = f"{self._filenames['nsx']}.ns{nsx_nb}"

# get shape of data
shape = (
@@ -1132,13 +1184,13 @@ def _read_nsx_data_spec_v30_ptp(self, nsx_nb):
yielding a timestamp per sample. Blocks can arise
if the recording was paused by the user.
"""
filename = ".".join([self._filenames["nsx"], f"ns{nsx_nb}"])
filename = f"{self._filenames['nsx']}.ns{nsx_nb}"

ptp_dt = [
("reserved", "uint8"),
("timestamps", "uint64"),
("num_data_points", "uint32"),
("samples", "int16", self._nsx_basic_header[nsx_nb]["channel_count"]),
("samples", "int16", (self._nsx_basic_header[nsx_nb]["channel_count"],)),
]

data = {}
@@ -1161,7 +1213,7 @@ def _read_nev_header(self, ext_header_variants):
"""
Extract nev header information from a of specific .nsx header variant
"""
filename = ".".join([self._filenames["nev"], "nev"])
filename = f"{self._filenames['nev']}.nev"

# basic header
dt0 = [