#!/usr/bin/env python
# This file is part of PAPI (PANIC Pipeline)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
################################################################################
#########################################################################
# PANIC data processing
#########################################################################
# Example of nonlinearity correction for full frame SEF files.
# Includes many sanity checks; the actual calculation is very simple.
# Incorrectable and saturated pixels are set to NaN, therefore the output
# data is float.
#
# 1.0 14/07/2014 BD Creation (correct_nonlinearity.py)
#
# 1.1 18/07/2014 JMIM Adaptation to PAPI (correctNonLinearity.py)
#
# 1.2 08/02/2024 JMIM Update to new PANIC detector H4RG
_name = 'correctNonLinearity.py'
_version = '1.2'
################################################################################
# Import necessary modules
import numpy as np
import astropy.io.fits as fits
import dateutil.parser
import sys
import os
import fileinput
import argparse
import multiprocessing
# PAPI modules
from papi.misc.mef import MEF
from papi.misc.paLog import log
from papi.misc.version import __version__
# The multiprocessing module cannot be used directly with instance methods
# (bound methods were not picklable in Python 2.x). This module-level
# wrapper function is the usual work-around.
def unwrap_self_applyModel(arg, **kwarg):
    return NonLinearityCorrection.applyModel(*arg, **kwarg)
class NonLinearityCorrection(object):
    """
    Class used to correct the non-linearity of the PANIC detectors, based on
    the algorithm described by Bernhard Dorner in PANIC-DEC-TN-02_0_1.pdf.
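
    On offset-subtracted data d, the correction evaluates, pixel by pixel,
    the polynomial (see polyval_map below):

        corrected = c1*d + c2*d**2 + c3*d**3 + c4*d**4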
"""
def __init__(self, r_offset, model, input_files, out_dir='/tmp',
suffix='_LC', force=False, coadd_correction=True):
"""
Init the object.
Parameters
----------
        r_offset : str
            FITS filename of the reference offset (bias); it is subtracted
            from the input data before applying the model.
        model : str
            FITS filename of the non-linearity model, i.e., a cube containing
            the 4th-order polynomial coefficients of the correction, which
            must have been computed previously. It must have 4 planes (one
            plane per coefficient, c4 to c1; c0, the intercept, is not used
            in the correction and is not stored in the cube) and a single
            extension for the full detector. Plane definitions:
plane_0 = coeff_4
plane_1 = coeff_3
plane_2 = coeff_2
plane_3 = coeff_1
        input_files : list
            A list of FITS files to be corrected (non-MEF FITS).
        out_dir : str
            Directory where the new corrected files will be saved.
suffix: str
Suffix to add to the input filename to generate the output filename.
force: bool
If true, no check of input raw header is done (NCOADDS, DETROT90,
INSTRUME,...)
coadd_correction: bool
If true and NCOADDS>1, divide the data by NCOADDS, apply NLC and then
multiply by NCOADDS.
"""
self.input_files = input_files
self.model = model
self.r_offset = r_offset
self.suffix = suffix
self.out_dir = out_dir
self.force = force
self.coadd_correction = coadd_correction
        if not os.path.exists(self.out_dir):
            os.mkdir(self.out_dir)

        if len(self.input_files) < 1:
msg = "Found empty list of input files"
log.error(msg)
raise Exception(msg)
if not os.path.exists(self.model):
msg = "Cannot read non-linearity model file '%s'" % self.model
log.error(msg)
raise Exception(msg)
    def parse_detsec(self, detsec_str):
        """Parse a DETSEC string of the form '[x1:x2,y1:y2]' into integer indices."""
try:
detsec_str = detsec_str.strip().replace(" ", "").replace("[", "").replace("]", "")
x_range, y_range = detsec_str.split(",")
x1, x2 = map(int, x_range.split(":"))
y1, y2 = map(int, y_range.split(":"))
            # Convert 1-based FITS indices to 0-based numpy indices
            x1 -= 1
            x2 -= 1
            y1 -= 1
            y2 -= 1
            # Make the upper limits end-exclusive for numpy slicing
            x2 += 1  # include last column
            y2 += 1  # include last row
# print (f"Parsed DETSEC: x1={x1}, x2={x2}, y1={y1}, y2={y2}")
return x1, x2, y1, y2
except Exception as e:
log.error(f"Failed to parse DETSEC: {detsec_str}. Error: {e}")
raise ValueError(f"Invalid DETSEC format: {detsec_str}")
    def applyModel(self, data_file):
"""
Do the Non-linearity correction using the supplied model. In principle,
it should be applied to all raw images (darks, flats, science, ...).
Parameters
----------
        data_file : str
            Input data FITS filename to be corrected.

        Returns
        -------
        outfitsname : str
            The filename of the new corrected FITS file created.
"""
# load raw data file
hdulist = fits.open(data_file)
dataheader = hdulist[0].header
        # Check that the input file is in SEF format (single extension);
        # MEF files are not supported.
        if len(hdulist) > 1:
            log.warning("Wrong data format: MEF files are not supported")
            hdulist.close()
            raise ValueError('Wrong data format: only SEF (single-extension) files are supported.')
# load reference offset file
if self.r_offset:
try:
ref_offset = fits.open(self.r_offset)[0].data
except Exception as e:
log.error("Cannot read reference offset file '%s'" % self.r_offset)
raise e
else:
ref_offset = np.zeros(hdulist[0].data.shape, dtype='float32')
log.warning("No reference offset file provided. Using zero offset.")
# load model
nlhdulist = fits.open(self.model)
nlheader = nlhdulist[0].header
# Check headers
try:
if not self.force:
self.checkHeader(nlheader, dataheader)
except Exception as e:
log.error("Mismatch in header data for input NLC model %s"%str(e))
raise e
        # Create the output FITS HDU
linhdu = fits.PrimaryHDU()
linhdu.header = dataheader.copy()
hdus = []
        # ---
        # Check that the detector section (DETSEC) of the data matches the
        # one of the NL model
        datadetsec = str(hdulist[0].header['DETSEC']).replace(" ", "")
        nldetsec = str(nlhdulist[0].header['DETSEC']).replace(" ", "")
        if datadetsec != nldetsec:
            log.warning("Mismatch of detector sections")
            # raise ValueError('Mismatch of detector sections')
        datadetid = str(hdulist[0].header['CHIPID']).strip()
        # nldetid = nlhdulist['LINMAX'].header['CHIPID']
        nldetid = str(nlhdulist[0].header['CHIPID']).strip()
if datadetid != nldetid:
log.warning("Mismatch of detector IDs")
# raise ValueError('Mismatch of detector IDs')
        # Work-around to correct data when NCOADDS > 1
        if hdulist[0].header['NCOADDS'] > 1:
            if self.coadd_correction:
                log.info("NCOADDS > 1; doing ncoadd correction...")
                n_coadd = hdulist[0].header['NCOADDS']
            else:
                log.info("Found NCOADDS > 1; use -c to enable the ncoadd correction")
                raise ValueError('Cannot apply model, found NCOADDS > 1.')
else:
n_coadd = 1
        # 2024-02-23: JMIM
        # Provisional patch to allow processing images with NEXP != NCOADD that
        # were created incorrectly by convRaw2CDS.py (also patched now).
        # 2025-02-23: JMIM: Although convRaw2CDS.py is already fixed, it is a
        # good idea to keep this check.
nexp = hdulist[0].header['NEXP']
        if len(hdulist[0].data.shape) > 2:
            # cubes are stored as (planes, ny, nx), so the number of layers
            # is along the first axis (shape[-1] would be the x-axis width)
            cube_layers = hdulist[0].data.shape[0]
        else:
            cube_layers = 1
        if nexp > 1 and n_coadd == 1 and cube_layers == 1:
            log.warning("Overwriting n_coadd with nexp. Ensure this is the intended behavior.")
            n_coadd = nexp
        elif n_coadd > 1:
            log.info("Using provided n_coadd value.")
        # End-of-patch
# load file data (and fix coadded images => coadd_correction)
data = hdulist[0].data / n_coadd
nlmaxs = nlhdulist['LINMAX'].data
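        # Move the coefficient axis to the end: (4, ny, nx) -> (ny, nx, 4),
        # which is the layout expected by polyval_map()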
nlpolys = np.rollaxis(nlhdulist['LINPOLY'].data, 0, 3)
# Check if data is a subset of the full detector
# (if so, we need to crop the data)
# Extract subsection using DETSEC
        try:
            x1, x2, y1, y2 = self.parse_detsec(datadetsec)
            nlmaxs_subsection = nlmaxs[y1:y2, x1:x2]
            # the last axis of nlpolys holds the polynomial coefficients
            nlpolys_subsection = nlpolys[y1:y2, x1:x2, :]
            sub_r_offset = ref_offset[y1:y2, x1:x2]
        except ValueError:
            log.warning("Using full data since DETSEC parsing failed")
            nlmaxs_subsection = nlmaxs
            nlpolys_subsection = nlpolys
            # bug fix: sub_r_offset must also be defined in this branch
            sub_r_offset = ref_offset
# subtract reference offset taking into account the coadd_correction (repetitions integrated)
log.info("Subtracting reference offset")
log.debug("Mean value of reference offset: %s" % str(np.mean(ref_offset)))
log.debug("Mean value of data: %s" % str(np.mean(data)))
data = data - sub_r_offset*n_coadd
log.debug("Mean value of data after offset: %s" % str(np.mean(data)))
# calculate linear corrected data
lindata = self.polyval_map(nlpolys_subsection, data)
# mask saturated inputs - to use nan it has to be a float array
lindata[data > nlmaxs_subsection] = np.nan
# mask where max range is nan
# (first, take into account the option of cubes as input images)
if len(lindata.shape) == 3:
# we have a 3D image (cube)
for i in range(lindata.shape[0]):
lindata[i, np.isnan(nlmaxs_subsection)] = np.nan
else:
# we have a single 2D image
lindata[np.isnan(nlmaxs_subsection)] = np.nan
# Undo the coadd_correction
lindata = lindata * n_coadd
linhdu.data = lindata.astype('float32')
# add some info in the header
linhdu.header['HISTORY'] = 'Nonlinearity correction applied'
linhdu.header['HISTORY'] = 'Nonlinearity data: %s' %nlheader['ID']
linhdu.header['HISTORY'] = '<-- The PANIC team made this on 2025/05/30'
linhdu.header.set('PAPIVERS', __version__,'PANIC Pipeline version')
linhdulist = fits.HDUList([linhdu])
# Compose output filename
mfnp = os.path.basename(data_file).partition('.fits')
# add suffix before .fits extension, or at the end if no such extension present
outfitsname = self.out_dir + '/' + mfnp[0] + self.suffix + mfnp[1] + mfnp[2]
outfitsname = os.path.normpath(outfitsname)
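        # e.g., with suffix '_NLC': 'frame.fits' -> '<out_dir>/frame_NLC.fits'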
# overwrite the output file if exists
linhdulist.writeto(outfitsname, overwrite=True)
return outfitsname
    def runMultiNLC(self):
        """
        Run a parallel processing of the NL-correction for the input files,
        taking advantage of multi-core CPUs.

        Returns
        -------
        On success, a list with the filenames of the corrected files.
"""
# use all CPUs available in the computer
n_cpus = multiprocessing.cpu_count()
log.debug("N_CPUS :" + str(n_cpus))
pool = multiprocessing.Pool(processes=n_cpus)
results = []
solved = []
for i_file in self.input_files:
red_parameters = [i_file]
try:
# Instead of pool.map() that blocks until
# the result is ready, we use pool.map_async()
results += [pool.map_async(unwrap_self_applyModel,
zip([self]*len(red_parameters), red_parameters) )]
except Exception as e:
log.error("Error processing file: " + i_file)
log.error(str(e))
for result in results:
try:
result.wait()
# the 0 index is *ONLY* required if map_async is used !!!
solved.append(result.get()[0])
log.info("New file created => %s"%solved[-1])
except Exception as e:
log.error("Cannot process file \n" + str(e))
# Prevents any more tasks from being submitted to the pool.
# Once all the tasks have been completed the worker
# processes will exit.
pool.close()
        # Wait for the worker processes to exit. One must call
        # close() or terminate() before using join().
pool.join()
log.info("Finished parallel NL-correction")
return solved
    def polyval_map(self, poly, map):
        """
        Evaluate individual polynomials on an array. Looping over each pixel
        would be very slow, so instead we loop over the polynomial order and
        evaluate the polynomial on the whole array at once.

        Note: the output is a float array!

        Parameters
        ----------
        poly : array_like
            Polynomial coefficients without the constant offset. The order
            must be along the last axis.
        map : array_like
            Data array; it can be a cube of 2D images.
Returns
-------
polymap : array_like
Result of evaluation, same shape as map, dtype float
"""
order = poly.shape[-1]
        polymap = map * 0.  # float array of zeros, same shape as map
for io in range(order):
polymap += poly[Ellipsis, -io-1] * map**(io+1)
return polymap
################################################################################
# main
def main(arguments=None):
desc = """Performs the non-linearity correction of the PANIC raw data files
using the proper NL-Model (FITS file). Raw data files must be SEF files; if
SEF-cubes, each plane is corrected individually.
"""
parser = argparse.ArgumentParser(description=desc)
# Basic inputs
parser.add_argument("-m", "--model",
action="store", dest="model",
help="FITS SEF (can be a cube) file of polinomial coeffs (c4, c3, c2, c1) of the NL model.")
parser.add_argument("-r", "--reference_offset",
action="store", dest="r_offset",
help="FITS file of reference offset (bias) to be used for previosly to apply the model")
parser.add_argument("-i", "--input_file",
action="store", dest="input_file",
help="FITS file to be corrected.")
parser.add_argument("-s", "--source",
action="store", dest="source_file_list",
help="Source file list of FITS files to be corrected.")
parser.add_argument("-o", "--out_dir", type=str, dest="out_dir",
action="store", default="/tmp",
help="filename of out data file (default: %(default)s)")
parser.add_argument("-S", "--suffix", type=str,
action="store", dest="suffix", default="_NLC",
help="Suffix to use for new corrected files (default: %(default)s)")
parser.add_argument("-f", "--force",
action="store_true", dest="force", default=False,
help="Force Non-linearity correction with no check of header"
"values (NCOADD, DATE-OBS, DETROT90, ...")
parser.add_argument("-c", "--coadd_correction",
action="store_true", dest="coadd_correction", default=True,
help="Force NCOADDS correction and apply NLC")
args = parser.parse_args()
if len(sys.argv[1:]) < 1:
parser.print_help()
sys.exit(0)
    # Check required parameters
    if ((not args.source_file_list and not args.input_file) or not args.out_dir
            or not args.model or not args.r_offset):
        parser.print_help()
        parser.error("incorrect number of arguments")
if args.input_file and os.path.isfile(args.input_file):
filelist = [args.input_file]
elif args.source_file_list and os.path.isfile(args.source_file_list):
# Read the source file list
filelist = [line.replace("\n", "") for line in fileinput.input(args.source_file_list)]
else:
parser.print_help()
parser.error("incorrect number of arguments " )
NLC = NonLinearityCorrection(args.r_offset, args.model, filelist, args.out_dir,
args.suffix, args.force,
args.coadd_correction)
try:
corr = NLC.runMultiNLC()
except Exception as e:
log.error("Error running NLC: %s"%str(e))
raise e
print("\nCorrected files: ", corr)
"""
# Non parallel processing
for i_file in filelist:
try:
NLC.applyModel(i_file)
except Exception,e:
log.error("Error applying NLC model to file '%s': %s"%(i_file, str(e)))
"""
######################################################################
if __name__ == "__main__":
sys.exit(main())