Commit b2d3e9f8 authored by Daniel Brown's avatar Daniel Brown
Browse files

Update to work with pipe streaming output data

parent 350dcb82
......@@ -2195,9 +2195,13 @@ class kat(object):
self.__progress = None
self.__output_ready = threading.Condition()
self.__output_data = None
self.__stop_event = threading.Event()
self.__ready_event = threading.Event()
self.__output_bytes = None
self.__stream_bytes_recv = None
self.__stream_bytes_expect = None
self.__stream_bytes = None
@property
def version(self): return self.__version
......@@ -2206,65 +2210,100 @@ class kat(object):
def progress(self): return self.__progress
def __read_pipe(self):
try:
# Size of size_t array on system
s_size_t = struct.calcsize('@n')
s_i = struct.calcsize("i")
# Size of size_t array on system
s_size_t = struct.calcsize('@n')
s_i = struct.calcsize("i")
for line in self.__pipe_r:
if self.__stop_event.is_set():
return None
for line in self.__pipe_r:
if self.__stop_event.is_set():
return None
cmd = line.decode('ascii').strip()
b = self.__pipe_r.read(s_size_t)
data_size = struct.unpack('@n', b)[0]
data = self.__pipe_r.read(data_size)
if self.__on_recv is not None:
self.__on_recv(cmd, data)
if cmd == "version":
self.__version = data.decode('ascii')
elif cmd == "paxis":
index, = struct.unpack('i', data[:struct.calcsize('i')])
name = data[struct.calcsize('i'):].decode('utf8')
self.__paxis_indices[name] = index
elif cmd == "ready":
self.__ready_event.set()
cmd = line.decode('ascii').strip()
elif cmd == "test_recv":
print("Finesse recv", struct.unpack('ddi', data))
data_size = struct.unpack('@n', self.__pipe_r.read(s_size_t))[0]
data_size_recv = 0
data_recv = bytearray(data_size)
elif cmd == "progress":
#self.__progress = line.split("\t")
pass
while data_size_recv < data_size:
# Loop until we get all our data
_data = self.__pipe_r.read(data_size - data_size_recv)
data_recv[data_size_recv:(data_size_recv+len(_data))] = _data
data_size_recv += len(_data)
elif cmd == "output_info":
index, = struct.unpack('i', data[:s_i])
dtype = data[s_i:(s_i+3)].rstrip(b'\0').decode('utf8')
name = data[(s_i+3):].decode('utf8')
if len(_data) == 0:
raise pkex.BasePyKatException("Could not read from pipe")
else:
#print(cmd, "%i %i %i"%(data_size_recv, data_size, len(_data)))
pass
self.__output_dtype.append((name, dtype))
if self.__on_recv is not None:
self.__on_recv(cmd, data)
if cmd == "version":
self.__version = data_recv.decode('ascii')
elif cmd == "paxis":
index, = struct.unpack('i', data_recv[:struct.calcsize('i')])
name = data_recv[struct.calcsize('i'):].decode('utf8')
self.__paxis_indices[name] = index
elif cmd == "ready":
self.__ready_event.set()
elif cmd == "output":
self.__ready_event.wait()
elif cmd == "test_recv":
print("Finesse recv", struct.unpack('ddi', data_recv))
vals = struct.unpack('i', data[:struct.calcsize('i')])
bodata = data[struct.calcsize('i'):]
elif cmd == "progress":
#self.__progress = line.split("\t")
pass
self.__output_ready.acquire()
self.__output_data = np.frombuffer(bodata, dtype=self.__output_dtype)
self.__output_ready.notify()
self.__output_ready.release()
elif cmd == "output_info":
index, = struct.unpack('i', data_recv[:s_i])
dtype = data_recv[s_i:(s_i+3)].rstrip(b'\0').decode('utf8')
name = data_recv[(s_i+3):].decode('utf8')
self.__output_dtype.append((name, dtype))
elif cmd == "start_stream": # stream
self.__ready_event.wait()
if self.__stream_bytes_expect is not None:
raise pkex.BasePyKatException("Already expecting a stream")
N = struct.unpack('n', data_recv)[0]
#print("START STREAM", N)
self.__stream_bytes_recv = 0
self.__stream_bytes_expect = N
self.__stream_bytes = bytearray(N)
elif cmd == "s": # output data stream
self.__stream_bytes[self.__stream_bytes_recv:(self.__stream_bytes_recv+len(data_recv))] = data_recv
self.__stream_bytes_recv += len(data_recv)
if self.__stream_bytes_expect == self.__stream_bytes_recv:
self.__output_ready.acquire()
self.__output_bytes = self.__stream_bytes
self.__stream_bytes_recv = None
self.__stream_bytes_expect = None
self.__stream_bytes = None
self.__output_ready.notify()
self.__output_ready.release()
elif cmd in ("recv", "test_str_recv", "progress", "WAITING","ping"):
pass
else:
raise pkex.BasePyKatException("Did not handle command type `%s`" % cmd)
finally:
self.__ready_event.set()
self.__output_ready.acquire()
self.__output_ready.notify()
self.__output_ready.release()
elif cmd in ("recv", "test_str_recv", "progress", "WAITING","ping"):
pass
else:
raise pkex.BasePyKatException("Did not handle command type %s" % cmd)
def open(self, timeout=5, cmd_args=None):
if self.__open:
raise pkex.BasePyKatException("Simulation pipe already open")
......@@ -2337,21 +2376,49 @@ class kat(object):
raise pkex.BasePyKatException("Paxis called `%s` does not exist" % paxis_name)
send_update(self.__pipe_w, self.__paxis_indices[paxis_name], np.float64(value))
def axis(self, paxis_name, _from, _to, N):
if self.__stop_event.is_set():
raise pkex.BasePyKatException("Simulation pipe is closing")
if not self.__open:
raise pkex.BasePyKatException("Simulation pipe is not open")
self.__ready_event.wait()
dtype = np.dtype(self.__output_dtype)
dsize = dtype.itemsize
self.__output_ready.acquire()
def compute(self):
send_do_axis(self.__pipe_w, self.__paxis_indices[paxis_name], _from, _to, N)
#print("OUTPUT START")
while not self.__output_ready.wait(timeout=1):
print("Waiting for ouput: ", self.__stream_bytes_recv, self.__stream_bytes_expect)
#print("OUTPUT END")
out = np.frombuffer(self.__output_bytes, dtype=dtype).copy()
self.__output_ready.release()
return out
def step(self):
if self.__stop_event.is_set():
raise pkex.BasePyKatException("Simulation pipe is closing")
if not self.__open:
raise pkex.BasePyKatException("Simulation pipe is not open")
self.__ready_event.wait()
send_do_step(self.__pipe_w)
dtype = np.dtype(self.__output_dtype)
dsize = dtype.itemsize
# wait for output
self.__output_ready.acquire()
send_do_step(self.__pipe_w)
self.__output_ready.wait()
out = self.__output_data.copy()
out = np.frombuffer(self.__output_bytes, dtype=dtype).copy()
self.__output_ready.release()
return out
......
......@@ -4,6 +4,8 @@ import struct
import pykat.exceptions as pkex
import os
from struct import calcsize as cs
def open_pipes(pipe_name, start_kat, duration):
fifo_r = None
fifo_w = None
......@@ -57,6 +59,16 @@ def send_do_step(fifo_w):
fifo_w.write(struct.pack('b', 0))
fifo_w.flush()
def send_do_axis(fifo_w, idx, _from, to, N):
if fifo_w is not None:
fifo_w.write(("<do_axis>").encode())
fifo_w.write(struct.pack('b', cs('i') + cs('d') + cs('d') + cs('i')))
fifo_w.write(struct.pack('i', idx))
fifo_w.write(struct.pack('d', _from))
fifo_w.write(struct.pack('d', to))
fifo_w.write(struct.pack('I', N))
fifo_w.flush()
def send_update(fifo_w, idx, value):
if fifo_w is not None:
bdata = struct.pack('id', idx, value)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment