Forked from
IGWN Computing and Software / GraceDB / GraceDB Server
1155 commits behind the upstream repository.
-
Tanner Prestegard authoredTanner Prestegard authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
backends.py 11.20 KiB
import base64
import logging
import OpenSSL.crypto
import OpenSSL.SSL
import re
from django.contrib.auth import get_user_model, authenticate
from django.conf import settings
from django.http import HttpResponseForbidden
from django.utils import timezone
from django.utils.http import unquote, unquote_plus
from django.utils.translation import ugettext_lazy as _
from django.urls import resolve
from rest_framework import authentication, exceptions
from ligoauth.models import X509Cert
from .utils import is_api_request
# Set up logger
logger = logging.getLogger(__name__)
class GraceDbBasicAuthentication(authentication.BasicAuthentication):
api_only = True
def authenticate(self, request, *args, **kwargs):
"""
Same as base class, except we require the request to be directed
toward the basic auth API.
"""
# Make sure this request is directed to the API
if self.api_only and not is_api_request(request.path):
return None
# Call base class authenticate() method
return super(GraceDbBasicAuthentication, self).authenticate(request)
def authenticate_credentials(self, userid, password, request=None):
"""
Add a hacky password expiration check to the inherited method.
"""
user_auth_tuple = super(GraceDbBasicAuthentication, self) \
.authenticate_credentials(userid, password, request)
user = user_auth_tuple[0]
# Check password expiration
# NOTE: This is *super* hacky because we are using date_joined to store
# the date when the password was set. See managePassword() in
# userprofile.views.
password_expiry = user.date_joined + settings.PASSWORD_EXPIRATION_TIME
if timezone.now() > password_expiry:
msg = ('Your password has expired. Please log in to the web '
'interface and request another.')
raise exceptions.AuthenticationFailed(_(msg))
return user_auth_tuple
class GraceDbX509Authentication(authentication.BaseAuthentication):
"""
Authentication based on X509 certificate subject.
Certificate should be verified by Apache already.
"""
api_only = True
www_authenticate_realm = 'api'
subject_dn_header = getattr(settings, 'X509_SUBJECT_DN_HEADER',
'SSL_CLIENT_S_DN')
issuer_dn_header = getattr(settings, 'X509_ISSUER_DN_HEADER',
'SSL_CLIENT_I_DN')
proxy_pattern = re.compile(r'^(.*?)(/CN=\d+)*$')
def authenticate(self, request):
# Make sure this request is directed to the API
if self.api_only and not is_api_request(request.path):
return None
# Try to get credentials from request headers.
user_cert_dn = self.get_cert_dn_from_request(request)
# If no user dn is found, pass on to the next auth method
if not user_cert_dn:
return None
return self.authenticate_credentials(user_cert_dn)
@classmethod
def authenticate_header(cls, request):
return 'X509 realm="{0}"'.format(cls.www_authenticate_realm)
@classmethod
def get_cert_dn_from_request(cls, request):
"""Get SSL headers and return DN for user"""
# Get subject and issuer DN from SSL headers
certdn = request.META.get(cls.subject_dn_header, None)
issuer = request.META.get(cls.issuer_dn_header, '')
# Handled proxied certificates
certdn = cls.extract_subject_from_proxied_cert(certdn, issuer)
return certdn
@classmethod
def extract_subject_from_proxied_cert(cls, subject, issuer):
"""
Handles the case of "impersonation proxies", where /CN=[0-9]+ is
appended to the end of the certificate subject. This occurs when you
generate a certificate and it "follows" you to another machine - you
effectively self-sign a copy of the certificate to use on the other
machine.
Example:
Albert generates a certificate with ligo-proxy-init on his laptop.
Subject and issuer when he pings the GraceDB server from his laptop:
/DC=org/DC=cilogon/C=US/O=LIGO/CN=Albert Einstein albert.einstein@ligo.org
/DC=org/DC=cilogon/C=US/O=CILogon/CN=CILogon Basic CA 1
Subject and issuer when he gsisshs to an LDG cluster and then pings the
GraceDB server from there:
/DC=org/DC=cilogon/C=US/O=LIGO/CN=Albert Einstein albert.einstein@ligo.org/CN=1492637212
/DC=org/DC=cilogon/C=US/O=LIGO/CN=Albert Einstein albert.einstein@ligo.org
If he then gsisshs to *another* machine from there and repeats this,
he would get:
/DC=org/DC=cilogon/C=US/O=LIGO/CN=Albert Einstein albert.einstein@ligo.org/CN=1492637212/CN=28732493
/DC=org/DC=cilogon/C=US/O=LIGO/CN=Albert Einstein albert.einstein@ligo.org/CN=1492637212
"""
if subject and issuer and subject.startswith(issuer):
# If we get here, we have an impersonation proxy, so we extract
# the proxy /CN=12345... part from the subject. Could also
# do it from the issuer (see above examples)
subject = cls.proxy_pattern.match(subject).group(1)
return subject
def authenticate_credentials(self, user_cert_dn):
certs = X509Cert.objects.filter(subject=user_cert_dn)
if not certs.exists():
raise exceptions.AuthenticationFailed(_('Invalid certificate '
'subject'))
cert = certs.first()
# Handle incorrect number of users for a certificate
num_users = cert.users.count()
if (num_users > 1):
raise exceptions.AuthenticationFailed(_('Multiple users have the '
'same certificate subject'))
elif (num_users == 0):
raise exceptions.AuthenticationFailed(_('No user found for this '
'certificate'))
user = cert.users.first()
# Check if user is active
if not user.is_active:
raise exceptions.AuthenticationFailed(
_('User inactive or deleted'))
return (user, None)
class GraceDbX509CertInfosAuthentication(GraceDbX509Authentication):
"""
Authentication based on X509 "infos" header.
Certificate should be verified by Traefik already.
"""
api_only = True
infos_header = getattr(settings, 'X509_INFOS_HEADER',
'HTTP_X_FORWARDED_TLS_CLIENT_CERT_INFOS')
infos_pattern = re.compile(r'Subject="(.*?)".*Issuer="(.*?)"')
@classmethod
def get_cert_dn_from_request(cls, request):
"""Get SSL headers and return subject for user"""
# Get infos from request headers
infos = request.META.get(cls.infos_header, None)
# Unquote (handle pluses -> spaces)
infos_unquoted = unquote_plus(infos)
# Extract subject and issuer
subject, issuer = cls.infos_pattern.search(infos_unquoted).groups()
# Convert formats
subject = cls.convert_format(subject)
issuer = cls.convert_format(issuer)
# Handled proxied certificates
subject = cls.extract_subject_from_proxied_cert(subject, issuer)
return subject
@staticmethod
def convert_format(s):
# Convert subject or issuer strings from comma to slash format
s = s.replace(',', '/')
if not s.startswith('/'):
s = '/' + s
return s
class GraceDbX509FullCertAuthentication(GraceDbX509Authentication):
"""
Authentication based on a full X509 certificate. We verify the
certificate here.
"""
api_only = True
www_authenticate_realm = 'api'
cert_header = getattr(settings, 'X509_CERT_HEADER',
'HTTP_X_FORWARDED_TLS_CLIENT_CERT')
def authenticate(self, request):
# Make sure this request is directed to the API
if self.api_only and not is_api_request(request.path):
return None
# Try to get certificate from request headers
cert_data = self.get_certificate_data_from_request(request)
# If no certificate is found, abort
if not cert_data:
return None
# Verify certificate
try:
certificate = self.verify_certificate_chain(cert_data)
except exceptions.AuthenticationFailed as e:
raise
except Exception as e:
raise exceptions.AuthenticationFailed(_('Certificate could not be '
'verified'))
return self.authenticate_credentials(certificate)
@classmethod
def get_certificate_data_from_request(cls, request):
"""Get certificate data from request"""
cert_quoted = request.META.get(cls.cert_header, None)
if cert_quoted is None:
return None
# Process the certificate a bit
cert_b64 = unquote(cert_quoted)
cert_der = base64.b64decode(cert_b64)
return cert_der
def verify_certificate_chain(self, cert_data, capath=settings.CAPATH):
# Load certificate data
certificate = OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_ASN1, cert_data)
# Set up context and get certificate store
ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
ctx.load_verify_locations(None, capath=capath)
store = ctx.get_cert_store()
# Verify certificate
store_ctx = OpenSSL.crypto.X509StoreContext(store, certificate)
store_ctx.verify_certificate()
# Check if expired
if certificate.has_expired():
raise exceptions.AuthenticationFailed(_('Certificate has expired'))
return certificate
def authenticate_credentials(self, certificate):
# Get subject and issuer
subject = self.get_certificate_subject_string(certificate)
issuer = self.get_certificate_issuer_string(certificate)
# Handled proxied certificates
subject = self.extract_subject_from_proxied_cert(subject, issuer)
# Authenticate credentials
return super(GraceDbX509FullCertAuthentication, self) \
.authenticate_credentials(subject)
@staticmethod
def get_certificate_subject_string(certificate):
subject = certificate.get_subject()
subject_string = '/' + "/".join(["=".join(c) for c in
subject.get_components()])
return subject_string
@staticmethod
def get_certificate_issuer_string(certificate):
issuer = certificate.get_issuer()
issuer_string = '/' + "/".join(["=".join(c) for c in
issuer.get_components()])
return issuer_string
class GraceDbAuthenticatedAuthentication(authentication.BaseAuthentication):
"""
If user is already authenticated by the main Django middleware,
don't make them authenticate again.
This is mostly (only?) used for access to the web-browsable API when
the user is already authenticated via Shibboleth.
"""
api_only = True
def authenticate(self, request):
# Make sure this request is directed to the API
if self.api_only and not is_api_request(request.path):
return None
if (hasattr(request, '_request') and hasattr(request._request, 'user')
and hasattr(request._request.user, 'is_authenticated') and
request._request.user.is_authenticated):
return (request._request.user, None)
else:
return None