Skip to content
Snippets Groups Projects

SciTokens and X.509 cert authentication

Closed Duncan Meacher requested to merge (removed):master into master
5 unresolved threads
2 files
+ 153
0
Compare changes
  • Side-by-side
  • Inline
Files
2
+ 137
0
import re
import traceback
import scitokens
from flask import request
from functools import wraps
issuer = "https://ligo.org/oauth"
aud = "https://ligo.org/oauth"
def validate(**outer_kwargs):
def real_decorator(some_function):
@wraps(some_function)
def wrapper(*args, **kwargs):
# Check for SciToken, use if available
if 'Authorization' in request.headers:
# Get token from header
bearer = request.headers.get("Authorization")
if len(bearer.split()) != 2:
headers = {
'WWW-Authenticate': 'Bearer'
}
return ("Authentication header incorrect format", 401, headers)
serialized_token = bearer.split()[1]
try:
# Load public key
public_key = open('/home/duncan.meacher/test_scitokens.pub','rb').read()
# Deserialize token
token = scitokens.SciToken.deserialize(serialized_token, audience = outer_kwargs['audience'], public_key = public_key)
except Exception as e:
print(str(e))
traceback.print_exc()
headers = {
'WWW-Authenticate': 'Bearer'
}
return ("Unable to deserialize: %{}".format(str(e)), 401, headers)
# Validate token
def check_scope(value):
if value == outer_kwargs['scp']:
return True
else:
return False
def check_iss(value):
if value == issuer:
return True
else:
return False
def check_sub(value):
with open('/etc/grid-security/grid-mapfile') as gridmap:
if value in gridmap.read():
return True
else:
return False
def return_true(value):
return True
validator = scitokens.Validator()
validator.add_validator('scp', check_scope)
validator.add_validator('iss', check_iss)
validator.add_validator('sub', check_sub)
# the jwt library already validates the below in the deserialization
validator.add_validator('iat', return_true)
validator.add_validator('exp', return_true)
validator.add_validator('nbf', return_true)
validator.add_validator('aud', return_true)
validator.add_validator('jti', return_true)
try:
validator.validate(token)
except scitokens.scitokens.ClaimInvalid as ce:
headers = {
'WWW-Authenticate': 'Bearer'
}
return ("Validation incorrect", 403, headers)
# Enforce scope
enforcer = scitokens.Enforcer(issuer, audience=aud)
enforcer.generate_acls(token)
try:
enforcer.test(token, "read", "/protected")
except scitokens.scitokens.EnforcementError as ee:
headers = {
'WWW-Authenticate': 'Bearer'
}
return ("Incorrect scope", 403, headers)
# If no SciToken, check for and use X509 certificate
elif 'SSL_CLIENT_S_DN' and 'SSL_CLIENT_I_DN' in request.headers:
# Get subject and issuer from header
subject_dn_header = request.headers.get('SSL_CLIENT_S_DN')
issuer_dn_header = request.headers.get('SSL_CLIENT_I_DN')
# Clean up impersonation proxies. See:
# https://git.ligo.org/lscsoft/gracedb/-/blob/master/gracedb/api/backends.py#L119
subject_pattern = re.compile(r'^(.*?)(/CN=\d+)*$')
subject = subject_pattern.match(subject_dn_header).group(1)
# Check if subject is contained within grid-mapfile
with open('/etc/grid-security/grid-mapfile') as gridmap:
Please register or sign in to reply
if subject in gridmap.read():
    • Suggested change
      106 if subject in gridmap.read():
      107 pass
      108 else:
      106 for line in gridmap:
      107 thissubj = _parse_gridmap_subject(line)
      108 if subject == thissubj:
      109 break
      110 else:

      where _parse_gridmap_subject is just [ref]:

      def _parse_gridmap_subject(line):
          parts = line.strip().split('"')
          if len(parts) in {2, 3}:
             return parts[1]
          if len(parts) == 1:
             return parts[0]
          raise RuntimeError("error parsing gridmap line: {!r}".format(line))
Please register or sign in to reply
pass
else:
headers = {
'SSL_CLIENT_S_DN': '',
'SSL_CLIENT_I_DN': ''
}
return ("Subject not in grid-mapfile", 401, headers)
# Check if issuer is in allowed list
temp = '(?:% s)' % '|'.join(outer_kwargs['x509_issuer_list'])
if issuer_dn_header in outer_kwargs['x509_issuer_list'] or re.match(temp, subject):
pass
else:
headers = {
'SSL_CLIENT_S_DN': '',
'SSL_CLIENT_I_DN': ''
}
return ("x509 issuer not in accepted list", 401, headers)
else:
headers = {
'WWW-Authenticate': 'Bearer',
'SSL_CLIENT_S_DN': '',
'SSL_CLIENT_I_DN': ''
}
return ("No Authentication Header", 401, headers)
return some_function(*args, **kwargs)
return wrapper
return real_decorator
Loading