Source code for reduce.calDarkModel

#!/usr/bin/env python

# Copyright (c) 2009-2012 IAA-CSIC  - All rights reserved. 
# Author: Jose M. Ibanez. 
# Instituto de Astrofisica de Andalucia, IAA-CSIC
#
# This file is part of PAPI (PANIC Pipeline)
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
################################################################################
#
# PANICtool
#
# calDarkModel.py
#
# Created    : 17/06/2009    jmiguel@iaa.es
# Last update: 22/06/2009    jmiguel@iaa.es
#              03/03/2010    jmiguel@iaa.es Added READMODE checking
#              18/02/2014    jmiguel@iaa.es Speeded up with numpy.polynomial.polynomial.polyfit 
#                            and added support for MEFs.
#                            Changed structure of planes, p0=bias, p1=dark_curr
#
################################################################################

# Import necessary modules

import argparse
import sys
import os
import fileinput
import time

# PAPI
from papi.misc.paLog import log
from papi.misc.fileUtils import removefiles
from papi.misc.utils import clock
from papi.datahandler.clfits import ClFits
import papi.misc.robust as robust
from papi.misc.version import __version__

# Interact with FITS files
import astropy.io.fits as fits
import numpy


[docs]class MasterDarkModel(object): """ Class used to build and manage a master calibration dark model As input a series of dark exposures with a range of exposure times is given. A linear fit is done at each pixel position of data number versus exposure time. A each pixel position in the output map represents the slope of the fit done at that position and is thus the dark current expressed in units of data numbers per second. Parameters ---------- input_data: list A list of dark files temp_dir: str Directory for temporal files output_filename: str Filename for the master dark obtained bpm: str Input bad pixel mask or NULL (optional) Returns ------- If no error, a fits file (nx*ny) with 2 planes (extensions) plane 0 = bias plane 1 = dark current in DN/sec DARKCURRENT The median dark current in data numbers per second found from the median value of the output dark current map. TODO ---- - Data model for MEF files (PANIC) """ def __init__(self, input_files, temp_dir='/tmp/', output_filename="/tmp/mdarkmodel.fits", bpm=None, show_stats=True): self.__input_files = input_files self.__output_filename = output_filename # full filename (path+filename) self.__bpm = bpm self.show_stats = show_stats
[docs] def createDarkModel(self): """ Create a master DARK model from the dark file list """ log.debug("Start createDarkModel") start_time = time.time() t = clock() t.tic() # Get the user-defined list of dark frames framelist = self.__input_files # STEP 0: Determine the number of darks frames to combine try: nframes = len(framelist) except IndexError: log.error("No DARK frames defined") raise if not os.path.exists( os.path.abspath(os.path.join(self.__output_filename, os.pardir))): raise NameError('Wrong output path') if not self.__output_filename: log.error("Output DARK frame not defined") raise Exception("Wrong output filename") # Change to the source directory base, infile = os.path.split(self.__output_filename) darks = numpy.zeros(nframes, dtype=numpy.int) # STEP 1: Check TYPE(dark), READMODE and read the EXPTIME of each frame # print "FRAMELIST= %s" %framelist i = 0 f_readmode = -1 f_n_extensions = -1 for iframe in framelist: myfits = ClFits(iframe) log.debug("Frame %s EXPTIME= %f TYPE= %s " %(iframe, myfits.exptime, myfits.type)) # Check TYPE (dark) if not myfits.isDark(): log.warning("Warning: Task 'createDarkModel' found a non dark frame. Skipping %s", iframe) darks[i] = 0 else: # Check READMODE if (f_readmode !=-1 and (f_readmode != myfits.getReadMode() )): log.error("Error: Task 'createMasterDark' finished. Found a DARK frame with different READMODE") darks[i] = 0 #continue raise Exception("Found a DARK frame with different READMODE") else: f_readmode = myfits.getReadMode() f_n_extensions = myfits.getNExt() #log.debug("NEXT= %s"%(f_n_extensions)) darks[i] = 1 i = i + 1 log.debug('All frames checked') naxis1 = myfits.naxis1 naxis2 = myfits.naxis2 ndarks = (darks==1).sum() if ndarks < 2: log.error('Dark frameset doesnt have enough frames. At least 2 dark frames are needed') raise Exception("Dark sequence is too short. Al least 2 dark frames are needed !") # Initialize some storage arrays times = numpy.zeros(ndarks, dtype=numpy.float32) temp = numpy.zeros([ndarks, f_n_extensions, naxis1, naxis2], dtype=numpy.float32) out = numpy.zeros([2, f_n_extensions, naxis1, naxis2], dtype=numpy.float32) # Loop the images counter = 0 # counter for the number of darks for i in range(0, nframes): file = fits.open(framelist[i]) f = ClFits(framelist[i]) if darks[i] == 1: # is a good dark for i_ext in range(0, f_n_extensions): if f_n_extensions == 1: temp[counter, 0, :,:] = file[0].data else: log.debug("Found MEF file") temp[counter, i_ext, :,:] = file[i_ext + 1].data _mean = numpy.mean(temp[counter]) _robust_mean = robust.r_nanmean(temp[counter].reshape(naxis1 * naxis2 * f_n_extensions)) _median = robust.r_nanmedian(temp[counter]) _mode = 3 * _median - 2 * _robust_mean if self.show_stats: log.info("Dark frame TEXP=%s , ITIME=%s ,MEAN(not used)=%s , ROBUST_MEDIAN=%s ROBUST_MEAN=%s" % (f.expTime(), f.getItime(), _mean, _median, _robust_mean)) times[counter] = float(f.expTime()) counter = counter + 1 file.close() log.debug("Now fitting the dark model...") # now collapse and fit the data # polyfit returns polynomial coefficients ordered from low to high. # It means, 0-coeff => bias, 1-coeff => dark_current fit = numpy.polynomial.polynomial.polyfit(times, temp.reshape(len(times), naxis1 * naxis2 * f_n_extensions), deg=1) log.debug("Fitting finished, now some stats and save results") # Get the median value of the dark current median_dark_current = robust.r_nanmedian(fit[1]) median_bias = robust.r_nanmedian(fit[0]) log.info("MEDIAN_DARK_CURRENT = %s" % median_dark_current) log.info("MEDIAN BIAS = %s" % median_bias) removefiles( self.__output_filename ) # Write result in a FITS hdulist = fits.HDUList() hdr0 = fits.getheader(framelist[numpy.where(darks==1)[0][0]]) prihdu = fits.PrimaryHDU (data = None, header = None) try: prihdu.header.set('INSTRUME', hdr0['INSTRUME']) prihdu.header.set('TELESCOP', hdr0['TELESCOP']) prihdu.header.set('CAMERA', hdr0['CAMERA']) prihdu.header.set('MJD-OBS', hdr0['MJD-OBS']) prihdu.header.set('DATE-OBS', hdr0['DATE-OBS']) prihdu.header.set('DATE', hdr0['DATE']) prihdu.header.set('UT', hdr0['UT']) prihdu.header.set('LST', hdr0['LST']) prihdu.header.set('ORIGIN', hdr0['ORIGIN']) prihdu.header.set('OBSERVER', hdr0['OBSERVER']) except Exception as e: log.warning("%s" % str(e)) prihdu.header.set('PAPITYPE','MASTER_DARK_MODEL') prihdu.header.set('PAPIVERS', __version__, 'PANIC Pipeline version') prihdu.header.add_history('Dark model based on %s' % framelist) prihdu.header.add_history('Plane 0: bias ; Plane 1: dark current') if f_n_extensions > 1: prihdu.header.set('EXTEND', True, after = 'NAXIS') prihdu.header.set('NEXTEND', f_n_extensions) prihdu.header.set('FILENAME', self.__output_filename) hdulist.append(prihdu) for i_ext in range(0, f_n_extensions): hdu = fits.PrimaryHDU() hdu.data = fit.reshape(2, f_n_extensions, naxis1, naxis2)[:, i_ext, :, :].astype('float32') hdulist.append(hdu) del hdu else: prihdu.data = fit.reshape(2, 1, naxis1, naxis2)[:, 0, :, :].astype('float32') hdulist.append(prihdu) # write FITS try: hdulist.writeto(self.__output_filename) hdulist.close(output_verify='ignore') except Exception as e: log.error("Error writing dark model %s" % self.__output_filename) raise e log.debug('Saved DARK Model to %s' , self.__output_filename) log.debug("createDarkModel' finished %s", t.tac()) return self.__output_filename
################################################################################
[docs]def my_mode(data): """ An easy (efficient and precise ??) way to find out the mode stats of an array (not used) """ counts = {} for x in data.flatten(): counts[x] = counts.get(x,0) + 1 maxcount = max(counts.values()) modelist = [] for x in counts: if counts[x] == maxcount: modelist.append(x) return modelist,maxcount
################################################################################ # main
[docs]def main(arguments=None): usage = "usage: %prog [options]" desc = """ This module receives a series of FITS images (darks) with increasing exposure time and creates the master dark model and computes several statistics. """ parser = argparse.ArgumentParser(description=desc) parser.add_argument("-s", "--source", action="store", dest="source_file_list", help="Source file listing the filenames of dark frames.") parser.add_argument("-o", "--output", action="store", dest="output_filename", help="final coadded output image") parser.add_argument("-S", "--show_stats", action="store_true", dest="show_stats", default=False, help="Show frame stats [default False]") options = parser.parse_args() if len(sys.argv[1:])<1: parser.print_help() sys.exit(0) if not options.source_file_list or not options.output_filename: parser.print_help() parser.error("Incorrect number of arguments ") if os.path.isdir(options.source_file_list) or \ os.path.isdir(options.output_filename): parser.print_help() parser.error("Source and output must be a file, not a directory") filelist = [line.replace( "\n", "") for line in fileinput.input(options.source_file_list)] try: mDark = MasterDarkModel(filelist, "/tmp", options.output_filename, options.show_stats) mDark.createDarkModel() except Exception as e: log.error("Error computing dark model: %s"%str(e)) sys.exit(0)
###################################################################### if __name__ == "__main__": sys.exit(main())