# Licensed under a 3-clause BSD style license - see LICENSE.rst
"""
Classes and utilities to keep track of files associated with an analysis.
The main class is `FileArchive`, which keeps track of all the files associated with an analysis.
The `FileHandle` helper class encapsulates information on a particular file.
"""
from __future__ import absolute_import, division, print_function
import os
import sys
import time
import tempfile
from shutil import copyfile
from collections import OrderedDict
import numpy as np
# from numpy.core import defchararray
from astropy.table import Table, Column
from fermipy.fits_utils import write_tables_to_fits
def get_timestamp():
    """Return the current wall-clock time as whole seconds since the epoch."""
    now = time.time()
    return int(now)
def get_unique_match(table, colname, value):
    """Return the index of the single row whose ``colname`` equals ``value``.

    Parameters
    ----------
    table : `astropy.table.Table`
        Table to search; anything indexable by column name that yields a
        numpy-like column works.
    colname : str
        Name of the column to compare against.
    value
        Value to look for.

    Returns
    -------
    int
        Index of the unique matching row.

    Raises
    ------
    KeyError
        If zero rows, or more than one row, match.
    """
    column = table[colname]
    # FIXME, This is here for python 3.5, where astropy is now returning bytes
    # instead of str: compare text columns as str so bytes cells still match.
    is_text = column.dtype.kind in ['S', 'U']
    mask = (column.astype(str) if is_text else column) == value
    n_match = mask.sum()
    if n_match != 1:
        raise KeyError("%i rows in column %s match value %s" %
                       (n_match, colname, value))
    return np.argmax(mask)
# NOTE: a plain class of int constants rather than an enum.Enum; the values
# are written into integer table columns and used directly as array indices.
class FileStatus(object):
    """Enumeration of file status types.

    The integer values are stored in the archive table and used to index the
    status counters, so they should not be renumbered.
    """
    # File is not in system
    no_file = 0
    # File will be created by a scheduled job
    expected = 1
    # File exists
    exists = 2
    # File should exist, but does not
    missing = 3
    # File exists, but has been superseded
    superseded = 4
    # File was temporary and has been removed
    temp_removed = 5
# NOTE: a plain class of int bit masks rather than an enum.Flag, so the masks
# combine with ordinary integer arithmetic.
class FileFlags(object):
    """Bit masks to indicate file types.

    Each single flag is a distinct power of two and flags are OR-ed together;
    the composite masks group flags that are tested together.
    """
    # -- single-bit flags ---------------------------------------------------
    no_flags = 0             # No flags are set for this file
    input_mask = 1 << 0      # File is input to job
    output_mask = 1 << 1     # File is output from job
    rm_mask = 1 << 2         # File is removed by job
    gz_mask = 1 << 3         # File is compressed by job
    internal_mask = 1 << 4   # File is internal to job
    stageable = 1 << 5       # File can be staged to / from scratch area

    # -- composite masks ----------------------------------------------------
    # Removed-or-internal: the file is not expected to survive the job
    rmint_mask = internal_mask | rm_mask
    # Flags that disqualify a file from being a chain output
    out_ch_mask = rmint_mask | output_mask
    # Flags consulted when deciding whether a file is a pure chain input
    in_ch_mask = out_ch_mask | input_mask
    # Input / output files that may be staged to scratch
    in_stage_mask = stageable | input_mask
    out_stage_mask = stageable | output_mask
class FileDict(object):
    """Small class to keep track of files used & created by a link.

    Parameters
    ----------
    file_args : dict
        Dictionary mapping argument name to `FileFlags` bit mask
    file_dict : dict
        Dictionary mapping file path to `FileFlags` bit mask
    """

    def __init__(self, **kwargs):
        """C'tor

        Keyword arguments
        -----------------
        file_args : dict, optional
            Mapping of argument name to `FileFlags` mask (default: empty).
        """
        self.file_args = kwargs.get('file_args', {})
        self.file_dict = {}

    @staticmethod
    def _strip_gz(path):
        """Return path with a single trailing '.gz' removed (if present)."""
        if path.endswith('.gz'):
            return path[:-3]
        return path

    def latch_file_info(self, args):
        """Extract the file paths from a set of arguments.

        ``self.file_dict`` is cleared and refilled: for every argument name in
        ``self.file_args`` that is present in ``args`` with a non-None value,
        the path(s) it holds are recorded with that argument's flags.
        Argument names starting with 'args' may hold several paths, given
        either as a list or as a whitespace-separated string.  A trailing
        '.gz' is removed so compressed and uncompressed names share an entry.

        Parameters
        ----------
        args : dict
            Mapping of argument name to value.

        Raises
        ------
        TypeError
            If an 'args*' value is neither a list nor a str.
        """
        self.file_dict.clear()
        for key, val in self.file_args.items():
            try:
                file_path = args[key]
            except KeyError:
                # Argument not supplied for this invocation: nothing to record
                continue
            if file_path is None:
                continue
            # 'args' is special: it can carry several file names
            if key.startswith('args'):
                if isinstance(file_path, list):
                    tokens = file_path
                elif isinstance(file_path, str):
                    tokens = file_path.split()
                else:
                    raise TypeError(
                        "Args has type %s, expect list or str" % type(file_path))
            else:
                tokens = [file_path]
            for token in tokens:
                self.file_dict[self._strip_gz(token)] = val

    def update(self, file_dict):
        """Update self with values from a dictionary
        mapping file path [str] to `FileFlags` mask.

        Flags of paths already present are OR-ed with the new flags; new
        paths are added unchanged.
        """
        for key, val in file_dict.items():
            if key in self.file_dict:
                self.file_dict[key] |= val
            else:
                self.file_dict[key] = val

    def items(self):
        """Return iterator over self.file_dict"""
        return self.file_dict.items()

    @property
    def input_files(self):
        """Return a list of the input files needed by this link.

        For `Link` sub-classes this will return the union
        of all the input files of each internal `Link`.
        That is to say this will include files produced by one
        `Link` in a `Chain` and used as input to another `Link` in the `Chain`
        """
        return [key for key, val in self.file_dict.items()
                if val & FileFlags.input_mask]

    @property
    def output_files(self):
        """Return a list of the output files produced by this link.

        For `Link` sub-classes this will return the union
        of all the output files of each internal `Link`.
        That is to say this will include files produced by one
        `Link` in a `Chain` and used as input to another `Link` in the `Chain`
        """
        return [key for key, val in self.file_dict.items()
                if val & FileFlags.output_mask]

    @property
    def chain_input_files(self):
        """Return a list of the input files needed by this chain.

        For `Link` sub-classes this will return only those files
        that were not created by any internal `Link`
        """
        # Keep files flagged as input and NOT as output/removed/internal,
        # i.e. not produced by some other step in the chain.
        return [key for key, val in self.file_dict.items()
                if (val & FileFlags.in_ch_mask) == FileFlags.input_mask]

    @property
    def chain_output_files(self):
        """Return a list of the all the output files produced by this link.

        For `Link` sub-classes this will return only those files
        that were not marked as internal files or marked for removal.
        """
        # Keep output files that are neither internal nor temporary
        return [key for key, val in self.file_dict.items()
                if (val & FileFlags.out_ch_mask) == FileFlags.output_mask]

    @property
    def input_files_to_stage(self):
        """Return a list of the input files that should be staged to scratch.

        These are the files flagged both as input and as stageable.
        """
        return [key for key, val in self.file_dict.items()
                if (val & FileFlags.in_stage_mask) == FileFlags.in_stage_mask]

    @property
    def output_files_to_stage(self):
        """Return a list of the output files that should be staged from scratch.

        These are the files flagged both as output and as stageable.
        """
        return [key for key, val in self.file_dict.items()
                if (val & FileFlags.out_stage_mask) == FileFlags.out_stage_mask]

    @property
    def internal_files(self):
        """Return a list of the intermediate files produced by this link.

        This returns all files that were explicitly marked as internal files.
        """
        return [key for key, val in self.file_dict.items()
                if val & FileFlags.internal_mask]

    @property
    def temp_files(self):
        """Return a list of the temporary files produced by this link.

        This returns all files that were explicitly marked for removal.
        """
        return [key for key, val in self.file_dict.items()
                if val & FileFlags.rm_mask]

    @property
    def gzip_files(self):
        """Return a list of the files compressed by this link.

        This returns all files that were explicitly marked for compression.
        """
        return [key for key, val in self.file_dict.items()
                if val & FileFlags.gz_mask]

    def _write_summary(self, stream, indent, input_files, output_files):
        """Write the file-count summary shared by the print_* methods."""
        # Resolve sys.stdout at call time so later redirection is honoured
        if stream is None:
            stream = sys.stdout
        stream.write("%sTotal files : %i\n" %
                     (indent, len(self.file_dict)))
        stream.write("%s Input files : %i\n" %
                     (indent, len(input_files)))
        stream.write("%s Output files : %i\n" %
                     (indent, len(output_files)))
        stream.write("%s Internal files : %i\n" %
                     (indent, len(self.internal_files)))
        stream.write("%s Temp files : %i\n" %
                     (indent, len(self.temp_files)))

    def print_summary(self, stream=None, indent=""):
        """Print a summary of the files in this file dict.

        This version explicitly counts the union of all input and output files.

        Parameters
        ----------
        stream : file-like, optional
            Where to write; defaults to the current ``sys.stdout``.
        indent : str
            Prefix for every line.
        """
        self._write_summary(stream, indent,
                            self.input_files, self.output_files)

    def print_chain_summary(self, stream=None, indent=""):
        """Print a summary of the files in this file dict.

        This version uses chain_input_files and chain_output_files to
        count the input and output files.

        Parameters
        ----------
        stream : file-like, optional
            Where to write; defaults to the current ``sys.stdout``.
        indent : str
            Prefix for every line.
        """
        self._write_summary(stream, indent,
                            self.chain_input_files, self.chain_output_files)
class FileStageManager(object):
    """Small class to deal with staging files to and from a scratch area"""

    def __init__(self, scratchdir, workdir):
        """C'tor

        Parameters
        ----------
        scratchdir : str
            Top-level scratch area; a private per-user temp directory is
            created inside it.
        workdir : str
            Local working directory; files under it keep their relative
            layout when mapped into the scratch area.
        """
        user = os.environ.get('USER')
        if not user:
            # USER is not always set (e.g. batch/cron environments)
            import getpass
            user = getpass.getuser()
        try:
            self.scratchdir = tempfile.mkdtemp(prefix=user + '.',
                                               dir=scratchdir)
        except OSError:
            # Could not create a temp dir there; fall back to a fixed path
            self.scratchdir = os.path.join(scratchdir, user, 'dummy')
        self.workdir = os.path.abspath(workdir)

    def split_local_path(self, local_file):
        """Split the local path into a directory name and a file name

        If local_file is in self.workdir or a subdirectory of it,
        the directory will consist of the relative path from workdir.
        If local_file is not in self.workdir, directory will be empty.

        Returns (dirname, basename)
        """
        abspath = os.path.abspath(local_file)
        # workdir with a trailing separator, so '/a/bc' is not inside '/a/b'
        prefix = os.path.join(self.workdir, '')
        if abspath.startswith(prefix):
            relpath = abspath[len(prefix):]
            return (os.path.dirname(relpath), os.path.basename(relpath))
        return ('', os.path.basename(local_file))

    def construct_scratch_path(self, dirname, basename):
        """Construct and return a path in the scratch area.

        This will be <self.scratchdir>/<dirname>/<basename>
        """
        return os.path.join(self.scratchdir, dirname, basename)

    def get_scratch_path(self, local_file):
        """Construct and return a path in the scratch area from a local file."""
        (local_dirname, local_basename) = self.split_local_path(local_file)
        return self.construct_scratch_path(local_dirname, local_basename)

    def map_files(self, local_files):
        """Build a dictionary mapping local paths to scratch paths.

        Parameters
        ----------
        local_files : list
            List of filenames to be mapped to scratch area

        Returns dict
            Mapping local_file : fullpath of scratch file
        """
        return {local_file: self.get_scratch_path(local_file)
                for local_file in local_files}

    @staticmethod
    def _ensure_dir(dirname):
        """Create dirname (and parents) unless it is empty or already exists."""
        if not dirname or os.path.isdir(dirname):
            return
        try:
            os.makedirs(dirname)
        except OSError:
            # Tolerate a concurrent creator; re-raise genuine failures
            if not os.path.isdir(dirname):
                raise

    @staticmethod
    def make_scratch_dirs(file_mapping, dry_run=True):
        """Make any directories needed in the scratch area.

        Parameters
        ----------
        file_mapping : dict
            Mapping local path -> scratch path; the parent directory of each
            scratch path is created.
        dry_run : bool
            If True, only print the equivalent shell commands.
        """
        # Deduplicate while keeping first-seen order
        scratch_dirs = OrderedDict.fromkeys(
            os.path.dirname(value) for value in file_mapping.values())
        for scratch_dirname in scratch_dirs:
            if dry_run:
                print("mkdir -p %s" % (scratch_dirname))
            else:
                FileStageManager._ensure_dir(scratch_dirname)

    @staticmethod
    def copy_to_scratch(file_mapping, dry_run=True):
        """Copy input files to scratch area.

        Local files that do not exist are skipped.  Returns file_mapping.
        """
        for key, value in file_mapping.items():
            if not os.path.exists(key):
                continue
            print("copy %s %s" % (key, value))
            if not dry_run:
                copyfile(key, value)
        return file_mapping

    @staticmethod
    def copy_from_scratch(file_mapping, dry_run=True):
        """Copy output files from scratch area.

        The local destination directory is created if needed.
        Returns file_mapping.
        """
        for key, value in file_mapping.items():
            if dry_run:
                print("copy %s %s" % (value, key))
                continue
            FileStageManager._ensure_dir(os.path.dirname(key))
            print("copy %s %s" % (value, key))
            copyfile(value, key)
        return file_mapping
class FileHandle(object):
    """Class to keep track of information about a file.

    Parameters
    ----------
    key : int
        Unique id for this particular file
    creator : int
        Unique id for the job that created this file
    timestamp : int
        File creation time cast as an int
    status : `FileStatus`
        Enum giving current status of file
    flags : `FileFlags`
        Mask giving flags set on this file
    path : str
        Path to file
    """

    def __init__(self, **kwargs):
        """C'tor

        Take values of class members from keyword arguments.  Only 'path' is
        required (a missing 'path' raises `KeyError`).  A leading '@' is
        stripped from the path, and a bytes path (e.g. read back from a
        fixed-width 'S' table column) is decoded to str.
        """
        self.key = kwargs.get('key', -1)
        self.creator = kwargs.get('creator', -1)
        self.timestamp = kwargs.get('timestamp', 0)
        self.status = kwargs.get('status', FileStatus.no_file)
        self.flags = kwargs.get('flags', FileFlags.no_flags)
        path = kwargs['path']
        # Under Python 3 bytes would never compare equal to str cache keys
        if isinstance(path, bytes) and not isinstance(path, str):
            path = path.decode('utf-8')
        if path.startswith('@'):
            path = path[1:]
        self.path = path

    @staticmethod
    def make_table(file_dict):
        """Build and return an `astropy.table.Table` to store `FileHandle`

        Parameters
        ----------
        file_dict : dict
            Mapping whose values are `FileHandle` objects; each becomes a row.
        """
        # NOTE: 'path' is a fixed-width 256-byte column; longer paths are
        # truncated by numpy when stored.
        columns = [Column(name='key', dtype=int),
                   Column(name='path', dtype='S256'),
                   Column(name='creator', dtype=int),
                   Column(name='timestamp', dtype=int),
                   Column(name='status', dtype=int),
                   Column(name='flags', dtype=int)]
        table = Table(data=columns)
        for val in file_dict.values():
            val.append_to_table(table)
        return table

    @classmethod
    def make_dict(cls, table):
        """Build and return a dict of `FileHandle` from an `astropy.table.Table`

        The dictionary is keyed by FileHandle.key, which is a unique integer for each file
        """
        ret_dict = {}
        for row in table:
            file_handle = cls.create_from_row(row)
            ret_dict[file_handle.key] = file_handle
        return ret_dict

    @classmethod
    def create_from_row(cls, table_row):
        """Build and return a `FileHandle` from an `astropy.table.row.Row`

        Raises
        ------
        KeyError
            If the row has no 'path' column.
        """
        kwargs = {key: table_row[key] for key in table_row.colnames}
        if 'path' not in kwargs:
            # Previously this printed kwargs and returned None, which only
            # failed later with an unrelated AttributeError.
            raise KeyError("Table row has no 'path' column: %s" % kwargs)
        return cls(**kwargs)

    def check_status(self, basepath=None):
        """Check on the status of this particular file.

        Looks for the file on disk (and, if it is flagged for compression,
        for its '.gz' version), then updates and returns ``self.status``:

        * found: becomes `FileStatus.exists`, unless already superseded;
        * not found, was no_file/expected/missing/temp_removed: becomes
          `FileStatus.temp_removed` if flagged removed/internal, else unchanged;
        * not found, was exists: becomes `FileStatus.temp_removed` if flagged
          removed/internal, else `FileStatus.missing`;
        * not found, was superseded: unchanged.

        Parameters
        ----------
        basepath : str or None
            If given, self.path is taken relative to it.
        """
        if basepath is None:
            fullpath = self.path
        else:
            fullpath = os.path.join(basepath, self.path)
        exists = os.path.exists(fullpath)
        if not exists and self.flags & FileFlags.gz_mask:
            exists = os.path.exists(fullpath + '.gz')

        if exists:
            if self.status != FileStatus.superseded:
                self.status = FileStatus.exists
            return self.status

        is_temp = (self.flags & FileFlags.rmint_mask) != 0
        if self.status in [FileStatus.no_file,
                           FileStatus.expected,
                           FileStatus.missing,
                           FileStatus.temp_removed]:
            if is_temp:
                self.status = FileStatus.temp_removed
        elif self.status == FileStatus.exists:
            # A removed/internal file vanishing is expected; anything else is
            # missing.  (The old code had an unreachable duplicate branch here
            # and only reached temp_removed on a second pass.)
            if is_temp:
                self.status = FileStatus.temp_removed
            else:
                self.status = FileStatus.missing
        return self.status

    def _row_values(self):
        """Return this handle's fields keyed by table column name."""
        return dict(path=self.path,
                    key=self.key,
                    creator=self.creator,
                    timestamp=self.timestamp,
                    status=self.status,
                    flags=self.flags)

    def append_to_table(self, table):
        """Add this instance as a row on a `astropy.table.Table`"""
        table.add_row(self._row_values())

    def update_table_row(self, table, row_idx):
        """Update the values in an `astropy.table.Table` for this instance"""
        row = table[row_idx]
        for colname, value in self._row_values().items():
            row[colname] = value
class FileArchive(object):
    """Class that keeps track of the status of files used in an analysis

    Parameters
    ----------
    table_file : str
        Path to the file used to persist this `FileArchive`
    table : `astropy.table.Table`
        Persistent representation of this `FileArchive`
    cache : `OrderedDict`
        Transient representation of this `FileArchive`, keyed by local path
    base_path : str
        Base file path for all files in this `FileArchive`
    """

    # Singleton instance
    _archive = None

    def __init__(self, **kwargs):
        """C'tor

        Takes self.base_path from kwargs['base_path']
        Reads kwargs['file_archive_table'] (an empty table is used if that
        file does not exist yet)
        """
        self._table_file = None
        self._table = None
        self._cache = OrderedDict()
        self._base_path = kwargs['base_path']
        self._read_table_file(kwargs['file_archive_table'])

    def __getitem__(self, key):
        """Return the `FileHandle` whose local path is key"""
        return self._cache[key]

    @property
    def table_file(self):
        """Return the path to the file used to persist this `FileArchive`"""
        return self._table_file

    @property
    def table(self):
        """Return the persistent representation of this `FileArchive`"""
        return self._table

    @property
    def cache(self):
        """Return the transient representation of this `FileArchive`"""
        return self._cache

    @property
    def base_path(self):
        """Return the base file path for all files in this `FileArchive`"""
        return self._base_path

    def _get_fullpath(self, filepath):
        """Return filepath with the base_path prefixed (absolute paths unchanged)"""
        if os.path.isabs(filepath):
            return filepath
        return os.path.join(self._base_path, filepath)

    def _get_localpath(self, filepath):
        """Return the filepath with a leading base_path removed"""
        # Strip only a prefix: str.replace would also mangle paths that merely
        # contain base_path somewhere in the middle.
        if self._base_path and filepath.startswith(self._base_path):
            return filepath[len(self._base_path):]
        return filepath

    def _fill_cache(self):
        """Fill the cache from the `astropy.table.Table`"""
        for irow in range(len(self._table)):
            file_handle = self._make_file_handle(irow)
            self._cache[file_handle.path] = file_handle

    def _read_table_file(self, table_file):
        """Read an `astropy.table.Table` to set up the archive"""
        self._table_file = table_file
        if os.path.exists(self._table_file):
            self._table = Table.read(self._table_file)
        else:
            self._table = FileHandle.make_table({})
        self._fill_cache()

    def _make_file_handle(self, row_idx):
        """Build and return a `FileHandle` object from an `astropy.table.row.Row`"""
        row = self._table[row_idx]
        return FileHandle.create_from_row(row)

    def get_handle(self, filepath):
        """Get the `FileHandle` object associated to a particular file

        Raises `KeyError` if the file is not in the archive.
        """
        localpath = self._get_localpath(filepath)
        return self._cache[localpath]

    def register_file(self, filepath, creator, status=FileStatus.no_file,
                      flags=FileFlags.no_flags):
        """Register a file in the archive.

        If the file already exists, this raises a `KeyError`

        Parameters
        ----------
        filepath : str
            The path to the file
        creator : int
            A unique key for the job that created this file
        status : `FileStatus`
            Enumeration giving current status of file
        flags : `FileFlags`
            Enumeration giving flags set on this file

        Returns `FileHandle`
        """
        localpath = self._get_localpath(filepath)
        # The old try/except swallowed its own KeyError, so duplicates were
        # silently appended as extra table rows.
        if localpath in self._cache:
            raise KeyError("File %s already exists in archive" % filepath)
        timestamp = 0
        if status == FileStatus.exists:
            # Make sure the file really exists
            fullpath = self._get_fullpath(filepath)
            if os.path.exists(fullpath):
                timestamp = int(os.stat(fullpath).st_mtime)
            else:
                print("register_file called on missing file %s" % fullpath)
                status = FileStatus.missing
        # Keys are 1-based row numbers in self._table
        key = len(self._table) + 1
        file_handle = FileHandle(path=localpath,
                                 key=key,
                                 creator=creator,
                                 timestamp=timestamp,
                                 status=status,
                                 flags=flags)
        file_handle.append_to_table(self._table)
        self._cache[localpath] = file_handle
        return file_handle

    def update_file(self, filepath, creator, status):
        """Update a file in the archive

        If the file does not exists, this raises a `KeyError`

        Parameters
        ----------
        filepath : str
            The path to the file
        creator : int
            A unique key for the job that created this file
        status : `FileStatus`
            Enumeration giving current status of file

        Returns `FileHandle`

        Raises `ValueError` if status claims the file exists but it does not.
        """
        file_handle = self.get_handle(filepath)
        timestamp = 0
        if status in [FileStatus.exists, FileStatus.superseded]:
            # Make sure the file really exists (FileHandle has no fullpath
            # attribute; build it from the stored local path)
            fullpath = self._get_fullpath(file_handle.path)
            if not os.path.exists(fullpath):
                raise ValueError("File %s does not exist" % fullpath)
            timestamp = int(os.stat(fullpath).st_mtime)
        file_handle.creator = creator
        file_handle.timestamp = timestamp
        file_handle.status = status
        file_handle.update_table_row(self._table, file_handle.key - 1)
        return file_handle

    def get_file_ids(self, file_list, creator=None,
                     status=FileStatus.no_file, file_dict=None):
        """Get or create a list of file ids based on file names

        Parameters
        ----------
        file_list : list
            The paths to the file
        creator : int
            A unique key for the job that created these files
        status : `FileStatus`
            Enumeration giving current status of files
        file_dict : `FileDict`
            Supplies the flags for files that need to be registered

        Returns list of integers
        """
        ret_list = []
        for fname in file_list:
            if file_dict is None:
                flags = FileFlags.no_flags
            else:
                flags = file_dict.file_dict[fname]
            try:
                fhandle = self.get_handle(fname)
            except KeyError:
                if creator is None:
                    creator = -1
                fhandle = self.register_file(fname, creator, status, flags)
            ret_list.append(fhandle.key)
        return ret_list

    def get_file_paths(self, id_list):
        """Get a list of file paths based of a set of ids

        Parameters
        ----------
        id_list : list
            List (or array) of integer file keys

        Returns list of file paths
        """
        if id_list is None:
            return []
        # Keys are 1-based row numbers; a plain list cannot be shifted with
        # '- 1', so go through a numpy integer array.
        row_indices = np.asarray(id_list, dtype=int) - 1
        try:
            path_array = self._table[row_indices]['path']
        except IndexError:
            print("IndexError ", len(self._table), id_list)
            path_array = []
        return [path for path in path_array]

    def write_table_file(self, table_file=None):
        """Write the table to self._table_file"""
        if self._table is None:
            raise RuntimeError("No table to write")
        if table_file is not None:
            self._table_file = table_file
        if self._table_file is None:
            raise RuntimeError("No output file specified for table")
        write_tables_to_fits(self._table_file, [self._table], clobber=True,
                             namelist=['FILE_ARCHIVE'])

    def update_file_status(self):
        """Update the status of all the files in the archive"""
        nfiles = len(self._cache)
        # One counter per FileStatus value (0..5)
        status_vect = np.zeros((6), int)
        sys.stdout.write("Updating status of %i files: " % nfiles)
        sys.stdout.flush()
        for i, fhandle in enumerate(self._cache.values()):
            if i % 200 == 0:
                sys.stdout.write('.')
                sys.stdout.flush()
            fhandle.check_status(self._base_path)
            fhandle.update_table_row(self._table, fhandle.key - 1)
            status_vect[fhandle.status] += 1
        sys.stdout.write("!\n")
        sys.stdout.flush()
        sys.stdout.write("Summary:\n")
        sys.stdout.write(" no_file: %i\n" % status_vect[0])
        sys.stdout.write(" expected: %i\n" % status_vect[1])
        sys.stdout.write(" exists: %i\n" % status_vect[2])
        sys.stdout.write(" missing: %i\n" % status_vect[3])
        sys.stdout.write(" superseded: %i\n" % status_vect[4])
        sys.stdout.write(" temp_removed: %i\n" % status_vect[5])

    @classmethod
    def get_archive(cls):
        """Return the singleton `FileArchive` instance"""
        return cls._archive

    @classmethod
    def build_archive(cls, **kwargs):
        """Return the singleton `FileArchive` instance, building it if needed"""
        if cls._archive is None:
            cls._archive = cls(**kwargs)
        return cls._archive
def main_browse():
    """Command-line entry point for browsing a `FileArchive`.

    Reads ``--files`` (archive table path) and ``--base`` (archive base path)
    from ``sys.argv`` and builds the singleton archive from them.
    """
    import argparse
    parser = argparse.ArgumentParser(usage="file_archive.py [options]",
                                     description="Browse a job archive")
    # (flag, dest, default, help) for each string-valued option
    option_specs = (
        ('--files', 'file_archive_table', 'file_archive_temp.fits',
         "File archive file"),
        ('--base', 'base_path', os.path.abspath('.'),
         "File archive base path"),
    )
    for flag, dest, default, help_text in option_specs:
        parser.add_argument(flag, action='store', dest=dest,
                            type=str, default=default, help=help_text)
    options = vars(parser.parse_args(sys.argv[1:]))
    FileArchive.build_archive(**options)


if __name__ == '__main__':
    main_browse()