#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Image noise reduction (denoise) module."""
import logging
import param
from imars3d.backend.util.functions import clamp_max_workers
import numpy as np
import tomopy
from multiprocessing.managers import SharedMemoryManager
from tqdm.contrib.concurrent import process_map
from functools import partial
from scipy.signal import convolve2d
from scipy.ndimage import median_filter
from skimage.restoration import denoise_bilateral
logger = logging.getLogger(__name__)
[docs]def measure_noiseness(image: np.ndarray) -> float:
"""Measure the noiseness of the image.
Reference paper, doi:`10.1006/cviu.1996.0060 <https://doi.org/10.1006/cviu.1996.0060>`_
Parameters
----------
image:
The image to measure the noiseness of.
Returns
-------
The noiseness of the image.
"""
# rescale to [0, 255]
image = (image - image.min()) / (image.max() - image.min()) * 255
logger.debug(f"image range: [{image.min()}, {image.max()}]")
# compute factor
height, width = image.shape
factor = np.sqrt(np.pi / 2.0) / (6 * (height - 2) * (width - 2))
logger.debug(f"factor: {factor}")
# set kernel
kernel = np.array(
[
[1, -2, 1],
[-2, 4, -2],
[1, -2, 1],
]
)
# compute
sigma = np.sum(np.sum(np.absolute(convolve2d(image, kernel))))
return factor * sigma
[docs]def measure_sharpness(image: np.ndarray) -> float:
"""
Measure the sharpness of the image, using norm of gradient as approximation.
Parameters
----------
image:
The image to measure the sharpness of.
Returns
-------
The sharpness of the image.
"""
# rescale to [0, 255]
image = (image - image.min()) / (image.max() - image.min()) * 255
logger.debug(f"image range: [{image.min()}, {image.max()}]")
# compute gradient
gx, gy = np.gradient(image)
# use norm of gradient as approximation for sharpness
return np.sqrt(np.square(gx) + np.square(gy)).mean()
[docs]def denoise_by_bilateral(
arrays: np.ndarray, sigma_color: float = 0.02, sigma_spatial: float = 5.0, max_workers: int = 0, tqdm_class=None
) -> np.ndarray:
"""
Denoise the image stack with the bilateral filter.
Parameters
----------
arrays:
The image stack to denoise.
sigma_color:
Standard deviation for grayvalue/color distance (radiometric similarity).
sigma_spatial:
Standard deviation for range distance.
max_workers:
The number of cores to use for parallel processing, default is 0, which means using all available cores.
tqdm_class: panel.widgets.Tqdm
Class to be used for rendering tqdm progress
Returns
-------
The denoised image stack.
"""
# NOTE:
# The bilateral filter based denoise method can be contributed back to tomopy
# upstream.
if arrays.ndim == 2:
return denoise_by_bilateral_2d(arrays)
elif arrays.ndim == 3:
max_workers = clamp_max_workers(max_workers)
with SharedMemoryManager() as smm:
# create the shared memory
shm = smm.SharedMemory(arrays.nbytes)
# create a numpy array point to the shared memory
shm_arrays = np.ndarray(
arrays.shape,
dtype=arrays.dtype,
buffer=shm.buf,
)
# copy the data to the shared memory
np.copyto(shm_arrays, arrays)
# mp
kwargs = {
"max_workers": max_workers,
"desc": "denoise_by_bilateral",
}
if tqdm_class:
kwargs["tqdm_class"] = tqdm_class
rst = process_map(
partial(denoise_by_bilateral_2d, sigma_color=sigma_color, sigma_spatial=sigma_spatial),
[img for img in shm_arrays],
**kwargs,
)
logger.info("denoise completed via bilateral filter")
return np.array(rst)
else:
raise ValueError("Unsupported image dimension: {}".format(arrays.ndim))
[docs]def denoise_by_bilateral_2d(
array_2d: np.ndarray,
sigma_color: float = 0.02,
sigma_spatial: float = 5.0,
) -> np.ndarray:
"""
Denoise an image with the bilateral filter.
Parameters
----------
array_2d:
The image to denoise.
sigma_color:
Standard deviation for grayvalue/color distance (radiometric similarity).
sigma_spatial:
Standard deviation for range distance.
Returns
-------
The denoised image.
"""
# NOTE:
# need to make this a separate function so as to pickle it with ProcessPoolExecutor
array2d_max = array_2d.max()
logger.debug(f"denoise_by_bilateral_2d:array2d_max = {array2d_max}")
_sigma_color = sigma_color / array2d_max
array_2d /= array2d_max
array_2d = denoise_bilateral(array_2d, sigma_color=_sigma_color, sigma_spatial=sigma_spatial)
return array_2d * array2d_max
[docs]class denoise(param.ParameterizedFunction):
"""
Denoise the image stack with the median filter.
Parameters
----------
arrays: np.ndarray
The image stack to denoise.
method: str = 'bilateral'
The denoise method to use.
median_filter_kernel: int = 3
The kernel size of the median filter, only valid for 'median' method.
bilateral_sigma_color: float = 0.02
The sigma of the color/gray space, only valid for 'bilateral' method.
bilateral_sigma_spatial: float = 5.
The sigma of the spatial space, only valid for 'bilateral' method.
max_workers: int = 0
The number of cores to use for parallel processing, default is 0, which means using all available cores.
tqdm_class: panel.widgets.Tqdm
Class to be used for rendering tqdm progress
Returns
-------
The denoised image stack.
"""
arrays = param.Array(doc="The image stack to denoise.", default=None)
method = param.Selector(
default="bilateral",
objects=["median", "bilateral"],
doc="The denoise method to use.",
)
median_filter_kernel = param.Integer(
default=3,
bounds=(3, None),
doc="The kernel size of the median filter, only valid for 'median' method.",
)
bilateral_sigma_color = param.Number(
default=0.02,
bounds=(0.0, None),
doc="The sigma of the color/gray space, only valid for 'bilateral' method.",
)
bilateral_sigma_spatial = param.Number(
default=5.0,
bounds=(0.0, None),
doc="The sigma of the spatial space, only valid for 'bilateral' method.",
)
max_workers = param.Integer(
default=0,
bounds=(0, None),
doc="The number of cores to use for parallel processing, default is 0, which means using all available cores.",
)
tqdm_class = param.ClassSelector(class_=object, doc="Progress bar to render with")
def __call__(self, **params):
"""Call the denoise function."""
logger.info("Executing Filter: Denoise Filter")
# type*bounds check via Parameter
_ = self.instance(**params)
# sanitize arguments
params = param.ParamOverrides(self, params)
# type validation is done, now replacing max_worker with an actual integer
self.max_workers = clamp_max_workers(params.max_workers)
logger.debug(f"max_worker={self.max_workers}")
denoised_array = None
if params.method == "median":
logger.info("Executing Filter: Denoise Filter with median filter")
if bool(params.tqdm_class):
logger.info("Ignoring supplied progress bar indicator")
denoised_array = denoise_by_median(
arrays=params.arrays,
median_filter_kernel=params.median_filter_kernel,
max_workers=self.max_workers,
)
elif params.method == "bilateral":
logger.info("Executing Filter: Denoise Filter with bilateral filter")
denoised_array = denoise_by_bilateral(
arrays=params.arrays,
sigma_color=params.bilateral_sigma_color,
sigma_spatial=params.bilateral_sigma_spatial,
max_workers=self.max_workers,
tqdm_class=params.tqdm_class,
)
else:
# NOTE:
# param.Selector should have already checked this, but in case user
# figure out a way to bypass param, let's double check here.
raise ValueError(f"Unsupported denoise method: {params.method}")
return denoised_array