#!/usr/bin/env python3
"""Data handling for iMars3D."""
# package imports
from imars3d.backend.dataio.metadata import MetaData
from imars3d.backend.util.functions import clamp_max_workers, to_time_str
# third party imports
import numpy as np
import param
import tifffile
from tqdm.auto import tqdm
from tqdm.contrib.concurrent import process_map
# standard imports
from functools import partial
from fnmatch import fnmatchcase
import itertools
import logging
from pathlib import Path
import re
from typing import Callable, List, Optional, Tuple, Union
# ignore warnings generated by importing dxchange
import warnings
with warnings.catch_warnings():
warnings.simplefilter("ignore")
import dxchange
# Custom types
FlexPath = Union[str, Path]
# setup module level logger
logger = logging.getLogger(__name__)
# METADATA_DICT = {
# 65026: "ManufacturerStr:Andor", # [ct, ob, dc]
# 65027: "ExposureTime:70.000000", # [ct, ob, dc]
# 65068: "MotSlitHR.RBV:10.000000", # APERTURE_HR, [ct, ob]
# 65070: "MotSlitHL.RBV:20.000000", # APERTURE_HL, [ct, ob]
# 65066: "MotSlitVT.RBV:10.000000", # APERTURE_VT, [ct, ob]
# 65068: "MotSlitHR.RBV:10.000000", # APERTURE_VB, [ct, ob]
# }
[docs]class Foldernames(param.Foldername):
r"""
Parameter that can be set to a string specifying the path of a folder, or a list of such strings.
The string(s) should be specified in UNIX style, but they will be
returned in the format of the user's operating system.
The specified path(s) can be absolute, or relative to either:
* any of the paths specified in the search_paths attribute (if search_paths is not None);
or
* any of the paths searched by resolve_dir_path() (if search_paths is None).
"""
def _resolve(self, paths):
if isinstance(paths, (str, Path)):
return super()._resolve(paths)
elif isinstance(paths, (list, tuple)):
return [self._resolve(path) for path in paths]
else:
name = next(x for x in [self.name, self.label, "Foldernames parameter"] if x)
raise ValueError(f"{name} must be a string or a list of strings")
[docs]class load_data(param.ParameterizedFunction):
"""
Load data with given input.
Parameters
----------
ct_files: List[str]
explicit list of radiographs (full path)
ob_files: List[str]
explicit list of open beams (full path)
dc_files: Optional[List[str]]
explicit list of dark current (full path)
ct_dir: str, Path
directory contains radiographs
ob_dir: Union[str, Path, List[str, Path]]
directory, or list of directories, containing open beams
dc_dir: Optional[Union[str, Path, List[str, Path]]]
directory , or list of directories, containing dark current
ct_fnmatch: Optional[str]
Unix shells-style wild card (``*.tiff``) for selecting radiographs
ob_fnmatch: Optional[str]
Unix shells-style wild card (``*.tiff``) for selecting open beams
dc_fnmatch: Optional[str]
Unix shells-style wild card (``*.tiff``) for selecting dark current
max_workers: Optional[int]
maximum number of processes allowed during loading, default to use as many as possible.
tqdm_class: panel.widgets.Tqdm
Class to be used for rendering tqdm progress
Returns
-------
radiograph stacks, obs, dcs and rotation angles as numpy.ndarray
Notes
-----
There are two main signatures to load the data:
1. load_data(ct_files=ctfs, ob_files=obfs, dc_files=dcfs)
2. load_data(ct_dir=ctdir, ob_dir=obdir, dc_dir=dcdir)
The two signatures are mutually exclusive, and dc_files and dc_dir are optional
in both cases as some experiments do not have dark current measurements.
The fnmatch selectors are applicable in both signature, which help to down-select
files if needed. Default is set to "*", which selects everything.
Also, if ob_fnmatch and dc_fnmatch are set to "None" in the second signature call, the
data loader will attempt to read the metadata embedded in the first ct file to find obs
and dcs with similar metadata.
Currently, we are using a forgiving reader to load the image where a corrupted file
will not block reading other data.
"""
#
ct_files = param.List(doc="list of all ct files to load")
ob_files = param.List(doc="list of all ob files to load")
dc_files = param.List(doc="list of all dc files to load")
#
ct_dir = param.Foldername(doc="radiograph directory")
ob_dir = Foldernames(doc="open beam directory")
dc_dir = Foldernames(doc="dark current directory")
# NOTE: we need to provide a default value here as param.String default to "", which will
# not trigger dict.get(key, value) to get the value as "" is not None.
ct_fnmatch = param.String(default="*", doc="fnmatch for selecting ct files from ct_dir")
ob_fnmatch = param.String(default="*", doc="fnmatch for selecting ob files from ob_dir")
dc_fnmatch = param.String(default="*", doc="fnmatch for selecting dc files from dc_dir")
# NOTE: 0 means use as many as possible
max_workers = param.Integer(
default=1, # default to single process
bounds=(0, None),
doc="Maximum number of processes allowed during loading",
)
tqdm_class = param.ClassSelector(class_=object, doc="Progress bar to render with")
def __call__(self, **params):
"""Parse inputs and perform multiple dispatch."""
# type*bounds check via Parameter
_ = self.instance(**params)
# sanitize arguments
params = param.ParamOverrides(self, params)
# type validation is done, now replacing max_worker with an actual integer
self.max_workers = clamp_max_workers(params.max_workers)
logger.debug(f"max_worker={self.max_workers}")
# multiple dispatch
# NOTE:
# use set to simplify call signature checking
sigs = set([k.split("_")[-1] for k in params.keys() if "fnmatch" not in k])
ref = {"files", "dir"}
if sigs.intersection(ref) == {"files", "dir"}:
logger.error("Files and dir cannot be used at the same time")
raise ValueError("Mix usage of allowed signature.")
elif sigs.intersection(ref) == {"files"}:
logger.debug("Load by file list")
ct, ob, dc = _load_by_file_list(
ct_files=params.get("ct_files"),
ob_files=params.get("ob_files"),
dc_files=params.get("dc_files", []), # it is okay to skip dc
ct_fnmatch=params.get("ct_fnmatch", "*"), # incase None got leaked here
ob_fnmatch=params.get("ob_fnmatch", "*"),
dc_fnmatch=params.get("dc_fnmatch", "*"),
max_workers=self.max_workers,
tqdm_class=params.tqdm_class,
)
ct_files = params.get("ct_files")
elif sigs.intersection(ref) == {"dir"}:
logger.debug("Load by directory")
ct_files, ob_files, dc_files = _get_filelist_by_dir(
ct_dir=params.get("ct_dir"),
ob_dir=params.get("ob_dir"),
dc_dir=params.get("dc_dir", []), # it is okay to skip dc
ct_fnmatch=params.get("ct_fnmatch", "*"), # incase None got leaked here
ob_fnmatch=params.get("ob_fnmatch", "*"),
dc_fnmatch=params.get("dc_fnmatch", "*"),
)
ct, ob, dc = _load_by_file_list(
ct_files=ct_files,
ob_files=ob_files,
dc_files=dc_files,
ct_fnmatch=params.get("ct_fnmatch", "*"), # incase None got leaked here
ob_fnmatch=params.get("ob_fnmatch", "*"),
dc_fnmatch=params.get("dc_fnmatch", "*"),
max_workers=self.max_workers,
tqdm_class=params.tqdm_class,
)
else:
logger.warning("No valid signature found, need to specify either files or dir")
raise ValueError("No valid signature found, need to specify either files or dir")
# extracting rotational angles from
# 1. filename
# 2. metadata (only possible for Tiff)
rot_angles = _extract_rotation_angles(ct_files)
# return everything
return ct, ob, dc, rot_angles
# use _func to avoid sphinx pulling it into docs
def _forgiving_reader(
filename: str,
reader: Callable,
) -> Optional[np.ndarray]:
"""
Skip corrupted file, but inform the user about the issue.
Parameters
----------
filename:
input filename
reader:
callable reader function that consumes the filename
Returns
-------
image as numpy array
"""
try:
return reader(filename)
except Exception:
logger.error(f"Cannot read {filename}, skipping.")
return None
# use _func to avoid sphinx pulling it into docs
def _load_images(filelist: List[str], desc: str, max_workers: int, tqdm_class) -> np.ndarray:
"""
Load image data via dxchange.
Parameters
----------
filelist:
List of images filenames/path for loading via dxchange.
desc:
Description for progress bar.
max_workers:
Maximum number of processes allowed during loading.
tqdm_class: panel.widgets.Tqdm
Class to be used for rendering tqdm progress
Returns
-------
Image array stack.
"""
# figure out the file type and select corresponding reader from dxchange
file_ext = Path(filelist[0]).suffix.lower()
if file_ext in (".tif", ".tiff"):
# use tifffile directly for a faster loading
reader = partial(tifffile.imread, out="memmap")
elif file_ext == ".fits":
reader = dxchange.read_fits
else:
logger.error(f"Unsupported file type: {file_ext}")
raise ValueError("Unsupported file type.")
# NOTE: For regular dataset, single thread reading is actually faster
# as the overhead of multiprocessing will overshadow the benefits.
if max_workers == 1:
progress_bar = tqdm if tqdm_class is None else tqdm_class
# single thread reading
rst = [_forgiving_reader(f, reader) for f in progress_bar(filelist, desc=desc)]
else:
# multi-thread reading
# NOTE: the benefits of multi-threading is only visible when
# - the file list is really long
# - there are a lot of cores available
kwargs = {
"max_workers": max_workers,
"desc": desc,
}
rst = process_map(partial(_forgiving_reader, reader=reader), filelist, **kwargs)
# return the results
# NOTE: there is no need to convert to float at this point, and it will save
# a lot of memory and time if the conversion is done after cropping.
return np.array([me for me in rst if me is not None])
# use _func to avoid sphinx pulling it into docs
def _load_by_file_list(
ct_files: List[str],
ob_files: List[str],
dc_files: Optional[List[str]] = [],
ct_fnmatch: Optional[str] = "*",
ob_fnmatch: Optional[str] = "*",
dc_fnmatch: Optional[str] = "*",
max_workers: int = 0,
tqdm_class=None,
) -> Tuple[np.ndarray]:
"""
Use provided list of files to load images into memory.
Parameters
----------
ct_files:
List of ct files.
ob_files:
List of ob files.
dc_files:
List of dc files.
ct_fnmatch:
fnmatch for selecting ct files from ct_dir.
ob_fnmatch:
fnmatch for selecting ob files from ob_dir.
dc_fnmatch:
fnmatch for selecting dc files from dc_dir.
max_workers:
Maximum number of processes allowed during loading, 0 means
use as many as possible.
tqdm_class: panel.widgets.Tqdm
Class to be used for rendering tqdm progress
Returns
-------
ct, ob, dc
"""
# empty list is not allowed
if ct_files == []:
logger.error("ct_files is [].")
raise ValueError("ct_files cannot be empty list.")
if ob_files == []:
logger.error("ob_files is [].")
raise ValueError("ob_files cannot be empty list.")
if dc_files == []:
logger.warning("dc_files is [].")
max_workers = clamp_max_workers(max_workers)
# explicit list is the most straight forward solution
# -- radiograph
ct = _load_images(
filelist=[ctf for ctf in ct_files if fnmatchcase(ctf, ct_fnmatch)],
desc="ct",
max_workers=max_workers,
tqdm_class=tqdm_class,
)
# -- open beam
ob = _load_images(
filelist=[obf for obf in ob_files if fnmatchcase(obf, ob_fnmatch)],
desc="ob",
max_workers=max_workers,
tqdm_class=tqdm_class,
)
# -- dark current
if dc_files == []:
dc = None
else:
dc = _load_images(
filelist=[dcf for dcf in dc_files if fnmatchcase(dcf, dc_fnmatch)],
desc="dc",
max_workers=max_workers,
tqdm_class=tqdm_class,
)
#
return ct, ob, dc
def _get_filelist_by_dir(
ct_dir: FlexPath, # either a string or a pathlib.Path
ob_dir: Union[FlexPath, List[FlexPath]],
dc_dir: Optional[Union[FlexPath, List[FlexPath]]] = None,
ct_fnmatch: Optional[str] = "*",
ob_fnmatch: Optional[str] = "*",
dc_fnmatch: Optional[str] = "*",
) -> Tuple[List[str], List[str], List[str]]:
"""
Generate list of files from given directory and fnmatch for ct, ob, and dc.
Parameters
----------
ct_dir:
Directory for ct files.
ob_dir:
Directory, or list of directories, for ob files.
dc_dir:
Directory, or list of directories, for dc files.
ct_fnmatch:
fnmatch for selecting ct files from ct_dir.
ob_fnmatch:
fnmatch for selecting ob files from ob_dir.
dc_fnmatch:
fnmatch for selecting dc files from dc_dir.
Returns
-------
ct_files, ob_files, dc_files
Notes
-----
If ob_fnmatch is set to None, the data loader will attempt to read the metadata
embedded in the ct file to find obs with similar metadata.
"""
# sanity check
##########
# -- Process input argument ct_dir for radiographs
if not Path(ct_dir).exists():
logger.error(f"ct_dir {ct_dir} does not exist.")
raise ValueError("ct_dir does not exist.")
else:
ct_dir = Path(ct_dir)
##########
# -- Process input argument ob_dir for open beam directories
if isinstance(ob_dir, (str, Path)): # single directory
open_beam_dirs = [Path(ob_dir)] # cast the single directory to a list of input directories
elif isinstance(ob_dir, (list, tuple)): # multiple input directories, assumed items are of FlexPath type
open_beam_dirs = [Path(data_dir) for data_dir in ob_dir]
else:
raise ValueError("ob_dir must be either a string or a list of strings")
# validate for existence
for data_dir in open_beam_dirs:
if not data_dir.exists():
logger.error(f"open beam directory {str(data_dir)} does not exist.")
raise ValueError(f"open beam directory {str(data_dir)} does not exist.")
##########
# -- Process input argument dc_dir for dark current directories
if dc_dir is None:
logger.warning("dc_dir is None, ignoring.")
dark_field_dirs = []
else:
if isinstance(dc_dir, (str, Path)): # single directory
dark_field_dirs = [Path(dc_dir)] # cast the single directory to a list of input directories
elif isinstance(dc_dir, (list, tuple)): # multiple directories, assumed items are of FlexPath type
dark_field_dirs = [Path(data_dir) for data_dir in dc_dir]
else:
raise ValueError("dc_dir must be either a string or a list of strings")
# check for existence
for i, data_dir in enumerate(dark_field_dirs):
if not data_dir.exists():
logger.warning(f"dark field directory {str(data_dir)} does not exist, ignoring.")
dark_field_dirs[i] = None
dark_field_dirs = [data_dir for data_dir in dark_field_dirs if data_dir is not None] # extricate None entries
# gather the ct_files
ct_files = ct_dir.glob(ct_fnmatch)
# try to use the first ct file as a reference
# NOTE: do not use the generator above to avoid off by one error due to
# consuming the generator
try:
ct_ref = next(ct_dir.glob(ct_fnmatch))
except StopIteration:
logger.warning("ct_files is [].")
ct_ref = None
metadata_ref = None if ct_ref is None else MetaData(filename=str(ct_ref), datatype="ct")
ext_ref = None if ct_ref is None else ct_ref.suffix
# gather the ob_files
if ob_fnmatch is None:
if ct_ref is None:
logger.warning("ob_files is [].")
ob_files = []
else:
ob_files = list()
for open_beam_dir in open_beam_dirs:
obfs = open_beam_dir.glob(f"*{ext_ref}")
# remove files that do not match the metadata of ct_ref
ob_files += [f for f in obfs if metadata_ref.match(other_filename=str(f), other_datatype="ob")]
else:
ob_files = list(itertools.chain(*[list(obd.glob(ob_fnmatch)) for obd in open_beam_dirs]))
# gather the dc_files
if dc_dir is None:
dc_files = []
else:
if dc_fnmatch is None:
if ct_ref is None:
logger.warning("dc_files is [].")
dc_files = []
else:
dc_files = list()
for dark_field_dir in dark_field_dirs:
dcfs = dark_field_dir.glob(f"*{ext_ref}")
# remove files that do not match the metadata of ct_ref
dc_files += [f for f in dcfs if metadata_ref.match(other_filename=str(f), other_datatype="dc")]
else:
dc_files = list(itertools.chain(*[list(dcf.glob(dc_fnmatch)) for dcf in dark_field_dirs]))
# since generator returns an unordered list, we need to force it to be sorted
# so that angles can be properly retrieved if needed
ct_files = list(map(str, ct_files))
ct_files.sort()
return ct_files, list(map(str, ob_files)), list(map(str, dc_files))
def _extract_rotation_angles(
filelist: List[str],
metadata_idx: int = 65039,
) -> np.ndarray:
"""
Extract rotation angles in degrees from filename or metadata.
Parameters
----------
filelist:
List of files to extract rotation angles from.
metadata_idx:
Index of metadata to extract rotation angle from, default is 65039.
Returns
-------
rotation_angles
"""
# sanity check
if filelist == []:
logger.error("filelist is [].")
raise ValueError("filelist cannot be empty list.")
# extract rotation angles from file names
# Note
# ----
# For the following file
# 20191030_ironman_small_0070_300_440_0520.tiff
# the rotation angle is 300.44 degrees
# If all given filenames follows the pattern, we will use the angles from
# filenames. Otherwise, we will use the angles from metadata.
regex = r"\d{8}_\S*_\d{4}_(?P<deg>\d{3})_(?P<dec>\d{3})_\d*\.tiff"
matches = [re.match(regex, Path(f).name) for f in filelist]
if all(matches):
logger.info("Using rotation angles from filenames.")
rotation_angles = np.array([float(".".join(m.groups())) for m in matches])
else:
# extract rotation angles from metadata
file_ext = set([Path(f).suffix for f in filelist])
if file_ext != {".tiff"}:
logger.error("Only tiff files are supported.")
raise ValueError("Rotation angle from metadata is only supported for Tiff.")
# -- read metadata
# img = tifffile.TiffFile("test_with_metadata_0.tiff")
# img.pages[0].tags[65039].value
# >> 'RotationActual:0.579840'
rotation_angles = np.array(
[float(tifffile.TiffFile(f).pages[0].tags[metadata_idx].value.split(":")[-1]) for f in filelist],
dtype="float",
)
return rotation_angles
def _save_data(filename: Path, data: np.ndarray, rot_angles: np.ndarray = None) -> None:
if data is None:
raise ValueError("Failed to supply data")
logger.info(f'saving tiffs to "{filename.parent}"')
# make sure the directory exists
if not filename.parent.exists():
filename.parent.mkdir(parents=True)
# save the stack of tiffs
dxchange.write_tiff_stack(data, fname=str(filename))
# save the angles as a numpy object
if rot_angles is not None:
np.save(file=filename.parent / "rot_angles.npy", arr=rot_angles)
[docs]class save_data(param.ParameterizedFunction):
"""
Save data with given input.
The filenames will be
``<outputbase>/<name>_YYYYMMDDhhmm/<name>_####.tiff``
where a canonical ``outputbase`` is ``/HFIR/CG1D/IPTS-23788/shared/processed_data/``.
Parameters
----------
data: Array
array of data to save
outputbase: Path
where to save the output on disk.
``param.Foldername`` will warn if the directory does not already exist.
name: str
Used to name file of output, defaults to ``save_data``
rot_angles: Array
Optional for writing out the array of rotational (omega) angles
Returns
-------
The directory the files were actually saved in
"""
data = param.Array(doc="Data to save", precedence=1)
outputbase = param.Foldername(default="/tmp/", doc="radiograph directory")
name = param.String(default="save_data", doc="name for the radiograph")
rot_angles = param.Array(doc="Collection of omega angles")
def __call__(self, **params):
"""Parse inputs and perform multiple dispatch."""
# type bounds check via Parameter
with param.logging_level("CRITICAL"):
# do not complain about directories that don't exist
_ = self.instance(**params)
# sanitize arguments
params = param.ParamOverrides(self, params)
if params.data is None:
raise ValueError("Did not supply data")
save_dir = Path(params.outputbase) / f"{params.name}_{to_time_str()}"
# save the data as tiffs
_save_data(filename=save_dir / params.name, data=params.data, rot_angles=params.rot_angles)
return save_dir
[docs]class save_checkpoint(param.ParameterizedFunction):
"""
Save current state to checkpoint in a datetime stamped directory name.
The filenames will be
``<outputbase>/<name>_chkpt_YYYYMMDDhhmm/<name>_####.tiff``
where a canonical ``outputbase`` is ``/HFIR/CG1D/IPTS-23788/shared/processed_data/``.
Parameters
----------
data: Array
array of data to save
outputbase: Path
The parent directory of where to save the output on disk.
``param.Foldername`` will warn if the directory does not already exist.
name: str
Used to name file of output, defaults to output_{datetime}
rot_angles: Array
Optional for writing out the array of rotational (omega) angles
Returns
-------
The directory the files were actually saved in
"""
data = param.Array(doc="Data to save", precedence=1)
outputbase = param.Foldername(default="/tmp/", doc="directory checkpoint should exist in")
name = param.String(default="*", doc="name for the checkpoint")
rot_angles = param.Array(doc="Collection of rotational (omega) angles")
def __call__(self, **params):
"""Parse inputs and perform multiple dispatch."""
# type bounds check via Parameter
with param.logging_level("CRITICAL"):
# do not complain about directories that don't exist
_ = self.instance(**params)
# sanitize arguments
params = param.ParamOverrides(self, params)
save_dir = params.outputbase / f"{params.name}_chkpt_{to_time_str()}"
# save the data as tiffs
_save_data(filename=save_dir / params.name, data=params.data, rot_angles=params.rot_angles)
return save_dir