From 98f6cb2d8da23b153a3f29c6cf8397dfb8eb86c1 Mon Sep 17 00:00:00 2001
From: Jonathan Hanks <jonathan.hanks@ligo.org>
Date: Thu, 10 Oct 2019 10:37:06 -0700
Subject: [PATCH] Integrating tests to the standalone_edc.  WIP tests do not
 pass.

* Moving a size and data type information from daq/channels.h to
include/daq_data_types.h.
* Extending standalone_edc to use DBR_TIME* to look for late data.
* Updating the stream generators to handle more data types.
---
 src/daqd/channel.h                            |  33 +-
 src/dv/Th/CMakeLists.txt                      |   1 +
 src/epics/seq/CMakeLists.txt                  |   8 +
 src/epics/seq/standalone_edcu.cc              | 287 ++++++++++++------
 .../seq/test/daqdrc_standalone_edc_live_test  |  82 +++++
 src/epics/seq/test/epics_test.py              |  71 ++++-
 .../test/test_standalone_edc_live_nds.sh.in   | 139 +++++++++
 src/fe_stream_test/CMakeLists.txt             |   4 +-
 .../fe_stream_check_edcu_nds.cc               | 129 ++++++--
 src/fe_stream_test/fe_stream_generator.cc     |  39 ++-
 src/fe_stream_test/fe_stream_generator.hh     |  70 ++++-
 src/include/daq_data_types.h                  |  35 +++
 src/nds/CMakeLists.txt                        |   1 +
 13 files changed, 706 insertions(+), 193 deletions(-)
 create mode 100644 src/epics/seq/test/daqdrc_standalone_edc_live_test
 create mode 100644 src/epics/seq/test/test_standalone_edc_live_nds.sh.in
 create mode 100644 src/include/daq_data_types.h

diff --git a/src/daqd/channel.h b/src/daqd/channel.h
index 1916a6eac..1f6c377de 100644
--- a/src/daqd/channel.h
+++ b/src/daqd/channel.h
@@ -8,6 +8,8 @@
   #include <values.h>
 #endif
 
+#include <daq_data_types.h>
+
 /* Allowed maximum length for DMT channels */
 #define MAX_LONG_CHANNEL_NAME_LENGTH 255
 /* Allowed maximum length for DAQ channels */
@@ -22,41 +24,10 @@
 #define MAX_CHANNELS 524288
 #define MAX_TREND_CHANNELS  MAX_CHANNELS
 
-/* numbering must be contiguous */
-typedef enum {
-  _undefined = 0,
-  _16bit_integer = 1,
-  _32bit_integer = 2,
-  _64bit_integer = 3,
-  _32bit_float = 4,
-  _64bit_double = 5,
-  _32bit_complex = 6,
-  _32bit_uint = 7
-} daq_data_t;
-
 /* should be equal to the last data type   */
 #define MAX_DATA_TYPE _32bit_uint
 #define MIN_DATA_TYPE _16bit_integer
 
-inline static int
-data_type_size (short dtype) {
-  switch (dtype) {
-  case _16bit_integer: // 16 bit integer
-    return 2;
-  case _32bit_integer: // 32 bit integer
-  case _32bit_float: // 32 bit float
-  case _32bit_uint: // 32 bit unsigned integer
-    return 4;
-  case _64bit_integer: // 64 bit integer
-  case _64bit_double: // 64 bit double
-    return 8;
-  case _32bit_complex: // 32 bit complex
-    return 4*2;
-  default:
-    return _undefined;
-  }
-}
-
 inline static double
 data_type_max(short dtype) {
   switch (dtype) {
diff --git a/src/dv/Th/CMakeLists.txt b/src/dv/Th/CMakeLists.txt
index b8763d60b..eeb476af3 100644
--- a/src/dv/Th/CMakeLists.txt
+++ b/src/dv/Th/CMakeLists.txt
@@ -6,5 +6,6 @@ target_include_directories(datasrv PUBLIC
         ${CMAKE_CURRENT_SOURCE_DIR}
         ${CMAKE_CURRENT_SOURCE_DIR}/../Lib
         ${CMAKE_CURRENT_SOURCE_DIR}/../../daqd
+        ${CMAKE_CURRENT_SOURCE_DIR}/../../include
         )
 add_library(th::datasrv ALIAS datasrv)
diff --git a/src/epics/seq/CMakeLists.txt b/src/epics/seq/CMakeLists.txt
index 32e9152f3..61bf0cc37 100644
--- a/src/epics/seq/CMakeLists.txt
+++ b/src/epics/seq/CMakeLists.txt
@@ -14,6 +14,14 @@ target_link_libraries(standalone_edc PUBLIC
         ${CMAKE_THREAD_LIBS_INIT})
 target_requires_cpp11(standalone_edc PUBLIC)
 
+configure_file(test/epics_test.py ${CMAKE_CURRENT_BINARY_DIR}/epics_test.py COPYONLY)
+configure_file(test/daqdrc_standalone_edc_live_test ${CMAKE_CURRENT_BINARY_DIR}/daqdrc_standalone_edc_live_test COPYONLY)
+configure_file(test/test_standalone_edc_live_nds.sh.in ${CMAKE_CURRENT_BINARY_DIR}/test_standalone_edc_live_nds.sh @ONLY)
+
+add_test(NAME test_standalone_edc_live_nds
+        COMMAND /bin/bash ./test_standalone_edc_live_nds.sh
+        WORKING_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}")
+
 install(TARGETS standalone_edc DESTINATION bin)
 
 else(Boost_FOUND)
diff --git a/src/epics/seq/standalone_edcu.cc b/src/epics/seq/standalone_edcu.cc
index 2bd61524e..110fb8997 100644
--- a/src/epics/seq/standalone_edcu.cc
+++ b/src/epics/seq/standalone_edcu.cc
@@ -18,6 +18,7 @@ of this distribution.
 #include <algorithm>
 #include <array>
 #include <iterator>
+#include <numeric>
 #include <string>
 #include <thread>
 #include <utility>
@@ -35,6 +36,7 @@ of this distribution.
 #include <fcntl.h>
 
 #include <daqmap.h>
+#include <daq_data_types.h>
 
 #define BOOST_ASIO_USE_BOOST_DATE_TIME_FOR_SOCKET_IOSTREAM
 #include <boost/asio.hpp>
@@ -44,21 +46,6 @@ of this distribution.
 #include <rapidjson/writer.h>
 #include <rapidjson/stringbuffer.h>
 
-/* taken from channel.h in the daqd source
- * find a way to do this better
- */
-/* numbering must be contiguous */
-typedef enum
-{
-    _undefined = 0,
-    _16bit_integer = 1,
-    _32bit_integer = 2,
-    _64bit_integer = 3,
-    _32bit_float = 4,
-    _64bit_double = 5,
-    _32bit_complex = 6,
-    _32bit_uint = 7
-} daq_data_t;
 
 extern "C" {
 #include "findSharedMemory.h"
@@ -118,22 +105,42 @@ typedef union edc_data_t
     double  data_float64;
 } edc_data_t;
 
+struct edc_timestamped_data_t
+{
+    edc_timestamped_data_t(): data(), timestamp() {
+        timestamp.secPastEpoch = 0;
+        timestamp.nsec = 0;
+    }
+    edc_data_t data;
+    epicsTimeStamp timestamp;
+};
+
 unsigned long daqFileCrc;
-typedef struct daqd_c
+class daqd_c
 {
+public:
+    daqd_c():
+    num_chans(0), con_chans(0), val_events(0), con_events(0),
+    channel_type(), channel_value(), channel_name(), channel_status(),
+    gpsTime(0),
+    epicsSync(0),
+    prefix(nullptr),
+    dcuid(0)
+    {}
+
     int                num_chans;
-    std::atomic< int > con_chans;
+    int con_chans;
     int                val_events;
     int                con_events;
     daq_data_t         channel_type[ EDCU_MAX_CHANS ];
-    edc_data_t         channel_value[ EDCU_MAX_CHANS ];
+    edc_timestamped_data_t         channel_value[ EDCU_MAX_CHANS ];
     char               channel_name[ EDCU_MAX_CHANS ][ 64 ];
     int                channel_status[ EDCU_MAX_CHANS ];
     long               gpsTime;
     long               epicsSync;
     char*              prefix;
     int                dcuid;
-} daqd_c;
+};
 
 int num_chans_index = -1;
 int con_chans_index = -1;
@@ -148,7 +155,6 @@ static struct cdsDaqNetGdsTpNum* shmTpTable;
 static const int                 buf_size = DAQ_DCU_BLOCK_SIZE * 2;
 static const int                 header_size =
     sizeof( struct rmIpcStr ) + sizeof( struct cdsDaqNetGdsTpNum );
-static DAQ_XFER_INFO xferInfo;
 
 static int symmetricom_fd = -1;
 int        timemarks[ 16 ] = { 1000 * 1000,   63500 * 1000,  126000 * 1000,
@@ -596,6 +602,20 @@ connectCallback( struct connection_handler_args args )
     daqd_edcu1.con_events++;
 }
 
+inline
+bool operator>(const epicsTimeStamp t1, const epicsTimeStamp t2)
+{
+    if (t1.secPastEpoch > t2.secPastEpoch)
+    {
+        return true;
+    }
+    if (t1.secPastEpoch < t2.secPastEpoch)
+    {
+        return false;
+    }
+    return t1.nsec > t2.nsec;
+}
+
 // **************************************************************************
 void
 subscriptionHandler( struct event_handler_args args )
@@ -606,30 +626,65 @@ subscriptionHandler( struct event_handler_args args )
     {
         return;
     }
+
     switch ( args.type )
     {
-    case DBR_SHORT:
+    case DBR_TIME_SHORT:
     {
-        int16_t val = *( (int16_t*)args.dbr );
-        ( (edc_data_t*)( args.usr ) )->data_int16 = val;
+
+        dbr_time_short* dbr = (dbr_time_short*)(args.dbr);
+
+        edc_timestamped_data_t* edc_data = ((edc_timestamped_data_t *) (args.usr));
+        int i = edc_data - &(daqd_edcu1.channel_value[0]);
+        if (dbr->stamp > edc_data->timestamp)
+        {
+//            if (strcmp(daqd_edcu1.channel_name[i], "X6:EDC-1571--gpssmd30koff1p--20--1--16") == 0)
+//            {
+//                std::cout << edc_data->data.data_int16 << " " << edc_data->timestamp.secPastEpoch << ":"
+//                << edc_data->timestamp.nsec << "    -> "
+//                << dbr->value << " " << dbr->stamp.secPastEpoch << ":"
+//                << dbr->stamp.nsec << std::endl;
+//            }
+            edc_data->data.data_int16 = dbr->value;
+            edc_data->timestamp.secPastEpoch = dbr->stamp.secPastEpoch;
+            edc_data->timestamp.nsec = dbr->stamp.nsec;
+        }
     }
     break;
-    case DBR_LONG:
+    case DBR_TIME_LONG:
     {
-        int32_t val = *( (int32_t*)args.dbr );
-        ( (edc_data_t*)( args.usr ) )->data_int32 = val;
+        dbr_time_long* dbr = (dbr_time_long*)(args.dbr);
+        edc_timestamped_data_t* edc_data = ((edc_timestamped_data_t *) (args.usr));
+        if (dbr->stamp > edc_data->timestamp)
+        {
+            edc_data->data.data_int32 = dbr->value;
+            edc_data->timestamp.secPastEpoch = dbr->stamp.secPastEpoch;
+            edc_data->timestamp.nsec = dbr->stamp.nsec;
+        }
     }
     break;
-    case DBR_FLOAT:
+    case DBR_TIME_FLOAT:
     {
-        float val = *( (float*)args.dbr );
-        ( (edc_data_t*)( args.usr ) )->data_float32 = val;
+        dbr_time_float* dbr = (dbr_time_float*)(args.dbr);
+        edc_timestamped_data_t* edc_data = ((edc_timestamped_data_t *) (args.usr));
+        if (dbr->stamp > edc_data->timestamp)
+        {
+            edc_data->data.data_float32 = dbr->value;
+            edc_data->timestamp.secPastEpoch = dbr->stamp.secPastEpoch;
+            edc_data->timestamp.nsec = dbr->stamp.nsec;
+        }
     }
     break;
-    case DBR_DOUBLE:
+    case DBR_TIME_DOUBLE:
     {
-        double val = *( (double*)args.dbr );
-        ( (edc_data_t*)( args.usr ) )->data_float64 = val;
+        dbr_time_double* dbr = (dbr_time_double*)(args.dbr);
+        edc_timestamped_data_t* edc_data = ((edc_timestamped_data_t *) (args.usr));
+        if (dbr->stamp > edc_data->timestamp)
+        {
+            edc_data->data.data_float64 = dbr->value;
+            edc_data->timestamp.secPastEpoch = dbr->stamp.secPastEpoch;
+            edc_data->timestamp.nsec = dbr->stamp.nsec;
+        }
     }
     break;
     default:
@@ -664,23 +719,35 @@ daq_data_t_to_epics( daq_data_t datatype )
     switch ( datatype )
     {
     case _16bit_integer:
-        return DBR_SHORT;
+        return DBR_TIME_SHORT;
     case _32bit_integer:
-        return DBR_LONG;
+        return DBR_TIME_LONG;
     case _32bit_float:
-        return DBR_FLOAT;
+        return DBR_TIME_FLOAT;
     case _64bit_double:
-        return DBR_DOUBLE;
+        return DBR_TIME_DOUBLE;
     default:
         throw std::runtime_error( "Unexpected data type given" );
     }
 }
 
+std::size_t
+accumulte_daq_sizes(std::size_t cur, daq_data_t data_type)
+{
+    return cur + data_type_size(data_type);
+}
+
+std::size_t
+calculate_data_size(const daqd_c& edc)
+{
+    return std::accumulate(&(edc.channel_type[0]), &(edc.channel_type[0]) + edc.num_chans, 0, accumulte_daq_sizes);
+}
+
 bool
 channel_is_edcu_special_chan( daqd_c* edc, const char* channel_name )
 {
     const char* dummy_prefix = "";
-    const char* prefix = ( edc->prefix ? prefix : dummy_prefix );
+    const char* prefix = ( edc->prefix ? edc->prefix : dummy_prefix );
     size_t      pref_len = strlen( prefix );
     size_t      name_len = strlen( channel_name );
 
@@ -740,13 +807,17 @@ channel_parse_callback( char*              channel_name,
         std::cerr << "Invalid data type given for " << channel_name << "\n";
         exit( 1 );
     }
-    if ( channel_is_edcu_special_chan( edc, channel_name ) &&
-         daq_data_type != _32bit_integer )
+    if ( channel_is_edcu_special_chan( edc, channel_name ) )
     {
-        std::cerr << "The edcu special variables (EDCU_CHAN_CONN/CNT/NOCON) "
-                     "must be 32 bit ints ("
-                  << static_cast< int >( _32bit_integer ) << ")\n";
-        exit( 1 );
+        if ( daq_data_type != _32bit_integer && daq_data_type != _32bit_float )
+        {
+            std::cerr
+                << "The edcu special variables (EDCU_CHAN_CONN/CNT/NOCON) "
+                   "must be 32 bit ints ("
+                << static_cast< int >( _32bit_integer ) << ") or 32 bit floats("
+                << static_cast< int >( _32bit_float ) << "\n";
+            exit( 1 );
+        }
     }
     edc->channel_type[ edc->num_chans ] = daq_data_type;
     strncpy( edc->channel_name[ edc->num_chans ],
@@ -758,9 +829,7 @@ channel_parse_callback( char*              channel_name,
 
 // **************************************************************************
 void
-edcuCreateChanList( const char*    pref,
-                    const char*    daqfilename,
-                    unsigned long* crc )
+edcuCreateChanList( daqd_c& daq, const char* daqfilename, unsigned long* crc )
 {
     // **************************************************************************
     int           i = 0;
@@ -768,33 +837,32 @@ edcuCreateChanList( const char*    pref,
     unsigned long dummy_crc = 0;
 
     char eccname[ 256 ];
-    sprintf( eccname, "%s%s", pref, "EDCU_CHAN_CONN" );
+    sprintf( eccname, "%s%s", daq.prefix, "EDCU_CHAN_CONN" );
     char chcntname[ 256 ];
-    sprintf( chcntname, "%s%s", pref, "EDCU_CHAN_CNT" );
+    sprintf( chcntname, "%s%s", daq.prefix, "EDCU_CHAN_CNT" );
     char cnfname[ 256 ];
-    sprintf( cnfname, "%s%s", pref, "EDCU_CHAN_NOCON" );
+    sprintf( cnfname, "%s%s", daq.prefix, "EDCU_CHAN_NOCON" );
 
     if ( !crc )
     {
         crc = &dummy_crc;
     }
-    daqd_edcu1.num_chans = 0;
+    daq.num_chans = 0;
 
-    daqd_edcu1.dcuid = -1;
+    daq.dcuid = -1;
     parseConfigFile( const_cast< char* >( daqfilename ),
                      crc,
                      channel_parse_callback,
                      -1,
                      (char*)0,
-                     reinterpret_cast< void* >( &daqd_edcu1 ) );
-    if ( daqd_edcu1.num_chans < 1 )
+                     reinterpret_cast< void* >( &daq ) );
+    if ( daq.num_chans < 1 )
     {
         std::cerr << "No channels to record, aborting\n";
         exit( 1 );
     }
 
-    xferInfo.crcLength = 4 * daqd_edcu1.num_chans;
-    printf( "CRC data length = %d\n", xferInfo.crcLength );
+    std::cout << "CRC data length = " << calculate_data_size(daqd_edcu1) << "\n";
 
     chid chid1;
     if ( ca_context_create( ca_enable_preemptive_callback ) != ECA_NORMAL )
@@ -803,60 +871,76 @@ edcuCreateChanList( const char*    pref,
         exit( 1 );
     }
 
-    for ( i = 0; i < daqd_edcu1.num_chans; i++ )
+    for ( i = 0; i < daq.num_chans; i++ )
     {
-        if ( strcmp( daqd_edcu1.channel_name[ i ], chcntname ) == 0 )
+        if ( strcmp( daq.channel_name[ i ], chcntname ) == 0 )
         {
             num_chans_index = i;
             internal_channel_count = internal_channel_count + 1;
-            daqd_edcu1.channel_status[ i ] = 0;
+            daq.channel_status[ i ] = 0;
         }
-        else if ( strcmp( daqd_edcu1.channel_name[ i ], eccname ) == 0 )
+        else if ( strcmp( daq.channel_name[ i ], eccname ) == 0 )
         {
             con_chans_index = i;
             internal_channel_count = internal_channel_count + 1;
-            daqd_edcu1.channel_status[ i ] = 0;
+            daq.channel_status[ i ] = 0;
         }
-        else if ( strcmp( daqd_edcu1.channel_name[ i ], cnfname ) == 0 )
+        else if ( strcmp( daq.channel_name[ i ], cnfname ) == 0 )
         {
             nocon_chans_index = i;
             internal_channel_count = internal_channel_count + 1;
-            daqd_edcu1.channel_status[ i ] = 0;
+            daq.channel_status[ i ] = 0;
         }
         else
         {
-            status =
-                ca_create_channel( daqd_edcu1.channel_name[ i ],
-                                   connectCallback,
-                                   (void*)&( daqd_edcu1.channel_status[ i ] ),
-                                   0,
-                                   &chid1 );
+            status = ca_create_channel( daq.channel_name[ i ],
+                                        connectCallback,
+                                        (void*)&( daq.channel_status[ i ] ),
+                                        0,
+                                        &chid1 );
             if ( status != ECA_NORMAL )
             {
                 fprintf( stderr,
                          "Error creating connection to %s\n",
-                         daqd_edcu1.channel_name[ i ] );
+                         daq.channel_name[ i ] );
             }
             status = ca_create_subscription(
-                daq_data_t_to_epics( daqd_edcu1.channel_type[ i ] ),
+                daq_data_t_to_epics( daq.channel_type[ i ] ),
                 0,
                 chid1,
                 DBE_VALUE,
                 subscriptionHandler,
-                (void*)&( daqd_edcu1.channel_value[ i ] ),
+                (void*)&( daq.channel_value[ i ] ),
                 0 );
             if ( status != ECA_NORMAL )
             {
                 fprintf( stderr,
                          "Error creating subscription for %s\n",
-                         daqd_edcu1.channel_name[ i ] );
+                         daq.channel_name[ i ] );
             }
         }
     }
 
-    daqd_edcu1.con_chans = daqd_edcu1.con_chans + internal_channel_count;
+    daq.con_chans = daq.con_chans + internal_channel_count;
+}
+
+void edcuLoadSpecial(int index, int value)
+{
+    if (index >= 0) {
+        switch (daqd_edcu1.channel_type[index]) {
+            case _32bit_integer:
+                daqd_edcu1.channel_value[index].data.data_int32 = value;
+                break;
+            case _32bit_float:
+                daqd_edcu1.channel_value[index].data.data_float32 = static_cast<float>(value);
+                break;
+        }
+    }
 }
 
+
+
+
 // **************************************************************************
 void
 edcuWriteData( int           daqBlockNum,
@@ -869,55 +953,55 @@ edcuWriteData( int           daqBlockNum,
     int   buf_size;
     int   ii;
 
-    if ( num_chans_index != -1 )
-    {
-        daqd_edcu1.channel_value[ num_chans_index ].data_int32 =
-            daqd_edcu1.num_chans;
-    }
-
-    if ( con_chans_index != -1 )
-    {
-        daqd_edcu1.channel_value[ con_chans_index ].data_int32 =
-            daqd_edcu1.con_chans;
-    }
-
-    if ( nocon_chans_index != -1 )
-    {
-        daqd_edcu1.channel_value[ nocon_chans_index ].data_int32 =
-            daqd_edcu1.num_chans - daqd_edcu1.con_chans;
-    }
+    edcuLoadSpecial(num_chans_index, daqd_edcu1.num_chans);
+    edcuLoadSpecial(con_chans_index, daqd_edcu1.con_chans);
+    edcuLoadSpecial(nocon_chans_index, daqd_edcu1.num_chans - daqd_edcu1.con_chans);
 
     buf_size = DAQ_DCU_BLOCK_SIZE * DAQ_NUM_SWING_BUFFERS;
     daqData = (char*)( shmDataPtr + ( buf_size * daqBlockNum ) );
+    char *data_start = daqData;
+
+    static std::int16_t data_16 = 0;
     for ( ii = 0; ii < daqd_edcu1.num_chans; ++ii )
     {
         switch ( daqd_edcu1.channel_type[ ii ] )
         {
         case _16bit_integer:
         {
+            if (strcmp(daqd_edcu1.channel_name[ii], "X6:EDC-99--gpssmd30koff1p--0--1--16") == 0)
+            {
+                std::int16_t tmp = daqd_edcu1.channel_value[ ii ].data.data_int16;
+                std::cout << tmp;
+                if (tmp < data_16)
+                {
+                    std::cout << " **";
+                }
+                std::cout << std::endl;
+                data_16 = tmp;
+            }
             *reinterpret_cast< int16_t* >( daqData ) =
-                daqd_edcu1.channel_value[ ii ].data_int16;
+                daqd_edcu1.channel_value[ ii ].data.data_int16;
             daqData += sizeof( int16_t );
             break;
         }
         case _32bit_integer:
         {
             *reinterpret_cast< int32_t* >( daqData ) =
-                daqd_edcu1.channel_value[ ii ].data_int32;
+                daqd_edcu1.channel_value[ ii ].data.data_int32;
             daqData += sizeof( int32_t );
             break;
         }
         case _32bit_float:
         {
             *reinterpret_cast< float* >( daqData ) =
-                daqd_edcu1.channel_value[ ii ].data_float32;
+                daqd_edcu1.channel_value[ ii ].data.data_float32;
             daqData += sizeof( float );
             break;
         }
         case _64bit_double:
         {
             *reinterpret_cast< double* >( daqData ) =
-                daqd_edcu1.channel_value[ ii ].data_float64;
+                daqd_edcu1.channel_value[ ii ].data.data_float64;
             daqData += sizeof( double );
             break;
         }
@@ -932,9 +1016,10 @@ edcuWriteData( int           daqBlockNum,
     //        daqd_edcu1.num_chans * sizeof( float ) );
     dipc->dcuId = dcuId;
     dipc->crc = daqFileCrc;
-    dipc->dataBlockSize = xferInfo.crcLength;
+    dipc->dataBlockSize = daqData - data_start;
+    dipc->channelCount = daqd_edcu1.num_chans;
     dipc->bp[ daqBlockNum ].cycle = daqBlockNum;
-    dipc->bp[ daqBlockNum ].crc = xferInfo.crcLength;
+    dipc->bp[ daqBlockNum ].crc = daqData - data_start;
     dipc->bp[ daqBlockNum ].timeSec = (unsigned int)cycle_gps_time;
     dipc->bp[ daqBlockNum ].timeNSec = (unsigned int)daqBlockNum;
     if ( daqreset )
@@ -1065,10 +1150,12 @@ main( int argc, char* argv[] )
     const char* daqsharedmemname = "edc_daq";
     // const char* syncsharedmemname = "-";
     const char* daqFile = "edc.ini";
-    const char* prefix = "";
+    // const char* prefix = "";
 
     int delay_multiplier = 0;
 
+    memset( (void*)&daqd_edcu1, 0, sizeof( daqd_edcu1 ) );
+
     diag_thread_args diag_args( diag_msg_queue, diag_free_queue );
 
     int cur_arg = 0;
@@ -1086,7 +1173,7 @@ main( int argc, char* argv[] )
             delay_multiplier = atoi( optarg );
             break;
         case 'p':
-            prefix = optarg;
+            daqd_edcu1.prefix = optarg;
             break;
         case 'l':
             diag_args.address = parse_address( optarg );
@@ -1099,8 +1186,6 @@ main( int argc, char* argv[] )
         }
     }
 
-    memset( (void*)&daqd_edcu1, 0, sizeof( daqd_edcu1 ) );
-
     // **********************************************
     //
 
@@ -1112,7 +1197,7 @@ main( int argc, char* argv[] )
         daqd_edcu1.channel_status[ ii ] = 0xbad;
     }
     edcuInitialize( daqsharedmemname, "-" );
-    edcuCreateChanList( prefix, daqFile, &daqFileCrc );
+    edcuCreateChanList( daqd_edcu1, daqFile, &daqFileCrc );
     std::cout << "The edc dcuid = " << daqd_edcu1.dcuid << "\n";
     int datarate = daqd_edcu1.num_chans * 64 / 1000;
 
diff --git a/src/epics/seq/test/daqdrc_standalone_edc_live_test b/src/epics/seq/test/daqdrc_standalone_edc_live_test
new file mode 100644
index 000000000..92eadf608
--- /dev/null
+++ b/src/epics/seq/test/daqdrc_standalone_edc_live_test
@@ -0,0 +1,82 @@
+set thread_stack_size=10240;
+#set cit_40m=1;
+set dcu_status_check=1;
+#set symm_gps_offset=-1;
+#set controller_dcu=22;
+set debug=0;
+set log=6;
+set zero_bad_data=0;
+set master_config="MASTER";
+configure channels begin end;
+#tpconfig "TESTPOINT";
+
+status dcu;
+#tpconfig "TESTPOINT";
+
+set gps_leaps = 820108813;
+set detector_name="TST";
+set detector_prefix="X6";
+set detector_longitude=-90.7742403889;
+set detector_latitude=30.5628943337;
+set detector_elevation=.0;
+set detector_azimuths=1.1,4.7123889804;
+set detector_altitudes=1.0,2.0;
+set detector_midpoints=2000.0, 2000.0;
+
+#enable frame_wiper;
+#set num_dirs = 10;
+#set frames_per_dir=225;
+#set full_frames_per_file=1;
+#set full_frames_blocks_per_frame=32;
+#set frame_dir="/frames/full", "M-R-", ".gwf";
+#scan frames;
+
+#enable trend_frame_wiper;
+#set trend_num_dirs=10;
+#set trend_frames_per_dir=1440;
+#set trend_frame_dir= "/frames/trend/second", "M-T-", ".gwf";
+
+#set raw-minute-trend-dir="/frames/trend/minute/raw";
+
+#set nds-jobs-dir="/opt/fb";
+
+set parameter "shmem_input" = "local_dc";
+set parameter "shmem_size" = "104857600";
+
+
+#enable minute-trend-frame-wiper;
+#set minute-trend-num-dirs=10;
+#set minute-trend-frames-per-dir=24;
+#set minute-trend-frame-dir="/frames/trend/minute", "M-M-", ".gwf";
+#
+#scan minute-trend-frames;
+#scan trend-frames;
+scan frames;
+
+start main 5;
+start profiler;
+
+# comment out this block to stop saving data
+
+#start frame-saver;
+#sync frame-saver;
+#start trender;
+#start trend-frame-saver;
+#sync trend-frame-saver;
+#start minute-trend-frame-saver;
+#sync minute-trend-frame-saver;
+#start raw-minute-trend-saver;
+
+#start fast-writer "127.255.255.255" broadcast="127.0.0.0" all;
+#sleep 2;
+#sleep 5;
+
+start producer;
+#start epics dcu;
+;start epics server "X3:DAQ-SHM0_" "X3:DAQ-SHM0_";
+
+#start listener 8087;
+start listener 8088 1;
+# for this test we do not need to clear the crcs
+#sleep 60;
+#clear crc;
diff --git a/src/epics/seq/test/epics_test.py b/src/epics/seq/test/epics_test.py
index fec3fab38..b54cce55a 100755
--- a/src/epics/seq/test/epics_test.py
+++ b/src/epics/seq/test/epics_test.py
@@ -1,10 +1,12 @@
 #!/usr/bin/env python3
 
+from __future__ import print_function
+
 import threading
 
 import pcaspy
 
-def write_ini_file(prefix, db, fname):
+def write_ini_file(prefix, db, fname, datatypes):
     print("Writing ini file '{0}'".format(fname))
     with open(fname, 'wt') as f:
         f.write("""[default]
@@ -18,22 +20,32 @@ offset=0
 slope=1.0
 units=undef
 """)
-        keys = list(db.keys())
-        keys.sort()
         i = 0
-        for entry in keys:
+
+        for entry in ['EDCU_CHAN_CONN', 'EDCU_CHAN_CNT',]:
             f.write("""[{0}{1}]
 datarate=16
 datatype=4
 chnnum={2}
 """.format(prefix, entry, 40000 + i))
-            i += 1
-        for entry in ['EDCU_CHAN_CONN', 'EDCU_CHAN_CNT', 'EDCU_CHAN_NOCON']:
+            i = i + 1
+
+        f.write("""[{0}{1}]
+datarate=16
+datatype=2
+chnnum={2}
+""".format(prefix, 'EDCU_CHAN_NOCON', 40000 + i))
+        i = i + 1
+
+        keys = list(db.keys())
+        keys.sort()
+        for entry in keys:
             f.write("""[{0}{1}]
 datarate=16
-datatype=4
+datatype={3}
 chnnum={2}
-""".format(prefix, entry, 40000 + i))
+""".format(prefix, entry, 40000 + i, datatypes[entry]))
+            i += 1
 
 
 def read_time():
@@ -43,9 +55,11 @@ def read_time():
         return int(data[0:dot])
 
 class myDriver(pcaspy.Driver):
-    def __init__(self, offsets):
+    def __init__(self, offsets, datatypes):
         super(myDriver, self).__init__()
-        self.__offsets = offsets
+        self.__params = {}
+        for entry in offsets.keys():
+            self.__params[entry] = (offsets[entry], datatypes[entry])
         self.__lock = threading.Lock()
 
     def read(self, reason):
@@ -57,8 +71,11 @@ class myDriver(pcaspy.Driver):
 
     def update_vals(self, ref_time):
         with self.__lock:
-            for entry in self.__offsets:
-                val = (ref_time % 100000) + self.__offsets[entry]
+            for entry in self.__params:
+                if self.__params[entry][1] == 1:
+                    val = (ref_time % 30000) + self.__params[entry][0]
+                else:
+                    val = (ref_time % 100000) + self.__params[entry][0]
                 #if entry == "EDC-189--gpssmd100koff1p--24--2--16":
                 #    print("X6:EDC-189--gpssmd100koff1p--24--2--16  == {0} offset of {1}".format(val, self.__offsets[entry]))
                 self.setParam(entry, val)
@@ -67,10 +84,34 @@ class myDriver(pcaspy.Driver):
 prefix="X6:"
 db = {}
 offsets = {}
+datatypes = {}
 
 def add_entry(db, i, offset):
     global offsets
-    name="EDC-{0}--gpssmd100koff1p--{1}--4--16".format(i, offset)
+    global datatypes
+
+    # 5 = double
+    # 4 = float
+    # 2 = 32 bit int
+    # 1 = 16 bit int
+
+    mod4 = i % 4
+    if mod4 == 0:
+        name="EDC-{0}--gpssmd100koff1p--{1}--5--16".format(i, offset)
+        db[name] = {'type': 'float' }
+        datatypes[name] = 5
+    elif mod4 == 1:
+        name="EDC-{0}--gpssmd100koff1p--{1}--4--16".format(i, offset)
+        db[name] = {'type': 'float' }
+        datatypes[name] = 4
+    elif mod4 == 2:
+        name="EDC-{0}--gpssmd100koff1p--{1}--2--16".format(i, offset)
+        db[name] = {'type': 'int' }
+        datatypes[name] = 2
+    elif mod4 == 3:
+        name="EDC-{0}--gpssmd30koff1p--{1}--1--16".format(i, offset)
+        db[name] = {'type': 'int' }
+        datatypes[name] = 1
     db[name] = {'type': 'int' }
     offsets[name] = offset
     return db
@@ -78,11 +119,11 @@ def add_entry(db, i, offset):
 for i in range(2000):
     db = add_entry(db, i, i%33)
 
-write_ini_file(prefix, db, 'edcu.ini')
+write_ini_file(prefix, db, 'edcu.ini', datatypes)
 
 server = pcaspy.SimpleServer()
 server.createPV(prefix, db)
-driver = myDriver(offsets)
+driver = myDriver(offsets, datatypes)
 driver.update_vals(read_time())
 
 last_sec = read_time()
diff --git a/src/epics/seq/test/test_standalone_edc_live_nds.sh.in b/src/epics/seq/test/test_standalone_edc_live_nds.sh.in
new file mode 100644
index 000000000..d5ba3781e
--- /dev/null
+++ b/src/epics/seq/test/test_standalone_edc_live_nds.sh.in
@@ -0,0 +1,139 @@
+#!/bin/bash
+
+CWD="@CMAKE_CURRENT_BINARY_DIR@"
+
+TDIR=""
+PID_EPICS_IOC=0
+PID_STANDALONE_EDC=0
+PID_LOCAL_DC=0
+PID_DAQD=0
+
+function kill_proc {
+    if [ $1 -gt 0 ]; then
+        echo "Closing process $1"
+        kill $1
+    fi
+}
+
+function cleanup {
+    rm -rf daqdrc_live_test_final
+    if [ "x$TDIR" != "x" ]; then
+        if [ -d $TDIR ]; then
+            rm -rf "$TDIR"
+        fi
+    fi
+    kill_proc $PID_EPICS_IOC
+    kill_proc $PID_STANDALONE_EDC
+    kill_proc $PID_LOCAL_DC
+    kill_proc $PID_DAQD
+}
+
+EPICS_IOC="$CWD/epics_test.py"
+if [ ! -f "$EPICS_IOC" ]; then
+    echo "cannot find $EPICS_IOC"
+    exit 1
+fi
+
+STANDALONE_EDC="$CWD/standalone_edc"
+if [ ! -x "$STANDALONE_EDC" ]; then
+    echo "cannot find $STANDALONE_EDC"
+    exit 1
+fi
+
+LOCAL_DC="$CWD/../../local_dc/local_dc"
+if [ ! -x "$LOCAL_DC" ]; then
+    echo "cannot find $LOCAL_DC"
+    exit 1
+fi
+
+DAQD="$CWD/../../daqd/daqd_shmem"
+if [ ! -x "$DAQD" ]; then
+    echo "cannot find $DAQD"
+    exit 1
+fi
+
+FE_STREAM_CHECK_NDS="$CWD/../../fe_stream_test/fe_stream_check_edcu_nds"
+if [ ! -x "$FE_STREAM_CHECK_NDS" ]; then
+  echo "cannot find $FE_STREAM_CHECK_NDS"
+  exit 1
+fi
+
+if [ ! -w /dev/gpstime ]; then
+  echo "the gpstime module must be loaded, configured, and accessible by this user"
+  exit 1
+fi
+
+if [ ! -w /dev/mbuf ]; then
+  echo "the mbuf module must be loaded, configured, and accessible by this user"
+  exit 1
+fi
+
+PYTHON=""
+which python > /dev/null
+if [ $? -eq 0 ]; then
+    PYTHON=`which python`
+else
+    which python3 > /dev/null
+    if [ $? -eq 0 ]; then
+        PYTHON=`which python3`
+    else
+        echo "Cannot find python or python3"
+        exit 1
+    fi
+fi
+
+trap cleanup EXIT
+
+TDIR=`$PYTHON -c "from __future__ import print_function; import tempfile; print(tempfile.mkdtemp())"`
+mkdir "$TDIR/ini_files"
+mkdir "$TDIR/logs"
+
+echo "Ini dir = $TDIR/ini_files"
+
+pushd "$TDIR/ini_files"
+$PYTHON $EPICS_IOC &> "$TDIR/logs/epics_ioc" &
+PID_EPICS_IOC=$!
+popd
+
+echo "$TDIR/ini_files/edcu.ini" > $TDIR/ini_files/master
+
+echo "[X6:EDCU-EXC_1]" > "$TDIR/ini_files/tpchn_edc.par"
+echo "ifoid = 1" >> "$TDIR/ini_files/tpchn_edc.par"
+echo "rmid = 52" >> "$TDIR/ini_files/tpchn_edc.par"
+echo "dcuid = 52" >> "$TDIR/ini_files/tpchn_edc.par"
+echo "chnnum = 1" >> "$TDIR/ini_files/tpchn_edc.par"
+echo "datatype = 4" >> "$TDIR/ini_files/tpchn_edc.par"
+echo "datarate = 16" >> "$TDIR/ini_files/tpchn_edc.par"
+
+sleep 1
+
+"$STANDALONE_EDC" -b edc_daq -i "$TDIR/ini_files/edcu.ini" -p X6: -l 127.0.0.1:9000 &> "$TDIR/logs/standalone_edc" &
+PID_STANDALONE_EDC=$!
+
+"$LOCAL_DC" -b local_dc -m 100 -s edc -d "$TDIR/ini_files" &> "$TDIR/logs/local_dc" &
+PID_LOCAL_DC=$!
+
+echo "Streamer PID = PID_LOCAL_DC"
+
+sleep 1
+
+MASTER_FILE="$TDIR/ini_files/master"
+TESTPOINT_FILE=""
+cat daqdrc_standalone_edc_live_test | sed s\|MASTER\|$MASTER_FILE\| | sed s\|TESTPOINT\|$TESTPOINT_FILE\| > daqdrc_standalone_edc_live_test_final
+"$DAQD" -c daqdrc_standalone_edc_live_test_final &> "$TDIR/logs/daqd" &
+PID_DAQD=$!
+
+echo "Sleeping to allow the daq to start"
+sleep 15
+
+DUMMY=""
+read DUMMY
+
+"$FE_STREAM_CHECK_NDS" -c 30 --live
+RESULT=$?
+
+echo "Testing return $RESULT (0 = success)"
+exit $RESULT
+#echo "Press enter to continue..."
+#DUMMY=""
+#read DUMMY
\ No newline at end of file
diff --git a/src/fe_stream_test/CMakeLists.txt b/src/fe_stream_test/CMakeLists.txt
index 88acd6879..5cd94aa8b 100644
--- a/src/fe_stream_test/CMakeLists.txt
+++ b/src/fe_stream_test/CMakeLists.txt
@@ -3,11 +3,12 @@ add_library(fe_stream_generator STATIC
         fe_stream_generator.hh
         str_split.cc
         str_split.hh)
-target_include_directories(fe_stream_generator PUBLIC ${CMAKE_CURRENT_SOURCE_DIR})
+target_include_directories(fe_stream_generator PUBLIC ${CMAKE_CURRENT_SOURCE_DIR} ${CMAKE_CURRENT_SOURCE_DIR}/../include)
 
 add_library(fe_generator_support STATIC
         fe_generator_support.hh
         fe_generator_support.cc)
+target_include_directories(fe_generator_support PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include)
 
 add_executable(fe_multi_stream_test
         fe_multi_stream_test.cc
@@ -43,6 +44,7 @@ target_link_libraries(fe_stream_check_nds
         nds2client::cxx)
 
 add_executable(fe_stream_check_edcu_nds fe_stream_check_edcu_nds.cc)
+target_include_directories(fe_stream_check_edcu_nds PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/../include)
 target_link_libraries(fe_stream_check_edcu_nds
         PRIVATE
         fe_stream_generator
diff --git a/src/fe_stream_test/fe_stream_check_edcu_nds.cc b/src/fe_stream_test/fe_stream_check_edcu_nds.cc
index 427f5b98f..984d96381 100644
--- a/src/fe_stream_test/fe_stream_check_edcu_nds.cc
+++ b/src/fe_stream_test/fe_stream_check_edcu_nds.cc
@@ -13,6 +13,8 @@
 #include <stdexcept>
 #include <vector>
 
+#include <daq_data_types.h>
+
 #include <nds.hh>
 
 #include "fe_stream_generator.hh"
@@ -46,6 +48,7 @@ usage( const char* progname )
     cout << "If -c is specified a the random number generator can be seeded\n";
     cout << "manually with the -s option.  If not specified it will be\n";
     cout << "randomly seeded.\n";
+    cout << "\nTo read live data, specify a start time of 0 and use stop as a duration\n";
 
     exit( 1 );
 }
@@ -150,8 +153,13 @@ is_generated_channel(const NDS::channel& chan)
 {
     static const std::string slow("--16");
     static const std::string spacer("--");
-    return ((chan.Name().size() > slow.size())
-    && (std::equal(slow.begin(), slow.end(), chan.Name().begin() + chan.Name().size() - slow.size())));
+
+    auto pos = chan.Name().rfind(slow);
+    if (pos == std::string::npos)
+    {
+        return false;
+    }
+    return pos == chan.Name().size() - slow.size();
 }
 
 std::vector< GeneratorPtr >
@@ -208,6 +216,16 @@ contains_only(It it1, It it2, T val)
     return true;
 }
 
+bool
+buffer_contains_only(const NDS::buffer& buf, float value)
+{
+    if (buf.DataType() == NDS::channel::DATA_TYPE_FLOAT32)
+    {
+        return contains_only(buf.cbegin<float>(), buf.cend<float>(), value);
+    }
+    return contains_only(buf.cbegin<std::int32_t>(), buf.cend<std::int32_t>(), static_cast<std::int32_t>(value));
+}
+
 template <typename It1, typename It2>
 bool
 blended_compare(It1 begin1, It1 end1, It1 begin2, It2 begin3)
@@ -244,19 +262,17 @@ blended_compare(It1 begin1, It1 end1, It1 begin2, It2 begin3)
     return true;
 }
 
+template<typename T>
 void
-test_channel(const NDS::buffer& buf, GeneratorPtr& gen)
+test_channel_by_type(const NDS::buffer& buf, GeneratorPtr& gen, T tag)
 {
     NDS::buffer::gps_second_type start = buf.Start();
     NDS::buffer::gps_second_type end = buf.Stop();
 
-    std::array<float, 16> buf1{};
-    std::array<float, 16> buf2{};
-
     for (NDS::buffer::gps_second_type cur = start; cur != end; ++cur)
     {
-        std::array<float, 16> buf1{};
-        std::array<float, 16> buf2{};
+        std::array<T, 16> buf1{};
+        std::array<T, 16> buf2{};
 
         char* ptr1 = reinterpret_cast<char*>(buf1.data());
         char* ptr2 = reinterpret_cast<char*>(buf2.data());
@@ -269,10 +285,10 @@ test_channel(const NDS::buffer& buf, GeneratorPtr& gen)
             ptr2 = gen->generate( cur, nano,  ptr2);
         }
 
-        const float* ptr3 = buf.cbegin<float>() + (16*(cur-start));
+        const T* ptr3 = buf.cbegin<T>() + (16*(cur-start));
         if (!blended_compare(buf1.begin(), buf1.end(), buf2.begin(), ptr3))
         {
-            std::cerr << "Unexpected data found\n";
+            std::cerr << "Unexpected data found on " << buf.Name() << "\n";
             std::cerr << "prev: ";
             for (int j = 0; j < buf1.size(); ++j)
             {
@@ -293,6 +309,41 @@ test_channel(const NDS::buffer& buf, GeneratorPtr& gen)
     }
 }
 
+void
+require_channel_type(daq_data_t req_type, GeneratorPtr gen)
+{
+    if (static_cast<int>(req_type) != gen->data_type())
+    {
+        throw std::runtime_error("Mismatch of channel and generator types");
+    }
+}
+
+void
+test_channel(const NDS::buffer& buf, GeneratorPtr& gen)
+{
+    switch (buf.DataType())
+    {
+        case NDS::channel::DATA_TYPE_FLOAT64:
+            require_channel_type(_64bit_double, gen);
+            test_channel_by_type<double>(buf, gen, 0.);
+            break;
+        case NDS::channel::DATA_TYPE_FLOAT32:
+            require_channel_type(_32bit_float, gen);
+            test_channel_by_type<float>(buf, gen, 0.f);
+            break;
+        case NDS::channel::DATA_TYPE_INT32:
+            require_channel_type(_32bit_integer, gen);
+            test_channel_by_type<std::int32_t>(buf, gen, 0);
+            break;
+        case NDS::channel::DATA_TYPE_INT16:
+            require_channel_type(_16bit_integer, gen);
+            test_channel_by_type<std::int16_t>(buf, gen, 0);
+            break;
+        default:
+            throw std::runtime_error("Unsupported channel type");
+    }
+}
+
 void
 test_channels( Config& cfg, NDS::parameters& params, std::vector< GeneratorPtr >& generators)
 {
@@ -354,32 +405,50 @@ test_channel_counts( NDS::parameters&             params,
     channels.push_back(chan_conn);
     channels.push_back(chan_nocon);
     channels.push_back(chan_cnt);
+    std::cout << "Channels:\n\t";
+    std::copy(channels.begin(), channels.end(), std::ostream_iterator<std::string>(std::cout, "\n\t"));
+    std::cout << "\n";
 
-    NDS::buffers_type bufs = NDS::fetch( params, gps_start, gps_stop, channels );
-    if (bufs.size() != 3)
-    {
-        std::cerr << "size is wrong " << bufs.size() << "\n";
-    }
-    std::cerr << "sample count = " << bufs[2].Samples() << "\n";
+    float expected_count = -1;
 
-    float expected_count = bufs[2].at<float>(0);
-    if (!contains_only(bufs[2].cbegin<float>(), bufs[2].cend<float>(), expected_count))
+    auto stream = NDS::iterate(params, NDS::request_period(gps_start, gps_stop), channels);
+    for (const auto& bufs:stream)
     {
-        std::cout << "The channel count changed during the test timespan, it was not always " << expected_count << "\n";
-        exit(1);
-    }
+        if (bufs->size() != 3)
+        {
+            std::cerr << "size is wrong " << bufs->size() << "\n";
+        }
+        std::cerr << "sample count = " << bufs->at(2).Samples() << "\n";
 
-    if ( ! contains_only( bufs[1].cbegin<float>(), bufs[1].cend<float>(), 0.0f ) )
-    {
-        std::cout << "The channel noconn count changed during the test timespan, it was not always 0.\n";
-        exit(1);
-    }
+        if (expected_count < 0.0)
+        {
+            if (bufs->at(2).DataType() == NDS::channel::DATA_TYPE_FLOAT32) {
+                expected_count = bufs->at(2).at<float>(0);
+            } else {
+                expected_count = static_cast<float>(bufs->at(2).at<std::int32_t>(0));
+            }
+            std::cerr << "Expected count == " << expected_count << "\n";
+        }
 
-    if (!contains_only(bufs[0].cbegin<float>(), bufs[0].cend<float>(), expected_count))
-    {
-        std::cout << "The connected count changed during the test timespan, it was not always " << expected_count << "\n";
-        exit(1);
+        if (!buffer_contains_only(bufs->at(2), expected_count))
+        {
+            std::cout << "The channel count changed during the test timespan, it was not always " << expected_count << "\n";
+            exit(1);
+        }
+
+        if ( ! buffer_contains_only( bufs->at(1), 0.0f ) )
+        {
+            std::cout << "The channel noconn count changed during the test timespan, it was not always 0.\n";
+            exit(1);
+        }
+
+        if (!buffer_contains_only(bufs->at(0), expected_count))
+        {
+            std::cout << "The connected count changed during the test timespan, it was not always " << expected_count << "\n";
+            exit(1);
+        }
     }
+
 }
 
 int
diff --git a/src/fe_stream_test/fe_stream_generator.cc b/src/fe_stream_test/fe_stream_generator.cc
index e0a22f12a..6f47eb06b 100644
--- a/src/fe_stream_test/fe_stream_generator.cc
+++ b/src/fe_stream_test/fe_stream_generator.cc
@@ -3,6 +3,25 @@
 //
 #include "fe_stream_generator.hh"
 
+bool
+is_data_type_valid(int data_type)
+{
+    switch (static_cast<daq_data_t>(data_type))
+    {
+        case _16bit_integer:
+        case _32bit_integer:
+        case _64bit_integer:
+        case _32bit_float:
+        case _64bit_double:
+        case _32bit_uint:
+            return true;
+        case _32bit_complex:
+        default:
+            break;
+    }
+    return false;
+}
+
 GeneratorPtr
 create_generator(const std::string& generator, const SimChannel& ch)
 {
@@ -30,7 +49,7 @@ create_generator(const std::string& channel_name)
     std::istringstream is (parts[parts.size()-1]);
     is >> rate;
   }
-  if (!(data_type == 2 || data_type == 4) || rate < 16)
+  if (!is_data_type_valid(data_type) || rate < 16)
     throw std::runtime_error("Invalid data type or rate found");
   std::string& base = parts[0];
   std::string& name = parts[1];
@@ -40,21 +59,21 @@ create_generator(const std::string& channel_name)
     std::istringstream is(parts[2]);
     int offset = 0;
     is >> offset;
-    if (data_type == 2)
-      return GeneratorPtr(new Generators::GPSSecondWithOffset<int>(SimChannel(base, data_type, rate, 0), offset));
-    else
-      return GeneratorPtr(new Generators::GPSSecondWithOffset<float>(SimChannel(base, data_type, rate, 0), offset));
-
+    return create_generic_generator<Generators::GPSSecondWithOffset>(data_type, SimChannel(base, data_type, rate, 0), offset);
   }
   else if (name == "gpssmd100koff1p" && arg_count == 1)
   {
     std::istringstream is(parts[2]);
     int offset = 0;
     is >> offset;
-    if (data_type == 2)
-      return GeneratorPtr(new Generators::GPSMod100kSecWithOffset<int>(SimChannel(base, data_type, rate, 0), offset));
-    else
-      return GeneratorPtr(new Generators::GPSMod100kSecWithOffset<float>(SimChannel(base, data_type, rate, 0), offset));
+    return create_generic_generator<Generators::GPSMod100kSecWithOffset>(data_type, SimChannel(base, data_type, rate, 0), offset);
+  }
+  else if (name == "gpssmd30koff1p" && arg_count == 1)
+  {
+      std::istringstream is(parts[2]);
+      int offset = 0;
+      is >> offset;
+      return create_generic_generator<Generators::GPSMod30kSecWithOffset>(data_type, SimChannel(base, data_type, rate, 0), offset);
   }
   else
   {
diff --git a/src/fe_stream_test/fe_stream_generator.hh b/src/fe_stream_test/fe_stream_generator.hh
index 1c8bf4453..4a5b29bed 100644
--- a/src/fe_stream_test/fe_stream_generator.hh
+++ b/src/fe_stream_test/fe_stream_generator.hh
@@ -10,8 +10,10 @@
 #include <iostream>
 #include <sstream>
 #include <string>
-
 #include <stdexcept>
+#include <utility>
+
+#include <daq_data_types.h>
 
 #include "str_split.hh"
 
@@ -55,8 +57,6 @@ public:
 class Generator
 {
 protected:
-    virtual int data_type() const = 0;
-    virtual int data_rate() const = 0;
     virtual const std::string& channel_base_name() const = 0;
 
     virtual std::string other_params() const { return ""; }
@@ -75,6 +75,9 @@ public:
 
     virtual void output_ini_entry(std::ostream& os) = 0;
     virtual void output_par_entry(std::ostream& os) = 0;
+
+    virtual int data_type() const = 0;
+    virtual int data_rate() const = 0;
 };
 
 typedef std::tr1::shared_ptr<Generator> GeneratorPtr;
@@ -84,8 +87,6 @@ namespace Generators {
     protected:
         SimChannel ch_;
 
-        virtual int data_type() const { return ch_.data_type(); };
-        virtual int data_rate() const { return ch_.data_rate(); };
         virtual const std::string& channel_base_name() const { return ch_.name(); };
     public:
         SimChannelGenerator(const SimChannel& ch):
@@ -120,6 +121,9 @@ namespace Generators {
             os << "datatype = " << data_type() << "\n";
             os << "datarate = " << data_rate() << "\n";
         }
+
+        virtual int data_type() const { return ch_.data_type(); };
+        virtual int data_rate() const { return ch_.data_rate(); };
     };
 
     
@@ -205,6 +209,36 @@ namespace Generators {
         }
     };
 
+    template <typename T>
+    class GPSMod30kSecWithOffset: public SimChannelGenerator
+    {
+        int offset_;
+    public:
+        GPSMod30kSecWithOffset(const SimChannel& ch, int offset):
+                SimChannelGenerator(ch), offset_(offset) {}
+
+        std::string generator_name() const { return "gpssmd30koff1p"; }
+
+        std::string other_params() const
+        {
+            std::ostringstream os;
+            os << "--" << offset_;
+            return os.str();
+        }
+
+        char* generate(int gps_sec, int gps_nano, char* out)
+        {
+            int rate = data_rate() / 16;
+            T *out_ = reinterpret_cast<T*>(out);
+            for (int i = 0; i < rate; ++i)
+            {
+                *out_ = static_cast<T>((gps_sec%30000) + offset_);
+                ++out_;
+            }
+            return reinterpret_cast<char*>(out_);
+        }
+    };
+
     template <typename T>
     class StaticValue: public SimChannelGenerator
     {
@@ -232,6 +266,32 @@ namespace Generators {
     };
 }
 
+template <template<class> class GenClass, typename... Args>
+GeneratorPtr
+create_generic_generator(int data_type, Args&&... args)
+{
+    switch (static_cast<daq_data_t>(data_type))
+    {
+        case _16bit_integer:
+            return GeneratorPtr(new GenClass<std::int16_t>(std::forward<Args>(args)...));
+        case _32bit_integer:
+            return GeneratorPtr(new GenClass<std::int32_t>(std::forward<Args>(args)...));
+        case _64bit_integer:
+            return GeneratorPtr(new GenClass<std::int64_t>(std::forward<Args>(args)...));
+        case _32bit_float:
+            return GeneratorPtr(new GenClass<float>(std::forward<Args>(args)...));
+        case _64bit_double:
+            return GeneratorPtr(new GenClass<double>(std::forward<Args>(args)...));
+        case _32bit_complex:
+            break;
+        case _32bit_uint:
+            return GeneratorPtr(new GenClass<std::uint32_t>(std::forward<Args>(args)...));
+        default:
+            break;
+    }
+    throw std::runtime_error("Unknown data type found, cannot create a generator");
+}
+
 GeneratorPtr
 create_generator(const std::string& generator, const SimChannel& ch);
 
diff --git a/src/include/daq_data_types.h b/src/include/daq_data_types.h
new file mode 100644
index 000000000..0ab16e5c1
--- /dev/null
+++ b/src/include/daq_data_types.h
@@ -0,0 +1,35 @@
+#ifndef DAQD_DATA_TYPES_H
+#define DAQD_DATA_TYPES_H
+
+/* numbering must be contiguous */
+typedef enum {
+    _undefined = 0,
+    _16bit_integer = 1,
+    _32bit_integer = 2,
+    _64bit_integer = 3,
+    _32bit_float = 4,
+    _64bit_double = 5,
+    _32bit_complex = 6,
+    _32bit_uint = 7
+} daq_data_t;
+
+inline static int
+data_type_size (short dtype) {
+    switch (dtype) {
+        case _16bit_integer: // 16 bit integer
+            return 2;
+        case _32bit_integer: // 32 bit integer
+        case _32bit_float: // 32 bit float
+        case _32bit_uint: // 32 bit unsigned integer
+            return 4;
+        case _64bit_integer: // 64 bit integer
+        case _64bit_double: // 64 bit double
+            return 8;
+        case _32bit_complex: // 32 bit complex
+            return 4*2;
+        default:
+            return _undefined;
+    }
+}
+
+#endif
\ No newline at end of file
diff --git a/src/nds/CMakeLists.txt b/src/nds/CMakeLists.txt
index d63027be4..d2f5646c9 100644
--- a/src/nds/CMakeLists.txt
+++ b/src/nds/CMakeLists.txt
@@ -50,6 +50,7 @@ target_include_directories(nds PRIVATE
     ${CMAKE_CURRENT_SOURCE_DIR}/client
     ${CMAKE_CURRENT_SOURCE_DIR}/framelib/src
     ${CMAKE_CURRENT_SOURCE_DIR}/framelib/src/zlib
+    ${CMAKE_CURRENT_SOURCE_DIR}/../include
 )
 target_link_libraries(nds PRIVATE ldastools::framecpp z bz2 dl nsl)
 
-- 
GitLab