# Source code for slap2_utils.datafile

import os
import numpy as np
import re

from .subclasses.metadata import MetaData
from .utils.file_header import load_file_header_v2


class DataFile():
    """Loader for a SLAP2 binary data file (``.dat``) and its companion
    metadata file (``.meta``).

    Construction immediately reads both files: the metadata parse plan is
    unpacked into per-line attributes, the raw data is memory-mapped, and
    the file header plus per-line byte offsets are computed.

    Attributes
    ----------
    MAGIC_NUMBER : np.uint32
        Constant used to validate that the file is a SLAP2 data file.
    filename : str
        Path of the ``.dat`` file passed to the constructor.
    metaDataFileName : str
        Path of the companion metadata file: same directory and base name
        with any ``-TRIALn`` suffix stripped and a ``.meta`` extension.
    datFileName : str
        Path of the data file.
    rawData : np.memmap
        Memory map of the data file viewed as uint32 words.
    metaData : MetaData
        Parsed metadata for this acquisition.
    StreamId : None
        Placeholder for a stream ID (not populated here).
    header : dict
        File header parsed by :meth:`load_file_header`.
    lineHeaderIdxs : np.ndarray
        1-based (Matlab-style) uint16-word index of each line header
        within the first cycle.
    fastZs : list
        Fast-Z values, prefixed with the sentinel ``-10203`` (a value
        that never occurs in real data).
    zPixelReplacementMaps : object
        Pixel replacement maps taken from the parse plan.
    lineNumSuperPixels : list[int]
        Number of super pixels in each line.
    lineSuperPixelIDs : list
        Super-pixel IDs for each line.
    lineSuperPixelZIdxs : list
        Z slice indices for each line's super pixels.
    lineFastZIdxs : np.ndarray
        1-based fast-Z index per line; 0 when a line spans multiple
        (or zero) Z slices.
    lineDataNumElements : list[int]
        Number of int16 elements in each line's data payload.
    lineDataStartIdxs : list[int]
        1-based (Matlab-style) int16-word start index of each line's
        data payload within the first cycle.
    numCycles : int
        Number of acquisition cycles in the data file.
    """

    def __init__(self, datfile):
        """Initialize all fields with defaults and load the file.

        Parameters
        ----------
        datfile : str
            Path to the SLAP2 ``.dat`` file. The matching ``.meta`` file
            must live in the same directory.

        Raises
        ------
        FileNotFoundError
            If the metadata or data file cannot be found.
        """
        self.MAGIC_NUMBER = np.uint32(322379495)
        self.filename = datfile
        self.metaDataFileName = ''
        self.datFileName = ''
        self.rawData = []
        self.metaData = None
        self.StreamId = None
        self.header = None
        self.lineHeaderIdxs = None
        self.fastZs = None
        self.zPixelReplacementMaps = None
        self.lineNumSuperPixels = None
        self.lineSuperPixelIDs = []
        self.lineSuperPixelZIdxs = []
        self.lineDataNumElements = []
        self.lineDataStartIdxs = []
        self.numCycles = 0
        self._load_file()

    def _load_file(self):
        """Locate the metadata/data files and populate all attributes.

        Populates ``metaDataFileName``, ``datFileName``, ``metaData``, the
        parse-plan attributes (via :meth:`_load_parse_plan`), ``rawData``
        and ``header``.

        Raises
        ------
        FileNotFoundError
            If the metadata or data file does not exist.
        """
        base_dir, filename = os.path.split(self.filename)
        # The metadata file is shared across trials: strip any "-TRIALn"
        # suffix from the base name before appending ".meta".
        n_base = re.sub(r'-TRIAL\d+', '', os.path.splitext(filename)[0],
                        flags=re.IGNORECASE)
        self.metaDataFileName = os.path.join(base_dir, n_base + '.meta')
        self.datFileName = os.path.join(base_dir, filename)

        if not os.path.isfile(self.metaDataFileName):
            raise FileNotFoundError('Metadata file not found.')
        self.metaData = MetaData(self.metaDataFileName)

        # Unpack the acquisition parse plan into per-line attributes.
        self._load_parse_plan(self.metaData)

        if not os.path.isfile(self.datFileName):
            raise FileNotFoundError('Data file not found.')
        # Memory-map the raw file as uint32 words; nothing is read eagerly.
        self.rawData = np.memmap(self.filename, dtype='uint32')
        self.header = self.load_file_header(self.rawData)

    def _load_parse_plan(self, metaData):
        """Populate the per-line parse-plan attributes from *metaData*.

        Parameters
        ----------
        metaData : MetaData
            Parsed metadata object exposing ``AcquisitionContainer.ParsePlan``.
        """
        parse_plan = metaData.AcquisitionContainer.ParsePlan

        # Flatten the fast-Z list, prefixed with a sentinel value that is
        # never found in real data.
        final_fastz = [-10203]
        for z in parse_plan['zs'][:]:
            final_fastz.append(z[0])
        self.fastZs = final_fastz

        self.lineSuperPixelZIdxs = parse_plan['acqParsePlan']['sliceIdx']
        self.lineSuperPixelIDs = parse_plan['acqParsePlan']['superPixelID']
        self.zPixelReplacementMaps = parse_plan['pixelReplacementMaps']
        self.lineNumSuperPixels = [len(ids) for ids in self.lineSuperPixelIDs]

        # 1-based fast-Z index per line; 0 when the line does not map to
        # exactly one Z slice.
        self.lineFastZIdxs = np.zeros(len(self.lineSuperPixelZIdxs))
        for line_idx, line_z_idxs in enumerate(self.lineSuperPixelZIdxs):
            if len(line_z_idxs) != 1:
                self.lineFastZIdxs[line_idx] = 0
            else:
                self.lineFastZIdxs[line_idx] = line_z_idxs[0][0] + 1

    def load_file_header(self, rawData):
        """Validate the file, parse its header and index the lines.

        Parameters
        ----------
        rawData : buffer-like
            The raw file contents viewed as uint32 words (e.g. ``self.rawData``).

        Returns
        -------
        dict
            The parsed file header. Also populates ``numCycles``,
            ``lineHeaderIdxs``, ``lineDataStartIdxs`` and
            ``lineDataNumElements`` on *self*.

        Raises
        ------
        AssertionError
            If the magic number or format version is invalid.
        ValueError
            If the format version is unsupported.
        """
        raw_u32 = np.frombuffer(rawData, dtype=np.uint32)

        # Validate the magic number; mismatch means this is not SLAP2 data.
        # NOTE(review): assert-based validation is stripped under `python -O`;
        # kept for backward compatibility with callers catching AssertionError.
        file_magic_number = raw_u32[0]
        assert file_magic_number == self.MAGIC_NUMBER, 'Data format error. This is not a SLAP2 data file.'

        # Only file format version 2 is currently implemented.
        file_format_version = raw_u32[1]
        assert file_format_version <= 2, 'Unknown format version'
        if file_format_version == 2:
            header, self.numCycles = load_file_header_v2(self, raw_u32)
        else:
            raise ValueError(f'Unknown file format version: {file_format_version}')

        # Reinterpret the same buffer as uint16 words to walk line headers.
        raw_u16 = np.frombuffer(raw_u32, dtype=np.uint16)

        # Walk the first cycle: each line header starts with its own size in
        # bytes, which gives the offset of the next line. Indices are kept
        # 1-based (Matlab-style) throughout.
        lines_per_cycle = int(header['linesPerCycle'])
        line_idxs = np.zeros(lines_per_cycle, dtype=int)
        line_size_bytes = np.zeros(lines_per_cycle, dtype=np.uint32)
        line_idxs[0] = header['firstCycleOffsetBytes'] // 2 + 1
        line_size_bytes[0] = raw_u16[line_idxs[0] - 1]
        for idx in range(1, lines_per_cycle):
            line_idxs[idx] = line_idxs[idx - 1] + line_size_bytes[idx - 1] // 2
            line_size_bytes[idx] = raw_u16[line_idxs[idx] - 1]

        # Fix: previously assigned to a local and lost, leaving the
        # lineHeaderIdxs attribute declared in __init__ forever None.
        self.lineHeaderIdxs = line_idxs

        data_start = line_idxs + header['lineHeaderSizeBytes'] // 2
        data_elems = (line_size_bytes - header['lineHeaderSizeBytes']) // 2
        self.lineDataStartIdxs = [int(x) for x in data_start]
        self.lineDataNumElements = [int(x) for x in data_elems]

        # TODO(review): older files may lack header['referenceTimestamp'];
        # if needed, default it from the first line header's timestamp.
        return header

    def getLineData(self, lineIndices, cycleIndices, iChannel=None, method="Cython"):
        """Get line data from the data file.

        Parameters
        ----------
        lineIndices : array_like
            Vector of positive integers specifying line indices (matlab indexing)
        cycleIndices : array_like
            Vector of positive integers specifying cycle indices (matlab indexing)
        iChannel : array_like, optional
            Vector of positive integers specifying channel indices
            (matlab indexing) (default is all channels)
        method : str, optional
            Method to use for data retrieval, either "Cython" or "Python" or
            "Numpy" (default is "Cython"). Currently the Cython extension is
            tried first and a pure-Python fallback is used if unavailable.

        Returns
        -------
        lineData : list
            Line data for the specified indices, one int16 array of shape
            (samples_per_channel, n_channels) per requested line.

        Raises
        ------
        ValueError
            If any index is non-positive or out of range, or if the number
            of cycle indices does not match the number of line indices.
        """
        # Default to all channels if not specified
        if iChannel is None:
            iChannel = np.arange(1, self.header['numChannels'] + 1)

        lineIndices = np.asarray(lineIndices, dtype=np.int64).ravel()
        cycleIndices = np.asarray(cycleIndices, dtype=np.int64).ravel()
        iChannel = np.asarray(iChannel, dtype=np.int64).ravel()

        # Validate Matlab-style (1-based) indices before touching the file.
        if not np.all(lineIndices > 0) or not np.all(cycleIndices > 0) or not np.all(iChannel > 0):
            raise ValueError("All indices must be positive integers (Matlab indexing)")
        if np.any(lineIndices > self.header['linesPerCycle']):
            raise ValueError("Line indices must be <= linesPerCycle")
        if np.any(cycleIndices > self.numCycles):
            raise ValueError("Cycle indices must be <= numCycles")
        if len(cycleIndices) != len(lineIndices):
            raise ValueError("Number of cycle indices must match number of line indices")
        if np.any(iChannel > self.header['numChannels']):
            raise ValueError("Channel indices must be <= numChannels")

        hMemmap = np.memmap(self.datFileName, dtype='int16', mode='r')
        try:
            try:
                # Prefer the compiled extension when it is available.
                from .fast_line_data import fast_get_line_data
                lineData = fast_get_line_data(
                    hMemmap,
                    self.lineDataNumElements,
                    self.lineDataStartIdxs,
                    int(self.header['numChannels']),
                    int(self.header['firstCycleOffsetBytes']),
                    int(self.header['bytesPerCycle']),
                    lineIndices, cycleIndices, iChannel
                )
            except ImportError:
                # Pure-Python fallback: copy each requested line/channel slice
                # out of the memmap. Offsets are converted from Matlab-style
                # 1-based word indices to 0-based memmap indices.
                lineData = []
                num_channels = int(self.header['numChannels'])
                cycle_words = int(self.header['bytesPerCycle']) // 2
                for idx in range(len(lineIndices)):
                    samples_per_channel = int(self.lineDataNumElements[lineIndices[idx] - 1]) // num_channels
                    tmpData = np.empty((samples_per_channel, len(iChannel)), dtype=np.int16)
                    precomputed_offset = (self.lineDataStartIdxs[lineIndices[idx] - 1]
                                          + (cycleIndices[idx] - 1) * cycle_words)
                    for ch in range(len(iChannel)):
                        offset = samples_per_channel * (iChannel[ch] - 1) + precomputed_offset - 1
                        tmpData[:, ch] = hMemmap[offset:offset + samples_per_channel]
                    lineData.append(tmpData)
        finally:
            # Release the file handle promptly rather than waiting for GC.
            if hasattr(hMemmap, '_mmap'):
                hMemmap._mmap.close()
            del hMemmap
        return lineData