Commit ec0ec805 authored by Sean Leavey's avatar Sean Leavey

Reorganisation of files

parent c940704c
PROG = "fibres"
DESC = "Fibre profile data extracter command line utility"
VERS = "0.8.0"
URL = "https://git.ligo.org/sean-leavey/fibre-profile-extractor"
SYNOPSIS = "{} <command> [<args>...]".format(PROG)
MANPAGE = """
NAME
{prog} - {desc} (v{vers})
{url}
SYNOPSIS
{synopsis}
DESCRIPTION
Command line utility to extract fibre profile information as generated
by the IGR fibre profiler LabVIEW script. This program finds fibre files
corresponding to the top, middle and bottom measurements, extracts the
relevant length and thickness columns and saves them in a single file.
This conversion operation can be performed on a single set of files
sharing the same identifier or on an entire directory.
COMMANDS
{{cmds}}
AUTHOR
Sean Leavey <sean.leavey@glasgow.ac.uk>
""".format(prog=PROG,
desc=DESC,
vers=VERS,
url=URL,
synopsis=SYNOPSIS,
).strip()
......@@ -8,39 +8,9 @@ import argparse
import textwrap
import collections
from . import SYNOPSIS, PROG
from . import extract
PROG = 'fibres'
DESC = 'Fibre profile data extracter command line utility'
SYNOPSIS = '{} <command> [<args>...]'.format(PROG)
# NOTE: double spaces are interpreted by text2man to be paragraph
# breaks. NO DOUBLE SPACES. Also two spaces at the end of a line
# indicate an element in a tag list.
MANPAGE = """
NAME
{prog} - {desc}
SYNOPSIS
{synopsis}
DESCRIPTION
Command line interface to extract fibre profile information as generated
by the IGR fibre profiler LabVIEW script.
COMMANDS
{{cmds}}
AUTHOR
Sean Leavey <sean.leavey@glasgow.ac.uk>
""".format(prog=PROG,
desc=DESC,
synopsis=SYNOPSIS,
).strip()
def enable_verbose_logs(level):
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(message)s'))
......@@ -62,14 +32,15 @@ def find_fibre_files_by_id(input_dir, ids):
return fibre_files
class Cmd(object):
"""base class for commands"""
cmd = ''
"""Base class for commands"""
cmd = ""
def __init__(self):
"""Initialize argument parser"""
self.parser = argparse.ArgumentParser(
prog='{} {}'.format(PROG, self.cmd),
description=self.__doc__.strip(),
# formatter_class=argparse.RawDescriptionHelpFormatter,
prog="{} {}".format(PROG, self.cmd),
description=self.__doc__.strip()
)
def parse_args(self, args):
"""Parse arguments and returned ArgumentParser Namespace object"""
......@@ -79,17 +50,18 @@ class Cmd(object):
pass
class List(Cmd):
"""List fibre profile files."""
cmd = 'list'
"""List fibre profile files in a given directory"""
cmd = "list"
def __init__(self):
Cmd.__init__(self)
self.parser.add_argument('input_dir',
self.parser.add_argument("input_dir",
help="directory containing the fibre profile "
"files")
self.parser.add_argument('-v', '--verbose', action='store_true',
self.parser.add_argument("-v", "--verbose", action="store_true",
help="enable verbose output")
self.parser.add_argument('-V', '--very-verbose', action='store_true',
self.parser.add_argument("-V", "--very-verbose", action="store_true",
help="enable very verbose output")
def __call__(self, args):
......@@ -100,6 +72,9 @@ class List(Cmd):
input_dir = os.path.abspath(str(args.input_dir))
if not extract.dir_is_read_valid(input_dir):
sys.exit("Input directory is not readable")
# find identifiers and corresponding files in data directory
fibre_files = extract.find_fibre_files(input_dir)
......@@ -110,26 +85,27 @@ class List(Cmd):
print(k)
class Convert(Cmd):
"""Convert fibre profile files."""
cmd = 'convert'
"""Convert fibre profile files into single file"""
cmd = "convert"
def __init__(self):
Cmd.__init__(self)
self.parser.add_argument('input_dir',
self.parser.add_argument("input_dir",
help="directory containing the fibre profile "
"files")
self.parser.add_argument('-o', '--output_dir',
self.parser.add_argument("-o", "--output_dir",
help="directory to store output file",
default=None)
self.parser.add_argument('-i', '--identifier',
self.parser.add_argument("-i", "--identifier",
help="identifier to extract "
"(leave blank for all)",
default=None)
self.parser.add_argument('-f', '--force', action='store_true',
self.parser.add_argument("-f", "--force", action="store_true",
help="overwrite existing output files")
self.parser.add_argument('-v', '--verbose', action='store_true',
self.parser.add_argument("-v", "--verbose", action="store_true",
help="enable verbose output")
self.parser.add_argument('-V', '--very-verbose', action='store_true',
self.parser.add_argument("-V", "--very-verbose", action="store_true",
help="enable very verbose output")
def __call__(self, args):
......@@ -140,12 +116,18 @@ class Convert(Cmd):
input_dir = os.path.abspath(str(args.input_dir))
if not extract.dir_is_read_valid(input_dir):
sys.exit("Input directory is not readable")
if args.output_dir is None:
logging.getLogger().info("Using input directory to store output file")
output_dir = input_dir
else:
output_dir = os.path.abspath(str(args.output_dir))
if not extract.dir_is_write_valid(output_dir):
sys.exit("Output directory is not writeable")
# find identifiers and corresponding files in data directory
fibre_files = extract.find_fibre_files(input_dir)
fibre_identifiers = fibre_files.keys()
......@@ -163,11 +145,6 @@ class Convert(Cmd):
sys.exit("Could not find files using identifier \"{0}\"".format(
identifier))
logging.getLogger().info("Extracting data for the following "
"identifiers: {0}".format(
", ".join(ids_to_extract)
))
# loop over output files first to check if any of them exist already
files_existing = []
for identifier in ids_to_extract:
......@@ -180,25 +157,34 @@ class Convert(Cmd):
sys.exit("The following files already exist (specify -f to force "
"overwrite): {0}".format(", ".join(files_existing)))
logging.getLogger().info("Extracting data for the following "
"identifiers: {0}".format(
", ".join(ids_to_extract)
))
# loop over output files again, this time writing the data
for identifier in ids_to_extract:
output_file = os.path.join(output_dir, "{0}.csv".format(identifier))
if os.path.exists(output_file):
if not args.force:
# cannot overwrite existing file
sys.exit("File {0} already exists (specify -f to force "
"overwrite)".format(output_file))
else:
logging.getLogger().info("Overwriting {0}".format(output_file))
else:
logging.getLogger().info("Writing {0}".format(output_file))
logging.getLogger().info("Writing {0}".format(output_file))
extract.write_data(output_file,
# write data to output file in CSV format
extract.write_csv_data(output_file,
extract.extract_data(input_dir, fibre_files[identifier]))
class Help(Cmd):
"""Print manpage or command help (also '-h' after command)."""
cmd = 'help'
"""Print manpage or command help (also "-h" after command)"""
cmd = "help"
def __init__(self):
Cmd.__init__(self)
self.parser.add_argument('cmd', nargs='?',
......@@ -211,20 +197,20 @@ class Help(Cmd):
print(MANPAGE.format(cmds=format_commands(man=True)))
CMDS = collections.OrderedDict([
('list', List),
('convert', Convert),
('help', Help),
("list", List),
("convert", Convert),
("help", Help),
])
ALIAS = {
'--help': 'help',
'-h': 'help',
"--help": "help",
"-h": "help",
}
##################################################
def format_commands(man=False):
prefix = ' '*8
prefix = " " * 8
wrapper = textwrap.TextWrapper(
width=70,
initial_indent=prefix,
......@@ -234,7 +220,7 @@ def format_commands(man=False):
for name, func in CMDS.items():
if man:
fo = func()
usage = fo.parser.format_usage()[len('usage: {} '.format(PROG)):].strip()
usage = fo.parser.format_usage()[len("usage: {} ".format(PROG)):].strip()
desc = wrapper.fill('\n'.join([l.strip() for l in fo.parser.description.splitlines() if l]))
f.write(' {} \n'.format(usage))
f.write(desc+'\n')
......@@ -251,14 +237,14 @@ def get_func(cmd):
try:
return CMDS[cmd]()
except KeyError:
print('Unknown command:', cmd, file=sys.stderr)
print("See 'help' for usage.", file=sys.stderr)
print("Unknown command:", cmd, file=sys.stderr)
print("See \"help\" for usage.", file=sys.stderr)
sys.exit(1)
def main():
if len(sys.argv) < 2:
print('Command not specified.', file=sys.stderr)
print('usage: '+SYNOPSIS, file=sys.stderr)
print("Command not specified.", file=sys.stderr)
print("usage: " + SYNOPSIS, file=sys.stderr)
print(file=sys.stderr)
print(format_commands(), file=sys.stderr)
sys.exit(1)
......@@ -269,5 +255,5 @@ def main():
##################################################
if __name__ == '__main__':
if __name__ == "__main__":
main()
......@@ -18,47 +18,13 @@ re10 = '\\.xls' # extension
file_name_regex = re.compile(re1+re2+re3+re4+re5+re6+re7+re8+re9+re10,
re.IGNORECASE|re.DOTALL)
def path_is_write_valid(path):
def dir_is_read_valid(path):
"""Check if the specified path is write valid"""
try:
open(path, 'w')
except OSError:
return False
return True
def get_valid_dir(message, default, permissions, error_message):
"""Get a valid, readable directory path from the user"""
while True:
print("{0} (default: \"{1}\"): ".format(message, default), end="")
path = input()
if path == "":
# use default path
path = default
if os.path.isdir(path):
if os.access(path, permissions):
# valid path specified; break loop
break
else:
# invalid permissions; ask user again
print(error_message)
else:
# invalid path; ask user again
print("Directory path is not valid")
return path
def get_valid_read_dir(*args, **kwargs):
return get_valid_dir(*args, **kwargs, permissions=os.R_OK,
error_message="No read permissions to the specified"
"file; try again")
def get_valid_write_dir(*args, **kwargs):
return get_valid_dir(*args, **kwargs, permissions=os.W_OK,
error_message="No write permissions to the specified"
"file; try again")
return os.access(path, os.R_OK)
def dir_is_write_valid(path):
"""Check if the specified path is write valid"""
return os.access(path, os.W_OK)
def find_fibre_files(directory):
"""Finds fibre data files in the specified directory"""
......@@ -91,12 +57,16 @@ def find_fibre_files(directory):
return identifiers
def validate_fibre_file_group(fibre_files):
"""Checks whether the required fibre files exist in a fibre file group"""
required_fibre_files = ["Top Neck", "Middle", "Bottom Neck"]
# check all of the relevant files are present
return set(required_fibre_files) == set(fibre_files.keys())
def extract_data(fibre_dir, fibre_files):
"""Generates rows from the top, middle and bottom fibre files"""
if not validate_fibre_file_group(fibre_files):
raise Exception("Could not find all necessary fibre files")
......@@ -116,7 +86,8 @@ def extract_data(fibre_dir, fibre_files):
for row in csv.reader(open(filepath_bot, 'r'), delimiter='\t'):
yield [row[0], row[1]]
def write_data(target_file, data):
def write_csv_data(target_file, data):
"""Writes comma separated data from an iterator into the target file"""
with open(target_file, "w") as f:
# format and write rows to file
f.writelines(["{0},{1}{2}".format(row[0], row[1], os.linesep) for row in data])
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment