Commit dda0a414 authored by James Clark
Browse files

incorporate lfn2pfn translator

parent f62bdf51
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# Authors:
# - James Alexander Clark, <james.clark@ligo.org>, 2018-2019
"""Call the gwrucio LFN2PFN algorithm to determine the physical filename
(PFN) for a given logical filename and scope.
We assume the scope is simply the science or engineering run in which the data
was taken. This script has three modes of operation:
1. Return the expected PFN for a given LFN.
2. Assert that the algorithmic PFN for an LFN matches a given PFN
3. Demonstrate the algorithm with some hard-coded examples.
"""
import os
import argparse
from ligo_rucio import lfn2pfn
def parse_cmdline(argv=None):
    """Parse the command line of the LFN2PFN helper script.

    Parameters
    ----------
    argv : list of str, optional
        Argument strings to parse.  When ``None`` (the default) argparse
        falls back to ``sys.argv[1:]``, i.e. the behaviour of the original
        zero-argument call is unchanged.

    Returns
    -------
    argparse.Namespace
        Namespace with the attributes ``frame_name`` (``None`` when the
        positional argument is omitted), ``obs_run``, ``pfn`` and
        ``prefix``.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the module docstring's line breaks in the --help output.
        formatter_class=argparse.RawDescriptionHelpFormatter)

    # Optional positional: leaving it out selects the demonstration mode.
    parser.add_argument("frame_name", type=str, default=None, nargs='?',
                        help="""Name of the frame file (LFN)""")
    parser.add_argument("--obs-run", type=str, default="SCOPE",
                        help="""Observing epoch (rucio scope). E.g., "O2" """)
    parser.add_argument("--pfn", type=str, default=None,
                        help="""Full path to frame for verification""")
    parser.add_argument("--prefix", type=str, default="/hdfs/frames",
                        help="""Frame directory prefix for a given site""")

    return parser.parse_args(argv)
## Input
ARGS = parse_cmdline()

# Separator lines shared by every mode of operation.
_RULE = "-----------------------------------------------------------------"
_BANNER = "------------------- LIGO LFN2PFN Algorithm ----------------------"

## Demonstration Mode
if ARGS.frame_name is None:

    # Expected (prefix-less) PFN -> (scope, LFN) for a few known frames.
    FNS = {
        "postO1/hoft/H1/H-H1_HOFT_C00-11631/H-H1_HOFT_C00-1163149312-4096.gwf":
            ("postO1", "H-H1_HOFT_C00-1163149312-4096.gwf"),
        "postO1/hoft/L1/L-L1_HOFT_C00-11585/L-L1_HOFT_C00-1158533120-4096.gwf":
            ("postO1", "L-L1_HOFT_C00-1158533120-4096.gwf"),
        "O2/hoft_C01/H1/H-H1_HOFT_C01-11880/H-H1_HOFT_C01-1188003840-4096.gwf":
            ("O2", "H-H1_HOFT_C01-1188003840-4096.gwf"),
        "O1/hoft_C01_4kHz/L1/L-L1_HOFT_C01_4kHz-11372/L-L1_HOFT_C01_4kHz-1137250304-4096.gwf":
            ("O1", "L-L1_HOFT_C01_4kHz-1137250304-4096.gwf"),
        "AdVirgo/HrecOnline/V-V1Online-11922/V-V1Online-1192294000-2000.gwf":
            ("AdVirgo", "V-V1Online-1192294000-2000.gwf"),
        "O2/V1O2Repro1A/V-V1O2Repro1A-11879/V-V1O2Repro1A-1187990000-5000.gwf":
            ("O2", "V-V1O2Repro1A-1187990000-5000.gwf"),
        "O1/raw/H1/H-H1_R-11266/H-H1_R-1126631040-64.gwf":
            ("O1", "H-H1_R-1126631040-64.gwf"),
    }

    print(_RULE)
    for pfn, (scope, lfn) in FNS.items():

        algorithmic_pfn = lfn2pfn.ligo_lab(scope, lfn, rse=None,
                                           rse_attrs=None, proto_attrs=None)

        # Raised explicitly (rather than via ``assert``) so the self-check
        # is not stripped when Python runs with -O.
        if algorithmic_pfn != pfn:
            raise AssertionError("{0}:{1} -> {2} (should be {3})".format(
                scope, lfn,
                os.path.join(ARGS.prefix, algorithmic_pfn),
                os.path.join(ARGS.prefix, pfn)))

        print("pfn:{}".format(os.path.join(ARGS.prefix, algorithmic_pfn)))
        print("scope:{}".format(scope))
        print("lfn:{}".format(lfn))
        print("did:{0}:{1}".format(scope, lfn))
        print(_RULE)

## Return PFN, and optionally check it against a user-supplied PFN
else:

    PFN = lfn2pfn.ligo_lab(ARGS.obs_run, ARGS.frame_name, None, None, None)
    FULL_PFN = os.path.join(ARGS.prefix, PFN)

    print(_BANNER)
    print("Input:")
    if ARGS.pfn is not None:
        # Show the PFN supplied for verification (the original printed the
        # prefix here by mistake).
        print(" PFN: {}".format(ARGS.pfn))
    print(" LFN: {}".format(ARGS.frame_name))
    print(" Scope: {}".format(ARGS.obs_run))
    print(" DID: {0}:{1}".format(ARGS.obs_run, ARGS.frame_name))
    print(" Frames prefix: {}".format(ARGS.prefix))
    print("Result:")
    print(" PFN: {}".format(FULL_PFN))
    print(_RULE)

    ## Assert algorithmic PFN == supplied PFN
    # A mismatch terminates the script with a non-zero exit status so the
    # check can be used from shell scripts.
    if ARGS.pfn is not None and FULL_PFN != ARGS.pfn:
        raise SystemExit(
            "PFN mismatch: algorithm gives {0}, supplied PFN is {1}".format(
                FULL_PFN, ARGS.pfn))
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# Authors:
# - Brian Bockelman, <bbockelm@cse.unl.edu>, 2017-2018
# - James Alexander Clark, <james.clark@ligo.org>, 2018-2019
"""
lfn2pfn.py
Default LFN-to-path algorithms for LIGO
"""
import re
#from rucio.rse.protocols.protocol import RSEDeterministicTranslation
# <detector letters>-<dataset>-<first five GPS digits><remainder>
_GWF_RE = re.compile(r'([A-Z]+)-([A-Za-z0-9_]+)-([0-9]{5,5})(.*)')
# <site>_<kind>_<calibration tag>, e.g. H1_HOFT_C01
_DATASET_RE = re.compile(r'([A-Z0-9]+)_([A-Z0-9]+)_([A-Za-z0-9_]+)')
# Test-data names: <UPPERCASE>.<lowercase alphanumerics>
_TESTS_RE = re.compile(r'^[A-Z]+\.[a-z0-9]+$')


def ligo_lab(scope, name, rse, rse_attrs, proto_attrs):
    """
    Map the GWF files according to the Caltech schema.

    ER8:H-H1_HOFT_C02-1126256640-4096 ->
    ER8/hoft_C02/H1/H-H1_HOFT_C02-11262/H-H1_HOFT_C02-1126256640-4096

    Parameters
    ----------
    scope : str
        Rucio scope of the DID; used as the leading path component for
        every case except the 'AdVirgo' special cases below.
    name : str
        Logical file name of the DID.
    rse, rse_attrs, proto_attrs
        Accepted for signature compatibility only; they are not used.

    Returns
    -------
    str
        Relative path (no site prefix) for the file.

    Raises
    ------
    ValueError
        If `name` matches neither the test-data pattern nor the GWF
        frame-file pattern.
    """
    # Prevents unused argument warnings in pylint
    del rse
    del rse_attrs
    del proto_attrs

    # Exception for automatix test data.  E.g., test.61f182e47315405ebc029599672199f2
    # NOTE(review): the example above has a lowercase prefix, but _TESTS_RE
    # only accepts an uppercase one -- confirm which is intended.
    if _TESTS_RE.match(name):
        return "%s/%s" % (scope, name)

    gwf_match = _GWF_RE.match(name)
    if not gwf_match:
        raise ValueError("Invalid LIGO filename: %r" % (name,))
    detector, dataset, gps_prefix, _ = gwf_match.groups()
    dir_hash = "%s-%s-%s" % (detector, dataset, gps_prefix)

    # Virgo
    # In O1: all Virgo data went to /archive/frames/AdVirgo
    # In O2:
    #   - V1Online went to /archive/frames/AdVirgo
    #   - raw V1O2Repro1A and V1O2Repro2A lived in /archive/frames/O2
    # In O3: raw, V1Online live in /archive/frames/O3
    if dataset == 'V1Online' and scope != 'O3':
        return '%s/%s/%s/%s' % ('AdVirgo', 'HrecOnline', dir_hash, name)
    if detector == 'V' and dataset == 'raw' and scope in ('O2', 'O3'):
        return "%s/%s/%s/%s/%s" % (scope, dataset, detector[0] + '1',
                                   dir_hash, name)
    if dataset == 'V1Online' and scope == 'O3':
        return "%s/%s/%s/%s" % (scope, dataset, dir_hash, name)
    if detector == 'V' and dataset == 'raw':
        return "%s/%s/%s/%s" % ('AdVirgo', dataset, dir_hash, name)
    if dataset in ('V1O2Repro1A', 'V1O2Repro2A'):
        return '%s/%s/%s/%s' % (scope, dataset, dir_hash, name)

    # LIGO
    detector_pretty = detector[0] + '1'
    dataset_match = _DATASET_RE.match(dataset)
    if dataset_match:
        _, kind, calib = dataset_match.groups()
        # The C00 calibration tag is dropped from the directory name.
        if calib == 'C00':
            dataset_pretty = kind.lower()
        else:
            dataset_pretty = '%s_%s' % (kind.lower(), calib)
    else:
        dataset_pretty = dataset

    # LIGO Exceptions
    if dataset_pretty in ('H1_R', 'L1_R'):
        dataset_pretty = 'raw'
    elif dataset_pretty == 'hoft_CLEAN_SUB60HZ_C01':
        dataset_pretty = 'hoft_C01_clean_sub60Hz'

    return "%s/%s/%s/%s/%s" % (scope, dataset_pretty, detector_pretty,
                               dir_hash, name)
#RSEDeterministicTranslation.register(ligo_lab)
if __name__ == '__main__':

    def test_cit_mapping(scope, name, pfn):
        """Demonstrate the LFN->PFN mapping.

        Runs `ligo_lab` on ``scope``/``name`` and prints the mapping, or a
        FAILURE line when the result differs from the expected ``pfn``.
        """
        mapped_pfn = ligo_lab(scope, name, None, None, None)
        # Single-argument print(...) behaves identically on Python 2 and 3.
        if mapped_pfn == pfn:
            print("%s:%s -> %s" % (scope, name, pfn))
        else:
            print("FAILURE: %s:%s -> %s (expected %s)" %
                  (scope, name, mapped_pfn, pfn))

    # (scope, LFN, expected PFN) for each demonstration case.
    _CASES = (
        ("postO1", "H-H1_HOFT_C00-1163149312-4096.gwf",
         "postO1/hoft/H1/H-H1_HOFT_C00-11631/H-H1_HOFT_C00-1163149312-4096.gwf"),
        ("postO1", "L-L1_HOFT_C00-1158533120-4096.gwf",
         "postO1/hoft/L1/L-L1_HOFT_C00-11585/L-L1_HOFT_C00-1158533120-4096.gwf"),
        ("O2", "H-H1_HOFT_C01-1188003840-4096.gwf",
         "O2/hoft_C01/H1/H-H1_HOFT_C01-11880/H-H1_HOFT_C01-1188003840-4096.gwf"),
        ("O1", "L-L1_HOFT_C01_4kHz-1137250304-4096.gwf",
         "O1/hoft_C01_4kHz/L1/L-L1_HOFT_C01_4kHz-11372/L-L1_HOFT_C01_4kHz-1137250304-4096.gwf"),
        ("AdVirgo", "V-V1Online-1192294000-2000.gwf",
         "AdVirgo/HrecOnline/V-V1Online-11922/V-V1Online-1192294000-2000.gwf"),
        ("O2", "V-V1O2Repro1A-1187990000-5000.gwf",
         "O2/V1O2Repro1A/V-V1O2Repro1A-11879/V-V1O2Repro1A-1187990000-5000.gwf"),
        ("O1", "H-H1_R-1126631040-64.gwf",
         "O1/raw/H1/H-H1_R-11266/H-H1_R-1126631040-64.gwf"),
    )
    for _scope, _name, _pfn in _CASES:
        test_cit_mapping(_scope, _name, _pfn)
......@@ -33,8 +33,7 @@ from rucio.common.exception import (DataIdentifierAlreadyExists,
InputValidationError)
from rucio.common.utils import adler32, generate_uuid, md5
import rucio.rse.rsemanager as rsemgr
from ligo_rucio import lfn2pfn
from rucio.rse.protocols.protocol import RSEDeterministicTranslation
SUCCESS = 0
FAILURE = 1
......@@ -64,9 +63,19 @@ def convert_file_for_api(filemd):
return replica
def get_rse_url(rse_info):
def get_rse_pfn(rse_info, scope, name):
"""
Return the base path of the rucio url
Return the PFN at this RSE
Parameters
----------
rse_info : dict
Protocol related RSE attributes. See
`rucio/lib/rucio/rse/rsemanager.py`.
scope : str
Scope for the DID
name : str
filename of the DID
"""
protocol = rse_info['protocols'][0]
......@@ -75,13 +84,17 @@ def get_rse_url(rse_info):
port = protocol['port']
rucioserver = protocol['hostname']
lfn2pfn_translator = RSEDeterministicTranslation(rse=rse_info['rse'])
if schema == 'srm':
prefix = protocol['extended_attributes'][
'web_service_path'] + prefix
prefix = protocol['extended_attributes']['web_service_path'] + prefix
url = schema + '://' + rucioserver
if port != 0:
url = url + ':' + str(port)
return url + prefix
pfn = os.path.join(url, lfn2pfn_translator.path(scope=scope, name=name))
return pfn
class DatasetInjector(object):
......@@ -94,11 +107,17 @@ class DatasetInjector(object):
3) Create Rucio dataset
4) Register Rucio dataset
data is a dictionary with a list of files to register
data is a dictionary with a list of files to register
"""
# pylint: disable=too-many-instance-attributes,too-many-arguments
def __init__(self, rse_info, dataset_name, data, allow_uploads=False, logger=None):
def __init__(self,
rse_info,
dataset_name,
data,
allow_uploads=False,
logger=None):
if not logger:
logger = logging.getLogger('%s.null' % __name__)
......@@ -124,7 +143,8 @@ class DatasetInjector(object):
self.allow_uploads = allow_uploads
self.client = Client(rucio_host=config_get('client', 'rucio_host'))
# Read and attach list of file attributes (filename, adler32, md5, bytes)
# Read and attach list of file attributes:
# (filename, adler32, md5, bytes)
# If file is not found, continue to compute attributes on the fly
try:
self.fileinfos = data['fileinfos']
......@@ -159,17 +179,22 @@ class DatasetInjector(object):
self.client.add_dataset(scope=self.scope,
name=self.dataset_name,
rules=[{
'account': self.client.account,
'copies': 1,
'rse_expression': self.rse_info['rse'],
'grouping': 'DATASET',
'lifetime': self.lifetime}])
'account':
self.client.account,
'copies':
1,
'rse_expression':
self.rse_info['rse'],
'grouping':
'DATASET',
'lifetime':
self.lifetime
}])
logger.info('Created new dataset %s', self.dataset_name)
except RSEBlacklisted:
logger.warning(
'RSE write blacklisted, not adding replication rule')
self.client.add_dataset(scope=self.scope,
name=self.dataset_name)
self.client.add_dataset(scope=self.scope, name=self.dataset_name)
logger.info('Created new dataset %s', self.dataset_name)
except DataIdentifierAlreadyExists:
logger.debug("Dataset %s already exists", self.dataset_name)
......@@ -231,14 +256,9 @@ class DatasetInjector(object):
logger = self.logger
items = list()
rse_url = get_rse_url(self.rse_info)
logger.debug("Determined RSE base url: %s", rse_url)
for path in files:
pfn = os.path.join(
rse_url,
lfn2pfn.ligo_lab(self.scope, os.path.basename(path), None,
None, None))
pfn = get_rse_pfn(self.rse_info, self.scope,
os.path.basename(path))
dataset_did_str = ('%s:%s' % (self.scope, self.dataset_name))
items.append({
'path': path,
......@@ -356,7 +376,6 @@ class DatasetInjector(object):
return new_item
def upload_file(self, filemd):
"""
Upload file to RSE
......@@ -420,8 +439,10 @@ class DatasetInjector(object):
self.client.attach_dids(scope=self.scope,
name=self.dataset_name,
dids=[{
'scope': self.scope,
'name': filemd['did_name']
'scope':
self.scope,
'name':
filemd['did_name']
}])
except FileAlreadyExists:
logger.debug("File %s already exists in dataset %s",
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment