diff --git a/gwcelery/conf/__init__.py b/gwcelery/conf/__init__.py index f4ee27f7161bc8bd26c002b512789bbaa8faba66..9470e1bdab52795b502e4f21cf8f704d3eacf35d 100644 --- a/gwcelery/conf/__init__.py +++ b/gwcelery/conf/__init__.py @@ -80,8 +80,20 @@ then completely disable the GCN listener.""" email_host = 'imap.gmail.com' """IMAP hostname to receive the GCN e-mail notice formats.""" -kafka_topic_url = 'kafka://kafka.scimma.org/lvk-emfollow.gwalert-playground' -"""Kafka topic URL""" +scimma_broker_url = 'kafka://kafka.scimma.org/' +"""SCiMMA broker URL""" + +scimma_topic_name = 'lvk-emfollow.gwalert-playground' +"""SCiMMA topic name""" + +kafka_queue_name = 'kafka' +"""Kafka queue name""" + +kafka_queue_exchange_name = 'kafka' +"""Kafka queue exchange name""" + +kafka_queue_routing_key = 'kafka.key' +"""Kafka queue routing key""" superevent_d_t_start = {'gstlal': 1.0, 'spiir': 1.0, diff --git a/gwcelery/conf/production.py b/gwcelery/conf/production.py index 46cfbce765ef2293daafba9cff6ed3e2eb3f4bd8..e4347ec05142335a08c07cdbf04afd627213ad2e 100644 --- a/gwcelery/conf/production.py +++ b/gwcelery/conf/production.py @@ -19,8 +19,8 @@ lvalert_host = 'lvalert.cgca.uwm.edu' gracedb_host = 'gracedb.ligo.org' """GraceDB host.""" -kafka_topic_url = 'kafka://kafka.scimma.org/lvk-emfollow.gwalert' -"""Kafka topic URL""" +scimma_topic_name = 'lvk-emfollow.gwalert' +"""SCiMMA topic name""" voevent_broadcaster_address = ':5341' """The VOEvent broker will bind to this address to send GCNs. diff --git a/gwcelery/conf/test.py b/gwcelery/conf/test.py index 4d6d4952de15fe9d597f56462f08ff211761b6fe..73fa23190be7f15252b34e01107371ad0791dcc9 100644 --- a/gwcelery/conf/test.py +++ b/gwcelery/conf/test.py @@ -11,6 +11,9 @@ lvalert_host = 'lvalert-test.cgca.uwm.edu' gracedb_host = 'gracedb-test.ligo.org' """GraceDB host.""" +scimma_topic_name = 'lvk-emfollow.gwalert-test' +"""SCiMMA topic name""" + sentry_environment = 'test' """Record this `environment tag <https://docs.sentry.io/enriching-error-data/environments/>`_ in Sentry log @@ -19,6 +22,3 @@ messages.""" mock_events_simulate_multiple_uploads = True """If True, then upload each mock event several times in rapid succession with random jitter in order to simulate multiple pipeline uploads.""" - -kafka_topic_url = 'kafka://kafka.scimma.org/lvk-emfollow.gwalert-test' -"""Kafka topic URL""" \ No newline at end of file diff --git a/gwcelery/kafka/bootsteps.py b/gwcelery/kafka/bootsteps.py index 7fb964b39729955130d6cd182598a9af2453b03d..8f75c9445906be30b37c6bf9cbdad8560ffd1563 100644 --- a/gwcelery/kafka/bootsteps.py +++ b/gwcelery/kafka/bootsteps.py @@ -2,7 +2,6 @@ import json from celery import bootsteps from celery.utils.log import get_logger -from confluent_kafka.avro import AvroProducer from hop.models import Blob from hop import stream from kombu import Consumer, Exchange, Queue @@ -24,6 +23,7 @@ class KafkaBootStep(bootsteps.ConsumerStep): self.enabled = bool(kafka) def start(self, parent): + super().start(parent) log.info(f'Starting {self.name}, topic: {self.topic_url}') def stop(self, parent): @@ -37,7 +37,8 @@ class Producer(KafkaBootStep): def __init__(self, parent, kafka=False, **kwargs): super().__init__(parent, **kwargs) - self.topic_url = parent.app.conf['kafka_topic_url'] + self.topic_url = parent.app.conf['scimma_broker_url'] + \ + parent.app.conf['scimma_topic_name'] def start(self, parent): super().start(parent) @@ -47,12 +48,16 @@ class Producer(KafkaBootStep): super().stop(parent) self._s.close() - def get_consumers(self, channel): - queue = Queue('kafka', Exchange('kafka'), 'kafka.key') - return [Consumer(channel, - queues=[queue], - on_message=self.on_message, - accept=['pickle', 'json'])] + def get_consumers(self, parent, channel): + queue = Queue( + parent.app.conf['kafka_queue_name'], + Exchange(parent.app.conf['kafka_queue_exchange_name']), + parent.app.conf['kafka_queue_routing_key']) + return [Consumer( + channel, + queues=[queue], + on_message=self.on_message, + accept=['pickle', 'json'])] def on_message(self, message): payload = message.decode() @@ -62,49 +67,7 @@ class Producer(KafkaBootStep): ) ) log.info(f'Sending message to {self.topic_url}') - # FIXME: use the hop client's Avro implementation instead of a Blob when it's ready + # FIXME: use the hop client's Avro implementation when it's ready msg_blob = Blob(json.dumps(message.payload)) self._s.write(msg_blob) message.ack() - - -class Producer(KafkaBootStep): - """Run the global Kafka producer in a background thread.""" - - name = 'Kafka Avro producer' - - - -''' - def delivery_report(err, msg): - """Called once for each message produced to indicate delivery result. - Triggered by or flush(). """ - if err is not None: - print('Message delivery failed: {}'.format(err)) - else: - print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition())) - - - def start(self, consumer, default_key_schema, default_value_schema): - - config = { - 'bootstrap.servers': consumer.app.conf['kafka_bootstrap_servers'], - 'on_delivery': self.delivery_report, - } - - super().start(consumer) - self._producer = AvroProducer( - config, - default_key_schema=default_key_schema, - default_value_schema=default_value_schema, - - ) - - def flush(self, consumer): - super().stop(consumer) - self._producer.flush() - - def produce(self, topic, value, value_schema, key, key_schema): - self._producer.produce(topic=topic, value=value, - value_schema=value_schema, key=key, key_schema=key_schema) -''' \ No newline at end of file diff --git a/poetry.lock b/poetry.lock index 9e8735be69585d7f17e6eb721280e6b0d5ec1934..53388966776e76cadae728d5140c487246113903 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,3 +1,20 @@ +[[package]] +name = "adc-streaming" +version = "2.0.0" +description = "Astronomy Data Commons streaming client libraries" +category = "main" +optional = false +python-versions = "*" + +[package.dependencies] +certifi = ">=2020.04.05.1" +confluent-kafka = "*" +importlib-metadata = {version = "*", markers = "python_version < \"3.8\""} +tqdm = "*" + +[package.extras] +dev = ["autopep8", "docker", "flake8", "isort", "pytest", "pytest-timeout", "pytest-integration", "sphinx", "sphinx-rtd-theme", "twine"] + [[package]] name = "alabaster" version = "0.7.12" @@ -389,6 +406,22 @@ python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4.*" test = ["mock", "pyyaml", "pytest"] yaml = ["pyyaml"] +[[package]] +name = "confluent-kafka" +version = "1.8.2" +description = "Confluent's Python client for Apache Kafka" +category = "main" +optional = false +python-versions = "*" + +[package.extras] +avro = ["requests", "fastavro (>=0.23.0,<1.0)", "avro (==1.10.0)", "fastavro (>=1.0)", "avro-python3 (==1.10.0)"] +dev = ["pytest-timeout", "flake8", "requests", "pytest (==4.6.4)", "fastavro (>=0.23.0,<1.0)", "avro (==1.10.0)", "fastavro (>=1.0)", "avro-python3 (==1.10.0)", "pytest"] +doc = ["sphinx", "sphinx-rtd-theme", "requests", "fastavro (>=0.23.0,<1.0)", "avro (==1.10.0)", "fastavro (>=1.0)", "avro-python3 (==1.10.0)"] +json = ["jsonschema", "requests", "pyrsistent (==0.16.1)", "pyrsistent"] +protobuf = ["protobuf", "requests"] +schema-registry = ["requests"] + [[package]] name = "constantly" version = "15.1.0" @@ -742,6 +775,24 @@ matplotlib = "*" numpy = ">=1.13" scipy = "*" +[[package]] +name = "hop-client" +version = "0.5.0" +description = "A pub-sub client library for Multi-messenger Astrophysics" +category = "main" +optional = false +python-versions = ">=3.6.*" + +[package.dependencies] +adc-streaming = ">=2.0.0" +pluggy = ">=0.11" +toml = ">=0.9.4" +xmltodict = ">=0.9.0" + +[package.extras] +dev = ["autopep8", "flake8", "pytest (>=5.0,<5.4)", "pytest-console-scripts", "pytest-cov", "pytest-runner", "twine"] +docs = ["sphinx", "sphinx-rtd-theme", "sphinxcontrib-programoutput"] + [[package]] name = "humanize" version = "4.0.0" @@ -1446,7 +1497,7 @@ name = "pluggy" version = "1.0.0" description = "plugin and hook calling mechanisms for python" category = "main" -optional = true +optional = false python-versions = ">=3.6" [package.extras] @@ -2330,6 +2381,14 @@ category = "main" optional = false python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,>=2.7" +[[package]] +name = "xmltodict" +version = "0.12.0" +description = "Makes working with XML feel like you are working with JSON" +category = "main" +optional = false +python-versions = ">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*" + [[package]] name = "zipp" version = "3.8.0" @@ -2379,6 +2438,10 @@ python-versions = "^3.8" content-hash = "18e56c4973cc68c8832af3c5c7921a5966c616cb122475367e9d66f47443f048" [metadata.files] +adc-streaming = [ + {file = "adc-streaming-2.0.0.tar.gz", hash = "sha256:c1ab8aee52684ff1f1c5ae469a760a435d6c45836ead4e9a97423a4893cb92d6"}, + {file = "adc_streaming-2.0.0-py3-none-any.whl", hash = "sha256:dc765f2de4ef9677e1746c82e6e42c5f27c2e1c0b0e4de252150639b94dd8e75"}, +] alabaster = [ {file = "alabaster-0.7.12-py2.py3-none-any.whl", hash = "sha256:446438bdcca0e05bd45ea2de1668c1d9b032e1a9154c2c259092d77031ddd359"}, {file = "alabaster-0.7.12.tar.gz", hash = "sha256:a661d72d58e6ea8a57f7a86e37d86716863ee5e92788398526d58b26a4e4dc02"}, @@ -2577,6 +2640,25 @@ configargparse = [ {file = "ConfigArgParse-1.5.3-py3-none-any.whl", hash = "sha256:18f6535a2db9f6e02bd5626cc7455eac3e96b9ab3d969d366f9aafd5c5c00fe7"}, {file = "ConfigArgParse-1.5.3.tar.gz", hash = "sha256:1b0b3cbf664ab59dada57123c81eff3d9737e0d11d8cf79e3d6eb10823f1739f"}, ] +confluent-kafka = [ + {file = "confluent-kafka-1.8.2.tar.gz", hash = "sha256:b79e836c3554bc51c6837a8a0152f7521c9bf31342f5b8e21eba6b28044fa585"}, + {file = "confluent_kafka-1.8.2-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:02b78bb6d1199ea350240eae1f4415f22014896199a46edf85f779a69751f984"}, + {file = "confluent_kafka-1.8.2-cp36-cp36m-manylinux2010_x86_64.whl", hash = "sha256:d50b091770d277714766943d885ad6b2c5c427e67328706cfd33dc86eef540c9"}, + {file = "confluent_kafka-1.8.2-cp36-cp36m-win32.whl", hash = "sha256:4f26052ef53212752039cd1d9e932b2feb6a0975d717ab070af323629a72a0b9"}, + {file = "confluent_kafka-1.8.2-cp36-cp36m-win_amd64.whl", hash = "sha256:ac7155e1b9a94445ed8eecf691c80c61407148813808a2aa1cba0babbe197e77"}, + {file = "confluent_kafka-1.8.2-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:039c68379f9a5ece6e45a683ec7abebb95a9dac904ec4e2f9d93738e1cf6fab2"}, + {file = "confluent_kafka-1.8.2-cp37-cp37m-manylinux2010_x86_64.whl", hash = "sha256:b7cb6fa3d44972e3670e0b3b054186a6006e6fd664600cfe70e008fad2443d16"}, + {file = "confluent_kafka-1.8.2-cp37-cp37m-win32.whl", hash = "sha256:748813f47641dd65dd8d3bae8dcb3ce96a3e455c12b467d4b35e1fc880362d01"}, + {file = "confluent_kafka-1.8.2-cp37-cp37m-win_amd64.whl", hash = "sha256:ead7f18c516f7bcb886b643fa78ff2a2142270adaf931ba0311b62e9a047e6ca"}, + {file = "confluent_kafka-1.8.2-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:3d66e8c1a6a15144ca5b176170adbf30207c27813c76202c56abf52ef2b475e1"}, + {file = "confluent_kafka-1.8.2-cp38-cp38-manylinux2010_x86_64.whl", hash = "sha256:585bc8e8aa7d6fbd46dc0b2da3d4b1fd8457555288fee1ecba6af2c97ab738cc"}, + {file = "confluent_kafka-1.8.2-cp38-cp38-win32.whl", hash = "sha256:ae75d3f4bc3d2109663912d77911c45aaa2939bde3694fc05e75842c806fa760"}, + {file = "confluent_kafka-1.8.2-cp38-cp38-win_amd64.whl", hash = "sha256:1df83fa20e4fe032651ad73ce0ba85dd14a7fabff6066c9cb20e944d2748e72b"}, + {file = "confluent_kafka-1.8.2-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:add05db627435697d4ed8f81b3ce1081931770813a989fd775910865f07d694d"}, + {file = "confluent_kafka-1.8.2-cp39-cp39-manylinux2010_x86_64.whl", hash = "sha256:e49382a943fb47813f421e913cc6c87cd1d4bfdecad1785efa0dacada7003d84"}, + {file = "confluent_kafka-1.8.2-cp39-cp39-win32.whl", hash = "sha256:b679c3f9f555e87a9cbb043c676473c30d12182609e075be85afd98f84bcc863"}, + {file = "confluent_kafka-1.8.2-cp39-cp39-win_amd64.whl", hash = "sha256:f843680e183479f6e0732b593ea3235c836a5bb2de6be3819a11b891b6af1dde"}, +] constantly = [ {file = "constantly-15.1.0-py2.py3-none-any.whl", hash = "sha256:dd2fa9d6b1a51a83f0d7dd76293d734046aa176e384bf6e33b7e44880eb37c5d"}, {file = "constantly-15.1.0.tar.gz", hash = "sha256:586372eb92059873e29eba4f9dec8381541b4d3834660707faf8ba59146dfc35"}, @@ -2739,6 +2821,10 @@ healpy = [ {file = "healpy-1.15.2-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:4c8d7a279815446d12d6cc3e2c030d893dac785ac65f11314e2932638970c5f8"}, {file = "healpy-1.15.2.tar.gz", hash = "sha256:d559ad287a78d3b500919a5cb9e4dff3cb63c1b3a2e23edf62819c871ccacf7f"}, ] +hop-client = [ + {file = "hop-client-0.5.0.tar.gz", hash = "sha256:ea15cbfb972fc9ee208397234e6e5371df076bec1cdbacb35735b5c70731b14f"}, + {file = "hop_client-0.5.0-py3-none-any.whl", hash = "sha256:2c355d43919b70e109b7e9bfe15646b083a156924a00b0f1e7b0e2733563f9ea"}, +] humanize = [ {file = "humanize-4.0.0-py3-none-any.whl", hash = "sha256:8d86333b8557dacffd4dce1dbe09c81c189e2caf7bb17a970b2212f0f58f10f2"}, {file = "humanize-4.0.0.tar.gz", hash = "sha256:ee1f872fdfc7d2ef4a28d4f80ddde9f96d36955b5d6b0dac4bdeb99502bddb00"}, @@ -3903,6 +3989,10 @@ wrapt = [ {file = "wrapt-1.14.0-cp39-cp39-win_amd64.whl", hash = "sha256:bb36fbb48b22985d13a6b496ea5fb9bb2a076fea943831643836c9f6febbcfdc"}, {file = "wrapt-1.14.0.tar.gz", hash = "sha256:8323a43bd9c91f62bb7d4be74cc9ff10090e7ef820e27bfe8815c57e68261311"}, ] +xmltodict = [ + {file = "xmltodict-0.12.0-py2.py3-none-any.whl", hash = "sha256:8bbcb45cc982f48b2ca8fe7e7827c5d792f217ecf1792626f808bf41c3b86051"}, + {file = "xmltodict-0.12.0.tar.gz", hash = "sha256:50d8c638ed7ecb88d90561beedbf720c9b4e851a9fa6c47ebd64e99d166d8a21"}, +] zipp = [ {file = "zipp-3.8.0-py3-none-any.whl", hash = "sha256:c4f6e5bbf48e74f7a38e7cc5b0480ff42b0ae5178957d564d18932525d5cf099"}, {file = "zipp-3.8.0.tar.gz", hash = "sha256:56bf8aadb83c24db6c4b577e13de374ccfb67da2078beba1d037c17980bf43ad"}, diff --git a/pyproject.toml b/pyproject.toml index 9d4377b1a92efff0479600f084ef1f347881c55c..dd7f7cc443ccbb15b949681577dba1fb259d8d1a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,6 +92,7 @@ sphinx = {version=">=4.0", optional=true} pytest-celery = {version="*", optional=true} pytest-flask = {version="*", optional=true} pytest-socket = {version="*", optional=true} +hop-client = "^0.5.0" [tool.poetry.extras] doc = ["pep517", "sphinx"]