diff --git a/gstlal-inspiral/bin/gstlal_inspiral b/gstlal-inspiral/bin/gstlal_inspiral
index f9f91bf8bead3e4bb425d53542f5384e6711c1f2..ec19fa83eb1efdc7ed3c177d22f7d8ae986b852b 100755
--- a/gstlal-inspiral/bin/gstlal_inspiral
+++ b/gstlal-inspiral/bin/gstlal_inspiral
@@ -292,6 +292,7 @@ def parse_command_line():
 	group.add_option("--check-time-stamps", action = "store_true", help = "Turn on time stamp checking")
 	group.add_option("--comment", metavar = "message", help = "Set the string to be recorded in comment and tag columns in various places in the output file (optional).")
 	group.add_option("--fir-stride", metavar = "seconds", type = "float", default = 8, help = "Set the length of the fir filter stride in seconds. default = 8")
+	group.add_option("--analysis-tag", metavar = "tag", default = "test", help = "Set the string to identify the analysis that this job is part of. Used when --output-kafka-server is set. May not contain \".\" nor \"-\". Default is test.")
 	group.add_option("--job-tag", metavar = "tag", help = "Set the string to identify this job and register the resources it provides on a node.  Should be 4 digits of the form 0001, 0002, etc.;  may not contain \".\" nor \"-\".")
 	group.add_option("--local-frame-caching", action = "store_true", help = "Pre-reads frame data, performs downsampling, and stores to local filespace. ")
 	group.add_option("--nxydump-segment", metavar = "start:stop", default = ":", help = "Set the time interval to dump from nxydump elments (optional).  The default is \":\", i.e. dump all time.")
@@ -905,6 +906,7 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
 			pipeline = options.gracedb_pipeline,
 			service_url = options.gracedb_service_url,
 			delay_uploads = options.delay_uploads,
+			analysis_tag = options.analysis_tag,
 			kafka_server = options.output_kafka_server,
 			upload_auxiliary_data = True,
 			delta_t = options.coincidence_threshold,
@@ -916,7 +918,8 @@ for output_file_number, (svd_bank_url_dict, output_url, ranking_stat_output_url,
 		zerolag_rankingstatpdf_url = zerolag_rankingstat_pdf,
 		likelihood_snapshot_interval = options.likelihood_snapshot_interval,
 		sngls_snr_threshold = options.singles_threshold,
-		tag = options.job_tag,
+		analysis_tag = options.analysis_tag,
+		job_tag = options.job_tag,
 		kafka_server = options.output_kafka_server,
 		cluster = True,#options.data_source in ("lvshm", "framexmit"),# If uncommented, we only cluster when running online
 		cap_singles = options.cap_singles,
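
Note: throughout this patch, Kafka topics gain the prefix "gstlal.<analysis-tag>.", so that several analyses can share a single broker without their jobs consuming each other's messages. A minimal sketch of the convention, with a hypothetical helper name (the patch itself inlines the f-strings):

    # Hypothetical helper illustrating the topic-naming convention this patch applies.
    def namespaced_topic(analysis_tag: str, topic: str) -> str:
        # --analysis-tag may not contain "." or "-", so the prefix parses unambiguously.
        assert "." not in analysis_tag and "-" not in analysis_tag
        return f"gstlal.{analysis_tag}.{topic}"

    namespaced_topic("test", "events")  # -> "gstlal.test.events"
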
diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_event_plotter b/gstlal-inspiral/bin/gstlal_ll_inspiral_event_plotter
index 9fbbe2cfc6735ec75e4918f89c493e9e487b06cf..f2d743001303ef7cd409b1016a63902d39f6d969 100755
--- a/gstlal-inspiral/bin/gstlal_ll_inspiral_event_plotter
+++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_event_plotter
@@ -124,8 +124,8 @@ class EventPlotter(events.EventProcessor):
 	def __init__(self, options):
 		logging.info('setting up event plotter...')
 
-		self.upload_topic = 'uploads'
-		self.ranking_stat_topic = 'ranking_stat'
+		self.upload_topic = f'gstlal.{options.tag}.uploads'
+		self.ranking_stat_topic = f'gstlal.{options.tag}.ranking_stat'
 		events.EventProcessor.__init__(
 			self,
 			process_cadence=options.processing_cadence,
diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_event_uploader b/gstlal-inspiral/bin/gstlal_ll_inspiral_event_uploader
index 9c295f4be130b4cdd1374f50427e65832c6b592c..c62597579baf501573a8425d4a2b0cd5ffe30da5 100755
--- a/gstlal-inspiral/bin/gstlal_ll_inspiral_event_uploader
+++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_event_uploader
@@ -97,7 +97,7 @@ class EventUploader(events.EventProcessor):
 			request_timeout=options.request_timeout,
 			num_messages=options.num_jobs,
 			kafka_server=options.kafka_server,
-			input_topic=options.input_topic,
+			input_topic=f"gstlal.{options.tag}.{options.input_topic}",
 			tag=options.tag
 		)
 
@@ -126,9 +126,9 @@ class EventUploader(events.EventProcessor):
 
 		### favored event settings
 		self.public_far_threshold = options.far_threshold / options.far_trials_factor
-		self.favored_event_topic = 'favored_events'
-		self.p_astro_topic = 'p_astro'
-		self.upload_topic = 'uploads'
+		self.favored_event_topic = f'gstlal.{self.tag}.favored_events'
+		self.p_astro_topic = f'gstlal.{self.tag}.p_astro'
+		self.upload_topic = f'gstlal.{self.tag}.uploads'
 
 
 	def ingest(self, message):
diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_workflow b/gstlal-inspiral/bin/gstlal_ll_inspiral_workflow
index 7af76544791d1bc96aa8d1c70cdaad907e01073a..539bb31bff71074f9142db8a2495f3df2134b8b8 100755
--- a/gstlal-inspiral/bin/gstlal_ll_inspiral_workflow
+++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_workflow
@@ -39,7 +39,7 @@ dag.create_log_dir()
 if args.workflow == "setup":
 	# input data products
 	ref_psd = DataCache.from_files(DataType.REFERENCE_PSD, config.data.reference_psd)
-	split_bank = DataCache.find(DataType.SPLIT_BANK, root=config.data.rootdir)
+	split_bank = DataCache.find(DataType.SPLIT_BANK, svd_bins="*", subtype="*")
 
 	# generate dag layers
 	svd_bank = dag.svd_bank(ref_psd, split_bank)
@@ -49,20 +49,15 @@ if args.workflow == "setup":
 
 else:
 	# input data products
-	svd_banks = DataCache.find(DataType.SVD_BANK, root=config.data.rootdir)
-	dist_stats = DataCache.find(DataType.DIST_STATS, root=config.data.rootdir)
+	svd_banks = DataCache.find(DataType.SVD_BANK, root="filter", svd_bins="*")
+	dist_stats = DataCache.find(DataType.DIST_STATS, svd_bins="*")
 
 	zerolag_pdfs = DataCache.generate(
 		DataType.ZEROLAG_DIST_STAT_PDFS,
 		config.all_ifos,
 		svd_bins=config.svd.bins,
-		root=config.data.rootdir
-	)
-	marg_pdf = DataCache.generate(
-		DataType.DIST_STAT_PDFS,
-		config.all_ifos,
-		root=config.data.rootdir
 	)
+	marg_pdf = DataCache.generate(DataType.DIST_STAT_PDFS, config.all_ifos)
 
 	# generate dag layers
 	dag.filter_online(svd_banks, dist_stats, zerolag_pdfs, marg_pdf)
diff --git a/gstlal-inspiral/python/dags/layers/inspiral.py b/gstlal-inspiral/python/dags/layers/inspiral.py
index a323b663bb9a4f9bd1d95ad37aa002b9aeee29d2..f00db7a6464b3fb27efdf53fd57655f6dcc048ff 100644
--- a/gstlal-inspiral/python/dags/layers/inspiral.py
+++ b/gstlal-inspiral/python/dags/layers/inspiral.py
@@ -1175,6 +1175,7 @@ def filter_online_layer(config, dag, svd_bank_cache, dist_stat_cache, zerolag_pd
 		Option("coincidence-threshold", config.filter.coincidence_threshold),
 		Option("fir-stride", config.filter.fir_stride),
 		Option("min-instruments", config.filter.min_instruments),
+		Option("analysis-tag", config.tag),
 		Option("gracedb-far-threshold", config.upload.gracedb_far_threshold),
 		Option("gracedb-group", config.upload.gracedb_group),
 		Option("gracedb-pipeline", config.upload.gracedb_pipeline),
@@ -1276,6 +1277,7 @@ def upload_events_layer(config, dag):
 			Option("upload-cadence-type", config.upload.aggregator_cadence_type),
 			Option("upload-cadence-factor", config.upload.aggregator_cadence_factor),
 			Option("num-jobs", len(config.svd.bins)),
+			Option("tag", config.tag),
 			Option("input-topic", "events"),
 			Option("rootdir", "event_uploader"),
 			Option("verbose"),
@@ -1300,6 +1302,7 @@ def plot_events_layer(config, dag):
 			Option("gracedb-pipeline", config.upload.gracedb_pipeline),
 			Option("gracedb-search", config.upload.gracedb_search),
 			Option("gracedb-service-url", config.upload.gracedb_service_url),
+			Option("tag", config.tag),
 			Option("verbose"),
 		],
 	)
@@ -1323,6 +1326,7 @@ def count_events_layer(config, dag):
 			Option("gracedb-pipeline", config.upload.gracedb_pipeline),
 			Option("gracedb-search", config.upload.gracedb_search),
 			Option("output-period", 300),
+			Option("topic", f"gstlal.{config.tag}.coinc"),
 		],
 		outputs = Option("output", zerolag_pdf.files),
 	)
@@ -1388,7 +1392,8 @@ def collect_metrics_layer(config, dag):
 			arguments = list(common_opts)
 			arguments.extend([
 				Option("data-type", "timeseries"),
-				Option("topic", metrics)
+				Option("topic", [f"gstlal.{config.tag}.{metric}" for metric in metrics]),
+				Option("schema", metrics),
 			])
 
 			# elect first metric collector as leader
@@ -1408,7 +1413,8 @@ def collect_metrics_layer(config, dag):
 	event_arguments = list(common_opts)
 	event_arguments.extend([
 		Option("data-type", "triggers"),
-		Option("topic", "coinc")
+		Option("topic", f"gstlal.{config.tag}.coinc"),
+		Option("schema", "coinc"),
 	])
 	event_layer += Node(arguments=event_arguments)
 
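
Note: in collect_metrics_layer the topic and schema become separate options: the topic carries the analysis prefix, while the schema keeps the bare metric name so the consumer can still resolve its schema. Roughly, with hypothetical metric names:

    # Placeholder values showing the topic/schema pairing produced above.
    tag = "test"
    metrics = ["H1_snr_history", "L1_snr_history"]  # hypothetical metric names
    topics = [f"gstlal.{tag}.{metric}" for metric in metrics]
    schemas = metrics  # schemas stay un-namespaced
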
diff --git a/gstlal-inspiral/python/inspiral.py b/gstlal-inspiral/python/inspiral.py
index 38562301936361d7de1a5e3272c315378a49f895..168569adeded9c6df79e467d8bbf11b1f7fd198d 100644
--- a/gstlal-inspiral/python/inspiral.py
+++ b/gstlal-inspiral/python/inspiral.py
@@ -535,7 +535,7 @@ class GracedBWrapper(object):
 
 	DEFAULT_SERVICE_URL = gracedb.rest.DEFAULT_SERVICE_URL
 
-	def __init__(self, instruments, far_threshold = None, min_instruments = None, group = "Test", search = "LowMass", label = None, pipeline = "gstlal", service_url = None, kafka_server = None, delay_uploads = False, upload_auxiliary_data = True, delta_t = 0.005, verbose = False):
+	def __init__(self, instruments, far_threshold = None, min_instruments = None, group = "Test", search = "LowMass", label = None, pipeline = "gstlal", service_url = None, kafka_server = None, analysis_tag = "test", delay_uploads = False, upload_auxiliary_data = True, delta_t = 0.005, verbose = False):
 		self.instruments = frozenset(instruments)
 		self.min_instruments = min_instruments
 		self.group = group
@@ -543,6 +543,7 @@ class GracedBWrapper(object):
 		self.label = label
 		self.pipeline = pipeline
 		self.service_url = service_url if service_url is not None else self.DEFAULT_SERVICE_URL
+		self.analysis_tag = analysis_tag
 		self.upload_auxiliary_data = upload_auxiliary_data
 		self.verbose = verbose
 		# must initialize after .service_url because this might
@@ -565,7 +566,7 @@ class GracedBWrapper(object):
 		if kafka_server is not None:
 			from ligo.scald.io import kafka
 			self.client = kafka.Client("kafka://{}".format(kafka_server))
-			self.client.subscribe(["events", "ranking_stat"])
+			self.client.subscribe([f"gstlal.{self.analysis_tag}.{topic}" for topic in ["events", "ranking_stat"]])
 		else:
 			self.client = None
 
@@ -939,7 +940,7 @@ class GracedBWrapper(object):
 				psd_fobj = io.StringIO()
 				ligolw_utils.write_fileobj(lalseries.make_psd_xmldoc(psddict), psd_fobj)
 				self.client.write(
-					"events",
+					f"gstlal.{self.analysis_tag}.events",
 					{
 						"far": coinc_inspiral_index[coinc_event.coinc_event_id].combined_far,
 						"snr": coinc_inspiral_index[coinc_event.coinc_event_id].snr,
@@ -957,7 +958,7 @@ class GracedBWrapper(object):
 					ligolw_utils.write_fileobj(rankingstat_xmldoc_func(), fileobj, compress = "gz")
 
 				self.client.write(
-					"ranking_stat",
+					f"gstlal.{self.analysis_tag}.ranking_stat",
 					{
 						"ranking_data_path": os.path.realpath(rankingstat_filename),
 						"time": coinc_inspiral_index[coinc_event.coinc_event_id].end_time,
diff --git a/gstlal-inspiral/python/lloidhandler.py b/gstlal-inspiral/python/lloidhandler.py
index 3f45187fcb728b1b373e4aa3a596712996a2e451..d970b5aec518ffdc694527c8fbaa24d195a62879 100644
--- a/gstlal-inspiral/python/lloidhandler.py
+++ b/gstlal-inspiral/python/lloidhandler.py
@@ -132,9 +132,10 @@ def subdir_from_T050017_filename(fname):
 
 
 class EyeCandy(object):
-	def __init__(self, instruments, kafka_server, tag, pipeline, segmentstracker, latencytracker):
+	def __init__(self, instruments, kafka_server, analysis_tag, job_tag, pipeline, segmentstracker, latencytracker):
 		self.kafka_server = kafka_server
-		self.tag = tag
+		self.analysis = analysis_tag
+		self.tag = job_tag
 		self.gate_history = segmentstracker.gate_history
 		if latencytracker:
 			self.pipeline_latency_history = latencytracker.pipeline_history
@@ -198,7 +199,7 @@ class EyeCandy(object):
 			topics = list(itertools.chain(snr_routes, network_routes, usage_routes, state_routes, seg_routes, instrument_latency_routes, pipeline_latency_routes))
 
 			self.client = kafka.Client("kafka://{}".format(self.kafka_server))
-			self.client.subscribe(topics)
+			self.client.subscribe([f"gstlal.{self.analysis}.{topic}" for topic in topics])
 		else:
 			self.client = None
 
@@ -327,7 +328,7 @@ class EyeCandy(object):
 			# Send all of the kafka messages and clear the data
 			#self.producer.send(self.tag, self.kafka_data)
 			for route in self.kafka_data.keys():
-				self.client.write(route, self.kafka_data[route], tags=self.tag)
+				self.client.write(f"gstlal.{self.analysis}.{route}", self.kafka_data[route], tags=self.tag)
 			# This line forces the send but is blocking!! not the
 			# best idea for production running since we value
 			# latency over getting metric data out
@@ -745,13 +746,13 @@ class Handler(simplehandler.Handler):
 	dumps of segment information, trigger files and background
 	distribution statistics.
 	"""
-	def __init__(self, mainloop, pipeline, coincs_document, rankingstat, horizon_distance_func, gracedbwrapper, zerolag_rankingstatpdf_url = None, rankingstatpdf_url = None, ranking_stat_output_url = None, ranking_stat_input_url = None, likelihood_snapshot_interval = None, sngls_snr_threshold = None, tag = "", kafka_server = "10.14.0.112:9092", cluster = False, cap_singles = False, FAR_trialsfactor = 1.0, activation_counts = None, track_latency = False, template_id_time_map = None, verbose = False):
+	def __init__(self, mainloop, pipeline, coincs_document, rankingstat, horizon_distance_func, gracedbwrapper, zerolag_rankingstatpdf_url = None, rankingstatpdf_url = None, ranking_stat_output_url = None, ranking_stat_input_url = None, likelihood_snapshot_interval = None, sngls_snr_threshold = None, analysis_tag = "test", job_tag = "", kafka_server = "10.14.0.112:9092", cluster = False, cap_singles = False, FAR_trialsfactor = 1.0, activation_counts = None, track_latency = False, template_id_time_map = None, verbose = False):
 		"""!
 		@param mainloop The main application's event loop
 		@param pipeline The gstreamer pipeline that is being
 		controlled by this handler
 		@param dataclass A Data class instance
-	@param tag The tag to use for naming file snapshots, e.g.
-	the description will be "%s_LLOID" % tag
+	@param job_tag The tag to use for naming file snapshots, e.g.
+	the description will be "%s_LLOID" % job_tag
 		@param verbose Be verbose
 		"""
@@ -764,7 +765,8 @@ class Handler(simplehandler.Handler):
 		self.lock = threading.Lock()
 		self.coincs_document = coincs_document
 		self.pipeline = pipeline
-		self.tag = tag
+		self.analysis = analysis_tag
+		self.tag = job_tag
 		self.verbose = verbose
 		# None to disable periodic snapshots, otherwise seconds
 		self.likelihood_snapshot_interval = likelihood_snapshot_interval
@@ -798,7 +800,7 @@ class Handler(simplehandler.Handler):
 		# set up metric collection
 		#
 
-		self.eye_candy = EyeCandy(rankingstat.instruments, kafka_server, self.tag, pipeline, self.segmentstracker, self.latencytracker)
+		self.eye_candy = EyeCandy(rankingstat.instruments, kafka_server, self.analysis, self.tag, pipeline, self.segmentstracker, self.latencytracker)
 		# FIXME:   detangle this
 		self.eye_candy.lock = self.lock
 
diff --git a/gstlal-ugly/python/events.py b/gstlal-ugly/python/events.py
index af6dab81bd5eca9d0d71faadaf5741fe83ffaf92..dc835081bcf85a1765b21e19b0664bfc768c27a9 100644
--- a/gstlal-ugly/python/events.py
+++ b/gstlal-ugly/python/events.py
@@ -63,6 +63,7 @@ class EventProcessor(object):
 		self.is_running = False
 
 		### kafka settings
+		self.tag = tag
 		self.kafka_settings = {
 			'bootstrap.servers': kafka_server,
 			'group.id': '-'.join([self._name, tag])
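
Note: with self.tag stored on EventProcessor, downstream consumers can derive their namespaced topics from it. A sketch modeled on the EventUploader constructor above, assuming the remaining EventProcessor keyword arguments keep their defaults:

    from gstlal import events

    class ExampleConsumer(events.EventProcessor):  # hypothetical subclass
        def __init__(self, kafka_server, tag):
            events.EventProcessor.__init__(
                self,
                kafka_server=kafka_server,
                input_topic=f"gstlal.{tag}.events",  # analysis-namespaced topic
                tag=tag,  # also feeds the consumer group.id: "<name>-<tag>"
            )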