Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 35 additions & 11 deletions core/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import os
import logging
import io
import unicodedata
import pandas as pd

from django.core.mail import EmailMultiAlternatives
Expand Down Expand Up @@ -35,32 +36,55 @@ def get_users_online_cache_key() -> str:

class XlsxFileToExport:
"""
Writing data to `xlsx` file.
`filename` must contain `.xlsx` format prefix.
All data on 1 page.
Формирует XLSX в памяти.
`filename` сохранён для совместимости, но не используется для записи на диск.
Все данные пишутся на один лист.
"""

def __init__(self, filename="output.xlsx"):
self.filename = filename
self._buffer = None

def write_data_to_xlsx(self, data: list[dict], sheet_name: str = "scores") -> None:
try:
data_frames = pd.DataFrame(data)
with pd.ExcelWriter(self.filename) as writer:
buffer = io.BytesIO()
with pd.ExcelWriter(buffer, engine="openpyxl") as writer:
data_frames.to_excel(writer, sheet_name=sheet_name, index=False)
buffer.seek(0)
self._buffer = buffer
except Exception as e:
logger.error(f"Write export rates data error: {str(e)}", exc_info=True)
raise

def get_binary_data_from_self_file(self) -> bytes:
try:
with open(self.filename, "rb") as f:
binary_data = f.read()
return binary_data
if not self._buffer:
raise ValueError("XLSX buffer is empty")
return self._buffer.getvalue()
except Exception as e:
logger.error(f"Read export rates data error: {str(e)}", exc_info=True)
raise

def delete_self_xlsx_file_from_local_machine(self) -> None:
if os.path.isfile(self.filename) and self.filename.endswith(".xlsx"):
os.remove(self.filename)
def clear_buffer(self) -> None:
if self._buffer:
self._buffer.close()
self._buffer = None


def sanitize_filename(filename: str) -> str:
normalized_name = unicodedata.normalize("NFKD", filename)
safe_chars = [
char
for char in normalized_name
if char.isalnum() or char in ("-", "_", " ", ".")
]
cleaned_name = "".join(safe_chars)
return " ".join(cleaned_name.split())


def ascii_filename(filename: str) -> str:
safe_name = sanitize_filename(filename)
ascii_name = "".join(char if char.isascii() else "_" for char in safe_name)
ascii_name = " ".join(ascii_name.split())
return ascii_name or "export"
138 changes: 15 additions & 123 deletions partner_programs/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
import tablib
from django import forms
from django.contrib import admin
from django.db.models import Prefetch, QuerySet
from django.db.models import QuerySet
from django.http import HttpRequest, HttpResponse
from django.urls import path
from django.utils import timezone

from core.utils import XlsxFileToExport
from core.utils import XlsxFileToExport, ascii_filename, sanitize_filename
from mailing.views import MailingTemplateRender
from partner_programs.models import (
PartnerProgram,
Expand All @@ -19,7 +19,7 @@
PartnerProgramProject,
PartnerProgramUserProfile,
)
from project_rates.models import Criteria, ProjectScore
from partner_programs.services import prepare_project_scores_export_data


class PartnerProgramMaterialInline(admin.StackedInline):
Expand Down Expand Up @@ -234,18 +234,22 @@ def get_export_rates_view(self, request, object_id):
xlsx_file_writer = XlsxFileToExport()
xlsx_file_writer.write_data_to_xlsx(rates_data_to_write)
binary_data_to_export: bytes = xlsx_file_writer.get_binary_data_from_self_file()
xlsx_file_writer.delete_self_xlsx_file_from_local_machine()

encoded_file_name: str = urllib.parse.quote(
f"{PartnerProgram.objects.get(pk=object_id).name}_оценки {timezone.now().strftime('%d-%m-%Y %H:%M:%S')}"
f".xlsx"
)
xlsx_file_writer.clear_buffer()

program_name = PartnerProgram.objects.get(pk=object_id).name
date_suffix = timezone.now().strftime("%d.%m.%y")
base_name = f"scores - {program_name or 'program'} - {date_suffix}"
safe_name = sanitize_filename(base_name)
encoded_file_name: str = urllib.parse.quote(f"{safe_name}.xlsx")
fallback_filename = f"{ascii_filename(base_name)}.xlsx"
response = HttpResponse(
binary_data_to_export,
content_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
)
response["Content-Disposition"] = (
f"attachment; filename*=UTF-8''{encoded_file_name}"
"attachment; "
f"filename=\"{fallback_filename}\"; "
f"filename*=UTF-8''{encoded_file_name}"
)
return response

Expand All @@ -256,119 +260,7 @@ def _get_prepared_rates_data_for_export(self, program_id: int) -> list[dict]:
критерии → комментарий.
Если у проекта несколько экспертов, на каждый проект-эксперт создаётся отдельная строка.
"""
criterias = list(
Criteria.objects.filter(partner_program__id=program_id)
.select_related("partner_program")
.order_by("id")
)
if not criterias:
return []

comment_criteria = next(
(criteria for criteria in criterias if criteria.name == "Комментарий"),
None,
)
criterias_without_comment = [
criteria for criteria in criterias if criteria != comment_criteria
]

program_fields = list(
PartnerProgramField.objects.filter(partner_program_id=program_id).order_by(
"id"
)
)

scores = (
ProjectScore.objects.filter(criteria__in=criterias)
.select_related("user", "criteria", "project")
.order_by("project_id", "criteria_id", "id")
)
scores_dict: dict[int, list[ProjectScore]] = {}
for score in scores:
scores_dict.setdefault(score.project_id, []).append(score)

if not scores_dict:
empty_row: dict[str, str] = {
"Название проекта": "",
"Фамилия эксперта": "",
}
for field in program_fields:
empty_row[field.label] = ""
for criteria in criterias_without_comment:
empty_row[criteria.name] = ""
if comment_criteria:
empty_row["Комментарий"] = ""
return [empty_row]

project_ids = list(scores_dict.keys())

field_values_prefetch = Prefetch(
"field_values",
queryset=PartnerProgramFieldValue.objects.select_related("field").filter(
program_project__partner_program_id=program_id,
program_project__project_id__in=project_ids,
),
to_attr="_prefetched_field_values",
)
program_projects = (
PartnerProgramProject.objects.filter(
partner_program_id=program_id, project_id__in=project_ids
)
.select_related("project")
.prefetch_related(field_values_prefetch)
)
program_project_by_project_id: dict[int, PartnerProgramProject] = {
link.project_id: link for link in program_projects
}

prepared_projects_rates_data: list[dict] = []
for project_id, project_scores in scores_dict.items():
project_link = program_project_by_project_id.get(project_id)
project = (
project_link.project
if project_link
else (project_scores[0].project if project_scores else None)
)

field_values_map: dict[int, str] = {}
field_values = (
getattr(project_link, "_prefetched_field_values", None)
if project_link
else None
)
if field_values:
for field_value in field_values:
field_values_map[field_value.field_id] = field_value.get_value()

scores_by_expert: dict[int, list[ProjectScore]] = {}
for score in project_scores:
scores_by_expert.setdefault(score.user_id, []).append(score)

for _, expert_scores in scores_by_expert.items():
row_data: dict[str, str] = {}
row_data["Название проекта"] = (
getattr(project, "name", "") if project else ""
)
row_data["Фамилия эксперта"] = (
expert_scores[0].user.last_name if expert_scores else ""
)

for field in program_fields:
row_data[field.label] = field_values_map.get(field.id, "")

scores_map: dict[int, str] = {
score.criteria_id: score.value for score in expert_scores
}

for criteria in criterias_without_comment:
row_data[criteria.name] = scores_map.get(criteria.id, "")

if comment_criteria:
row_data["Комментарий"] = scores_map.get(comment_criteria.id, "")

prepared_projects_rates_data.append(row_data)

return prepared_projects_rates_data
return prepare_project_scores_export_data(program_id)


@admin.register(PartnerProgramUserProfile)
Expand Down
25 changes: 25 additions & 0 deletions partner_programs/permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,3 +30,28 @@ def has_permission(self, request, view):
return True

return program.experts.filter(user=request.user).exists()


class IsAdminOrManagerOfProgram(BasePermission):
"""
Доступ разрешён только админам и менеджерам конкретной программы.
"""

def has_permission(self, request, view):
user = request.user
if not user or not user.is_authenticated:
return False

if getattr(user, "is_staff", False) or getattr(user, "is_superuser", False):
return True

program_id = view.kwargs.get("pk") or view.kwargs.get("program_id")
if not program_id:
return False

try:
program = PartnerProgram.objects.get(pk=program_id)
except PartnerProgram.DoesNotExist:
return False

return program.is_manager(user)
Loading