#!/usr/bin/env python
# Copyright (c) 2009-2012 IAA-CSIC - All rights reserved.
# Author: Jose M. Ibanez.
# Instituto de Astrofisica de Andalucia, IAA-CSIC
# This file is part of PAPI (PANIC Pipeline)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
# PAPI (PANIC PIpeline)
# calGainMap.py
# Compute a super sky flat using the dither frames (IRAF implementation)
# Created : 23/09/2010 jmiguel@iaa.es -
# Last update: 23/09/2009 jmiguel@iaa.es -
# 21/10/2009 jmiguel@iaa.es - Add normalization wrt chip 1 and
# support for MEF
# 17/09/2013 jmiguel@iaa.es - prevent zero-division
# - include bpm
# Import necessary modules
import sys
import os
import tempfile
import argparse
# Interact with FITS files
import astropy.io.fits as fits
import numpy as np
from papi.misc.paLog import log
from papi.datahandler.clfits import ClFits, isaFITS
from papi.reduce.calSuperFlat import SuperSkyFlat
from papi.reduce.calTwFlat import MasterTwilightFlat
from papi.reduce.calDomeFlat import MasterDomeFlat
import papi.misc.robust as robust
from papi.misc.version import __version__
[docs]class SkyGainMap(object):
""" Compute the gain map from a list of sky frames """
def __init__(self, filelist, output_filename="/tmp/gainmap.fits",
bpm=None, temp_dir="/tmp/"):
self.framelist = filelist
self.output = output_filename
self.bpm = bpm
self.temp_dir = temp_dir
[docs] def create(self):
""" Creation of the Gain map"""
#First, we create the Super Sky Flat
output_fd, tmp_output_path = tempfile.mkstemp(suffix='.fits')
superflat = SuperSkyFlat(self.framelist,
self.bpm, norm=False,
except Exception as ex:
log.error("Error while creating super sky flat: %s", str(ex))
raise ex
# Secondly, we create the proper gainmap
gainmap = GainMap(tmp_output_path, self.output)
except Exception as ex:
log.error("Error while creating gain map: %s", str(ex))
raise ex
return self.output
[docs]class TwlightGainMap(object):
""" Compute the gain map from a list of twlight frames """
def __init__(self, flats_filelist, master_dark_list,
bpm=None, temp_dir="/tmp/"):
self.framelist = flats_filelist
self.master_dark_list = master_dark_list
self.output = output_filename
self.bpm = bpm
self.temp_dir = temp_dir
[docs] def create(self):
""" Creation of the Gain map"""
#First, we create the Twlight Sky Flat
output_fd, tmp_output_path = tempfile.mkstemp(suffix='.fits')
twflat = MasterTwilightFlat(self.framelist,
except Exception as ex:
log.error("Error while creating twlight flat: %s", str(ex))
raise ex
# Secondly, we create the gainmap
gainmap = GainMap(tmp_output_path, self.output)
except Exception as ex:
log.error("Error while creating gain map: %s", str(ex))
raise ex
# Clean-up
return self.output
[docs]class DomeGainMap(object):
""" Compute the gain map from a list of dome (lamp-on,lamp-off) frames """
def __init__(self, filelist, output_filename="/tmp/domeFlat.fits", bpm=None):
self.framelist = filelist
self.output = output_filename
self.bpm = bpm
[docs] def create(self):
""" Creation of the Gain map"""
# First, we create the Dome Flat
output_fd, tmp_output_path = tempfile.mkstemp(suffix='.fits')
domeflat = MasterDomeFlat(self.framelist,
except Exception as e:
log.error("Error while creating master dome flat: %s", str(e))
raise e
# Secondly, we create the proper gainmap
gainmap = GainMap(tmp_output_path, self.output)
except Exception as ex:
log.error("Error while creating gain map: %s", str(ex))
raise ex
return self.output
[docs]class GainMap(object):
Build a Gain Map from a Flat Field image (dome, twilight, science-sky)
def __init__(self, flatfield, output_filename="/tmp/gainmap.fits",
bpm=None, do_normalization=True, mingain=0.5, maxgain=1.5,
nxblock=16, nyblock=16, nsigma=5):
Initialization method
flatfield: str
Master flat field (dome, sky) from which the gainmap will be created.
output_filename: str
Output filename of the gainmap to build
bpm: str (optional) -- NOT USED YET
Bad pixel mask to use optionally for the gainmap build
do_normalization: bool
If true, a normalization will be done; and if the input master
flat is a multi-chip frame, the gainmap will be normalized wrt chip1
However, be aware that the master flat might already have the
normalization done.
mingain : int (0.5)
Minimal gain; pixels below this gain value are considered bad and
set to 0
maxgain: int (1.5)
Maximal gain; pixels above this gain value are considered bad and
set to 0.
nxblock: int (16)
X-size (pixels) of box used to compute local bkg (even)
nyblock: int (16)
Y-size (pixels) of box used to compute local bkg (even)
nsigma: int (10)
Number of (+|-) stddev from local bkg to be bad pixel (default=10)
# Flat-field image (normalized or not, because optionally, normalization
# can be done here)
self.flat = flatfield
self.output_file_dir = os.path.abspath(os.path.join(output_filename, os.pardir))
self.output_filename = output_filename # full filename (path+filename)
self.bpm = bpm
self.do_norm = do_normalization
# Some default parameter values
self.m_MINGAIN = mingain # pixels with sensitivity < MINGAIN are assumed bad
self.m_MAXGAIN = maxgain # pixels with sensitivity > MAXGAIN are assumed bad
self.m_NXBLOCK = nxblock # image size should be multiple of block size
self.m_NYBLOCK = nyblock
self.m_NSIG = nsigma # badpix if sensitivity > NSIG sigma from local bkg
self.m_BPM = bpm # external BadPixelMap to take into account (TODO)
[docs] def create(self):
Given a NOT normalized flat field, compute the gain map taking
into account the input parameters and an optional Bad Pixel Map (bpm).
If success, return the output filename image of the Gain Map generated,
where Bad Pixels = 0.0
log.debug("Start creating Gain Map for file: %s", self.flat)
if os.path.exists(self.output_filename):
# Check if we have a MEF file
f = ClFits(self.flat)
isMEF = f.mef
if not isMEF:
nExt = 1
nExt = f.next
naxis1 = f.naxis1
naxis2 = f.naxis2
offset1 = int(naxis1 * 0.1)
offset2 = int(naxis2 * 0.1)
nbad = 0
if (f.getInstrument() == 'panic' and naxis1 == 4096 and naxis2 == 4096
and not f.is_panic_h4rg):
# i.e., a single extension (GEIRS) full frame
is_a_panic_full_frame_h2rg = True
is_a_panic_full_frame_h2rg = False
gain = np.zeros([nExt, naxis1, naxis2], dtype=np.float32)
myflat = fits.open(self.flat)
# Check if normalization is already done to FF or otherwise it must be
# done here
if isMEF:
extN = 1
extN = 0
log.debug("STEP #1# - median computation")
rmed = robust.r_nanmedian(myflat[extN].data)
log.debug("Median: %s", rmed)
if rmed > 10 or rmed < -10:
# ##############################################################################
# Normalize the flat (if MEF, all extension are normalized wrt extension/chip SG1)#
# ##############################################################################
self.do_norm = True
log.info("Normalization need to be done !")
if isMEF:
## chip = 1 # normalize wrt to mode of chip SG1_1
if f.getInstrument() == 'panic':
ext_name = 'SG1_1'
except KeyError:
ext_name = 'Q1'
elif f.getInstrument() == 'hawki':
ext_name = 'CHIP2.INT1'
ext_name = 1
# Note that in Numpy, arrays are indexed as rows X columns (y, x),
# contrary to FITS standard (NAXIS1=columns, NAXIS2=rows).
median = np.median(myflat[ext_name].data[offset2: naxis2 - offset2,
offset1: naxis1 - offset1])
msg = "Normalization of MEF master flat frame wrt chip %s. (MEDIAN=%f)" % (ext_name, median)
elif is_a_panic_full_frame_h2rg:
# It supposed to have a full frame of PANIC (old H2RG) in one single
# extension (GEIRS default). Normalize wrt detector SG1_1
median = np.median(myflat[0].data[200: 2048-200, 2048+200: 4096 - 200])
msg = "Normalization of (full) PANIC master flat frame wrt chip 1. (MEDIAN=%f)" % median
# Not MEF, not PANIC full-frame, but could be a PANIC H4RG or a
# subwindow
median = np.median(myflat[0].data[offset2: naxis2 - offset2,
offset1 : naxis1 - offset1])
msg = "Normalization of master flat frame. (MEDIAN = %d)" % median
self.do_norm = False
median = 1.0
log.info("**No** normalization will be done. Image is already normalized !")
for chip in range(0, nExt):
log.debug("Operating in Extension %d", chip + 1)
if isMEF:
flatM = myflat[chip + 1].data
flatM = myflat[0].data
if len(flatM.shape) > 2:
msg = "Data cubes are not currently supported; image need to be collapsed."
raise Exception(msg)
# To avoid zero-division
__epsilon = 1.0e-20
if np.fabs(median) > __epsilon:
flatM /= median
flatM = flatM
# Check for bad pixel
log.debug("STEP #2# - Search/Select for bad pixels")
gain[chip] = np.where((flatM < self.m_MINGAIN) | (flatM > self.m_MAXGAIN), 0.0, flatM)
m_bpm = np.where(gain[chip] == 0.0, 1, 0) # bad pixel set to 1
nbad = (m_bpm == 1).sum()
log.debug("STEP #3# - Initial number of Bad Pixels : %d ", nbad)
# local dev map to find out pixel deviating > NSIGMA from local median
dev = np.zeros((naxis1, naxis2), dtype=np.float32)
buf = np.zeros((self.m_NXBLOCK, self.m_NYBLOCK), dtype=np.float32)
log.debug("STEP #4# - Loop over Blocks")
# dev = get_dev(gain, naxis1, naxis2, 0, self.m_NXBLOCK, self.m_NYBLOCK)
# Foreach image block
for i in range(0, naxis1, self.m_NYBLOCK):
for j in range(0, naxis2, self.m_NXBLOCK):
box = gain[chip][i: i + self.m_NXBLOCK, j: j + self.m_NYBLOCK]
p = np.where(box > 0.0)
buf = box[p]
if len(buf) > 0:
med = np.median(buf)
# log.debug("Median=%f"%med)
# log.warning("Median < 0 !")
med = 0.0
dev[i: i + self.m_NXBLOCK, j: j + self.m_NYBLOCK] = \
np.where(box > 0, (box - med), 0)
log.debug("STEP #4b# - End Loop over Blocks")
# original code from gainmap.c
# Foreach image block
for i in range(0,naxis1, self.m_NYBLOCK):
for j in range(0,naxis2, self.m_NXBLOCK ):
for k in range(i, i+self.m_NYBLOCK): # foreach pix in block
for l in range(j, j+self.m_NXBLOCK):
if gain[chip][k*naxis1+l]>0.0: # if good pixel
buf[n] = gain[chip][k*naxis1+l]
#block median
if n>0: med = np.median(buf)
else: med = 0.0
for k in range(i, i+self.m_NYBLOCK): # foreach pix in block
for l in range(j, j+self.m_NXBLOCK):
if gain[chip][k*naxis1+l]>0.0: # if good pixel, subtract median
dev[k*naxis1+l] = gain[chip][k*naxis1+l] - med
dev[k*naxis1+l] = 0.0 # already known badpix
med = np.median(dev)
sig = np.median(np.abs(dev - med)) / 0.6745
lo = med - self.m_NSIG * sig
hi = med + self.m_NSIG * sig
log.debug("MED=%f LO=%f HI=%f SIGMA=%f", med, lo, hi, sig)
# Find more badpix by local dev
p = np.where((dev < lo) | (dev > hi))
gain[chip][p] = 0.0 # badpix
# gain[chip][p] = np.nan # badpix
log.debug("Final number of Bad Pixels = %d", (gain[chip] == 0.0).sum())
# Now, write result in a (MEF/single)-FITS file
output = self.output_filename
log.debug('Generating output file: %s', output)
prihdr = myflat[0].header.copy()
prihdr.set('PAPITYPE', 'MASTER_GAINMAP', 'TYPE of PANIC Pipeline generated file')
prihdr.set('PAPIVERS', __version__, 'PANIC Pipeline version')
prihdr.add_history('Gain map based on %s' % self.flat)
if self.do_norm:
prihdr.add_history('Normalization wrt chip 0 done.')
fo = fits.HDUList()
# Add primary header to output file...
if isMEF:
prihdu = fits.PrimaryHDU(None, prihdr)
# Add each extension
for chip in range(0, nExt):
hdu = fits.ImageHDU(data=gain[chip].astype('float32'), header=myflat[chip + 1].header)
del hdu
prihdu = fits.PrimaryHDU(gain[0], prihdr)
fo.writeto(output, output_verify='ignore')
del fo
return output
[docs]def get_dev(gain, naxis1, naxis2, chip, nx, ny):
dev = np.zeros((naxis1, naxis2), dtype=np.float32)
# Foreach image block
for i in range(0, naxis1, ny):
for j in range(0, naxis2, nx):
box = gain[chip][i: i + nx, j: j + ny]
p = np.where(box > 0.0)
buf = box[p]
if len(buf) > 0:
med = np.median(buf)
# log.debug("Median=%f"%med)
# log.warning("Median < 0 !")
med = 0.0
dev[i: i + nx, j: j + ny] = \
np.where(box > 0, (box - med), 0)
return dev
# main
# ###########################################################################
[docs]def main(arguments=None):
# Get and check command-line options
DESC = """Creates a master gain map from a given master flat field (dome,
twilight or superflat) optionally normalized. The bad pixels are set to 0"""
parser = parser = argparse.ArgumentParser(description=DESC)
parser.add_argument("-s", "--source", type=str,
action="store", dest="source_file",
help="""Flat Field image optionally normalized. It has to be
a fullpath file name (required)""")
parser.add_argument("-o", "--output", type=str,
action="store", dest="output_filename",
help="output file to write the Gain Map")
# parser.add_argument("-b", "--bpm", type="str",
# action="store", dest="bpm",
# help="Input Bad pixel map file (optional)")
parser.add_argument("-L", "--low", type=float, default=0.5,
action="store", dest="mingain",
help="pixel below this gain value are considered bad (default=0.5)")
parser.add_argument("-H", "--high", type=float, default=1.5,
action="store", dest="maxgain",
help="pixel above this gain value are considered bad (default=1.5)")
parser.add_argument("-x", "--nx", type=int, default=16,
action="store", dest="nxblock",
help="X dimen. (pixels) to compute local bkg (even) (default=16)")
parser.add_argument("-y", "--ny", type=int, default=16,
action="store", dest="nyblock",
help="Y dimen. (pixels) to compute local bkg (even) (default=16)")
parser.add_argument("-n", "--nsigma", type=int, default=10,
action="store", dest="nsigma",
help="number of (+|-)stddev from local bkg to be bad pixel (default=10)")
parser.add_argument("-N", "--normal", default=True,
action="store_true", dest="normal",
help="""if true, the input flat-field will be normalized
before build the gainmap (default=True)""")
args = parser.parse_args()
if len(sys.argv[1:]) < 1:
# args is the leftover positional arguments after all options have been
# processed
if not args.source_file or not args.output_filename:
parser.error("incorrect number of arguments ")
gainmap = GainMap(args.source_file, args.output_filename, bpm=None,
mingain=args.mingain, maxgain=args.maxgain,
nxblock=args.nxblock, nyblock=args.nyblock,
except Exception as ex:
log.error("Some kind of problem happened %s" % str(ex))
if __name__ == "__main__":