From 16dd2b1107c29a00f73ad40b3bd1a6d820796fe9 Mon Sep 17 00:00:00 2001 From: Alexander Ivanov <alexander.ivanov@ligo.org> Date: Tue, 24 Sep 2013 19:15:04 +0000 Subject: [PATCH] Changes to make DataBrowser work with NDS. git-svn-id: https://redoubt.ligo-wa.caltech.edu/svn/advLigoRTS/trunk@3488 6dcd42c9-f523-4c6d-aada-af552506706e --- .../archive/reader/nds/NDSArchiveReader.java | 61 ++++++++- .../archive/reader/nds/RawSampleIterator.java | 96 +++++++++++--- .../csstudio/archive/reader/nds/TConvert.java | 14 ++ .../reader/nds/TrendSampleIterator.java | 125 ++++++++++++++++++ .../csstudio/utility/pv/nds/DaqdNet9_0.java | 8 +- .../org/csstudio/utility/pv/nds/NDS_PV.java | 24 ++-- .../src/org/csstudio/utility/pv/nds/Net.java | 3 + 7 files changed, 291 insertions(+), 40 deletions(-) create mode 100644 plugins/org.csstudio.archive.reader.nds/src/org/csstudio/archive/reader/nds/TConvert.java create mode 100644 plugins/org.csstudio.archive.reader.nds/src/org/csstudio/archive/reader/nds/TrendSampleIterator.java diff --git a/plugins/org.csstudio.archive.reader.nds/src/org/csstudio/archive/reader/nds/NDSArchiveReader.java b/plugins/org.csstudio.archive.reader.nds/src/org/csstudio/archive/reader/nds/NDSArchiveReader.java index 8ca29d7d3..c48611bfc 100644 --- a/plugins/org.csstudio.archive.reader.nds/src/org/csstudio/archive/reader/nds/NDSArchiveReader.java +++ b/plugins/org.csstudio.archive.reader.nds/src/org/csstudio/archive/reader/nds/NDSArchiveReader.java @@ -1,5 +1,8 @@ package org.csstudio.archive.reader.nds; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.Enumeration; import java.util.logging.Logger; @@ -9,9 +12,13 @@ import org.csstudio.archive.reader.ArchiveReader; import org.csstudio.archive.reader.UnknownChannelException; import org.csstudio.archive.reader.ValueIterator; import org.csstudio.data.values.ITimestamp; +import org.csstudio.data.values.TimestampFactory; +import org.csstudio.data.values.ValueFactory; +import org.csstudio.data.values.IValue.Quality; import org.csstudio.utility.pv.nds.*; public class NDSArchiveReader implements ArchiveReader { + static final int rate_hz = 16; static Logger LOG; final private String url; @@ -99,18 +106,60 @@ public class NDSArchiveReader implements ArchiveReader { @Override public ValueIterator getRawValues(int key, String name, ITimestamp start, ITimestamp end) throws UnknownChannelException, Exception { - LOG.info("NDsArchiveReader::getRawValues called on name=" + name + " start=" + start.seconds() - + " end=" + end.seconds()); - return new RawSampleIterator(this, name, start, end); + long gps_start = TConvert.gps(start.seconds()); + long gps_end = TConvert.gps(end.seconds()); + + LOG.info("NDsArchiveReader::getRawValues called on name=" + name + " start=" + gps_start + + " end=" + gps_end); + + // Make NDS request and fetch the data + // + final NDS_PV nds = new NDS_PV(""); + nds.connect(); + final Net net = nds.getNet(); + final Preferences preferences = nds.getPreferences(); + preferences.setAcquisitionMode("full"); + final ChannelSet cs = new ChannelSet(null, net); + preferences.updateChannelSet(cs); + if (!preferences.getChannelSet().addChannel(name.substring(6), rate_hz)) + throw new Exception(); + //final ChannelSet serverSet = cs.getServerSet(); + + if (!net.startNetWriter(preferences.getChannelSet(), gps_start, (int) (gps_end - gps_start))) + throw new Exception(); + + return new RawSampleIterator(nds, rate_hz); } @Override public ValueIterator getOptimizedValues(int key, String name, ITimestamp start, ITimestamp end, int count) throws UnknownChannelException, Exception { - LOG.info("NDsArchiveReader::getOptimizedValues called on name=" + name + " start=" + start.seconds() - + " end=" + end.seconds() + " count=" + count); - return getRawValues(key, name, start, end) ; + long gps_start = TConvert.gps(start.seconds()); + long gps_end = TConvert.gps(end.seconds()); + + LOG.info("NDsArchiveReader::getOptimizedValues called on name=" + name + " start=" + gps_start + + " end=" + gps_end + " count=" + count); + + long diff = gps_end - gps_start; + if (count < diff) { + final NDS_PV nds = new NDS_PV(""); + nds.connect(); + final Net net = nds.getNet(); + final Preferences preferences = nds.getPreferences(); + preferences.setAcquisitionMode("trend"); + final ChannelSet cs = new ChannelSet(null, net); + preferences.updateChannelSet(cs); + if (!preferences.getChannelSet().addChannel(name.substring(6))) + throw new Exception(); + //final ChannelSet serverSet = cs.getServerSet(); + + if (!net.startNetWriter(preferences.getChannelSet(), gps_start, (int) (gps_end - gps_start))) + throw new Exception(); + return new TrendSampleIterator(nds); + } else + return getRawValues(key, name, start, end) ; + //return null; } @Override diff --git a/plugins/org.csstudio.archive.reader.nds/src/org/csstudio/archive/reader/nds/RawSampleIterator.java b/plugins/org.csstudio.archive.reader.nds/src/org/csstudio/archive/reader/nds/RawSampleIterator.java index 04c9e1f19..5525d02fe 100644 --- a/plugins/org.csstudio.archive.reader.nds/src/org/csstudio/archive/reader/nds/RawSampleIterator.java +++ b/plugins/org.csstudio.archive.reader.nds/src/org/csstudio/archive/reader/nds/RawSampleIterator.java @@ -1,50 +1,108 @@ package org.csstudio.archive.reader.nds; +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.logging.Logger; + import org.csstudio.archive.reader.ValueIterator; +import org.csstudio.data.values.IMinMaxDoubleValue; import org.csstudio.data.values.INumericMetaData; import org.csstudio.data.values.ISeverity; import org.csstudio.data.values.ITimestamp; import org.csstudio.data.values.IValue; import org.csstudio.data.values.TimestampFactory; import org.csstudio.data.values.ValueFactory; +import org.csstudio.data.values.IValue.Quality; +import org.csstudio.utility.pv.nds.DataBlock; +import org.csstudio.utility.pv.nds.NDS_PV; +import org.csstudio.utility.pv.nds.Net; public class RawSampleIterator extends AbstractNDSValueIterator implements ValueIterator { - - long count; - ITimestamp s, e; + DataBlock block = null; + Net net = null; + int samples_left = 0; + static Logger LOG; + DataInputStream block_in; + int rate_hz = 0; + int total_samples = 0; + long block_time = 0; - public RawSampleIterator(NDSArchiveReader ndsArchiveReader, String name, - ITimestamp start, ITimestamp end) { - this.s = start; this.e = end; - count = e.seconds() - s.seconds(); + static { + LOG = Logger.getAnonymousLogger(); + } + + // This expect the request already sent and will be reading the blocks from the socket + public RawSampleIterator(NDS_PV nds, int rateHz) { + net = nds.getNet(); + rate_hz = rateHz; } @Override public boolean hasNext() { - if (count > 0) { - return true; - } else { - return false; + if (block == null || samples_left < 1) { + // Try get next + if (nextBlock()) { + net.disconnect(); + return false; + } } + return samples_left > 0; } @Override public IValue next() throws Exception { - //final ITimestamp now = TimestampFactory.createTimestamp(block.getTimestamp (), block.getNano()); - final ITimestamp now = TimestampFactory.createTimestamp(e.seconds()-count,0); + // See if we need to read the next block from the socket + if (block == null || samples_left < 1) { + // Try get next + nextBlock(); + } + if (samples_left < 1) { + net.disconnect(); + throw new Exception(); + } + + + int samples_sent = total_samples - samples_left; + long period_nsec = (long)1e9 / rate_hz; + //LOG.finest("NDS Raw Iterator =" + block.getTimestamp () + " dataLen=" + block.getWave().length); + + final ITimestamp now = TimestampFactory.createTimestamp(block_time + samples_sent/rate_hz, block.getNano() + samples_sent % rate_hz * period_nsec); final ISeverity OK = ValueFactory.createOKSeverity(); - final INumericMetaData meta = ValueFactory.createNumericMetaData(-5000, +5000, 0, 0, 0, 0, 1, "counts"); + final INumericMetaData meta = ValueFactory.createNumericMetaData(0, 0, 0, 0, 0, 0, 1, "counts"); + double a[] = {0.}; + try { + double f=block_in.readFloat (); + //System.out.println (f); + a[0]=f; + } catch (IOException e) { + e.printStackTrace(); + } + + IValue value = ValueFactory.createDoubleValue(now, OK, OK.toString(), meta, Quality.Original, a); - double []data = {123.446 * count}; - IValue value = ValueFactory.createDoubleValue(now, OK, OK.toString(), - meta, IValue.Quality.Original, data); - NDSArchiveReader.LOG.info("RawSampleIterator::next called " + now.seconds() + " val=" + data[0]); + //LOG.info("RawSampleIterator::next called " + now.seconds() + "." + now.nanoseconds() + " val=" + a [0]); + //LOG.info("samples_sent=" + samples_sent); - count--; + samples_left--; return value; } + // Read the next block from the socket + // Return true if no blocks were read (end of transmission) + private boolean nextBlock() { + block = net.getDataBlock (); + if (block != null && !block.getEot()) { + // See how many samples we got on this one and populate our counter + samples_left = block.getPeriod() * rate_hz; // :TODO: Of course this only works with one sample per second request (full res) + total_samples = block.getPeriod() * rate_hz; + block_in = new DataInputStream (new ByteArrayInputStream (block.getWave())); + block_time = TConvert.unix(block.getTimestamp ()); + } + return block == null; + } + @Override public void close() { // TODO Auto-generated method stub diff --git a/plugins/org.csstudio.archive.reader.nds/src/org/csstudio/archive/reader/nds/TConvert.java b/plugins/org.csstudio.archive.reader.nds/src/org/csstudio/archive/reader/nds/TConvert.java new file mode 100644 index 000000000..6146e9409 --- /dev/null +++ b/plugins/org.csstudio.archive.reader.nds/src/org/csstudio/archive/reader/nds/TConvert.java @@ -0,0 +1,14 @@ +package org.csstudio.archive.reader.nds; + +public class TConvert { + // This is to convert to the LIGO GPS seconds + // TODO: this needs adjustment and refinement + public static final long offs = 315963993; + public static long gps(long epoch_seconds) { + return epoch_seconds - offs; + } + public static long unix(long gps_seconds) { + return gps_seconds + offs; + } + +} diff --git a/plugins/org.csstudio.archive.reader.nds/src/org/csstudio/archive/reader/nds/TrendSampleIterator.java b/plugins/org.csstudio.archive.reader.nds/src/org/csstudio/archive/reader/nds/TrendSampleIterator.java new file mode 100644 index 000000000..eedbacfec --- /dev/null +++ b/plugins/org.csstudio.archive.reader.nds/src/org/csstudio/archive/reader/nds/TrendSampleIterator.java @@ -0,0 +1,125 @@ +package org.csstudio.archive.reader.nds; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.util.logging.Logger; + +import org.csstudio.archive.reader.ValueIterator; +import org.csstudio.data.values.IMinMaxDoubleValue; +import org.csstudio.data.values.INumericMetaData; +import org.csstudio.data.values.ISeverity; +import org.csstudio.data.values.ITimestamp; +import org.csstudio.data.values.IValue; +import org.csstudio.data.values.TimestampFactory; +import org.csstudio.data.values.ValueFactory; +import org.csstudio.data.values.IValue.Quality; +import org.csstudio.utility.pv.nds.DataBlock; +import org.csstudio.utility.pv.nds.NDS_PV; +import org.csstudio.utility.pv.nds.Net; + +public class TrendSampleIterator extends AbstractNDSValueIterator implements ValueIterator { + DataBlock block = null; + Net net = null; + int samples_left = 0; + static Logger LOG; + DataInputStream block_in_mean; + DataInputStream block_in_min; + DataInputStream block_in_max; + + //int rate_hz = 1; // Second trend comes in always at 1 Hz + int total_samples = 0; + + static { + LOG = Logger.getAnonymousLogger(); + } + + // This expect the request already sent and will be reading the blocks from the socket + public TrendSampleIterator(NDS_PV nds) { + net = nds.getNet(); + } + + @Override + public boolean hasNext() { + if (block == null || samples_left < 1) { + // Try get next + if (nextBlock()) { + net.disconnect(); + return false; + } + } + return samples_left > 0; + } + + @Override + public IValue next() throws Exception { + // See if we need to read the next block from the socket + if (block == null || samples_left < 1) { + // Try get next + nextBlock(); + } + if (samples_left < 1) { + net.disconnect(); + throw new Exception(); + } + + + int samples_sent = total_samples - samples_left; + //LOG.finest("NDS Raw Iterator =" + block.getTimestamp () + " dataLen=" + block.getWave().length); + + final ITimestamp now = TimestampFactory.createTimestamp(TConvert.unix(block.getTimestamp () + samples_sent), 0); + final ISeverity OK = ValueFactory.createOKSeverity(); + final INumericMetaData meta = ValueFactory.createNumericMetaData(0, 0, 0, 0, 0, 0, 1, "counts"); + + double mean[] = {0.}; + double min = 0, max = 0; + try { + mean[0] = block_in_mean.readDouble (); + min = block_in_min.readFloat (); + max = block_in_max.readFloat (); + } catch (IOException e) { + e.printStackTrace(); + } + + IValue value = ValueFactory.createMinMaxDoubleValue(now, OK, OK.toString(), meta, Quality.Original, mean, min, max); + + //LOG.info("TrendSampleIterator::next called " + now.seconds() + "." + now.nanoseconds() + " val=" + a [0]); + //LOG.info("samples_sent=" + samples_sent); + + samples_left--; + return value; + } + + // Read the next block from the socket + // Return true if no blocks were read (end of transmission) + private boolean nextBlock() { + block = net.getDataBlock (); + if (block != null && !block.getEot()) { + try { + // See how many samples we got on this one and populate our + // counter + samples_left = block.getPeriod(); + total_samples = block.getPeriod(); + block_in_mean = new DataInputStream(new ByteArrayInputStream( + block.getWave())); + + block_in_min = new DataInputStream(new ByteArrayInputStream( + block.getWave())); + block_in_min.skip(8 * total_samples); + block_in_max = new DataInputStream(new ByteArrayInputStream( + block.getWave())); + block_in_max.skip(12 * total_samples); + } catch (IOException e) { + e.printStackTrace(); + } + } + return block == null; + } + + @Override + public void close() { + // TODO Auto-generated method stub + + } + +} diff --git a/plugins/org.csstudio.utility.pv.nds/src/org/csstudio/utility/pv/nds/DaqdNet9_0.java b/plugins/org.csstudio.utility.pv.nds/src/org/csstudio/utility/pv/nds/DaqdNet9_0.java index 33b75489d..808a012e9 100644 --- a/plugins/org.csstudio.utility.pv.nds/src/org/csstudio/utility/pv/nds/DaqdNet9_0.java +++ b/plugins/org.csstudio.utility.pv.nds/src/org/csstudio/utility/pv/nds/DaqdNet9_0.java @@ -232,7 +232,7 @@ public class DaqdNet9_0 extends Net implements DataTypeConstants, Debug { else if (preferences.minuteTrendAcquisitionModeSelected ()) command += "trend 60 net-writer"; else - command += "fast-writer"; + command += "net-writer"; command += (gps > 0 ? (" " + Long.toString (gps)): "") + (period > 0 ? (" " + Integer.toString (period)): "") @@ -359,9 +359,9 @@ public class DaqdNet9_0 extends Net implements DataTypeConstants, Debug { String rs = ""; for (Enumeration e = chSet.elements (); e.hasMoreElements ();) { Channel ch = (Channel) e.nextElement (); - rs += " \"" + ch.getName () + (trend? ".min": "") + "\" " + - (trend ? (" \"" + ch.getName () + ".max" + "\" " + - " \"" + ch.getName () + ".rms" + "\" " + rs += " \"" + ch.getName () + (trend? ".mean": "") + "\" " + + (trend ? (" \"" + ch.getName () + ".min" + "\" " + + " \"" + ch.getName () + ".max" + "\" " ): "") + (!trend && ch.getRate () != ((Channel) chSet.getServerSet ().channelObject (ch.getName ())).getRate ()? Integer.toString (ch.getRate ()): ""); diff --git a/plugins/org.csstudio.utility.pv.nds/src/org/csstudio/utility/pv/nds/NDS_PV.java b/plugins/org.csstudio.utility.pv.nds/src/org/csstudio/utility/pv/nds/NDS_PV.java index d390b1c8b..1d2c1e532 100644 --- a/plugins/org.csstudio.utility.pv.nds/src/org/csstudio/utility/pv/nds/NDS_PV.java +++ b/plugins/org.csstudio.utility.pv.nds/src/org/csstudio/utility/pv/nds/NDS_PV.java @@ -116,7 +116,7 @@ public class NDS_PV implements PV, Runnable, Debug, Defaults, DataTypeConstants /** * Class field map that maps NDS PV name into theThread and reference count. - * Setting the value object to null will stop the running DAQ thrad. + * Setting the value object to null will stop the running DAQ thread. */ private static Map<String,ThreadRef> m; @@ -220,14 +220,14 @@ public class NDS_PV implements PV, Runnable, Debug, Defaults, DataTypeConstants // Load channel from the server preferences.updateChannelSet(new ChannelSet(null, net)); - if (!preferences.getChannelSet().addChannel(name, 16384)) + if (!preferences.getChannelSet().addChannel(name, 16)) throw new Exception(); - if (!net.startNetWriter(preferences.getChannelSet(), 0, 0)) - throw new Exception(); + //if (!net.startNetWriter(preferences.getChannelSet(), 0, 0)) + //throw new Exception(); - final Thread update_thread = new Thread(this, getName()); - update_thread.start(); + //final Thread update_thread = new Thread(this, getName()); + //update_thread.start(); // See how many elements there are in the map already, calculate the index int ccount = 0; @@ -240,7 +240,7 @@ public class NDS_PV implements PV, Runnable, Debug, Defaults, DataTypeConstants ccount++; } - m.put(name, new ThreadRef(1, update_thread, this, ccount)); + //m.put(name, new ThreadRef(1, update_thread, this, ccount)); } } LOG.log(Level.FINE,"name=" + getName() + " this=" + this + " NDS PV references =" + m.get(name).count()); @@ -468,8 +468,9 @@ public class NDS_PV implements PV, Runnable, Debug, Defaults, DataTypeConstants } ccount++; + final int r = 1; // Assign data into the queue - double cval[] = new double[1 + 1024 * ccount]; // TODO remove hard coded rate + double cval[] = new double[1 + r * ccount]; // TODO remove hard coded rate cval[0] = now.toDouble(); // First element will contain the time stamp it1 = m.entrySet().iterator(); while (it1.hasNext()) { @@ -477,7 +478,7 @@ public class NDS_PV implements PV, Runnable, Debug, Defaults, DataTypeConstants if (p.getKey().equals("gps")) continue; double [] val = ValueUtil.getDoubleArray(p.getValue().nds_pv.value); int idx = p.getValue().connectIndex; - System.arraycopy(val, 0, cval, 1 + idx * 1024, 1024); + System.arraycopy(val, 0, cval, 1 + idx * r, r); } IValue ival = ValueFactory.createDoubleValue(now, OK, OK.toString(), meta, Quality.Original, @@ -517,8 +518,9 @@ public class NDS_PV implements PV, Runnable, Debug, Defaults, DataTypeConstants private void update() { //value = TextUtil.parseValueFromString("1.234", null); - for (PVListener listener : listeners) - listener.pvValueUpdate(this); + // Commented on Aug 28, 2014 + // for (PVListener listener : listeners) + //listener.pvValueUpdate(this); } } diff --git a/plugins/org.csstudio.utility.pv.nds/src/org/csstudio/utility/pv/nds/Net.java b/plugins/org.csstudio.utility.pv.nds/src/org/csstudio/utility/pv/nds/Net.java index 8e77bbf2e..5879efd73 100644 --- a/plugins/org.csstudio.utility.pv.nds/src/org/csstudio/utility/pv/nds/Net.java +++ b/plugins/org.csstudio.utility.pv.nds/src/org/csstudio/utility/pv/nds/Net.java @@ -88,6 +88,9 @@ public abstract class Net implements Debug { return true; } + public boolean is_connected() { + return clientSocket != null; + } /** * Get DAQD net protocol version and revision string in a format like "8.1" */ -- GitLab