Skip to content
Snippets Groups Projects
Commit bbadb7cd authored by Cody Messick's avatar Cody Messick
Browse files

Use functools.cache instead of PromiseProxy to load schema from

igwn-gwalert-schema
parent 600bf3f2
No related branches found
No related tags found
1 merge request!1149Use functools.cache to load avro schema
from functools import cache
from celery import bootsteps
from celery.concurrency import solo
from celery.utils.log import get_logger
......@@ -7,7 +9,7 @@ from hop import stream
from hop.io import list_topics
from hop.models import AvroBlob, JSONBlob
from ..util import PromiseProxy, read_json
from ..util import read_json
__all__ = ('Producer',)
......@@ -15,7 +17,7 @@ __all__ = ('Producer',)
log = get_logger(__name__)
@PromiseProxy
@cache
def schema():
# The order does not matter other than the Alert schema must be loaded last
# because it references the other schema. All of the schema are saved in
......@@ -42,12 +44,7 @@ def schema():
class AvroBlobWrapper(AvroBlob):
def __init__(self, payload):
# NOTE fastavro appears to not like something about this PromiseProxy
# object. Returning a copy of the object is the only workaround I could
# find.
# FIXME Understand this behavior and write a less hacky fix. Possibly
# switch to using functools.cache once python3.8 support is dropped
return super().__init__([payload], schema.copy())
return super().__init__([payload], schema())
class KafkaWriter:
......
......@@ -11,29 +11,19 @@ from . import data
from ..kafka.bootsteps import AvroBlobWrapper, schema
@pytest.fixture
def copied_schema():
# NOTE The unit test appears to not like something about this PromiseProxy
# object. Returning a copy of the object is the only workaround I could
# find.
# FIXME Understand this behavior and write a less hacky fix
return schema.copy()
@patch('gwcelery.tasks.alerts._upload_notice.run')
@patch('gwcelery.tasks.gracedb.download._orig_run',
return_value=resources.read_binary(
data,
'MS220722v_bayestar.multiorder.fits'
))
def test_validate_alert(mock_download, mock_upload, copied_schema,
monkeypatch):
def test_validate_alert(mock_download, mock_upload, monkeypatch):
"""Validate public alerts against the schema from the userguide.
"""
def _validate_alert(public_alert_avro_blob):
assert len(public_alert_avro_blob.content) == 1
fastavro.validation.validate(public_alert_avro_blob.content[0],
copied_schema, strict=True)
schema(), strict=True)
# Replace the stream object with a mock object with _validiate_alert as its
# write attribute
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment