rucio_data.py 16.2 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
# Copyright (C) 2018  James Alexander Clark <james.clark@ligo.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""Methods and classes for data registration.
"""

# Native
20
import copy
21 22 23 24
import os
import sys
import logging
import time
James Clark's avatar
James Clark committed
25
import requests
26 27

# Rucio
28
from rucio.client.client import Client
James Clark's avatar
James Clark committed
29
from rucio.client.uploadclient import UploadClient
James Clark's avatar
James Clark committed
30
from rucio.common.config import config_get
James Clark's avatar
Linted  
James Clark committed
31 32
from rucio.common.exception import (DataIdentifierAlreadyExists,
                                    FileAlreadyExists, RSEBlacklisted,
James Clark's avatar
linting  
James Clark committed
33
                                    InputValidationError)
James Clark's avatar
Linted  
James Clark committed
34
from rucio.common.utils import adler32, generate_uuid, md5
35
import rucio.rse.rsemanager as rsemgr
James Clark's avatar
James Clark committed
36
from rucio.rse.protocols.protocol import RSEDeterministicTranslation
James Clark's avatar
James Clark committed
37

38 39 40
# Exit status values; FAILURE is passed to sys.exit() when the rucio server
# cannot be reached.
SUCCESS, FAILURE = 0, 1

41

James Clark's avatar
linting  
James Clark committed
42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
def convert_file_for_api(filemd):
    """
    Build the replica dictionary passed to ``Client.add_replicas``.

    Creates a new dictionary that contains only the values that are needed
    for the upload, renamed to the keys the API expects (taken from
    `uploadclient.py`).

    :param filemd: dictionary describing a file to upload.  It must provide
        the keys ``did_scope``, ``did_name``, ``bytes``, ``adler32``, ``md5``,
        ``meta`` and ``state``; ``pfn`` is optional.

    :returns: dictionary containing no more than the needed values for the
        upload.  ``pfn`` is only included when ``filemd`` has a truthy one.

    :raises KeyError: if one of the required keys is missing from ``filemd``.
    """
    # (key expected by the API, key in the file description)
    key_map = (('scope', 'did_scope'),
               ('name', 'did_name'),
               ('bytes', 'bytes'),
               ('adler32', 'adler32'),
               ('md5', 'md5'),
               ('meta', 'meta'),
               ('state', 'state'))
    replica = {api_key: filemd[md_key] for api_key, md_key in key_map}

    pfn = filemd.get('pfn')
    if pfn:
        replica['pfn'] = pfn
    return replica


James Clark's avatar
James Clark committed
66
def get_rse_pfn(rse_info, scope, name):
    """
    Return the PFN at this RSE

    Parameters
    ----------
    rse_info : dict
        Protocol related RSE attributes.  See
        `rucio/lib/rucio/rse/rsemanager.py`.  Only the first entry of
        ``rse_info['protocols']`` is used.
    scope : str
        Scope for the DID
    name : str
        filename of the DID

    Returns
    -------
    pfn : str
        ``scheme://hostname[:port]`` followed by the protocol prefix and the
        deterministic LFN-to-PFN path for ``scope:name``.  The port is
        omitted when it is 0 or unset.
    """
    protocol = rse_info['protocols'][0]
    scheme = protocol['scheme']
    prefix = protocol['prefix']
    port = protocol['port']
    hostname = protocol['hostname']

    lfn2pfn_translator = RSEDeterministicTranslation(rse=rse_info['rse'])

    if scheme == 'srm':
        # SRM PFNs carry the web service path ahead of the storage prefix
        prefix = protocol['extended_attributes']['web_service_path'] + prefix

    # The prefix is appended directly after host[:port]; without a leading
    # slash the host and path would run together into a malformed URL.
    if not prefix.startswith('/'):
        prefix = '/' + prefix

    url = scheme + '://' + hostname
    if port:
        url = url + ':' + str(port)

    pfn = url + os.path.join(prefix,
                             lfn2pfn_translator.path(scope=scope, name=name))

    return pfn
James Clark's avatar
James Clark committed
100 101


102 103 104 105
class DatasetInjector(object):
    """
    General Class for injecting a LIGO dataset in rucio

    1) Load list of files for dataset from text file OR diskcache object
    2) Get their checksums
    3) Convert frame names to rucio DIDs
    4) Create Rucio dataset
    5) Register Rucio dataset

    data is a dictionary with a list of files to register
    """

    # pylint: disable=too-many-instance-attributes,too-many-arguments

    # Seconds to wait for the rucio server during the connectivity check.
    connection_timeout = 30

    def __init__(self,
                 rse_info,
                 dataset_name,
                 data,
                 allow_uploads=False,
                 logger=None):
        """
        Parameters
        ----------
        rse_info : dict
            Protocol related RSE attributes (``'rse'`` and ``'protocols'``
            are read).
        dataset_name : str
            Name of the rucio dataset the files are attached to.
        data : dict
            Dataset description.  Keys read:

            - ``'scope'`` (required): rucio scope of the dataset and files.
            - ``'lifetime'`` (optional): lifetime used for the replication
              rule and uploads; ``None`` when absent.
            - ``'fileinfos'`` (optional): mapping of file path to a dict
              with ``'bytes'``, ``'adler32'`` and ``'md5'``.  Files not
              listed have their attributes computed on the fly.
            - ``'diskcache'`` (optional): object whose ``expand()`` yields
              file paths; takes precedence over ``'filelist'``.
            - ``'filelist'``: sequence of file paths, used when there is no
              ``'diskcache'``.
        allow_uploads : bool
            If True, files missing from the RSE endpoint are uploaded.
        logger : logging.Logger, optional
            Logger to use; a disabled logger is used when not given.

        Exits the process with ``FAILURE`` if the rucio host cannot be
        reached.
        """
        if not logger:
            logger = logging.getLogger('%s.null' % __name__)
            logger.disabled = True
        self.logger = logger

        rucio_host = config_get('client', 'rucio_host')

        # Check rucio server connection
        try:
            requests.get(rucio_host, timeout=self.connection_timeout)
        except requests.exceptions.RequestException as exe:
            self.logger.error(exe)
            sys.exit(FAILURE)

        # Dataset configuration
        self.scope = data['scope']
        self.dataset_name = dataset_name
        self.lifetime = data.get('lifetime')

        self.rse_info = rse_info
        self.allow_uploads = allow_uploads
        self.client = Client(rucio_host=rucio_host)

        # Pre-computed file attributes (filename -> adler32, md5, bytes).
        # Always defined so lookups can fall back per file.
        self.fileinfos = data.get('fileinfos')
        if self.fileinfos is not None:
            self.logger.info("Using pre-computed file sizes and checksums")
        else:
            self.logger.info("Computing file sizes and checksums on the fly")

        # Support lists OR diskcache for files
        try:
            diskcache = data['diskcache']
        except KeyError:
            files = list(data['filelist'])
        else:
            files = list(diskcache.expand())

        # Get dictionary of files and metadata to register
        files = self._reduce_file_list(files)
        if not files:
            self.logger.info("No new replicas to add at %s",
                             self.rse_info['rse'])
            self.files = []
        else:
            self._enumerate_uploads(files)

    def _create_dataset(self):
        """
        Add a dataset object to contain the files we're registering.

        The dataset is created with one replication rule (1 copy, DATASET
        grouping, ``self.lifetime``) at this RSE.  If the RSE is write
        blacklisted the dataset is created without a rule.  An existing
        dataset is left as is.
        """
        logger = self.logger
        logger.info("Trying to create dataset: %s", self.dataset_name)
        rule = {
            'account': self.client.account,
            'copies': 1,
            'rse_expression': self.rse_info['rse'],
            'grouping': 'DATASET',
            'lifetime': self.lifetime,
        }
        try:
            self.client.add_dataset(scope=self.scope,
                                    name=self.dataset_name,
                                    rules=[rule])
            logger.info('Created new dataset %s', self.dataset_name)
        except RSEBlacklisted:
            logger.warning(
                'RSE write blacklisted, not adding replication rule')
            # The retry can itself hit an existing DID; exceptions raised in
            # an except clause are not caught by its sibling clauses.
            try:
                self.client.add_dataset(scope=self.scope,
                                        name=self.dataset_name)
                logger.info('Created new dataset %s', self.dataset_name)
            except DataIdentifierAlreadyExists:
                logger.debug("Dataset %s already exists", self.dataset_name)
        except DataIdentifierAlreadyExists:
            logger.debug("Dataset %s already exists", self.dataset_name)

    def _reduce_file_list(self, files):
        """
        Reduce the list of files for registration to extant, unregistered
        files.  Skip files which don't exist or are already registered.

        :param files: iterable of file paths
        :returns: new list of the paths that are regular files and whose
            basename has no replica in ``self.scope`` at this RSE
        """
        logger = self.logger
        files = list(files)
        logger.info("%d files in list", len(files))

        # Build a new list instead of removing from a copy: O(n), and it
        # works for any iterable input.
        reduced_files = []
        for fil in files:
            # isfile() is False both for missing paths and for non-files
            if not os.path.isfile(fil):
                logger.warning("%s not a valid file", fil)
                continue

            # Check if file registered already
            if self._check_replica(os.path.basename(fil)):
                continue

            reduced_files.append(fil)

        logger.info("%d new files to register", len(reduced_files))

        return reduced_files

    def _check_replica(self, lfn):
        """
        Check if a replica of the given file at the site already exists.

        :param lfn: DID name looked up in ``self.scope``
        :returns: True if the first catalog entry lists a replica at this RSE
        """
        logger = self.logger
        rse = self.rse_info['rse']
        logger.debug("Checking catalog for replica %s:%s at %s", self.scope,
                     lfn, rse)
        replicas = list(
            self.client.list_replicas([{
                'scope': self.scope,
                'name': lfn
            }]))

        if replicas and rse in (replicas[0].get('rses') or {}):
            logger.debug("%s:%s already has a replica at %s", self.scope,
                         lfn, rse)
            return True

        return False

    def _enumerate_uploads(self, files):
        """
        Create a list of dictionaries which describe files to pass to the
        rucio UploadClient, validate them and store the result in
        ``self.files``.

        :param files: list of file paths to register
        """
        logger = self.logger
        rse = self.rse_info['rse']
        # Identical for every file, so computed once
        dataset_did_str = '%s:%s' % (self.scope, self.dataset_name)

        items = []
        for path in files:
            did_name = os.path.basename(path)
            items.append({
                'path': path,
                'pfn': get_rse_pfn(self.rse_info, self.scope, did_name),
                'rse': rse,
                'did_scope': self.scope,
                'did_name': did_name,
                'dataset_scope': self.scope,
                'dataset_name': self.dataset_name,
                'dataset_did_str': dataset_did_str,
                'force_scheme': None,
                'no_register': False,
                'register_after_upload': True,
                'lifetime': self.lifetime,
                'transfer_timeout': None
            })

        # check given sources, resolve dirs into files, and collect meta infos
        logger.info("Checking file integrity")
        then = time.time()
        self.files = self._collect_and_validate_file_info(items)
        logger.info("File integrity check took %.2fs", (time.time() - then))

    def _collect_and_validate_file_info(self, items):
        """
        Checks if there are any inconsistencies within the given input options
        and stores the output of _collect_file_info for every file (Adapted
        from `uploadclient.py`)

        :param items: list of dictionaries with all input files and options

        :returns: a list of dictionaries containing all descriptions of the
            files to upload

        :raises InputValidationError: if no valid input file was found
        """
        logger = self.logger
        files = []
        for item in items:
            path = item.get('path')
            pfn = item.get('pfn')
            logger.debug('Checking info for: %s', path)
            if not path:
                logger.warning(
                    'Skipping source entry because the key "path" is missing')
                continue
            if not item.get('rse'):
                logger.warning('Skipping file %s because no rse was given',
                               path)
                continue
            if pfn:
                # Record the PFN's scheme (text before the first ':')
                item['force_scheme'] = pfn.split(':')[0]

            if os.path.isdir(path):
                # Only the top level of a directory is collected
                dname, subdirs, fnames = next(os.walk(path))
                for fname in fnames:
                    logger.debug('Collecting file info for: %s', fname)
                    files.append(self._collect_file_info(
                        os.path.join(dname, fname), item))
                if not fnames and not subdirs:
                    logger.warning('Skipping %s because it is empty.', dname)
                elif not fnames:
                    logger.warning(
                        'Skipping %s because it has no files in it. '
                        'Subdirectories are not supported.', dname)
            elif os.path.isfile(path):
                files.append(self._collect_file_info(path, item))
            else:
                logger.warning('No such file or directory: %s', path)

        if not files:
            raise InputValidationError('No valid input files given')

        return files

    def _collect_file_info(self, filepath, item):
        """
        Collects infos (e.g. size, checksums, etc.) about the file and returns
        them as a dictionary (Adapted from `uploadclient.py`)

        Sizes and checksums come from ``self.fileinfos`` when it has an entry
        for ``filepath``; otherwise they are computed from the file.

        :param filepath: path where the file is stored
        :param item: input options for the given file (not modified)

        :returns: a dictionary containing all collected info and the input
            options
        """
        logger = self.logger

        new_item = copy.deepcopy(item)
        new_item['path'] = filepath
        new_item['dirname'] = os.path.dirname(filepath)
        new_item['basename'] = os.path.basename(filepath)

        # Try getting file info from fileinfos; a missing table or a missing
        # entry falls back to computing the attributes on the fly.
        fileinfos = getattr(self, 'fileinfos', None)
        fileinfo = None
        if fileinfos is not None:
            try:
                fileinfo = fileinfos[filepath]
            except KeyError:
                logger.debug('No pre-computed info for %s', filepath)

        if fileinfo is not None:
            new_item['bytes'] = fileinfo['bytes']
            new_item['adler32'] = fileinfo['adler32']
            new_item['md5'] = fileinfo['md5']
        else:
            # Compute file infos on the fly
            new_item['bytes'] = os.stat(filepath).st_size
            then = time.time()
            new_item['adler32'] = adler32(filepath)
            logger.debug('Adler32 checksum took %fs', time.time() - then)
            then = time.time()
            new_item['md5'] = md5(filepath)
            logger.debug('MD5 checksum took %fs', time.time() - then)
        new_item['meta'] = {'guid': generate_uuid()}
        new_item['state'] = 'A'

        return new_item

    def upload_file(self, filemd):
        """
        Upload file to RSE

        :param filemd: file description as built by ``_collect_file_info``;
            a deep copy is uploaded, so the caller's dictionary is unchanged
        """
        logger = self.logger
        # instantiate upload client for files not present
        upload_client = UploadClient(self.client, logger)

        #  Remove PFN so that `upload_client` registers file for us
        filemd_tmp = copy.deepcopy(filemd)
        filemd_tmp['pfn'] = None
        upload_client.upload([filemd_tmp])

    def add_files(self):
        """
        Add files replicas in rucio catalog

        Creates the dataset, then handles each file in ``self.files`` that is
        not registered at this RSE or (with uploads allowed) is missing from
        the RSE endpoint: missing files are uploaded, the others are
        registered as replicas and attached to the dataset.
        """
        logger = self.logger
        then = time.time()
        rse = self.rse_info['rse']

        # Create dataset
        self._create_dataset()

        # Register files and attach to dataset
        for filemd in self.files:
            did_name = filemd['did_name']
            logger.info("Adding %s to catalog", did_name)

            if self.allow_uploads:
                # Test for file existence at end-point
                file_exists = rsemgr.exists(self.rse_info, filemd['pfn'])
            else:
                # Uploads disabled: treat the file as present at the RSE
                file_exists = True

            # Nothing to do if it is in the catalog AND exists on the RSE.
            # The catalog query is skipped when the file is known missing.
            if file_exists and self._check_replica(did_name):
                continue

            if not file_exists:
                # File does not exist at RSE so upload it
                logger.info("%s not found at %s, beginning upload",
                            did_name, rse)
                self.upload_file(filemd)
                continue

            # Register replica
            logger.debug("File %s already exists at RSE", did_name)
            replica_for_api = convert_file_for_api(filemd)
            if self.client.add_replicas(rse=rse, files=[replica_for_api]):
                logger.debug("File %s registered", did_name)

            # Attach to dataset
            try:
                logger.debug("Attaching file %s to dataset %s", did_name,
                             self.dataset_name)
                self.client.attach_dids(scope=self.scope,
                                        name=self.dataset_name,
                                        dids=[{
                                            'scope': self.scope,
                                            'name': did_name
                                        }])
            except FileAlreadyExists:
                logger.debug("File %s already exists in dataset %s",
                             did_name, self.dataset_name)

        if self.files:
            logger.info('File registration took %fs', (time.time() - then))