
Commit 06d5ccd2 authored by Patrick Godwin


lloidhandler.py: add gate history queues to SegmentsTracker, used in EyeCandy for more meaningful segment metrics in place of on/off/gap samples; update routes used for aggregation in inspiral_pipe.py
parent bd05f18e
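
In brief, the change replaces per-second on/off/gap sample counts read from the gate elements with a record of gate transitions: each (gate type, instrument) pair keeps a bounded deque of (timestamp, on/off) points, which EyeCandy later resamples onto a regular grid before publishing to Kafka. A minimal sketch of the recording side, with illustrative names rather than the module's API:

from collections import deque

# one bounded history per (gate type, instrument); maxlen matches the
# deque(maxlen = 30) used by SegmentsTracker in the diff below
gate_history = {"statevectorsegments": {"H1": deque(maxlen = 30)}}

def on_gate_transition(segtype, instrument, timestamp, new_state):
    # encode each transition as a (time, on/off bit) point
    bit = 1. if new_state == "on" else 0.
    gate_history[segtype][instrument].append((float(timestamp), bit))

on_gate_transition("statevectorsegments", "H1", 1000000000, "on")
on_gate_transition("statevectorsegments", "H1", 1000000012.5, "off")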
@@ -241,15 +241,14 @@ def aggregator_layer(dag, jobs, options, job_tags):
     # define routes used for aggregation jobs
     snr_routes = ["%s_snr_history" % ifo for ifo in options.channel_dict]
     network_routes = ["likelihood_history", "snr_history", "latency_history"]
+    state_routes = ["%s_strain_dropped" % ifo for ifo in options.channel_dict]
     usage_routes = ["ram_history"]
-    state_routes = []
-    for ifo in options.channel_dict.keys():
-        state_routes.extend(["%s_dqvector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
-        state_routes.extend(["%s_statevector_%s" % (ifo, state) for state in ["on", "off", "gap"]])
-        state_routes.append("%s_strain_dropped" % ifo)
     agg_routes = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes))
+    gates = ["%ssegments" % gate for gate in ("statevector", "dqvector", "whiteht")]
+    seg_routes = ["%s_%s" % (ifo, gate) for ifo in options.channel_dict for gate in gates]

     # analysis-based aggregation jobs
     # FIXME don't hard code the 1000
     max_agg_jobs = 1000
@@ -272,6 +271,22 @@ def aggregator_layer(dag, jobs, options, job_tags):
             else:
                 aggNode = dagparts.DAGNode(jobs['agg'], dag, [], opts = these_options)

+    # segment-based jobs
+    seg_routes = list(dagparts.groups(seg_routes, max(max_agg_jobs // (4 * len(job_tags)), 1)))
+    for routes in seg_routes:
+        these_options = dict(agg_options)
+        these_options["route"] = routes
+        these_options["data-type"] = "min"
+        for ii, (aggstart, aggend) in enumerate(zip(agg_job_bounds[:-1], agg_job_bounds[1:])):
+            these_options["job-start"] = aggstart
+            these_options["num-jobs"] = aggend - aggstart
+            if ii == 0: ### elect first aggregator per route as leader
+                these_options["across-jobs"] = ""
+                aggNode = dagparts.DAGNode(jobs['aggLeader'], dag, [], opts = these_options)
+            else:
+                aggNode = dagparts.DAGNode(jobs['agg'], dag, [], opts = these_options)

     # Trigger aggregation
     trigagg_options = {
         "dump-period": 0,
......
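
The segment-based jobs above split seg_routes into chunks via dagparts.groups so that each aggregator handles a bounded number of routes, with the first aggregator per chunk elected leader. dagparts.groups itself isn't shown in this diff; assuming it is a plain fixed-size chunker (an assumption based only on how it is called), the pattern is roughly:

# hypothetical stand-in for dagparts.groups (the real helper lives in
# gstlal's dagparts module); splits a sequence into consecutive chunks
# of at most n items
def groups(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

# with max_agg_jobs = 1000 and, say, 2 job tags, each chunk holds at most
# max(1000 // (4 * 2), 1) = 125 routes
routes = ["H1_statevectorsegments", "H1_dqvectorsegments", "H1_whitehtsegments"]
print(list(groups(routes, 125)))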
@@ -60,6 +60,7 @@ import math
 import numpy
 import os
 import resource
+from scipy.interpolate import interp1d
 import StringIO
 import sys
 import threading
@@ -131,9 +132,10 @@ def subdir_from_T050017_filename(fname):
 class EyeCandy(object):
-    def __init__(self, instruments, kafka_server, tag, pipeline):
+    def __init__(self, instruments, kafka_server, tag, pipeline, segmentstracker):
         self.kafka_server = kafka_server
         self.tag = tag
+        self.gate_history = segmentstracker.gate_history
         self.latency_histogram = rate.BinnedArray(rate.NDBins((rate.LinearPlusOverflowBins(5, 205, 22),)))

         # NOTE most of this data is collected at 1Hz, thus a 300
         # element deque should hold about 5 minutes of history.
@@ -147,17 +149,8 @@
         self.far_history = deque(maxlen = 300)
         self.ram_history = deque(maxlen = 2)
         self.ifo_snr_history = dict((instrument, deque(maxlen = 300)) for instrument in instruments)
-        self.state_sample_rates = {"H1": 16, "L1": 16, "V1": 1}
-        self.dqvectors = {}
-        self.statevectors = {}
         self.strain = {}
         for instrument in instruments:
-            name = "%s_state_vector" % instrument
-            elem = pipeline.get_by_name(name)
-            self.statevectors[instrument] = elem
-            name = "%s_dq_vector" % instrument
-            elem = pipeline.get_by_name(name)
-            self.dqvectors[instrument] = elem
             name = "%s_strain_audiorate" % instrument
             elem = pipeline.get_by_name(name)
             self.strain[instrument] = elem
@@ -271,22 +264,30 @@
         t = inspiral.now()
         if self.time_since_last_state is None:
             self.time_since_last_state = t
-        # NOTE only dump to kafka every 1 seconds
+        # send state/segment information to kafka every second
         if self.producer is not None and (t - self.time_since_last_state) >= 1:
             self.time_since_last_state = t
             for ii, column in enumerate(["time", "data"]):
                 self.kafka_data["ram_history"][column].append(float(self.ram_history[-1][ii]))
-            # send the state vector and dq vector information to kafka
-            # FIXME state sample rate hack to adjust for 16 Hz sample rate of ALIGO vs 1 Hz of Virgo
-            for instrument, elem in self.dqvectors.items():
-                for state in ["on", "off", "gap"]:
-                    self.kafka_data["%s_dqvector_%s" % (instrument, state)]["time"].append(float(t))
-                    self.kafka_data["%s_dqvector_%s" % (instrument, state)]["data"].append(elem.get_property("%s-samples" % state) / self.state_sample_rates[instrument])
-            for instrument, elem in self.statevectors.items():
-                for state in ["on", "off", "gap"]:
-                    self.kafka_data["%s_statevector_%s" % (instrument, state)]["time"].append(float(t))
-                    self.kafka_data["%s_statevector_%s" % (instrument, state)]["data"].append(elem.get_property("%s-samples" % state) / self.state_sample_rates[instrument])
+            # collect gate segments
+            for gate in self.gate_history.keys():
+                for instrument, seg_history in self.gate_history[gate].items():
+                    # get on/off points, add point at +inf
+                    gate_interp_times, gate_interp_onoff = zip(*seg_history)
+                    gate_interp_times = list(gate_interp_times)
+                    gate_interp_times.append(2000000000)
+                    gate_interp_onoff = list(gate_interp_onoff)
+                    gate_interp_onoff.append(gate_interp_onoff[-1])
+                    # regularly sample from on/off points
+                    gate_times = numpy.arange(int(self.time_since_last_state), int(t + 1), 0.25)
+                    gate_onoff = interp1d(gate_interp_times, gate_interp_onoff, kind='zero')(gate_times)
+                    self.kafka_data["%s_%s" % (instrument, gate)]["time"].extend([t for t in gate_times if t >= self.time_since_last_state])
+                    self.kafka_data["%s_%s" % (instrument, gate)]["data"].extend([state for t, state in zip(gate_times, gate_onoff) if t >= self.time_since_last_state])
+            # collect strain dropped samples
             for instrument, elem in self.strain.items():
                 # I know the name is strain_drop even though it
                 # comes from the "add" property. that is
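
The resampling above leans on scipy's interp1d with kind='zero', a zero-order (previous-value) hold that turns the sparse transition points into a step function sampled at 4 Hz; the 2000000000 sentinel extends the last recorded state to the far future so the interpolant covers query times past the final transition. A small self-contained demonstration:

import numpy
from scipy.interpolate import interp1d

# transition points: gate on at t=0.0, off at t=2.6, on again at t=7.1;
# the far-future sentinel extends the last state past any query time
times = [0.0, 2.6, 7.1, 2000000000.0]
onoff = [1.0, 0.0, 1.0, 1.0]

grid = numpy.arange(0.0, 10.0, 0.25)
samples = interp1d(times, onoff, kind = 'zero')(grid)
# samples is 1.0 on [0.0, 2.6), 0.0 on [2.6, 7.1), and 1.0 thereafter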
@@ -405,6 +406,9 @@
         self.recent_segment_histories = self.seglistdicts.copy()
         self.segment_history_duration = segment_history_duration

+        # recent gate history encoded in on/off bits
+        self.gate_history = {segtype: {instrument: deque(maxlen = 30) for instrument in instruments} for segtype in gate_suffix}
+
         # iterate over segment types and instruments, look for the
         # gate element that should provide those segments, and
         # connect handlers to collect the segments
@@ -459,9 +463,11 @@
             if new_state == "off":
                 # record end of segment
                 self.seglistdicts[segtype][instrument] -= segments.segmentlist((segments.segment(timestamp, segments.PosInfinity),))
+                self.gate_history[segtype][instrument].append((float(timestamp), 0.))
             elif new_state == "on":
                 # record start of new segment
                 self.seglistdicts[segtype][instrument] += segments.segmentlist((segments.segment(timestamp, segments.PosInfinity),))
+                self.gate_history[segtype][instrument].append((float(timestamp), 1.))
             else:
                 assert False, "impossible new_state '%s'" % new_state
@@ -669,16 +675,20 @@
         # FIXME: detangle this
         self.gracedbwrapper.lock = self.lock

-        self.eye_candy = EyeCandy(rankingstat.instruments, kafka_server, self.tag, pipeline)
-        # FIXME: detangle this
-        self.eye_candy.lock = self.lock

         #
         # setup segment list collection from gates
         #

         self.segmentstracker = SegmentsTracker(pipeline, rankingstat.instruments, verbose = verbose)

+        #
+        # set up metric collection
+        #
+
+        self.eye_candy = EyeCandy(rankingstat.instruments, kafka_server, self.tag, pipeline, self.segmentstracker)
+        # FIXME: detangle this
+        self.eye_candy.lock = self.lock

         #
         # setup bottle routes (and rahwts)
         #
......