From aae198c7f1fe3bfa133f8cf326bb543a328d71b1 Mon Sep 17 00:00:00 2001
From: "chad.hanna" <crh184@psu.edu>
Date: Sat, 15 Sep 2018 07:51:45 -0700
Subject: [PATCH] move dataserver code

---
 gstlal-ugly/bin/gstlal_data_server | 566 -----------------------------
 1 file changed, 566 deletions(-)
 delete mode 100755 gstlal-ugly/bin/gstlal_data_server

diff --git a/gstlal-ugly/bin/gstlal_data_server b/gstlal-ugly/bin/gstlal_data_server
deleted file mode 100755
index 66d7bcc519..0000000000
--- a/gstlal-ugly/bin/gstlal_data_server
+++ /dev/null
@@ -1,566 +0,0 @@
-#!/usr/bin/python
-
-import gviz_api
-import h5py
-import os,sys
-import cgi
-import cgitb
-cgitb.enable()
-form = cgi.parse()
-import numpy
-import re
-import time
-from scipy.interpolate import interp1d
-from itertools import count
-#import pyparsing
-
-def now():
-	#FIXME use pylal when available
-	return time.time() - 315964785
-
-def parse_form(form):
-	# we have to get a query according to the google standard
-	assert "tq" in form
-	assert "tqx" in form
-	reqId = form["tqx"][0].split(":")[1]
-	query = form["tq"][0]
-
-	# FIXME these are not google API compliant.
-	# Figure out if a GPS time is being specified, else assume "now"
-	if "gpstime" in form:
-		query_time = form["gpstime"][0]
-		if query_time != "-1":
-			query_time = int(query_time)
-		else:
-			query_time = int(now())
-	else:
-		query_time = int(now())
-
-	if "duration" in form:
-		duration = form["duration"][0]
-		if duration != "-1":
-			duration = int(duration)
-		else:
-			duration = 1000
-	else:
-		duration = 1000
-
-
-	if "type" in form:
-		datatype = form["type"][0]
-	else:
-		datatype = "max"
-
-	if "id" in form:
-		idstart, idend = [int(i) for i in form["id"][0].split(",")]
-	else:
-		idstart = 0
-		idend = 32
-
-	if "dir" in form:
-		base_path = form["dir"][0]
-	else:
-		base_path = "/home/gstlalcbc/observing/3/online/sept_opa/trigs"
-
-	if "ifos" in form:
-		ifos = form["ifos"][0].split(",")
-	else:
-		ifos = ["H1", "L1", "V1"]
-
-	jobdirs = ["by_job/%04d" % i for i in range(idstart, idend+1)]
-
-	return reqId, query, query_time, duration, datatype, base_path, jobdirs, ifos
-
-
-def gps_by_level(t, level):
-	return "/".join([x for x in str(t)[0:level]])
-
-
-def get_partial_paths_to_aggregated_data(query_time, duration, level = None):
-	if query_time is None:
-		yield ""
-	else:
-		if level is None:
-			level = 10 - int(numpy.ceil(numpy.log10(duration)))
-		if level > 6:
-			level = 6
-		for n in count():
-			this_time = query_time - n * 10**(10-level)
-			if this_time > 0:
-				yield gps_by_level(query_time - n * 10**(10-level), level)
-			else:
-				break
-
-
-# Always print a header first
-print "Content-type: application/json"
-print "Cache-Control: max-age=10"
-print
-
-
-reqId, query, query_time, duration, datatype, base_path, jobdirs, ifos = parse_form(form)
-
-#
-# "SQL" parser. FIXME. First, the google query language isn't really SQL and
-# second, if this keeps going we need to switch to a parsing library, e.g.,
-# pyparsing. For now this is likely to be a collection of one-offs.
-#
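-#
-# For illustration, a hypothetical request to this script might look like
-#
-#   ...?tqx=reqId:0&tq=latency_history+where+node+is+all&gpstime=-1&duration=1000
-#
-# which parse_form() above turns into reqId = "0" and
-# query = "latency_history where node is all"; gpstime, duration, type, id,
-# dir and ifos are our non-standard extensions to the google protocol.
-#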
-
-def stats_on_data(data):
-	# min, -1 sigma, +1 sigma, max (15.9% and 84.1% are the 1-sigma quantiles)
-	return float(numpy.min(data)), float(numpy.percentile(data, 15.9)), float(numpy.percentile(data, 84.1)), float(numpy.max(data))
-
-def read_aggregated_data_by_job(route, query_time, duration, base_path, jobdirs = [""], rootdir = "aggregator", datatype = "median", age = 0., level = None):
-	max_time = []
-	for jobdir in jobdirs:
-		this_data = numpy.empty((0))
-		this_time = numpy.empty((0))
-		for n, partial_path in enumerate(get_partial_paths_to_aggregated_data(query_time, duration, level = level)):
-			try:
-				fname = "%s/%s/%s/%s/%s/%s.hdf5" % (base_path, rootdir, partial_path, jobdir, datatype, route)
-				f = h5py.File(fname, "r")
-				this_data = numpy.hstack((numpy.array(f["data"]), this_data))
-				this_time = numpy.hstack((numpy.array(f["time"]), this_time))
-				f.close()
-			except IOError:
-				pass
-			# refuse to look back more than 100 directories and stop once you have enough data
-			if n > 100 or (len(this_time) > 1 and float(this_time[-1]) - float(this_time[0]) > duration):
-				break
-		if query_time is not None:
-			ix = numpy.logical_and(this_time > (query_time-duration), this_time <= query_time)
-			this_time = this_time[ix]
-			this_data = this_data[ix]
-		if len(this_time) == 0: # possible there is no data, so register the query time minus the specified default age FIXME do something smarter
-			this_time = numpy.array([query_time - duration - age, query_time - age])
-			# FIXME allow a different default value
-			this_data = numpy.array([0,0])
-		ix = numpy.argsort(this_time)
-		yield jobdir, this_time[ix], this_data[ix]
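-
-# An example of what read_aggregated_data_by_job() actually opens: for a query
-# at GPS time 1187000000 the level-6 partial paths are "1/1/8/7/0/0",
-# "1/1/8/6/9/9", ... (10000 s steps), so for a hypothetical job 0001 it reads
-#
-#   <base_path>/aggregator/1/1/8/7/0/0/by_job/0001/max/latency_history.hdf5
-#
-# pulling the "time" and "data" datasets out of each file and stitching them
-# together until `duration` seconds of history have been collected.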
-
-#
-# latency, SNR, FAR, likelihood
-#
-
-
-def stats_by_node(routes, query_time, duration, base_path, jobdirs, thisdatatype):
-	data = {}
-	for ifo, thisroute in routes:
-		# get the max time and the stats on the data
-		data[ifo] = [(max(x[1]), x[0].split("/")[-1], stats_on_data(x[2])) for x in read_aggregated_data_by_job(thisroute, query_time, duration, base_path, jobdirs, datatype = thisdatatype)]
-
-	description = [("job", "string")]
-	for ifo, thisroute in routes:
-		label = ifo.rstrip("_") or "combined"
-		thisdesc = [
-			("%s %d" % (label, float(max(data[ifo])[0])), "number"),
-			("", "number"),
-			("", "number"),
-			("", "number")
-		]
-		description.extend(thisdesc)
-
-	data_table = gviz_api.DataTable(description)
-	thisdata = []
-	for tup in zip(*[data[ifo] for ifo, thisroute in routes]):
-		row = [tup[0][1]]
-		for t in tup:
-			row += list(t[2])
-		thisdata.append(row)
-	data_table.LoadData(thisdata)
-	print data_table.ToJSonResponse(order_by = "job", req_id = reqId)
-	sys.exit()
-
-
-def status_by_node(routes, query_time, duration, base_path, jobdirs, thisdatatype):
-	data = {}
-	description = [("job", "string")]
-	for (ifo, thisroute) in routes:
-		# get the latest time and value for each job
-		data[ifo] = [(float(x[1][-1]), x[0].split("/")[-1], float(x[2][-1])) for x in read_aggregated_data_by_job(thisroute, query_time, duration, base_path, jobdirs, datatype = thisdatatype)]
-		description.append(("%s %d" % (ifo.rstrip("_") or "combined", float(max(data[ifo])[0])), "number"))
-	data_table = gviz_api.DataTable(description)
-	thisdata = []
-	for tup in zip(*[data[k] for (k, _) in routes]):
-		row = [tup[0][1]]
-		for t in tup:
-			row += [t[2]]
-		thisdata.append(row)
-	data_table.LoadData(thisdata)
-	print data_table.ToJSonResponse(order_by = "job", req_id = reqId)
-	sys.exit()
-
-
-def node_is_all(route, query_time, duration, base_path, jobdirs, thisdatatype):
-	_, this_time, this_data = read_aggregated_data_by_job(route, query_time, duration, base_path, jobdirs = [""], datatype = thisdatatype).next()
-	out_data = [[float(t),float(d)] for t,d in zip(this_time, this_data)]
-	description = [("time", "number"), ("%d" % float(this_time[-1]), "number")]
-	data_table = gviz_api.DataTable(description)
-	data_table.LoadData(out_data)
-	print data_table.ToJSonResponse(order_by = "time", req_id = reqId)
-	sys.exit()
-
-def nodes_now(route, query_time, duration, base_path, jobdirs, thisdatatype):
-	_, this_time, this_data = read_aggregated_data_by_job(route, query_time, duration, base_path, jobdirs = [""], datatype = thisdatatype).next()
-	# NB: `label` is set at module scope by the dispatch loops below
-	description = [(label, "number")]
-	data_table = gviz_api.DataTable(description)
-	data_table.LoadData([[float(this_data[-1])]])
-	print data_table.ToJSonResponse(req_id = reqId)
-	sys.exit()
-
-
-def scatter_by_node(routes, query_time, duration, base_path, jobdirs, thisdatatype):
-	data = {}
-	description = [("time", "number")]
-	ifostrs = [x[0] for x in routes]
-	for (ifo, thisroute) in routes:
-		# get the max time and the stats on the data
-		data[ifo] = [(x[1], x[0].split("/")[-1], x[2], x[1][-1]) for x in read_aggregated_data_by_job(thisroute, query_time, duration, base_path, jobdirs, datatype = thisdatatype)]
-		description.append((ifo.rstrip("_") or "combined", "number"))
-
-	data_table = gviz_api.DataTable(description)
-	thisdata = []
-	#FIXME make tooltips bin number
-	tmp = []
-	for ifo in ifostrs:
-		tmp += [data[ifo][0][0], data[ifo][0][2]]
-	for tup in zip(*tmp):
-		row = [float(tup[0])]
-		for n in range(len(tup) / 2):
-			row += [float(tup[1+2*n])]
-		thisdata.append(row)
-	data_table.LoadData(thisdata)
-	print data_table.ToJSonResponse(order_by = "time", req_id = reqId)
-	sys.exit()
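-
-# The tables built above all follow the same gviz_api pattern: a description
-# of (label, type) column tuples plus one row per job. For "stats by node" a
-# hypothetical response table is
-#
-#   description = [("job", "string"), ("H1 1187000000", "number"), ("", "number"), ...]
-#   rows        = [["0001", 1.2, 3.4, 8.2, 11.0], ...]
-#
-# where each detector contributes a min / 15.9% / 84.1% / max column group.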
-
-for route, label, thisdatatype in (("latency_history", "Latency (s)", "max"), ("far_history", "FAR (Hz)", "min"), ("likelihood_history", "L", "max")):
-	if route in query:
-		if "stats by node" in query:
-			stats_by_node([("", route)], query_time, duration, base_path, jobdirs, thisdatatype)
-		if "status by node" in query:
-			status_by_node([("", route)], query_time, duration, base_path, jobdirs, thisdatatype)
-
-		if "where node is all" in query:
-			node_is_all(route, query_time, duration, base_path, jobdirs, thisdatatype)
-		if "now" in query:
-			nodes_now(route, query_time, duration, base_path, jobdirs, thisdatatype)
-
-#
-# SNR history is special
-#
-
-
-for route, label in (("snr_history", "SNR"),):
-	if route in query:
-		ifostrs = ["%s_" % ifo for ifo in ifos] + [""]
-		routes = [(ifo, "%s%s" % (ifo, route)) for ifo in ifostrs]
-		if "stats by node" in query:
-			stats_by_node(routes, query_time, duration, base_path, jobdirs, "max")
-
-		if "status by node" in query:
-			status_by_node(routes, query_time, duration, base_path, jobdirs, "max")
-
-		if "scatter by node" in query:
-			scatter_by_node(routes, query_time, duration, base_path, [""], "max")
-
-		if "where node is all" in query:
-			node_is_all(route, query_time, duration, base_path, [""], "max")
-
-		if "now" in query:
-			nodes_now(route, query_time, duration, base_path, [""], "max")
-
-
-#
-# horizon history and noise
-#
-
-for route, x, y, units in (("horizon_history", "time", "horizon", "(Mpc)"), ("noise", "time", "noise", "|n(t)|")):
-	if route in query:
-		latest = {}
-		if "now" in query:
-			for ifo, this_time, this_data in read_aggregated_data_by_job(route, query_time, duration, base_path, jobdirs = ifos, rootdir = "dq", datatype = "max"):
-				latest[ifo] = float(this_time[-1]), float(this_data[-1])
-			# convert horizon distance to range
-			out_data = [[float(latest[ifo][1]) / 2.25 for ifo in ifos]]
-			description = [("%s %s" % (ifo, units), "number") for ifo in ifos]
-		else:
-			out_data = []; IFOtime = {}; IFOdata = {}
-			# FIXME don't hardcode
-			for ifo, this_time, this_data in read_aggregated_data_by_job(route, query_time, duration, base_path, jobdirs = ifos, rootdir = "dq", datatype = "max"):
-				# convert horizon distance to range
-				IFOdata[ifo] = interp1d(this_time, this_data / 2.25, kind = "nearest", bounds_error = False, fill_value = 0.)
-				IFOtime[ifo] = this_time
-			for t in numpy.sort(numpy.hstack([IFOtime[i] for i in ifos])):
-				out_data.append([float(t)] + [float(IFOdata[ifo](t)) for ifo in ifos])
-			description = [(x, "number")]
-			description.extend([("%s %s @ %d" % (ifo, units, float(IFOtime[ifo][-1])), "number") for ifo in ifos])
-
-		data_table = gviz_api.DataTable(description)
-		data_table.LoadData(out_data)
-		print data_table.ToJSonResponse(order_by = x, req_id = reqId)
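-
-# (The division by 2.25 above converts horizon distance, the distance at which
-# an optimally located and oriented source would be seen, into the approximate
-# angle- and orientation-averaged "sensemon" range; the exact ratio is closer
-# to 2.26.)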
-
-
-#
-# uptime, dropped data and individual ifo SNR
-#
-
-for route, label in (("_state_vector_on_off_gap", "(s)"), ("_strain_add_drop", "(s)")):
-	if route in query:
-		out_data = []
-		if "status by node" in query:
-			jobs = []; this_time = {}; this_data = {}
-			for ifo in ifos:
-				for job, t, d in read_aggregated_data_by_job("%s%s" % (ifo, route), None, None, base_path, jobdirs = jobdirs, rootdir = "aggregator", datatype = ""):
-					this_time.setdefault(ifo, []).append(t[-1])
-					this_data.setdefault(ifo, []).append(d[-1])
-					jobs.append(job.split("/")[-1])
-			# FIXME the 16 comes from the 16 Hz sample rate
-			# used for state vectors in ALIGO. if that
-			# changes this needs to change too
-			tmp = [jobs] + [this_data[ifo] for ifo in ifos]
-			# HACK for virgo sample rate being different
-			if "V1" in ifos:
-				V1idx = ifos.index("V1")
-			else:
-				V1idx = -1
-			for tup in zip(*tmp):
-				row = [tup[0]]
-				for idx, t in enumerate(tup[1:]):
-					if idx == V1idx:
-						row += [t]
-					else:
-						row += [t / 16.]
-				out_data.append(row)
-			description = [("job", "string")]
-			for ifo in ifos:
-				tmp = "%s %d" % (ifo, float(max(this_time[ifo])))
-				description.append((tmp, "number"))
-			data_table = gviz_api.DataTable(description)
-			data_table.LoadData(out_data)
-			print data_table.ToJSonResponse(order_by = "job", req_id = reqId)
-			sys.exit()
-
-		if "now" in query:
-			minuptime = 10000000.
-			for ifo in ifos:
-				for job, t, d in read_aggregated_data_by_job("%s%s" % (ifo, route), None, None, base_path, jobdirs = jobdirs, rootdir = "aggregator", datatype = ""):
-					# FIXME the 16 comes from the 16 Hz sample rate
-					# used for state vectors in ALIGO. if that
-					# changes this needs to change too
-					# FIXME return hours for this query because google gauges don't support axis transformations
-					if ifo != "V1":
-						minuptime = min(minuptime, float(d[-1] / 16.) / 3600.)
-					else:
-						minuptime = min(minuptime, float(d[-1]) / 3600.)
-			out_data.append([minuptime])
-			description = [
-				("uptime (h)", "number")
-			]
-			data_table = gviz_api.DataTable(description)
-			data_table.LoadData(out_data)
-			print data_table.ToJSonResponse(req_id = reqId)
-			sys.exit()
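-
-# The quantities read above are in samples at 16 Hz (except V1, which is
-# already in seconds), so e.g. a value of 460800 samples works out to
-# 460800 / 16. = 28800 s, or 28800 / 3600. = 8 h of "on" time.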
-
-
-#
-# RAM history
-#
-
-for route, label in (("ram_history", "(GB)"),):
-	if route in query:
-		out_data = []
-		if "status by node" in query:
-			# Set the duration for the highest level
-			for job, this_time, this_data in read_aggregated_data_by_job(route, None, None, base_path, jobdirs = jobdirs, rootdir = "aggregator", datatype = ""):
-				if len(this_data) > 0:
-					out_data.append([job.split("/")[-1], float(this_data[-1])])
-				else:
-					# FIXME a silly way to handle an error, but 0 ram would be noticed
-					out_data.append([job.split("/")[-1], 0.])
-			description = [
-				("job", "string"),
-				("RAM %d" % int(this_time[-1]), "number"),
-			]
-			data_table = gviz_api.DataTable(description)
-			data_table.LoadData(out_data)
-			print data_table.ToJSonResponse(order_by = "job", req_id = reqId)
-			sys.exit()
-
-#
-# Time since last
-#
-
-if "time_since_last" in query:
-	if "status by node" in query:
-		nowtime = now()
-		out_data = []
-		for job, this_time, this_data in read_aggregated_data_by_job("ram_history", None, None, base_path, jobdirs = jobdirs, rootdir = "aggregator", datatype = "", age = 1e6):
-			if len(this_time) > 0:
-				out_data.append([job.split("/")[-1], nowtime - this_time[-1]])
-			else:
-				# FIXME, a silly way to encode an error, but 1e8 is the max time since last that we display
-				out_data.append([job.split("/")[-1], 1e8])
-		description = [
-			("job", "string"),
-			("Time since %d" % nowtime, "number"),
-		]
-		data_table = gviz_api.DataTable(description)
-		data_table.LoadData(out_data)
-		print data_table.ToJSonResponse(order_by = "job", req_id = reqId)
-		sys.exit()
-
-
-#
-# PSDs
-#
-
-freq = numpy.array([])
-datadict = dict((ifo, numpy.array([])) for ifo in ifos)
-out_data = []
-lowestdir = gps_by_level(query_time, 6)
-if "psd" in query:
-	for ifo in ifos:
-		if "now" in query:
-			fname = "%s/dq/%s/psd.hdf5" % (base_path, ifo)
-		else:
-			# PSD files are like: H1-PSD-1159632700-100.hdf5
-			fname = "%s/dq/%s/%s/%s-PSD-%d-100.hdf5" % (base_path, lowestdir, ifo, ifo, int(round(query_time, -2)))
-		try:
-			f = h5py.File(fname, "r")
-			datadict[ifo] = numpy.array(f["asd"])
-			freq = numpy.array(f["freq"])
-			psd_time = numpy.array(f["time"])[0]
-			f.close()
-		except IOError:
-			psd_time = 0
-	# FIXME don't hardcode
-	tmp = [freq]
-	tmp.extend([datadict[ifo] for ifo in ifos])
-	for tup in zip(*tmp):
-		row = [float(tup[0])]
-		for t in tup[1:]:
-			row += [float(t)]
-		out_data.append(row)
-
-	description = [("freq", "number")]
-	for ifo in ifos:
-		description.append(("%s ASD @ %d" % (ifo, psd_time), "number"))
-
-	data_table = gviz_api.DataTable(description)
-	data_table.LoadData(out_data)
-	print data_table.ToJSonResponse(req_id = reqId)
-
-
-if "time_since_trigger" in query:
-	route = "snr_history"
-	data = {}
-	nowtime = now()
-	ifostrs = ["%s_" % ifo for ifo in ifos] + [""]
-	for ifo in ifostrs:
-		thisroute = "%s%s" % (ifo, route)
-		# get the time of the most recent trigger for each job
-		data[ifo] = [(float(max(x[1])), x[0].split("/")[-1]) for x in read_aggregated_data_by_job(thisroute, query_time, duration, base_path, jobdirs, datatype = "max", age = 1e6)]
-	description = [("job", "string")]
-	for ifo in ifos:
-		description.append(("%s %d" % (ifo, float(max(data["%s_" % ifo])[0])), "number"))
-	description.append(("combined %d" % float(max(data[""])[0]), "number"))
-	data_table = gviz_api.DataTable(description)
-	this_data = []
-	tmp = [data[ifo] for ifo in ifostrs]
-	for tup in zip(*tmp):
-		row = [tup[0][1]]
-		for t in tup:
-			row += [nowtime - t[0]]
-		this_data.append(row)
-	data_table.LoadData(this_data)
-	print data_table.ToJSonResponse(order_by = "job", req_id = reqId)
-	sys.exit()
-
-if "nagios" in query:
-	nowtime = now()
-	out_data = []
-	for job, this_time, this_data in read_aggregated_data_by_job("ram_history", None, None, base_path, jobdirs = jobdirs, rootdir = "aggregator", datatype = "", age = 1e6):
-		if len(this_time) > 0:
-			out_data.append([job.split("/")[-1], nowtime - this_time[-1]])
-		else:
-			# FIXME, a silly way to encode an error, but 1e8 is the max time since last that we display
-			out_data.append([job.split("/")[-1], 1e8])
-
-	import json
-	delay = [x[1] for x in out_data]
-	# under some circumstances this can take up to 600 seconds... (though it
-	# really should be much faster)
-	status = [x for x in delay if x > 600.]
-	if len(status) > 0:
-		print json.dumps({"nagios_shib_scraper_ver": 0.1, "status_intervals": [{"num_status": 2, "txt_status": "%d jobs more than 10 mins behind" % len(status)}]}, sort_keys=True, indent=4, separators=(',', ': '))
-	else:
-		print >>sys.stdout, json.dumps({"nagios_shib_scraper_ver": 0.1, "status_intervals": [{"num_status": 0, "txt_status": "OK: Max delay: %f s" % max(delay)}]}, sort_keys=True, indent=4, separators=(',', ': '))
-
-	sys.exit()
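-
-# A successful nagios reply then looks like (values illustrative):
-#
-#   {
-#       "nagios_shib_scraper_ver": 0.1,
-#       "status_intervals": [
-#           {
-#               "num_status": 0,
-#               "txt_status": "OK: Max delay: 12.500000 s"
-#           }
-#       ]
-#   }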
-
-
-# FIXME, just a hardcoded POS
-# FIXME counts now, turn into VT
-bnsrootdir = "U_dVdzo1pz_bns_normal_low_spin_online_injections_O2A_v1_aggregator"
-nsbhrootdir = "U_dVdzo1pz_nsbh05_isotropic_online_injections_O2A_v1_aggregator"
-bbhrootdir = "U_dVdzo1pz_lnm_online_injections_O2A_v1_aggregator"
-
-if "vt" in query:
-
-	nowtime = now()
-	# FIXME check this
-	O2_start = 1164556817
-	duration = nowtime - O2_start
-
-	dvt = {}
-	dvt["bns"] = 0.240829 / (652333 + 11566878)
-	dvt["nsbh"] = 5.45325 / (597700 + 62915658)
-	dvt["bbh"] = 32.8574 / (652356 + 5281336)
-
-	datadict = {}
-	out_data = []
-	timevec = numpy.linspace(O2_start, nowtime, 1000)
-	for key, rootdir in (("bns", bnsrootdir), ("nsbh", nsbhrootdir), ("bbh", bbhrootdir)):
-		for job, this_time, this_data in read_aggregated_data_by_job("far_history", nowtime, duration, base_path, jobdirs = [""], rootdir = rootdir, datatype = "min", age = 1e6, level = 5):
-			count_data = numpy.ones(len(this_data))
-			count_data[this_data > 3.86e-7] = 0
-			count_interp = interp1d(this_time, numpy.cumsum(count_data), kind = "nearest", bounds_error = False, fill_value = 0.)
-			datadict[key] = numpy.log10(count_interp(timevec) * dvt[key])
-
-	# FIXME should use a time vector that makes sure to span all three
-	# sets, but this should be okay
-	for t, x, y, z in zip(timevec, datadict["bns"], datadict["nsbh"], datadict["bbh"]):
-		out_data.append([float(t), float(x), float(y), float(z)])
-	description = [
-		("time", "number"),
-		("BNS", "number"),
-		("NSBH", "number"),
-		("BBH", "number")
-	]
-
-	data_table = gviz_api.DataTable(description)
-	data_table.LoadData(out_data)
-	print data_table.ToJSonResponse(order_by = "time", req_id = reqId)
-
-# Injection FAR time series
-if "far_inj_history" in query:
-	nowtime = now()
-	times = []
-	data = []
-	for key, rootdir in (("bns", bnsrootdir), ("nsbh", nsbhrootdir), ("bbh", bbhrootdir)):
-		for job, this_time, this_data in read_aggregated_data_by_job("far_history", nowtime, duration, base_path, jobdirs = [""], rootdir = rootdir, datatype = "min", age = 1e6, level = 5):
-			times.extend(this_time)
-			data.extend(this_data)
-
-	out_data = [[float(t), float(d)] for t, d in zip(times, data)]
-	description = [("time", "number"), ("%d" % float(times[-1]), "number")]
-	data_table = gviz_api.DataTable(description)
-	data_table.LoadData(out_data)
-	print data_table.ToJSonResponse(order_by = "time", req_id = reqId)
-	sys.exit()
-- 
GitLab