Source code for delphinus.phottable

#!/usr/bin/env python
# encoding: utf-8
"""
Class for working with Dolphot output data; particularly converting text
output into HDF5 files and providing nice hooks into those HDF5 tables.
"""

import os
import numpy as np
import numpy.lib.recfunctions as recf

try:
    from astropy.wcs import WCS
except ImportError:
    from pywcs import WCS
try:
    from astropy.io.fits import getheader
except ImportError:
    from pyfits import getheader
from astropy.io import ascii
from astropy.table import Table

import tables

import lfdiagnostics


class BasePhotReader(object):
    """Base class for reading Dolphot photometry output files."""
    N_IMAGE_COLS = 17
    N_GLOBAL_COLS = 11
    GLOBAL_COL_OFFSET = 0

    def __init__(self, filepath, nImages, refImagePath=None):
        super(BasePhotReader, self).__init__()
        self.filepath = filepath
        self.nImages = nImages
        self.referenceImagePath = refImagePath
        self._read()

    def extract_additional_columns(self, data, nImages, nStars):
        """User hook for extracting additional columns from the photometry
        output file.
        """
        pass

    def _read(self):
        """Pipeline for reading DOLPHOT photometry output."""
        data = np.loadtxt(self.filepath)
        nStars = data.shape[0]
        self._extract_global_cols(data, self.GLOBAL_COL_OFFSET, nStars)
        self._extract_image_phot_cols(
            data, self.GLOBAL_COL_OFFSET + self.N_GLOBAL_COLS,
            self.nImages, nStars)
        self.extract_additional_columns(data, self.nImages, nStars)  # hook
        self._fill_radec()
        self.combine_structured_array()

    def _extract_image_phot_cols(self, data, offset, nImages, nStars):
        """Extract output for image-specific photometry columns."""
        dt = [('counts', np.float, self.nImages),
              ('sky', np.float, self.nImages),
              ('norm_count_rate', np.float, self.nImages),
              ('norm_count_rate_err', np.float, self.nImages),
              ('mag', np.float, self.nImages),
              ('mag_err', np.float, self.nImages),
              ('chi', np.float, self.nImages),
              ('sn', np.float, self.nImages),
              ('sharp', np.float, self.nImages),
              ('round', np.float, self.nImages),
              ('crowding', np.float, self.nImages),
              ('fwhm', np.float, self.nImages),
              ('ecc', np.float, self.nImages),
              ('psf_a', np.float, self.nImages),
              ('psf_b', np.float, self.nImages),
              ('psf_c', np.float, self.nImages),
              ('quality', np.float, self.nImages)]
        self._idata = np.empty(nStars, dtype=np.dtype(dt))
        for i in range(self.nImages):
            for j in range(self.N_IMAGE_COLS):
                k = offset + i * self.N_IMAGE_COLS + j
                colname = dt[j][0]
                # FIXME must be a better way to cast this!
                # Temporary fix for the nImages == 1 shape.
                if nImages > 1:
                    self._idata[colname][:, i] = data[:, k]
                else:
                    self._idata[colname] = data[:, k]

    def _extract_global_cols(self, data, offset, nStars):
        """Extract output for global columns."""
        # The first N_GLOBAL_COLS names map one-to-one onto DOLPHOT's
        # global output columns; `ra` and `dec` are not read from the
        # file but computed afterwards by _fill_radec().
        dt = [('ext', np.int),
              ('chip', np.int),
              ('x', np.float),
              ('y', np.float),
              ('ref_chi', np.float),
              ('ref_sn', np.float),
              ('ref_sharp', np.float),
              ('ref_round', np.float),
              ('major_ax', np.int),
              ('ref_crowding', np.float),
              ('type', np.int),
              ('ra', np.float),
              ('dec', np.float)]
        self._gdata = np.empty(nStars, dtype=np.dtype(dt))
        for i in range(self.N_GLOBAL_COLS):
            j = i + offset
            colname = dt[i][0]
            self._gdata[colname] = data[:, j]

    def _fill_radec(self):
        """Compute RA/Dec columns, if a reference image is specified."""
        if self.referenceImagePath is not None:
            refHead = getheader(self.referenceImagePath)
            wcs = WCS(refHead)
            ra, dec = wcs.all_pix2sky(self._gdata['x'], self._gdata['y'], 1)
            self._gdata['ra'] = ra
            self._gdata['dec'] = dec
        else:
            self._gdata['ra'][:] = np.nan
            self._gdata['dec'][:] = np.nan

    def combine_structured_array(self):
        """Merge the global and per-image structured arrays into a single
        structured array, ``self.data``."""
        arrays = [self._gdata, self._idata]
        self.data = recf.merge_arrays(arrays, flatten=True, usemask=False)
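
# Example usage: a minimal sketch of reading a DOLPHOT photometry file.
# The file path, image count, and reference image below are hypothetical:
#
#     reader = BasePhotReader("phot.txt", 2, refImagePath="ref.fits")
#     print reader.data['mag'].shape    # -> (nStars, 2)
#     print reader.data['x'][:5]        # reference-frame pixel positions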

class FakeReader(BasePhotReader):
    """Read Dolphot's .fake artificial star output."""
    N_FAKE_GLOBAL_COLS = 4
    N_FAKE_IMAGE_COLS = 2

    def __init__(self, filepath, nImages, refImagePath=None):
        self.GLOBAL_COL_OFFSET = (self.N_FAKE_GLOBAL_COLS
                                  + nImages * self.N_FAKE_IMAGE_COLS)
        super(FakeReader, self).__init__(filepath, nImages,
                                         refImagePath=refImagePath)

    def __add__(self, other):
        """Concatenate another FakeReader's data onto this one, in place,
        and return self."""
        self.data = np.concatenate((self.data, other.data))
        return self

    def extract_additional_columns(self, data, nImages, nStars):
        """Read additional columns from .fake output."""
        self._extract_fake_global_cols(data, nStars)
        self._extract_fake_phot_cols(data, nImages, nStars)

    def _extract_fake_global_cols(self, data, nStars):
        """Extract global columns at the beginning of .fake files."""
        dt = [('fake_ext', np.int),
              ('fake_chip', np.int),
              ('fake_x', np.float),
              ('fake_y', np.float),
              ('fake_ra', np.float),
              ('fake_dec', np.float)]
        self._fgdata = np.empty(nStars, dtype=np.dtype(dt))
        for i in range(self.N_FAKE_GLOBAL_COLS):
            colname = dt[i][0]
            self._fgdata[colname] = data[:, i]
        self._fake_fill_radec()

    def _extract_fake_phot_cols(self, data, nImages, nStars):
        """Extract input photometry columns at the beginning of .fake files
        for each image.
        """
        dt = [('fake_count', np.float, nImages),
              ('fake_mag', np.float, nImages)]
        self._fidata = np.empty(nStars, dtype=np.dtype(dt))
        for i in range(self.nImages):
            for j in range(self.N_FAKE_IMAGE_COLS):
                k = self.N_FAKE_GLOBAL_COLS + i * self.N_FAKE_IMAGE_COLS + j
                colname = dt[j][0]
                # FIXME temporary fix for the nImages == 1 shape
                if nImages > 1:
                    self._fidata[colname][:, i] = data[:, k]
                else:
                    self._fidata[colname] = data[:, k]

    def _fake_fill_radec(self):
        """Compute input RA/Dec columns, if a reference image is specified."""
        # TODO refactor this code against _fill_radec()
        if self.referenceImagePath is not None:
            refHead = getheader(self.referenceImagePath)
            wcs = WCS(refHead)
            # Use the *input* (fake) pixel coordinates so that fake_ra and
            # fake_dec describe where each artificial star was inserted.
            ra, dec = wcs.all_pix2sky(self._fgdata['fake_x'],
                                      self._fgdata['fake_y'], 1)
            self._fgdata['fake_ra'] = ra
            self._fgdata['fake_dec'] = dec
        else:
            self._fgdata['fake_ra'][:] = np.nan
            self._fgdata['fake_dec'][:] = np.nan

    def combine_structured_array(self):
        """Merge the fake input, fake photometry, global, and per-image
        structured arrays into ``self.data``."""
        arrays = [self._fgdata, self._fidata, self._gdata, self._idata]
        self.data = recf.merge_arrays(arrays, flatten=True, usemask=False)

    def mag_errors(self):
        """Compute output-input magnitude differences for the artificial
        star tests.

        Returns
        -------
        diffs : ndarray
            An nStars by nImages array of output-input magnitude
            differences.
        """
        diffs = self.data['mag'] - self.data['fake_mag']
        return diffs
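
    # Example usage (a sketch; `fake` is an assumed FakeReader instance):
    #
    #     errs = fake.mag_errors()          # shape (nStars, nImages)
    #     print np.median(errs, axis=0)     # typical per-image offset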

    def position_errors(self, magIndex=0):
        """Prototype for computing position errors for artificial star tests
        as the Euclidean distance between input and output (x, y) coordinates.

        .. todo:: Should not return fakeMag

        .. todo:: Should cache results
        """
        if self.nImages > 1:
            fakeMag = self.data['fake_mag'][:, magIndex]
        else:
            fakeMag = self.data['fake_mag']
        inputX = self.data['fake_x']
        inputY = self.data['fake_y']
        obsX = self.data['x']
        obsY = self.data['y']
        dx = np.hypot(inputX - obsX, inputY - obsY)
        return fakeMag, dx
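
    # Example usage (a sketch; `fake` is an assumed FakeReader instance):
    #
    #     fakeMag, dx = fake.position_errors()
    #     lost = dx > 3.0    # stars recovered more than 3 pixels away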

    def completeness(self, n, mag_err_lim=None, dx_lim=None, dmag=0.1):
        """Compute magnitude versus completeness fraction for the given
        image.

        Parameters
        ----------
        n : int
            Index of the image to compute the completeness for.
        mag_err_lim : float
            Maximum absolute difference in magnitudes, in any band, for
            the star to be considered recovered.
        dx_lim : float
            Maximum distance between a fake star's input site and its
            observed site for the fake star to be considered recovered.
        dmag : float
            Bin width (magnitudes) of the histogram used to establish
            the completeness per bin.
        """
        recovered = self.recovered_in_image(n, mag_err_lim=mag_err_lim,
                                            dx_lim=dx_lim)
        if self.nImages > 1:
            fakeMag = self.data['fake_mag'][:, n]
        else:
            fakeMag = self.data['fake_mag']
        bins = np.arange(fakeMag.min(), fakeMag.max(), dmag)
        inds = np.digitize(fakeMag, bins)
        # Guarantee that the counts align with `bins`; empty bins yield
        # NaN completeness.
        rec = np.bincount(inds, weights=recovered, minlength=bins.size + 1)
        tot = np.bincount(inds, weights=None, minlength=bins.size + 1)
        comp = rec / tot
        return bins, comp[1:]
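
    # Example usage: a crude 50% completeness estimate from the binned
    # fractions (a sketch; `fake` is an assumed FakeReader instance):
    #
    #     bins, comp = fake.completeness(0, mag_err_lim=0.2, dx_lim=2.0)
    #     ok = np.isfinite(comp)
    #     print bins[ok][np.argmin((comp[ok] - 0.5) ** 2.)]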

    def completeness_limits(self, mag_err_lim=None, dx_lim=None, frac=0.5,
                            dmag=0.1):
        """Compute the completeness limit for each image.

        Returns a list of completeness limits, one per image.

        Parameters
        ----------
        mag_err_lim : float
            Maximum absolute difference in magnitudes, in any band, for
            the star to be considered recovered.
        dx_lim : float
            Maximum distance between a fake star's input site and its
            observed site for the fake star to be considered recovered.
        frac : float
            Scalar fractional level of completeness. For example, 0.5 is
            the 50% completeness limit.
        dmag : float
            Bin width (magnitudes) of the histogram used to establish
            the completeness per bin.
        """
        comps = []
        for n in xrange(self.nImages):
            c = self.completeness_limit_for_image(n, mag_err_lim=mag_err_lim,
                                                  dx_lim=dx_lim, frac=frac,
                                                  dmag=dmag)
            comps.append(c)
        return comps

    def completeness_limit_for_image(self, n, mag_err_lim=None, dx_lim=None,
                                     frac=0.5, dmag=0.1):
        """Compute the completeness limit for a single image.

        Parameters
        ----------
        n : int
            Index of the image to compute the completeness limit for.
        mag_err_lim : float
            Maximum absolute difference in magnitudes, in any band, for
            the star to be considered recovered.
        dx_lim : float
            Maximum distance between a fake star's input site and its
            observed site for the fake star to be considered recovered.
        frac : float
            Scalar fractional level of completeness. For example, 0.5 is
            the 50% completeness limit.
        dmag : float
            Bin width (magnitudes) of the histogram used to establish
            the completeness per bin.
        """
        bins, comp = self.completeness(n, mag_err_lim=mag_err_lim,
                                       dx_lim=dx_lim, dmag=dmag)
        # Solve where completeness crosses the threshold.
        # TODO this could be made a lot smarter (i.e., ensure completeness
        # is declining; interpolate between bins; smooth)
        msk = np.where(np.isfinite(comp))[0]
        i = np.argmin((comp[msk] - frac) ** 2.)
        return bins[msk[i]]
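
    # Example usage (a sketch; `fake` is an assumed FakeReader instance):
    #
    #     m50 = fake.completeness_limit_for_image(0, mag_err_lim=0.2,
    #                                             dx_lim=2.0, frac=0.5)
    #     print "50%% completeness at %.2f mag" % m50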

    def recovered(self, mag_err_lim=None, dx_lim=None):
        """Generate a boolean array indicating whether each star is
        recovered.

        This is effectively a boolean AND of the results from
        :meth:`recovered_in_image` across all images. A star is recovered
        if, in every image:

        1. the recovered magnitude error is less than `mag_err_lim`, and
        2. the recovered position is within `dx_lim` pixels of the
           artificial star.

        Stars that DOLPHOT does not observe at all at the artificial
        star's site carry a sentinel magnitude (near 99) and therefore
        fail the magnitude-error cut.

        Parameters
        ----------
        mag_err_lim : float
            Maximum absolute difference in magnitudes, in any band, for
            the star to be considered recovered.
        dx_lim : float
            Maximum distance between a fake star's input site and its
            observed site for the fake star to be considered recovered.
        """
        if self.nImages > 1:
            recoveryArrays = [self.recovered_in_image(i,
                                                      mag_err_lim=mag_err_lim,
                                                      dx_lim=dx_lim)
                              for i in xrange(self.nImages)]
            rec = recoveryArrays[0]
            for r in recoveryArrays[1:]:
                rec = rec & r
            return rec
        else:
            return self.recovered_in_image(0, mag_err_lim=mag_err_lim,
                                           dx_lim=dx_lim)

    def recovered_in_image(self, n, mag_err_lim=None, dx_lim=None):
        """Generate a boolean array indicating whether each star is
        recovered in the given image `n`.

        A star is recovered if:

        1. the recovered magnitude error is less than `mag_err_lim`, and
        2. the recovered position is within `dx_lim` pixels of the
           artificial star.

        Stars that DOLPHOT does not observe at all at the artificial
        star's site carry a sentinel magnitude (near 99) and therefore
        fail the magnitude-error cut.

        Parameters
        ----------
        n : int
            Index of the image.
        mag_err_lim : float
            Maximum absolute difference in magnitudes for the star to be
            considered recovered.
        dx_lim : float
            Maximum distance between a fake star's input site and its
            observed site for the fake star to be considered recovered.
        """
        if dx_lim is not None:
            # position_errors() also returns the input magnitudes, which
            # are not needed here.
            k, dx = self.position_errors()
        if self.nImages > 1:
            fakeMag = self.data['fake_mag'][:, n]
            obsMag = self.data['mag'][:, n]
        else:
            fakeMag = self.data['fake_mag']
            obsMag = self.data['mag']
        recovered = np.ones(self.data.shape[0], dtype=np.bool)
        if dx_lim is not None:
            recovered[dx > dx_lim] = 0
        if mag_err_lim is not None:
            magErr = np.abs(fakeMag - obsMag)
            recovered[magErr > mag_err_lim] = 0
        return recovered
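
    # Example usage: fraction of artificial stars recovered in image 0
    # (a sketch; `fake` is an assumed FakeReader instance):
    #
    #     rec = fake.recovered_in_image(0, mag_err_lim=0.2, dx_lim=2.0)
    #     print rec.sum() / float(rec.size)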

    def export_for_starfish(self, output_path):
        """Export artificial star test data for the StarFISH `synth`
        command.

        Parameters
        ----------
        output_path : str
            Path where the crowding table will be written.
        """
        dt = [('ra', np.float), ('dec', np.float)]
        for i in xrange(self.nImages):
            dt.append(('mag_%i' % i, np.float))
            dt.append(('dmag_%i' % i, np.float))
        d = np.empty(self.data.shape[0], dtype=np.dtype(dt))
        d['ra'][:] = self.data['fake_ra'][:]
        d['dec'][:] = self.data['fake_dec'][:]
        for i in xrange(self.nImages):
            mtag = 'mag_%i' % i
            dtag = 'dmag_%i' % i
            d[mtag][:] = self.data['mag'][:, i]
            d[dtag][:] = self.data['fake_mag'][:, i] - self.data['mag'][:, i]
            # Find obvious drop-outs
            dropout = np.where(np.abs(d[dtag]) > 10.)[0]
            d[dtag][dropout] = 9.99  # label for StarFISH
        dirname = os.path.dirname(output_path)
        if dirname and not os.path.exists(dirname):
            os.makedirs(dirname)
        t = Table(d)
        t.write(output_path, format='ascii.no_header', delimiter=' ')
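
    # Example usage (a sketch; the output path is hypothetical):
    #
    #     fake.export_for_starfish("starfish/crowd/field1.crowd")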

    def metrics(self, magRange, n, magErrLim=None, dxLim=None):
        """Compute scalar metrics for the artificial stars in a single
        image.

        Returns a tuple ``(RMS magnitude error, completeness fraction)``
        for stars whose input magnitudes fall within `magRange`.
        """
        if dxLim is not None:
            k, dx = self.position_errors()
        if self.nImages > 1:
            fakeMag = self.data['fake_mag'][:, n]
            obsMag = self.data['mag'][:, n]
        else:
            fakeMag = self.data['fake_mag']
            obsMag = self.data['mag']
        err = np.abs(fakeMag - obsMag)
        # Dolphot gives unrecovered stars a magnitude of 99. This should
        # safely distinguish those stars.
        recovered = obsMag < 50.
        if magErrLim is not None:
            recovered = recovered & (err < magErrLim)
        if dxLim is not None:
            recovered = recovered & (dx < dxLim)
        recovered = np.array(recovered, dtype=np.float)
        # Find stars in the magnitude range
        minMask = fakeMag > min(magRange)
        maxMask = fakeMag < max(magRange)
        found = obsMag < 50.
        inds = np.where(minMask & maxMask)[0]
        indsMeasured = np.where(minMask & maxMask & found)[0]
        comp = float(np.sum(recovered[inds]) / float(len(inds)))
        rms = float(np.std(err[indsMeasured]))
        return rms, comp
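
# Example usage: an end-to-end artificial star test analysis (a sketch;
# the paths and image count are hypothetical):
#
#     fake = FakeReader("field1.fake", 2, refImagePath="ref.fits")
#     rms, comp = fake.metrics([22., 23.], 0, magErrLim=0.2, dxLim=2.0)
#     print "RMS mag error %.3f; completeness %.2f" % (rms, comp)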

class DolphotTable(object):
    """Represents the output from Dolphot in an HDF5 table."""

    def __init__(self, hdfPath):
        super(DolphotTable, self).__init__()
        self.hdfPath = hdfPath
        self._open_hdf()

    def _open_hdf(self):
        self.hdf = tables.openFile(self.hdfPath)
        self.photTable = self.hdf.root.phot

    @classmethod
    def make(cls, tablePath, images, referenceImage, photPath, psfsPath,
             apcorPath, execTime=None):
        """Initialize a DolphotTable using data from the Dolphot class."""
        # Read phot file
        nImages = len(images)
        if referenceImage is not None:
            refPath = referenceImage['path']
        else:
            # use the first image as a reference instead
            refPath = images[0]['path']
        reader = BasePhotReader(photPath, nImages, refImagePath=refPath)
        # Insert the structured numpy array into a new HDF5 table
        title = os.path.splitext(os.path.basename(tablePath))[0]
        if os.path.exists(tablePath):
            os.remove(tablePath)
        h5 = tables.openFile(tablePath, mode="w", title=title)
        photTable = h5.createTable("/", 'phot', reader.data.dtype,
                                   "Photometry Catalog")
        photTable.append(reader.data)
        # Set metadata for the photometry table
        photTable.attrs.image_paths = [im['path'] for im in images]
        photTable.attrs.image_keys = [im['image_key'] for im in images]
        photTable.attrs.image_bands = [im['band'] for im in images]
        if execTime is not None:
            photTable.attrs.exec_time = execTime
        # Save and close
        photTable.flush()
        h5.close()
        instance = cls(tablePath)
        return instance
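
    # Example usage (a sketch; the `images` dictionaries and all paths
    # are hypothetical):
    #
    #     images = [{'path': 'f110w.fits', 'image_key': 'f110w', 'band': 'J'},
    #               {'path': 'f160w.fits', 'image_key': 'f160w', 'band': 'H'}]
    #     dt = DolphotTable.make("phot.h5", images, None, "phot.txt",
    #                            None, None)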

    @property
    def image_paths(self):
        """List of image paths in the photometry, ordered as in the
        catalog."""
        return self.photTable.attrs.image_paths

    @property
    def image_keys(self):
        """List of image keys in the photometry, ordered as in the catalog.

        Image keys are strings used to represent an image in your pipeline.
        """
        return self.photTable.attrs.image_keys

    @property
    def image_bands(self):
        """List of image bandpasses, ordered with :attr:`image_paths`."""
        return self.photTable.attrs.image_bands

    def plot_luminosity_function_diagnostics(self, plotPathRoot, magLim=None,
                                             fmt="pdf"):
        """Plot luminosity function diagnostics for each image."""
        plotDir = os.path.dirname(plotPathRoot)
        if plotDir != "" and not os.path.exists(plotDir):
            os.makedirs(plotDir)
        for i, (imageKey, band) in enumerate(zip(self.image_keys,
                                                 self.image_bands)):
            plotPath = "%s_%s_%s.%s" % (plotPathRoot, imageKey, band, fmt)
            lfdiagnostics.make_diagnostic_plot(self, i, imageKey, band,
                                               fmt, plotPath, magLim)

    def add_column(self, colname, coldata, shape=None):
        """Add a column to the photometry table.

        The procedure for adding columns to PyTables tables follows
        https://gist.github.com/swarbhanu/1405074
        """
        # TODO handle case of existing column
        self.hdf.close()
        # Open it again in append mode
        fileh = tables.openFile(self.hdfPath, "a")
        table = fileh.root.phot
        # Get a description of table in dictionary format
        descr = table.description._v_colObjects
        descr2 = descr.copy()
        # Add a column to description
        if len(coldata.shape) > 1:
            descr2[colname] = tables.Float64Col(dflt=False,
                                                shape=(coldata.shape[1],))
        else:
            descr2[colname] = tables.Float64Col(dflt=False)
        # Create a new table with the new description
        table2 = fileh.createTable(fileh.root, 'table2', descr2, "A table",
                                   tables.Filters(1))
        # Copy the user attributes
        table.attrs._f_copy(table2)
        # Fill the rows of new table with default values
        for i in xrange(table.nrows):
            table2.row.append()
        # Flush the rows to disk
        table2.flush()
        # Copy the columns of source table to destination
        for col in descr:
            getattr(table2.cols, col)[:] = getattr(table.cols, col)[:]
        # Fill the new column
        getattr(table2.cols, colname)[:] = coldata
        # Remove the original table
        table.remove()
        # Move table2 to table
        table2.move('/', 'phot')
        # Finally, close the file
        fileh.close()
        # Re-open in read-only mode
        self._open_hdf()
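
    # Example usage: attaching a per-star extinction column (a sketch;
    # `dt` is an assumed DolphotTable and `av` a float array with one
    # value per star):
    #
    #     dt.add_column('av', av)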

    def set_metadata(self, key, value):
        """Write metadata to the photometry table.

        Parameters
        ----------
        key : str
            Key for the metadata dict.
        value :
            Value of the metadata.
        """
        self.hdf.close()
        # Open it again in append mode
        fileh = tables.openFile(self.hdfPath, "a")
        table = fileh.root.phot
        setattr(table._v_attrs, key, value)
        table.flush()
        fileh.close()
        self._open_hdf()
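
    # Example usage (a sketch; `dt` is an assumed DolphotTable and the
    # key/value pair is hypothetical):
    #
    #     dt.set_metadata('field_name', 'm31-field1')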

    def export_ascii(self, path, global_cols=None, image_cols=None,
                     colors=None):
        """Export an ASCII table of the dataset, saving it to `path`.

        Parameters
        ----------
        path : str
            Relative path where the file will be written.
        global_cols : list
            Sequence of global column names to export (these apply to
            all images).
        image_cols : list
            Sequence of column names to export that apply to each image.
            In the output file these columns will have a suffix
            corresponding to each image's key.
        colors : list of tuples
            Colours (e.g., B-V) can also be computed and exported in a new
            column. For each colour, add a tuple to the `colors` list. That
            tuple has two items, corresponding to the indices of the images.
            If the J-band image is at index 0, and the K-band image at
            index 1, then::

                colors=[(0, 1)]

            will create a column named `J_K`, assuming those are the image
            bands for those columns.
        """
        # Build astropy table for output
        cols = {}
        colnames = []
        for cname in global_cols:
            cols[cname] = self.photTable.read(field=cname)
            colnames.append(cname)
        nimages = len(self.image_keys)
        for cname in image_cols:
            if nimages > 1:
                data = self.photTable.read(field=cname)
                for i, iname in enumerate(self.image_keys):
                    full_cname = "_".join([cname, iname])
                    cols[full_cname] = data[:, i]
                    colnames.append(full_cname)
            else:
                cols[cname] = self.photTable.read(field=cname)
                colnames.append(cname)
        if colors is not None:
            mags = self.photTable.read(field='mag')
            for cindex in colors:
                iB, iR = cindex
                colournames = [self.image_bands[j] for j in cindex]
                colour = mags[:, iB] - mags[:, iR]
                cname = "_".join(colournames)
                cols[cname] = colour
                colnames.append(cname)
        tbl = Table(cols, names=colnames)
        with open(path, 'w') as f:
            ascii.write(tbl, f)
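
    # Example usage: exporting positions, per-image magnitudes, and a
    # colour between the first two images (a sketch; the output path and
    # column choices are hypothetical):
    #
    #     dt.export_ascii("catalog.txt",
    #                     global_cols=['ra', 'dec'],
    #                     image_cols=['mag', 'mag_err'],
    #                     colors=[(0, 1)])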

if __name__ == '__main__':
    photPath = "/Users/jsick/Dropbox/_dolphot/517eef6ce8f07284365c6156"
    photTable = BasePhotReader(photPath, 2, refImagePath=None)
    print photTable.data.dtype
    print photTable.data['chip']

    fakePath = "/Users/jsick/Dropbox/_dolphot/517eef6ce8f07284365c6156.fake"
    fakeTable = FakeReader(fakePath, 2, refImagePath=None)
    print fakeTable.data['fake_mag'][0]
    print fakeTable.data['fake_count'][0]
    print fakeTable.data['mag'][0]
    print fakeTable.data['counts'][0]
    print fakeTable.data['fake_x'][0]
    print fakeTable.data['fake_y'][0]
    print fakeTable.data['x'][0]
    print fakeTable.data['y'][0]

    fakeTable2 = FakeReader(fakePath, 2, refImagePath=None)
    print fakeTable.data.dtype
    print fakeTable.mag_errors()
    print fakeTable.position_errors()
    print fakeTable.completeness(0)
    print len(fakeTable.data)
    concatTable = fakeTable + fakeTable2
    print len(concatTable.data)
    print fakeTable.metrics([17., 18.], 0, magErrLim=0.2)
    print fakeTable.metrics([19., 19.5], 0, magErrLim=0.2)
