Skip to content
GitLab
Explore
Sign in
Primary navigation
Search or go to…
Project
gwcelery
Manage
Activity
Members
Labels
Plan
Issues
Issue boards
Milestones
Iterations
Requirements
Code
Merge requests
Repository
Branches
Commits
Tags
Repository graph
Compare revisions
Snippets
Locked files
Deploy
Releases
Container Registry
Model registry
Monitor
Incidents
Analyze
Value stream analytics
Contributor analytics
Repository analytics
Code review analytics
Issue analytics
Insights
Model experiments
Help
Help
Support
GitLab documentation
Compare GitLab plans
Community forum
Contribute to GitLab
Provide feedback
Keyboard shortcuts
?
Snippets
Groups
Projects
Show more breadcrumbs
emfollow
gwcelery
Merge requests
!896
Shutdown igwn-alert listener properly
Code
Review changes
Check out branch
Download
Patches
Plain diff
Merged
Shutdown igwn-alert listener properly
deep.chatterjee/gwcelery:stop-listener-properly
into
main
Overview
16
Commits
1
Pipelines
10
Changes
1
Merged
Deep Chatterjee
requested to merge
deep.chatterjee/gwcelery:stop-listener-properly
into
main
2 years ago
Overview
14
Commits
1
Pipelines
10
Changes
1
Expand
closes
#424 (closed)
Edited
2 years ago
by
Deep Chatterjee
0
0
Merge request reports
Compare
main
version 9
69a0c19b
2 years ago
version 8
d94ac975
2 years ago
version 7
b6c0e58f
2 years ago
version 6
1ca1754d
2 years ago
version 5
f2b3c410
2 years ago
version 4
9d523648
2 years ago
version 3
1f3a5c2b
2 years ago
version 2
0962bca8
2 years ago
version 1
71827064
2 years ago
main (base)
and
version 5
latest version
eb4efd0d
1 commit,
2 years ago
version 9
69a0c19b
1 commit,
2 years ago
version 8
d94ac975
1 commit,
2 years ago
version 7
b6c0e58f
1 commit,
2 years ago
version 6
1ca1754d
1 commit,
2 years ago
version 5
f2b3c410
4 commits,
2 years ago
version 4
9d523648
4 commits,
2 years ago
version 3
1f3a5c2b
3 commits,
2 years ago
version 2
0962bca8
2 commits,
2 years ago
version 1
71827064
1 commit,
2 years ago
1 file
+
53
−
2
Inline
Compare changes
Side-by-side
Inline
Show whitespace changes
Show one file at a time
gwcelery/igwn_alert/bootsteps.py
+
53
−
2
Options
import
json
from
json
import
JSONDecodeError
from
threading
import
Thread
import
warnings
from
celery
import
bootsteps
from
celery.utils.log
import
get_logger
from
hop.models
import
JSONBlob
from
hop.io
import
StartPosition
from
igwn_alert
import
client
from
.signals
import
igwn_alert_received
@@ -11,6 +16,51 @@ __all__ = ('Receiver',)
log
=
get_logger
(
__name__
)
class
IGWNAlertClient
(
client
):
def
__init__
(
self
,
*
args
,
**
kwargs
):
super
().
__init__
(
*
args
,
**
kwargs
)
self
.
batch_size
=
1
self
.
until_eos
=
True
self
.
running
=
False
self
.
start_at
=
StartPosition
.
LATEST
# mostly implemented from https://git.ligo.org/computing/igwn-alert/client/-/blob/main/igwn_alert/client.py # noqa: E501
def
listen
(
self
,
callback
=
None
,
topic
=
None
):
self
.
running
=
True
if
topic
:
if
isinstance
(
topic
,
str
):
topic
=
[
topic
]
listen_topics
=
topic
else
:
listen_topics
=
self
.
get_topics
()
s
=
self
.
open
(
self
.
_construct_topic_url
(
listen_topics
),
"
r
"
)
log
.
info
(
"
Opened connection to scimma
"
)
while
self
.
running
:
try
:
for
payload
,
metadata
in
s
.
read
(
metadata
=
True
,
batch_size
=
self
.
batch_size
,
batch_timeout
=
self
.
batch_timeout
):
# Fix in case message is in new format:
if
isinstance
(
payload
,
JSONBlob
):
payload
=
payload
.
content
else
:
try
:
payload
=
json
.
loads
(
payload
)
except
(
JSONDecodeError
,
TypeError
)
as
e
:
warnings
.
warn
(
"
Payload is not valid
"
"
json: {}
"
.
format
(
e
))
if
not
callback
:
print
(
"
New message from topic {topic}: {msg}
"
.
format
(
topic
=
metadata
.
topic
,
msg
=
payload
))
else
:
callback
(
topic
=
metadata
.
topic
.
split
(
'
.
'
)[
1
],
payload
=
payload
)
except
(
KeyboardInterrupt
,
SystemExit
):
self
.
running
=
False
s
.
close
()
class
IGWNAlertBootStep
(
bootsteps
.
ConsumerStep
):
"""
Generic boot step to limit us to appropriate kinds of workers.
@@ -41,7 +91,8 @@ class Receiver(IGWNAlertBootStep):
def
start
(
self
,
consumer
):
super
().
start
(
consumer
)
self
.
_client
=
client
(
group
=
consumer
.
app
.
conf
[
'
igwn_alert_group
'
])
self
.
_client
=
IGWNAlertClient
(
group
=
consumer
.
app
.
conf
[
'
igwn_alert_group
'
])
self
.
thread
=
Thread
(
target
=
self
.
_client
.
listen
,
args
=
(
_send_igwn_alert
,
consumer
.
app
.
conf
[
'
igwn_alert_topics
'
]),
@@ -50,8 +101,8 @@ class Receiver(IGWNAlertBootStep):
def
stop
(
self
,
consumer
):
super
().
stop
(
consumer
)
self
.
_client
.
running
=
False
self
.
thread
.
join
()
self
.
_client
.
disconnect
()
def
info
(
self
,
consumer
):
return
{
'
igwn-alert-topics
'
:
consumer
.
app
.
conf
[
Loading