Commit a924be14 authored by Sean Leavey

Initial commit

parents
__pycache__
#!/usr/bin/env python3
import os
import io
import sys
import logging
import argparse
import textwrap
import collections
from . import extract
# Program name; used to build the usage synopsis and each sub-command
# parser's "prog" string.
PROG = 'fibres'
# One-line summary placed in the NAME section of the manpage text.
DESC = 'Fibre profile data extracter command line utility'
# Usage line shown in the manpage and when no command is given.
SYNOPSIS = '{} <command> [<args>...]'.format(PROG)
# NOTE: double spaces are interpreted by text2man to be paragraph
# breaks. NO DOUBLE SPACES. Also two spaces at the end of a line
# indicate an element in a tag list.
#
# The doubled braces around "cmds" survive the .format() call below as a
# literal "{cmds}" placeholder; the help command fills it in later with
# the formatted command list.
MANPAGE = """
NAME
{prog} - {desc}
SYNOPSIS
{synopsis}
DESCRIPTION
Command line interface to extract fibre profile information as generated
by the IGR fibre profiler LabVIEW script.
COMMANDS
{{cmds}}
AUTHOR
Sean Leavey <sean.leavey@glasgow.ac.uk>
""".format(prog=PROG,
desc=DESC,
synopsis=SYNOPSIS,
).strip()
def enable_verbose_logs(level):
    """Attach a message-only stream handler to the root logger.

    The root logger's threshold is set to ``level`` and a new
    ``logging.StreamHandler`` is added whose formatter emits just the
    message text of each record.
    """
    root_logger = logging.getLogger()
    root_logger.setLevel(level)

    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(logging.Formatter('%(message)s'))
    root_logger.addHandler(stream_handler)
def find_fibre_files_by_id(input_dir, ids):
    """Look up the fibre files in ``input_dir`` and require every id.

    Returns whatever ``extract.find_fibre_files`` reports for the
    directory. The process exits with an error message as soon as one of
    the requested identifiers in ``ids`` is not among the keys found.
    """
    known_files = extract.find_fibre_files(input_dir)

    for wanted in ids:
        if wanted in known_files:
            continue
        # first missing identifier aborts the program
        sys.exit('Could not find files using identifier "{0}"'.format(wanted))

    return known_files
class Cmd:
    """base class for commands"""

    # Sub-command name; appended to the program name in the parser's
    # "prog" string. Subclasses override it.
    cmd = ''

    def __init__(self):
        """Build the argparse parser used by this command."""
        # the class docstring doubles as the parser description
        summary = self.__doc__.strip()
        usage_name = '{} {}'.format(PROG, self.cmd)
        self.parser = argparse.ArgumentParser(prog=usage_name,
                                              description=summary)

    def parse_args(self, args):
        """Run the command's parser over ``args`` and return the Namespace."""
        namespace = self.parser.parse_args(args)
        return namespace

    def __call__(self, args):
        """Execute the command for a parsed Namespace (no-op in the base)."""
        return None
class List(Cmd):
    """List fibre profile files."""

    # NOTE: the class docstring above is shown to the user as the command
    # description, so it is kept short on purpose.
    cmd = 'list'

    def __init__(self):
        """Register the arguments accepted by the ``list`` command."""
        Cmd.__init__(self)
        self.parser.add_argument('input_dir',
                                 help="directory containing the fibre profile "
                                      "files")
        self.parser.add_argument('-v', '--verbose', action='store_true',
                                 help="enable verbose output")
        self.parser.add_argument('-V', '--very-verbose', action='store_true',
                                 help="enable very verbose output")

    def __call__(self, args):
        """Print every identifier whose file group is complete.

        ``args`` is the Namespace produced by this command's parser.
        """
        # Test the stronger flag first so that giving both -v and -V
        # results in DEBUG output instead of silently dropping -V.
        if args.very_verbose:
            enable_verbose_logs(logging.DEBUG)
        elif args.verbose:
            enable_verbose_logs(logging.INFO)

        input_dir = os.path.abspath(str(args.input_dir))

        # find identifiers and corresponding files in data directory
        fibre_files = extract.find_fibre_files(input_dir)

        # print only the identifiers that pass group validation
        print("Found the following identifiers:")
        for identifier, file_group in fibre_files.items():
            if extract.validate_fibre_file_group(file_group):
                print(identifier)
class Convert(Cmd):
    """Convert fibre profile files."""

    # NOTE: the class docstring above is shown to the user as the command
    # description, so it is kept short on purpose.
    cmd = 'convert'

    def __init__(self):
        """Register the arguments accepted by the ``convert`` command."""
        Cmd.__init__(self)
        self.parser.add_argument('input_dir',
                                 help="directory containing the fibre profile "
                                      "files")
        self.parser.add_argument('-o', '--output_dir',
                                 help="directory to store output file",
                                 default=None)
        self.parser.add_argument('-i', '--identifier',
                                 help="identifier to extract "
                                      "(leave blank for all)",
                                 default=None)
        self.parser.add_argument('-f', '--force', action='store_true',
                                 help="overwrite existing output files")
        self.parser.add_argument('-v', '--verbose', action='store_true',
                                 help="enable verbose output")
        self.parser.add_argument('-V', '--very-verbose', action='store_true',
                                 help="enable very verbose output")

    def _select_identifiers(self, args, fibre_files):
        """Return the identifiers to convert, all with complete file groups.

        With an explicit ``-i`` the identifier must exist and be complete,
        otherwise the program exits. Without it every complete group is
        selected and incomplete ones are skipped with a warning, which
        matches what the ``list`` command reports.
        """
        log = logging.getLogger()

        if args.identifier is not None:
            identifier = str(args.identifier)
            if identifier not in fibre_files:
                sys.exit("Could not find files using identifier \"{0}\"".format(
                    identifier))
            if not extract.validate_fibre_file_group(fibre_files[identifier]):
                sys.exit("Identifier \"{0}\" does not have all of the required "
                         "fibre files".format(identifier))
            return [identifier]

        selected = []
        for identifier, file_group in fibre_files.items():
            if extract.validate_fibre_file_group(file_group):
                selected.append(identifier)
            else:
                log.warning("Skipping identifier \"{0}\": not all of the "
                            "required fibre files were found".format(identifier))
        return selected

    def __call__(self, args):
        """Write one ``<identifier>.csv`` per selected identifier.

        ``args`` is the Namespace produced by this command's parser. The
        program exits before writing anything if an output file already
        exists and ``--force`` was not given.
        """
        # Test the stronger flag first so that giving both -v and -V
        # results in DEBUG output instead of silently dropping -V.
        if args.very_verbose:
            enable_verbose_logs(logging.DEBUG)
        elif args.verbose:
            enable_verbose_logs(logging.INFO)

        log = logging.getLogger()
        input_dir = os.path.abspath(str(args.input_dir))

        if args.output_dir is None:
            log.info("Using input directory to store output file")
            output_dir = input_dir
        else:
            output_dir = os.path.abspath(str(args.output_dir))

        # find identifiers and corresponding files in data directory
        fibre_files = extract.find_fibre_files(input_dir)

        # Validate the groups up front: extract_data only complains once it
        # is iterated, which is after the output file has been opened for
        # writing (and therefore truncated).
        ids_to_extract = self._select_identifiers(args, fibre_files)

        log.info("Extracting data for the following "
                 "identifiers: {0}".format(", ".join(ids_to_extract)))

        outputs = [
            (identifier,
             os.path.join(output_dir, "{0}.csv".format(identifier)))
            for identifier in ids_to_extract
        ]

        # check all output files first so that nothing is written when any
        # of them would be overwritten without permission
        if not args.force:
            files_existing = [path for _, path in outputs
                              if os.path.exists(path)]
            if files_existing:
                sys.exit("The following files already exist (specify -f to "
                         "force overwrite): {0}".format(
                             ", ".join(files_existing)))

        # now write the data
        for identifier, output_file in outputs:
            if os.path.exists(output_file):
                # can still happen without -f if the file appeared after
                # the check above
                if not args.force:
                    sys.exit("File {0} already exists (specify -f to force "
                             "overwrite)".format(output_file))
                log.info("Overwriting {0}".format(output_file))
            else:
                log.info("Writing {0}".format(output_file))

            extract.write_data(
                output_file,
                extract.extract_data(input_dir, fibre_files[identifier]))
class Help(Cmd):
    """Print manpage or command help (also '-h' after command)."""

    cmd = 'help'

    def __init__(self):
        """Register the optional command name to show help for."""
        Cmd.__init__(self)
        self.parser.add_argument('cmd', help="command", nargs='?')

    def __call__(self, args):
        """Print the full manpage, or one command's help if one was named."""
        if not args.cmd:
            # no command given: fill the command list into the manpage
            print(MANPAGE.format(cmds=format_commands(man=True)))
            return
        get_func(args.cmd).parser.print_help()
# Registry of sub-commands; insertion order is the order in which they are
# listed by format_commands().
CMDS = collections.OrderedDict()
CMDS['list'] = List
CMDS['convert'] = Convert
CMDS['help'] = Help

# Option-style spellings that are accepted in place of a command name.
ALIAS = dict.fromkeys(('--help', '-h'), 'help')
##################################################
def format_commands(man=False):
    """Render the list of registered commands as text.

    With ``man`` false each command yields one line holding its name and
    the first line of its class docstring. With ``man`` true each command
    is instantiated and contributes its usage string (minus the leading
    "usage: <prog> " part), its wrapped and indented parser description,
    and a blank line. Trailing whitespace is stripped from the result.
    """
    indent = ' ' * 8
    wrapper = textwrap.TextWrapper(
        width=70,
        initial_indent=indent,
        subsequent_indent=indent,
    )
    # length of the prefix cut off the front of every usage string
    usage_lead = len('usage: {} '.format(PROG))

    chunks = []
    for name, command_class in CMDS.items():
        if not man:
            summary = command_class.__doc__.splitlines()[0]
            chunks.append(' {:10}{}\n'.format(name, summary))
            continue

        command = command_class()
        usage = command.parser.format_usage()[usage_lead:].strip()
        stripped_lines = [
            line.strip()
            for line in command.parser.description.splitlines()
            if line
        ]
        description = wrapper.fill('\n'.join(stripped_lines))
        chunks.append(' {} \n'.format(usage))
        chunks.append(description + '\n')
        chunks.append('\n')

    return ''.join(chunks).rstrip()
def get_func(cmd):
    """Return a new instance of the command registered under ``cmd``.

    Aliases listed in ALIAS are resolved first. If the lookup or the
    construction raises KeyError, an error is printed to stderr and the
    process exits with status 1.
    """
    cmd = ALIAS.get(cmd, cmd)
    try:
        command = CMDS[cmd]()
    except KeyError:
        print('Unknown command:', cmd, file=sys.stderr)
        print("See 'help' for usage.", file=sys.stderr)
        sys.exit(1)
    return command
def main():
    """Dispatch ``sys.argv`` to the requested sub-command.

    The first argument selects the command and the remaining ones are
    parsed by that command's own parser. Without a command a short usage
    summary is printed to stderr and the process exits with status 1.
    """
    argv = sys.argv
    if len(argv) >= 2:
        command = get_func(argv[1])
        command(command.parse_args(argv[2:]))
        return

    print('Command not specified.', file=sys.stderr)
    print('usage: ' + SYNOPSIS, file=sys.stderr)
    print(file=sys.stderr)
    print(format_commands(), file=sys.stderr)
    sys.exit(1)


##################################################

if __name__ == '__main__':
    main()
import os
import os.path
import re
import logging
import csv
# Pieces of the data file name pattern, in the order they occur in a name.
# Raw strings keep the backslashes readable.
re1 = r'(0[1-9]|[12][0-9]|3[01])'  # day
re2 = r'_'  # underscore
re3 = r'(0[1-9]|1[012])'  # month
re4 = r'_'  # underscore
re5 = r'([0-9]{4})'  # year
re6 = r'\s+'  # space
re7 = r'((?:[a-z0-9\s_+]+))'  # description
re8 = r'\s+'  # space
re9 = r'(Bottom Neck|Middle|Top Neck)'  # location
re10 = r'\.xls'  # extension

# Capture groups: 1 day, 2 month, 3 year, 4 description, 5 location.
file_name_regex = re.compile(
    ''.join((re1, re2, re3, re4, re5, re6, re7, re8, re9, re10)),
    re.DOTALL | re.IGNORECASE,
)
def path_is_write_valid(path):
    """Check if the specified path can be opened for writing.

    The file is opened in append mode and closed again straight away, so
    probing an existing file leaves its contents untouched. As a side
    effect, a file that does not exist yet is created empty.

    Returns True when the open succeeds and False when it raises OSError.
    """
    try:
        # 'a' rather than 'w': a validity check must not truncate the file
        with open(path, 'a'):
            pass
    except OSError:
        return False
    return True
def get_valid_dir(message, default, permissions, error_message):
    """Keep prompting until the user names an accessible directory.

    The prompt shows ``message`` and ``default``; an empty answer selects
    ``default``. A path is accepted once it is an existing directory for
    which ``os.access(path, permissions)`` is true. ``error_message`` is
    printed when the directory exists but the access check fails.
    """
    prompt = '{0} (default: "{1}"): '.format(message, default)

    while True:
        print(prompt, end="")
        answer = input()
        # empty answer means "use the default path"
        candidate = default if answer == "" else answer

        if not os.path.isdir(candidate):
            # invalid path; ask user again
            print("Directory path is not valid")
            continue

        if os.access(candidate, permissions):
            # valid path specified
            return candidate

        # invalid permissions; ask user again
        print(error_message)
def get_valid_read_dir(*args, **kwargs):
    """Prompt for an existing directory that passes the ``os.R_OK`` check.

    All arguments are forwarded to ``get_valid_dir``; the permission flag
    and the message printed when the access check fails are supplied here.
    """
    # The two literals are concatenated, so the first must end in a space.
    return get_valid_dir(*args, **kwargs, permissions=os.R_OK,
                         error_message="No read permissions to the specified "
                                       "directory; try again")
def get_valid_write_dir(*args, **kwargs):
    """Prompt for an existing directory that passes the ``os.W_OK`` check.

    All arguments are forwarded to ``get_valid_dir``; the permission flag
    and the message printed when the access check fails are supplied here.
    """
    # The two literals are concatenated, so the first must end in a space.
    return get_valid_dir(*args, **kwargs, permissions=os.W_OK,
                         error_message="No write permissions to the specified "
                                       "directory; try again")
def find_fibre_files(directory):
    """Finds fibre data files in the specified directory

    Every entry of ``directory`` whose name matches ``file_name_regex`` is
    recorded. The result maps each identifier (the description part of the
    file name) to a dict that maps the measurement location ("Top Neck",
    "Middle" or "Bottom Neck") to the file name.

    The file name pattern is matched case-insensitively, so the location
    is normalised to its canonical spelling here; the rest of the module
    looks the locations up by their exact names.
    """
    # canonical spelling of each location, keyed by its lower-case form
    canonical_locations = {
        name.lower(): name
        for name in ("Top Neck", "Middle", "Bottom Neck")
    }

    # dict to hold file identifiers
    identifiers = {}

    for filename in os.listdir(directory):
        matches = file_name_regex.match(filename)

        if matches is None:
            # no matches for this file
            continue

        # get identifier
        identifier = str(matches.group(4))

        # get type, e.g. "top neck" -> "Top Neck"
        location = str(matches.group(5))
        meas_type = canonical_locations.get(location.lower(), location)

        # store file in identifiers dict, creating the group if necessary
        identifiers.setdefault(identifier, {})[meas_type] = filename

        logging.getLogger().debug("Found {0} ({1})".format(identifier,
                                                           meas_type))

    return identifiers
def validate_fibre_file_group(fibre_files):
    """Tell whether a file group holds exactly the three required locations.

    ``fibre_files`` is a mapping keyed by location name. The result is True
    only when its keys are "Top Neck", "Middle" and "Bottom Neck" with
    nothing missing and nothing extra.
    """
    expected = {"Top Neck", "Middle", "Bottom Neck"}
    present = set(fibre_files.keys())
    # an empty symmetric difference means both sets are identical
    return not (expected ^ present)
def extract_data(fibre_dir, fibre_files):
    """Return an iterator over the selected columns of a fibre file group.

    ``fibre_files`` maps the locations "Top Neck", "Middle" and
    "Bottom Neck" to file names inside ``fibre_dir``. The files are read as
    tab-delimited text in that order and every row is reduced to a
    two-element list: the first column plus the second column (top and
    bottom files) or the fourth column (middle file).

    The group is validated when this function is called, not when the
    result is first iterated, so callers learn about an incomplete group
    before they open any output file.

    Raises:
        ValueError: if the group fails ``validate_fibre_file_group``.
    """
    if not validate_fibre_file_group(fibre_files):
        raise ValueError("Could not find all necessary fibre files")

    return _iter_fibre_columns(fibre_dir, fibre_files)


def _iter_fibre_columns(fibre_dir, fibre_files):
    """Yield ``[first column, selected column]`` rows from the three files."""
    # (location key, name used in the log message, zero-based column index)
    sections = (
        ("Top Neck", "top", 1),
        ("Middle", "middle", 3),
        ("Bottom Neck", "bottom", 1),
    )

    for location, label, column in sections:
        filepath = os.path.join(fibre_dir, fibre_files[location])

        logging.getLogger().debug(
            "Extracting columns 1 and {0} from {1} file".format(column + 1,
                                                                label))

        # the with block closes each file once its rows are exhausted;
        # newline='' is what the csv module asks for on file objects
        with open(filepath, 'r', newline='') as handle:
            for row in csv.reader(handle, delimiter='\t'):
                yield [row[0], row[column]]
def write_data(target_file, data):
    """Write the first two fields of every row in ``data`` as CSV.

    ``target_file`` is created or overwritten. ``data`` is any iterable of
    indexable rows; only ``row[0]`` and ``row[1]`` are written. Each line
    ends with exactly one platform line separator, and values containing a
    comma or a quote are quoted by the csv module.
    """
    # newline="" switches off text-mode newline translation so that
    # os.linesep is written as is (otherwise Windows would get "\r\r\n")
    with open(target_file, "w", newline="") as f:
        writer = csv.writer(f, lineterminator=os.linesep)
        writer.writerows((row[0], row[1]) for row in data)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment