Commit c1352317 authored by Patrick Godwin

Merge branch 'backend_fixes' into 'main'

Backend de-duplication + fixes

See merge request ngdd/arrakis-server!23
Parents: 0340f284, 11b7100e
Branch: main
Pipeline #650132 passed
@@ -66,7 +66,7 @@ def main():
         "--backend-server-url",
         help=(
             "URL pointing to a running backend server. "
-            "Required if using CLICKHOUSE or NDS2 backend."
+            "Required if using CLICKHOUSE, HYBRID, or NDS2 backend."
         ),
     )
     parser.add_argument(
@@ -81,10 +81,11 @@ def main():
     # validate arguments
     match BackendType[args.backend.upper()]:
-        case BackendType.CLICKHOUSE | BackendType.NDS2:
+        case BackendType.CLICKHOUSE | BackendType.HYBRID | BackendType.NDS2:
             if not args.backend_server_url:
                 parser.error(
-                    "CLICKHOUSE and NDS2 backend require --backend-server-url."
+                    "CLICKHOUSE, HYBRID, and NDS2 backends "
+                    "require --backend-server-url."
                )

     # set up logger
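
Note: with HYBRID added to this case arm, starting the server in hybrid mode without a backend server URL now fails fast with a parser error instead of failing later at connect time. A hypothetical invocation (the entry-point name and address are assumptions; both flags appear in the diff above):

    arrakis-server --backend hybrid --backend-server-url <host>:<port>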
@@ -17,6 +17,7 @@ from daqd_clickhouse.grpc.client import DEFAULT_PORT, ClickHouseGRPCClient
 from pyarrow import flight

 from .. import schemas
+from . import utils


 class ClickHouseBackend:
@@ -70,7 +71,7 @@ class ClickHouseBackend:
         """Retrieve timeseries data for the 'fetch' route."""
         # Query for channel metadata
         metadata = self._get_channel_metadata(channels)
-        dtypes = [channel_to_dtype_name(channel) for channel in metadata]
+        dtypes = [utils.channel_to_dtype_name(channel) for channel in metadata]
         schema = schemas.stream(channels, dtypes)

         is_live = not start and not end
@@ -178,9 +179,3 @@ def read_all_batches(
             yield reader.read_next_batch()
         except StopIteration:
             return
-
-
-def channel_to_dtype_name(channel: Channel) -> str:
-    """Given a channel, return the data type's name."""
-    assert channel.data_type is not None
-    return channel.data_type.name
@@ -26,6 +26,7 @@ from pyarrow import flight
 from .. import schemas
 from ..partition import partition_channels
+from . import utils

 DEFAULT_TIMEOUT = timedelta(seconds=1)
@@ -43,12 +44,12 @@ class HybridBackend:
     def do_describe(self, *, channels: Iterable[str]) -> flight.FlightDataStream:
         """Retrieve metadata for the 'describe' route."""
         metadata = [self._metadata[channel] for channel in channels]
-        return self._generate_metadata(metadata)
+        return utils.create_metadata_stream(metadata)

     def do_find(self, **kwargs) -> flight.FlightDataStream:
         """Retrieve metadata for the 'find' route."""
         channels = self._match_channels(**kwargs)
-        return self._generate_metadata(channels)
+        return utils.create_metadata_stream(channels)

     def do_publish(self, *, producer_id: str) -> flight.FlightDataStream:
         """Retrieve connection info for the 'publish' route."""
@@ -77,10 +78,11 @@ class HybridBackend:
         channels = []
         for batch in read_all_chunks(reader):
             for meta in batch.to_pylist():
+                data_type = numpy.dtype(meta["data_type"])
                 channel = Channel.from_name(
                     meta["channel"],
                     sample_rate=meta["sample_rate"],
-                    data_type=meta["data_type"],
+                    data_type=data_type,
                 )
                 channels.append(channel)
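
Note: wrapping the wire value in numpy.dtype() normalizes the transmitted dtype into a concrete numpy.dtype object before Channel construction, so downstream helpers such as channel_to_dtype_name can rely on the .name attribute. A minimal illustration (assuming the metadata carries dtype names as strings):

    >>> import numpy
    >>> numpy.dtype("float64").name
    'float64'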
@@ -93,33 +95,11 @@ class HybridBackend:
             producer_id=producer_id,
             partitions=self._metadata,
         )
-        self._channels = {channel for channel in self._metadata.values()}
+        channels = [channel for channel in self._metadata.values()]
+        self._channels = set(channels)

         # prepare the batch with mappings
-        batch = pyarrow.RecordBatch.from_arrays(
-            [
-                pyarrow.array(
-                    [channel.name for channel in channels],
-                    type=schema.field("channel").type,
-                ),
-                pyarrow.array(
-                    [channel.data_type for channel in channels],
-                    type=schema.field("data_type").type,
-                ),
-                pyarrow.array(
-                    [channel.sample_rate for channel in channels],
-                    type=schema.field("sample_rate").type,
-                ),
-                pyarrow.array(
-                    [
-                        self._metadata[channel.name].partition_id
-                        for channel in channels
-                    ],
-                    type=schema.field("partition_id").type,
-                ),
-            ],
-            schema=schema,
-        )
+        batch = utils.create_metadata_batch(channels)

         # send partitions back to the client
         writer.begin(schema)
@@ -149,7 +129,7 @@ class HybridBackend:
         """Retrieve timeseries data for the 'stream' route."""
         # Query for channel metadata
         metadata = [self._metadata[channel] for channel in channels]
-        dtypes = [channel_to_dtype_name(channel) for channel in metadata]
+        dtypes = [utils.channel_to_dtype_name(channel) for channel in metadata]
         schema = schemas.stream(channels, dtypes)

         is_live = not start and not end
@@ -180,10 +160,11 @@ class HybridBackend:
         with partition_file.open("r") as f:
             partitions = toml.load(f)
         for channel_name, meta in partitions.items():
+            data_type = numpy.dtype(meta["dtype"])
             channel = Channel.from_name(
                 channel_name,
+                data_type=data_type,
                 sample_rate=meta["rate"],
-                data_type=numpy.dtype(meta["dtype"]),
                 partition_id=meta["partition"],
             )
             metadata[channel_name] = channel
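
Note: the keys read above ("dtype", "rate", "partition") imply a partition file with one table per channel. A hypothetical entry for illustration (channel name and values invented):

    ["H1:EXAMPLE-CHANNEL"]
    dtype = "float64"
    rate = 16384
    partition = 0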
@@ -211,31 +192,6 @@ class HybridBackend:
         return channels

-    def _generate_metadata(self, channels: list[Channel]) -> flight.RecordBatchStream:
-        dtypes = [channel_to_dtype_name(channel) for channel in channels]
-        schema = schemas.find()
-        batch = pyarrow.RecordBatch.from_arrays(
-            [
-                pyarrow.array(
-                    [str(channel) for channel in channels],
-                    type=schema.field("channel").type,
-                ),
-                pyarrow.array(dtypes, type=schema.field("data_type").type),
-                pyarrow.array(
-                    [channel.sample_rate for channel in channels],
-                    type=schema.field("sample_rate").type,
-                ),
-                pyarrow.array(
-                    [channel.partition_id for channel in channels],
-                    type=schema.field("partition_id").type,
-                ),
-            ],
-            schema=schema,
-        )
-        return flight.RecordBatchStream(
-            pyarrow.RecordBatchReader.from_batches(schema, [batch])
-        )
-
     def _generate_live_series(
         self, channels: Iterable[Channel]
     ) -> Iterator[pyarrow.RecordBatch]:
@@ -531,9 +487,3 @@ def read_all_batches(
             yield reader.read_next_batch()
         except StopIteration:
             return
-
-
-def channel_to_dtype_name(channel: Channel) -> str:
-    """Given a channel, return the data type's name."""
-    assert channel.data_type is not None
-    return channel.data_type.name
@@ -21,6 +21,7 @@ from arrakis import Channel, Time
 from pyarrow import flight

 from ... import schemas
+from .. import utils
 from . import channels as channel_lists

 logger = logging.getLogger("arrakis")
@@ -93,12 +94,12 @@ class MockBackend:
     def do_describe(self, *, channels: Iterable[str]) -> flight.FlightDataStream:
         """Retrieve metadata for the 'describe' route."""
         metadata = [self._channel_map[channel] for channel in channels]
-        return self._generate_metadata(metadata)
+        return utils.create_metadata_stream(metadata)

     def do_find(self, **kwargs) -> flight.FlightDataStream:
         """Retrieve metadata for the 'find' route."""
         channels = self._match_channels(**kwargs)
-        return self._generate_metadata(channels)
+        return utils.create_metadata_stream(channels)

     def do_count(self, **kwargs) -> flight.FlightDataStream:
         """Retrieve metadata for the 'count' route."""
@@ -123,7 +124,8 @@ class MockBackend:
         """Retrieve timeseries data for the 'fetch' route."""
         # Generate data from requested channels
         dtypes = [
-            channel_to_dtype_name(self._channel_map[channel]) for channel in channels
+            utils.channel_to_dtype_name(self._channel_map[channel])
+            for channel in channels
         ]
         schema = schemas.stream(channels, dtypes)
@@ -162,31 +164,6 @@ class MockBackend:
         return channels

-    def _generate_metadata(self, channels: list[Channel]) -> flight.RecordBatchStream:
-        dtypes = [channel_to_dtype_name(channel) for channel in channels]
-        schema = schemas.find()
-        batch = pyarrow.RecordBatch.from_arrays(
-            [
-                pyarrow.array(
-                    [str(channel) for channel in channels],
-                    type=schema.field("channel").type,
-                ),
-                pyarrow.array(dtypes, type=schema.field("data_type").type),
-                pyarrow.array(
-                    [channel.sample_rate for channel in channels],
-                    type=schema.field("sample_rate").type,
-                ),
-                pyarrow.array(
-                    [None for _ in channels],
-                    type=schema.field("partition_id").type,
-                ),
-            ],
-            schema=schema,
-        )
-        return flight.RecordBatchStream(
-            pyarrow.RecordBatchReader.from_batches(schema, [batch])
-        )
-
     def _generate_block(
         self, schema: pyarrow.Schema, channels: Iterable[str], timestamp: int
     ) -> pyarrow.RecordBatch:
@@ -246,9 +223,3 @@ class MockBackend:
             time.sleep(
                 max((current - int(gpstime.gpsnow() * Time.SECONDS)) / Time.SECONDS, 0)
             )
-
-
-def channel_to_dtype_name(channel: Channel) -> str:
-    """Given a channel, return the data type's name."""
-    assert channel.data_type is not None
-    return channel.data_type.name
@@ -14,6 +14,7 @@ from arrakis import Channel, Time
 from pyarrow import flight

 from .. import schemas
+from . import utils

 _NDS2_DATA_TYPE: dict[int, numpy.dtype] = {
     1: numpy.dtype(numpy.int16),
@@ -55,7 +56,7 @@ class NDS2Backend:
             )
             for channel in nds2_channels
         ]
-        return self._generate_metadata(channels)
+        return utils.create_metadata_stream(channels)

     def do_count(
         self, *, pattern: str, data_type: str, min_rate: int, max_rate: int
@@ -90,7 +91,7 @@ class NDS2Backend:
     def do_describe(self, *, channels: Iterable[str]) -> flight.FlightDataStream:
         """Retrieve metadata for the 'describe' route."""
-        return self._generate_metadata(self._query_metadata(channels))
+        return utils.create_metadata_stream(self._query_metadata(channels))

     def do_stream(
         self, *, channels: Iterable[str], start: int, end: int
@@ -98,7 +99,7 @@ class NDS2Backend:
         """Retrieve timeseries data for the 'stream' route."""
         # Query for channel metadata
         metadata = self._query_metadata(channels)
-        dtypes = [channel_to_dtype_name(channel) for channel in metadata]
+        dtypes = [utils.channel_to_dtype_name(channel) for channel in metadata]
         schema = schemas.stream(channels, dtypes)

         # stream data
@@ -152,27 +153,6 @@ class NDS2Backend:
             for buf in buffers
         ]

-    def _generate_metadata(self, channels: list[Channel]) -> flight.RecordBatchStream:
-        dtypes = [channel_to_dtype_name(channel) for channel in channels]
-        schema = schemas.find()
-        batch = pyarrow.RecordBatch.from_arrays(
-            [
-                pyarrow.array(
-                    [str(channel) for channel in channels],
-                    type=schema.field("channel").type,
-                ),
-                pyarrow.array(dtypes, type=schema.field("data_type").type),
-                pyarrow.array(
-                    [channel.sample_rate for channel in channels],
-                    type=schema.field("sample_rate").type,
-                ),
-            ],
-            schema=schema,
-        )
-        return flight.RecordBatchStream(
-            pyarrow.RecordBatchReader.from_batches(schema, [batch])
-        )
-
     @staticmethod
     def _extract_channel_metadata(
         batches: Iterable[pyarrow.RecordBatch],
@@ -187,9 +167,3 @@ class NDS2Backend:
                 )
                 channels.append(channel)
         return channels
-
-
-def channel_to_dtype_name(channel: Channel) -> str:
-    """Given a channel, return the data type's name."""
-    assert channel.data_type is not None
-    return channel.data_type.name
New file: the shared utils module imported above as `from . import utils`:

# Copyright (c) 2022, California Institute of Technology and contributors
#
# You should have received a copy of the licensing terms for this
# software included in the file "LICENSE" located in the top-level
# directory of this package. If you did not, you can view a copy at
# https://git.ligo.org/ngdd/arrakis-server/-/raw/main/LICENSE

import pyarrow
from arrakis import Channel
from pyarrow import flight

from .. import schemas


def create_metadata_batch(channels: list[Channel]) -> pyarrow.RecordBatch:
    """Create a record batch from channel metadata."""
    schema = schemas.find()
    metadata = [
        (
            channel.name,
            channel_to_dtype_name(channel),
            channel.sample_rate,
            channel.partition_id,
        )
        for channel in channels
    ]
    if metadata:
        names, dtypes, rates, partitions = map(list, zip(*metadata))
    else:
        names, dtypes, rates, partitions = [], [], [], []
    return pyarrow.RecordBatch.from_arrays(
        [
            pyarrow.array(names, type=schema.field("channel").type),
            pyarrow.array(dtypes, type=schema.field("data_type").type),
            pyarrow.array(rates, type=schema.field("sample_rate").type),
            pyarrow.array(partitions, type=schema.field("partition_id").type),
        ],
        schema=schema,
    )


def create_metadata_stream(channels: list[Channel]) -> flight.RecordBatchStream:
    """Create a record batch stream from channel metadata."""
    return record_batch_to_stream(create_metadata_batch(channels))


def record_batch_to_stream(batch: pyarrow.RecordBatch) -> flight.RecordBatchStream:
    """Convert a record batch into a record batch stream."""
    return flight.RecordBatchStream(
        pyarrow.RecordBatchReader.from_batches(batch.schema, [batch])
    )


def channel_to_dtype_name(channel: Channel) -> str:
    """Given a channel, return the data type's name."""
    assert channel.data_type is not None
    return channel.data_type.name
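
Note: with these shared helpers in place, every backend's find/describe route reduces to a single call, which is what the de-duplication above achieves. A minimal sketch of the pattern, using the same imports the backends use (the channel name and values are hypothetical):

    import numpy
    from arrakis import Channel

    from . import utils

    channels = [
        Channel.from_name(
            "H1:EXAMPLE-CHANNEL",  # hypothetical channel name
            sample_rate=16384,
            data_type=numpy.dtype(numpy.float64),
        )
    ]
    # a flight.RecordBatchStream suitable as a do_find/do_describe return value
    stream = utils.create_metadata_stream(channels)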