Commit f10d40f9 authored by Tanner Prestegard's avatar Tanner Prestegard Committed by GraceDB

events: add custom managers for Pipeline model

parent eedfa2c3
from django.db import models
# Custom managers for the Pipeline model --------------------------------------
class ProductionPipelineManager(models.Manager):
"""Pipelines which are production search pipelines"""
def get_queryset(self):
return super(ProductionPipelineManager, self).get_queryset().filter(
pipeline_type=self.model.PIPELINE_TYPE_SEARCH_PRODUCTION
)
class ExternalPipelineManager(models.Manager):
"""Pipelines which correspond to external experiments"""
def get_queryset(self):
return super(ExternalPipelineManager, self).get_queryset().filter(
pipeline_type=self.model.PIPELINE_TYPE_EXTERNAL
)
......@@ -37,6 +37,9 @@ from cStringIO import StringIO
from hashlib import sha1
import shutil
from .managers import ProductionPipelineManager, ExternalPipelineManager
UserModel = get_user_model()
SERVER_TZ = pytz.timezone(settings.TIME_ZONE)
......@@ -84,6 +87,11 @@ class Pipeline(models.Model):
pipeline_type = models.CharField(max_length=2,
choices=PIPELINE_TYPE_CHOICES)
# Add custom managers; must manually define 'objects' as well
objects = models.Manager()
production_objects = ProductionPipelineManager()
external_objects = ExternalPipelineManager()
class Meta:
permissions = (
('manage_pipeline', 'Can enable or disable pipeline'),
......
try:
from unittest import mock
except ImportError: # python < 3
import mock
import pytest
from django.urls import reverse
......@@ -52,6 +48,8 @@ def test_pipeline_change_views(view, standard_user, client):
# Create a pipeline
p, _ = Pipeline.objects.get_or_create(name='fake_pipeline')
p.pipeline_type = Pipeline.PIPELINE_TYPE_SEARCH_PRODUCTION
p.save(update_fields=['pipeline_type'])
# NOTE: get() is wired to post() in the view
response = client.get(reverse(view, args=[p.pk]))
......@@ -69,11 +67,10 @@ def test_pipeline_change_views_as_advocate(view, em_advocate_user, client):
# Create a pipeline
p, _ = Pipeline.objects.get_or_create(name='fake_pipeline')
p.pipeline_type = Pipeline.PIPELINE_TYPE_SEARCH_PRODUCTION
p.save(update_fields=['pipeline_type'])
# NOTE: get() is wired to post() in the view
with mock.patch('events.views.PIPELINE_LIST', new_callable=list) \
as mock_pipeline_list:
mock_pipeline_list.append(p.name)
response = client.get(reverse(view, args=[p.pk]))
response = client.get(reverse(view, args=[p.pk]))
assert response.status_code == 302
......@@ -975,7 +975,6 @@ def modify_signoff(request, event):
# Managing pipeline submissions -----------------------------------------------
PIPELINE_LIST = ['gstlal', 'pycbc', 'MBTAOnline', 'CWB', 'oLIB', 'spiir']
PIPELINE_LOG_ACTION_DICT = dict(PipelineLog.PIPELINE_LOG_ACTION_CHOICES)
@method_decorator(internal_user_required(raise_exception=True),
......@@ -986,8 +985,7 @@ class PipelineManageView(ListView):
log_number = 10
def get_queryset(self):
qs = Pipeline.objects.filter(name__in=PIPELINE_LIST).order_by('name')
return qs
return Pipeline.production_objects.order_by('name')
def get_context_data(self, **kwargs):
context = super(PipelineManageView, self).get_context_data(**kwargs)
......@@ -1033,8 +1031,7 @@ class PipelineEnableView(UpdateView):
success_url = reverse_lazy('manage-pipelines')
def get_queryset(self):
qs = Pipeline.objects.filter(name__in=PIPELINE_LIST)
return qs
return Pipeline.production_objects.all()
def get(self, request, *args, **kwargs):
return self.post(request, *args, **kwargs)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment