Source code for reduce.calGainMap

#!/usr/bin/env python

# Copyright (c) 2009-2012 IAA-CSIC  - All rights reserved.
# Author: Jose M. Ibanez.
# Instituto de Astrofisica de Andalucia, IAA-CSIC
#
# This file is part of PAPI (PANIC Pipeline)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

################################################################################
#
#
# PAPI (PANIC PIpeline)
#
# calGainMap.py
#
# Compute a super sky flat using the dither frames (IRAF implementation)
#
# Created    : 23/09/2010    jmiguel@iaa.es -
# Last update: 23/09/2009    jmiguel@iaa.es -
#              21/10/2009    jmiguel@iaa.es - Add normalization wrt chip 1 and
#                                             support for MEF
#              17/09/2013    jmiguel@iaa.es - prevent zero-division
# TODO:
#  - include bpm
################################################################################

################################################################################
# Import necessary modules

import sys
import os
import tempfile
import argparse

# Interact with FITS files
import astropy.io.fits as fits
import numpy as np
# PAPI
from papi.misc.paLog import log
from papi.datahandler.clfits import ClFits, isaFITS
from papi.reduce.calSuperFlat import SuperSkyFlat
from papi.reduce.calTwFlat import MasterTwilightFlat
from papi.reduce.calDomeFlat import MasterDomeFlat
import papi.misc.robust as robust
from papi.misc.version import __version__


[docs]class SkyGainMap(object): """ Compute the gain map from a list of sky frames """ def __init__(self, filelist, output_filename="/tmp/gainmap.fits", bpm=None, temp_dir="/tmp/"): self.framelist = filelist self.output = output_filename self.bpm = bpm self.temp_dir = temp_dir
[docs] def create(self): """ Creation of the Gain map""" #First, we create the Super Sky Flat try: output_fd, tmp_output_path = tempfile.mkstemp(suffix='.fits') os.close(output_fd) superflat = SuperSkyFlat(self.framelist, tmp_output_path, self.bpm, norm=False, temp_dir=self.temp_dir) superflat.create() except Exception as ex: log.error("Error while creating super sky flat: %s", str(ex)) raise ex # Secondly, we create the proper gainmap try: gainmap = GainMap(tmp_output_path, self.output) gainmap.create() except Exception as ex: log.error("Error while creating gain map: %s", str(ex)) raise ex os.remove(tmp_output_path) return self.output
[docs]class TwlightGainMap(object): """ Compute the gain map from a list of twlight frames """ def __init__(self, flats_filelist, master_dark_list, output_filename="/tmp/gainmap.fits", bpm=None, temp_dir="/tmp/"): self.framelist = flats_filelist self.master_dark_list = master_dark_list self.output = output_filename self.bpm = bpm self.temp_dir = temp_dir
[docs] def create(self): """ Creation of the Gain map""" #First, we create the Twlight Sky Flat try: output_fd, tmp_output_path = tempfile.mkstemp(suffix='.fits') os.close(output_fd) twflat = MasterTwilightFlat(self.framelist, master_dark_model=None, master_dark_list=self.master_dark_list, output_filename=tmp_output_path) twflat.createMaster() except Exception as ex: log.error("Error while creating twlight flat: %s", str(ex)) raise ex # Secondly, we create the gainmap try: gainmap = GainMap(tmp_output_path, self.output) gainmap.create() except Exception as ex: log.error("Error while creating gain map: %s", str(ex)) raise ex # Clean-up os.remove(tmp_output_path) return self.output
[docs]class DomeGainMap(object): """ Compute the gain map from a list of dome (lamp-on,lamp-off) frames """ def __init__(self, filelist, output_filename="/tmp/domeFlat.fits", bpm=None): """ """ self.framelist = filelist self.output = output_filename self.bpm = bpm
[docs] def create(self): """ Creation of the Gain map""" # First, we create the Dome Flat try: output_fd, tmp_output_path = tempfile.mkstemp(suffix='.fits') os.close(output_fd) domeflat = MasterDomeFlat(self.framelist, temp_dir="/tmp", output_filename=tmp_output_path) domeflat.createMaster() except Exception as e: log.error("Error while creating master dome flat: %s", str(e)) raise e # Secondly, we create the proper gainmap try: gainmap = GainMap(tmp_output_path, self.output) gainmap.create() except Exception as ex: log.error("Error while creating gain map: %s", str(ex)) raise ex os.remove(tmp_output_path) return self.output
[docs]class GainMap(object): """ Description ----------- Build a Gain Map from a Flat Field image (dome, twilight, science-sky) Author ------ JMIbanez, IAA-CSIC """ def __init__(self, flatfield, output_filename="/tmp/gainmap.fits", bpm=None, do_normalization=True, mingain=0.5, maxgain=1.5, nxblock=16, nyblock=16, nsigma=5): """ Initialization method Parameters ---------- flatfield: str Master flat field (dome, sky) from which the gainmap will be created. output_filename: str Output filename of the gainmap to build bpm: str (optional) -- NOT USED YET Bad pixel mask to use optionally for the gainmap build do_normalization: bool If true, a normalization will be done; and if the input master flat is a multi-chip frame, the gainmap will be normalized wrt chip1 However, be aware that the master flat might already have the normalization done. mingain : int (0.5) Minimal gain; pixels below this gain value are considered bad and set to 0 maxgain: int (1.5) Maximal gain; pixels above this gain value are considered bad and set to 0. nxblock: int (16) X-size (pixels) of box used to compute local bkg (even) nyblock: int (16) Y-size (pixels) of box used to compute local bkg (even) nsigma: int (10) Number of (+|-) stddev from local bkg to be bad pixel (default=10) """ # Flat-field image (normalized or not, because optionally, normalization # can be done here) self.flat = flatfield self.output_file_dir = os.path.abspath(os.path.join(output_filename, os.pardir)) self.output_filename = output_filename # full filename (path+filename) self.bpm = bpm self.do_norm = do_normalization # Some default parameter values self.m_MINGAIN = mingain # pixels with sensitivity < MINGAIN are assumed bad self.m_MAXGAIN = maxgain # pixels with sensitivity > MAXGAIN are assumed bad self.m_NXBLOCK = nxblock # image size should be multiple of block size self.m_NYBLOCK = nyblock self.m_NSIG = nsigma # badpix if sensitivity > NSIG sigma from local bkg self.m_BPM = bpm # external BadPixelMap to take into account (TODO)
[docs] def create(self): """ Given a NOT normalized flat field, compute the gain map taking into account the input parameters and an optional Bad Pixel Map (bpm). Return ------ If success, return the output filename image of the Gain Map generated, where Bad Pixels = 0.0 """ log.debug("Start creating Gain Map for file: %s", self.flat) if os.path.exists(self.output_filename): os.remove(self.output_filename) # Check if we have a MEF file f = ClFits(self.flat) isMEF = f.mef if not isMEF: nExt = 1 else: nExt = f.next naxis1 = f.naxis1 naxis2 = f.naxis2 offset1 = int(naxis1 * 0.1) offset2 = int(naxis2 * 0.1) nbad = 0 if (f.getInstrument() == 'panic' and naxis1 == 4096 and naxis2 == 4096 and not f.is_panic_h4rg): # i.e., a single extension (GEIRS) full frame is_a_panic_full_frame_h2rg = True else: is_a_panic_full_frame_h2rg = False gain = np.zeros([nExt, naxis1, naxis2], dtype=np.float32) myflat = fits.open(self.flat) # Check if normalization is already done to FF or otherwise it must be # done here if isMEF: extN = 1 else: extN = 0 log.debug("STEP #1# - median computation") rmed = robust.r_nanmedian(myflat[extN].data) log.debug("Median: %s", rmed) if rmed > 10 or rmed < -10: # ############################################################################## # Normalize the flat (if MEF, all extension are normalized wrt extension/chip SG1)# # ############################################################################## self.do_norm = True log.info("Normalization need to be done !") if isMEF: ## chip = 1 # normalize wrt to mode of chip SG1_1 if f.getInstrument() == 'panic': ext_name = 'SG1_1' try: myflat[ext_name].header except KeyError: ext_name = 'Q1' elif f.getInstrument() == 'hawki': ext_name = 'CHIP2.INT1' else: ext_name = 1 # Note that in Numpy, arrays are indexed as rows X columns (y, x), # contrary to FITS standard (NAXIS1=columns, NAXIS2=rows). median = np.median(myflat[ext_name].data[offset2: naxis2 - offset2, offset1: naxis1 - offset1]) msg = "Normalization of MEF master flat frame wrt chip %s. (MEDIAN=%f)" % (ext_name, median) elif is_a_panic_full_frame_h2rg: # It supposed to have a full frame of PANIC (old H2RG) in one single # extension (GEIRS default). Normalize wrt detector SG1_1 median = np.median(myflat[0].data[200: 2048-200, 2048+200: 4096 - 200]) msg = "Normalization of (full) PANIC master flat frame wrt chip 1. (MEDIAN=%f)" % median else: # Not MEF, not PANIC full-frame, but could be a PANIC H4RG or a # subwindow median = np.median(myflat[0].data[offset2: naxis2 - offset2, offset1 : naxis1 - offset1]) msg = "Normalization of master flat frame. (MEDIAN = %d)" % median else: self.do_norm = False median = 1.0 log.info("**No** normalization will be done. Image is already normalized !") for chip in range(0, nExt): log.debug("Operating in Extension %d", chip + 1) if isMEF: flatM = myflat[chip + 1].data else: flatM = myflat[0].data if len(flatM.shape) > 2: msg = "Data cubes are not currently supported; image need to be collapsed." log.error(msg) raise Exception(msg) # To avoid zero-division __epsilon = 1.0e-20 if np.fabs(median) > __epsilon: flatM /= median else: flatM = flatM # Check for bad pixel log.debug("STEP #2# - Search/Select for bad pixels") gain[chip] = np.where((flatM < self.m_MINGAIN) | (flatM > self.m_MAXGAIN), 0.0, flatM) m_bpm = np.where(gain[chip] == 0.0, 1, 0) # bad pixel set to 1 nbad = (m_bpm == 1).sum() log.debug("STEP #3# - Initial number of Bad Pixels : %d ", nbad) # local dev map to find out pixel deviating > NSIGMA from local median dev = np.zeros((naxis1, naxis2), dtype=np.float32) buf = np.zeros((self.m_NXBLOCK, self.m_NYBLOCK), dtype=np.float32) log.debug("STEP #4# - Loop over Blocks") # dev = get_dev(gain, naxis1, naxis2, 0, self.m_NXBLOCK, self.m_NYBLOCK) # Foreach image block for i in range(0, naxis1, self.m_NYBLOCK): for j in range(0, naxis2, self.m_NXBLOCK): box = gain[chip][i: i + self.m_NXBLOCK, j: j + self.m_NYBLOCK] p = np.where(box > 0.0) buf = box[p] if len(buf) > 0: med = np.median(buf) # log.debug("Median=%f"%med) else: # log.warning("Median < 0 !") med = 0.0 dev[i: i + self.m_NXBLOCK, j: j + self.m_NYBLOCK] = \ np.where(box > 0, (box - med), 0) log.debug("STEP #4b# - End Loop over Blocks") """ # original code from gainmap.c # Foreach image block for i in range(0,naxis1, self.m_NYBLOCK): for j in range(0,naxis2, self.m_NXBLOCK ): n=0 for k in range(i, i+self.m_NYBLOCK): # foreach pix in block for l in range(j, j+self.m_NXBLOCK): if gain[chip][k*naxis1+l]>0.0: # if good pixel buf[n] = gain[chip][k*naxis1+l] n+=1 #block median if n>0: med = np.median(buf) else: med = 0.0 for k in range(i, i+self.m_NYBLOCK): # foreach pix in block for l in range(j, j+self.m_NXBLOCK): if gain[chip][k*naxis1+l]>0.0: # if good pixel, subtract median dev[k*naxis1+l] = gain[chip][k*naxis1+l] - med else: dev[k*naxis1+l] = 0.0 # already known badpix """ med = np.median(dev) sig = np.median(np.abs(dev - med)) / 0.6745 lo = med - self.m_NSIG * sig hi = med + self.m_NSIG * sig log.debug("MED=%f LO=%f HI=%f SIGMA=%f", med, lo, hi, sig) # Find more badpix by local dev p = np.where((dev < lo) | (dev > hi)) gain[chip][p] = 0.0 # badpix # gain[chip][p] = np.nan # badpix log.debug("Final number of Bad Pixels = %d", (gain[chip] == 0.0).sum()) # Now, write result in a (MEF/single)-FITS file output = self.output_filename log.debug('Generating output file: %s', output) prihdr = myflat[0].header.copy() prihdr.set('PAPITYPE', 'MASTER_GAINMAP', 'TYPE of PANIC Pipeline generated file') prihdr.set('PAPIVERS', __version__, 'PANIC Pipeline version') prihdr.add_history('Gain map based on %s' % self.flat) if self.do_norm: prihdr.add_history('Normalization wrt chip 0 done.') fo = fits.HDUList() # Add primary header to output file... if isMEF: prihdu = fits.PrimaryHDU(None, prihdr) fo.append(prihdu) # Add each extension for chip in range(0, nExt): hdu = fits.ImageHDU(data=gain[chip].astype('float32'), header=myflat[chip + 1].header) fo.append(hdu) del hdu else: prihdu = fits.PrimaryHDU(gain[0], prihdr) fo.append(prihdu) fo.writeto(output, output_verify='ignore') fo.close(output_verify='ignore') del fo myflat.close() return output
#@jit(nopython=True)
[docs]def get_dev(gain, naxis1, naxis2, chip, nx, ny): dev = np.zeros((naxis1, naxis2), dtype=np.float32) # Foreach image block for i in range(0, naxis1, ny): for j in range(0, naxis2, nx): box = gain[chip][i: i + nx, j: j + ny] p = np.where(box > 0.0) buf = box[p] if len(buf) > 0: med = np.median(buf) # log.debug("Median=%f"%med) else: # log.warning("Median < 0 !") med = 0.0 dev[i: i + nx, j: j + ny] = \ np.where(box > 0, (box - med), 0) return dev
############################################################################# # main # ###########################################################################
[docs]def main(arguments=None): # Get and check command-line options DESC = """Creates a master gain map from a given master flat field (dome, twilight or superflat) optionally normalized. The bad pixels are set to 0""" parser = parser = argparse.ArgumentParser(description=DESC) parser.add_argument("-s", "--source", type=str, action="store", dest="source_file", help="""Flat Field image optionally normalized. It has to be a fullpath file name (required)""") parser.add_argument("-o", "--output", type=str, action="store", dest="output_filename", help="output file to write the Gain Map") # TODO # parser.add_argument("-b", "--bpm", type="str", # action="store", dest="bpm", # help="Input Bad pixel map file (optional)") parser.add_argument("-L", "--low", type=float, default=0.5, action="store", dest="mingain", help="pixel below this gain value are considered bad (default=0.5)") parser.add_argument("-H", "--high", type=float, default=1.5, action="store", dest="maxgain", help="pixel above this gain value are considered bad (default=1.5)") parser.add_argument("-x", "--nx", type=int, default=16, action="store", dest="nxblock", help="X dimen. (pixels) to compute local bkg (even) (default=16)") parser.add_argument("-y", "--ny", type=int, default=16, action="store", dest="nyblock", help="Y dimen. (pixels) to compute local bkg (even) (default=16)") parser.add_argument("-n", "--nsigma", type=int, default=10, action="store", dest="nsigma", help="number of (+|-)stddev from local bkg to be bad pixel (default=10)") parser.add_argument("-N", "--normal", default=True, action="store_true", dest="normal", help="""if true, the input flat-field will be normalized before build the gainmap (default=True)""") args = parser.parse_args() if len(sys.argv[1:]) < 1: parser.print_help() sys.exit(0) # args is the leftover positional arguments after all options have been # processed if not args.source_file or not args.output_filename: parser.print_help() parser.error("incorrect number of arguments ") try: gainmap = GainMap(args.source_file, args.output_filename, bpm=None, do_normalization=args.normal, mingain=args.mingain, maxgain=args.maxgain, nxblock=args.nxblock, nyblock=args.nyblock, nsigma=args.nsigma) gainmap.create() except Exception as ex: log.error("Some kind of problem happened %s" % str(ex))
###################################################################### if __name__ == "__main__": sys.exit(main())