From 222bee91dbc05b3a5290fd2ccc4cda4bbb9ff965 Mon Sep 17 00:00:00 2001
From: James Clark
Date: Mon, 26 Aug 2019 16:34:24 -0700
Subject: [PATCH] incremental progress copying upload client

---
 bin/gwrucio_registrar    |   3 +-
 ligo_rucio/rucio_data.py | 261 +++++++++++++++++++++++++++++++++++----
 2 files changed, 236 insertions(+), 28 deletions(-)

diff --git a/bin/gwrucio_registrar b/bin/gwrucio_registrar
index d7b6087..7ac0427 100755
--- a/bin/gwrucio_registrar
+++ b/bin/gwrucio_registrar
@@ -370,7 +370,8 @@ def inject_data(label, rset, aparser):

     # Register files for replication
     rucio_data.LOGGER.info("Adding files")
-    dataset.upload_files()
+    #dataset.upload_files()
+    dataset.register_files()
     rucio_data.LOGGER.info("Files Added")

     return SUCCESS
diff --git a/ligo_rucio/rucio_data.py b/ligo_rucio/rucio_data.py
index 0075c08..56bfbe1 100644
--- a/ligo_rucio/rucio_data.py
+++ b/ligo_rucio/rucio_data.py
@@ -17,6 +17,7 @@
 """

 # Native
+import copy
 import os
 import sys
 import logging
@@ -27,7 +28,11 @@ import requests
 from rucio.client.client import Client
 from rucio.common.config import config_get
 from rucio.client.uploadclient import UploadClient
+from rucio.common.exception import (DataIdentifierAlreadyExists, DataIdentifierNotFound,
+                                    InputValidationError, RucioException)
+from rucio.common.utils import adler32, execute, generate_uuid, md5

+# FIXME: define this more cleanly
 LOGGER = logging.getLogger()

 SUCCESS = 0
@@ -118,6 +123,7 @@ class DatasetInjector(object):
         # Initialization for dataset
         self.rucio_host = config_get('client', 'rucio_host')
         self.client = Client(rucio_host=self.rucio_host)
+        # XXX: only initialise me if uploading
         self.upload_client = UploadClient(logger=LOGGER)

         # Support lists OR diskcache for files
@@ -128,6 +134,7 @@ class DatasetInjector(object):
             files = data['filelist'][:]

         # Get dictionary of files and metadata to register
+        # XXX force_checksums=True
         if not force_checksums:
             LOGGER.info("Checking how many of %d files already registered",
                         len(files))
             files = self.reduce_file_list(files)
         self.enumerate_uploads(files)

-    def upload_files(self):
-        """
-        Pass the list of file dictionaries to an UploadClient instance and upload.
-        """
-        self.upload_client.upload(self.items_to_upload)
-
-    def enumerate_uploads(self, files):
-        """
-        Create a list of dictionaries which describe files to pass to the rucio
-        UploadClient
-        """
-        self.items_to_upload = list()
-        for path in files:
-            self.items_to_upload.append(
-                {'path': path,
-                 'rse': self.rse,
-                 'did_scope': self.scope,
-                 'did_name': os.path.basename(path),
-                 'dataset_scope': self.scope,
-                 'dataset_name': self.dataset_name,
-                 'force_scheme': None,
-                 'no_register': False,
-                 'register_after_upload': True,
-                 'lifetime': self.lifetime,
-                 'transfer_timeout': self.transfer_timeout,
-                 'guid': str(uuid.uuid4())})
-
     def reduce_file_list(self, files):
         """
         Reduce the list of files for registration to extant, unregistered files.
@@ -206,4 +186,232 @@ class DatasetInjector(object):
                          self.scope, lfn, self.rse)
             return True
         LOGGER.debug("No existing replicas of %s", lfn)
+        return False
+
+    def enumerate_uploads(self, files):
+        """
+        Create a list of dictionaries which describe files to pass to the rucio
+        UploadClient
+        """
+        items = list()
+        for path in files:
+
+            dataset_did_str = ('%s:%s' % (self.scope, self.dataset_name))
+            items.append(
+                {'path': path,
+                 'rse': self.rse,
+                 'did_scope': self.scope,
+                 'did_name': os.path.basename(path),
+                 'dataset_scope': self.scope,
+                 'dataset_name': self.dataset_name,
+                 'dataset_did_str': dataset_did_str,
+                 'force_scheme': None,
+                 'no_register': False,
+                 'register_after_upload': True,
+                 'lifetime': self.lifetime,
+                 'transfer_timeout': self.transfer_timeout,
+                 'guid': str(uuid.uuid4())})
+
+        # check given sources, resolve dirs into files, and collect metadata
+        LOGGER.info("Checking file list")
+        self.files = self._collect_and_validate_file_info(items)
+        LOGGER.debug("Checks passed")
+
+    def register_files(self):
+        """
+        Register the files in self.files with the Rucio catalogue, without uploading.
+        """
+        # FIXME: change this to "add_files": upload if the DID does not exist
+        for _file in self.files:
+            self._register_file(_file)
+
+    def upload_files(self):
+        """
+        Pass the list of file dictionaries to an UploadClient instance and upload.
+        """
+        self.upload_client.upload(self.files)
+
+    def _register_file(self, file):
+        """
+        Registers the given file in Rucio. Creates a dataset if
+        needed. Registers the file DID and creates the replication
+        rule if needed. Adds a replica to the file DID.
+        (This function is modified from `uploadclient.py` to support
+        registration of files at arbitrary sites.)
+
+        :param file: dictionary describing the file
+
+        :raises DataIdentifierAlreadyExists: if the file DID is already registered
+            and the checksums do not match
+        """
+        LOGGER.debug('Registering file')
+        rse = file['rse']
+        dataset_did_str = file.get('dataset_did_str')
+        # register a dataset if we need to
+        try:
+            LOGGER.debug('Trying to create dataset: %s' % dataset_did_str)
+            self.client.add_dataset(scope=file['dataset_scope'],
+                                    name=file['dataset_name'],
+                                    rules=[{'account': self.client.account,
+                                            'copies': 1,
+                                            'rse_expression': rse,
+                                            'grouping': 'DATASET',
+                                            'lifetime': file.get('lifetime')}])
+            LOGGER.info('Successfully created dataset %s' % dataset_did_str)
+        except DataIdentifierAlreadyExists:
+            LOGGER.debug('Dataset %s already exists' % dataset_did_str)
+
+        file_scope = file['did_scope']
+        file_name = file['did_name']
+        file_did = {'scope': file_scope, 'name': file_name}
+        replica_for_api = self._convert_file_for_api(file)
+        try:
+            # if the remote checksum is different this did must not be used
+            meta = self.client.get_metadata(file_scope, file_name)
+            LOGGER.info('File DID already exists')
+            LOGGER.debug('local checksum: %s, remote checksum: %s' % (file['adler32'], meta['adler32']))
+
+            if meta['adler32'] != file['adler32']:
+                LOGGER.error('Local checksum %s does not match remote checksum %s' % (file['adler32'], meta['adler32']))
+
+                raise DataIdentifierAlreadyExists
+
+            # add file to rse if it is not registered yet
+            replicastate = list(self.client.list_replicas([file_did], all_states=True))
+            if rse not in replicastate[0]['rses']:
+                self.client.add_replicas(rse=rse, files=[replica_for_api])
+                LOGGER.info('Successfully added replica in Rucio catalogue at %s' % rse)
+        except DataIdentifierNotFound:
+            LOGGER.debug('File DID does not exist')
+            self.client.add_replicas(rse=rse,
+                                     files=[replica_for_api])
+            LOGGER.info('Successfully added replica in Rucio catalogue at %s' % rse)
+        if not dataset_did_str:
+            # only need to add rules for files if no dataset is given
+            self.client.add_replication_rule([file_did], copies=1, rse_expression=rse, lifetime=file.get('lifetime'))
+            LOGGER.info('Successfully added replication rule at %s' % rse)
+
+    def _get_file_guid(self, file):
+        """
+        Get the guid of a file, trying different strategies
+        (This function is meant to be used as class internal only)
+
+        :param file: dictionary describing the file
+
+        :returns: the guid
+        """
+        guid = file.get('guid')
+        if not guid and 'pool.root' in file['basename'].lower() and not file.get('no_register'):
+            status, output, err = execute('pool_extractFileIdentifier %s' % file['path'])
+            if status != 0:
+                msg = 'Trying to upload ROOT files but pool_extractFileIdentifier tool can not be found.\n'
+                msg += 'Setup your ATHENA environment and try again.'
+                raise RucioException(msg)
+            try:
+                guid = output.splitlines()[-1].split()[0].replace('-', '').lower()
+            except Exception:
+                raise RucioException('Error extracting GUID from output of pool_extractFileIdentifier')
+        elif guid:
+            guid = guid.replace('-', '')
+        else:
+            guid = generate_uuid()
+        return guid
+
+    def _collect_file_info(self, filepath, item):
+        """
+        Collects info (e.g. size, checksums, etc.) about the file and
+        returns it as a dictionary
+        (This function is meant to be used as class internal only)
+
+        :param filepath: path where the file is stored
+        :param item: input options for the given file
+
+        :returns: a dictionary containing all collected info and the input options
+        """
+        new_item = copy.deepcopy(item)
+        new_item['path'] = filepath
+        new_item['dirname'] = os.path.dirname(filepath)
+        new_item['basename'] = os.path.basename(filepath)
+
+        new_item['bytes'] = os.stat(filepath).st_size
+        new_item['adler32'] = adler32(filepath)
+        new_item['md5'] = md5(filepath)
+        new_item['meta'] = {'guid': self._get_file_guid(new_item)}
+        new_item['state'] = 'C'
+        if not new_item.get('did_scope'):
+            new_item['did_scope'] = self.scope
+        if not new_item.get('did_name'):
+            new_item['did_name'] = new_item['basename']
+
+        return new_item
+
+    def _convert_file_for_api(self, file):
+        """
+        Creates a new dictionary that contains only the values
+        that are needed for the upload with the correct keys
+        (This function is meant to be used as class internal only)
+
+        :param file: dictionary describing a file to upload
+
+        :returns: dictionary containing no more than the needed values for the upload
+        """
+        replica = {}
+        replica['scope'] = file['did_scope']
+        replica['name'] = file['did_name']
+        replica['bytes'] = file['bytes']
+        replica['adler32'] = file['adler32']
+        replica['md5'] = file['md5']
+        replica['meta'] = file['meta']
+        replica['state'] = file['state']
+        pfn = file.get('pfn')
+        if pfn:
+            replica['pfn'] = pfn
+        return replica
+
+    def _collect_and_validate_file_info(self, items):
+        """
+        Checks if there are any inconsistencies within the given input
+        options and stores the output of _collect_file_info for every file
+        (This function is meant to be used as class internal only)
+
+        :param items: list of dictionaries with all input files and options
+
+        :returns: a list of dictionaries containing all descriptions of the files to upload
+
+        :raises InputValidationError: if an input option has a wrong format
+        """
+        logger = LOGGER
+        files = []
+        for item in items:
+            path = item.get('path')
+            pfn = item.get('pfn')
+            if not path:
+                logger.warning('Skipping source entry because the key "path" is missing')
+                continue
+            if not item.get('rse'):
+                logger.warning('Skipping file %s because no rse was given' % path)
+                continue
+            if pfn:
+                item['force_scheme'] = pfn.split(':')[0]
+
+            if os.path.isdir(path):
+                dname, subdirs, fnames = next(os.walk(path))
+                for fname in fnames:
+                    logger.debug('Collecting file info for: %s' % fname)
+                    file = self._collect_file_info(os.path.join(dname, fname), item)
+                    files.append(file)
+                if not len(fnames) and not len(subdirs):
+                    logger.warning('Skipping %s because it is empty.' % dname)
+                elif not len(fnames):
+                    logger.warning('Skipping %s because it has no files in it. Subdirectories are not supported.' % dname)
+            elif os.path.isfile(path):
+                file = self._collect_file_info(path, item)
+                files.append(file)
+            else:
+                logger.warning('No such file or directory: %s' % path)
+
+        if not len(files):
+            raise InputValidationError('No valid input files given')
+
+        return files
+
--
GitLab
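
Usage note: a minimal sketch of how the new registration-only path is driven, as in `inject_data()` in `bin/gwrucio_registrar`. The constructor call is hypothetical: the patch does not show the `DatasetInjector` signature, so the argument names below are assumptions inferred from the attributes the class uses (`data['filelist']`, `scope`, `dataset_name`, `rse`), and the sample values are illustrative only.

    # Hypothetical sketch -- not the real DatasetInjector signature, which is
    # outside this patch; argument names are inferred from attribute usage.
    from ligo_rucio import rucio_data

    dataset = rucio_data.DatasetInjector(
        data={'filelist': ['/archive/frames/H-H1_EXAMPLE-1126256640-4096.gwf']},  # assumed input layout
        scope='postO3',               # assumed scope
        dataset_name='H1_EXAMPLE',    # assumed dataset name
        rse='LIGO-EXAMPLE-RSE')       # assumed RSE name

    # New path introduced by this patch: catalogue-only registration of
    # files that already exist at the RSE (no data movement).
    dataset.register_files()

    # Old path, still available: physical transfer via Rucio's UploadClient,
    # with register_after_upload=True so registration follows the copy.
    # dataset.upload_files()

The design distinction the patch introduces: `upload_files()` moves data and lets `UploadClient` register DIDs as a side effect, while `register_files()` reuses the same per-file dictionaries (built by `enumerate_uploads` and checked by `_collect_and_validate_file_info`) but only writes catalogue entries, which is what is wanted when the files already sit on the target storage.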