Source code for nres.data

from __future__ import annotations

import warnings

import numpy as np
import pandas as pd

from nres import utils

warnings.filterwarnings("ignore")


[docs]class Data: """ A class for handling neutron transmission data, including reading counts data, calculating transmission, and plotting the results. Attributes: ----------- table : pandas.DataFrame or None A dataframe containing energy, transmission, and error values. tgrid : pandas.Series or None A time-of-flight grid corresponding to the time steps in the data. signal : pandas.DataFrame or None The signal counts data (tof, counts, err). openbeam : pandas.DataFrame or None The open beam counts data (tof, counts, err). L : float or None Distance (meters) used in the energy conversion from time-of-flight. tstep : float or None Time step (seconds) for converting time-of-flight to energy. is_grouped : bool Whether this Data object contains grouped data. groups : dict or None Dict mapping index -> table for grouped data. indices : list or None List of string indices for grouped data. group_shape : tuple or None Tuple (nx, ny) for 2D, (n,) for 1D, None for named groups. """ def __init__(self, **kwargs): """ Initializes the Data object with optional keyword arguments. Parameters: ----------- **kwargs : dict, optional Additional keyword arguments to set any instance-specific properties. """ self.table = None self.tgrid = None self.signal = None self.openbeam = None self.L = None self.tstep = None # Grouped data attributes self.is_grouped = False self.groups = None # Dict mapping index -> table self.indices = None # List of string indices self.group_shape = None # Tuple (nx, ny) for 2D, (n,) for 1D, None for named @classmethod def _read_counts(cls, filename="run2_graphite_00000/graphite.csv"): """ Reads the counts data from a CSV file and calculates errors if not provided. Parameters: ----------- filename : str, optional The path to the CSV file containing time-of-flight (tof) and counts data. Default is 'run2_graphite_00000/graphite.csv'. Returns: -------- df : pandas.DataFrame A DataFrame containing columns: 'tof', 'counts', and 'err'. Errors are calculated as the square root of counts if not provided in the file. """ df = pd.read_csv( filename, names=["tof", "counts", "err"], header=None, skiprows=1 ) # If no error values provided, calculate as sqrt of counts if all(df["err"].isnull()): df["err"] = np.sqrt(df["counts"]) # Store label from filename (without path and extension) df.attrs["label"] = filename.split("/")[-1].rstrip(".csv") return df
[docs] @classmethod def from_counts( cls, signal: str, openbeam: str, empty_signal: str = "", empty_openbeam: str = "", tstep: float = 1.56255e-9, L: float = 10.59, L0: float = 1.0, t0: float = 0.0, verbosity: int = 1, ): """ Creates a Data object from signal and open beam counts data, calculates transmission, and converts tof to energy. Parameters: ----------- signal : str Path to the CSV file containing the signal data (tof, counts, err). openbeam : str Path to the CSV file containing the open beam data (tof, counts, err). empty_signal : str, optional Path to the CSV file containing the empty signal data for background correction. Default is an empty string. empty_openbeam : str, optional Path to the CSV file containing the empty open beam data for background correction. Default is an empty string. tstep : float, optional Time step (seconds) for converting time-of-flight (tof) to energy. Default is 1.56255e-9. L : float, optional Distance (meters) used in the energy conversion from time-of-flight. Default is 10.59 m. L0 : float, optional Flight path scale factor from vary_tof optimization. Default is 1.0. Values > 1.0 indicate a longer path, < 1.0 a shorter path. t0 : float, optional Time offset correction in seconds from vary_tof optimization. Default is 0.0. Will be converted to TOF channel units internally using tstep. verbosity : int, optional Verbosity level. If 0, suppresses warnings from sqrt operations. Default is 1. Returns: -------- Data A Data object containing transmission and energy data. """ # Read signal and open beam counts signal = cls._read_counts(signal) openbeam = cls._read_counts(openbeam) # Apply L0 and t0 corrections # Note: signal["tof"] is in TOF channel units, t0 is in seconds # Convert t0 to TOF channel units by dividing by tstep dtof = (1.0 - L0) * signal["tof"] + t0 / tstep corrected_tof = signal["tof"] + dtof # Convert corrected tof to energy using provided time step and distance signal["energy"] = utils.time2energy(corrected_tof * tstep, L) # Calculate transmission and associated error # Suppress RuntimeWarnings from sqrt if verbosity is 0 import warnings with warnings.catch_warnings(): if verbosity == 0: warnings.simplefilter("ignore", RuntimeWarning) transmission = signal["counts"] / openbeam["counts"] err = transmission * np.sqrt( (signal["err"] / signal["counts"]) ** 2 + (openbeam["err"] / openbeam["counts"]) ** 2 ) # If background (empty) data is provided, apply correction if empty_signal and empty_openbeam: empty_signal = cls._read_counts(empty_signal) empty_openbeam = cls._read_counts(empty_openbeam) transmission *= empty_openbeam["counts"] / empty_signal["counts"] err = transmission * np.sqrt( (signal["err"] / signal["counts"]) ** 2 + (openbeam["err"] / openbeam["counts"]) ** 2 + (empty_signal["err"] / empty_signal["counts"]) ** 2 + (empty_openbeam["err"] / empty_openbeam["counts"]) ** 2 ) # Construct a dataframe for energy, transmission, and error df = pd.DataFrame( {"energy": signal["energy"], "trans": transmission, "err": err} ) # Set the label attribute from the signal file df.attrs["label"] = signal.attrs["label"] # Create and return the Data object self_data = cls() self_data.table = df self_data.tgrid = signal["tof"] self_data.signal = signal self_data.openbeam = openbeam self_data.L = L self_data.tstep = tstep # Store L0 and t0 values to indicate TOF correction was applied self_data.L0 = L0 self_data.t0 = t0 # Store empty signal/openbeam if provided (for proper rebinning) # Note: At this point, if background correction was applied, empty_signal # and empty_openbeam have been reassigned to DataFrames (line 148-149) if ( empty_signal is not None and empty_openbeam is not None and isinstance(empty_signal, pd.DataFrame) ): self_data.empty_signal = empty_signal self_data.empty_openbeam = empty_openbeam else: self_data.empty_signal = None self_data.empty_openbeam = None return self_data
def _normalize_index(self, index): """ Normalize index for group lookup. Converts tuples like (10, 20) to strings like "(10,20)" for consistent access. Accepts both "(10,20)" and "(10, 20)" string formats. Parameters: ----------- index : int, tuple, or str The index to normalize Returns: -------- str String representation of the index (tuples without spaces) """ if isinstance(index, tuple): # (10, 20) -> "(10,20)" (no spaces) return str(index).replace(" ", "") if isinstance(index, str): # Remove spaces from string if it looks like a tuple: "(10, 20)" -> "(10,20)" return index.replace(" ", "") # 5 -> "5" return str(index) def _parse_string_index(self, string_idx): """ Parse a string index back to its original form. "(10, 20)" -> (10, 20) "5" -> 5 "center" -> "center" Parameters: ----------- string_idx : str String representation of index Returns: -------- tuple, int, or str Original index form """ import ast try: # Try to parse as Python literal (for tuples and ints) parsed = ast.literal_eval(string_idx) return parsed except (ValueError, SyntaxError): # If parsing fails, it's a named string return string_idx @classmethod def _extract_indices_from_filenames(cls, filenames, pattern): """Extract indices from filenames based on pattern.""" import os import re indices = [] # Auto-detect pattern if needed if pattern == "auto": # Try common patterns test_name = os.path.basename(filenames[0]) # Try 2D patterns - look for _x or _y to avoid matching dimension specs like 16x16 match_2d = re.search(r"_x(\d+).*_y(\d+)", test_name, re.IGNORECASE) if not match_2d: # Try without underscores but with word boundaries match_2d = re.search( r"\bx(\d+)[_\s].*\by(\d+)", test_name, re.IGNORECASE ) if match_2d: pattern = "_x{x}_y{y}" else: # Try 1D patterns - look for trailing numbers or with keywords match_1d = re.search( r"(?:idx|pixel|det)[_\s]*(\d+)", test_name, re.IGNORECASE ) if not match_1d: # Try just trailing number before extension match_1d = re.search(r"_(\d+)\.", test_name) if match_1d: pattern = "idx{i}" else: # No pattern found - use filenames as indices (for named ROIs) pattern = "{name}" # Extract based on pattern for fname in filenames: basename = os.path.basename(fname) if "{x}" in pattern and "{y}" in pattern: # 2D grid pattern - try multiple patterns # First try with underscores match = re.search(r"_x(\d+).*_y(\d+)", basename, re.IGNORECASE) if not match: # Try with word boundaries match = re.search( r"\bx(\d+)[_\s].*\by(\d+)", basename, re.IGNORECASE ) if not match: # Try simple pattern as last resort match = re.search( r"(?<![\dx])x(\d+).*(?<![\dx])y(\d+)", basename, re.IGNORECASE ) if match: x, y = int(match.group(1)), int(match.group(2)) indices.append((x, y)) else: raise ValueError( f"Could not extract x,y coordinates from: {basename}. " f"Filename should contain _x<num> and _y<num> patterns." ) elif "{i}" in pattern: # 1D array pattern - try multiple approaches # First try with keywords match = re.search( r"(?:idx|pixel|det)[_\s]*(\d+)", basename, re.IGNORECASE ) if not match: # Try trailing number before extension match = re.search(r"_(\d+)\.", basename) if not match: # Last resort - any number in the filename (rightmost) matches = re.findall(r"(\d+)", basename) if matches: match = type( "obj", (object,), {"group": lambda self, n: matches[-1]} )() if match: indices.append(int(match.group(1))) else: raise ValueError(f"Could not extract index from: {basename}") elif "{name}" in pattern: # Named groups - use filename without extension name = os.path.splitext(basename)[0] indices.append(name) else: # Unknown pattern - use sequential indices.append(len(indices)) return indices @classmethod def _determine_group_shape(cls, indices): """Determine group shape and dimensionality from indices.""" # Check for empty indices (works with both lists and numpy arrays) if len(indices) == 0: return None, False, False first_idx = indices[0] # Check if 2D (tuples) if isinstance(first_idx, tuple) and len(first_idx) == 2: # 2D grid - use max coordinates + 1 to handle sparse grids xs = [idx[0] for idx in indices] ys = [idx[1] for idx in indices] return (max(ys) + 1, max(xs) + 1), True, False # Check if 1D (ints) if isinstance(first_idx, (int, int.__class__)): # 1D array return (len(indices),), False, True # Named indices (strings) return None, False, False
[docs] @classmethod def from_transmission(cls, filename: str): """ Creates a Data object directly from a transmission data file containing energy, transmission, and error values. Parameters: ----------- filename : str Path to the file containing the transmission data (energy, transmission, error) separated by whitespace. Returns: -------- Data A Data object with the transmission data loaded into a dataframe. """ df = pd.read_csv( filename, names=["energy", "trans", "err"], header=None, skiprows=0, sep=r"\s+", ) # Create Data object and assign the dataframe self_data = cls() self_data.table = df return self_data
[docs] @classmethod def from_grouped_arrays( cls, tof, trans, err, L: float, tstep: float, L0: float = 1.0, t0: float = 0.0, indices: list = None, ): """ Creates a Data object from grouped transmission arrays. This method is useful for creating grouped data from numpy arrays, such as data from imaging detectors or multi-sample measurements. Parameters: ----------- tof : array-like Time-of-flight bins (1D array). trans : array-like Transmission values. Shape: (n_groups, n_energy_bins) err : array-like Transmission uncertainties. Shape: (n_groups, n_energy_bins) L : float Flight path length in meters. tstep : float Time step in seconds for TOF to energy conversion. L0 : float, optional Flight path scale factor. Default is 1.0. t0 : float, optional Time offset in seconds. Default is 0.0. indices : list, optional List of group indices. Can be: - List of ints for 1D array: [0, 1, 2, ...] - List of tuples for 2D grid: [(0,0), (0,1), (1,0), ...] - List of strings for named groups: ['sample1', 'sample2', ...] If not provided, will use sequential integers. Returns: -------- Data A Data object with grouped data. Examples: --------- >>> # Create grouped data for 10 pixels with 100 energy bins each >>> tof = np.arange(1, 101) >>> trans_2d = np.random.rand(10, 100) >>> err_2d = trans_2d * 0.05 >>> data = Data.from_grouped_arrays(tof, trans_2d, err_2d, L=10.0, tstep=1e-6) """ tof = np.asarray(tof) trans = np.asarray(trans) err = np.asarray(err) # Validate shapes if trans.ndim != 2 or err.ndim != 2: raise ValueError( "trans and err must be 2D arrays (n_groups, n_energy_bins)" ) if trans.shape != err.shape: raise ValueError(f"Shape mismatch: trans {trans.shape} vs err {err.shape}") if len(tof) != trans.shape[1]: raise ValueError( f"TOF length {len(tof)} doesn't match trans shape {trans.shape}" ) n_groups, n_energy = trans.shape # Create indices if not provided if indices is None: indices = list(range(n_groups)) elif len(indices) != n_groups: raise ValueError( f"Number of indices ({len(indices)}) doesn't match number of groups ({n_groups})" ) # Determine group shape group_shape = cls._determine_group_shape(indices) # Create Data object self_data = cls() self_data.L = L self_data.tstep = tstep self_data.L0 = L0 self_data.t0 = t0 self_data.is_grouped = True self_data.indices = indices self_data.group_shape = group_shape self_data.grouped_trans = trans self_data.grouped_err = err # Convert TOF to energy energy = utils.time2energy(tof * tstep, L) # Create groups dictionary self_data.groups = {} for i, idx in enumerate(indices): table = pd.DataFrame( {"energy": energy, "trans": trans[i, :], "err": err[i, :]} ) table.attrs["label"] = str(idx) self_data.groups[idx] = table # Set main table to first group self_data.table = self_data.groups[indices[0]] return self_data
[docs] @classmethod def from_grouped( cls, signal, openbeam, empty_signal: str = "", empty_openbeam: str = "", tstep: float = 1.56255e-9, L: float = 10.59, L0: float = 1.0, t0: float = 0.0, pattern: str = "auto", indices: list = None, verbosity: int = 1, n_jobs: int = -1, ): """ Creates a Data object from grouped counts data using glob patterns. Supports 1D arrays, 2D grids, and named indices for spatially-resolved analysis. Parameters: ----------- signal : str Glob pattern for signal files (e.g., "archive/pixel_*.csv" or "data/grid_*_x*_y*.csv"). Can also be a folder path - all .csv files in the folder will be loaded. openbeam : str Glob pattern for openbeam files. Can also be a folder path. empty_signal : str, optional Glob pattern for empty signal files for background correction. empty_openbeam : str, optional Glob pattern for empty openbeam files for background correction. tstep : float, optional Time step (seconds) for converting time-of-flight to energy. Default is 1.56255e-9. L : float, optional Distance (meters) used in the energy conversion from time-of-flight. Default is 10.59 m. L0 : float, optional Flight path scale factor from vary_tof optimization. Default is 1.0. t0 : float, optional Time offset correction in seconds from vary_tof optimization. Default is 0.0. Will be converted to TOF channel units internally using tstep. pattern : str, optional Coordinate extraction pattern. Default is "auto" which tries common patterns: - "x{x}_y{y}" for 2D grids (e.g., "grid_x10_y20.csv") - "idx{i}" or "pixel_{i}" for 1D arrays Custom patterns can use {x}, {y}, {i}, or {name}. indices : list, optional If provided, use these indices instead of extracting from filenames. Can be list of ints (1D), list of tuples (2D), or list of strings (named). verbosity : int, optional Verbosity level. If >= 1, shows progress bar. Default is 1. n_jobs : int, optional Number of parallel jobs for loading files. Default is -1 (use all CPUs). Set to 1 for sequential loading. Returns: -------- Data A Data object with grouped data stored in self.groups. Examples: --------- # 2D grid from filenames like "pixel_x10_y20.csv" >>> data = Data.from_grouped("folder/pixel_*.csv", "folder_ob/pixel_*.csv") # 1D array with custom indices >>> data = Data.from_grouped( ... "data/det_*.csv", "data_ob/det_*.csv", indices=[0, 1, 2, 3] ... ) # Named groups >>> data = Data.from_grouped( ... "samples/*.csv", "ref/*.csv", indices=["sample1", "sample2"] ... ) """ import glob import os # Find all matching files (support folder input) if os.path.isdir(signal): signal_files = sorted(glob.glob(os.path.join(signal, "*.csv"))) else: signal_files = sorted(glob.glob(signal)) if os.path.isdir(openbeam): openbeam_files = sorted(glob.glob(os.path.join(openbeam, "*.csv"))) else: openbeam_files = sorted(glob.glob(openbeam)) if not signal_files: raise ValueError(f"No files found matching pattern: {signal}") if not openbeam_files: raise ValueError(f"No files found matching pattern: {openbeam}") if len(signal_files) != len(openbeam_files): raise ValueError( f"Mismatch: {len(signal_files)} signal files vs {len(openbeam_files)} openbeam files" ) # Handle empty beam files if provided empty_signal_files = [] empty_openbeam_files = [] use_single_empty = False # Flag for single empty file reuse if empty_signal and empty_openbeam: empty_signal_files = sorted(glob.glob(empty_signal)) empty_openbeam_files = sorted(glob.glob(empty_openbeam)) # Allow single empty file to be reused for all groups if len(empty_signal_files) == 1 and len(empty_openbeam_files) == 1: use_single_empty = True elif len(empty_signal_files) != len(signal_files) or len( empty_openbeam_files ) != len(signal_files): raise ValueError( f"Empty file count mismatch: {len(empty_signal_files)} empty signal, " f"{len(empty_openbeam_files)} empty openbeam vs {len(signal_files)} signal files. " f"Provide either 1 empty file (reused for all) or one per signal file." ) # Extract or use provided indices if indices is not None: # Convert numpy arrays to list if isinstance(indices, np.ndarray): indices = indices.tolist() # User-provided indices if len(indices) != len(signal_files): raise ValueError( f"Number of indices ({len(indices)}) must match number of files ({len(signal_files)})" ) extracted_indices = indices else: # Auto-extract from filenames extracted_indices = cls._extract_indices_from_filenames( signal_files, pattern ) # Determine group dimensionality and shape BEFORE converting to strings group_shape, is_2d, is_1d = cls._determine_group_shape(extracted_indices) # Convert all indices to strings for consistent access # For 2D: (10, 20) -> "(10,20)" (no spaces) # For 1D: 5 -> "5" # For named: "center" -> "center" string_indices = [] for idx in extracted_indices: if isinstance(idx, tuple): # Convert tuple to string without spaces: "(10,20)" string_indices.append(str(idx).replace(" ", "")) elif isinstance(idx, str): string_indices.append(idx) # "center" else: string_indices.append(str(idx)) # "5" extracted_indices = string_indices # Create Data object self_data = cls() self_data.is_grouped = True self_data.indices = extracted_indices self_data.group_shape = group_shape self_data.groups = {} self_data.L = L self_data.tstep = tstep # Store L0 and t0 values to indicate TOF correction was applied self_data.L0 = L0 self_data.t0 = t0 # Helper function to load a single group def load_single_group(i, idx): """Load a single group's data files.""" sig_file = signal_files[i] ob_file = openbeam_files[i] # Handle empty files - use single file if available, otherwise per-group if use_single_empty: es_file = empty_signal_files[0] eo_file = empty_openbeam_files[0] else: es_file = empty_signal_files[i] if empty_signal_files else "" eo_file = empty_openbeam_files[i] if empty_openbeam_files else "" # Create individual Data object for this group group_data = cls.from_counts( signal=sig_file, openbeam=ob_file, empty_signal=es_file, empty_openbeam=eo_file, tstep=tstep, L=L, L0=L0, t0=t0, verbosity=verbosity, ) return idx, group_data.table # Load groups in parallel or sequentially if n_jobs == 1: # Sequential loading with progress bar if verbosity >= 1: try: from tqdm.auto import tqdm iterator = tqdm( enumerate(extracted_indices), total=len(extracted_indices), desc=f"Loading {len(extracted_indices)} groups", ) except ImportError: iterator = enumerate(extracted_indices) else: iterator = enumerate(extracted_indices) for i, idx in iterator: idx, table = load_single_group(i, idx) self_data.groups[idx] = table else: # Parallel loading from joblib import Parallel, delayed # Create progress bar if needed if verbosity >= 1: try: from tqdm.auto import tqdm pbar = tqdm( total=len(extracted_indices), desc=f"Loading {len(extracted_indices)} groups", ) except ImportError: pbar = None else: pbar = None # Load groups in parallel using threading backend # (threading is appropriate for I/O-bound file loading and avoids serialization issues) results = Parallel(n_jobs=n_jobs, prefer="threads")( delayed(load_single_group)(i, idx) for i, idx in enumerate(extracted_indices) ) # Store results for idx, table in results: self_data.groups[idx] = table if pbar is not None: pbar.update(1) if pbar is not None: pbar.close() # Set first group as default table for compatibility self_data.table = self_data.groups[extracted_indices[0]] return self_data
[docs] def plot(self, index=None, **kwargs): """ Plots the transmission data with error bars. Parameters: ----------- index : int, tuple, or str, optional For grouped data, specify which group to plot: - int: 1D array index - tuple: (x, y) for 2D grid - str: named index If None and data is grouped, plots first group. If None and data is not grouped, plots the main table. **kwargs : dict, optional Additional plotting parameters: - xlim : tuple, optional Limits for the x-axis (default: (0.5e6, 1e7)). - ylim : tuple, optional Limits for the y-axis (default: (0., 1.)). - ecolor : str, optional Error bar color (default: "0.8"). - xlabel : str, optional Label for the x-axis (default: "Energy [eV]"). - ylabel : str, optional Label for the y-axis (default: "Transmission"). - logx : bool, optional Whether to plot the x-axis on a logarithmic scale (default: True). Returns: -------- matplotlib.Axes The axes of the plot containing the transmission data. """ xlim = kwargs.pop("xlim", (0.5e6, 1e7)) ylim = kwargs.pop("ylim", (0.0, 1.0)) ecolor = kwargs.pop("ecolor", "0.8") xlabel = kwargs.pop("xlabel", "Energy [eV]") ylabel = kwargs.pop("ylabel", "Transmission") logx = kwargs.pop("logx", True) # Determine which table to plot and set label plot_label = kwargs.pop("label", None) if self.is_grouped: # For grouped data if index is None: # Default to first group index = self.indices[0] # Normalize index for lookup (supports tuple, int, or string access) normalized_index = self._normalize_index(index) if normalized_index not in self.groups: raise ValueError( f"Index {index} not found in groups. Available indices: {self.indices}" ) table_to_plot = self.groups[normalized_index] # Add index to label if not provided if plot_label is None: plot_label = f"Index {index}" else: # For non-grouped data if index is not None: raise ValueError("Cannot specify index for non-grouped data") table_to_plot = self.table # Plot the data with error bars ax = table_to_plot.dropna().plot( x="energy", y="trans", yerr="err", xlim=xlim, ylim=ylim, logx=logx, ecolor=ecolor, xlabel=xlabel, ylabel=ylabel, label=plot_label, **kwargs, ) # Add legend if label was set if plot_label is not None: ax.legend() return ax
[docs] def plot_map( self, emin=0.5e6, emax=20e6, emin2=None, emax2=None, logT=False, n_density=None, sigma=None, **kwargs, ): """ Plot transmission map averaged over energy range for grouped data. Parameters: ----------- emin : float, optional Minimum energy for averaging (default: 0.5e6 eV). emax : float, optional Maximum energy for averaging (default: 20e6 eV). emin2 : float, optional Minimum energy for second energy range. If provided with emax2, plots the ratio of transmissions: T(emin,emax) / T(emin2,emax2). emax2 : float, optional Maximum energy for second energy range. logT : bool, optional If True, plots -ln(T)/(n*sigma) as thickness estimate in cm (default: False). Requires n_density and sigma parameters. According to Beer's law T=exp(-n*sigma*d), this transformation gives an estimate of thickness d. n_density : float, optional Number density in atoms/cm^3 (required if logT=True). sigma : float, optional Average cross-section in barns (required if logT=True). **kwargs : dict, optional Additional plotting parameters: - cmap : str, optional Colormap for 2D maps (default: 'viridis'). - title : str, optional Plot title (default: auto-generated). - vmin, vmax : float, optional Color scale limits for 2D maps. - figsize : tuple, optional Figure size (width, height) in inches. Returns: -------- matplotlib.Axes The axes of the plot. Raises: ------- ValueError If called on non-grouped data, or if logT=True but n_density or sigma not provided. Examples: --------- >>> # For 2D grid data >>> data = Data.from_grouped("pixel_x*_y*.csv", "ob_x*_y*.csv") >>> data.plot_map(emin=1e6, emax=10e6) >>> # For 1D array data >>> data = Data.from_grouped("pixel_*.csv", "ob_*.csv") >>> data.plot_map(emin=0.5e6, emax=5e6) >>> # Plot thickness estimate >>> data.plot_map(emin=1e6, emax=10e6, logT=True, n_density=8.5e22, sigma=10.0) >>> # Plot transmission ratio >>> data.plot_map(emin=1e6, emax=5e6, emin2=5e6, emax2=10e6) """ import matplotlib.pyplot as plt import numpy as np if not self.is_grouped: raise ValueError("plot_map only works for grouped data") # Validate parameters if logT and (n_density is None or sigma is None): raise ValueError("logT=True requires both n_density and sigma parameters") if (emin2 is not None or emax2 is not None) and not ( emin2 is not None and emax2 is not None ): raise ValueError("Both emin2 and emax2 must be provided together") if logT and (emin2 is not None or emax2 is not None): raise ValueError( "Cannot use both logT and transmission ratio (emin2/emax2) simultaneously" ) # Extract kwargs cmap = kwargs.pop("cmap", "viridis") title = kwargs.pop("title", None) vmin = kwargs.pop("vmin", None) vmax = kwargs.pop("vmax", None) figsize = kwargs.pop("figsize", None) # Calculate average transmission for each group avg_trans = {} for idx in self.indices: table = self.groups[idx] mask = (table["energy"] >= emin) & (table["energy"] <= emax) avg_trans[idx] = table.loc[mask, "trans"].mean() # Calculate second transmission map if requested (for ratio) if emin2 is not None and emax2 is not None: avg_trans2 = {} for idx in self.indices: table = self.groups[idx] mask = (table["energy"] >= emin2) & (table["energy"] <= emax2) avg_trans2[idx] = table.loc[mask, "trans"].mean() # Calculate ratio for idx in self.indices: if avg_trans2[idx] != 0: avg_trans[idx] = avg_trans[idx] / avg_trans2[idx] else: avg_trans[idx] = np.nan # Apply logT transformation if requested if logT: # Convert sigma from barns to cm^2 (1 barn = 1e-24 cm^2) sigma_cm2 = sigma * 1e-24 for idx in self.indices: trans_val = avg_trans[idx] if trans_val > 0 and not np.isnan(trans_val): # -ln(T) / (n * sigma) = thickness in cm avg_trans[idx] = -np.log(trans_val) / (n_density * sigma_cm2) else: avg_trans[idx] = np.nan # Create visualization based on group_shape if self.group_shape and len(self.group_shape) == 2: # 2D pcolormesh for proper block sizing # Extract unique x and y coordinates by parsing string indices xs = [] ys = [] for idx_str in self.indices: idx = self._parse_string_index(idx_str) if isinstance(idx, tuple) and len(idx) == 2: xs.append(idx[0]) ys.append(idx[1]) xs = sorted(set(xs)) ys = sorted(set(ys)) # Calculate grid spacing (block size) x_spacing = xs[1] - xs[0] if len(xs) > 1 else 1 y_spacing = ys[1] - ys[0] if len(ys) > 1 else 1 # Create coordinate arrays including edges for pcolormesh # Add half-spacing to create cell edges x_edges = np.array(xs) - x_spacing / 2 x_edges = np.append(x_edges, xs[-1] + x_spacing / 2) y_edges = np.array(ys) - y_spacing / 2 y_edges = np.append(y_edges, ys[-1] + y_spacing / 2) # Create 2D array for values trans_array = np.full((len(ys), len(xs)), np.nan) # Map indices to array positions x_map = {x: i for i, x in enumerate(xs)} y_map = {y: i for i, y in enumerate(ys)} for idx_str in self.indices: idx = self._parse_string_index(idx_str) if isinstance(idx, tuple) and len(idx) == 2: x, y = idx if x in x_map and y in y_map: trans_array[y_map[y], x_map[x]] = avg_trans[idx_str] fig, ax = plt.subplots(figsize=figsize) im = ax.pcolormesh( x_edges, y_edges, trans_array, cmap=cmap, vmin=vmin, vmax=vmax, shading="flat", **kwargs, ) ax.set_xlabel("X coordinate") ax.set_ylabel("Y coordinate") ax.set_aspect("equal") # Set appropriate title and colorbar label based on mode if title is None: if logT: title = f"Thickness Estimate Map ({emin:.2g}-{emax:.2g} eV)" elif emin2 is not None: title = f"Transmission Ratio Map ({emin:.2g}-{emax:.2g})/{emin2:.2g}-{emax2:.2g} eV)" else: title = f"Average Transmission Map ({emin:.2g}-{emax:.2g} eV)" if logT: cbar_label = "Thickness [cm]" elif emin2 is not None: cbar_label = "Transmission Ratio" else: cbar_label = "Transmission" ax.set_title(title) plt.colorbar(im, ax=ax, label=cbar_label) return ax if self.group_shape and len(self.group_shape) == 1: # 1D line plot - parse string indices back to integers indices_array = np.array( [self._parse_string_index(idx) for idx in self.indices] ) trans_values = np.array([avg_trans[idx] for idx in self.indices]) fig, ax = plt.subplots(figsize=figsize) ax.plot(indices_array, trans_values, "o-", **kwargs) ax.set_xlabel("Pixel index") # Set appropriate ylabel and title based on mode if logT: ax.set_ylabel("Thickness [cm]") if title is None: title = f"Thickness Estimate ({emin:.2g}-{emax:.2g} eV)" elif emin2 is not None: ax.set_ylabel("Transmission Ratio") if title is None: title = f"Transmission Ratio ({emin:.2g}-{emax:.2g})/({emin2:.2g}-{emax2:.2g} eV)" else: ax.set_ylabel("Average Transmission") if title is None: title = f"Average Transmission ({emin:.2g}-{emax:.2g} eV)" ax.set_title(title) ax.grid(True, alpha=0.3) return ax # Bar chart for named indices fig, ax = plt.subplots(figsize=figsize) positions = np.arange(len(self.indices)) trans_values = [avg_trans[idx] for idx in self.indices] ax.bar(positions, trans_values, **kwargs) ax.set_xticks(positions) ax.set_xticklabels(self.indices, rotation=45, ha="right") # Set appropriate ylabel and title based on mode if logT: ax.set_ylabel("Thickness [cm]") if title is None: title = f"Thickness Estimate ({emin:.2g}-{emax:.2g} eV)" elif emin2 is not None: ax.set_ylabel("Transmission Ratio") if title is None: title = f"Transmission Ratio ({emin:.2g}-{emax:.2g})/({emin2:.2g}-{emax2:.2g} eV)" else: ax.set_ylabel("Average Transmission") if title is None: title = f"Average Transmission ({emin:.2g}-{emax:.2g} eV)" ax.set_title(title) plt.tight_layout() return ax
[docs] def rebin(self, n=None, tstep=None): """ Rebin the time-of-flight data by combining bins or using a new time step. This method creates a new Data object with rebinned counts data, properly recalculated uncertainties, and updated transmission values. Works for both grouped and non-grouped data. Parameters: ----------- n : int, optional Number of original bins to combine into one new bin. E.g., n=2 combines every 2 bins, n=4 combines every 4 bins. Mutually exclusive with tstep. tstep : float, optional New time step in seconds. Uses linear interpolation if the new bins don't align with the original bins. Mutually exclusive with n. linear_bins : bool, optional If True (default), creates linearly-spaced bins when using tstep parameter. If False, creates logarithmically-spaced bins in energy space, which is more appropriate for cross-section data that varies logarithmically with energy. Note: The cross-section C++ code assumes linear binning in time/energy, so use linear_bins=True for compatibility with the response function integration. Only affects tstep method, ignored for n method. Returns: -------- Data A new Data object with rebinned data. All attributes (signal, openbeam, table, tgrid, etc.) are properly updated. Raises: ------- ValueError If both n and tstep are provided, or if neither is provided. If the Data object doesn't have original counts data (signal/openbeam). If called on non-grouped data created from transmission files. Examples: --------- >>> # Combine every 4 bins >>> data_rebinned = data.rebin(n=4) >>> # Use a new time step (2x the original) >>> data_rebinned = data.rebin(tstep=2 * data.tstep) >>> # Works with grouped data too >>> grouped_data_rebinned = grouped_data.rebin(n=2) Notes: ------ - Counts are summed in each new bin - Uncertainties are combined in quadrature: sqrt(sum(err^2)) - Transmission and its error are recalculated from rebinned counts - Energy grid is recomputed from the new time-of-flight grid - For grouped data, rebinning is applied to all groups - **NaN handling**: Rows with NaN values in energy, transmission, or error columns are automatically removed before rebinning. This is safe because NaN values don't contribute to fits. If all data is NaN, an empty table is returned for that group. """ # Validate input if n is None and tstep is None: raise ValueError( "Must specify either 'n' (number of bins to combine) or 'tstep' (new time step)" ) if n is not None and tstep is not None: raise ValueError( "Cannot specify both 'n' and 'tstep'. Choose one rebinning method." ) # Check that we have original counts data if not self.is_grouped and (self.signal is None or self.openbeam is None): raise ValueError( "Cannot rebin: original counts data (signal/openbeam) not available. " "This Data object was likely created from transmission files." ) # Helper function to rebin a single counts DataFrame def rebin_counts_dataframe( df, n_bins=None, new_tstep=None, old_tstep=None, L=None ): """ Rebin a counts DataFrame (tof, counts, err). Parameters: ----------- df : pd.DataFrame Input DataFrame with 'tof', 'counts', 'err' columns n_bins : int, optional Number of bins to combine (simple binning) new_tstep : float, optional New time step (interpolation method) old_tstep : float, optional Original time step L : float, optional Flight path length for energy conversion Returns: -------- pd.DataFrame Rebinned DataFrame """ if n_bins is not None: # Simple binning: combine every n_bins if n_bins == 1: # No rebinning needed, return copy of original rebinned_df = df.copy() else: n_original = len(df) n_new = n_original // n_bins # Truncate to make evenly divisible df_truncated = df.iloc[: n_new * n_bins].copy() # Reshape and sum tof_reshaped = df_truncated["tof"].values.reshape(n_new, n_bins) counts_reshaped = df_truncated["counts"].values.reshape( n_new, n_bins ) err_reshaped = df_truncated["err"].values.reshape(n_new, n_bins) # New tof is the CENTER of each combined bin # For bins [i, i+1, ..., i+n-1], the center is at (i + i+n-1 + 1) / 2 # This gives the correct energy for the rebinned data new_tof = (tof_reshaped[:, 0] + tof_reshaped[:, -1] + 1) / 2 # Sum counts new_counts = counts_reshaped.sum(axis=1) # Combine errors in quadrature new_err = np.sqrt((err_reshaped**2).sum(axis=1)) rebinned_df = pd.DataFrame( {"tof": new_tof, "counts": new_counts, "err": new_err} ) else: # Interpolation method: new tstep # Check if we're using the same tstep if abs(new_tstep - old_tstep) / old_tstep < 1e-10: # No rebinning needed, return copy of original rebinned_df = df.copy() else: from scipy.interpolate import interp1d # Original tof grid in time units (seconds) old_tof_time = df["tof"].values * old_tstep # Create new tof grid with linear spacing tof_min = old_tof_time.min() tof_max = old_tof_time.max() n_new_bins = int((tof_max - tof_min) / new_tstep) new_tof_time = np.linspace(tof_min, tof_max, n_new_bins) # Interpolate counts and errors # For counts: linear interpolation (represents count rate) counts_interp = interp1d( old_tof_time, df["counts"].values, kind="linear", fill_value=0, bounds_error=False, ) err_interp = interp1d( old_tof_time, df["err"].values, kind="linear", fill_value=0, bounds_error=False, ) new_counts = counts_interp(new_tof_time) new_err = err_interp(new_tof_time) # Scale by the ratio of bin widths to conserve total counts bin_width_ratio = new_tstep / old_tstep new_counts = new_counts * bin_width_ratio new_err = new_err * bin_width_ratio # Convert back to bin indices (bin centers) new_tof = new_tof_time / new_tstep rebinned_df = pd.DataFrame( {"tof": new_tof, "counts": new_counts, "err": new_err} ) # Preserve label attribute if present if hasattr(df, "attrs") and "label" in df.attrs: rebinned_df.attrs["label"] = df.attrs["label"] return rebinned_df # Helper function to calculate transmission from rebinned counts def calculate_transmission( signal_df, openbeam_df, new_tstep_val, L_val, empty_signal_df=None, empty_openbeam_df=None, ): """Calculate transmission and energy from rebinned signal and openbeam.""" # Convert tof to energy energy = utils.time2energy(signal_df["tof"].values * new_tstep_val, L_val) # Calculate transmission transmission = signal_df["counts"] / openbeam_df["counts"] # Calculate transmission error with warnings.catch_warnings(): warnings.simplefilter("ignore", RuntimeWarning) if empty_signal_df is not None and empty_openbeam_df is not None: # Apply background correction transmission *= ( empty_openbeam_df["counts"] / empty_signal_df["counts"] ) trans_err = transmission * np.sqrt( (signal_df["err"] / signal_df["counts"]) ** 2 + (openbeam_df["err"] / openbeam_df["counts"]) ** 2 + (empty_signal_df["err"] / empty_signal_df["counts"]) ** 2 + (empty_openbeam_df["err"] / empty_openbeam_df["counts"]) ** 2 ) else: trans_err = transmission * np.sqrt( (signal_df["err"] / signal_df["counts"]) ** 2 + (openbeam_df["err"] / openbeam_df["counts"]) ** 2 ) table_df = pd.DataFrame( {"energy": energy, "trans": transmission, "err": trans_err} ) # Preserve label if present if hasattr(signal_df, "attrs") and "label" in signal_df.attrs: table_df.attrs["label"] = signal_df.attrs["label"] return table_df # Determine new tstep if tstep is not None: new_tstep = tstep else: # When using n-binning, keep tstep the same since TOF indices # are adjusted to bin centers new_tstep = self.tstep # Create new Data object new_data = Data() new_data.L = self.L new_data.tstep = new_tstep new_data.is_grouped = self.is_grouped # Copy L0 and t0 if they exist if hasattr(self, "L0"): new_data.L0 = self.L0 if hasattr(self, "t0"): new_data.t0 = self.t0 if self.is_grouped: # Rebin all groups new_data.groups = {} new_data.indices = self.indices new_data.group_shape = self.group_shape # For grouped data, we rebin the transmission tables directly # This is done by interpolating onto a new energy grid def rebin_transmission_table( table_df, n_bins=None, new_tstep_val=None, old_tstep_val=None, L_val=None, ): """ Rebin a transmission table (energy, trans, err) for grouped data. Uses interpolation to map transmission values onto a new energy grid. Note: Uncertainties are interpolated, which is approximate. For best accuracy, rebin the original counts data before creating grouped data. """ if n_bins is not None: # Simple binning: combine every n_bins if n_bins == 1: return table_df.copy() # Remove NaN values before binning # NaN values don't contribute to fits, so it's safe to exclude them clean_table = table_df.dropna(subset=["energy", "trans", "err"]) if len(clean_table) == 0: # If all data is NaN, return empty table with same structure return pd.DataFrame({"energy": [], "trans": [], "err": []}) n_original = len(clean_table) n_new = n_original // n_bins if n_new == 0: # Not enough data points to bin, return as is return clean_table.copy() # Truncate to make evenly divisible table_truncated = clean_table.iloc[: n_new * n_bins].copy() # Reshape and average (for transmission, we average not sum) energy_reshaped = table_truncated["energy"].values.reshape( n_new, n_bins ) trans_reshaped = table_truncated["trans"].values.reshape( n_new, n_bins ) err_reshaped = table_truncated["err"].values.reshape(n_new, n_bins) # Use arithmetic mean for energy (centers of rebinned energy bins) # nanmean to handle any remaining NaNs in the reshaped arrays new_energy = np.nanmean(energy_reshaped, axis=1) # Use arithmetic mean for transmission new_trans = np.nanmean(trans_reshaped, axis=1) # Combine errors: err_mean = sqrt(sum(err^2)) / n # Use nansum to skip NaN values in error combination new_err = np.sqrt(np.nansum(err_reshaped**2, axis=1)) / n_bins rebinned_table = pd.DataFrame( {"energy": new_energy, "trans": new_trans, "err": new_err} ) # Remove any remaining NaN rows created by nanmean of all-NaN bins rebinned_table = rebinned_table.dropna( subset=["energy", "trans", "err"] ) else: # Interpolation method: new tstep from scipy.interpolate import interp1d # Remove NaN values before rebinning # NaN values don't contribute to fits, so it's safe to exclude them clean_table = table_df.dropna(subset=["energy", "trans", "err"]) if len(clean_table) == 0: # If all data is NaN, return empty table with same structure return pd.DataFrame({"energy": [], "trans": [], "err": []}) # Current energy grid (without NaNs) old_energy = clean_table["energy"].values # Create new energy grid based on new tstep # Convert energy back to TOF, apply new tstep, convert back old_tof_time = utils.energy2time(old_energy, L_val) tof_min = old_tof_time.min() tof_max = old_tof_time.max() n_new_bins = int((tof_max - tof_min) / new_tstep_val) if n_new_bins <= 0: # Handle edge case where time range is too small n_new_bins = 1 new_tof_time = np.linspace(tof_min, tof_max, n_new_bins) new_energy = utils.time2energy(new_tof_time, L_val) # Interpolate transmission and error # Use bounds_error=False with fill_value=nan to avoid extrapolating into NaN regions trans_interp = interp1d( old_energy, clean_table["trans"].values, kind="linear", fill_value=np.nan, bounds_error=False, ) err_interp = interp1d( old_energy, clean_table["err"].values, kind="linear", fill_value=np.nan, bounds_error=False, ) new_trans = trans_interp(new_energy) new_err = err_interp(new_energy) # Create rebinned table rebinned_table = pd.DataFrame( {"energy": new_energy, "trans": new_trans, "err": new_err} ) # Remove any NaN rows that may have been created during interpolation rebinned_table = rebinned_table.dropna( subset=["energy", "trans", "err"] ) # Preserve label if present if hasattr(table_df, "attrs") and "label" in table_df.attrs: rebinned_table.attrs["label"] = table_df.attrs["label"] return rebinned_table # Rebin each group for idx in self.indices: group_table = self.groups[idx] rebinned_table = rebin_transmission_table( group_table, n_bins=n, new_tstep_val=tstep, old_tstep_val=self.tstep, L_val=self.L, ) new_data.groups[idx] = rebinned_table # Update the main table to be the first group new_data.table = new_data.groups[self.indices[0]] # Also update grouped_trans and grouped_err arrays # Handle case where groups may have different lengths due to NaN removal if len(new_data.groups[self.indices[0]]) > 0: n_groups = len(self.indices) n_energy = len(new_data.groups[self.indices[0]]) new_trans_array = np.zeros((n_groups, n_energy)) new_err_array = np.zeros((n_groups, n_energy)) for i, idx in enumerate(self.indices): group_len = len(new_data.groups[idx]) if group_len > 0: # Handle case where this group might have different length new_trans_array[i, : min(group_len, n_energy)] = ( new_data.groups[idx]["trans"].values[:n_energy] ) new_err_array[i, : min(group_len, n_energy)] = new_data.groups[ idx ]["err"].values[:n_energy] else: # Empty group - fill with NaN new_trans_array[i, :] = np.nan new_err_array[i, :] = np.nan new_data.grouped_trans = new_trans_array new_data.grouped_err = new_err_array else: # All groups are empty - create empty arrays new_data.grouped_trans = np.array([]).reshape(len(self.indices), 0) new_data.grouped_err = np.array([]).reshape(len(self.indices), 0) else: # Rebin non-grouped data rebinned_signal = rebin_counts_dataframe( self.signal, n_bins=n, new_tstep=tstep, old_tstep=self.tstep, L=self.L ) rebinned_openbeam = rebin_counts_dataframe( self.openbeam, n_bins=n, new_tstep=tstep, old_tstep=self.tstep, L=self.L ) # Rebin empty data if it exists (for background correction) rebinned_empty_signal = None rebinned_empty_openbeam = None if hasattr(self, "empty_signal") and self.empty_signal is not None: rebinned_empty_signal = rebin_counts_dataframe( self.empty_signal, n_bins=n, new_tstep=tstep, old_tstep=self.tstep, L=self.L, ) if hasattr(self, "empty_openbeam") and self.empty_openbeam is not None: rebinned_empty_openbeam = rebin_counts_dataframe( self.empty_openbeam, n_bins=n, new_tstep=tstep, old_tstep=self.tstep, L=self.L, ) # Calculate transmission table new_table = calculate_transmission( rebinned_signal, rebinned_openbeam, new_tstep, self.L, rebinned_empty_signal, rebinned_empty_openbeam, ) new_data.signal = rebinned_signal new_data.openbeam = rebinned_openbeam new_data.table = new_table new_data.tgrid = rebinned_signal["tof"] # Store rebinned empty data new_data.empty_signal = rebinned_empty_signal new_data.empty_openbeam = rebinned_empty_openbeam return new_data