Skip to content
GitLab
Projects
Groups
Snippets
Help
Loading...
Help
What's new
10
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Sign in
Toggle navigation
Open sidebar
lscsoft
gstlal
Commits
4ee635f9
Commit
4ee635f9
authored
Mar 20, 2019
by
chad.hanna
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
lloidhandler: reduce some of the data going to kafka
parent
7ad4e4a2
Pipeline
#54294
passed with stages
in 32 minutes and 55 seconds
Changes
1
Pipelines
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
19 additions
and
2 deletions
+19
-2
gstlal-inspiral/python/lloidhandler.py
gstlal-inspiral/python/lloidhandler.py
+19
-2
No files found.
gstlal-inspiral/python/lloidhandler.py
View file @
4ee635f9
...
...
@@ -260,6 +260,7 @@ class EyeCandy(object):
t
=
inspiral
.
now
()
if
self
.
time_since_last_state
is
None
:
self
.
time_since_last_state
=
t
# NOTE only dump to kafka every 1 seconds
if
self
.
producer
is
not
None
and
(
t
-
self
.
time_since_last_state
)
>=
1
:
self
.
time_since_last_state
=
t
for
ii
,
column
in
enumerate
([
"time"
,
"data"
]):
...
...
@@ -284,9 +285,25 @@ class EyeCandy(object):
self
.
kafka_data
[
"%s_strain_dropped"
%
instrument
][
"time"
].
append
(
float
(
t
))
self
.
kafka_data
[
"%s_strain_dropped"
%
instrument
][
"data"
].
append
(
elem
.
get_property
(
"add"
)
/
16384.
)
# Send and flush all of the kafka messages and clear the data
# decimate the data in the other routes
for
route
in
self
.
kafka_data
.
keys
():
if
route
==
"coinc"
or
len
(
self
.
kafka_data
[
route
][
"data"
])
==
0
:
continue
if
route
==
"likelihood_history"
or
route
==
"latency_history"
or
"snr_history"
in
route
:
ix
=
numpy
.
argmax
(
self
.
kafka_data
[
route
][
"data"
])
self
.
kafka_data
[
route
][
"time"
]
=
[
self
.
kafka_data
[
route
][
"time"
][
ix
]]
self
.
kafka_data
[
route
][
"data"
]
=
[
self
.
kafka_data
[
route
][
"data"
][
ix
]]
if
route
==
"far_history"
:
ix
=
numpy
.
argmin
(
self
.
kafka_data
[
route
][
"data"
])
self
.
kafka_data
[
route
][
"time"
]
=
[
self
.
kafka_data
[
route
][
"time"
][
ix
]]
self
.
kafka_data
[
route
][
"data"
]
=
[
self
.
kafka_data
[
route
][
"data"
][
ix
]]
# Send all of the kafka messages and clear the data
self
.
producer
.
send
(
self
.
tag
,
self
.
kafka_data
)
self
.
producer
.
flush
()
# This line forces the send but is blocking!! not the
# best idea for production running since we value
# latency over getting metric data out
#self.producer.flush()
for
route
in
self
.
kafka_data
.
keys
():
self
.
kafka_data
[
route
]
=
{
'time'
:
[],
'data'
:
[]}
self
.
kafka_data
[
"coinc"
]
=
[]
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment