Commit 222bee91 authored by James Clark

incremental progress copying upload client

parent c80eb247
Pipeline #76916 passed with stage in 40 seconds
@@ -370,7 +370,8 @@ def inject_data(label, rset, aparser):
     # Register files for replication
     rucio_data.LOGGER.info("Adding files")
-    dataset.upload_files()
+    #dataset.upload_files()
+    dataset.register_files()
     rucio_data.LOGGER.info("Files Added")
     return SUCCESS
...
@@ -17,6 +17,7 @@
 """
 # Native
+import copy
 import os
 import sys
 import logging
@@ -27,7 +28,13 @@ import requests
 from rucio.client.client import Client
 from rucio.common.config import config_get
 from rucio.client.uploadclient import UploadClient
+from rucio.common.exception import (DataIdentifierAlreadyExists,
+                                    DataIdentifierNotFound,
+                                    InputValidationError,
+                                    RucioException)
+from rucio.common.utils import adler32, execute, generate_uuid, md5
+# FIXME: define this more cleanly
 LOGGER = logging.getLogger()
 SUCCESS = 0
@@ -118,6 +123,7 @@ class DatasetInjector(object):
         # Initialization for dataset
         self.rucio_host = config_get('client', 'rucio_host')
         self.client = Client(rucio_host=self.rucio_host)
+        # XXX: only initialise me if uploading
         self.upload_client = UploadClient(logger=LOGGER)
         # Support lists OR diskcache for files
@@ -128,6 +134,7 @@ class DatasetInjector(object):
             files = data['filelist'][:]
         # Get dictionary of files and metadata to register
+        # XXX
+        force_checksums = True
         if not force_checksums:
             LOGGER.info("Checking how many of %d files already registered",
@@ -135,33 +142,6 @@ class DatasetInjector(object):
             files = self.reduce_file_list(files)
         self.enumerate_uploads(files)
-    def upload_files(self):
-        """
-        Pass the list of file dictionaries to an UploadClient instance and upload.
-        """
-        self.upload_client.upload(self.items_to_upload)
-
-    def enumerate_uploads(self, files):
-        """
-        Create a list of dictionaries which describe files to pass to the rucio
-        UploadClient
-        """
-        self.items_to_upload = list()
-        for path in files:
-            self.items_to_upload.append(
-                {'path': path,
-                 'rse': self.rse,
-                 'did_scope': self.scope,
-                 'did_name': os.path.basename(path),
-                 'dataset_scope': self.scope,
-                 'dataset_name': self.dataset_name,
-                 'force_scheme': None,
-                 'no_register': False,
-                 'register_after_upload': True,
-                 'lifetime': self.lifetime,
-                 'transfer_timeout': self.transfer_timeout,
-                 'guid': str(uuid.uuid4())})
     def reduce_file_list(self, files):
         """
         Reduce the list of files for registration to extant, unregistered files.
@@ -206,4 +186,231 @@ class DatasetInjector(object):
                          self.scope, lfn, self.rse)
             return True
         LOGGER.debug("No existing replicas of %s", lfn)
         return False
+
+    def enumerate_uploads(self, files):
+        """
+        Create a list of dictionaries which describe files to pass to the Rucio
+        UploadClient
+        """
+        items = list()
+        for path in files:
+            dataset_did_str = '%s:%s' % (self.scope, self.dataset_name)
+            items.append(
+                {'path': path,
+                 'rse': self.rse,
+                 'did_scope': self.scope,
+                 'did_name': os.path.basename(path),
+                 'dataset_scope': self.scope,
+                 'dataset_name': self.dataset_name,
+                 'dataset_did_str': dataset_did_str,
+                 'force_scheme': None,
+                 'no_register': False,
+                 'register_after_upload': True,
+                 'lifetime': self.lifetime,
+                 'transfer_timeout': self.transfer_timeout,
+                 'guid': str(uuid.uuid4())})
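+        # NB: these per-file dicts serve both paths below: upload_files() hands
+        # them to UploadClient.upload, while register_files() walks them itself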
+        # check given sources, resolve dirs into files, and collect metadata
+        LOGGER.info("Checking file list")
+        self.files = self._collect_and_validate_file_info(items)
+        LOGGER.debug("Checks passed")
+
+    def register_files(self):
+        """
+        Register the list of file dictionaries in the Rucio catalogue without
+        performing any upload.
+        """
+        # FIXME: change this to "add_files": upload if the DID does not exist
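+        # NB: nothing is transferred here; each file is assumed to already be
+        # present at the target RSE, so only catalogue entries are created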
+        for file in self.files:
+            self._register_file(file)
+
+    def upload_files(self):
+        """
+        Pass the list of file dictionaries to an UploadClient instance and upload.
+        """
+        self.upload_client.upload(self.files)
+
+    def _register_file(self, file):
+        """
+        Register the given file in Rucio: create the dataset if needed,
+        register the file DID, add a replica at the RSE and create the
+        replication rule if needed.
+        (This function is modified from `uploadclient.py` to support
+        registration of files at arbitrary sites.)
+        :param file: dictionary describing the file
+        :raises DataIdentifierAlreadyExists: if the file DID is already registered
+                                             and the checksums do not match
+        """
+        LOGGER.debug('Registering file')
+        rse = file['rse']
+        dataset_did_str = file.get('dataset_did_str')
+        # register a dataset if we need to
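+        # (grouping='DATASET' keeps every file attached to this dataset
+        # together at the chosen RSE)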
+        try:
+            LOGGER.debug('Trying to create dataset: %s' % dataset_did_str)
+            self.client.add_dataset(scope=file['dataset_scope'],
+                                    name=file['dataset_name'],
+                                    rules=[{'account': self.client.account,
+                                            'copies': 1,
+                                            'rse_expression': rse,
+                                            'grouping': 'DATASET',
+                                            'lifetime': file.get('lifetime')}])
+            LOGGER.info('Successfully created dataset %s' % dataset_did_str)
+        except DataIdentifierAlreadyExists:
+            LOGGER.debug('Dataset %s already exists' % dataset_did_str)
+
+        file_scope = file['did_scope']
+        file_name = file['did_name']
+        file_did = {'scope': file_scope, 'name': file_name}
+        replica_for_api = self._convert_file_for_api(file)
+        try:
+            # if the remote checksum is different this did must not be used
+            meta = self.client.get_metadata(file_scope, file_name)
+            LOGGER.info('File DID already exists')
+            LOGGER.debug('local checksum: %s, remote checksum: %s' % (file['adler32'], meta['adler32']))
+            if meta['adler32'] != file['adler32']:
+                LOGGER.error('Local checksum %s does not match remote checksum %s' % (file['adler32'], meta['adler32']))
+                raise DataIdentifierAlreadyExists
+            # add file to rse if it is not registered yet
+            replicastate = list(self.client.list_replicas([file_did], all_states=True))
+            if rse not in replicastate[0]['rses']:
+                self.client.add_replicas(rse=rse, files=[replica_for_api])
+                LOGGER.info('Successfully added replica in Rucio catalogue at %s' % rse)
+        except DataIdentifierNotFound:
+            LOGGER.debug('File DID does not exist')
+            self.client.add_replicas(rse=rse, files=[replica_for_api])
+            LOGGER.info('Successfully added replica in Rucio catalogue at %s' % rse)
+            if not dataset_did_str:
+                # only need to add rules for files if no dataset is given
+                self.client.add_replication_rule([file_did], copies=1, rse_expression=rse, lifetime=file.get('lifetime'))
+                LOGGER.info('Successfully added replication rule at %s' % rse)
+
+    def _get_file_guid(self, file):
+        """
+        Get the guid of a file, trying different strategies
+        (This function is meant to be used as class internal only)
+        :param file: dictionary describing the file
+        :returns: the guid
+        """
+        guid = file.get('guid')
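+        # precedence: extract from POOL/ROOT file metadata when no guid was
+        # supplied, else normalise the supplied guid, else generate a fresh one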
+        if not guid and 'pool.root' in file['basename'].lower() and not file.get('no_register'):
+            status, output, err = execute('pool_extractFileIdentifier %s' % file['path'])
+            if status != 0:
+                msg = 'Trying to upload ROOT files but pool_extractFileIdentifier tool cannot be found.\n'
+                msg += 'Set up your ATHENA environment and try again.'
+                raise RucioException(msg)
+            try:
+                guid = output.splitlines()[-1].split()[0].replace('-', '').lower()
+            except Exception:
+                raise RucioException('Error extracting GUID from output of pool_extractFileIdentifier')
+        elif guid:
+            guid = guid.replace('-', '')
+        else:
+            guid = generate_uuid()
+        return guid
+
+    def _collect_file_info(self, filepath, item):
+        """
+        Collects info (e.g. size, checksums, etc.) about the file and
+        returns it as a dictionary
+        (This function is meant to be used as class internal only)
+        :param filepath: path where the file is stored
+        :param item: input options for the given file
+        :returns: a dictionary containing all collected info and the input options
+        """
+        new_item = copy.deepcopy(item)
+        new_item['path'] = filepath
+        new_item['dirname'] = os.path.dirname(filepath)
+        new_item['basename'] = os.path.basename(filepath)
+        new_item['bytes'] = os.stat(filepath).st_size
+        new_item['adler32'] = adler32(filepath)
+        new_item['md5'] = md5(filepath)
+        new_item['meta'] = {'guid': self._get_file_guid(new_item)}
+        new_item['state'] = 'C'
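+        # 'C' (copying) is the state the stock UploadClient registers replicas
+        # with before a transfer is confirmed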
+        if not new_item.get('did_scope'):
+            new_item['did_scope'] = self.default_file_scope
+        if not new_item.get('did_name'):
+            new_item['did_name'] = new_item['basename']
+        return new_item
+
+    def _convert_file_for_api(self, file):
+        """
+        Creates a new dictionary that contains only the values
+        that are needed for the upload with the correct keys
+        (This function is meant to be used as class internal only)
+        :param file: dictionary describing a file to upload
+        :returns: dictionary containing no more than the needed values for the upload
+        """
+        replica = {}
+        replica['scope'] = file['did_scope']
+        replica['name'] = file['did_name']
+        replica['bytes'] = file['bytes']
+        replica['adler32'] = file['adler32']
+        replica['md5'] = file['md5']
+        replica['meta'] = file['meta']
+        replica['state'] = file['state']
+        pfn = file.get('pfn')
+        if pfn:
+            replica['pfn'] = pfn
+        return replica
+
+    def _collect_and_validate_file_info(self, items):
+        """
+        Checks if there are any inconsistencies within the given input
+        options and stores the output of _collect_file_info for every file
+        (This function is meant to be used as class internal only)
+        :param items: list of dictionaries with all input files and options
+        :returns: a list of dictionaries containing all descriptions of the files to upload
+        :raises InputValidationError: if an input option has a wrong format
+        """
+        logger = LOGGER
+        files = []
+        for item in items:
+            path = item.get('path')
+            pfn = item.get('pfn')
+            if not path:
+                logger.warning('Skipping source entry because the key "path" is missing')
+                continue
+            if not item.get('rse'):
+                logger.warning('Skipping file %s because no rse was given' % path)
+                continue
+            if pfn:
+                item['force_scheme'] = pfn.split(':')[0]
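+                # an explicit PFN pins the transfer scheme (e.g. root, gsiftp)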
+            if os.path.isdir(path):
+                dname, subdirs, fnames = next(os.walk(path))
+                for fname in fnames:
+                    logger.debug('Collecting file info for: %s' % fname)
+                    file = self._collect_file_info(os.path.join(dname, fname), item)
+                    files.append(file)
+                if not len(fnames) and not len(subdirs):
+                    logger.warning('Skipping %s because it is empty.' % dname)
+                elif not len(fnames):
+                    logger.warning('Skipping %s because it has no files in it. Subdirectories are not supported.' % dname)
+            elif os.path.isfile(path):
+                file = self._collect_file_info(path, item)
+                files.append(file)
+            else:
+                logger.warning('No such file or directory: %s' % path)
+        if not len(files):
+            raise InputValidationError('No valid input files given')
+        return files
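
For reference, a minimal, self-contained sketch of the per-file dict interface that enumerate_uploads builds. The scope, RSE and path below are illustrative placeholders, not values from this commit; UploadClient(logger=...).upload(items) is the same call upload_files() makes above:

    import logging
    import uuid

    from rucio.client.uploadclient import UploadClient

    logging.basicConfig(level=logging.INFO)

    # one dict per file, mirroring DatasetInjector.enumerate_uploads;
    # 'user.jdoe', 'MOCK' and the path are placeholder values
    item = {'path': '/tmp/example.dat',
            'rse': 'MOCK',
            'did_scope': 'user.jdoe',
            'did_name': 'example.dat',
            'dataset_scope': 'user.jdoe',
            'dataset_name': 'example_dataset',
            'register_after_upload': True,
            'lifetime': None,
            'transfer_timeout': None,
            'guid': str(uuid.uuid4())}

    UploadClient(logger=logging.getLogger()).upload([item])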