Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cron/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,5 @@
from rq import Retry, Queue

DEFAULT_QUEUE = Queue('default_queue', connection=REDIS_CLIENT, failure_ttl='72h')
TOPPER_QUEUE = Queue('topper_queue', connection=REDIS_CLIENT, failure_ttl='72h')
TOPPER_QUEUE = Queue('topper_queue', connection=REDIS_CLIENT, failure_ttl='72h')
CERTIFICATE_QUEUE = Queue('certificate_queue',connection=REDIS_CLIENT,failure_ttl='72h')
36 changes: 36 additions & 0 deletions cron/migrations/0006_certificatebatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.11 on 2026-02-08 23:16
from __future__ import unicode_literals

from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion


class Migration(migrations.Migration):

dependencies = [
('events', '0057_auto_20260127_1633'),
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('cron', '0005_asynccronmail_ers_job_id'),
]

operations = [
migrations.CreateModel(
name='CertificateBatch',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('batch_type', models.IntegerField(choices=[(1, 'TEST'), (2, 'TRAINING')])),
('status', models.IntegerField(choices=[(0, 'QUEUED'), (1, 'RUNNING'), (2, 'DONE'), (3, 'FAILED')], default=0)),
('rq_job_id', models.CharField(blank=True, max_length=128, null=True)),
('created_at', models.DateTimeField(auto_now_add=True)),
('started_at', models.DateTimeField(blank=True, null=True)),
('completed_at', models.DateTimeField(blank=True, null=True)),
('output_path', models.TextField(blank=True, null=True)),
('error', models.TextField(blank=True, null=True)),
('created_by', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to=settings.AUTH_USER_MODEL)),
('test', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='events.Test')),
('training', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.CASCADE, to='events.TrainingRequest')),
],
),
]
27 changes: 27 additions & 0 deletions cron/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,30 @@ def __str__(self):
return self.subject


class CertificateBatch(models.Model):
TYPE_CHOICES = (
(1, "TEST"),
(2, "TRAINING")
)

STATUS_CHOICES = (
(0, "QUEUED"),
(1, "RUNNING"),
(2, "DONE"),
(3, "FAILED"),
)

batch_type = models.IntegerField(choices=TYPE_CHOICES)
test = models.ForeignKey("events.Test", null=True, blank=True, on_delete=models.CASCADE)
training = models.ForeignKey("events.TrainingRequest", null=True, blank=True, on_delete=models.CASCADE)

status = models.IntegerField(choices=STATUS_CHOICES, default=0)
rq_job_id = models.CharField(max_length=128, blank=True, null=True)

created_by = models.ForeignKey(User, on_delete=models.CASCADE)
created_at = models.DateTimeField(auto_now_add=True)
started_at = models.DateTimeField(null=True, blank=True)
completed_at = models.DateTimeField(null=True, blank=True)

output_path = models.TextField(null=True, blank=True)
error = models.TextField(null=True, blank=True)
284 changes: 281 additions & 3 deletions cron/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
os.environ["DJANGO_SETTINGS_MODULE"] = "spoken.settings"
application = get_wsgi_application()

from .models import AsyncCronMail
from datetime import datetime
from .models import AsyncCronMail, CertificateBatch
from datetime import datetime, date
from django.utils import timezone
from django.conf import settings
import uuid
Expand All @@ -21,7 +21,7 @@
from smtplib import SMTPException, SMTPServerDisconnected
from django.core.mail import BadHeaderError
from rq.decorators import job
from cron import REDIS_CLIENT, DEFAULT_QUEUE, TOPPER_QUEUE
from cron import REDIS_CLIENT, DEFAULT_QUEUE, TOPPER_QUEUE, CERTIFICATE_QUEUE
from rq import Retry
import time
from rq import get_current_job
Expand All @@ -37,6 +37,52 @@
from events.helpers import get_fossmdlcourse
from django.db import close_old_connections
# from events.views import update_events_log, update_events_notification
import logging
import traceback
from io import BytesIO
from datetime import timedelta
from reportlab.pdfgen import canvas
from reportlab.platypus import Paragraph
from reportlab.lib.styles import ParagraphStyle
from reportlab.lib.units import cm
from PyPDF2 import PdfFileWriter, PdfFileReader
from events import certificates as certs
from events.certificates import (
get_test_certificate,
get_training_certificate,
get_signature,
get_test_cert_text,
get_training_cert_text,
)
import random
import string

logger = logging.getLogger(__name__)


def _id_generator(size=6, chars=string.ascii_uppercase + string.digits):
return ''.join(random.choice(chars) for _ in range(size))


def _custom_strftime(format, t):
return t.strftime(format)


def _ensure_certificate_date():
if isinstance(certs.EDUPYRAMIDS_CERTIFICATE_DATE, date):
return
if not certs.EDUPYRAMIDS_CERTIFICATE_DATE:
certs.EDUPYRAMIDS_CERTIFICATE_DATE = date.max
return
for fmt in ("%Y-%m-%d", "%d-%m-%Y"):
try:
certs.EDUPYRAMIDS_CERTIFICATE_DATE = datetime.strptime(
certs.EDUPYRAMIDS_CERTIFICATE_DATE, fmt
).date()
return
except ValueError:
continue
certs.EDUPYRAMIDS_CERTIFICATE_DATE = date.max



Expand Down Expand Up @@ -408,3 +454,235 @@ def async_test_post_save(test, user, message):
job_timeout='24h'
)
print(f"\033[92m Added async_test_post_save job successfully \033[0m")


def _merge_overlay_page(output, template_path, overlay_buffer):
# Merge overlay onto template and add to output
with open(template_path, "rb") as template_file:
template_bytes = template_file.read()
page = PdfFileReader(BytesIO(template_bytes)).getPage(0)
overlay = PdfFileReader(BytesIO(overlay_buffer.getvalue())).getPage(0)
if hasattr(page, "merge_page"):
page.merge_page(overlay)
else:
page.mergePage(overlay)
output.addPage(page)


def _build_test_overlay(ta, test, mdluser, mdlgrade):
_ensure_certificate_date()
img_temp = BytesIO()
img_doc = canvas.Canvas(img_temp)

if ta.test.training.department.id != 169:
img_doc.setFont('Helvetica', 18, leading=None)
img_doc.drawCentredString(211, 115, _custom_strftime('%d %B %Y', test.tdate))

img_doc.setFillColorRGB(0, 0, 0)
img_doc.setFont('Helvetica', 10, leading=None)
img_doc.drawString(10, 6, ta.password)

img_path = get_signature(ta.test.tdate)
img_doc.drawImage(img_path, 600, 95, 150, 76)

credits = "<p><b>Credits:</b> " + str(test.foss.credits) + "&nbsp&nbsp&nbsp<b>Score:</b> " + str('{:.2f}'.format(mdlgrade.grade)) + "%</p>"
text = get_test_cert_text(ta.test, mdluser, credits=credits)
centered = ParagraphStyle(
name='centered',
fontSize=15,
leading=24,
alignment=1,
spaceAfter=20
)
p = Paragraph(text, centered)
p.wrap(700, 200)
p.drawOn(img_doc, 3 * cm, 6.5 * cm)

text = "Certificate for Completion of <br/>" + test.foss.foss + " Training"
centered = ParagraphStyle(
name='centered',
fontSize=25,
leading=25,
alignment=1,
spaceAfter=15
)
p = Paragraph(text, centered)
p.wrap(500, 20)
p.drawOn(img_doc, 6.2 * cm, 17 * cm)

img_doc.save()
return img_temp


def _build_training_overlay(ta, training_end):
_ensure_certificate_date()
img_temp = BytesIO()
img_doc = canvas.Canvas(img_temp)

img_doc.setFont('Helvetica', 35, leading=None)
img_doc.drawCentredString(405, 480, "Certificate of Participation")

if ta.training.department.id != 169:
img_doc.setFont('Helvetica', 18, leading=None)
img_doc.drawCentredString(211, 115, _custom_strftime('%d %B %Y', training_end))

img_doc.setFillColorRGB(211, 211, 211)
img_doc.setFont('Helvetica', 10, leading=None)
img_doc.drawString(10, 6, "")

img_path = get_signature(ta.training.training_start_date)
img_doc.drawImage(img_path, 600, 100, 150, 76)

text = get_training_cert_text(ta)
centered = ParagraphStyle(
name='centered',
fontSize=16,
leading=30,
alignment=0,
spaceAfter=20
)
p = Paragraph(text, centered)
p.wrap(630, 200)
p.drawOn(img_doc, 4.2 * cm, 7 * cm)
img_doc.save()
return img_temp


def _generate_test_certificates(batch):
if not batch.test_id:
raise ValueError("Test batch is missing test")

test = Test.objects.select_related(
'training',
'training__department',
'academic',
'foss',
'organiser__user',
'invigilator__user'
).get(pk=batch.test_id)

test_attendances = TestAttendance.objects.select_related(
'test',
'test__training',
'test__training__department',
'test__academic',
'test__foss',
'test__organiser__user',
'test__invigilator__user'
).filter(test_id=batch.test_id)

quiz_ids = {ta.mdlquiz_id for ta in test_attendances}
user_ids = {ta.mdluser_id for ta in test_attendances}
grades = MdlQuizGrades.objects.using('moodle').filter(quiz__in=quiz_ids, userid__in=user_ids)
grades_by_key = {(g.quiz, g.userid): g for g in grades}
users_by_id = MdlUser.objects.using('moodle').in_bulk(user_ids)

output = PdfFileWriter()

for ta in test_attendances:
mdlgrade = grades_by_key.get((ta.mdlquiz_id, ta.mdluser_id))
mdluser = users_by_id.get(ta.mdluser_id)
if not mdlgrade or not mdluser:
continue
if ta.status < 1 or round(mdlgrade.grade, 1) < 40:
continue

if ta.password:
certificate_pass = ta.password
else:
pad_len = max(0, 10 - len(str(ta.mdluser_id)))
certificate_pass = str(ta.mdluser_id) + _id_generator(pad_len)
ta.password = certificate_pass

ta.count += 1
ta.status = 4
ta.save(update_fields=["password", "count", "status", "updated"])

overlay = _build_test_overlay(ta, test, mdluser, mdlgrade)
template_path = get_test_certificate(ta)
_merge_overlay_page(output, template_path, overlay)

return output


def _generate_training_certificates(batch):
if not batch.training_id:
raise ValueError("Training batch is missing training")

ta_list = TrainingAttend.objects.select_related(
'student__user',
'training',
'training__department',
'training__course__foss',
'training__training_planner__academic',
'training__training_planner__organiser__user'
).filter(training_id=batch.training_id)

output = PdfFileWriter()

for ta in ta_list:
training_end = ta.training.sem_start_date + timedelta(days=60)
overlay = _build_training_overlay(ta, training_end)
template_path = get_training_certificate(ta)
_merge_overlay_page(output, template_path, overlay)

return output


def _get_certificate_output_path(batch):
cert_dir = os.path.join(settings.MEDIA_ROOT, 'certificates')
os.makedirs(cert_dir, exist_ok=True)
type_label = 'test' if batch.batch_type == 1 else 'training'
filename = "certificate_batch_%s_%s.pdf" % (batch.id, type_label)
rel_path = os.path.join('certificates', filename)
abs_path = os.path.join(settings.MEDIA_ROOT, rel_path)
return rel_path, abs_path


def generate_certificate_batch(batch_id):
# Generate merged certificates
close_old_connections()
_ensure_certificate_date()
batch = CertificateBatch.objects.select_related('test', 'training').get(pk=batch_id)
logger.info("Starting certificate batch %s", batch.id)

batch.status = 1
batch.started_at = timezone.now()
batch.error = None
batch.save(update_fields=["status", "started_at", "error"])

try:
if batch.batch_type == 1:
output = _generate_test_certificates(batch)
elif batch.batch_type == 2:
output = _generate_training_certificates(batch)
else:
raise ValueError("Unsupported certificate batch type: %s" % batch.batch_type)

rel_path, abs_path = _get_certificate_output_path(batch)
with open(abs_path, "wb") as output_file:
output.write(output_file)

batch.output_path = rel_path
batch.status = 2
batch.completed_at = timezone.now()
batch.save(update_fields=["output_path", "status", "completed_at"])
logger.info("Completed certificate batch %s", batch.id)
except Exception:
batch.status = 3
batch.error = traceback.format_exc()
batch.completed_at = timezone.now()
batch.save(update_fields=["status", "error", "completed_at"])
logger.exception("Certificate batch %s failed", batch.id)
raise


def async_generate_certificate_batch(batch):
job = CERTIFICATE_QUEUE.enqueue(
generate_certificate_batch,
batch.pk,
job_timeout='72h'
)
batch.rq_job_id = job.id
batch.status = 0
batch.save(update_fields=["rq_job_id", "status"])
Loading