Commit 47516914 authored by Patrick Godwin's avatar Patrick Godwin
Browse files

removed unused io modules: sqlite, hdf5, http

parent 4c091e34
Pipeline #213664 passed with stages
in 2 minutes and 13 seconds
This diff is collapsed.
#!/usr/bin/env python
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
__description__ = "a module for HTTP I/O utilities"
#-------------------------------------------------
### imports
import logging
from multiprocessing import Pool
import os
import shutil
from six.moves import urllib
from .. import utils
#-------------------------------------------------
### functions
def retrieve_timeseries(rootdir, jobs, routes, job_tag, num_threads=16):
data = {route: {job: [[], []] for job in jobs} for route in routes}
for route in routes:
pool = Pool(num_threads)
mapargs = [(job, job_tag, route, rootdir) for job in jobs]
result = pool.map(retrieve_timeseries_by_job, mapargs)
for sngl_data in result:
data[route].update(sngl_data)
return data
@utils.unpack
def retrieve_timeseries_by_job(job, job_tag, route, basedir):
with open(os.path.join(job_tag, "%s_registry.txt" % job)) as f:
url = f.readline().strip()
data = get_url(url, route)
time, data = data[0], data[1]
return {job: [numpy.array(time), numpy.array(data)]}
def get_url(url, d):
"""!
A function to pull data from @param url where @param d specifies a
specific route. FIXME it assumes that the routes end in .txt
"""
f = "%s%s.txt" % (url, d)
try:
jobdata = urllib.request.urlopen(f).read().decode('utf-8').split("\n")
except urllib.error.HTTPError as e:
logging.error("%s : %s" % (f, str(e)))
return
except urllib.error.URLError as e:
logging.error("%s : %s" % (f, str(e)))
return
data = []
for line in jobdata:
if line:
data.append([float(x) for x in line.split()])
data = numpy.array(data)
out = []
if data.shape != (0,):
for i in range(data.shape[1]):
out.append(data[:,i])
return out
#!/usr/bin/env python
__author__ = "Patrick Godwin (patrick.godwin@ligo.org)"
__description__ = "a module for sqlite I/O utilities"
#-------------------------------------------------
### imports
import os
import sqlite3
#-------------------------------------------------
### functions
def store_timeseries(connection, routes, time, data, table):
### format query
columns = ','.join(routes)
vals = ','.join(['?' for ii in range(len(routes)+1)])
sql_insert = " INSERT INTO {table}(timestamp,{columns}) VALUES({vals})".format(
table=table,
columns=columns,
vals=vals,
)
### insert rows
c = connection.cursor()
for row in zip(time, *[data[route] for route in routes]):
c.execute(sql_insert, row)
### commit changes / close connection
### FIXME: really need to handle errors properly
try:
connection.commit()
except sqlite3.OperationalError:
pass
def retrieve_timeseries(connection, routes, start, end, table):
### format query
columns = ','.join(routes)
sql_query = " SELECT timestamp,{columns} FROM {table} WHERE timestamp BETWEEN {start} AND {end} ORDER BY timestamp;".format(
columns=columns,
table=table,
start=start,
end=end,
)
c = connection.cursor()
rows = c.execute(sql_query).fetchall()
time, data = zip(*rows)
return time, data
def create_table(connection, routes, table):
c = connection.cursor()
columns = ''.join([',{} real'.format(route) for route in routes])
table = " CREATE TABLE IF NOT EXISTS {table} (timestamp real PRIMARY KEY{columns});".format(
table=table,
columns=columns,
)
c.execute(table)
### commit changes / close connection
connection.commit()
def create_client(database_name, database_path, extension='sqlite'):
sqlite_path = os.path.join(database_path, '.'.join([database_name, extension]))
return sqlite3.connect(sqlite_path)
......@@ -22,7 +22,7 @@ import numpy
import yaml
from . import transforms, utils
from .io import hdf5, influx
from .io import influx
#-------------------------------------------------
### bottle configuration/templates
......@@ -510,8 +510,6 @@ def config_to_consumer(config):
backend = config['backend']
if backend == 'influxdb':
return influx.Consumer(**config)
elif backend == 'hdf5':
return hdf5.Consumer(**config)
else:
raise NotImplementedError
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment