Newer
Older
# MPIB <https://www.mpib-berlin.mpg.de/>,
# MPI-CBS <https://www.cbs.mpg.de/>,
# MPIP <http://www.psych.mpg.de/>
#
# This file is part of Castellum.
#
# Castellum is free software; you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# Castellum is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public
# License along with Castellum. If not, see
# <http://www.gnu.org/licenses/>.
from django.core.exceptions import PermissionDenied
from django.core.mail import EmailMessage
from django.core.mail import get_connection
from django.utils.translation import gettext_lazy as _
from castellum.castellum_auth.mixins import PermissionRequiredMixin
from castellum.contacts.mixins import BaseContactUpdateView
from castellum.recruitment.mixins import BaseAttributeSetUpdateView
from castellum.recruitment.models.attributesets import get_description_by_statistics_rank
from castellum.studies.models import Study
from castellum.subjects.mixins import BaseAdditionalInfoUpdateView
from castellum.subjects.mixins import BaseDataProtectionUpdateView
from castellum.subjects.models import Subject
from .mixins import ParticipationMixin
from .models import AttributeDescription
from .models import AttributeSet
from .models import Participation
logger = logging.getLogger(__name__)
"""Get all subjects who are recruitable for this study, i.e. apply all filters."""
qs = Subject.objects.filter(
filter_queries.has_consent(),
filter_queries.already_in_study(study),
)
last_contacted = timezone.now() - settings.CASTELLUM_PERIOD_BETWEEN_CONTACT_ATTEMPTS
qs = qs.annotate(last_contacted=models.Max(
'participation__revisions__created_at',
participation__revisions__status=Participation.NOT_CONTACTED
),
)).filter(
models.Q(last_contacted__lte=last_contacted) |
models.Q(last_contacted__isnull=True)
)
return list(qs)
class RecruitmentView(StudyMixin, PermissionRequiredMixin, ListView):
permission_required = 'recruitment.view_participation'
def dispatch(self, request, *args, **kwargs):
self.sort = self.request.GET.get('sort', self.request.session.get('recruitment_sort'))
self.request.session['recruitment_sort'] = self.sort
return super().dispatch(request, *args, **kwargs)
order_by = ['-status_not_reached', '-updated_at']
elif self.sort == 'statistics':
desc1 = get_description_by_statistics_rank('primary')
desc2 = get_description_by_statistics_rank('secondary')
order_by = [
'-subject__attributeset__data__' + desc1.json_key,
'-subject__attributeset__data__' + desc2.json_key,
'-updated_at',
]
order_by = [
'-followup_urgent', 'status', 'followup_date', 'followup_time', '-updated_at'
]
.filter(status__in=self.shown_status, study=self.study)\
participations = Participation.objects.filter(
study=self.study, status=Participation.INVITED
).prefetch_related('subject__attributeset')
buckets1 = AttributeDescription.get_statistics_buckets('primary')
buckets2 = AttributeDescription.get_statistics_buckets('secondary')
if len(buckets1) == 1:
return None
statistics = defaultdict(lambda: defaultdict(lambda: 0))
for participation in participations:
attributeset = participation.subject.attributeset
key1 = attributeset.get_statistics_bucket('primary')
key2 = attributeset.get_statistics_bucket('secondary')
statistics[key1][key2] += 1
except AttributeSet.DoesNotExist:
pass
data = {
'rows': [{
'label': str(label1),
'values': [statistics[key1][key2] for key2, label2 in buckets2]
} for key1, label1 in buckets1],
if len(buckets2) > 1:
data['legend'] = [str(label2) for key2, label2 in buckets2]
renderer = StackedColumnRenderer(width=760, height=320)
buckets1 = dict(AttributeDescription.get_statistics_buckets('primary'))
buckets2 = dict(AttributeDescription.get_statistics_buckets('secondary'))
for participation in self.get_queryset():
try:
attributeset = participation.subject.attributeset
bucket1 = buckets1[attributeset.get_statistics_bucket('primary')]
bucket2 = buckets2[attributeset.get_statistics_bucket('secondary')]
buckets = ', '.join(str(bucket) for bucket in [bucket1, bucket2] if bucket)
except AttributeSet.DoesNotExist:
buckets = ''
can_access = self.request.user.has_privacy_level(
participation.subject.privacy_level
participations.append((participation, buckets, can_access))
context['participations'] = participations
('last_contact_attempt', _('Last contact attempt')),
('statistics', _('Statistics')),
context['sort_label'] = sort_options.get(self.sort, _('Relevance'))
context['statistics'] = self.get_statistics()
queryset = Participation.objects.filter(study=self.study)
context['all_count'] = queryset.count()
context['invited_count'] = queryset.filter(
context['unsuitable_count'] = queryset.filter(
context['open_count'] = queryset.filter(status__in=[
Participation.NOT_CONTACTED,
Participation.NOT_REACHED,
Participation.FOLLOWUP_APPOINTED,
Participation.AWAITING_RESPONSE,
context['last_agreeable_contact_time'] = (
datetime.date.today() - settings.CASTELLUM_PERIOD_BETWEEN_CONTACT_ATTEMPTS
shown_status = [Participation.INVITED]
class RecruitmentViewUnsuitable(RecruitmentView):
shown_status = [Participation.UNSUITABLE]
class RecruitmentViewOpen(RecruitmentView):
shown_status = [
Participation.NOT_CONTACTED,
Participation.NOT_REACHED,
Participation.FOLLOWUP_APPOINTED,
Participation.AWAITING_RESPONSE,
def create_participations(self, batch_size):
subjects = get_recruitable(self.study)
added = min(batch_size, len(subjects))
for subject in random.sample(subjects, k=added):
Participation.objects.create(study=self.study, subject=subject)
def post(self, request, *args, **kwargs):
if not request.user.has_perm('recruitment.add_participation', obj=self.study):
if self.study.min_subject_count == 0:
messages.error(request, _(
'This study does not require participants. '
'Please, contact the responsible person who can set it up.'
))
return self.get(request, *args, **kwargs)
not_contacted_count = self.get_queryset()\
.filter(status=Participation.NOT_CONTACTED).count()
hard_limit = self.study.min_subject_count * settings.CASTELLUM_RECRUITMENT_HARD_LIMIT_FACTOR
if not_contacted_count >= hard_limit:
messages.error(request, _(
'Application privacy does not allow you to add more subjects. '
'Please contact provided subjects before adding new ones.'
return self.get(request, *args, **kwargs)
# Silently trim down to respect hard limit.
# In many cases, the soft limit warning will already be shown in that case.
batch_size = min(
settings.CASTELLUM_RECRUITMENT_BATCH_SIZE,
hard_limit - not_contacted_count,
)
added = self.create_participations(batch_size)
except KeyError:
added = 0
messages.error(request, _(
'The custom filter that is set for this study does not exist.'
))
if not_contacted_count + added > settings.CASTELLUM_RECRUITMENT_SOFT_LIMIT:
messages.error(request, _(
'Such workflow is not intended due to privacy reasons. '
'Please contact provided subjects before adding new ones.'
))
if added == 0:
messages.error(
self.request,
_('No potential participants could be found for this study.'),
)
elif added < batch_size:
messages.warning(
self.request,
_(
'Only {added} out of {batchsize} potential participants '
'could be found for this study. These have been added.'
).format(added=added, batchsize=batch_size),
return self.get(request, *args, **kwargs)
class MailRecruitmentView(StudyMixin, PermissionRequiredMixin, FormView):
permission_required = 'recruitment.view_participation'
template_name = 'recruitment/mail.html'
tab = 'mail'
form_class = SendMailForm
replace_first = mail_body.replace("{firstname}", contact.first_name)
replace_second = replace_first.replace("{lastname}", contact.last_name)
def get_mail_settings(self):
from_email = settings.CASTELLUM_RECRUITMENT_EMAIL or settings.DEFAULT_FROM_EMAIL
host=settings.CASTELLUM_RECRUITMENT_EMAIL_HOST,
port=settings.CASTELLUM_RECRUITMENT_EMAIL_PORT,
username=settings.CASTELLUM_RECRUITMENT_EMAIL_USER,
password=settings.CASTELLUM_RECRUITMENT_EMAIL_PASSWORD,
use_tls=settings.CASTELLUM_RECRUITMENT_EMAIL_USE_TLS,
return from_email, connection
def get_contact_email(self, contact):
if contact.email:
return contact.email
else:
for data in contact.guardians.exclude(email='').values('email'):
return data['email']
def create_participations(self, batch_size):
subjects = get_recruitable(self.study)
from_email, connection = self.get_mail_settings()
while counter < batch_size and subjects:
pick = subjects.pop(random.randrange(0, len(subjects)))
email = self.get_contact_email(pick.contact)
self.personalize_mail_body(self.study.mail_body, pick.contact),
reply_to=[self.study.mail_reply_address],
try:
success = mail.send()
except ValueError:
logger.debug('Failed to send email to subject %i', pick.pk)
success = False
status=Participation.AWAITING_RESPONSE,
return counter
def send_confirmation_mail(self, counter):
from_email, connection = self.get_mail_settings()
'%i emails have been sent.\n\n---\n\n%s' % (counter, self.study.mail_body),
connection=connection,
batch_size = form.cleaned_data['batch_size']
try:
added = self.create_participations(batch_size)
except KeyError:
added = 0
messages.error(self.request, _(
'The custom filter that is set for this study does not exist.'
))
if added == 0:
messages.error(
self.request,
_(
'No potential participants with email addresses could be found for '
'this study.'
)
elif added < batch_size:
messages.warning(
self.request,
_(
'Only {added} out of {batchsize} potential participants with email '
'addresses could be found for this study. These have been contacted.'
).format(added=added, batchsize=batch_size),
)
else:
messages.success(self.request, _('Emails sent successfully!'))
MailBatch.objects.create(
study=self.study,
contacted_size=added,
)
class ContactView(ParticipationMixin, PermissionRequiredMixin, UpdateView):
model = Participation
template_name = 'recruitment/contact.html'
permission_required = (
'contacts.view_contact',
'recruitment.change_participation',
def get_initial(self):
initial = super().get_initial()
initial['status'] = Participation.UNSUITABLE
initial['followup_date'] = None
initial['followup_time'] = None
def get_object(self):
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
choices = [(None, '---')] + list(Participation.STATUS_OPTIONS[1:])
status_field = context['form'].fields['status']
status_field.widget.choices = choices
context['context'] = self.request.GET.get('context')
context = self.request.GET.get('context')
return reverse(context, args=[self.object.subject.pk])
return reverse('recruitment:recruitment-open', args=[self.object.study.pk])
class RecruitmentUpdateMixin(ParticipationMixin):
def get_success_url(self):
return reverse('recruitment:contact', args=[self.kwargs['study_pk'], self.kwargs['pk']])
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['base_template'] = "recruitment/base.html"
return context
class ContactUpdateView(RecruitmentUpdateMixin, BaseContactUpdateView):
return self.participation.subject.contact
class AttributeSetUpdateView(RecruitmentUpdateMixin, BaseAttributeSetUpdateView):
def get_object(self):
return self.participation.subject.attributeset
class DataProtectionUpdateView(RecruitmentUpdateMixin, BaseDataProtectionUpdateView):
return self.participation.subject
class AdditionalInfoUpdateView(RecruitmentUpdateMixin, BaseAdditionalInfoUpdateView):
return self.participation.subject
class CalendarView(StudyMixin, PermissionRequiredMixin, BaseCalendarView):
model = Study
permission_required = 'recruitment.view_participation'
def get_appointments(self):
return super().get_appointments().filter(session__study=self.object)