gwrucio_registrar 11.9 KB
Newer Older
1
#!/usr/bin/env python
2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
#
# Copyright (C) 2018  James Alexander Clark <james.clark@ligo.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#
"""
Command line tool to register LIGO/Virgo datasets into rucio.

Data may be registered as individual files, ascii lists of files, or registered
on the fly as a background process monitoring a DiskCacheFile.
"""

25 26 27
import argparse
import logging
import multiprocessing
James Clark's avatar
James Clark committed
28
# Native
29 30
import os
import sys
31
import time
32

James Clark's avatar
James Clark committed
33
import yaml
34

35
import argcomplete
James Clark's avatar
James Clark committed
36
# LSC/Virgo
James Clark's avatar
James Clark committed
37 38
from ligo_rucio import rucio_data
from ligo_rucio.diskcache import DiskCacheFile
39

James Clark's avatar
James Clark committed
40 41
# Exit statuses: returned by the sub-command handlers and handed to sys.exit()
SUCCESS = 0
FAILURE = 1
42

James Clark's avatar
James Clark committed
43 44
# Event tested at the top of every pass of the daemon() loop; once it is set
# the loop ends.  NOTE(review): nothing in the code visible here sets it yet.
GRACEFUL_STOP = multiprocessing.Event()

45

46
def update_progress(progress):
    """
    Progress bar for less verbose output

    Redraws a 50-character bar on the current terminal line (one ``#`` per
    two percent) followed by the percentage.  When ``progress`` equals 100
    the line is terminated and "Done" is printed on the following line.

    :param progress: completion percentage.  Only the bar width is clamped
        to the 0-100 range; the number itself is printed as given.
    """
    # Floor division keeps the bar width an integer under both Python 2 and
    # Python 3; clamping stops out-of-range values from distorting the bar.
    filled = max(0, min(50, int(progress) // 2))

    # Write straight to stdout without a trailing newline so the leading
    # carriage returns make successive calls overwrite the same line.
    sys.stdout.write('\r\r[{0}] {1}%'.format(
        '#' * filled + ' ' * (50 - filled), progress))
    if progress == 100:
        sys.stdout.write("\nDone\n")
    sys.stdout.flush()

56

James Clark's avatar
James Clark committed
57
def get_parser():
    """
    Command line parser

    Builds the top-level parser, whose options are shared by every
    sub-command, plus the ``add-files`` and ``daemon`` sub-commands.  Each
    sub-command stores the key of its handler in ``which`` (``'add_files'``
    or ``'daemon'``); the name typed on the command line is stored in
    ``command``.

    :returns: the configured :class:`argparse.ArgumentParser`
    """

    oparser = argparse.ArgumentParser(description=__doc__)

    #
    # Options common to all sub-commands
    #
    oparser.add_argument('-r',
                         "--reg-script",
                         type=str,
                         default=None,
                         required=True,
                         help="YAML instructions for end point and data "
                         "naming")

    oparser.add_argument("--dry-run",
                         default=False,
                         action="store_true",
                         help="Find files, construct replica list but don't "
                         "actually upload to rucio")

    oparser.add_argument("--verbose",
                         default=False,
                         action="store_true",
                         help="Print all logging info")

    oparser.add_argument("--lifetime",
                         type=float,
                         default=None,
                         required=False,
                         help="Dataset lifetime in seconds")

    oparser.add_argument("--force-checksums",
                         default=False,
                         action="store_true",
                         help="Compute checksums and register files even if "
                         "they are already present")

    # Python 3's argparse treats sub-commands as optional unless told
    # otherwise, which would leave the namespace without ``which``.  Naming
    # the destination also gives argparse something to print in the
    # "required" error message.
    subparsers = oparser.add_subparsers(dest='command')
    subparsers.required = True

    #
    # Parser for adding files manually
    #
    add_files_parser = subparsers.add_parser(
        'add-files',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        help="Register individual files.",
        epilog="""Usage example
^^^^^^^^^^^^^

To register a pair of files:

  $ gwrucio_register_data -r H_HOFT_ER13.yml add-files --rset H1_ER10_hoft H-H1_HOFT_C00-1164353536-4096.gwf H-H1_HOFT_C00-1164357632-4096.gwf

H_HOFT_ER13.yml is a YAML file with:

 - dataset-name (section name, required)
 - scope (required)
 - RSE (required)
 - dataset GPS min (will be ignored)
 - dataset GPS max (will be ignored)

E.g., YAML should look like:

"H1_ER10_hoft":
  scope: "ER10"
  rse: "LIGO-CIT"

Note: register a list of files from a text file:

  $ gwrucio_register_data -r H_HOFT_ER13.yml add-files --rset H1_ER10_hoft $(< H-H1_HOFT_C00-11643.txt)

    """)  # noqa:E501

    add_files_parser.set_defaults(which='add_files')

    add_files_parser.add_argument(dest="files",
                                  nargs='+',
                                  help="Files for registration")

    add_files_parser.add_argument("--rset",
                                  type=str,
                                  default=None,
                                  required=True,
                                  help="Registration set in the YAML "
                                  "configuration you wish to register (only "
                                  "1 permitted at this time)")

    #
    # Running in daemon mode / from a diskcache
    #
    daemon_parser = subparsers.add_parser(
        'daemon',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        help='Monitor a diskcache and register files on the fly.',
        epilog="""Usage example
^^^^^^^^^^^^^

Monitor and register files appearing in
/var/lib/diskcache/frame_cache_dump:

  $ gwrucio_register_data -r H_HOFT_ER13.yml daemon

Where H_HOFT_ER13.yml is a YAML file with:

- dataset-name (section name, required)
- scope (required)
- RSE (required)
- regexp for file pattern (typically frame-type, required)
- dataset GPS min (key: minimum-gps, required)
- dataset GPS max (key: maximum-gps, required)

E.g., YAML should look like:

"H-H1_HOFT_C02":
  scope: "ER10"
  rse: "LIGO-CIT"
  regexp: "H-H1_HOFT_C02"
  minimum-gps: <first GPS time in the dataset>
  maximum-gps: <last GPS time in the dataset>

    """)

    daemon_parser.set_defaults(which='daemon')

    daemon_parser.add_argument(dest="cachefile",
                               nargs='?',
                               default="/var/lib/diskcache/frame_cache_dump",
                               help="Path to diskcache ascii dump [default: "
                               "/var/lib/diskcache/frame_cache_dump]")

    daemon_parser.add_argument("--run-once",
                               action='store_true',
                               default=False,
                               help="Run a single iteration")

    daemon_parser.add_argument("--force-check",
                               action='store_true',
                               default=False,
                               help="Always attempt to register files "
                               "(regardless of whether diskcache has been "
                               "modified)")

    daemon_parser.add_argument("--daemon-sleep",
                               type=float,
                               default=30,
                               required=False,
                               help="Seconds to wait between checking "
                               "diskcache for new entries")

    return oparser
208

209

210
def get_rsets(configyml):
    """
    Read configuration from YAML

    :param configyml: path to the YAML registration script
    :returns: the parsed YAML document.  The callers in this file treat it
        as a dictionary keyed by registration-set name.
    """
    # FIXME: The YAML should really have a well-defined schema.  We should also
    # write out a copy the YAML file so we know what was run.
    with open(configyml, 'r') as stream:
        # safe_load builds only plain Python objects, so a registration
        # script cannot trigger arbitrary object construction.  It also avoids
        # calling yaml.load() without a Loader, which newer PyYAML rejects.
        rset = yaml.safe_load(stream)

    return rset
223

James Clark's avatar
James Clark committed
224

225 226 227 228 229 230 231 232 233 234 235 236
#   def validate_rsets(rsets):
#       """
#       Check that the rset YAML has required fields with required data types
#       """
#       schema = """
#       type: object
#       properties:
#         dataset:
#         type: dict
#
#       """

James Clark's avatar
James Clark committed
237

James Clark's avatar
James Clark committed
238
def add_files(aparser):
    """
    %(prog)s add_files [options] <dsn>

    Register data interactively

    The files given on the command line are attached, as ``filelist``, to the
    registration set selected with ``--rset`` and that set is then injected.

    :param aparser: parsed command-line namespace; reads ``files``,
        ``reg_script`` and ``rset`` (and is passed on to inject_data)
    :returns: FAILURE if the requested registration set is not in the YAML
        file, otherwise the status returned by inject_data
    """

    rucio_data.LOGGER.info("Rset contains: %s", ','.join(aparser.files))

    # Get rset instructions
    rsets = get_rsets(aparser.reg_script)

    # An empty YAML file parses to None and a mistyped --rset is simply
    # absent: report either case instead of dying with a bare traceback.
    if not rsets or aparser.rset not in rsets:
        rucio_data.LOGGER.error("Registration set %s not found in %s",
                                aparser.rset, aparser.reg_script)
        return FAILURE

    # Add the file list to the rset
    rsets[aparser.rset]['filelist'] = list(aparser.files)

    return inject_data(aparser.rset, rsets[aparser.rset], aparser)

257

James Clark's avatar
James Clark committed
258
def _open_diskcache(label, rset, aparser):
    """
    Build the DiskCacheFile for one registration set, retrying every
    ``aparser.daemon_sleep`` seconds for as long as opening the cache file
    raises IOError.
    """
    rucio_data.LOGGER.info("%s: reading diskcache [%s]", label,
                           aparser.cachefile)

    # If necessary, wait for diskcache file to appear
    while True:
        try:
            return DiskCacheFile(
                aparser.cachefile,
                minimum_gps=rset['minimum-gps'],
                maximum_gps=rset['maximum-gps'],
                regexp=rset['regexp'],
                prune=True,
                update_file_count=True)
        except IOError:
            rucio_data.LOGGER.error("No diskcache found at %s",
                                    aparser.cachefile)
            rucio_data.LOGGER.debug("Waiting %.1f s", aparser.daemon_sleep)
            time.sleep(aparser.daemon_sleep)


def daemon(aparser):
    """
    %(prog)s daemon [options] <dsn>

    Run data registration as daemon, using the ascii dump of diskcache

    Every registration set in the YAML file gets its own DiskCacheFile.  The
    first pass always looks for data; later passes do so only when the
    diskcache modification time is newer than the start of the previous pass,
    or on every pass when ``--force-check`` is given.

    :param aparser: parsed command-line namespace; reads ``reg_script``,
        ``cachefile``, ``daemon_sleep``, ``run_once`` and ``force_check``
    :returns: SUCCESS when the loop ends, FAILURE if the YAML file defines
        no registration sets
    """

    rucio_data.LOGGER.info("Starting %s as daemon", os.path.basename(__file__))

    # Get rset instructions
    rsets = get_rsets(aparser.reg_script)
    if not rsets:
        rucio_data.LOGGER.error("No registration sets found in %s",
                                aparser.reg_script)
        return FAILURE

    #
    # Create initial diskcache object for each rset
    #
    for rset in rsets:
        rsets[rset]['diskcache'] = _open_diskcache(rset, rsets[rset], aparser)

    #
    # Begin Daemon loop
    #
    check_cache = True  # the first pass always looks for data
    last_check = time.time()
    while not GRACEFUL_STOP.is_set():
        # FIXME do something useful with this GRACEFUL_STOP

        rucio_data.LOGGER.info(
            "--------------------------------------------------")  # noqa: E501

        # Timestamp taken before the caches are refreshed: a diskcache update
        # that lands while files are being registered is then newer than
        # last_check and is picked up on the next pass.  At worst a change is
        # examined twice, which is harmless because injection ignores
        # pre-registered files.
        pass_start = time.time()

        for rset in rsets:

            # Refresh the diskcache to update the last-modified time
            msg = rsets[rset]['diskcache'].refresh()
            if msg is not None:
                rucio_data.LOGGER.error(msg)

            # If check enabled or if diskcache has updated, check for new data
            if check_cache or rsets[rset]['diskcache'].mtime() > last_check:

                rucio_data.LOGGER.info("%s: looking for new data", rset)

                # Inject each rset (ignores pre-registered files)
                inject_data(rset, rsets[rset], aparser)

                # Once one rset has found the cache modified, examine the
                # remaining rsets of this pass as well
                check_cache = True

            else:
                rucio_data.LOGGER.info("%s: diskcache not modified", rset)

        if aparser.run_once:
            break

        # Later passes compare the cache modification time with the start of
        # this pass
        last_check = pass_start

        # Honour --force-check; otherwise rely on the modification time
        check_cache = aparser.force_check

        # Snooze to allow cache updates
        rucio_data.LOGGER.info("Going to sleep for %.1f s...",
                               aparser.daemon_sleep)
        time.sleep(aparser.daemon_sleep)

    return SUCCESS
351 352


353
def inject_data(label, rset, aparser):
    """
    Run the registration step for one registration set

    A ``rucio_data.DatasetInjector`` is always built from the label and its
    configuration.  Unless ``--dry-run`` was requested, its
    ``register_files()`` method is then called.

    :param label: name of the registration set (the YAML section name)
    :param rset: configuration of that registration set
    :param aparser: parsed command-line namespace; reads ``force_checksums``,
        ``lifetime`` and ``dry_run``
    :returns: SUCCESS
    """
    injector_options = dict(data=rset,
                            force_checksums=aparser.force_checksums,
                            lifetime=aparser.lifetime)
    injector = rucio_data.DatasetInjector(label, **injector_options)

    log = rucio_data.LOGGER

    if not aparser.dry_run:
        # Register files for replication
        log.info("Adding files")
        injector.register_files()
        log.info("Files Added")
    else:
        log.info("Dry run: ending process before rucio interactions")

    return SUCCESS
James Clark's avatar
James Clark committed
378

James Clark's avatar
James Clark committed
379

James Clark's avatar
James Clark committed
380
#########################################################################
381

James Clark's avatar
James Clark committed
382
if __name__ == "__main__":

    #
    # Parse input and choose operation
    #
    PARSER = get_parser()
    argcomplete.autocomplete(PARSER)

    # With no arguments at all, show the full help rather than a terse
    # argparse error
    if len(sys.argv) == 1:
        PARSER.print_help()
        sys.exit(FAILURE)

    ARGS = PARSER.parse_args(sys.argv[1:])

    if ARGS.verbose:
        rucio_data.LOGGER.setLevel(logging.DEBUG)

    #
    # Execution
    #
    COMMANDS = {'add_files': add_files, 'daemon': daemon}

    # ``which`` comes from the defaults of the chosen sub-command's parser.
    # It can be missing if argparse accepted a command line without a
    # sub-command, so look it up defensively instead of calling None.
    COMMAND = COMMANDS.get(getattr(ARGS, 'which', None))
    if COMMAND is None:
        PARSER.print_help()
        sys.exit(FAILURE)

    GLOBAL_START_TIME = time.time()
    RESULT = COMMAND(ARGS)
    END_TIME = time.time()
    rucio_data.LOGGER.info("Total uptime: %-0.4f sec.",
                           (END_TIME - GLOBAL_START_TIME))
    sys.exit(RESULT)