Online new extinction
Here is how the marginalize_likelihoods_online program worked before (a sketch follows the list):

1. For each registry, get the rankingstat from the inspiral job and run calc_rank_pdfs on it
2. Add the result to the variable `data`
3. Retry the failed bins
4. `data` now contains the newly-created noise PDFs from (almost) every bin
5. Get the marginalized zerolag from the trigger counter job and add it to `data`. This should be the zerolag from the start of the analysis till now
6. If a previously saved marg dist_stat_pdf exists, load it and add it to `data`
7. `data` now contains not just the newly-created noise PDFs, but also the noise PDFs from the start of the analysis till now
8. Save `data` to disk as the new version of the marg dist_stat_pdf file
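A minimal sketch of that flow, assuming hypothetical helpers `url_from_registry`, `get_marg_zerolag`, and `load_old_marg` for the registry, trigger-counter, and disk lookups (only `calc_rank_pdfs` and the `data` accumulator appear in the actual program):

```python
# Hedged sketch of the old flow, not the actual implementation.
# calc_rank_pdfs and `data` are real; the helper names are hypothetical.
def old_marginalize_once(registries, samples, num_cores):
    data, failed = None, []
    for reg in registries:                                    # steps 1-2
        status, pdf = calc_rank_pdfs(url_from_registry(reg), samples, num_cores)
        if status:
            data = data + pdf if data else pdf
        else:
            failed.append(reg)
    for reg in list(failed):                                  # step 3: retry failed bins
        status, pdf = calc_rank_pdfs(url_from_registry(reg), samples, num_cores)
        if status:
            data = data + pdf if data else pdf
            failed.remove(reg)
    # step 4: data now holds newly-created noise PDFs from (almost) every bin
    data += get_marg_zerolag()                                # step 5: zerolag since analysis start
    old = load_old_marg()                                     # step 6: previous marg dist_stat_pdf
    if old is not None:
        data += old                                          # step 7
    return data                                              # step 8: caller writes this to disk
```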
With these changes, marginalize_likelihoods_online implements the new extinction model. This involves first-round extinction of the bin-specific PDFs with the bin-specific zerolag, adding these singly-extincted PDFs together, and adding the clustered zerolag so that the result is ready for second-round extinction. The job now works as follows (a sketch follows the list):

0. The marg job now also saves the bin-specific dist_stat_pdfs to disk, since they're required for first-round extinction. Their paths are determined by `DataCache.generate`, not decided by the user. The filename of the marg dist_stat_pdf is still provided by the user
1. For each registry, load the old dist_stat_pdf file from disk if it exists
2. Get the rankingstat from the inspiral job, run calc_rank_pdfs on it, and add the result to the old dist_stat_pdf
3. calc_rank_pdfs is retried 3 times in my version, because I found it difficult to coordinate the retries with both steps 1 and 2. I think it's still possible to do the retries at the end like before, so I'll make that change soon
4. Save the new + old dist_stat_pdf to disk
5. Get the bin-specific zerolag from the corresponding inspiral job and add it to the new + old dist_stat_pdf (needed for first-round extinction)
6. Perform first-round extinction on this PDF, and add it to the `data` variable
7. Since each iteration of the loop adds the old and new dist_stat_pdfs to `data`, it already contains the noise PDF from the start of the analysis to now; step 6 of the previous version is no longer needed
8. Remove the bin-specific zerolags and add the marginalized zerolag from the trigger counter job. This should be the zerolag from the start of the analysis till now
9. Save `data` to disk as the new version of the marg dist_stat_pdf file
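A per-bin sketch of the new flow, under the same caveats: `new_with_extinction` is the first-round-extinction call that appears in the diffs below, while `load_if_exists`, `save_bin_pdf`, `url_from_registry`, and `get_bin_zerolag` are hypothetical stand-ins.

```python
# Hedged sketch of one iteration of the new loop; helper names are hypothetical.
def process_bin(reg, samples, num_cores):
    old = load_if_exists(reg)                             # step 1: old bin PDF, if on disk
    status, new = calc_rank_pdfs(url_from_registry(reg), samples, num_cores)  # step 2, retried 3x
    pdf = new + old if (status and old) else (new or old)
    save_bin_pdf(reg, pdf)                                # step 4: path from DataCache.generate
    pdf += get_bin_zerolag(reg)                           # step 5: bin-specific zerolag
    return pdf.new_with_extinction()                      # step 6: first-round extinction

# The main loop sums the singly-extincted per-bin PDFs into `data`, then
# swaps the bin-specific zerolags for the marginalized zerolag from the
# trigger counter (step 8) and writes `data` to disk (step 9).
```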
From the calc_rank_pdfs diff. Before:

```python
logging.warning(f'Caught error while running calc rank pdfs: {e}.')
return 0, None

lr_rankingstat = rankingstat.copy()
lr_rankingstat.finish()
rankingstatpdf = far.RankingStatPDF(lr_rankingstat, signal_noise_pdfs = None, nsamples = samples, nthreads = num_cores, verbose = verbose)
```

After (new lines 100-109; the comment below is on lines +102 to +109):

```python
tries = 0
failed = 1
while tries < 3:
    try:
        rankingstat = far.marginalize_pdf_urls([ url ], "RankingStat", verbose = verbose)
        failed = 0
        break
    except (URLError, HTTPError) as e:
        logging.warning(f'Caught error while running calc rank pdfs: {e}.')
        tries += 1
```
Maybe @rebecca.ewing can clarify, but I believe the retry is implemented this way to go through the sane bins as quickly as possible and come back to the bad bins at the end, instead of potentially being stuck on some bad bins at the beginning (e.g. the first 100 bins might each take 3 retries). This can save time, since leaving the bad bins until the end gives them some time to recover their connection.
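For illustration, the pattern being described (one quick pass, then looping only over the stragglers) looks roughly like this; the loop bound and helper wiring are assumptions:

```python
# Sketch of the "defer bad bins" strategy: a bad bin can delay the end of
# the run, but never blocks the healthy bins from being processed first.
def marginalize_with_deferred_retries(urls, samples, num_cores, max_retries = 3):
    data, failed = None, list(urls)
    retry = 0
    while failed and retry < max_retries:
        for url in list(failed):
            status, pdf = calc_rank_pdfs(url, samples, num_cores)
            if status:
                data = data + pdf if data else pdf
                failed.remove(url)
        retry += 1
    return data, failed        # anything left in `failed` never recovered
```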
From the extinction-model diff: the guard that forbids re-extincting or serializing an extincted RankingStatPDF, and the window used for the extinction fit.

```python
# never allow PDFs that have had the extinction model
# applied to be written to disk: on-disk files must only
# ever provide the original data. forbid PDFs that have
# been extincted from being re-extincted.
#
def new_with_extinction(*args, **kwargs):
    raise NotImplementedError("re-extincting an extincted RankingStatPDF object is forbidden")
self.new_with_extinction = new_with_extinction
def to_xml(*args, **kwargs):
    raise NotImplementedError("writing extincted RankingStatPDF object to disk is forbidden")
self.to_xml = to_xml
```

```python
# fitting is done between ix_min and ix_max
fg_ccdf = numpy.cumsum(fg[::-1])[::-1]
ix_min = (fg_ccdf < fg_ccdf[0] / 2.).argmax()
ix_max = (fg_ccdf < fg_ccdf[0] / 100.).argmax()
```
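As an aside, the guard relies on instance attributes shadowing class methods; a self-contained toy version of the same pattern (not the gstlal class):

```python
# Toy illustration of the per-instance guard: once extincted, the object's
# own attributes shadow the class methods, so re-extinction and
# serialization raise instead of silently writing extincted data to disk.
class ToyPDF:
    def new_with_extinction(self):
        extincted = ToyPDF()
        def forbid(*args, **kwargs):
            raise NotImplementedError("forbidden on an extincted ToyPDF")
        extincted.new_with_extinction = forbid   # shadows the class method
        extincted.to_xml = forbid
        return extincted

    def to_xml(self):
        return "<toypdf/>"

pdf = ToyPDF().new_with_extinction()
try:
    pdf.to_xml()
except NotImplementedError as e:
    print(e)   # forbidden on an extincted ToyPDF
```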
From the parse_command_line diff, which adds the --output-path and --ifo options:

```diff
 def parse_command_line():
     parser = OptionParser()

     parser.add_option("--output", metavar = "filename", help = "")
+    parser.add_option("--output-path", metavar = "path", help = "Set the path where the output PDFs are stored. Optional")
     parser.add_option("--registry", metavar = "filename", action = "append", help = "")
     parser.add_option("-j", "--num-cores", metavar = "cores", default = 4, type = "int", help = "Number of cores to use when constructing ranking statistic histograms (default = 4 cores).")
     parser.add_option("--output-kafka-server", metavar = "addr", help = "Set the server address and port number for output data. Optional, e.g., 10.14.0.112:9092")
     parser.add_option("--tag", metavar = "string", default = "test", help = "Sets the name of the tag used. Default = 'test'")
+    parser.add_option("--ifo", metavar = "ifo", action = "append", help = "ifos with which to create output filenames if they don't already exist")
     parser.add_option("--verbose", action = "store_true", help = "Be verbose.")
     options, filenames = parser.parse_args()

     if options.output is None:
         raise ValueError("must set --output.")
```

In that case (when --output-path is not given), `options.output_path` will be `None`, so lines like this will evaluate to `root = None`, which means the PDFs will get saved as `<analysis_dir>/dist_stat_pdfs/H1L1V1-0000_GSTLAL_DIST_STAT_PDFS-0-0.xml.gz`.
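A rough illustration of that fallback; the `root` handling and path layout here are inferred from this comment, not from the actual DataCache implementation:

```python
import os

# Inferred sketch: with --output-path unset, root is None and the PDFs
# land under the default dist_stat_pdfs/ tree inside the analysis directory.
def pdf_path(filename, root = None):
    base = root if root is not None else "."
    return os.path.join(base, "dist_stat_pdfs", filename)

print(pdf_path("H1L1V1-0000_GSTLAL_DIST_STAT_PDFS-0-0.xml.gz"))
# ./dist_stat_pdfs/H1L1V1-0000_GSTLAL_DIST_STAT_PDFS-0-0.xml.gz,
# i.e. relative to <analysis_dir>
```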
The retry version of calc_rank_pdfs in full:

```python
failed = 1
while tries < 3:
    try:
        rankingstat = far.marginalize_pdf_urls([ url ], "RankingStat", verbose = verbose)
        failed = 0
        break
    except (URLError, HTTPError) as e:
        logging.warning(f'Caught error while running calc rank pdfs: {e}.')
        tries += 1

if not failed:
    lr_rankingstat = rankingstat.copy()
    lr_rankingstat.finish()
    rankingstatpdf = far.RankingStatPDF(lr_rankingstat, signal_noise_pdfs = None, nsamples = samples, nthreads = num_cores, verbose = verbose)

return 1, rankingstatpdf
```
The file-existence checks as they stood at this point in the review (see the next comment for why they were removed):

```python
# get 10 million samples
ranking_stat_samples = int(10000000 / len(registries))

#
# set up the output paths
#

marg_pdf_exists = os.path.isfile(options.output)
pdfs = DataCache.find(DataType.DIST_STAT_PDFS, svd_bins = "*")
if marg_pdf_exists and len(pdfs) == len(registries):
    files_exist = True
elif not marg_pdf_exists and len(pdfs) == 0:
    files_exist = False
elif marg_pdf_exists and len(pdfs) != len(registries):
```
But anyway, I had to remove all these checks, because I realized that with the online jobs possibly failing or being unresponsive, or the online analysis being taken down before the first iteration over the registries is complete, it's pretty much impossible to say with certainty which files should exist. There are always cases, like when a job is unresponsive, where the PDF for that bin might not exist.
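So instead of asserting a fixed set of files up front, the safe approach is to load whatever is actually on disk; a minimal sketch, with `path_for_bin` and `load_pdf` as hypothetical helpers:

```python
import os

# Hedged sketch: any bin's PDF may legitimately be missing (failed or
# unresponsive job, analysis shut down mid-iteration), so treat every
# per-bin file as optional rather than checking counts beforehand.
def load_existing_pdfs(svd_bins, path_for_bin, load_pdf):
    pdfs = {}
    for svd_bin in svd_bins:
        path = path_for_bin(svd_bin)
        if os.path.isfile(path):
            pdfs[svd_bin] = load_pdf(path)
    return pdfs    # may have fewer entries than bins; callers must tolerate gaps
```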
The retry pass at the end of the main loop:

```python
# load ranking stat pdf and marginalize as we go
status, pdf = calc_rank_pdfs(url, ranking_stat_samples, options.num_cores, verbose = options.verbose)
if status:
    logging.info(f"completed {reg} on retry: {retry}")
    failed.remove(reg)
    if data:
        data += pdf
    else:
        data = pdf
else:
    logging.info(f"failed to complete {reg} on retry: {retry}")

if kafka_processor:
    kafka_processor.heartbeat()

retry += 1
```

And the docstring of calc_rank_pdfs was updated to match its new behavior (the rest of this hunk repeats the retry loop shown earlier):

```diff
 """
-load Ranking Stat PDF from a url
+create a Ranking Stat PDF from a url
 """
```