# (c) 2018-2020
#     MPIB <https://www.mpib-berlin.mpg.de/>,
#     MPI-CBS <https://www.cbs.mpg.de/>,
#     MPIP <http://www.psych.mpg.de/>
#
# This file is part of Castellum.
#
# Castellum is free software; you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Castellum is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with Castellum. If not, see
# <http://www.gnu.org/licenses/>.

import itertools
import json
import os

from django import forms
from django.core.exceptions import ObjectDoesNotExist
from django.db import models
from django.utils.translation import gettext_lazy as _
from castellum.studies.models import Study
from castellum.utils.forms import BaseImportForm
from castellum.utils.forms import DateTimeField
from castellum.utils.forms import DisabledChoiceField
from castellum.utils.forms import DisabledModelChoiceField

from .models import Appointment
from .models import AttributeDescription
from .models import AttributeSet
from .models import Participation
from .models import SubjectFilter
from .models.appointments import MINUTE
from .models.attributesets import ANSWER_DECLINED
from .models.attributesets import UNCATEGORIZED
APP_DIR = os.path.dirname(__file__)

# Parse the JSON schema used by the import forms once, at import time. The
# ``with`` block makes sure the file handle is closed right after parsing
# instead of being left to the garbage collector.
with open(
    os.path.join(APP_DIR, 'schemas', 'attributes.json'), encoding='utf-8'
) as _schema_file:
    IMPORT_SCHEMA = json.load(_schema_file)


def get_description_choices(user):
    """Build the choices for an attribute description select field.

    Returns a ``(choices, disabled_choices)`` tuple:

    -   ``choices`` starts with an empty option, followed by the
        uncategorized descriptions as flat ``(pk, label)`` pairs and then
        one ``(category label, [(pk, label), ...])`` group per category.
    -   ``disabled_choices`` lists the pks of all descriptions for which
        ``user`` does not have the required read privacy level.
    """
    choices = [(None, '---')]
    disabled_choices = []

    def as_choice(description):
        # Remember descriptions the user may not read; they stay in the
        # list of choices but are reported separately.
        if not user.has_privacy_level(description.privacy_level_read):
            disabled_choices.append(description.pk)
        return description.pk, description.filter_label

    categories = AttributeDescription.objects.by_category()

    # Uncategorized descriptions are listed at the top level.
    choices.extend(as_choice(d) for d in categories.pop(UNCATEGORIZED))

    # Every remaining category becomes a named option group.
    for category, descriptions in categories.items():
        choices.append((category.label, [as_choice(d) for d in descriptions]))

    return choices, disabled_choices


class SubjectFilterAddForm(forms.Form):
    """Form with a single, optional attribute description select.

    The available and disabled options depend on the ``user`` that is
    passed as first argument to the constructor.
    """

    description = DisabledModelChoiceField(AttributeDescription.objects.none(), required=False)

    def __init__(self, user, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # The field is declared with an empty queryset; the actual options
        # are user specific and therefore set per form instance.
        choices, disabled_choices = get_description_choices(user)
        field = self.fields["description"]
        field.choices = choices
        field.widget.disabled_choices = disabled_choices


class SubjectFilterForm(forms.ModelForm):
    """Model form for a subject filter whose fields depend on its description.

    The ``value`` and ``operator`` fields are replaced at instantiation
    time with the ones provided by the field of the selected attribute
    description.
    """

    class Meta:
        model = SubjectFilter
        fields = "__all__"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # NOTE(review): _get_selected_description() may return None, in
        # which case the attribute access below fails -- confirm that
        # callers always provide a description via instance, data or
        # initial.
        selected_description = self._get_selected_description(**kwargs)

        self.fields["value"] = selected_description.field.filter_formfield(required=True)
        self.fields["operator"] = forms.ChoiceField(
            choices=selected_description.field.available_operators
        )

        try:
            # Filters can only be changed while the study is being edited.
            if self.instance.group.study.status != Study.EDIT:
                self.fields["value"].widget.attrs["disabled"] = True
                self.fields["operator"].widget.attrs["disabled"] = True
        except ObjectDoesNotExist:
            # A related object is missing, so there is no study status
            # to check and the fields stay enabled.
            pass

    def _get_selected_description(self, instance=None, data=None, prefix=None, **kwargs):
        """Return the attribute description for this form or ``None``.

        The description is looked up in this order: the bound
        ``instance``, the submitted ``data`` (honoring ``prefix``) and
        finally the ``initial`` values.
        """
        if instance:
            return instance.description

        key = prefix + '-description' if prefix else 'description'
        if data and data[key]:
            try:
                return AttributeDescription.objects.get(pk=data[key])
            except AttributeDescription.DoesNotExist:
                return None

        try:
            return kwargs['initial']['description']
        except KeyError:
            return None


class SubjectFilterFormSet(forms.BaseModelFormSet):
    """Model formset that rejects duplicate subject filters."""

    def clean(self):
        """Raise a ``ValidationError`` if two kept forms are identical.

        Forms marked for deletion are ignored. Two forms count as
        identical when description, operator and value all match.
        """
        super().clean()

        def signature(cleaned):
            return (
                cleaned.get('description'),
                cleaned.get('operator'),
                cleaned.get('value'),
            )

        kept = [
            signature(form.cleaned_data)
            for form in self.forms
            if not form.cleaned_data.get('DELETE', False)
        ]

        # A set drops repeated entries, so equal sizes mean no duplicates.
        if len(kept) == len(set(kept)):
            return

        raise forms.ValidationError(_(
            'There are duplicates in filters. Please change or delete filters!'
        ), code='invalid')


class AttributeSetForm(forms.ModelForm):
    """Model form for an attribute set with one form field per attribute.

    The attribute fields are not declared here; ``factory()`` creates a
    subclass with the fields a given user is allowed to write. Their
    values end up in the ``data`` mapping of the instance, which is why
    the ``data`` model field itself is excluded from the form.
    """

    class Meta:
        model = AttributeSet
        exclude = ('data',)
        widgets = {
            'study_type_disinterest': forms.CheckboxSelectMultiple(),
        }

    def __init__(self, instance=None, **kwargs):
        if not kwargs.get('initial'):
            kwargs['initial'] = {}
            # Without an instance there is no stored data to prefill from.
            if instance is not None:
                for key, value in instance.get_data().items():
                    # Declined answers are presented as empty fields.
                    kwargs['initial'][key] = None if value == ANSWER_DECLINED else value
        super().__init__(instance=instance, **kwargs)

    def clean(self):
        """Return the cleaned data with all attribute values under ``data``.

        A field is stored as ``ANSWER_DECLINED`` if the submitted data
        contains the key ``<field name>_answer_declined``.
        """
        cleaned_data = super().clean()
        for key in cleaned_data:
            if key + '_answer_declined' in self.data:
                cleaned_data[key] = ANSWER_DECLINED

        result = {}
        # The key is missing if the field itself failed validation. The
        # form is invalid in that case anyway, so do not raise a KeyError
        # on top of the regular field error.
        if 'study_type_disinterest' in cleaned_data:
            result['study_type_disinterest'] = cleaned_data.pop('study_type_disinterest')
        result['data'] = cleaned_data
        return result

    def save(self, commit=True):
        """Merge the cleaned attribute values into ``instance.data`` and save.

        ``commit`` is passed on to ``ModelForm.save()``.
        """
        self.instance.data.update(self.cleaned_data['data'])
        return super().save(commit=commit)

    @classmethod
    def factory(cls, user, obj=None):
        """Return a subclass with a form field for every writable attribute.

        The fields are created from the attribute descriptions returned by
        ``AttributeDescription.objects.allowed_write(user, obj=obj)`` and
        are named after the ``json_key`` of each description.
        """
        allowed_descriptions = AttributeDescription.objects.allowed_write(user, obj=obj)
        form_fields = {desc.json_key: desc.field.formfield() for desc in allowed_descriptions}
        return type('AttributeSetForm', (cls,), form_fields)


class ContactForm(forms.ModelForm):
    """Model form for a participation including its appointments.

    In addition to the model fields, an optional
    ``appointment-<session pk>`` date/time field is added for every
    session returned by ``instance.get_appointments()``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        for session, appointment in self.instance.get_appointments():
            key = 'appointment-%i' % session.pk
            self.fields[key] = DateTimeField(
                label=self.format_session_name(session),
                initial=appointment.start if appointment else None,
                required=False,
            )

        # Replace the generated status field so that individual choices
        # can be disabled depending on ``instance.match``.
        model_status = Participation._meta.get_field('status')
        self.fields['status'] = DisabledChoiceField(
            label=model_status.verbose_name,
            choices=model_status.choices,
            coerce=int,
        )
        if not self.instance.match:
            # Everything except UNSUITABLE is disabled.
            self.fields['status'].widget.disabled_choices = [
                choice for choice, __ in Participation.STATUS_OPTIONS
                if choice != Participation.UNSUITABLE
            ]
        elif self.instance.match == 'incomplete':
            self.fields['status'].widget.disabled_choices = [Participation.INVITED]

    def format_session_name(self, session):
        """Return a label made of session name, session types and duration."""
        duration = _('%imin') % session.duration
        types = ', '.join(str(t) for t in session.type.order_by('pk'))
        if types:
            return '%s (%s) - %s' % (session.name, types, duration)
        else:
            return '%s - %s' % (session.name, duration)

    def clean(self):
        """Validate that appointments neither overlap nor block a resource.

        Errors are attached to the affected appointment fields. Returns the
        cleaned data.
        """
        cleaned_data = super().clean()

        # Collect (session, field name, start, end) for all filled fields.
        appointments = []
        for session in self.instance.study.studysession_set.all():
            key = 'appointment-%i' % session.pk
            start = cleaned_data.get(key)
            if start:
                appointments.append((session, key, start, start + session.duration * MINUTE))

        for a, b in itertools.combinations(appointments, 2):
            __, key1, start1, end1 = a
            __, key2, start2, end2 = b
            # Two time spans overlap if each starts before the other ends.
            if start1 < end2 and start2 < end1:
                self.add_error(key1, _('Appointments must not overlap.'))
                self.add_error(key2, _('Appointments must not overlap.'))
                # Only the first overlapping pair is reported.
                break

        # Appointments of other participations with status INVITED,
        # annotated with their computed end time.
        qs = Appointment.objects\
            .exclude(participation=self.instance)\
            .filter(participation__status=Participation.INVITED)\
            .annotate(end=models.ExpressionWrapper(
                models.F('start') + models.F('session__duration') * MINUTE,
                output_field=models.DateTimeField(),
            ))

        for session, key, start, end in appointments:
            if session.resource and qs.filter(
                session__resource=session.resource, end__gt=start, start__lt=end,
            ).exists():
                self.add_error(key, _('The required resource is not available at this time'))

        return cleaned_data

    def save(self):
        """Save the participation and sync its appointments with the form.

        Existing appointments are deleted if their field was cleared and
        updated if the start changed; missing ones are created. Returns the
        saved participation.
        """
        participation = super().save()

        for session, appointment in participation.get_appointments():
            start = self.cleaned_data.get('appointment-%i' % session.pk)

            if appointment:
                if not start:
                    appointment.delete()
                elif start != appointment.start:
                    appointment.start = start
                    appointment.save()
            elif start:
                participation.appointment_set.create(
                    session=session,
                    start=start,
                )

        return participation

    @property
    def appointments(self):
        """Yield the bound fields of all appointment fields."""
        for name in self.fields:
            if name.startswith('appointment-'):
                yield self[name]

    class Meta:
        model = Participation
        fields = ['status', 'followup_date', 'followup_time', 'exclusion_criteria_checked']


class SendMailForm(forms.Form):
    """Form asking how many subjects should be contacted."""

    # At least one subject has to be contacted.
    batch_size = forms.IntegerField(
        min_value=1, label=_('How many subjects do you want to contact?')
    )


class CategoryImportForm(BaseImportForm):
    """Import form bound to the ``AttributeCategoryExport`` schema definition."""

    schema = IMPORT_SCHEMA
    schema_ref = '#/$defs/AttributeCategoryExport'


class DescriptionImportForm(BaseImportForm):
    """Import form bound to the ``AttributeDescriptionExport`` schema definition."""

    schema = IMPORT_SCHEMA
    schema_ref = '#/$defs/AttributeDescriptionExport'