Forked from
IGWN Computing and Software / GraceDB / GraceDB Server
2019 commits behind the upstream repository.
Branson Craig Stephens authoredBranson Craig Stephens authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more. 6.60 KiB
from django.conf import settings
from django.db.models import Q
from guardian.shortcuts import assign_perm
from django.contrib.auth.models import Group
from django.utils.functional import wraps
from django.http import HttpResponseForbidden
from gracedb.models import Event
import os
# A convenient wrapper for permission checks.
def user_has_perm(user, shortname, obj):
codename = shortname + '_%s' % obj.__class__.__name__.lower()
return user.has_perm(codename, obj)
# Filter a queryset of Event objects according to user permissions.
# This relies on the storage of perm info on the event itself, and is
# a much faster alternative to guardian.shortcuts.get_objects_for_user
# when there are many objects.
def filter_events_for_user(events, user, shortname):
# If user is None, return empty queryset
if not user:
return Event.objects.none()
if not user.groups.count():
return Event.objects.none()
if not events:
return Event.objects.none()
auth_filter = Q()
for group in user.groups.all():
perm_string = '%s_can_%s' % (, shortname)
auth_filter = auth_filter | Q(perms__contains=perm_string)
return events.filter(auth_filter)
# Create default permission objects for an event. This is intended
# to be used upon event creation. By default only internal LVC users
# will be able to view or annotate an event.
def assign_default_event_perms(event):
# Retrieve the group objects
executives = Group.objects.get(name=settings.EXEC_GROUP)
internal = Group.objects.get(name=settings.LVC_GROUP)
# Need to find the *type* of event. Could be a subclass.
model = event.__class__
model_name = model.__name__.lower()
view_codename = 'view_%s' % model_name
change_codename = 'change_%s' % model_name
# Assign the permissions
for g in [executives, internal]:
assign_perm(view_codename, g, event)
assign_perm(change_codename, g, event)
# If the event is an MDC event, then we expose it to LV-EM also
if and == 'MDC':
lvem = Group.objects.get(name=settings.LVEM_GROUP)
assign_perm(view_codename, lvem, event)
assign_perm(change_codename, lvem, event)
# A wrapper for views that checks whether the user is internal, and if not
# returns a 403.
def internal_user_required(view):
def inner(request, *args, **kwargs):
# XXX Should probably move this list of internal groups into settings.
internal_groups = Group.objects.filter(
name__in=[settings.LVC_GROUP, settings.EXEC_GROUP])
if not set(list(internal_groups)) & set(list(request.user.groups.all())):
return HttpResponseForbidden("Forbidden")
return view(request, *args, **kwargs)
return inner
# A wrapper for views that checks whether the user is in the LV-EM group, and if not
# returns a 403.
def lvem_user_required(view):
def inner(request, *args, **kwargs):
# XXX Should probably move this list of internal groups into settings.
lvem_groups = [Group.objects.get(name=settings.LVEM_GROUP)]
if not set(list(lvem_groups)) & set(list(request.user.groups.all())):
return HttpResponseForbidden("Forbidden")
return view(request, *args, **kwargs)
return inner
# A utility for determining whether a user is 'external' (i.e., not part of the
# LVC). This is useful for controlling which pieces of information to display
# in a view.
def is_external(user):
if user:
user_groups = [ for g in user.groups.all()]
if settings.LVC_GROUP not in user_groups:
return True
return False
return True
# if user.username == 'branson.stephens@LIGO.ORG':
# return False
# return True
# A utility for determining whether an external user should have access to a
# particular file, given the event and filename. This is done by finding the
# log message associated with that file and checking that the log message is
# tagged for external access. Returns True if the user should have access, and
# False if not. Note that this presumes that the user is external and does not
# check this.
def get_file_version(filename):
version = None
base_filename = filename
if len(filename.split(',')) > 1:
toks = filename.split(',')
base_filename = toks[0]
version = int(toks[1])
return base_filename, version
def check_external_file_access(event, filename):
filename, version = get_file_version(filename)
if version is None:
# Figure out the version by following the link
filepath = os.path.join(event.datadir(), filename)
if os.path.islink(filepath):
target_file = os.path.realpath(filepath)
target_basename = os.path.basename(target_file)
filename, version = get_file_version(target_basename)
return False
# find the associated log message
logs = event.eventlog_set.filter(filename=filename, file_version=version)
count = logs.count()
log = None
if count == 0:
# Could not find logfile
# XXX Write log
return False
elif count > 1:
# There should only be one file. Ugh. What's going on?
# XXX Write log
return False
log = logs[0]
# check access
tagnames = [ for t in log.tag_set.all()]
if settings.EXTERNAL_ACCESS_TAGNAME not in tagnames:
return False
return True