diff --git a/element_interface/intan_loader.py b/element_interface/intan_loader.py new file mode 100644 index 0000000..7ce304a --- /dev/null +++ b/element_interface/intan_loader.py @@ -0,0 +1,287 @@ +import numpy as np +import os, sys, struct +from pathlib import Path +import matplotlib.pyplot as plt + + +def read_qstring(fid): + """Read Qt style QString. + + The first 32-bit unsigned number indicates the length of the string (in bytes). + If this number equals 0xFFFFFFFF, the string is null. + + Strings are stored as unicode. + """ + + (length,) = struct.unpack(" (os.fstat(fid.fileno()).st_size - fid.tell() + 1): + print(length) + raise Exception("Length too long.") + + # convert length from bytes to 16-bit Unicode words + length = int(length / 2) + + data = [] + for i in range(0, length): + (c,) = struct.unpack("= (3, 0): + a = "".join([chr(c) for c in data]) + else: + a = "".join([unichr(c) for c in data]) + + return a + + +def read_header(fid): + """Reads the Intan File Format header from the given file.""" + + # Check 'magic number' at beginning of file to make sure this is an Intan + # Technologies RHD2000 data file. + (magic_number,) = struct.unpack(" 0) and (signal_group_enabled > 0): + for signal_channel in range(0, signal_group_num_channels): + new_channel = { + "port_name": signal_group_name, + "port_prefix": signal_group_prefix, + "port_number": signal_group, + } + new_channel["native_channel_name"] = read_qstring(fid) + new_channel["custom_channel_name"] = read_qstring(fid) + ( + new_channel["native_order"], + new_channel["custom_order"], + signal_type, + channel_enabled, + new_channel["chip_channel"], + command_stream, + new_channel["board_stream"], + ) = struct.unpack( + ">> rhs_data = load_rhs("/home/inbox/organoids21/032520_US_885kHz_sham", file_expr="amp*dat") + + # Plot data + >>> plt.plot(rhs_data["time"], rhs_data["recordings"]["amp-B-000.dat"]) + >>> plt.xlabel("Time (s)") + >>> plt.ylabel("Reading") + >>> plt.show() + + Args: + folder (str): Folder that contains info.rhs, time.dat, and *.dat files + file_expr (str): regex pattern of the file names to be read. + + Returns: + rhs_data (dict): RHS data. + rhs_data["header"] (dict): Header. + rhs_data["recordings"] (dict): Readings from various files + rhs_data["timestamps"] (np.array_like): Relative timestamps in seconds. + """ + + header_filepath = next(Path(folder).glob("info.rhs")) + with open(header_filepath, "rb") as fid: + header = read_header(fid) + + time_file = next(Path(folder).glob("time.dat")) + + timestamps = ( + np.memmap(time_file, dtype=np.int32) + / header["frequency_parameters"]["amplifier_sample_rate"] + ) + + rhs_data = dict(header=header, timestamps=timestamps, recordings={}) + + file_paths = Path(folder).glob(file_expr) + file_paths = [x for x in file_paths if x.as_posix() != "time.dat"] + + for file_path in file_paths: + file_path = file_path.as_posix() + if "amp" in file_path: + signal = np.memmap(file_path, dtype=np.int16) + signal = signal * 0.195 # Convert to microvolts + elif "board-ANALOG-IN" in file_path or "board-ANALOG-OUT" in file_path: + signal = np.memmap(file_path, dtype=np.uint16) + signal = (signal - 32768) * 0.0003125 # Convert to volts + elif "dc-" in file_path: + signal = np.memmap(file_path, dtype=np.uint16) + signal = (signal - 512) * 19.23 # Convert to milivolts + elif "board-DIGITAL-IN" in file_path or "board-DIGITAL-OUT" in file_path: + signal = np.memmap(file_path, dtype=np.uint16) + elif "stim-" in file_path: + data = np.memmap(file_path, dtype=np.uint16) + i = np.bitwise_and(data, 255) * header["stim_step_size"] + sign = (128 - np.bitwise_and(data, 255)) / 128 + signal = i * sign + rhs_data["recordings"][Path(file_path).relative_to(folder).stem] = signal + + return rhs_data