diff --git a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
index ec7a0213bb3d6705fe9c8f6ed1b74e06d4533411..16e0456998d22e292c0b7bc98f54c32eb7c8300d 100755
--- a/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
+++ b/gstlal-inspiral/bin/gstlal_ll_inspiral_pipe
@@ -361,7 +361,7 @@ listenNode = lvalert_listen_node(listenJob, dag)
 
 # dq with default options
 for ifo in channel_dict:
-	outpath = "dq"
+	outpath = "aggregator"
 	try:
 		os.makedirs(outpath)
 	except OSError:
@@ -551,7 +551,8 @@ aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir"
 aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "data-type":["max"], "job-start":0, "route": ["likelihood_history", "snr_history", "latency_history"], "kafka-server": options.output_kafka_server})
 aggNode = dagparts.DAGNode(aggJob, dag, [], opts = {"dump-period": 1, "base-dir":"aggregator", "job-tag": os.getcwd(), "num-jobs": len(jobTags), "job-start":0, "route": ["far_history", "latency_history"], "data-type":["min"], "kafka-server": options.output_kafka_server})
 
-analysisStateNode = dagparts.DAGNode(analysisStateJob, dag, [], opts = {"dump-period": 1, "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 2, "instrument": channel_dict.keys(), "kafka-server": options.output_kafka_server})
+for instrument in channel_dict:
+	analysisStateNode = dagparts.DAGNode(analysisStateJob, dag, [], opts = {"dump-period": 1, "job-tag": os.getcwd(), "num-jobs": len(jobTags), "num-threads": 2, "instrument": instrument, "kafka-server": options.output_kafka_server})
 
 # summary page
 if options.injection_file: