Draft: Port to new resource source implementation
This ports the arrakis source to work with greg/sgn-ts!146
I couldn't figure out how to test it despite trying for a while. But you can run this with SGNLOGLEVEL=DEBUG and it seems to do something (assuming arrakis server is running in mock)
#!/usr/bin/env python3
"""
SGN Pipeline to read H1:CAL-DELTAL_EXTERNAL_DQ channel from Arrakis mock server
in real-time
"""
import os
# Set the Arrakis server URL before importing
os.environ["ARRAKIS_SERVER"] = "grpc://localhost:31206"
from sgn import Pipeline
from sgn.subprocess import Parallelize
from sgnts.sinks import NullSeriesSink
from sgn_arrakis import ArrakisSource
def main():
# Define the channel to read
channels = ["H1:CAL-DELTAL_EXTERNAL_DQ"]
print(f"Starting pipeline to read channels: {channels}")
print("Press Ctrl+C to stop\n")
# Create the Arrakis source to read from the mock server
# It will automatically connect to the server configured in ../sgn-ligo
# The mock server has data starting around GPS time 1435366385
source = ArrakisSource(
source_pad_names=channels,
start_time=1435366385, # GPS time where mock data starts
duration=10, # Read 10 seconds of data
)
# Create a null sink that will print received data (verbose=True)
# You can replace this with a different sink to process the data
sink = NullSeriesSink(
sink_pad_names=channels,
verbose=False, # Print information about received blocks
)
# Create the link map to connect source outputs to sink inputs
link_map = {sink.snks[channel]: source.srcs[channel] for channel in channels}
# Build the pipeline
pipeline = Pipeline()
pipeline.insert(source, sink, link_map=link_map)
# Run the pipeline with Parallelize context manager (required for TSResourceSource)
with Parallelize(pipeline) as parallelize:
print("Pipeline is running and reading from mock server...")
parallelize.run()
print("Pipeline completed successfully")
import threading
active_threads = threading.enumerate()
print("Active threads:")
for thread in active_threads:
print(
f" - Name: {thread.name}, Alive: {thread.is_alive()}, "
f"Daemon: {thread.daemon}"
)
if __name__ == "__main__":
main()
import sys
sys.exit(0)