Skip to content
Snippets Groups Projects
Commit aa29b94a authored by chad.hanna's avatar chad.hanna
Browse files

lloidhander: make fewer kafka messages

parent 7b53b132
No related branches found
No related tags found
No related merge requests found
......@@ -187,13 +187,34 @@ class EyeCandy(object):
else:
self.producer = None
# FIXME, it is silly to store kafka data like this since we
# have all the other data structures, but since we are also
# maintaining the bottle route methods, we should keep this a
# bit separate for now to not disrupt too much.
self.kafka_data = {}
self.kafka_data["latency_history"] = ""
self.kafka_data["likelihood_history"] = ""
self.kafka_data["snr_history"] = ""
self.kafka_data["far_history"] = ""
self.kafka_data["ram_history"] = ""
for instrument in instruments:
self.kafka_data["%s_snr_history" % instrument] = ""
self.kafka_data["%s/dqvector_on" % instrument] = ""
self.kafka_data["%s/dqvector_off" % instrument] = ""
self.kafka_data["%s/dqvector_gap" % instrument] = ""
self.kafka_data["%s/statevector_on" % instrument] = ""
self.kafka_data["%s/statevector_off" % instrument] = ""
self.kafka_data["%s/statevector_gap" % instrument] = ""
self.kafka_data["%s/strain_dropped" % instrument] = ""
def update(self, events, last_coincs):
self.ram_history.append((float(lal.UTCToGPS(time.gmtime())), (resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + resource.getrusage(resource.RUSAGE_CHILDREN).ru_maxrss) / 1048576.)) # GB
if events:
max_snr_event = max(events, key = lambda event: event.snr)
self.ifo_snr_history[max_snr_event.ifo].append((float(max_snr_event.end), max_snr_event.snr))
if self.producer is not None:
self.producer.send(self.tag, {"%s_snr_history" % max_snr_event.ifo: "%s\t%s" % (self.ifo_snr_history[max_snr_event.ifo][-1][0], self.ifo_snr_history[max_snr_event.ifo][-1][1])})
self.kafka_data["%s_snr_history" % max_snr_event.ifo] += "%s\t%s\n" % (self.ifo_snr_history[max_snr_event.ifo][-1][0], self.ifo_snr_history[max_snr_event.ifo][-1][1])
if last_coincs:
coinc_inspiral_index = last_coincs.coinc_inspiral_index
coinc_event_index = last_coincs.coinc_event_index
......@@ -214,36 +235,38 @@ class EyeCandy(object):
self.far_history.append((max_likelihood_t, max_likelihood_far))
if self.producer is not None:
self.producer.send(self.tag, {"latency_history": "%s\t%s" % (self.latency_history[-1][0], self.latency_history[-1][1])})
self.producer.send(self.tag, {"snr_history": "%s\t%s" % (self.snr_history[-1][0], self.snr_history[-1][1])})
self.producer.send(self.tag, {"likelihood_history": "%s\t%s" % (self.likelihood_history[-1][0], self.likelihood_history[-1][1])})
self.producer.send(self.tag, {"far_history": "%s\t%s" % (self.far_history[-1][0], self.far_history[-1][1])})
self.kafka_data["latency_history"] += "%s\t%s\n" % (self.latency_history[-1][0], self.latency_history[-1][1])
self.kafka_data["snr_history"] += "%s\t%s\n" % (self.snr_history[-1][0], self.snr_history[-1][1])
self.kafka_data["likelihood_history"] += "%s\t%s\n" % (self.likelihood_history[-1][0], self.likelihood_history[-1][1])
self.kafka_data["far_history"] += "%s\t%s\n" % (self.far_history[-1][0], self.far_history[-1][1])
t = inspiral.now()
if self.time_since_last_state is None:
self.time_since_last_state = t
if self.producer is not None and (t - self.time_since_last_state) > 1:
self.time_since_last_state = t
self.producer.send(self.tag, {"ram_history": "%s\t%s" % (self.ram_history[-1][0], self.ram_history[-1][1])})
self.kafka_data["ram_history"] += "%s\t%s" % (self.ram_history[-1][0], self.ram_history[-1][1])
# send the state vector and dq vector information to kafka
for instrument, elem in self.dqvectors.items():
self.producer.send(self.tag, {"%s/dqvector_on" % instrument: "%s\t%s" % (t, elem.get_property("on-samples"))})
self.producer.send(self.tag, {"%s/dqvector_off" % instrument: "%s\t%s" % (t, elem.get_property("off-samples"))})
self.producer.send(self.tag, {"%s/dqvector_gap" % instrument: "%s\t%s" % (t, elem.get_property("gap-samples"))})
self.kafka_data["%s/dqvector_on" % instrument] += "%s\t%s" % (t, elem.get_property("on-samples"))
self.kafka_data["%s/dqvector_off" % instrument] += "%s\t%s" % (t, elem.get_property("off-samples"))
self.kafka_data["%s/dqvector_gap" % instrument] += "%s\t%s" % (t, elem.get_property("gap-samples"))
for instrument, elem in self.statevectors.items():
self.producer.send(self.tag, {"%s/statevector_on" % instrument: "%s\t%s" % (t, elem.get_property("on-samples"))})
self.producer.send(self.tag, {"%s/statevector_off" % instrument: "%s\t%s" % (t, elem.get_property("off-samples"))})
self.producer.send(self.tag, {"%s/statevector_gap" % instrument: "%s\t%s" % (t, elem.get_property("gap-samples"))})
self.kafka_data["%s/statevector_on" % instrument] += "%s\t%s" % (t, elem.get_property("on-samples"))
self.kafka_data["%s/statevector_off" % instrument] += "%s\t%s" % (t, elem.get_property("off-samples"))
self.kafka_data["%s/statevector_gap" % instrument] += "%s\t%s" % (t, elem.get_property("gap-samples"))
for instrument, elem in self.strain.items():
# I know the name is strain_drop even though it
# comes from the "add" property. that is
# because audiorate has to "add" samples when
# data is dropped.
# FIXME don't hard code the rate
self.producer.send(self.tag, {"%s/strain_dropped" % instrument: "%s\t%s" % (t, elem.get_property("add") / 16384.)})
self.kafka_data["%s/strain_dropped" % instrument] += "%s\t%s" % (t, elem.get_property("add") / 16384.)
# Actually flush all of the kafka messages
# Send and flush all of the kafka messages and clear the data
self.producer.send(self.tag, self.kafka_data)
self.producer.flush()
for k in self.kafka_data: self.kafka_data[k] = ""
def web_get_latency_histogram(self):
with self.lock:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment