Skip to content
Snippets Groups Projects
Commit 16dd2b11 authored by Alexander Ivanov's avatar Alexander Ivanov
Browse files

Changes to make DataBrowser work with NDS.

git-svn-id: https://redoubt.ligo-wa.caltech.edu/svn/advLigoRTS/trunk@3488 6dcd42c9-f523-4c6d-aada-af552506706e
parent 5adc77e1
No related branches found
No related tags found
No related merge requests found
package org.csstudio.archive.reader.nds;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.logging.Logger;
......@@ -9,9 +12,13 @@ import org.csstudio.archive.reader.ArchiveReader;
import org.csstudio.archive.reader.UnknownChannelException;
import org.csstudio.archive.reader.ValueIterator;
import org.csstudio.data.values.ITimestamp;
import org.csstudio.data.values.TimestampFactory;
import org.csstudio.data.values.ValueFactory;
import org.csstudio.data.values.IValue.Quality;
import org.csstudio.utility.pv.nds.*;
public class NDSArchiveReader implements ArchiveReader {
static final int rate_hz = 16;
static Logger LOG;
final private String url;
......@@ -99,18 +106,60 @@ public class NDSArchiveReader implements ArchiveReader {
@Override
public ValueIterator getRawValues(int key, String name, ITimestamp start,
ITimestamp end) throws UnknownChannelException, Exception {
LOG.info("NDsArchiveReader::getRawValues called on name=" + name + " start=" + start.seconds()
+ " end=" + end.seconds());
return new RawSampleIterator(this, name, start, end);
long gps_start = TConvert.gps(start.seconds());
long gps_end = TConvert.gps(end.seconds());
LOG.info("NDsArchiveReader::getRawValues called on name=" + name + " start=" + gps_start
+ " end=" + gps_end);
// Make NDS request and fetch the data
//
final NDS_PV nds = new NDS_PV("");
nds.connect();
final Net net = nds.getNet();
final Preferences preferences = nds.getPreferences();
preferences.setAcquisitionMode("full");
final ChannelSet cs = new ChannelSet(null, net);
preferences.updateChannelSet(cs);
if (!preferences.getChannelSet().addChannel(name.substring(6), rate_hz))
throw new Exception();
//final ChannelSet serverSet = cs.getServerSet();
if (!net.startNetWriter(preferences.getChannelSet(), gps_start, (int) (gps_end - gps_start)))
throw new Exception();
return new RawSampleIterator(nds, rate_hz);
}
@Override
public ValueIterator getOptimizedValues(int key, String name,
ITimestamp start, ITimestamp end, int count)
throws UnknownChannelException, Exception {
LOG.info("NDsArchiveReader::getOptimizedValues called on name=" + name + " start=" + start.seconds()
+ " end=" + end.seconds() + " count=" + count);
return getRawValues(key, name, start, end) ;
long gps_start = TConvert.gps(start.seconds());
long gps_end = TConvert.gps(end.seconds());
LOG.info("NDsArchiveReader::getOptimizedValues called on name=" + name + " start=" + gps_start
+ " end=" + gps_end + " count=" + count);
long diff = gps_end - gps_start;
if (count < diff) {
final NDS_PV nds = new NDS_PV("");
nds.connect();
final Net net = nds.getNet();
final Preferences preferences = nds.getPreferences();
preferences.setAcquisitionMode("trend");
final ChannelSet cs = new ChannelSet(null, net);
preferences.updateChannelSet(cs);
if (!preferences.getChannelSet().addChannel(name.substring(6)))
throw new Exception();
//final ChannelSet serverSet = cs.getServerSet();
if (!net.startNetWriter(preferences.getChannelSet(), gps_start, (int) (gps_end - gps_start)))
throw new Exception();
return new TrendSampleIterator(nds);
} else
return getRawValues(key, name, start, end) ;
//return null;
}
@Override
......
package org.csstudio.archive.reader.nds;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.logging.Logger;
import org.csstudio.archive.reader.ValueIterator;
import org.csstudio.data.values.IMinMaxDoubleValue;
import org.csstudio.data.values.INumericMetaData;
import org.csstudio.data.values.ISeverity;
import org.csstudio.data.values.ITimestamp;
import org.csstudio.data.values.IValue;
import org.csstudio.data.values.TimestampFactory;
import org.csstudio.data.values.ValueFactory;
import org.csstudio.data.values.IValue.Quality;
import org.csstudio.utility.pv.nds.DataBlock;
import org.csstudio.utility.pv.nds.NDS_PV;
import org.csstudio.utility.pv.nds.Net;
public class RawSampleIterator extends AbstractNDSValueIterator implements ValueIterator {
long count;
ITimestamp s, e;
DataBlock block = null;
Net net = null;
int samples_left = 0;
static Logger LOG;
DataInputStream block_in;
int rate_hz = 0;
int total_samples = 0;
long block_time = 0;
public RawSampleIterator(NDSArchiveReader ndsArchiveReader, String name,
ITimestamp start, ITimestamp end) {
this.s = start; this.e = end;
count = e.seconds() - s.seconds();
static {
LOG = Logger.getAnonymousLogger();
}
// This expect the request already sent and will be reading the blocks from the socket
public RawSampleIterator(NDS_PV nds, int rateHz) {
net = nds.getNet();
rate_hz = rateHz;
}
@Override
public boolean hasNext() {
if (count > 0) {
return true;
} else {
return false;
if (block == null || samples_left < 1) {
// Try get next
if (nextBlock()) {
net.disconnect();
return false;
}
}
return samples_left > 0;
}
@Override
public IValue next() throws Exception {
//final ITimestamp now = TimestampFactory.createTimestamp(block.getTimestamp (), block.getNano());
final ITimestamp now = TimestampFactory.createTimestamp(e.seconds()-count,0);
// See if we need to read the next block from the socket
if (block == null || samples_left < 1) {
// Try get next
nextBlock();
}
if (samples_left < 1) {
net.disconnect();
throw new Exception();
}
int samples_sent = total_samples - samples_left;
long period_nsec = (long)1e9 / rate_hz;
//LOG.finest("NDS Raw Iterator =" + block.getTimestamp () + " dataLen=" + block.getWave().length);
final ITimestamp now = TimestampFactory.createTimestamp(block_time + samples_sent/rate_hz, block.getNano() + samples_sent % rate_hz * period_nsec);
final ISeverity OK = ValueFactory.createOKSeverity();
final INumericMetaData meta = ValueFactory.createNumericMetaData(-5000, +5000, 0, 0, 0, 0, 1, "counts");
final INumericMetaData meta = ValueFactory.createNumericMetaData(0, 0, 0, 0, 0, 0, 1, "counts");
double a[] = {0.};
try {
double f=block_in.readFloat ();
//System.out.println (f);
a[0]=f;
} catch (IOException e) {
e.printStackTrace();
}
IValue value = ValueFactory.createDoubleValue(now, OK, OK.toString(), meta, Quality.Original, a);
double []data = {123.446 * count};
IValue value = ValueFactory.createDoubleValue(now, OK, OK.toString(),
meta, IValue.Quality.Original, data);
NDSArchiveReader.LOG.info("RawSampleIterator::next called " + now.seconds() + " val=" + data[0]);
//LOG.info("RawSampleIterator::next called " + now.seconds() + "." + now.nanoseconds() + " val=" + a [0]);
//LOG.info("samples_sent=" + samples_sent);
count--;
samples_left--;
return value;
}
// Read the next block from the socket
// Return true if no blocks were read (end of transmission)
private boolean nextBlock() {
block = net.getDataBlock ();
if (block != null && !block.getEot()) {
// See how many samples we got on this one and populate our counter
samples_left = block.getPeriod() * rate_hz; // :TODO: Of course this only works with one sample per second request (full res)
total_samples = block.getPeriod() * rate_hz;
block_in = new DataInputStream (new ByteArrayInputStream (block.getWave()));
block_time = TConvert.unix(block.getTimestamp ());
}
return block == null;
}
@Override
public void close() {
// TODO Auto-generated method stub
......
package org.csstudio.archive.reader.nds;
public class TConvert {
// This is to convert to the LIGO GPS seconds
// TODO: this needs adjustment and refinement
public static final long offs = 315963993;
public static long gps(long epoch_seconds) {
return epoch_seconds - offs;
}
public static long unix(long gps_seconds) {
return gps_seconds + offs;
}
}
package org.csstudio.archive.reader.nds;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.logging.Logger;
import org.csstudio.archive.reader.ValueIterator;
import org.csstudio.data.values.IMinMaxDoubleValue;
import org.csstudio.data.values.INumericMetaData;
import org.csstudio.data.values.ISeverity;
import org.csstudio.data.values.ITimestamp;
import org.csstudio.data.values.IValue;
import org.csstudio.data.values.TimestampFactory;
import org.csstudio.data.values.ValueFactory;
import org.csstudio.data.values.IValue.Quality;
import org.csstudio.utility.pv.nds.DataBlock;
import org.csstudio.utility.pv.nds.NDS_PV;
import org.csstudio.utility.pv.nds.Net;
public class TrendSampleIterator extends AbstractNDSValueIterator implements ValueIterator {
DataBlock block = null;
Net net = null;
int samples_left = 0;
static Logger LOG;
DataInputStream block_in_mean;
DataInputStream block_in_min;
DataInputStream block_in_max;
//int rate_hz = 1; // Second trend comes in always at 1 Hz
int total_samples = 0;
static {
LOG = Logger.getAnonymousLogger();
}
// This expect the request already sent and will be reading the blocks from the socket
public TrendSampleIterator(NDS_PV nds) {
net = nds.getNet();
}
@Override
public boolean hasNext() {
if (block == null || samples_left < 1) {
// Try get next
if (nextBlock()) {
net.disconnect();
return false;
}
}
return samples_left > 0;
}
@Override
public IValue next() throws Exception {
// See if we need to read the next block from the socket
if (block == null || samples_left < 1) {
// Try get next
nextBlock();
}
if (samples_left < 1) {
net.disconnect();
throw new Exception();
}
int samples_sent = total_samples - samples_left;
//LOG.finest("NDS Raw Iterator =" + block.getTimestamp () + " dataLen=" + block.getWave().length);
final ITimestamp now = TimestampFactory.createTimestamp(TConvert.unix(block.getTimestamp () + samples_sent), 0);
final ISeverity OK = ValueFactory.createOKSeverity();
final INumericMetaData meta = ValueFactory.createNumericMetaData(0, 0, 0, 0, 0, 0, 1, "counts");
double mean[] = {0.};
double min = 0, max = 0;
try {
mean[0] = block_in_mean.readDouble ();
min = block_in_min.readFloat ();
max = block_in_max.readFloat ();
} catch (IOException e) {
e.printStackTrace();
}
IValue value = ValueFactory.createMinMaxDoubleValue(now, OK, OK.toString(), meta, Quality.Original, mean, min, max);
//LOG.info("TrendSampleIterator::next called " + now.seconds() + "." + now.nanoseconds() + " val=" + a [0]);
//LOG.info("samples_sent=" + samples_sent);
samples_left--;
return value;
}
// Read the next block from the socket
// Return true if no blocks were read (end of transmission)
private boolean nextBlock() {
block = net.getDataBlock ();
if (block != null && !block.getEot()) {
try {
// See how many samples we got on this one and populate our
// counter
samples_left = block.getPeriod();
total_samples = block.getPeriod();
block_in_mean = new DataInputStream(new ByteArrayInputStream(
block.getWave()));
block_in_min = new DataInputStream(new ByteArrayInputStream(
block.getWave()));
block_in_min.skip(8 * total_samples);
block_in_max = new DataInputStream(new ByteArrayInputStream(
block.getWave()));
block_in_max.skip(12 * total_samples);
} catch (IOException e) {
e.printStackTrace();
}
}
return block == null;
}
@Override
public void close() {
// TODO Auto-generated method stub
}
}
......@@ -232,7 +232,7 @@ public class DaqdNet9_0 extends Net implements DataTypeConstants, Debug {
else if (preferences.minuteTrendAcquisitionModeSelected ())
command += "trend 60 net-writer";
else
command += "fast-writer";
command += "net-writer";
command += (gps > 0 ? (" " + Long.toString (gps)): "")
+ (period > 0 ? (" " + Integer.toString (period)): "")
......@@ -359,9 +359,9 @@ public class DaqdNet9_0 extends Net implements DataTypeConstants, Debug {
String rs = "";
for (Enumeration e = chSet.elements (); e.hasMoreElements ();) {
Channel ch = (Channel) e.nextElement ();
rs += " \"" + ch.getName () + (trend? ".min": "") + "\" " +
(trend ? (" \"" + ch.getName () + ".max" + "\" " +
" \"" + ch.getName () + ".rms" + "\" "
rs += " \"" + ch.getName () + (trend? ".mean": "") + "\" " +
(trend ? (" \"" + ch.getName () + ".min" + "\" " +
" \"" + ch.getName () + ".max" + "\" "
): "") +
(!trend && ch.getRate () != ((Channel) chSet.getServerSet ().channelObject (ch.getName ())).getRate ()?
Integer.toString (ch.getRate ()): "");
......
......@@ -116,7 +116,7 @@ public class NDS_PV implements PV, Runnable, Debug, Defaults, DataTypeConstants
/**
* Class field map that maps NDS PV name into theThread and reference count.
* Setting the value object to null will stop the running DAQ thrad.
* Setting the value object to null will stop the running DAQ thread.
*/
private static Map<String,ThreadRef> m;
......@@ -220,14 +220,14 @@ public class NDS_PV implements PV, Runnable, Debug, Defaults, DataTypeConstants
// Load channel from the server
preferences.updateChannelSet(new ChannelSet(null, net));
if (!preferences.getChannelSet().addChannel(name, 16384))
if (!preferences.getChannelSet().addChannel(name, 16))
throw new Exception();
if (!net.startNetWriter(preferences.getChannelSet(), 0, 0))
throw new Exception();
//if (!net.startNetWriter(preferences.getChannelSet(), 0, 0))
//throw new Exception();
final Thread update_thread = new Thread(this, getName());
update_thread.start();
//final Thread update_thread = new Thread(this, getName());
//update_thread.start();
// See how many elements there are in the map already, calculate the index
int ccount = 0;
......@@ -240,7 +240,7 @@ public class NDS_PV implements PV, Runnable, Debug, Defaults, DataTypeConstants
ccount++;
}
m.put(name, new ThreadRef(1, update_thread, this, ccount));
//m.put(name, new ThreadRef(1, update_thread, this, ccount));
}
}
LOG.log(Level.FINE,"name=" + getName() + " this=" + this + " NDS PV references =" + m.get(name).count());
......@@ -468,8 +468,9 @@ public class NDS_PV implements PV, Runnable, Debug, Defaults, DataTypeConstants
}
ccount++;
final int r = 1;
// Assign data into the queue
double cval[] = new double[1 + 1024 * ccount]; // TODO remove hard coded rate
double cval[] = new double[1 + r * ccount]; // TODO remove hard coded rate
cval[0] = now.toDouble(); // First element will contain the time stamp
it1 = m.entrySet().iterator();
while (it1.hasNext()) {
......@@ -477,7 +478,7 @@ public class NDS_PV implements PV, Runnable, Debug, Defaults, DataTypeConstants
if (p.getKey().equals("gps")) continue;
double [] val = ValueUtil.getDoubleArray(p.getValue().nds_pv.value);
int idx = p.getValue().connectIndex;
System.arraycopy(val, 0, cval, 1 + idx * 1024, 1024);
System.arraycopy(val, 0, cval, 1 + idx * r, r);
}
IValue ival = ValueFactory.createDoubleValue(now,
OK, OK.toString(), meta, Quality.Original,
......@@ -517,8 +518,9 @@ public class NDS_PV implements PV, Runnable, Debug, Defaults, DataTypeConstants
private void update() {
//value = TextUtil.parseValueFromString("1.234", null);
for (PVListener listener : listeners)
listener.pvValueUpdate(this);
// Commented on Aug 28, 2014
// for (PVListener listener : listeners)
//listener.pvValueUpdate(this);
}
}
......@@ -88,6 +88,9 @@ public abstract class Net implements Debug {
return true;
}
public boolean is_connected() {
return clientSocket != null;
}
/**
* Get DAQD net protocol version and revision string in a format like "8.1"
*/
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment