Skip to content
Snippets Groups Projects

Process injections in online analysis

Merged Wanting Niu requested to merge test_sim_row_kafka_2 into master
Compare and
2 files
+ 33
14
Compare changes
  • Side-by-side
  • Inline
Files
2
@@ -358,7 +358,7 @@ def chisq_distribution(df, non_centralities, size):
class CoincsDocument(object):
sngl_inspiral_columns = ("process:process_id", "ifo", "end_time", "end_time_ns", "eff_distance", "coa_phase", "mass1", "mass2", "snr", "chisq", "chisq_dof", "bank_chisq", "bank_chisq_dof", "sigmasq", "spin1x", "spin1y", "spin1z", "spin2x", "spin2y", "spin2z", "template_duration", "event_id", "Gamma0", "Gamma1")
def __init__(self, url, process_params, process_start_time, comment, instruments, seg, offsetvectors, injection_filename = None, tmp_path = None, replace_file = None, verbose = False):
+3
def __init__(self, url, process_params, process_start_time, comment, instruments, seg, offsetvectors, injection_filename = None, tmp_path = None, replace_file = None, sim_inspiral_table = None, verbose = False):
#
# how to make another like us
#
@@ -374,7 +374,7 @@ class CoincsDocument(object):
#
# build the XML document
#
self.xmldoc = ligolw.Document()
self.xmldoc.appendChild(ligolw.LIGO_LW())
# NOTE FIXME override the process start time. Since gstlal
@@ -403,7 +403,13 @@ class CoincsDocument(object):
#
if injection_filename is not None:
ligolw_add.ligolw_add(self.xmldoc, [injection_filename], contenthandler = LIGOLWContentHandler, verbose = verbose)
# sort and add the sim_insprial
xmldoc_sim_inspiral = ligolw_untils.load_filname(injection_filename, verbose = verbose, contenthandler = LIGOWContentHandler)
self.sim_inspiral_table = lsctables.SimInspiralTable.get_table(xmldoc_sim_inspiral)
self.sim_inspiral_table.sort(key = lambda row:row.end)
self.xmldoc.childNodes[-1].appendChild(self.sim_inspiral_table)
else:
self.sim_inspiral_table = None
#
# insert time slide offset vectors. remove duplicate
@@ -652,7 +658,17 @@ class GracedBWrapper(object):
self.__upload_aux_data(message, filename, tag, fobj.getvalue(), gracedb_ids)
del fobj
def do_alerts(self, last_coincs, psddict, rankingstat_xmldoc_func, seglistdicts, get_p_astro_func):
def nearest_sim_table(self, gps_time):
# search through the sim inspiral table
# find the row with geo-centric ends time nearest to the specific gps time (using by bisect)
# return the new sim inspiral table with a single row corresponding to the nearest time
sim_inspiral_table = lsctables.New(lsctables.SimInspiralTable)
idx = bisect.bisect(self.sim_inspiral, gps_time, lo=0, hi=len(self.sim_inspiral))
row = self.sim_inspiral[idx][1]
sim_inspiral_table.append(row)
return sim_inspiral_table
def do_alerts(self, last_coincs, psddict, rankingstat_xmldoc_func, seglistdicts, get_p_astro_func, sim_inspiral_table = None):
gracedb_ids = []
# no-op short circuit
@@ -698,9 +714,6 @@ class GracedBWrapper(object):
end_time = int(coinc_inspiral_index[coinc_event.coinc_event_id].end)
filename = "%s-%s-%d-%d.xml" % (instruments, description, end_time, 1)
#
# make sure the directory where we will write the files to disk exists
#
gracedb_uploads_gps_dir = os.path.join("gracedb_uploads", str(end_time)[:5])
if not os.path.exists(gracedb_uploads_gps_dir):
@@ -724,6 +737,9 @@ class GracedBWrapper(object):
# give the alert all the standard inspiral
# columns (attributes should all be
# populated). FIXME: ugly.
if self.sim_inspiral_table is not None:
xmldoc.childNodes[-1].appendChild(self.nearest_sim_table(end_time))
sngl_inspiral_table = lsctables.SnglInspiralTable.get_table(xmldoc)
process_params_table = lsctables.ProcessParamsTable.get_table(xmldoc)
for standard_column in ("process:process_id", "ifo", "search", "channel", "end_time", "end_time_ns", "end_time_gmst", "impulse_time", "impulse_time_ns", "template_duration", "event_duration", "amplitude", "eff_distance", "coa_phase", "mass1", "mass2", "mchirp", "mtotal", "eta", "kappa", "chi", "tau0", "tau2", "tau3", "tau4", "tau5", "ttotal", "psi0", "psi3", "alpha", "alpha1", "alpha2", "alpha3", "alpha4", "alpha5", "alpha6", "beta", "f_final", "snr", "chisq", "chisq_dof", "bank_chisq", "bank_chisq_dof", "cont_chisq", "cont_chisq_dof", "sigmasq", "rsqveto_duration", "Gamma0", "Gamma1", "Gamma2", "Gamma3", "Gamma4", "Gamma5", "Gamma6", "Gamma7", "Gamma8", "Gamma9", "spin1x", "spin1y", "spin1z", "spin2x", "spin2y", "spin2z", "event_id"):
@@ -980,12 +996,15 @@ class GracedBWrapper(object):
if self.verbose:
print("event assigned grace ID %s" % resp_json["graceid"], file=sys.stderr)
gracedb_ids.append(resp_json["graceid"])
self.__upload_aux_data("GstLAL internally computed p-astro", "p_astro.json", "p_astro", p_astro, [gracedb_ids[-1]])
try:
resp = self.gracedb_client.writeLabel(gracedb_ids[-1], 'PASTRO_READY')
except gracedb.rest.HTTPError as resp:
print(resp, file=sys.stderr)
break
print("gracedb upload of %s failed on attempt %d/%d" % (filename, attempt, self.retries), file=sys.stderr)
print(resp_json, file=sys.stderr)
time.sleep(random.lognormal(math.log(self.retry_delay), .5))
else:
print("gracedb upload of %s failed" % filename, file=sys.stderr)
# save event to disk
message.close()
with open(os.path.join(gracedb_uploads_gps_dir, filename), "w") as fileobj:
print("gracedb upload of %s failed on attempt %d/%d" % (filename, attempt, self.retries), file=sys.stderr)
print(resp_json, file=sys.stderr)
time.sleep(random.lognormal(math.log(self.retry_delay), .5))
Loading