
Convert scripts to entry_points

Open Duncan Macleod requested to merge duncanmmacleod/dqsegdb-client:entry-points into master
#!/usr/bin/env python3
# Copyright (C) 2014-2020 Syracuse University, European Gravitational
# Observatory, and Christopher Newport University.
# Written by Ryan Fisher and Gary Hemming.
# See the NOTICE file distributed with this work for additional information
# regarding copyright ownership.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""
Publishing client that pushes DQXML files (usually produced by DMT) from a
set of directories to the DQSEGDB server. The publisher can group multiple
input files together and push the data to the database either one
flag:version at a time or across several threads. Optional start and end
time arguments restrict the range of input data accepted.
See the --help output for all input arguments.
Example call to function:
ligolw_publish_threaded_dqxml_dqsegdb --segment-url https://segments.ligo.org --state-file=/home/rfisher/DQSEGDB/DQSEGDBClient/var/spool/L-DQ_Segments_long_test.xml --pid-file=/home/rfisher/DQSEGDB/DQSEGDBClient/var/run/L-DQ_Segments.pid --log-file=/home/rfisher/DQSEGDB/DQSEGDBClient/var/log/L-DQ_Segments.log --input-directory=/archive/frames/dmt/ER4/DQ/L1 --log-level DEBUG -m 60 -c 20 -e 105819443
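
Example --segments-file contents (segwizard format: column order is segment
number, GPS start, GPS end, duration; the values below are illustrative
only, not taken from a real run):

    0 1048383616 1048387712 4096
    1 1048387712 1048391808 4096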
"""
from __future__ import print_function
import sys
import traceback
import copy
import pwd
import os
import signal
import re
import logging
import logging.handlers
from optparse import OptionParser
from glue import lal
from ligo import segments
from ligo.segments import utils as segmentsUtils
from glue import pidfile
from glue.ligolw import ligolw
from glue.ligolw import utils as ligolw_utils
from glue.ligolw.utils import process as ligolw_process
# ER6 Glue fixes:
from glue.ligolw import table
from glue.ligolw import array
from glue.segmentdb import segmentdb_utils
from dqsegdb.apicalls import *
# try and exit gracefully on a term signal
die_now = False


def SIGTERMhandler(signum, frame):
    global die_now
    try:
        logger.info("caught SIGTERM")
    except:
        pass
    die_now = True


signal.signal(signal.SIGTERM, SIGTERMhandler)
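
# Note: the handler only records the shutdown request; the publishing loop
# below checks die_now between batches, so an in-flight insert can finish
# and the updated state file can still be written before exit.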
PROGRAM_NAME = sys.argv[0].replace('./','')
PROGRAM_PID = os.getpid()
try:
    USER_NAME = os.getlogin()
except:
    USER_NAME = pwd.getpwuid(os.getuid())[0]
from dqsegdb._version import get_versions
__author__ = "Ryan Fisher <ryan.fisher@ligo.org>"
__date__ = get_versions()['date']
__version__ = get_versions()['version']
__id__ = get_versions()['full-revisionid']
__verbose_msg__ = (
    "version: " + get_versions()['version']
    + "\nfull-revisionid: " + get_versions()['full-revisionid']
    + "\ndirty: " + str(get_versions()['dirty'])
    + "\nerror: " + str(get_versions()['error'])
    + "\ndate: " + get_versions()['date']
)
parser = OptionParser(
    # version = "%prog CVS $Header$",
    version = "Name: %prog\n" + __verbose_msg__,
    usage = "%prog [OPTIONS]",
    description = "Publishes XML files into the segment database")
parser.add_option("-t", "--segment-url", metavar = "PROTOCOL://HOST", help = "connect to segments database on PROTOCOL://HOST, e.g., https://segments.ligo.org")
parser.add_option("-s", "--state-file", metavar = "FILE", help = "read published and excluded segments from FILE")
parser.add_option("-f", "--segments-file", metavar = "FILE", help = "read list of segments to publish from FILE")
parser.add_option("-P", "--pid-file", metavar = "FILE", help = "use FILE as process lock file")
parser.add_option("-D", "--input-directory", metavar = "DIR", help = "look for input files in DIR")
parser.add_option("-l", "--log-file", metavar = "FILE", help = "use FILE as log file")
parser.add_option("-L", "--log-level", metavar = "LEVEL", default = "INFO", help = "set logging level to LEVEL")
#old parser.add_option("-p", "--ping", action = "store_true")
parser.add_option("-d", "--dry-run", action = "store_true")
parser.add_option("-b", "--start-time", metavar="start_time",help="Manual start time window, used with state file to determine publish window by start=max(manual start time, state file start time)")
parser.add_option("-e", "--end-time", metavar="end_time",help="Manual end time window, used with state file to determine publish window by end=min(manual end time, current time)")
parser.add_option("-m", "--multiple-files",metavar="multiple_files",help="Setting to control the number of DQXML files batched together before the data is sent to the dqsegdb; defaults to 1.")
parser.add_option("-c", "--thread-count",metavar="thread_count",help="Setting to control the number of threads used when the data is sent to the dqsegdb; defaults to 1.")
options, filenames = parser.parse_args()
if not options.segment_url:
    raise ValueError("missing argument --segment-url")
if not options.state_file and not options.segments_file:
    raise ValueError("missing argument --state-file or --segments-file")
if options.state_file and options.segments_file:
    raise ValueError("incompatible arguments --state-file and --segments-file")
if not options.pid_file:
    raise ValueError("missing argument --pid-file")
if not options.input_directory:
    raise ValueError("missing argument --input-directory")
if not options.log_file:
    raise ValueError("missing argument --log-file")

local_debug = (options.log_level == "DEBUG")
# check if a valid lock file already exists, and create one if not
pidfile.get_lock(options.pid_file)

import atexit


def unlink_file(name):
    os.unlink(name)


atexit.register(unlink_file, options.pid_file)
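
# The pid file is removed by the atexit hook on normal exit; the top-level
# exception handler below also unlinks it explicitly before exiting with a
# non-zero status.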
try:
    # set up logging
    logger = logging.getLogger('ligolw_publish_dqxml_dqsegdb')
    handler = logging.handlers.RotatingFileHandler(options.log_file, 'a', 1024**3, 3)
    formatter = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # getattr is safer than eval() here and resolves the same level names
    logger.setLevel(getattr(logging, options.log_level))
    logger.info("ligolw_publish_dqxml_dqsegdb starting")
    logger.debug("pid file = " + options.pid_file)
    logger.debug("input directory = " + options.input_directory)
    logger.debug("log file = " + options.log_file)
    logger.debug("log level = " + options.log_level)
    if options.state_file:
        logger.debug("segment-url= " + options.segment_url)
        # setup the output file
        outdoc = ligolw.Document()
        outdoc.appendChild(ligolw.LIGO_LW())
        proc_id = ligolw_process.register_to_xmldoc(outdoc, PROGRAM_NAME, options.__dict__).process_id
        # read in the published file and get the published segment list
        logger.debug("reading state from %s" % options.state_file)
        #indoc = ligolw_utils.load_url(options.state_file, gz = (options.state_file or "stdin").endswith(".gz"))
        ContentHandler = ligolw.LIGOLWContentHandler
        table.use_in(ContentHandler)
        array.use_in(ContentHandler)
        indoc = ligolw_utils.load_url(options.state_file, gz=(options.state_file or "stdin").endswith(".gz"), contenthandler=ContentHandler)
        published_segments = segmentdb_utils.find_segments(indoc, "P1:PUBLISHED:0")
        exclude_segments = segmentdb_utils.find_segments(indoc, "P1:EXCLUDE:0")
        logger.debug("published segments = %s" % str(published_segments))
        logger.debug("excluded segments = %s" % str(exclude_segments))
        # FIXME this will break Jul 19 2027 02:39:45 UTC
        all_time = segments.segmentlist([segments.segment(0, 1500000000)])
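        # Illustrative example (not from the original source): with
        # published_segments = [(0, 100)] and exclude_segments = [(150, 200)],
        # the subtraction below yields [(100, 150), (200, 1500000000)].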
        # make a list of the segments that need to be inserted
        pending_segments = (all_time - published_segments) - exclude_segments
    elif options.segments_file:
        logger.debug("segments-file= " + options.segments_file)
        logger.debug("reading segments from %s" % options.segments_file)
        # read in the segments file and get the list of segments to insert
        sfile = open(options.segments_file, "r")
        pending_segments = segmentsUtils.fromsegwizard(sfile).coalesce()
        sfile.close()
        published_segments = segments.segmentlist()
    if options.start_time:
        begin_time = segments.segmentlist([segments.segment(int(options.start_time), 1500000000)])
        pending_segments &= begin_time
    if options.end_time:
        ending_time = segments.segmentlist([segments.segment(0, int(options.end_time))])
        pending_segments &= ending_time
    logger.info("pending segments = %s" % str(pending_segments))
    pending_files = lal.Cache()
    # make a list of the files that need to be inserted
    for s in pending_segments:
        pending_files += lal.Cache.from_urls(
            segmentdb_utils.get_all_files_in_range(options.input_directory, s[0], s[1]),
            coltype=int).sieve(segment=s)
    pending_files = pending_files.unique()
    pending_files.sort()
    logger.debug("pending files = %s" % [os.path.basename(f) for f in pending_files.pfnlist()])
    if options.multiple_files:
        num_files = int(options.multiple_files)
    else:
        num_files = 1
    if options.thread_count:
        thread_count = int(options.thread_count)
    else:
        thread_count = 1
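    # With the docstring example options (-m 60 -c 20), each push to the
    # server covers a batch of 60 DQXML files fanned out over 20 threads.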
    # publish the files and add them to the list of published segments
    group_files_counter = 0
    current_segments = segments.segmentlist()
    infiles = []
    for f in pending_files:
        result = None
        if die_now:
            break
        if group_files_counter < num_files:
            logger.debug("adding file to group: %s" % f.path)
            infiles.append(f.path)
            group_files_counter += 1
            current_segments |= segments.segmentlist([f.segment])
        if group_files_counter == num_files:
            group_files_counter = 0
            grouped_segments = copy.deepcopy(current_segments)
            current_segments = segments.segmentlist([])
            try:
                #fh = open(infile,'r')
                #xmltext = fh.read()
                #fh.close()
                if options.dry_run:
                    logger.debug("(dry-run) inserting %s" % infiles)
                    result = True
                else:
                    logger.debug("inserting %s" % infiles)
                    result = InsertMultipleDQXMLFileThreaded(infiles, logger, options.segment_url, hackDec11=False, debug=local_debug, threads=thread_count)
                infiles = []
            except KeyboardInterrupt:
                logger.info("caught keyboard interrupt")
                if result:
                    logger.debug("published segment before interrupt: %s" % str(grouped_segments))
                    published_segments |= grouped_segments
                die_now = True
            except Exception as e:
                logger.error("failed to publish %s (%s)" % (infiles, str(e)))
                traceback.print_exc()
                die_now = True
                break
                #raise
            else:
                if result:
                    logger.debug("published segments in group: %s" % str(grouped_segments))
                    published_segments |= grouped_segments
                else:
                    logger.debug("Failed to publish segments in group: %s, exiting." % str(grouped_segments))
                    die_now = True
                    break
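    # A final partial batch (fewer than num_files files) can remain when the
    # cache runs out; flush it here with the same publish logic as above.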
    if group_files_counter != 0:  # publish remaining files
        result = False
        if not die_now:
            grouped_segments = current_segments
            try:
                #fh = open(infile,'r')
                #xmltext = fh.read()
                #fh.close()
                if options.dry_run:
                    logger.debug("(dry-run) inserting %s" % infiles)
                    result = True
                else:
                    logger.debug("inserting %s" % infiles)
                    result = InsertMultipleDQXMLFileThreaded(infiles, logger, options.segment_url, hackDec11=False, debug=local_debug, threads=thread_count)
            except KeyboardInterrupt:
                logger.info("caught keyboard interrupt")
                if result:
                    logger.debug("published remaining segments before interrupt: %s" % str(grouped_segments))
                    published_segments |= grouped_segments
                #die_now = True unneeded because there is no loop here, we are just cleaning up
            except Exception as e:
                logger.error("failed to publish %s (%s)" % (infiles, str(e)))
            else:
                if result:
                    logger.debug("published remaining segments %s" % str(grouped_segments))
                    published_segments |= grouped_segments
                else:
                    logger.debug("Failed to publish segments in group: %s" % str(grouped_segments))
    if options.state_file:
        excl_def_id = segmentdb_utils.add_to_segment_definer(outdoc, proc_id, "P1", "EXCLUDE", 0)
        pub_def_id = segmentdb_utils.add_to_segment_definer(outdoc, proc_id, "P1", "PUBLISHED", 0)
        segmentdb_utils.add_to_segment(outdoc, proc_id, excl_def_id, exclude_segments)
        segmentdb_utils.add_to_segment(outdoc, proc_id, pub_def_id, published_segments)
        logger.debug("published segments final result= %s" % str(published_segments))
        logger.debug("excluded segments = %s" % str(exclude_segments))
        # write the new segment state file on top of the old one
        logger.debug("writing state to %s" % options.state_file)
        ligolw_utils.write_filename(outdoc, options.state_file)
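        # The rewritten state file is what the next invocation reads back
        # through find_segments above, so successive runs resume where the
        # last successful publish stopped.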
except Exception as e:
    try:
        logger.error(str(e))
    except:
        pass
    print("runtime error (%s)" % str(e), file=sys.stderr)
    os.unlink(options.pid_file)
    sys.exit(1)

logger.info("exiting")
#os.unlink(options.pid_file)
sys.exit(0)