From a507c4f2cbe3067b89d30712c8662707b377ccea Mon Sep 17 00:00:00 2001 From: PaulHancock Date: Fri, 17 Jan 2025 13:54:35 +0800 Subject: [PATCH] remove unused code related to bias correction --- AegeanTools/fitting.py | 182 ----------------------------------- AegeanTools/source_finder.py | 22 ----- tests/unit/test_fitting.py | 108 ++++++++------------- 3 files changed, 40 insertions(+), 272 deletions(-) diff --git a/AegeanTools/fitting.py b/AegeanTools/fitting.py index 9c8284fa..b2cbbb21 100644 --- a/AegeanTools/fitting.py +++ b/AegeanTools/fitting.py @@ -691,188 +691,6 @@ def emp_hessian(pars, x, y): return matrix -def nan_acf(noise): - """ - Calculate the autocorrelation function of the noise - where the noise is a 2d array that may contain nans - - - Parameters - ---------- - noise : 2d-array - Noise image. - - Returns - ------- - acf : 2d-array - The ACF. - """ - corr = np.zeros(noise.shape) - ix, jx = noise.shape - for i in range(ix): - si_min = slice(i, None, None) - si_max = slice(None, ix - i, None) - for j in range(jx): - sj_min = slice(j, None, None) - sj_max = slice(None, jx - j, None) - if np.all(np.isnan(noise[si_min, sj_min])) or np.all( - np.isnan(noise[si_max, sj_max]) - ): - corr[i, j] = np.nan - else: - corr[i, j] = np.nansum(noise[si_min, sj_min] * noise[si_max, sj_max]) - # return the normalised acf - return corr / np.nanmax(corr) - - -def make_ita(noise, acf=None): - """ - Create the matrix ita of the noise where the noise may be a masked array - where ita(x,y) is the correlation between pixel pairs that have the same - separation as x and y. - - Parameters - ---------- - noise : 2d-array - The noise image - - acf : 2d-array - The autocorrelation matrix. (None = calculate from data). - Default = None. - - Returns - ------- - ita : 2d-array - The matrix ita - """ - if acf is None: - acf = nan_acf(noise) - # s should be the number of non-masked pixels - s = np.count_nonzero(np.isfinite(noise)) - # the indices of the non-masked pixels - xm, ym = np.where(np.isfinite(noise)) - ita = np.zeros((s, s)) - # iterate over the pixels - for i, (x1, y1) in enumerate(zip(xm, ym)): - for j, (x2, y2) in enumerate(zip(xm, ym)): - k = abs(x1 - x2) - l = abs(y1 - y2) - ita[i, j] = acf[k, l] - return ita - - -def RB_bias(data, pars, ita=None, acf=None): - """ - Calculate the expected bias on each of the parameters in the model pars. - Only parameters that are allowed to vary will have a bias. - Calculation follows the description of Refrieger & Brown 1998 (cite). - - - Parameters - ---------- - data : 2d-array - data that was fit - - pars : lmfit.Parameters - The model - - ita : 2d-array - The ita matrix (optional). - - acf : 2d-array - The acf for the data. - - Returns - ------- - bias : array - The bias on each of the parameters - """ - logger.info("data {0}".format(data.shape)) - nparams = np.sum([pars[k].vary for k in pars.keys() if k != "components"]) - # masked pixels - xm, ym = np.where(np.isfinite(data)) - # all pixels - x, y = np.indices(data.shape) - # Create the jacobian as an AxN array accounting for the masked pixels - j = np.array(np.vsplit(lmfit_jacobian(pars, xm, ym).T, nparams)).reshape( - nparams, -1 - ) - - h = hessian(pars, x, y) - # mask the hessian to be AxAxN array - h = h[:, :, xm, ym] - Hij = np.einsum("ik,jk", j, j) - Dij = np.linalg.inv(Hij) - Bijk = np.einsum("ip,jkp", j, h) - Eilkm = np.einsum("il,km", Dij, Dij) - - Cimn_1 = -1 * np.einsum("krj,ir,km,jn", Bijk, Dij, Dij, Dij) - Cimn_2 = -1.0 / 2 * np.einsum("rkj,ir,km,jn", Bijk, Dij, Dij, Dij) - Cimn = Cimn_1 + Cimn_2 - - if ita is None: - # N is the noise (data-model) - N = data - ntwodgaussian_lmfit(pars)(x, y) - if acf is None: - acf = nan_acf(N) - ita = make_ita(N, acf=acf) - logger.info("acf.shape {0}".format(acf.shape)) - logger.info("acf[0] {0}".format(acf[0])) - logger.info("ita.shape {0}".format(ita.shape)) - logger.info("ita[0] {0}".format(ita[0])) - - # Included for completeness but not required - - # now mask/ravel the noise - # N = N[np.isfinite(N)].ravel() - # Pi = np.einsum('ip,p', j, N) - # Qij = np.einsum('ijp,p', h, N) - - Vij = np.einsum("ip,jq,pq", j, j, ita) - Uijk = np.einsum("ip,jkq,pq", j, h, ita) - - bias_1 = np.einsum("imn, mn", Cimn, Vij) - bias_2 = np.einsum("ilkm, mlk", Eilkm, Uijk) - bias = bias_1 + bias_2 - logger.info("bias {0}".format(bias)) - return bias - - -def bias_correct(params, data, acf=None): - """ - Calculate and apply a bias correction to the given fit parameters - - - Parameters - ---------- - params : lmfit.Parameters - The model parameters. These will be modified. - - data : 2d-array - The data which was used in the fitting - - acf : 2d-array - ACF of the data. Default = None. - - Returns - ------- - None - - See Also - -------- - :func:`AegeanTools.fitting.RB_bias` - """ - bias = RB_bias(data, params, acf=acf) - i = 0 - for p in params: - if "theta" in p: - continue - if params[p].vary: - params[p].value -= bias[i] - i += 1 - return - - def errors(source, model, wcshelper): """ Convert pixel based errors into sky coord errors diff --git a/AegeanTools/source_finder.py b/AegeanTools/source_finder.py index f6c13c25..283c37e0 100755 --- a/AegeanTools/source_finder.py +++ b/AegeanTools/source_finder.py @@ -24,7 +24,6 @@ from .fitting import ( Bmatrix, Cmatrix, - bias_correct, covar_errors, do_lmfit, do_lmfit_3D, @@ -192,9 +191,6 @@ class SourceFinder(object): docov : bool If True, then fitting will be done using the covariance matrix. - dobais: bool - Not yet implemented. Default = False - sources : [ComponentSource|IslandSource|SimpleSource] List of sources that have been found/measured. """ @@ -212,7 +208,6 @@ def __init__(self, **kwargs): self.wcshelper = None self.blank = False self.docov = True - self.dobias = False self.cube_index = 0 self.sources = [] @@ -1049,9 +1044,6 @@ def load_globals( self.blank = blank self.docov = docov - # Default to false until I can verify that this is working - self.dobias = False - # check if the WCS is galactic if "lon" in self.header["CTYPE1"].lower(): logger.info("Galactic coordinates detected and noted") @@ -2121,20 +2113,6 @@ def _fit_island(self, island_data): # get the real (sky) parameter errors model = covar_errors(result.params, idata, errs=errs, B=B, C=C) - if self.dobias and self.docov: - x, y = np.indices(idata.shape) - acf = elliptical_gaussian( - x, - y, - 1, - 0, - 0, - pixbeam.a * FWHM2CC * fac, - pixbeam.b * FWHM2CC * fac, - pixbeam.pa, - ) - bias_correct(model, idata, acf=acf * errs**2) - if not result.success: is_flag |= flags.FITERR diff --git a/tests/unit/test_fitting.py b/tests/unit/test_fitting.py index c8453393..22127312 100755 --- a/tests/unit/test_fitting.py +++ b/tests/unit/test_fitting.py @@ -6,31 +6,31 @@ import numpy as np from AegeanTools import fitting -__author__ = 'Paul Hancock' +__author__ = "Paul Hancock" def make_model(): """Test that we can make lmfit.Parameter models""" model = lmfit.Parameters() - model.add('c0_amp', 1, vary=True) - model.add('c0_xo', 5, vary=True) - model.add('c0_yo', 5, vary=True) - model.add('c0_sx', 2.001, vary=False) - model.add('c0_sy', 2, vary=False) - model.add('c0_theta', 0, vary=False) - model.add('components', 1, vary=False) + model.add("c0_amp", 1, vary=True) + model.add("c0_xo", 5, vary=True) + model.add("c0_yo", 5, vary=True) + model.add("c0_sx", 2.001, vary=False) + model.add("c0_sy", 2, vary=False) + model.add("c0_theta", 0, vary=False) + model.add("components", 1, vary=False) return model def test_elliptical_gaussian(): """Test our elliptical gaussian creation function""" x, y = np.indices((3, 3)) - gauss = fitting.elliptical_gaussian( - x, y, amp=1, xo=0, yo=1, sx=1, sy=1, theta=0) + gauss = fitting.elliptical_gaussian(x, y, amp=1, xo=0, yo=1, sx=1, sy=1, theta=0) if np.any(np.isnan(gauss)): raise AssertionError() gauss = fitting.elliptical_gaussian( - x, y, amp=1, xo=0, yo=1, sx=1, sy=1, theta=np.inf) + x, y, amp=1, xo=0, yo=1, sx=1, sy=1, theta=np.inf + ) if not (np.all(np.isnan(gauss))): raise AssertionError() @@ -57,14 +57,14 @@ def test_hessian_shape(): raise AssertionError() # now add another component - model.add('c1_amp', 1, vary=True) - model.add('c1_xo', 5, vary=True) - model.add('c1_yo', 5, vary=True) - model.add('c1_sx', 2.001, vary=True) - model.add('c1_sy', 2, vary=True) - model.add('c1_theta', 0, vary=True) + model.add("c1_amp", 1, vary=True) + model.add("c1_xo", 5, vary=True) + model.add("c1_yo", 5, vary=True) + model.add("c1_sx", 2.001, vary=True) + model.add("c1_sy", 2, vary=True) + model.add("c1_theta", 0, vary=True) nvar = 9 - model['components'].value = 2 + model["components"].value = 2 Hij = fitting.hessian(model, x, y) if not (Hij.shape == (nvar, nvar, 10, 10)): raise AssertionError() @@ -82,14 +82,14 @@ def test_jacobian_shape(): if not (Jij.shape == (nvar, 10, 10)): raise AssertionError() - model.add('c1_amp', 1, vary=True) - model.add('c1_xo', 5, vary=True) - model.add('c1_yo', 5, vary=True) - model.add('c1_sx', 2.001, vary=True) - model.add('c1_sy', 2, vary=True) - model.add('c1_theta', 0, vary=True) + model.add("c1_amp", 1, vary=True) + model.add("c1_xo", 5, vary=True) + model.add("c1_yo", 5, vary=True) + model.add("c1_sx", 2.001, vary=True) + model.add("c1_sy", 2, vary=True) + model.add("c1_theta", 0, vary=True) nvar = 9 - model['components'].value = 2 + model["components"].value = 2 Jij = fitting.jacobian(model, x, y) if not (Jij.shape == (nvar, 10, 10)): raise AssertionError() @@ -106,14 +106,14 @@ def test_emp_vs_ana_jacobian(): if not (np.max(diff) < 1e-5): raise AssertionError() - model.add('c1_amp', 1, vary=True) - model.add('c1_xo', 5, vary=True) - model.add('c1_yo', 5, vary=True) - model.add('c1_sx', 2.001, vary=True) - model.add('c1_sy', 2, vary=True) - model.add('c1_theta', 0, vary=True) + model.add("c1_amp", 1, vary=True) + model.add("c1_xo", 5, vary=True) + model.add("c1_yo", 5, vary=True) + model.add("c1_sx", 2.001, vary=True) + model.add("c1_sy", 2, vary=True) + model.add("c1_theta", 0, vary=True) - model['components'].value = 2 + model["components"].value = 2 emp_Jij = fitting.emp_jacobian(model, x, y) ana_Jij = fitting.jacobian(model, x, y) diff = np.abs(ana_Jij - emp_Jij) @@ -132,14 +132,14 @@ def test_emp_vs_ana_hessian(): if not (np.max(diff) < 1e-5): raise AssertionError() - model.add('c1_amp', 1, vary=True) - model.add('c1_xo', 5, vary=True) - model.add('c1_yo', 5, vary=True) - model.add('c1_sx', 2.001, vary=True) - model.add('c1_sy', 2, vary=True) - model.add('c1_theta', 0, vary=True) + model.add("c1_amp", 1, vary=True) + model.add("c1_xo", 5, vary=True) + model.add("c1_yo", 5, vary=True) + model.add("c1_sx", 2.001, vary=True) + model.add("c1_sy", 2, vary=True) + model.add("c1_theta", 0, vary=True) - model['components'].value = 2 + model["components"].value = 2 emp_Hij = fitting.emp_hessian(model, x, y) ana_Hij = fitting.hessian(model, x, y) diff = np.abs(ana_Hij - emp_Hij) @@ -147,37 +147,9 @@ def test_emp_vs_ana_hessian(): raise AssertionError() -def test_make_ita(): - """Test make_ita""" - noise = np.random.random((10, 10)) - ita = fitting.make_ita(noise) - if not (ita.shape == (100, 100)): - raise AssertionError() - noise *= np.nan - ita = fitting.make_ita(noise) - if not (len(ita) == 0): - raise AssertionError() - - -def test_RB_bias(): - """Test RB_bias""" - data = np.random.random((4, 4)) - model = make_model() - bias = fitting.RB_bias(data, model) - if not (len(bias) == 3): - raise AssertionError() - - -def test_bias_correct(): - """test that bias_correct does things""" - data = np.random.random((4, 4)) - model = make_model() - fitting.bias_correct(model, data) - - if __name__ == "__main__": # introspect and run all the functions starting with 'test' for f in dir(): - if f.startswith('test'): + if f.startswith("test"): print(f) globals()[f]()