Commit 8573e0a5 authored by James Clark

Improved cache read robustness

parent 745b03ba
Pipeline #79018 passed with stages in 45 seconds
@@ -22,16 +22,15 @@ Data may be registered as individual files, ASCII lists of files, or registered
on the fly as a background process monitoring a DiskCacheFile.
"""
# Native
import argparse
import logging
import os
import signal
import sys
import time

import argcomplete
import yaml

import rucio.rse.rsemanager as rsemgr

from ligo_rucio import rucio_data
@@ -39,6 +38,7 @@ from ligo_rucio.diskcache import DiskCacheFile
SUCCESS = 0
FAILURE = 1
MAX_CACHE_TRIES = 5
LOGGER = logging.getLogger('user')
@@ -51,7 +51,13 @@ def setup_logger(logger):
    hdlr = logging.StreamHandler()

    def emit_decorator(fnc):
        """
        Wrap a handler's emit function with colour formatting
        """
        def func(*args):
            """
            Pick a colour for the record's log level
            """
            levelno = args[0].levelno
            if levelno >= logging.CRITICAL:
                color = '\033[31;1m'
@@ -65,6 +71,7 @@ def setup_logger(logger):
                color = '\033[36;1m'
            else:
                color = '\033[0m'
            #pylint: disable=line-too-long
            formatter = logging.Formatter('{0}%(asctime)s\t%(levelname)s\t%(message)s\033[0m'.format(color))
            hdlr.setFormatter(formatter)
            return fnc(*args)
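The tail of setup_logger lies outside this hunk; a minimal sketch of how such an emit decorator is typically wired up (an assumption based on the standard pattern, not code from this commit):

    hdlr.emit = emit_decorator(hdlr.emit)  # colourize every record the handler emits
    logger.addHandler(hdlr)
    logger.setLevel(logging.INFO)  # raised to DEBUG when args.verbose is set in main()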
@@ -77,12 +84,12 @@ setup_logger(LOGGER)
def signal_handler(sig, frame):
    """
    Catch interrupt signals
    """
    #pylint: disable=unused-argument
    LOGGER.error('Interrupt received')
    # Do some cleanup?
    sys.exit(1)
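# Example: once main() registers the hooks below, stopping the daemon from a
# shell looks like the following (PID and signal choice are illustrative):
#
#   kill -TERM <daemon-pid>   ->  "Interrupt received" in the log, exit code 1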
def get_parser():
"""
@@ -278,7 +285,6 @@ def add_files(aparser):
    rset = rsets[aparser.rset]
    # Add the file list to the rset
    # FIXME: don't add this to the rset; pass it through as its own object
    rset['filelist'] = list(aparser.files)
    inject_data(aparser.rset, rset, rse_info, aparser.dry_run)
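For reference, get_rsets presumably returns a mapping shaped like the sketch below; the rset name and values are invented, and only the keys used elsewhere in this file ('minimum-gps', 'maximum-gps', 'regexp', plus the 'filelist' attached above) come from the code:

    rsets = {
        'MY_RSET': {                     # hypothetical rset name
            'minimum-gps': 1238166018,   # GPS window passed to DiskCacheFile
            'maximum-gps': 1269363618,
            'regexp': r'.*H-H1_HOFT.*',  # file-name filter (illustrative)
        },
    }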
@@ -304,68 +310,66 @@ def daemon(aparser):
    # Get rset instructions
    rsets = get_rsets(aparser.reg_yaml)
    # Wait for the cache file to appear
    force_check_cache = True
    while force_check_cache:
        try:
            os.stat(aparser.cachefile)
            break
        except (IOError, OSError) as cache_error:
            LOGGER.critical('Cache file not found')
            LOGGER.critical(cache_error)
            LOGGER.info('Sleeping for %d s', aparser.daemon_sleep)
            time.sleep(aparser.daemon_sleep)
    # Initial diskcache file read
    for rset in rsets:
        # Create the initial diskcache object for each rset
        LOGGER.info("%s: reading diskcache [%s]", rset, aparser.cachefile)
        # FIXME: add an exception for a bad diskcache (i.e., retry if the
        # file exists but is empty or incomplete)
        # FIXME: update minimum_gps to the time of the last check, so we
        # don't waste time checking files we just registered
        last_check = time.time()
        rsets[rset]['diskcache'] = DiskCacheFile(
            aparser.cachefile,
            minimum_gps=rsets[rset]['minimum-gps'],
            maximum_gps=rsets[rset]['maximum-gps'],
            regexp=rsets[rset]['regexp'],
            prune=True,
            update_file_count=True)
    # Begin the daemon loop
    # FIXME: better interrupt condition
    LOGGER.info("Starting registration loop")
    daemon_running = True
    while daemon_running:
        LOGGER.info("--------------------------------------------------")
        for rset in rsets:
            # Re-read the diskcache, backing off exponentially if it cannot
            # be read
            cache_try_interval = aparser.daemon_sleep
            for ntry in range(MAX_CACHE_TRIES + 1):
                try:
                    LOGGER.debug("Finding %s in DiskCache", rset)
                    rsets[rset]['diskcache'] = DiskCacheFile(
                        aparser.cachefile,
                        # FIXME: set minimum GPS to last_check (and confirm
                        # that the mtime of the DiskCache is the last modify
                        # time of the *file*, not the Cache object)
                        minimum_gps=rsets[rset]['minimum-gps'],
                        maximum_gps=rsets[rset]['maximum-gps'],
                        regexp=rsets[rset]['regexp'],
                        prune=True,
                        update_file_count=True)
                    last_check = time.time()
                    break
                except (IOError, OSError, StopIteration) as cache_error:
                    # StopIteration is raised if the cachefile is incomplete
                    LOGGER.critical('Cannot read cache file')
                    LOGGER.critical(cache_error)
                    if ntry == MAX_CACHE_TRIES:
                        LOGGER.error('Max retries reached, aborting')
                        sys.exit(1)
                    LOGGER.debug('Retry in %d s (%d/%d)', cache_try_interval,
                                 ntry + 1, MAX_CACHE_TRIES)
                    time.sleep(cache_try_interval)
                    cache_try_interval *= 2
            # If the forced check is enabled, or the diskcache has been
            # updated since the last check, look for new data
            if force_check_cache or rsets[rset]['diskcache'].mtime() > last_check:
                LOGGER.info("%s: looking for new data", rset)
                # Inject each rset (ignores pre-registered files)
                inject_data(rset, rsets[rset], rse_info, aparser.dry_run)
                # Disable the forced cache check after the initial read
                force_check_cache = False
            else:
                LOGGER.info("%s: diskcache not modified", rset)
@@ -376,14 +380,12 @@ def daemon(aparser):
        # Snooze to allow cache updates
        LOGGER.info("Going to sleep for %d s...", aparser.daemon_sleep)
        time.sleep(aparser.daemon_sleep)
    LOGGER.info("Registration stopping condition reached")
    return SUCCESS
@@ -408,7 +410,6 @@ def inject_data(dataset_name, rset, rse_info, dry_run=False):
        return SUCCESS
    # Register files for replication
    dataset.add_files()
    return SUCCESS
@@ -420,10 +421,7 @@ def main():
    Principal operations
    """
    parser = get_parser()
    argcomplete.autocomplete(parser)
@@ -436,6 +434,10 @@ if args.verbose:
    if args.verbose:
        LOGGER.setLevel(logging.DEBUG)
    # Add hooks for SIGTERM and SIGINT
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    # Start timer
    global_start_time = time.time()
@@ -454,11 +456,8 @@ result = command(args)
    result = command(args)
    # Stop timer
    LOGGER.info("total uptime: %-0.4f sec.", (time.time() - global_start_time))
    return result
if __name__ == "__main__":
    main()
@@ -197,8 +197,8 @@ class DatasetInjector(object):
        Check whether a replica of the given file already exists at the site.
        """
        logger = self.logger
        logger.debug("Checking catalog for replica %s:%s at %s", self.scope,
                     lfn, self.rse_info['rse'])
        replicas = list(
            self.client.list_replicas([{
                'scope': self.scope,
......
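The list_replicas call above is truncated by the diff view; a self-contained sketch of the same replica check against the rucio client API, with scope, LFN, and RSE names invented for illustration:

    from rucio.client import Client

    client = Client()
    dids = [{'scope': 'test', 'name': 'H-H1_HOFT-1269363618-4096.gwf'}]
    # Each replica record maps RSE name -> PFNs under 'rses'; the file already
    # exists at the site if the target RSE appears in that mapping.
    exists = any('EXAMPLE_RSE' in (rep.get('rses') or {})
                 for rep in client.list_replicas(dids))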