diff --git a/AegeanTools/BANE.py b/AegeanTools/BANE.py index e81f1a4e..fc1db625 100755 --- a/AegeanTools/BANE.py +++ b/AegeanTools/BANE.py @@ -5,6 +5,8 @@ The function filter_image should be imported from elsewhere and run as is. """ +from typing import Tuple, Union, TypeAlias, Dict, Any, Optional + import copy import logging import multiprocessing @@ -17,18 +19,31 @@ import numpy as np from astropy.io import fits from scipy.interpolate import RegularGridInterpolator +from scipy.stats import normaltest -from .fits_tools import compress +from .fits_tools import compress +from .sigma import ( + FitGaussianCDF, FittedSigmaClip, SigmaClip, FitBkgRmsEstimate, FitGaussianCDF +) __author__ = 'Paul Hancock' __version__ = 'v1.10.0' __date__ = '2022-08-17' +BANE_MODE_MAPPINGS = dict( + sigmaclip=SigmaClip, + fitrmsbkgestimate=FitBkgRmsEstimate, + fittedsigmaclip=FittedSigmaClip, + fitgaussiancdf=FitGaussianCDF +) +AVAILABLE_MODES = tuple(BANE_MODE_MAPPINGS.keys()) +AVAILABLE_TYPES = tuple(BANE_MODE_MAPPINGS.values()) +ClippingModes: TypeAlias = Union[SigmaClip, FitBkgRmsEstimate, FittedSigmaClip, FitGaussianCDF] + # global barrier for multiprocessing barrier = None memory_id = None - def init(b, mem): """ Set the global barrier and memory_id @@ -38,66 +53,6 @@ def init(b, mem): memory_id = mem -def sigmaclip(arr, lo, hi, reps=10): - """ - Perform sigma clipping on an array, ignoring non finite values. - - During each iteration return an array whose elements c obey: - mean -std*lo < c < mean + std*hi - - where mean/std are the mean std of the input array. - - Parameters - ---------- - arr : iterable - An iterable array of numeric types. - lo : float - The negative clipping level. - hi : float - The positive clipping level. - reps : int - The number of iterations to perform. Default = 3. - - Returns - ------- - mean : float - The mean of the array, possibly nan - std : float - The std of the array, possibly nan - - Notes - ----- - Scipy v0.16 now contains a comparable method that will ignore nan/inf - values. - """ - clipped = np.array(arr)[np.isfinite(arr)] - - if len(clipped) < 1: - return np.nan, np.nan - - std = np.std(clipped) - mean = np.mean(clipped) - prev_valid = len(clipped) - for count in range(int(reps)): - mask = (clipped > mean-std*lo) & (clipped < mean+std*hi) - clipped = clipped[mask] - - curr_valid = len(clipped) - if curr_valid < 1: - break - # No change in statistics if no change is noted - if prev_valid == curr_valid: - break - std = np.std(clipped) - mean = np.mean(clipped) - prev_valid = curr_valid - else: - logging.debug( - "No stopping criteria was reached after {0} cycles".format(count)) - - return mean, std - - def _sf2(args): """ A shallow wrapper for sigma_filter. @@ -121,8 +76,82 @@ def _sf2(args): raise Exception("".join(traceback.format_exception(*sys.exc_info()))) +def box(r, c, data_shape, box_size): + """ + calculate the boundaries of the box centered at r,c + with size = box_size + """ + r_min = max(0, r - box_size[0] // 2) + r_max = min(data_shape[0] - 1, r + box_size[0] // 2) + c_min = max(0, c - box_size[1] // 2) + c_max = min(data_shape[1] - 1, c + box_size[1] // 2) + + return r_min, r_max, c_min, c_max + + +def adaptive_box_estimate( + data: np.ndarray, row: int, column: int, box_size, mode: ClippingModes, max_loop: int = 5 +) -> Tuple[float,float]: + """Estimate the background and RMS level within a data extract. Multiple + clipping and noise estimation modes are supported. + + The adaptive behavour is only activated should the pixel statistics become + compromised. At the moment, this only is considered true if fewer than 80percent + of the pixels remain after clipping. + + Parameters + ---------- + data : np.ndarray + Data that will be the source of the box extraction + row : int + Row to center extracted region at + column : int + Column to center extracted region at + box_size : Tuple[int,int] + Base size of the box to extract + mode : ClippingModes + Instance of a clipping and noise estiamtion mode to use. Should return a BANEResult and ahve a .perform method. + max_loop : int, optional + Maximum number of adjustable boxes to use. Will increase in steps of 100 pixels. If 0 adative mode is disabled, by default 5 + + Returns + ------- + Tuple[float,float] + Backgrounf and noise estimation + """ + + + original_box = box_size + loop = 0 + while True: + test_box_size = tuple([b+loop*100 for b in original_box]) + r_min, r_max, c_min, c_max = box( + row, column, data_shape=data.shape, box_size=test_box_size + ) + new = data[r_min:r_max, c_min:c_max] + new = new.flatten() + + result = mode.perform(data=new) + + if loop >= max_loop: + break + if all(np.isfinite(result)) and result.valid_pixels > 0.9 * len(new): + break + + loop += 1 + print(f"increasing, {loop=} {row=} {column=}") + + + rms = result.rms + bkg = result.bkg + + print(f"{row} {column}") + + return float(bkg), float(rms) + + def sigma_filter(filename, region, step_size, box_size, shape, domask, - cube_index): + cube_index, mode, adaptive_loop=0): """ Calculate the background and rms for a sub region of an image. The results are written to shared memory - irms and ibkg. @@ -151,6 +180,13 @@ def sigma_filter(filename, region, step_size, box_size, shape, domask, cube_index : int The index into the 3rd dimension (if present) + mode : ClippingModes + Which of the clipping modes to use. + + adaptive_loop : int + The maximum number of resizes allowed should a box be resized. If 0 this is turned off. + Default = 0 + Returns ------- None @@ -201,17 +237,7 @@ def sigma_filter(filename, region, step_size, box_size, shape, domask, logging.debug('data size is {0}'.format(data.shape)) logging.debug('data format is {0}'.format(data.dtype)) - def box(r, c): - """ - calculate the boundaries of the box centered at r,c - with size = box_size - """ - r_min = max(0, r - box_size[0] // 2) - r_max = min(data.shape[0] - 1, r + box_size[0] // 2) - c_min = max(0, c - box_size[1] // 2) - c_max = min(data.shape[1] - 1, c + box_size[1] // 2) - return r_min, r_max, c_min, c_max - + # set up a grid of rows/cols at which we will compute the bkg/rms rows = list(range(ymin-data_row_min, ymax-data_row_min, step_size[0])) rows.append(ymax-data_row_min) @@ -220,14 +246,16 @@ def box(r, c): # store the computed bkg/rms in this smaller array vals = np.zeros(shape=(len(rows), len(cols))) - + for i, row in enumerate(rows): for j, col in enumerate(cols): - r_min, r_max, c_min, c_max = box(row, col) - new = data[r_min:r_max, c_min:c_max] - new = np.ravel(new) - bkg, _ = sigmaclip(new, 3, 3) - vals[i, j] = bkg + + bkg, rms = adaptive_box_estimate( + data=data, row=row, column=col, box_size=box_size, mode=mode, max_loop=adaptive_loop + ) + + if np.isfinite(bkg): + vals[i, j] = bkg # indices of all the pixels within our region gr, gc = np.mgrid[ymin-data_row_min:ymax-data_row_min, 0:shape[1]] @@ -260,11 +288,14 @@ def box(r, c): for i, row in enumerate(rows): for j, col in enumerate(cols): - r_min, r_max, c_min, c_max = box(row, col) - new = data[r_min:r_max, c_min:c_max] - new = np.ravel(new) - _, rms = sigmaclip(new, 3, 3) - vals[i, j] = rms + rms = np.nan + + bkg, rms = adaptive_box_estimate( + data=data, row=row, column=col, box_size=box_size, mode=mode, max_loop=adaptive_loop + ) + + if np.isfinite(rms): + vals[i, j] = rms logging.debug("Interpolating rms to sharemem") ifunc = RegularGridInterpolator((rows, cols), vals) @@ -293,7 +324,8 @@ def box(r, c): def filter_mc_sharemem(filename, step_size, box_size, cores, shape, nslice=None, domask=True, - cube_index=0): + cube_index=0, mode: str='sigmaclip', mode_kwargs: Optional[Dict[str, Any]]=None, + adaptive_box: bool=False): """ Calculate the background and noise images corresponding to the input file. The calculation is done via a box-car approach and uses multiple cores and @@ -326,11 +358,21 @@ def filter_mc_sharemem(filename, step_size, box_size, cores, shape, cube_index : int For 3d data use this index into the third dimension. Default = 0 + mode : str + Which background and rms estimation mode to use. + Default = 'sigmaclip', which is the original BANE mode. + + adaptive_box : bool + Resize the box-car should a position be deemed to have unreliable + statistics. Default = False. + + Returns ------- bkg, rms : numpy.ndarray The interpolated background and noise images. """ + mode_kwargs = mode_kwargs if mode_kwargs else {} if cores is None: cores = multiprocessing.cpu_count() @@ -356,16 +398,26 @@ def filter_mc_sharemem(filename, step_size, box_size, cores, shape, logging.debug("ymins {0}".format(ymins)) logging.debug("ymaxs {0}".format(ymaxs)) + + + if mode.lower() not in AVAILABLE_MODES: + raise ValueError(f"Mode not recognised. Received {mode=}, available modes {AVAILABLE_MODES}") + + logging.info(f"Using bane {mode=}") + mode: ClippingModes = BANE_MODE_MAPPINGS[mode.lower()](**mode_kwargs) + + adaptive_loop = 5 if adaptive_box else 0 + logging.info(f"Adaptive box resize loops: {adaptive_loop}") args = [] for region in zip(ymins, ymaxs): args.append((filename, region, step_size, box_size, - shape, domask, cube_index)) + shape, domask, cube_index, mode, adaptive_loop)) exit = False try: global memory_id - memory_id = str(uuid.uuid4()) + memory_id = str(uuid.uuid4())[:15] nbytes = np.prod(shape) * np.float64(1).nbytes ibkg = SharedMemory(name=f'ibkg_{memory_id}', create=True, size=nbytes) irms = SharedMemory(name=f'irms_{memory_id}', create=True, size=nbytes) @@ -387,13 +439,18 @@ def filter_mc_sharemem(filename, step_size, box_size, cores, shape, logging.error("Caught keyboard interrupt") pool.close() exit = True + except Exception as e: + logging.error(e) else: pool.close() pool.join() bkg = np.ndarray(shape, buffer=ibkg.buf, dtype=np.float64).astype(np.float32) rms = np.ndarray(shape, buffer=irms.buf, - dtype=np.float64).astype(np.float32) + dtype=np.float64).astype(np.float32) + + except Exception as e: + print(e) finally: ibkg.close() ibkg.unlink() @@ -407,7 +464,7 @@ def filter_mc_sharemem(filename, step_size, box_size, cores, shape, def filter_image(im_name, out_base, step_size=None, box_size=None, twopass=False, # Deprecated cores=None, mask=True, compressed=False, nslice=None, - cube_index=None): + cube_index=None, mode='sigmaclip', adaptive_box: bool=False): """ Create a background and noise image from an input image. Resulting images are written to `outbase_bkg.fits` and `outbase_rms.fits` @@ -450,6 +507,14 @@ def filter_image(im_name, out_base, step_size=None, box_size=None, If the input data is 3d, then use this index for the 3rd dimension. Default = None, use the first index. + mode : str + Which background and rms estimation mode to use. + Default = 'sigmaclip', which is the original BANE mode. + + adaptive_box : bool + Resize the box-car should a position be deemed to have unreliable + statistics. Default = False. + Returns ------- bkg, rms : `numpy.ndarray` @@ -492,7 +557,8 @@ def filter_image(im_name, out_base, step_size=None, box_size=None, bkg, rms = filter_mc_sharemem(im_name, step_size=step_size, box_size=box_size, cores=cores, shape=shape, nslice=nslice, - domask=mask, cube_index=cube_index) + domask=mask, cube_index=cube_index, mode=mode, + adaptive_box=adaptive_box) logging.info("done") if out_base is not None: diff --git a/AegeanTools/CLI/BANE.py b/AegeanTools/CLI/BANE.py index f84ce96b..22323eab 100755 --- a/AegeanTools/CLI/BANE.py +++ b/AegeanTools/CLI/BANE.py @@ -50,6 +50,20 @@ def main(argv=()): parser.set_defaults(out_base=None, step_size=None, box_size=None, twopass=True, cores=None, usescipy=False, debug=False) + group1.add_argument( + '--mode', + type=str, + choices=BANE.AVAILABLE_MODES, + default='sigmaclip', + help=f"Which bkg and rms estimation mode to use. Available are: {BANE.AVAILABLE_MODES}" + ) + group1.add_argument( + "--adaptive-box", + default=False, + action='store_true', + help="Allow the box-car to be resized should insufficent statistics be detected. " + ) + options = parser.parse_args(args=argv) if options.cite: @@ -89,5 +103,7 @@ def main(argv=()): box_size=options.box_size, cores=options.cores, mask=options.mask, compressed=options.compress, nslice=options.stripes, - cube_index=options.cube_index) + cube_index=options.cube_index, + mode=options.mode, + adaptive_box=options.adaptive_box) return 0 diff --git a/AegeanTools/sigma.py b/AegeanTools/sigma.py new file mode 100644 index 00000000..d432a12a --- /dev/null +++ b/AegeanTools/sigma.py @@ -0,0 +1,320 @@ +"""Tooling around the rms and background estimation""" + +from re import L +from typing import Tuple, NamedTuple, Optional, Callable +import logging + +import numpy as np +from scipy.stats import norm +from scipy.special import erf +from scipy.optimize import minimize +from astropy.stats import sigma_clip + +class BANEResult(NamedTuple): + """Container for function results""" + rms: float + """RMS constrained""" + bkg: float + """BKG constrained""" + valid_pixels: int + """Number of pixels constrained against""" + + +class FitGaussianCDF(NamedTuple): + """Options for the fitting approach method""" + linex_args: Tuple[float,float] = (0.1, 1.0) + + def perform(self, data: np.ndarray) -> BANEResult: + return fit_gaussian_cdf(data=data, linex_args=self.linex_args) + + +def softmax(x: np.ndarray) -> np.ndarray: + """Activation function""" + return np.log(1+np.exp(x)) + +def gaussian_cdf(x: np.ndarray, mu: float, sig: float) -> np.ndarray: + """Calculate teh cumulative distribution given a mu and sig across x-values""" + top = x - mu + bottom = 1.412 * sig + + return 0.5 * (1. + erf(top / bottom)) + + +def linear_squared(x: np.ndarray, a: float, b: float) -> np.ndarray: + """Non-linear loss function. Residuals below zero are scaled via the + linear gradient `a`. The squared-residual of residual values above + zero are scaled by factor `b`. + + Parameters + ---------- + x : np.ndarray + Residuals to scale via this linear-squared function + a : float + Gradient to apply to `x` values below zero + b : float + Scale to apply to the squared `x` values when `x` is above zero + + Returns + ------- + np.ndarray + Cost of each residual. This vector should be summed and treated as the loss when optimizing + """ + + mask = x < 0 + + return (a * -x + 1) * mask + (b * (1. + x))** 2. * ~mask + +def make_fitness( + x: np.ndarray, y: np.ndarray, a: float, b: float +) -> Callable: + """Creates the cost function on the (x,y) values that will be used throughout minimisation. """ + + def loss_function(args): + mu, sig, scalar = args + + s = np.sum( + linear_squared( + gaussian_cdf(x, mu, sig) * softmax(scalar) - y, + a, + b, + ) + ) + + return s + return loss_function + +def fit_gaussian_cdf(data: np.ndarray, linex_args: Tuple[float,float] = (0.1, 1.0)) -> BANEResult: + + data = data[np.isfinite(data)].flatten() + + sort_data = np.sort(data) + cdf_data = np.arange(start=0, stop=1, step=1./len(sort_data)) + + func = make_fitness(sort_data, cdf_data, *linex_args) + + res = minimize( + func, (0, 0.0001, 0.8), + # tol=1e-7, + method='Nelder-Mead' + ) + + bane_result = BANEResult(rms=res.x[1], bkg=res.x[0], valid_pixels=int(res.x[2] * len(data))) + + return bane_result + +class FittedSigmaClip(NamedTuple): + """Arguments for the fitted_sigma_clip""" + sigma: int = 3 + """Threshhold before clipped""" + + def perform(self, data: np.ndarray) -> BANEResult: + return fitted_sigma_clip(data=data, sigma=self.sigma) + +def fitted_mean(data: np.ndarray, axis: Optional[int] =None) -> float: + """Internal function that returns the mean by fitting to pixel distribution data""" + if axis is not None: + # This is to make astropy sigma clip happy + raise NotImplementedError("Unexpected axis keyword. ") + + mean, _ = norm.fit(data) + + return mean + + +def fitted_std(data: np.ndarray, axis: Optional[int]=None) -> float: + """Internal function that retunrs the stf by fitting to the pixel distribution""" + if axis is not None: + # This is to make astropy sigma clip happy + raise NotImplementedError("Unexpected axis keyword. ") + + _, std = norm.fit(data) + + return std + +def fitted_sigma_clip(data: np.ndarray, sigma: int=3) -> BANEResult: + """Estimate the back ground and noise level by fitting to the pixel distribution. + Sigma clipping is performed using the fitted statistics. + + Parameters + ---------- + data : np.ndarray + Data that will be considered + sigma : int, optional + Threshold for a point to be flagged, by default 3 + + Returns + ------- + BANEResult + RMS and bkg ground statistics + """ + data = data[np.isfinite(data)] + + clipped_plane = sigma_clip( + data.flatten(), + sigma=sigma, + cenfunc=fitted_mean, + stdfunc=fitted_std, + maxiters=None + ) + bkg, rms = norm.fit(clipped_plane.compressed()) + + result = BANEResult(rms=float(rms), bkg=float(bkg), valid_pixels=len(clipped_plane.compressed())) + + return result + +class FitBkgRmsEstimate(NamedTuple): + """Options for the fitting approach method""" + clip_rounds: int = 3 + """Number of clipping rounds to perform""" + bin_perc: float = 0.25 + """Minimum fraction of the histogram bins, or something""" + outlier_thres: float = 3.0 + """Threshold that a data point should be at to be considered an outlier""" + + def perform(self, data: np.ndarray) -> BANEResult: + return fit_bkg_rms_estimate(data=data, clip_rounds=self.clip_rounds, bin_perc=self.bin_perc, outlier_thres=self.outlier_thres) + +def mad(data, bkg=None): + """Compute the median asbolute deviation. optionally provide a + precomuted background measure + """ + bkg = bkg if bkg else np.median(data) + return np.median(np.abs(data - bkg)) + +def fit_bkg_rms_estimate( + data: np.ndarray, + clip_rounds: int = 2, + bin_perc: float = 0.25, + outlier_thres: float = 3.0, +) -> BANEResult: + """An over the top attempt at robustly characterising the + back ground and RMS. Data will first be flagged via the MAD, + then bin, then those bin counts are used to fit for a gaussian. + + Only bins that are above 25 percent of the maximum bin count + are used in the fitting process. + + Parameters + ---------- + data : np.ndarray + Data to be estimated + clip_rounds : int, optional + Number of clipping rounds, by default 2 + bin_perc : float, optional + Minimum set of counts, relative to the max count bin, that any bin should have. Less than this and they are discarded, by default 0.25 + outlier_thres : float, optional + Clipping threshold to use, by default 3.0 + + Returns + ------- + BANEResult + Backgroun and noise estimation + """ + data = data[np.isfinite(data)] + + cen_func = np.median + + bkg = cen_func(data) + + for i in range(clip_rounds): + data = data[np.abs(data - bkg) < outlier_thres * 1.4826 * mad(data, bkg=bkg)] + bkg = cen_func(data) + + # Attempts to ensure a sane number of bins to fit against + mask_counts = 0 + loop = 1 + while True: + counts, binedges = np.histogram(data, bins=50 * loop) + + mask = counts >= bin_perc * np.max(counts) + mask_counts = np.sum(mask) + loop += 1 + + if not (mask_counts < 5 and loop < 5): + break + + binc = (binedges[:-1] + binedges[1:]) / 2 + p = np.polyfit(binc[mask], np.log10(counts[mask] / np.max(counts)), 2) + a, b, c = p + + x1 = (-b + np.sqrt(b ** 2 - 4.0 * a * (c - np.log10(0.5)))) / (2.0 * a) + x2 = (-b - np.sqrt(b ** 2 - 4.0 * a * (c - np.log10(0.5)))) / (2.0 * a) + fwhm = np.abs(x1 - x2) + noise = fwhm / 2.355 + + result = BANEResult(rms=float(noise), bkg=float(bkg), valid_pixels=len(data)) + + return result + + + +class SigmaClip(NamedTuple): + """Container for the original sigma clipping method""" + low: float = 3.0 + """Low sigma clip threshhold""" + high: float = 3.0 + """High sigma clip threshhold""" + + def perform(self, data: np.ndarray) -> BANEResult: + return sigmaclip(arr=data, lo=self.low, hi=self.high) + +def sigmaclip(arr, lo, hi, reps=10) -> BANEResult: + """ + Perform sigma clipping on an array, ignoring non finite values. + + During each iteration return an array whose elements c obey: + mean -std*lo < c < mean + std*hi + + where mean/std are the mean std of the input array. + + Parameters + ---------- + arr : iterable + An iterable array of numeric types. + lo : float + The negative clipping level. + hi : float + The positive clipping level. + reps : int + The number of iterations to perform. Default = 3. + + Returns + ------- + mean : float + The mean of the array, possibly nan + std : float + The std of the array, possibly nan + + Notes + ----- + Scipy v0.16 now contains a comparable method that will ignore nan/inf + values. + """ + clipped = np.array(arr)[np.isfinite(arr)] + + if len(clipped) < 1: + return np.nan, np.nan + + std = np.std(clipped) + mean = np.mean(clipped) + prev_valid = len(clipped) + for count in range(int(reps)): + mask = (clipped > mean-std*lo) & (clipped < mean+std*hi) + clipped = clipped[mask] + + curr_valid = len(clipped) + if curr_valid < 1: + break + # No change in statistics if no change is noted + if prev_valid == curr_valid: + break + std = np.std(clipped) + mean = np.mean(clipped) + prev_valid = curr_valid + else: + logging.debug( + "No stopping criteria was reached after {0} cycles".format(count)) + + result = BANEResult(rms=float(std), bkg=float(mean), valid_pixels=len(clipped)) + + return result \ No newline at end of file