Commit 3d9f5f0e authored by Daniel Brown's avatar Daniel Brown

Changing piping model. Two threads are created, one reads from pykat the other...

Changing piping model. Two threads are created, one reads from pykat the other waits for output data to stream to pykat. The main thread runs the simulation and process the messages. It also sends command messages back to pykat. The main and stream thread together send all the data back to pykat. By using a streaming thread, the main thread can still jump in at times and send other information back, such as progress or debug info.
parent 81e6f8f3
......@@ -58,7 +58,7 @@ static int readfn(void *handler, char *buf, int _size) {
static int writefn(void *handler, const char *buf, int _size) {
fmem_t *mem = handler;
size_t available = mem->size - mem->pos;
size_t size = (size_t) _size;
size_t size = (size_t) _size;
if (size > available) {
size = available;
......
......@@ -106,6 +106,13 @@ typedef struct __attribute__((packed)) pykat_output_info {
char name[];
} pykat_output_info_t;
typedef struct __attribute__((packed)) pykat_pipe_axis {
int paxis_index;
double from;
double to;
uint32__t N;
} pykat_pipe_axis_t;
/**
* Determines how to scale the amplitude outputs.
*/
......@@ -1496,13 +1503,6 @@ typedef struct output_data {
// result
complex_t signal; //!< output signal
/**
* pointer to where pipe output data should be written to
* the memory location should have been allocated for
* output_type
**/
void* pipe_output;
double re; //!< real component
double im; //!< imaginary component
......
......@@ -4023,16 +4023,21 @@ void write_data_header(FILE *ofp) {
}
}
// --- all from kat_io
extern pthread_cond_t msg_cond;
extern pthread_mutex_t msg_mutex;
extern pthread_mutex_t pykat_stream_mutex;
extern pthread_cond_t output_to_stream;
extern UT_array *queued_pykat_commands;
extern bool quiet_pipe_warn;
extern bool msg_thread_done;
extern bool write_thread_done;
// --- end
void do_pipe_axes() {
int rc = 0;
bool finishing = false;
int step = 0;
int i;
......@@ -4077,31 +4082,6 @@ void do_pipe_axes() {
free(p);
}
// Allocate space for storing the output data to send to pykat
pipe_output_data_t *output = (pipe_output_data_t*) malloc(sizeof(pipe_output_data_t) + pykat_output_size);
// Set the initial position pointer
inter.output_data_list[0].pipe_output = output->data;
// Here we set the rest of the output data pointers in the buffer
for(i=1; i<inter.num_outputs; i++) {
void *prev = inter.output_data_list[i-1].pipe_output;
switch(inter.output_data_list[i-1].output_type){
case COMPLEX:
prev += 16;
break;
case REAL:
prev += 8;
break;
default:
gerror("Unexpected value");
break;
}
inter.output_data_list[i].pipe_output = prev;
}
// inform pykat we are now ready to accept simulation commands after
// sending init information
pycmd("ready", NULL, 0);
......@@ -4111,7 +4091,7 @@ void do_pipe_axes() {
// Here we wait and loop through messages that come through from pykat
while(true){
if(msg_thread_done) break;
if(msg_thread_done || write_thread_done) break;
pthread_mutex_lock(&msg_mutex);
......@@ -4159,34 +4139,80 @@ void do_pipe_axes() {
//warn("UPDATE %i %g\n", *index, *value);
} else if(string_matches_exactly(pc->command, "do_step")){
start_stream(pykat_output_size);
// Run the model as it is then return the output
step_simulation();
step++;
output->step = step;
int i=0;
lock_stream();
for(i=0; i<inter.num_outputs; i++){
switch(inter.output_data_list[i].output_type){
case COMPLEX:
*((complex_t*)inter.output_data_list[i].pipe_output) = inter.output_data_list[i].signal;
write_stream_z(&(inter.output_data_list[i].signal));
break;
case REAL:
*((double*)inter.output_data_list[i].pipe_output) = inter.output_data_list[i].signal.re;
write_stream_d(&(inter.output_data_list[i].signal.re));
break;
default:
gerror("Unexpected value");
break;
}
}
signal_stream();
unlock_stream();
} else if(string_matches_exactly(pc->command, "do_axis")){
pykat_pipe_axis_t *axes = pc->data;
pycmd("output", output, sizeof(pipe_output_data_t) + pykat_output_size);
assert(pc->size == sizeof(pykat_pipe_axis_t));
if(axes->paxis_index < 0 || axes->paxis_index >= inter.num_paxis){
gerror("paxis index %i not in range", axes->paxis_index);
}
int i, j;
double d;
if(axes->N == 1) {
d = 0;
} else {
d = (axes->to - axes->from) / (axes->N-1);
}
start_stream(pykat_output_size * axes->N);
for(i=0; i<axes->N; i++) {
inter.paxis_list[axes->paxis_index].previous = *inter.paxis_list[axes->paxis_index].target;
*inter.paxis_list[axes->paxis_index].target = axes->from + d*i;
step_simulation();
lock_stream();
for(j=0; j<inter.num_outputs; j++){
switch(inter.output_data_list[j].output_type){
case COMPLEX:
write_stream_z(&(inter.output_data_list[j].signal));
break;
case REAL:
write_stream_d(&(inter.output_data_list[j].signal.re));
break;
default:
gerror("Unexpected value");
break;
}
}
signal_stream();
unlock_stream();
}
} else if(string_matches_exactly(pc->command, "finished")){
finishing = true;
break;
} else {
gerror("Didn't handle message `%s`\n", pc->command);
}
}
}
......@@ -4198,8 +4224,6 @@ void do_pipe_axes() {
if(finishing) break;
}
free(output);
}
//! Generate results and plot for 1D or 2D simulations
......
......@@ -96,25 +96,47 @@ int pykat_pipe_read;
int pykat_fd_read;
FILE* pykat_file_read;
pthread_mutex_t pipe_mutex_w = PTHREAD_MUTEX_INITIALIZER;
struct {
void *stream;
size_t size; // Total size
size_t cur; // Current index that has been streamed
size_t end; // End index of data written so far
uint8__t package_id; // Id for the output data package sent to pykat
pthread_mutex_t mutex; // Stream access lock
pthread_cond_t finished; // Set when finished sending
pthread_cond_t ouput_ready; // Set when data is available to send
} pykat_output_stream = {NULL,
0,
0,
0,
0,
PTHREAD_MUTEX_INITIALIZER,
PTHREAD_COND_INITIALIZER,
PTHREAD_COND_INITIALIZER};
pthread_t pipe_thread_r;
UT_array *queued_pykat_commands;
UT_icd pykat_command_icd = {sizeof(pykat_command_t), NULL, NULL, NULL};
pthread_cond_t msg_cond = PTHREAD_COND_INITIALIZER;
pthread_mutex_t msg_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_t msg_mutex = PTHREAD_MUTEX_INITIALIZER;
// Mutex shared between main thread and streaming to sync writing messages to pykat
pthread_mutex_t pipe_mutex_w = PTHREAD_MUTEX_INITIALIZER;
pthread_t pipe_thread_r;
pthread_t pipe_thread_w;
void *buffer = NULL;
ssize_t epos = 0; // end position
ssize_t spos = 0; // start position
bool quiet_pipe_warn = true;
bool msg_thread_done = false;
bool write_thread_done = false;
void pyprint(const char *fmt, ...){
if(pykat_file_write == NULL) gerror("Pykat write pipe is closed\n");
pthread_mutex_lock( &pipe_mutex_w );
va_list arg;
va_start(arg, fmt);
......@@ -122,35 +144,40 @@ void pyprint(const char *fmt, ...){
va_end(arg);
fflush(pykat_file_write);
pthread_mutex_unlock( &pipe_mutex_w );
}
void pywrite(void* ptr, size_t size){
assert(ptr != NULL);
assert(size > 0);
pthread_mutex_lock( &pipe_mutex_w );
size_t N = write(pykat_fd_write, ptr, size);
if(N == -1){
gerror("Write pipe error (%i): %s\n", errno, strerror(errno));
}
if(N != size){
} else if(N != size){
gerror("Write failure: %zd %zu\n", N, size);
}
pthread_mutex_unlock( &pipe_mutex_w );
}
/**
* This should be used for sending a complete message in one go
* for sending a message to pykat.
*
* Will request access to pipe_mutex_w.
*
* @param cmd
* @param data
* @param size
*/
void pycmd(const char *cmd, void *data, size_t size){
pthread_mutex_lock( &pipe_mutex_w );
pyprint("%s\n", cmd);
pywrite(&size, sizeof(size_t));
if(data != NULL){
pywrite(data, size);
}
pthread_mutex_unlock( &pipe_mutex_w );
}
/**
......@@ -244,6 +271,148 @@ bool extract_message(size_t start, size_t end, int *counter){
return false;
}
/**
* Opens a stream with a required length. Will reuse previously stream memory
* if available and realloc if not enough is there.
*
* Will fail if previous stream didn't finish as only one stream can be run at once.
*
* @param len length of stream required
* @return stream pointer
*/
void start_stream(size_t len) {
if(pykat_output_stream.stream == NULL){
pykat_output_stream.stream = calloc(len, 1);
if(!pykat_output_stream.stream){
gerror("Could not allocate %zu for stream\n", len);
}
} else {
if(pykat_output_stream.cur != pykat_output_stream.end){
gerror("Previous streaming not finished %zu %zu\n", pykat_output_stream.cur, pykat_output_stream.end);
}
if(pykat_output_stream.size < len){
pykat_output_stream.stream = realloc(pykat_output_stream.stream, len);
if(!pykat_output_stream.stream){
gerror("Could not reallocate %zu for stream\n", len);
}
}
}
pykat_output_stream.package_id++;
// reset variables
pykat_output_stream.cur = 0;
pykat_output_stream.end = 0;
pykat_output_stream.size = len;
pycmd("start_stream", &len, sizeof(len));
}
/**
* Signals to streaming thread there is data to send
*/
inline void signal_stream() {
pthread_cond_signal(& pykat_output_stream.ouput_ready);
}
inline void lock_stream() {
pthread_mutex_lock(&pykat_output_stream.mutex);
}
inline void unlock_stream() {
pthread_mutex_unlock(&pykat_output_stream.mutex);
}
/**
* Writes a complex_t value to the output stream
* @param ptr
*/
inline void write_stream_z(complex_t *ptr) {
assert(ptr != NULL);
assert(pykat_output_stream.stream != NULL);
assert(pykat_output_stream.size - pykat_output_stream.end >= sizeof(complex_t));
*((complex_t*) (pykat_output_stream.stream + pykat_output_stream.end)) = *ptr;
pykat_output_stream.end += sizeof(complex_t);
}
/**
* Writes a double value to the output stream
* @param ptr
*/
inline void write_stream_d(double *ptr) {
assert(ptr != NULL);
assert(pykat_output_stream.stream != NULL);
assert(pykat_output_stream.size - pykat_output_stream.end >= sizeof(double));
*((double*)(pykat_output_stream.stream + pykat_output_stream.end)) = *ptr;
pykat_output_stream.end += sizeof(double);
}
/**
* Closes the stream and frees it's memory.
*/
void end_stream() {
if(pykat_output_stream.end - pykat_output_stream.cur != 0) {
gerror("Stream did not finishing writing all the data\n");
}
pykat_output_stream.cur = 0;
pykat_output_stream.end = 0;
free(pykat_output_stream.stream);
pykat_output_stream.stream = NULL;
}
void *do_output_streaming(){
int rc;
// This will stream any output generated and send it to pykat
while(true) {
// Lock stream whilst we determine how much data there is to send.
lock_stream();
// If no data to stream then wait
if(pykat_output_stream.cur == pykat_output_stream.end){
//warn("STREAM WAIT %zu %zu\n",pykat_output_stream.cur, pykat_output_stream.end);
rc = pthread_cond_wait(&pykat_output_stream.ouput_ready, &pykat_output_stream.mutex);
}
size_t data_len = pykat_output_stream.end - pykat_output_stream.cur;
//warn("STREAMING bytes %zu %zu\n", pykat_output_stream.cur, pykat_output_stream.end);
void *data = pykat_output_stream.stream + pykat_output_stream.cur;
size_t packet_len = data_len;
unlock_stream();
// get lock for writing to pykat pipe and send
// chunk of output data that we have
pthread_mutex_lock(&pipe_mutex_w);
pyprint("s\n");
pywrite(&packet_len, sizeof(packet_len));
pywrite(data, data_len);
fflush(pykat_file_write);
pykat_output_stream.cur += data_len;
pthread_mutex_unlock(&pipe_mutex_w);
}
// Signal to make sure we continue main thread
pthread_mutex_lock(&msg_mutex);
write_thread_done = true;
pthread_cond_signal(&msg_cond);
pthread_mutex_unlock(&msg_mutex);
return NULL;
}
void *do_pipe_reader(){
bool quiet = quiet_pipe_warn;
size_t buffer_inc = 1024; // If we need more memory, this is how much to inc
......@@ -443,6 +612,14 @@ void open_pipes(){
gerror("Error creating read thread (%i)\n", err);
}
}
{ // Start output streaming thread
int err = pthread_create(&pipe_thread_w, NULL, do_output_streaming, NULL);
if(err) {
gerror("Error creating stream thread (%i)\n", err);
}
}
#endif
}
}
......@@ -2038,7 +2215,7 @@ void print_progress(int num_points, int current_point) {
if(options.perl1) {
char line[1024] = {0};
sprintf(line, "%s\t%d\t%s", progress_action, progress_percentage, progress_message);
pycmd("progress", (void*)line, strlen(line));
//pycmd("progress", (void*)line, strlen(line));
} else {
text_length = fprintf(stderr, PROG_STRING, "", progress_action, progress_percentage, progress_message);
// if the previous progress text length wasn't as long as the last then we
......
......@@ -97,6 +97,13 @@ void close_pipes();
void open_pipes();
void pycmd(const char *cmd, void *data, size_t size);
void start_stream(size_t len);
void write_stream_d(double *ptr);
void write_stream_z(complex_t *ptr);
void signal_stream();
void lock_stream();
void unlock_stream();
void print_version(FILE *file);
void print_help2(void);
void print_help(void);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment