Source code for imars3d.backend.dataio.data

#!/usr/bin/env python3
"""Data handling for iMars3D."""

# package imports
from imars3d.backend.dataio.metadata import MetaData
from imars3d.backend.util.functions import clamp_max_workers, to_time_str

# third party imports
import numpy as np
import param
import tifffile
from tqdm.auto import tqdm
from tqdm.contrib.concurrent import process_map

# standard imports
from functools import partial
from fnmatch import fnmatchcase
import itertools
import logging
from pathlib import Path
import re
from typing import Callable, List, Optional, Tuple, Union

# ignore warnings generated by importing dxchange
import warnings

with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    import dxchange

# Custom types
FlexPath = Union[str, Path]

# setup module level logger
logger = logging.getLogger(__name__)
# METADATA_DICT = {
#     65026: "ManufacturerStr:Andor",  # [ct, ob, dc]
#     65027: "ExposureTime:70.000000",  # [ct, ob, dc]
#     65068: "MotSlitHR.RBV:10.000000",  # APERTURE_HR, [ct, ob]
#     65070: "MotSlitHL.RBV:20.000000",  # APERTURE_HL, [ct, ob]
#     65066: "MotSlitVT.RBV:10.000000",  # APERTURE_VT, [ct, ob]
#     65068: "MotSlitHR.RBV:10.000000",  # APERTURE_VB, [ct, ob]
# }


[docs] class Foldernames(param.Foldername): r""" Parameter that can be set to a string specifying the path of a folder, or a list of such strings. The string(s) should be specified in UNIX style, but they will be returned in the format of the user's operating system. The specified path(s) can be absolute, or relative to either: * any of the paths specified in the search_paths attribute (if search_paths is not None); or * any of the paths searched by resolve_dir_path() (if search_paths is None). """ def _resolve(self, paths): if isinstance(paths, (str, Path)): return super()._resolve(paths) elif isinstance(paths, (list, tuple)): return [self._resolve(path) for path in paths] else: name = next(x for x in [self.name, self.label, "Foldernames parameter"] if x) raise ValueError(f"{name} must be a string or a list of strings")
[docs] class load_data(param.ParameterizedFunction): """ Load data with given input. Parameters ---------- ct_files: List[str] explicit list of radiographs (full path) ob_files: List[str] explicit list of open beams (full path) dc_files: Optional[List[str]] explicit list of dark current (full path) ct_dir: str, Path directory contains radiographs ob_dir: Union[str, Path, List[str, Path]] directory, or list of directories, containing open beams dc_dir: Optional[Union[str, Path, List[str, Path]]] directory , or list of directories, containing dark current ct_fnmatch: Optional[str] Unix shells-style wild card (``*.tiff``) for selecting radiographs ob_fnmatch: Optional[str] Unix shells-style wild card (``*.tiff``) for selecting open beams dc_fnmatch: Optional[str] Unix shells-style wild card (``*.tiff``) for selecting dark current max_workers: Optional[int] maximum number of processes allowed during loading, default to use as many as possible. tqdm_class: panel.widgets.Tqdm Class to be used for rendering tqdm progress Returns ------- radiograph stacks, obs, dcs and rotation angles as numpy.ndarray Notes ----- There are three main signatures to load the data: 1. load_data(ct_files=ctfs, ob_files=obfs, dc_files=dcfs) 2. load_data(ct_dir=ctdir, ob_dir=obdir, dc_dir=dcdir) 3. load_data(ct_dir=ctdir, ob_files=obfs, dc_files=dcfs) In all signatures dc_files and dc_dir are optional The fnmatch selectors are applicable in all signature, which help to down-select files if needed. Default is set to "*", which selects everything. Also, if ob_fnmatch and dc_fnmatch are set to "None" in the second signature call, the data loader will attempt to read the metadata embedded in the first ct file to find obs and dcs with similar metadata. Currently, we are using a forgiving reader to load the image where a corrupted file will not block reading other data. """ # ct_files = param.List(doc="list of all ct files to load") ob_files = param.List(doc="list of all ob files to load") dc_files = param.List(doc="list of all dc files to load") # ct_dir = param.Foldername(doc="radiograph directory") ob_dir = Foldernames(doc="open beam directory") dc_dir = Foldernames(doc="dark current directory") # NOTE: we need to provide a default value here as param.String default to "", which will # not trigger dict.get(key, value) to get the value as "" is not None. ct_fnmatch = param.String(default="*", doc="fnmatch for selecting ct files from ct_dir") ob_fnmatch = param.String(default="*", doc="fnmatch for selecting ob files from ob_dir") dc_fnmatch = param.String(default="*", doc="fnmatch for selecting dc files from dc_dir") # NOTE: 0 means use as many as possible max_workers = param.Integer( default=1, # default to single process bounds=(0, None), doc="Maximum number of processes allowed during loading", ) tqdm_class = param.ClassSelector(class_=object, doc="Progress bar to render with") def __call__(self, **params): """Parse inputs and perform multiple dispatch.""" # type*bounds check via Parameter _ = self.instance(**params) # sanitize arguments params = param.ParamOverrides(self, params) # type validation is done, now replacing max_worker with an actual integer self.max_workers = clamp_max_workers(params.max_workers) logger.debug(f"max_worker={self.max_workers}") # multiple dispatch # NOTE: # use set to simplify call signature checking sigs = set([k.split("_")[-1] for k in params.keys() if "fnmatch" not in k]) ref = {"files", "dir"} if ("ct_dir" in params.keys()) and ("ob_files" in params.keys()): logger.debug("Load ct by directory, ob and dc (if any) by files") ct_dir = params.get("ct_dir") if not Path(ct_dir).exists(): logger.error(f"ct_dir {ct_dir} does not exist.") raise ValueError("ct_dir does not exist.") else: ct_dir = Path(ct_dir) # gather the ct_files ct_fnmatch = params.get("ct_fnmatch", "*") ct_files = ct_dir.glob(ct_fnmatch) ct_files = list(map(str, ct_files)) ct_files.sort() ob_files = (params.get("ob_files"),) dc_files = (params.get("dc_files", []),) # it is okay to skip dc ob_files = ob_files[0] dc_files = dc_files[0] ct, ob, dc = _load_by_file_list( ct_files=ct_files, ob_files=ob_files, dc_files=dc_files, # it is okay to skip dc ct_fnmatch=params.get("ct_fnmatch", "*"), # incase None got leaked here ob_fnmatch=params.get("ob_fnmatch", "*"), dc_fnmatch=params.get("dc_fnmatch", "*"), max_workers=self.max_workers, tqdm_class=params.tqdm_class, ) elif ("ct_files" in params.keys()) and ("ob_dir" in params.keys()): logger.error("ct_files and ob_dir mixed not allowed!") raise ValueError("Mix signatures (ct_files, ob_dir) not allowed!") elif sigs.intersection(ref) == {"files"}: logger.debug("Load by file list") ct, ob, dc = _load_by_file_list( ct_files=params.get("ct_files"), ob_files=params.get("ob_files"), dc_files=params.get("dc_files", []), # it is okay to skip dc ct_fnmatch=params.get("ct_fnmatch", "*"), # incase None got leaked here ob_fnmatch=params.get("ob_fnmatch", "*"), dc_fnmatch=params.get("dc_fnmatch", "*"), max_workers=self.max_workers, tqdm_class=params.tqdm_class, ) ct_files = params.get("ct_files") elif sigs.intersection(ref) == {"dir"}: logger.debug("Load by directory") ct_files, ob_files, dc_files = _get_filelist_by_dir( ct_dir=params.get("ct_dir"), ob_dir=params.get("ob_dir"), dc_dir=params.get("dc_dir", []), # it is okay to skip dc ct_fnmatch=params.get("ct_fnmatch", "*"), # incase None got leaked here ob_fnmatch=params.get("ob_fnmatch", "*"), dc_fnmatch=params.get("dc_fnmatch", "*"), ) ct, ob, dc = _load_by_file_list( ct_files=ct_files, ob_files=ob_files, dc_files=dc_files, ct_fnmatch=params.get("ct_fnmatch", "*"), # incase None got leaked here ob_fnmatch=params.get("ob_fnmatch", "*"), dc_fnmatch=params.get("dc_fnmatch", "*"), max_workers=self.max_workers, tqdm_class=params.tqdm_class, ) else: logger.warning("No valid signature found, need to specify either files or dir") raise ValueError("No valid signature found, need to specify either files or dir") # extracting rotational angles from # 1. filename # 2. metadata (only possible for Tiff) rot_angles = _extract_rotation_angles(ct_files) # return everything return ct, ob, dc, rot_angles
# use _func to avoid sphinx pulling it into docs def _forgiving_reader( filename: str, reader: Callable, ) -> Optional[np.ndarray]: """ Skip corrupted file, but inform the user about the issue. Parameters ---------- filename: input filename reader: callable reader function that consumes the filename Returns ------- image as numpy array """ try: return reader(filename) except Exception as e: logger.error(f"While reading {filename}, the following error occurred: {e}") return None # use _func to avoid sphinx pulling it into docs def _load_images(filelist: List[str], desc: str, max_workers: int, tqdm_class) -> np.ndarray: """ Load image data via dxchange. Parameters ---------- filelist: List of images filenames/path for loading via dxchange. desc: Description for progress bar. max_workers: Maximum number of processes allowed during loading. tqdm_class: panel.widgets.Tqdm Class to be used for rendering tqdm progress Returns ------- Image array stack. """ # figure out the file type and select corresponding reader from dxchange file_ext = Path(filelist[0]).suffix.lower() if file_ext in (".tif", ".tiff"): # use tifffile directly for a faster loading reader = partial(tifffile.imread, out="memmap") elif file_ext == ".fits": reader = dxchange.read_fits else: logger.error(f"Unsupported file type: {file_ext}") raise ValueError("Unsupported file type.") # NOTE: For regular dataset, single thread reading is actually faster # as the overhead of multiprocessing will overshadow the benefits. if max_workers == 1: progress_bar = tqdm if tqdm_class is None else tqdm_class # single thread reading rst = [_forgiving_reader(f, reader) for f in progress_bar(filelist, desc=desc)] else: # multi-thread reading # NOTE: the benefits of multi-threading is only visible when # - the file list is really long # - there are a lot of cores available kwargs = { "max_workers": max_workers, "desc": desc, } rst = process_map(partial(_forgiving_reader, reader=reader), filelist, **kwargs) # return the results # NOTE: there is no need to convert to float at this point, and it will save # a lot of memory and time if the conversion is done after cropping. return np.array([me for me in rst if me is not None]) # use _func to avoid sphinx pulling it into docs def _load_by_file_list( ct_files: List[str], ob_files: List[str], dc_files: Optional[List[str]] = [], ct_fnmatch: Optional[str] = "*", ob_fnmatch: Optional[str] = "*", dc_fnmatch: Optional[str] = "*", max_workers: int = 0, tqdm_class=None, ) -> Tuple[np.ndarray]: """ Use provided list of files to load images into memory. Parameters ---------- ct_files: List of ct files. ob_files: List of ob files. dc_files: List of dc files. ct_fnmatch: fnmatch for selecting ct files from ct_dir. ob_fnmatch: fnmatch for selecting ob files from ob_dir. dc_fnmatch: fnmatch for selecting dc files from dc_dir. max_workers: Maximum number of processes allowed during loading, 0 means use as many as possible. tqdm_class: panel.widgets.Tqdm Class to be used for rendering tqdm progress Returns ------- ct, ob, dc """ # empty list is not allowed if ct_files == []: logger.error("ct_files is [].") raise ValueError("ct_files cannot be empty list.") if ob_files == []: logger.error("ob_files is [].") raise ValueError("ob_files cannot be empty list.") if dc_files == []: logger.warning("dc_files is [].") max_workers = clamp_max_workers(max_workers) # explicit list is the most straight forward solution # -- radiograph ct = _load_images( filelist=[ctf for ctf in ct_files if fnmatchcase(ctf, ct_fnmatch)], desc="ct", max_workers=max_workers, tqdm_class=tqdm_class, ) # -- open beam ob = _load_images( filelist=[obf for obf in ob_files if fnmatchcase(obf, ob_fnmatch)], desc="ob", max_workers=max_workers, tqdm_class=tqdm_class, ) # -- dark current if dc_files == []: dc = None else: dc = _load_images( filelist=[dcf for dcf in dc_files if fnmatchcase(dcf, dc_fnmatch)], desc="dc", max_workers=max_workers, tqdm_class=tqdm_class, ) # return ct, ob, dc def _get_filelist_by_dir( ct_dir: FlexPath, # either a string or a pathlib.Path ob_dir: Union[FlexPath, List[FlexPath]], dc_dir: Optional[Union[FlexPath, List[FlexPath]]] = None, ct_fnmatch: Optional[str] = "*", ob_fnmatch: Optional[str] = "*", dc_fnmatch: Optional[str] = "*", ) -> Tuple[List[str], List[str], List[str]]: """ Generate list of files from given directory and fnmatch for ct, ob, and dc. Parameters ---------- ct_dir: Directory for ct files. ob_dir: Directory, or list of directories, for ob files. dc_dir: Directory, or list of directories, for dc files. ct_fnmatch: fnmatch for selecting ct files from ct_dir. ob_fnmatch: fnmatch for selecting ob files from ob_dir. dc_fnmatch: fnmatch for selecting dc files from dc_dir. Returns ------- ct_files, ob_files, dc_files Notes ----- If ob_fnmatch is set to None, the data loader will attempt to read the metadata embedded in the ct file to find obs with similar metadata. """ # sanity check ########## # -- Process input argument ct_dir for radiographs if not Path(ct_dir).exists(): logger.error(f"ct_dir {ct_dir} does not exist.") raise ValueError("ct_dir does not exist.") else: ct_dir = Path(ct_dir) ########## # -- Process input argument ob_dir for open beam directories if isinstance(ob_dir, (str, Path)): # single directory open_beam_dirs = [Path(ob_dir)] # cast the single directory to a list of input directories elif isinstance(ob_dir, (list, tuple)): # multiple input directories, assumed items are of FlexPath type open_beam_dirs = [Path(data_dir) for data_dir in ob_dir] else: raise ValueError("ob_dir must be either a string or a list of strings") # validate for existence for data_dir in open_beam_dirs: if not data_dir.exists(): logger.error(f"open beam directory {str(data_dir)} does not exist.") raise ValueError(f"open beam directory {str(data_dir)} does not exist.") ########## # -- Process input argument dc_dir for dark current directories if dc_dir is None: logger.warning("dc_dir is None, ignoring.") dark_field_dirs = [] else: if isinstance(dc_dir, (str, Path)): # single directory dark_field_dirs = [Path(dc_dir)] # cast the single directory to a list of input directories elif isinstance(dc_dir, (list, tuple)): # multiple directories, assumed items are of FlexPath type dark_field_dirs = [Path(data_dir) for data_dir in dc_dir] else: raise ValueError("dc_dir must be either a string or a list of strings") # check for existence for i, data_dir in enumerate(dark_field_dirs): if not data_dir.exists(): logger.warning(f"dark field directory {str(data_dir)} does not exist, ignoring.") dark_field_dirs[i] = None dark_field_dirs = [data_dir for data_dir in dark_field_dirs if data_dir is not None] # extricate None entries # gather the ct_files ct_files = ct_dir.glob(ct_fnmatch) # try to use the first ct file as a reference # NOTE: do not use the generator above to avoid off by one error due to # consuming the generator try: ct_ref = next(ct_dir.glob(ct_fnmatch)) except StopIteration: logger.warning("ct_files is [].") ct_ref = None metadata_ref = None if ct_ref is None else MetaData(filename=str(ct_ref), datatype="ct") ext_ref = None if ct_ref is None else ct_ref.suffix # gather the ob_files if ob_fnmatch is None: if ct_ref is None: logger.warning("ob_files is [].") ob_files = [] else: ob_files = list() for open_beam_dir in open_beam_dirs: obfs = open_beam_dir.glob(f"*{ext_ref}") # remove files that do not match the metadata of ct_ref ob_files += [f for f in obfs if metadata_ref.match(other_filename=str(f), other_datatype="ob")] else: ob_files = list(itertools.chain(*[list(obd.glob(ob_fnmatch)) for obd in open_beam_dirs])) # gather the dc_files if dc_dir is None: dc_files = [] else: if dc_fnmatch is None: if ct_ref is None: logger.warning("dc_files is [].") dc_files = [] else: dc_files = list() for dark_field_dir in dark_field_dirs: dcfs = dark_field_dir.glob(f"*{ext_ref}") # remove files that do not match the metadata of ct_ref dc_files += [f for f in dcfs if metadata_ref.match(other_filename=str(f), other_datatype="dc")] else: dc_files = list(itertools.chain(*[list(dcf.glob(dc_fnmatch)) for dcf in dark_field_dirs])) # since generator returns an unordered list, we need to force it to be sorted # so that angles can be properly retrieved if needed ct_files = list(map(str, ct_files)) ct_files.sort() return ct_files, list(map(str, ob_files)), list(map(str, dc_files)) def _extract_rotation_angles( filelist: List[str], metadata_idx: int = 65039, ) -> np.ndarray: """ Extract rotation angles in degrees from filename or metadata. Parameters ---------- filelist: List of files to extract rotation angles from. metadata_idx: Index of metadata to extract rotation angle from, default is 65039. Returns ------- rotation_angles """ # sanity check if filelist == []: logger.error("filelist is [].") raise ValueError("filelist cannot be empty list.") # extract rotation angles from file names # Note # ---- # For the following file # 20191030_ironman_small_0070_300_440_0520.tiff # the rotation angle is 300.44 degrees # If all given filenames follows the pattern, we will use the angles from # filenames. Otherwise, we will use the angles from metadata. regex = r"\d{8}_\S*_\d{4}_(?P<deg>\d{3})_(?P<dec>\d{3})_\d*\.tiff" matches = [re.match(regex, Path(f).name) for f in filelist] if all(matches): logger.info("Using rotation angles from filenames.") rotation_angles = np.array([float(".".join(m.groups())) for m in matches]) else: # extract rotation angles from metadata file_ext = set([Path(f).suffix for f in filelist]) if file_ext != {".tiff"}: logger.error("Only tiff files are supported.") raise ValueError("Rotation angle from metadata is only supported for Tiff.") # -- read metadata # img = tifffile.TiffFile("test_with_metadata_0.tiff") # img.pages[0].tags[65039].value # >> 'RotationActual:0.579840' rotation_angles = np.array( [float(tifffile.TiffFile(f).pages[0].tags[metadata_idx].value.split(":")[-1]) for f in filelist], dtype="float", ) return rotation_angles def _save_data(filename: Path, data: np.ndarray, rot_angles: np.ndarray = None) -> None: if data is None: raise ValueError("Failed to supply data") logger.info(f'saving tiffs to "{filename.parent}"') # make sure the directory exists if not filename.parent.exists(): filename.parent.mkdir(parents=True) # save the stack of tiffs dxchange.write_tiff_stack(data, fname=str(filename)) # save the angles as a numpy object if rot_angles is not None: np.save(file=filename.parent / "rot_angles.npy", arr=rot_angles)
[docs] class save_data(param.ParameterizedFunction): """ Save data with given input. The filenames will be ``<outputbase>/<name>_YYYYMMDDhhmm/<name>_####.tiff`` where a canonical ``outputbase`` is ``/HFIR/CG1D/IPTS-23788/shared/processed_data/``. Parameters ---------- data: Array array of data to save outputbase: Path where to save the output on disk. ``param.Foldername`` will warn if the directory does not already exist. name: str Used to name file of output, defaults to ``save_data`` rot_angles: Array Optional for writing out the array of rotational (omega) angles Returns ------- The directory the files were actually saved in """ data = param.Array(doc="Data to save", precedence=1) outputbase = param.Foldername(default="/tmp/", doc="radiograph directory") name = param.String(default="save_data", doc="name for the radiograph") rot_angles = param.Array(doc="Collection of omega angles") def __call__(self, **params): """Parse inputs and perform multiple dispatch.""" # type bounds check via Parameter with param.logging_level("CRITICAL"): # do not complain about directories that don't exist _ = self.instance(**params) # sanitize arguments params = param.ParamOverrides(self, params) if params.data is None: raise ValueError("Did not supply data") save_dir = Path(params.outputbase) / f"{params.name}_{to_time_str()}" # save the data as tiffs _save_data(filename=save_dir / params.name, data=params.data, rot_angles=params.rot_angles) return save_dir
[docs] class save_checkpoint(param.ParameterizedFunction): """ Save current state to checkpoint in a datetime stamped directory name. The filenames will be ``<outputbase>/<name>_chkpt_YYYYMMDDhhmm/<name>_####.tiff`` where a canonical ``outputbase`` is ``/HFIR/CG1D/IPTS-23788/shared/processed_data/``. Parameters ---------- data: Array array of data to save outputbase: Path The parent directory of where to save the output on disk. ``param.Foldername`` will warn if the directory does not already exist. name: str Used to name file of output, defaults to output_{datetime} rot_angles: Array Optional for writing out the array of rotational (omega) angles Returns ------- The directory the files were actually saved in """ data = param.Array(doc="Data to save", precedence=1) outputbase = param.Foldername(default="/tmp/", doc="directory checkpoint should exist in") name = param.String(default="*", doc="name for the checkpoint") rot_angles = param.Array(doc="Collection of rotational (omega) angles") def __call__(self, **params): """Parse inputs and perform multiple dispatch.""" # type bounds check via Parameter with param.logging_level("CRITICAL"): # do not complain about directories that don't exist _ = self.instance(**params) # sanitize arguments params = param.ParamOverrides(self, params) save_dir = params.outputbase / f"{params.name}_chkpt_{to_time_str()}" # save the data as tiffs _save_data(filename=save_dir / params.name, data=params.data, rot_angles=params.rot_angles) return save_dir