Skip to content
Snippets Groups Projects

Improvements to validating incoming scitokens

@@ -9,15 +9,17 @@ import re
import scitokens
from flask import request, current_app
from functools import wraps
from jwt import DecodeError
from scitokens.utils.errors import InvalidTokenFormat
from jwt import InvalidTokenError
from scitokens.utils.errors import SciTokensException
__author__ = 'Duncan Meacher <duncan.meacher@ligo.org>'
def _get_scitokens_params():
config = current_app.config
audience = config['scitokens_audience'].split(",")
audience = config['scitokens_audience']
if isinstance(audience, str):
audience = [audience]
scope = config['scitokens_scope']
issuer = config['scitokens_issuer']
return audience, scope, issuer
@@ -52,7 +54,7 @@ def _get_auth_type():
return authType
def _validate_scitoken(request, audience=None, scope=None, issuer=None):
def _validate_scitoken(request):
# Get token from header
bearer = request.headers.get("Authorization")
auth_type, serialized_token = bearer.split()
@@ -61,49 +63,27 @@ def _validate_scitoken(request, audience=None, scope=None, issuer=None):
except AssertionError:
raise RuntimeError("Invalid header format")
# get server params
audience, scope, issuer = _get_scitokens_params()
# Deserialize token
try:
token = scitokens.SciToken.deserialize(
serialized_token,
audience=audience)
except (InvalidTokenFormat, DecodeError):
raise RuntimeError("Unable to deserialize token: Invalid token format")
# Validate token
def check_scope(value):
return scope in value
def check_iss(value):
return value == issuer
validator = scitokens.Validator()
validator.add_validator('scope', check_scope)
validator.add_validator('iss', check_iss)
# FIXME: Remove if test case statement for pytest
if audience == ['TEST']:
def return_true(value):
return True
validator.add_validator('iat', return_true)
validator.add_validator('nbf', return_true)
validator.add_validator('exp', return_true)
validator.add_validator('aud', return_true)
validator.add_validator('jti', return_true)
try:
validator.validate(token)
except scitokens.scitokens.ClaimInvalid:
raise RuntimeError("Token validation failed")
# Enforce scitoken logic
enforcer = scitokens.Enforcer(issuer, audience)
serialized_token,
# deserialize all tokens, enforce audience later
audience={"ANY",} | set(audience),
)
except (InvalidTokenError, SciTokensException) as exc:
raise RuntimeError(f"Unable to deserialize token: {exc}")
try:
enforcer.generate_acls(token)
except scitokens.scitokens.ClaimInvalid:
raise RuntimeError("Enforcement failed")
enforcer = scitokens.Enforcer(
issuer,
audience=audience,
)
authz, path = scope.split(":", 1)
if not enforcer.test(token, authz, path):
raise RuntimeError("token enforcement failed")
current_app.logger.info('User SciToken authorised.')
@@ -141,9 +121,8 @@ def validate(func):
and 'Authorization' in request.headers
):
current_app.logger.info('View request with SciToken.')
audience, scope, issuer = _get_scitokens_params()
try:
_validate_scitoken(request, audience, scope, issuer)
_validate_scitoken(request)
except RuntimeError as exc:
msg = "SciToken authentication failed: {!r}"\
.format(exc)
Loading