Commit 07261470 authored by Patrick Godwin

etg -> feature_extractor: the great migration. purged all mentions of ETG and replaced them with feature extractor, or a more apt name depending on context
parent bfc31042
@@ -33,8 +33,8 @@ dist_bin_SCRIPTS = \
 gstlal_ll_dq \
 gstlal_ll_inspiral_state \
 gstlal_condor_top \
-gstlal_etg \
-gstlal_etg_pipe \
-gstlal_etg_whitener_check \
-gstlal_etg_template_overlap \
+gstlal_feature_extractor \
+gstlal_feature_extractor_pipe \
+gstlal_feature_extractor_whitener_check \
+gstlal_feature_extractor_template_overlap \
 gstlal_injsplitter
@@ -106,7 +106,7 @@ class MultiChannelHandler(simplehandler.Handler):
 multiple channels.
 Implements additional message handling for dealing with spectrum
-messages and creates trigger files for use in iDQ.
+messages and creates trigger files containing features for use in iDQ.
 """
 def __init__(self, mainloop, pipeline, data_source_info, options, **kwargs):
 self.lock = threading.Lock()
@@ -158,20 +158,20 @@ class MultiChannelHandler(simplehandler.Handler):
 self.fdata = deque(maxlen = 25000)
 self.fdata.append(self.header)
-# set up ETG bottle related properties
-self.etg_event = deque(maxlen = 20000)
-self.etg_event_time = None
+# set up stream related properties
+self.stream_event = deque(maxlen = 20000)
+self.last_stream_event_time = None
-# set up bottle routes for PSDs and extracted ETG data
+# set up bottle routes for PSDs and extracted feature data
 self.psds = {}
-self.etg_data = deque(maxlen = 2000)
+self.feature_data = deque(maxlen = 2000)
 if not options.disable_web_service:
 bottle.route("/psds.xml")(self.web_get_psd_xml)
-bottle.route("/etg_subset")(self.web_get_etg_data)
+bottle.route("/feature_subset")(self.web_get_feature_data)
 # set up kafka related properties
 if options.use_kafka:
-self.etg_parition = options.etg_partition
+self.kafka_partition = options.kafka_partition
 self.kafka_topic = options.kafka_topic
 self.kafka_conf = {'bootstrap.servers': options.kafka_server}
 self.producer = Producer(self.kafka_conf)
@@ -216,17 +216,17 @@ class MultiChannelHandler(simplehandler.Handler):
 buftime = int(buf.pts / 1e9)
 channel, rate = sink_dict[elem]
-# push new etg event to queue if done processing current timestamp
-if self.etg_event_time is None:
-self.etg_event_time = buftime
-if self.etg_event_time < buftime:
-etg_subset = {'timestamp': self.etg_event_time, 'etg_data': list(self.etg_event)}
+# push new stream event to queue if done processing current timestamp
+if self.last_stream_event_time is None:
+self.last_stream_event_time = buftime
+if self.last_stream_event_time < buftime:
+feature_subset = {'timestamp': self.last_stream_event_time, 'feature_data': list(self.stream_event)}
 if options.use_kafka:
-self.producer.produce(timestamp = self.etg_event_time, topic = self.kafka_topic, value = json.dumps(etg_subset))
+self.producer.produce(timestamp = self.last_stream_event_time, topic = self.kafka_topic, value = json.dumps(feature_subset))
 else:
-self.etg_data.append(etg_subset)
-self.etg_event.clear()
-self.etg_event_time = buftime
+self.feature_data.append(feature_subset)
+self.stream_event.clear()
+self.last_stream_event_time = buftime
 # set save time appropriately
 if self.last_save_time is None:
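Note: each feature subset published above is a JSON-encoded dict with 'timestamp' and 'feature_data' keys. For context only (not part of this commit), a minimal consumer-side sketch using the same confluent_kafka library; the server address, group id, and topic name are illustrative placeholders for whatever is passed via --kafka-server and --kafka-topic:

```python
import json

from confluent_kafka import Consumer

# illustrative settings; real values come from --kafka-server / --kafka-topic
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',   # assumed Kafka server url
    'group.id': 'feature_subset_reader',     # hypothetical consumer group
    'auto.offset.reset': 'latest',
})
consumer.subscribe(['gstlal_features'])      # assumed topic name

while True:
    msg = consumer.poll(timeout=1.0)
    if msg is None or msg.error():
        continue
    # payload mirrors the producer side: {'timestamp': ..., 'feature_data': [...]}
    feature_subset = json.loads(msg.value())
    print(feature_subset['timestamp'], len(feature_subset['feature_data']))
```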
@@ -285,13 +285,13 @@ class MultiChannelHandler(simplehandler.Handler):
 # Setting stop time to trigger time for use with half sine gaussians
 stop_time = trigger_time
 # append row for data transfer/saving
-etg_row = {'timestamp': buftime, 'channel': channel, 'rate': rate, 'start_time': start_time, 'stop_time': stop_time, 'trigger_time': trigger_time,
+feature_row = {'timestamp': buftime, 'channel': channel, 'rate': rate, 'start_time': start_time, 'stop_time': stop_time, 'trigger_time': trigger_time,
 'frequency': freq, 'q': q, 'phase': row.phase, 'sigmasq': row.sigmasq, 'chisq': row.chisq, 'snr': row.snr}
-self.etg_event.append(etg_row)
+self.stream_event.append(feature_row)
 # save iDQ compatible data
 if options.save_hdf:
-self.fdata.append(etg_row, key = (channel, rate), buftime = buftime)
+self.fdata.append(feature_row, key = (channel, rate), buftime = buftime)
 else:
 self.fdata.append("%20.9f\t%20.9f\t%20.9f\t%10.3f\t%8.3f\t%8.3f\t%8.3f\t%10.3f\t%s\n" % (start_time, stop_time, trigger_time, freq, row.phase, q, row.chisq, row.snr, channel_tag))
@@ -352,17 +352,17 @@ class MultiChannelHandler(simplehandler.Handler):
 output.close()
 return outstr
-def web_get_etg_data(self):
+def web_get_feature_data(self):
 header = {'Content-type': 'application/json'}
 # if queue is empty, send appropriate response
-if not self.etg_data:
+if not self.feature_data:
 status = 204
 body = json.dumps({'error': "No Content"})
-# else, get etg data and send as JSON
+# else, get feature data and send as JSON
 else:
 status = 200
 with self.lock:
-body = json.dumps(self.etg_data.popleft())
+body = json.dumps(self.feature_data.popleft())
 return bottle.HTTPResponse(status = status, headers = header, body = body)
 class LinkedAppSync(pipeparts.AppSync):
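Note: the renamed /feature_subset route serves the same queue over HTTP, answering 200 with a JSON body or 204 when nothing is queued. A minimal polling sketch (not part of this commit), assuming the requests package is available and a hypothetical host/port for the extractor's web service:

```python
import time

import requests  # assumed available; any HTTP client works

# hypothetical address of the feature extractor's web service
URL = "http://localhost:8080/feature_subset"

while True:
    response = requests.get(URL)
    if response.status_code == 200:
        feature_subset = response.json()
        print(feature_subset['timestamp'], len(feature_subset['feature_data']))
    else:
        # 204 "No Content": the feature_data queue is currently empty
        time.sleep(1)
```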
@@ -486,10 +486,10 @@ def parse_command_line():
 parser.add_option("-v", "--verbose", action = "store_true", help = "Be verbose.")
 parser.add_option("--save-hdf", action = "store_true", default = False, help = "If set, will save hdf5 files to disk straight from dataframe once every cadence")
 parser.add_option("--use-kafka", action = "store_true", default = False, help = "If set, will output feature vector subsets to a Kafka topic.")
-parser.add_option("--etg-partition", metavar = "string", help = "If using Kafka, sets the partition that this ETG is assigned to.")
-parser.add_option("--kafka-topic", metavar = "string", help = "If using Kafka, sets the topic name that this ETG publishes feature vector subsets to.")
+parser.add_option("--kafka-partition", metavar = "string", help = "If using Kafka, sets the partition that this feature extractor is assigned to.")
+parser.add_option("--kafka-topic", metavar = "string", help = "If using Kafka, sets the topic name that this feature extractor publishes feature vector subsets to.")
 parser.add_option("--kafka-server", metavar = "string", help = "If using Kafka, sets the server url that the kafka topic is hosted on.")
-parser.add_option("--job-id", type = "string", default = "0001", help = "Sets the job identication of the ETG with a 4 digit integer string code, padded with zeros. Default = 0001")
+parser.add_option("--job-id", type = "string", default = "0001", help = "Sets the job identication of the feature extractor with a 4 digit integer string code, padded with zeros. Default = 0001")
 parser.add_option("-m", "--mismatch", type = "float", default = 0.2, help = "Mismatch between templates, mismatch = 1 - minimal match. Default = 0.2.")
 parser.add_option("-q", "--qhigh", type = "float", default = 20, help = "Q high value for half sine-gaussian waveforms. Default = 20.")
 parser.add_option("--trigger-start-time", type = "int", metavar = "seconds", help = "Set the start time of the segment to output triggers in GPS seconds. Required unless --data-source=lvshm")
@@ -545,7 +545,7 @@ duration = int(options.gps_end_time) - int(options.gps_start_time)
 logdir = os.path.join(options.out_path, 'logs', options.job_id)
 aggregator.makedir(logdir)
-logger = idq_utils.get_logger('gstlal-etg_%d-%d' % (options.gps_start_time, duration), rootdir=logdir, verbose=options.verbose)
+logger = idq_utils.get_logger('gstlal-feature-extractor_%d-%d' % (options.gps_start_time, duration), rootdir=logdir, verbose=options.verbose)
 logger.info("writing log to %s" % logdir)
 #
......
@@ -17,7 +17,7 @@
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
 """
-This program makes a dag to run gstlal_etg offline
+This program makes a dag to run gstlal_feature_extractor offline
 """
 __author__ = 'Duncan Meacher <duncan.meacher@ligo.org>, Patrick Godwin <patrick.godwin@ligo.org>'
@@ -105,11 +105,11 @@ def analysis_segments(ifo, allsegs, boundary_seg, segment_length, max_template_l
 return segsdict
 #
-# get a dictionary of all the channels per gstlal_etg job
+# get a dictionary of all the channels per gstlal_feature_extractor job
 #
-def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, data_source_info):
-etg_nodes = {}
+def feature_extractor_node_gen(gstlalFeatureExtractorJob, dag, parent_nodes, segsdict, ifo, options, data_source_info):
+feature_extractor_nodes = {}
 trig_start = options.gps_start_time
 # parallelize jobs by channel subsets
@@ -122,7 +122,7 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, data_s
 if ii / options.concurrency == 0:
 dep_nodes = parent_nodes
 else:
-dep_nodes = [etg_nodes[(ii - options.concurrency, seg)]]
+dep_nodes = [feature_extractor_nodes[(ii - options.concurrency, seg)]]
 # creates a list of channel names with entries of the form --channel-name=IFO:CHANNEL_NAME:RATE
 channels = [''.join(["--channel-name=",':'.join([channel, str(int(data_source_info.channel_dict[channel]['fsamp']))])]) for channel in channel_subset]
@@ -130,10 +130,10 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, data_s
 # FIXME: hacky way of getting options to get passed correctly for channels
 channels[0] = channels[0].split('=')[1]
-outpath = os.path.join(options.out_path, "gstlal_etg")
+outpath = os.path.join(options.out_path, "gstlal_feature_extractor")
-etg_nodes[(ii, seg)] = \
-inspiral_pipe.generic_node(gstlalETGJob, dag, parent_nodes = dep_nodes,
+feature_extractor_nodes[(ii, seg)] = \
+inspiral_pipe.generic_node(gstlalFeatureExtractorJob, dag, parent_nodes = dep_nodes,
 opts = {"gps-start-time":int(seg[0]),
 "gps-end-time":int(seg[1]),
 "trigger-start-time":int(trig_start),
@@ -160,7 +160,7 @@ def etg_node_gen(gstlalETGJob, dag, parent_nodes, segsdict, ifo, options, data_s
 trig_start = int(seg[1])
-return etg_nodes
+return feature_extractor_nodes
 #
 # Main
@@ -216,7 +216,7 @@ options, filenames = parse_command_line()
 output_dir = "plots"
-listdir = os.path.join(options.out_path, "gstlal_etg/channel_lists")
+listdir = os.path.join(options.out_path, "gstlal_feature_extractor/channel_lists")
 if not os.path.exists(listdir):
 os.makedirs(listdir)
@@ -240,21 +240,21 @@ try:
 os.mkdir("logs")
 except:
 pass
-dag = inspiral_pipe.DAG("etg_trigger_pipe")
+dag = inspiral_pipe.DAG("feature_extractor_pipe")
 #
 # setup the job classes
 #
-gstlalETGJob = inspiral_pipe.generic_job("gstlal_etg", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}))
+gstlalFeatureExtractorJob = inspiral_pipe.generic_job("gstlal_feature_extractor", condor_commands = inspiral_pipe.condor_command_dict_from_opts(options.condor_command, {"request_memory":options.request_memory, "request_cpus":options.request_cpu, "want_graceful_removal":"True", "kill_sig":"15"}))
 segsdict = analysis_segments(ifo, data_source_info.frame_segments, boundary_seg, options.segment_length, max_template_length=max_template_length)
 #
-# ETG jobs
+# feature extractor jobs
 #
-etg_nodes = etg_node_gen(gstlalETGJob, dag, [], segsdict, ifo, options, data_source_info)
+feature_extractor_nodes = feature_extractor_node_gen(gstlalFeatureExtractorJob, dag, [], segsdict, ifo, options, data_source_info)
 #
 # all done
......
@@ -180,7 +180,7 @@ class FeatureData(object):
 def __init__(self, columns, keys = None, **kwargs):
 self.keys = keys
 self.columns = columns
-self.etg_data = {}
+self.feature_data = {}
 def dump(self, path):
 raise NotImplementedError
@@ -198,7 +198,7 @@ class HDF5FeatureData(FeatureData):
 def __init__(self, columns, keys, **kwargs):
 super(HDF5FeatureData, self).__init__(columns, keys = keys, **kwargs)
 self.cadence = kwargs.pop('cadence')
-self.etg_data = {key: numpy.empty((self.cadence,), dtype = [(column, 'f8') for column in self.columns]) for key in keys}
+self.feature_data = {key: numpy.empty((self.cadence,), dtype = [(column, 'f8') for column in self.columns]) for key in keys}
 self.clear()
 def dump(self, path, base, start_time, key = None, tmp = False):
@@ -208,12 +208,12 @@ class HDF5FeatureData(FeatureData):
 name = "%d_%d" % (start_time, self.cadence)
 if key:
 group = os.path.join(str(key[0]), str(key[1]).zfill(4))
-create_new_dataset(path, base, self.etg_data[key], name=name, group=group, tmp=tmp)
+create_new_dataset(path, base, self.feature_data[key], name=name, group=group, tmp=tmp)
 self.clear(key)
 else:
 for key in self.keys:
 group = os.path.join(str(key[0]), str(key[1]).zfill(4))
-create_new_dataset(path, base, self.etg_data[key], name=name, group=group, tmp=tmp)
+create_new_dataset(path, base, self.feature_data[key], name=name, group=group, tmp=tmp)
 self.clear()
 def append(self, value, key = None, buftime = None):
@@ -222,14 +222,14 @@ class HDF5FeatureData(FeatureData):
 """
 if buftime and key:
 idx = buftime % self.cadence
-self.etg_data[key][idx] = numpy.array([value[column] for column in self.columns])
+self.feature_data[key][idx] = numpy.array([value[column] for column in self.columns])
 def clear(self, key = None):
 if key:
-self.etg_data[key][:] = numpy.nan
+self.feature_data[key][:] = numpy.nan
 else:
 for key in self.keys:
-self.etg_data[key][:] = numpy.nan
+self.feature_data[key][:] = numpy.nan
 class HalfSineGaussianGenerator(object):
 """
......
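Note: for orientation only (not part of this commit), the renamed feature_data container in HDF5FeatureData is a dict of fixed-length structured numpy arrays, one per (channel, rate) key, with rows addressed by buftime modulo the cadence and unused rows cleared to NaN. A standalone sketch of that layout; the column names, channel, rate, cadence, and row values below are illustrative assumptions:

```python
import numpy

# illustrative values; the real columns, keys, and cadence come from the pipeline config
columns = ['trigger_time', 'frequency', 'q', 'snr']
keys = [('H1:FAKE-CHANNEL', 512)]          # hypothetical (channel, rate) pair
cadence = 32                               # rows of features kept per dataset

# one structured array per key, as in HDF5FeatureData.__init__
feature_data = {key: numpy.empty((cadence,), dtype=[(col, 'f8') for col in columns]) for key in keys}
for key in keys:
    feature_data[key][:] = numpy.nan       # clear(): unused rows stay NaN

# append(): a row lands at buftime % cadence for its (channel, rate) key
buftime = 1187008882
row = {'trigger_time': 1187008882.43, 'frequency': 128.0, 'q': 20.0, 'snr': 7.5}
feature_data[keys[0]][buftime % cadence] = tuple(row[col] for col in columns)
```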
@@ -82,12 +82,12 @@ FRAME_TYPE=R
 ############
 all : dag
-sed -i '/gstlal_etg / s/$$/ |& grep -v '\''XLAL\|GSL\|Generic'\''/' etg_trigger_pipe.sh
-@echo "Submit with: condor_submit_dag etg_trigger_pipe.dag"
+sed -i '/gstlal_feature_extractor / s/$$/ |& grep -v '\''XLAL\|GSL\|Generic'\''/' feature_extractor_pipe.sh
+@echo "Submit with: condor_submit_dag feature_extractor_pipe.dag"
 # Run etg pipe to produce dag
 dag : frame.cache plots channel_list.txt segments.xml.gz
-gstlal_etg_pipe \
+gstlal_feature_extractor_pipe \
 --data-source frames \
 --gps-start-time $(START) \
 --gps-end-time $(STOP) \
......