Commit a68c1d5d authored by James Clark
Browse files

support for file characteristics from file

parent a643e9c1
Pipeline #83145 passed with stages
in 1 minute and 38 seconds
#!/bin/bash -e
# Print the characteristics of one file on a single line:
#
#     <size-in-bytes> <adler32> <md5>
#
# Usage: <this-script> FILE
#
# Requires GNU `stat`, `xrdadler32` and `md5sum` on the PATH.

# Without pipefail a failing checksum tool would be hidden by the exit
# status of the trailing `awk`.
set -o pipefail

if [ "$#" -lt 1 ]; then
    # With no argument md5sum would silently wait on stdin.
    echo "usage: $0 FILE" >&2
    exit 1
fi

file="$1"

# Each value is assigned on its own line so that `-e` aborts the script as
# soon as one tool fails; a failure inside "$(...)" used directly as an
# argument to `echo` would be ignored.
size=$(stat -c %s -- "$file")
adler32=$(xrdadler32 "$file" | awk '{print $1}')
md5=$(md5sum -- "$file" | awk '{print $1}')

echo "$size $adler32 $md5"
......@@ -27,10 +27,9 @@ import os
import signal
import sys
import time
import argparse
import argcomplete
import yaml
import argcomplete
import rucio.rse.rsemanager as rsemgr
from ligo_rucio import rucio_data
......@@ -277,6 +276,22 @@ def get_rsets(configyml):
#
# """
def list_file_infos(filename):
    """
    Read a whitespace-delimited text file listing file names, sizes and
    checksums into a dictionary of file attributes.

    Each non-blank line must hold at least four fields, in this order::

        <filepath> <bytes> <adler32> <md5>

    Blank lines are skipped and fields beyond the fourth are ignored.  If the
    same first field occurs more than once, the last line wins.

    :param filename: path to the listing file
    :returns: a dictionary of dictionaries `{bytes, adler32, md5}`, keyed by
              the first field of each line; `bytes` is an `int` (matching
              what `os.stat(...).st_size` yields), the checksums are strings
    :raises ValueError: if a line has fewer than four fields or its size
                        field is not an integer
    """
    fileinfos = {}
    with open(filename, 'r') as finfo:
        # Iterate the file lazily rather than loading every line up front
        for lineno, line in enumerate(finfo, start=1):
            fields = line.split()
            if not fields:
                # Blank line, e.g. a trailing newline at the end of the file
                continue
            if len(fields) < 4:
                raise ValueError(
                    "%s:%d: expected 4 fields (filepath, bytes, adler32, "
                    "md5), got %d" % (filename, lineno, len(fields)))
            name, size, adler, md5sum = fields[:4]
            fileinfos[name] = {
                'bytes': int(size),
                'adler32': adler,
                'md5': md5sum,
            }
    return fileinfos
def add_files(aparser):
"""
......@@ -298,9 +313,12 @@ def add_files(aparser):
# Add the file list to the rset
rset['filelist'] = list(aparser.files)
# Add a file-info file:
#rset['fileinfos'] = aparser.files_characteristics
try:
# Add a file-info file:
rset['fileinfos'] = list_file_infos(aparser.files_infos)
except AttributeError:
continue
inject_data(aparser.rset, rset, rse_info, aparser.dry_run)
......@@ -339,7 +357,6 @@ def daemon(aparser):
# Begin Daemon loop
LOGGER.info("Starting registration loop")
minimum_gps = rsets[rset]['minimum-gps']
daemon_running = True
while daemon_running:
......@@ -347,6 +364,8 @@ def daemon(aparser):
for rset in rsets:
minimum_gps = rsets[rset]['minimum-gps']
cache_try_interval = aparser.daemon_sleep
for ntry in range(MAX_CACHE_TRIES+1):
try:
......
......@@ -124,6 +124,13 @@ class DatasetInjector(object):
self.rse_info = rse_info
self.client = Client(rucio_host=config_get('client', 'rucio_host'))
# Read and attach list of file attributes (filename, adler32, md5, bytes)
# If file is not found, continue to compute attributes on the fly
try:
self.fileinfos = data['fileinfos']
except KeyError:
continue
# Support lists OR diskcache for files
try:
# Treat data as a diskcache, fall back to list on failure
......@@ -254,38 +261,6 @@ class DatasetInjector(object):
self.files = self._collect_and_validate_file_info(items)
logger.info("File integrity check took %.2fs", (time.time() - then))
def _collect_file_info(self, filepath, item):
    """
    Build the metadata record for a single file (adapted from
    `uploadclient.py`).

    The input options are deep-copied, so `item` itself is never modified.
    The copy is then extended with the file's location, size, Adler32 and
    MD5 checksums, a freshly generated GUID and the state `'A'`.

    :param filepath: path where the file is stored
    :param item: input options for the given file
    :returns: a dictionary holding the input options plus all collected
              file information
    """
    log = self.logger

    record = copy.deepcopy(item)
    record['path'] = filepath
    record['dirname'], record['basename'] = os.path.split(filepath)
    record['bytes'] = os.stat(filepath).st_size

    # Both checksums are timed the same way; only the key, the function and
    # the log message differ.
    checksum_steps = (
        ('adler32', adler32, 'Adler32 checksum took %fs'),
        ('md5', md5, 'MD5 checksum took %fs'),
    )
    for key, compute, message in checksum_steps:
        started = time.time()
        record[key] = compute(filepath)
        log.debug(message, time.time() - started)

    record['meta'] = {'guid': generate_uuid()}
    record['state'] = 'A'
    return record
def _collect_and_validate_file_info(self, items):
"""
Checks if there are any inconsistencies within the given input options
......@@ -340,6 +315,46 @@ class DatasetInjector(object):
return files
def _collect_file_info(self, filepath, item):
    """
    Collects infos (e.g. size, checksums, etc.) about the file and returns
    them as a dictionary (Adapted from `uploadclient.py`)

    Size and checksums are taken from `self.fileinfos` (looked up by the
    file's basename) when a complete entry is available there.  If
    `self.fileinfos` was never set, has no entry for this file, or the entry
    lacks one of `bytes`/`adler32`/`md5`, they are computed from the file on
    disk instead.

    :param filepath: path where the file is stored
    :param item: input options for the given file

    :returns: a dictionary containing all collected info and the input
    options
    """
    logger = self.logger

    new_item = copy.deepcopy(item)
    new_item['path'] = filepath
    new_item['dirname'] = os.path.dirname(filepath)
    new_item['basename'] = os.path.basename(filepath)

    try:
        # Read all three values before touching new_item so that a partial
        # entry can never leave the record half-filled.
        known = self.fileinfos[new_item['basename']]
        precomputed = (known['bytes'], known['adler32'], known['md5'])
    except (AttributeError, KeyError):
        # AttributeError: no fileinfos were supplied at all.
        # KeyError: this file (or one of its attributes) is not listed.
        precomputed = None

    if precomputed is not None:
        (new_item['bytes'],
         new_item['adler32'],
         new_item['md5']) = precomputed
    else:
        # Compute file infos on the fly
        logger.debug('No pre-computed file info for %s, computing it',
                     new_item['basename'])
        new_item['bytes'] = os.stat(filepath).st_size

        then = time.time()
        new_item['adler32'] = adler32(filepath)
        duration = time.time() - then
        logger.debug('Adler32 checksum took %fs', duration)

        then = time.time()
        new_item['md5'] = md5(filepath)
        duration = time.time() - then
        logger.debug('MD5 checksum took %fs', duration)

    new_item['meta'] = {'guid': generate_uuid()}
    new_item['state'] = 'A'
    return new_item
def upload_file(self, filemd):
"""
Upload file to RSE
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment