Commit 1463507e authored by Patrick Godwin's avatar Patrick Godwin

gstlal_snax execs: port scald imports for 0.7+, rename __usage__ strings, fix a few typos

parent 29f8844e
Pipeline #82820 failed with stages
in 19 minutes and 13 seconds
......@@ -16,7 +16,7 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_feature_aggregator [--options]"
__usage__ = "gstlal_snax_aggregate [--options]"
__description__ = "an executable to aggregate and generate job metrics for streaming features"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
......@@ -37,8 +37,8 @@ import numpy
from confluent_kafka import Consumer, KafkaError
from ligo.scald import aggregator
from ligo.scald import io
from ligo.scald import utils as scald_utils
from ligo.scald.io import hdf5, influx
from gstlal.snax import utils
......@@ -99,7 +99,7 @@ class StreamAggregator(object):
### other aggregator options
self.data_type = options.data_type
self.last_save = aggregator.now()
self.last_save = scald_utils.gps_now()
### initialize consumers
self.jobs = options.jobs
......@@ -114,7 +114,7 @@ class StreamAggregator(object):
### set up aggregator
logger.info("setting up aggregator with backend: %s"%options.data_backend)
if options.data_backend == 'influx':
self.agg_sink = io.influx.Aggregator(
self.agg_sink = influx.Aggregator(
hostname=options.influx_hostname,
port=options.influx_port,
db=options.influx_database_name,
......@@ -123,7 +123,7 @@ class StreamAggregator(object):
reduce_across_tags=False,
)
else: ### hdf5 data backend
self.agg_sink = io.hdf5.Aggregator(
self.agg_sink = hdf5.Aggregator(
rootdir=options.rootdir,
num_processes=options.num_processes,
reduce_across_tags=False,
......@@ -162,8 +162,8 @@ class StreamAggregator(object):
"""
process and aggregate features from feature extraction jobs on a regular cadence
"""
if utils.in_new_epoch(aggregator.now(), self.last_save, 1):
self.last_save = aggregator.now()
if utils.in_new_epoch(scald_utils.gps_now(), self.last_save, 1):
self.last_save = scald_utils.gps_now()
### format incoming packets into metrics and timeseries
feature_packets = [(job, self.feature_queue[job].pop()) for job in self.jobs for i in range(len(self.feature_queue[job]))]
......
......@@ -18,7 +18,7 @@
"""
A program that measures template overlaps and how templates are spread out in the parameter space
for gstlal_feature_extractor
for gstlal_snax_extract
"""
####################
......
......@@ -16,7 +16,7 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_feature_combiner [--options]"
__usage__ = "gstlal_snax_combine [--options]"
__description__ = "an executable to combine features from the batch pipeline to provide a more user-friendly output"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
......
......@@ -17,7 +17,7 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
This program makes a dag to run offline gstlal_feature_extractor batch jobs
This program makes a dag to run offline gstlal_snax_extract batch jobs
"""
__author__ = 'Duncan Meacher <duncan.meacher@ligo.org>, Patrick Godwin <patrick.godwin@ligo.org>'
......@@ -76,7 +76,7 @@ def analysis_segments(ifo, allsegs, boundary_seg, segment_length, psd_drop_time,
def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, segsdict, ifo, options, data_source_info, max_template_length = 30):
"""
get a dictionary of all the channels per gstlal_feature_extractor job
get a dictionary of all the channels per gstlal_snax_extract job
"""
feature_extractor_nodes = {}
......@@ -106,7 +106,7 @@ def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, segsdic
# FIXME: hacky way of getting options to get passed correctly for channels
channels[0] = channels[0].split('=')[1]
outpath = os.path.join(options.out_path, "gstlal_feature_extractor")
outpath = os.path.join(options.out_path, "gstlal_snax_extract")
# define analysis times
gps_start_time = int(seg[0])
......@@ -209,7 +209,7 @@ max_template_length = 30
# create directories if needed
#
listdir = os.path.join(options.out_path, "gstlal_feature_extractor/channel_lists")
listdir = os.path.join(options.out_path, "gstlal_snax_extract/channel_lists")
aggregator.makedir(listdir)
aggregator.makedir("logs")
......
......@@ -17,7 +17,7 @@
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
"""
This program makes a dag to run a series of gstlal_feature_extractor jobs online
This program makes a dag to run a series of gstlal_snax_extract jobs online
"""
__author__ = 'Duncan Meacher <duncan.meacher@ligo.org>, Patrick Godwin <patrick.godwin@ligo.org>'
......@@ -131,7 +131,7 @@ def feature_extractor_node_gen(feature_extractor_job, dag, parent_nodes, ifo, op
feature_extractor_nodes[ii] = \
dagparts.DAGNode(feature_extractor_job, dag, parent_nodes = parent_nodes,
opts = subset_options,
output_files = {"out-path": os.path.join(options.out_path, "gstlal_feature_extractor")}
output_files = {"out-path": os.path.join(options.out_path, "gstlal_snax_extract")}
)
num_channels = len(channel_list)
......
......@@ -30,7 +30,7 @@ A program to extract features from auxiliary channel data in real time or in off
###
### digraph llpipe {
### labeljust = "r";
### label="gstlal_feature_extractor"
### label="gstlal_snax_extract"
### rankdir=LR;
### graph [fontname="Roman", fontsize=24];
### edge [ fontname="Roman", fontsize=10 ];
......
......@@ -16,7 +16,7 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_feature_monitor [--options]"
__usage__ = "gstlal_snax_monitor [--options]"
__description__ = "an executable to collect and monitor streaming features"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
......@@ -35,7 +35,7 @@ import time
from confluent_kafka import Consumer, KafkaError
from ligo.scald import io
from ligo.scald.io import hdf5, influx
from gstlal.snax import multichannel_datasource
from gstlal.snax import utils
......@@ -119,7 +119,7 @@ class StreamMonitor(object):
### set up aggregator
logger.info("setting up monitor with backend: %s"%options.data_backend)
if options.data_backend == 'influx':
self.agg_sink = io.influx.Aggregator(
self.agg_sink = influx.Aggregator(
hostname=options.influx_hostname,
port=options.influx_port,
db=options.influx_database_name,
......@@ -128,7 +128,7 @@ class StreamMonitor(object):
reduce_across_tags=False,
)
else: ### hdf5 data backend
self.agg_sink = io.hdf5.Aggregator(
self.agg_sink = hdf5.Aggregator(
rootdir=options.rootdir,
num_processes=options.num_processes,
reduce_across_tags=False,
......
......@@ -16,7 +16,7 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_feature_hdf5_sink [--options]"
__usage__ = "gstlal_snax_sink [--options]"
__description__ = "an executable to dump streaming data to disk via hdf5"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
......@@ -124,7 +124,7 @@ class HDF5StreamSink(object):
self.last_save_time = None
self.last_persist_time = None
self.rootdir = options.rootdir
self.base_features_path = options.features_path
self.base_features_path = options.features_path
self.sample_rate = options.sample_rate
self.write_cadence = options.write_cadence
self.persist_cadence = options.persist_cadence
......
......@@ -16,8 +16,8 @@
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
__usage__ = "gstlal_feature_synchronizer [--options]"
__description__ = "an executable to synchronize incoming gstlal feature extractor streams and send downstream"
__usage__ = "gstlal_snax_synchronize [--options]"
__description__ = "an executable to synchronize incoming feature streams and send downstream"
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
#-------------------------------------------------
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment