Review and refactor external process/thread management in postcoh_finalsink
Summary
The code in postcoh_finalsink.py contains a lot of multi-threading and subprocess calls that are either unused (and should be deprecated) or poorly written (and should be refactored). This code should be completely re-evaluated and rewritten where required.
Solution
An ideal resolution to this issue would be to re-implement only the multi-threading/multi-processing we actually need, in a much more readable way, removing any functions or old scripts that are no longer maintained. It may even be the case that we should not be using threading.Thread (since Python threads are constrained by the GIL and are only really suited to I/O-bound work) nor subprocess.Popen (as communication latency on supercomputers may be slower than expected); however, both of these statements should be verified. Ultimately, refactoring this code with well-explained comments and docstrings should also help future developers (both students and staff) manage it, as multi-threading/multi-processing is often a difficult beast, especially for new developers.
Examples
Below are some example code snippets that I think should be re-implemented. They are taken from postcoh_finalsink.py with the changes made in !53 (merged), as of 11th January 2023.
threading.Thread Example
The following code could be improved in readability and structure. Calling del on our manually managed threads seems like an anti-pattern, and a cleaner approach should be used (e.g. context managers; the concurrent.futures package from the standard library might also be simpler to use).
```python
def snapshot_segment_file(self, t_snapshot_start, duration, verbose=False):
    filename = "%s/%s_SEGMENTS_%d_%d.xml.gz" % (self.path, self.ifos,
                                                t_snapshot_start, duration)
    logging.info("snapshotting %s" % filename)
    # make sure the last round of output dumping is finished
    if ((self.thread_snapshot_segment is not None)
            and (self.thread_snapshot_segment.isAlive())):
        self.thread_snapshot_segment.join()
    # free thread context
    del self.thread_snapshot_segment
    self.seg_document.filename = filename
    self.thread_snapshot_segment = threading.Thread(
        target=self.seg_document.write_output_file,
        args=(self.seg_document, ))
    self.thread_snapshot_segment.start()
    # NOTE: del may not be necessary, as we unlink after thread completion
    del self.seg_document
    self.seg_document = SegmentDocument(self.ifos)

def snapshot_output_file(self, filename, verbose=False):
    # make sure the last round of output dumping is finished
    logging.info("snapshotting %s" % filename)
    if self.thread_snapshot is not None and self.thread_snapshot.isAlive():
        self.thread_snapshot.join()
    self.postcoh_document.filename = filename
    # free thread context
    del self.thread_snapshot
    self.thread_snapshot = threading.Thread(
        target=self.postcoh_document.write_output_file,
        args=(self.postcoh_document, ))
    self.thread_snapshot.start()
    # NOTE: del may not be necessary, as we unlink after thread completion
    del self.postcoh_table
    del self.postcoh_document
    self.postcoh_document = PostcohDocument()
    self.postcoh_table = postcoh_table_def.PostcohInspiralTable.get_table(
        self.postcoh_document.xmldoc)

def __wait_internal_process_finish(self):
    if self.thread_snapshot is not None and self.thread_snapshot.isAlive():
        self.thread_snapshot.join()
    if ((self.thread_snapshot_segment is not None)
            and (self.thread_snapshot_segment.isAlive())):
        self.thread_snapshot_segment.join()
    if ((self.thread_upload_skymap is not None)
            and (self.thread_upload_skymap.isAlive())):
        self.thread_upload_skymap.join()
    self.fapupdater.wait_last_process_finish(
        self.fapupdater.procs_update_fap_stats)
    self.fapupdater.wait_last_process_finish(
        self.fapupdater.procs_combine_stats)

def write_output_file(self, filename=None, verbose=False, cleanup=False):
    self.__wait_internal_process_finish()
    self.__write_output_file(filename, verbose=verbose, cleanup=cleanup)
```
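As a rough illustration of the concurrent.futures suggestion, the "join the previous thread, del it, start a new one" pattern above could be replaced by a single-worker executor. This is only a sketch: SnapshotWriter and submit_write are hypothetical names, and the real documents have their own write_output_file signatures.

```python
from concurrent.futures import ThreadPoolExecutor


class SnapshotWriter:
    """Serialises document writes on one background thread.

    max_workers=1 preserves the original behaviour of at most one
    in-flight write per document type, with no manual join()/del.
    """

    def __init__(self):
        self._executor = ThreadPoolExecutor(max_workers=1)
        self._pending = None  # Future for the last submitted write

    def submit_write(self, document):
        # Wait for the previous write to finish, mirroring the
        # thread_snapshot.join() in the original code.
        if self._pending is not None:
            self._pending.result()
        # Matches the original target=document.write_output_file,
        # args=(document, ) call shape.
        self._pending = self._executor.submit(document.write_output_file,
                                              document)

    def shutdown(self):
        # Blocks until all queued writes complete.
        self._executor.shutdown(wait=True)
```

The executor owns the thread lifecycle, so there is nothing to del, and shutdown() gives the same "wait for everything" behaviour as __wait_internal_process_finish.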
subprocess.Popen
There are a number of functions making subprocess calls to do things such as plot skymap images (call_plot_fits_func and call_fits_skymap_func) or update false alarm rate estimations (calc_fap).
```python
def call_plot_fits_func(pngname,
                        fitsname,
                        labelname,
                        contour=None,
                        colormap="cylon"):
    cmd = []
    cmd += ["bayestar_plot_allsky_postcohspiir"]
    cmd += ["-o", pngname]
    cmd += ["--label", labelname]
    cmd += [fitsname]
    cmd += ["--colorbar"]
    cmd += ["--colormap", colormap]
    if contour:
        cmd += ["--contour", str(contour)]
    print cmd
    proc = subprocess.Popen(cmd,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    proc_out, proc_err = proc.communicate()
    return proc.returncode


def call_fits_skymap_func(out_cohsnr_fits,
                          out_prob_fits,
                          pipe_skymap_name,
                          event_id,
                          event_time,
                          cuda_postcoh_detrsp_fname,
                          verbose=False):
    input_fname = pipe_skymap_name
    cmd = []
    cmd += ["gstlal_postcoh_skymap2fits"]
    cmd += ["--output-cohsnr", out_cohsnr_fits]
    cmd += ["--output-prob", out_prob_fits]
    cmd += ["--cuda-postcoh-detrsp-fname", cuda_postcoh_detrsp_fname]
    cmd += ["--event-id", event_id]
    cmd += ["--event-time", str(event_time)]
    cmd += [input_fname]
    if verbose:
        print cmd
    proc = subprocess.Popen(cmd,
                            stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE)
    proc_out, proc_err = proc.communicate()
    if verbose:
        print >> sys.stderr, "skymap2fits return code", proc.returncode
    return proc.returncode
```
Note that proc_out, proc_err = proc.communicate() is a blocking call: we wait for the subprocess to start, run, and send its closing message before moving on (see any of the many Stack Overflow posts about this). This undoubtedly adds unnecessary latency to the pipeline and is the worst-case solution from a performance perspective.
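For comparison, if we did keep the external tools, the blocking communicate() could at least be moved off the main flow with asyncio's subprocess support. A minimal sketch (run_skymap_tool is a hypothetical helper; the argv list would be the same one call_plot_fits_func builds):

```python
import asyncio


async def run_skymap_tool(cmd):
    """Run an external command without blocking the event loop.

    `cmd` is an argv list such as the one built by call_plot_fits_func;
    while the child process runs, `await` yields control so other
    pipeline coroutines can make progress.
    """
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    # Suspends this coroutine until the child exits, but does not
    # block the event loop the way subprocess.Popen.communicate() does.
    out, err = await proc.communicate()
    return proc.returncode, out
```

This still pays the cost of spawning a process, so it only addresses the "main thread sits idle" problem, not the startup latency.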
The only reason for this code to exist would be if the pipeline itself were running in Python 2.7 while the skymap plotting required Python 3. As our current production pipeline is Python 2.7, that is a valid reason for this code to exist, but now that we are moving to a Python 3 pipeline, we no longer have to use these subprocess calls.
Yeah but what if I wanted it to be asynchronous?
If we would additionally like to make these calls asynchronous, then I would also not suggest starting an entire new subprocess, but rather using a multiprocessing executor (concurrent.futures.ProcessPoolExecutor) for parallelism, or the async/await syntax via asyncio from the standard library for concurrency. We could also pass the GraceDB id of the prior upload to this process so that it can potentially handle uploading, so the main pipeline process does not have to.
Even better, it might be smarter to have an "Event Uploader" process that handles all GraceDB uploads.
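To make the uploader idea concrete, here is a rough sketch of a dedicated upload process fed by a queue. Everything here is hypothetical (EventUploader, _uploader_loop, and the commented-out GraceDB call are illustrative names, not existing pipeline code), and the fork start method is assumed to be available since the pipeline targets Linux clusters:

```python
import multiprocessing as mp

# "fork" start method: available on the Linux clusters this pipeline
# targets, and avoids re-importing the main module in the child.
_ctx = mp.get_context("fork")


def _uploader_loop(queue, done):
    # Hypothetical worker: drains (graceid, filename) jobs until it
    # receives the None sentinel, then reports how many it handled.
    count = 0
    while True:
        job = queue.get()
        if job is None:
            break
        graceid, filename = job
        # gracedb_client.writeLog(graceid, filename=filename)  # real upload here
        count += 1
    done.put(count)


class EventUploader:
    """Owns a single background process for all GraceDB uploads."""

    def __init__(self):
        self._queue = _ctx.Queue()
        self._done = _ctx.Queue()
        self._proc = _ctx.Process(target=_uploader_loop,
                                  args=(self._queue, self._done))
        self._proc.start()

    def submit(self, graceid, filename):
        # Returns immediately; the main pipeline never waits on uploads.
        self._queue.put((graceid, filename))

    def close(self):
        # Send the sentinel, then wait for the worker to drain its queue.
        self._queue.put(None)
        handled = self._done.get()
        self._proc.join()
        return handled
```

The main process only ever does a queue put, so upload latency (network, GraceDB response times) is fully decoupled from the search itself.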