diff --git a/article/sources/xmlsps.py b/article/sources/xmlsps.py index 7501501f9..6fc512ed3 100755 --- a/article/sources/xmlsps.py +++ b/article/sources/xmlsps.py @@ -22,17 +22,25 @@ from packtools.sps.models.v2.article_toc_sections import ArticleTocSections from packtools.sps.models.v2.related_articles import RelatedArticles from packtools.sps.pid_provider.xml_sps_lib import XMLWithPre +from rapidfuzz import fuzz, process from article import choices -from article.models import Article, ArticleFunding, DocumentAbstract, DocumentTitle, DataAvailabilityStatement +from article.models import ( + Article, + ArticleFunding, + DataAvailabilityStatement, + DocumentAbstract, + DocumentTitle, +) from core.models import Language from core.utils.extracts_normalized_email import extracts_normalized_email from doi.models import DOI from institution.models import Sponsor from issue.models import Issue, TableOfContents from journal.models import Journal -from location.models import Location -from pid_provider.choices import PPXML_STATUS_DONE, PPXML_STATUS_INVALID +from location.models import City, Country, Location, State +from location.utils import clean_acronym, clean_name +from pid_provider.choices import PPXML_STATUS_INVALID from pid_provider.models import PidProviderXML from researcher.models import Affiliation, InstitutionalAuthor, Researcher from tracker.models import UnexpectedEvent @@ -59,6 +67,215 @@ def add_error(errors, function_name, error, **kwargs): errors.append(error_dict) +def fuzzy_match_official(search_term, official_dict, threshold=85): + """ + Realiza fuzzy matching genérico contra um dicionário de itens oficiais. + + Args: + search_term: Termo a ser buscado + official_dict: Dicionário {chave_busca: objeto} de itens oficiais + threshold: Score mínimo para considerar match (0-100) + + Returns: + tuple: (objeto_matched, score) ou (None, 0) se não encontrar match + """ + if not official_dict or not search_term: + return None, 0 + + result = process.extractOne( + search_term, + official_dict.keys(), + scorer=fuzz.WRatio, + score_cutoff=threshold, + ) + + if result: + matched_key, score, _ = result + matched_obj = official_dict[matched_key] + return matched_obj, score + + return None, 0 + + +def get_country_by_acronym(country_name, user): + """ + Busca país por acronym (2) sem fuzzy matching. + + Args: + country_name: Nome/código do país + user: Usuário para criação + + Returns: + Country: Objeto Country ou None + """ + if len(country_name) == 2 and country_name.isalpha(): + try: + return Country.objects.get(acronym__iexact=country_name, status="OFFICIAL") + except Country.DoesNotExist: + return None + + +def normalize_country(country_name, user, errors, threshold=85): + """ + Normaliza nome de país usando fuzzy matching com dados oficiais. 
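+
+    Lookup order (as implemented below): 2-letter acronym, exact match on the
+    official name, fuzzy match against the official countries, and finally
+    creation of a new record via Country.create_or_update.
+
+    Example (usage sketch; the values are illustrative):
+        country = normalize_country("Brasil", user, errors, threshold=90)
+        country = normalize_country("BR", user, errors)  # 2-letter codes try the acronym lookup first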
+ + Args: + country_name: Nome ou código do país + user: Usuário para criação de registros + errors: Lista para coletar erros + threshold: Score mínimo para fuzzy matching (0-100) + + Returns: + Country: Objeto Country normalizado ou None + """ + if not country_name: + return None + + try: + country_name = str(country_name).strip() + + country_by_acronym = get_country_by_acronym(country_name, user) + if country_by_acronym: + return country_by_acronym + + cleaned_name = clean_name(country_name) + if not cleaned_name: + return None + + try: + return Country.objects.get(name__iexact=cleaned_name, status="OFFICIAL") + except Country.DoesNotExist: + official_countries = Country.objects.filter(status="OFFICIAL") + official_dict = {c.name: c for c in official_countries if c.name} + + matched_country, score = fuzzy_match_official( + search_term=cleaned_name, + official_dict=official_dict, + threshold=threshold, + ) + + if matched_country: + return matched_country + + + return Country.create_or_update( + user=user, + name=cleaned_name, + ) + except Exception as e: + add_error(errors, "normalize_country", e, country_name=country_name) + return None + + +def normalize_state(state_name, user, errors, threshold=85): + """ + Normaliza nome de estado usando fuzzy matching com dados oficiais. + + Args: + state_name: Nome do estado + user: Usuário para criação de registros + errors: Lista para coletar erros + threshold: Score mínimo para fuzzy matching (0-100) + + Returns: + State: Objeto State normalizado ou None + """ + if not state_name: + return None + + try: + cleaned_name = clean_name(state_name) + cleaned_acronym = clean_acronym(state_name) + + if not cleaned_name and not cleaned_acronym: + return None + try: + if cleaned_name and cleaned_acronym: + return State.objects.get( + name__iexact=cleaned_name, + acronym__iexact=cleaned_acronym, + status="OFFICIAL" + ) + elif cleaned_name: + return State.objects.get(name__iexact=cleaned_name, status="OFFICIAL") + except State.DoesNotExist: + official_states = State.objects.filter(status="OFFICIAL") + official_dict = {s.name: s for s in official_states if s.name} + + state_name = str(state_name).strip() + + + matched_state, score = fuzzy_match_official( + search_term=cleaned_name, + official_dict=official_dict, + threshold=threshold, + ) + if matched_state: + return matched_state + + return State.create_or_update( + user=user, + name=cleaned_name, + acronym=cleaned_acronym, + ) + except Exception as e: + add_error(errors, "normalize_state", e, state_name=state_name) + return None + + +def normalize_city(city_name, user, errors): + """ + Normaliza nome de cidade (apenas limpeza, sem fuzzy matching). + + Args: + city_name: Nome da cidade + user: Usuário para criação de registros + errors: Lista para coletar erros + + Returns: + City: Objeto City normalizado ou None + """ + if not city_name: + return None + + try: + city_name = str(city_name).strip() + cleaned_name = clean_name(city_name) + if not cleaned_name: + return None + + return City.get_or_create(name=cleaned_name, user=user) + except Exception as e: + add_error(errors, "normalize_city", e, city_name=city_name) + return None + + +def normalize_location_data(country_name, state_name, city_name, user, errors): + """ + Normaliza dados de localização usando fuzzy matching. 
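+
+    Example (usage sketch mirroring the call added in get_or_create_institution_authors;
+    the values are illustrative):
+        data = normalize_location_data("BR", "São Paulo", "Campinas", user, errors)
+        location = Location.create_or_update(
+            user=user,
+            country=data["country"],
+            state=data["state"],
+            city=data["city"],
+        )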
+ + Args: + country_name: Nome ou código do país + state_name: Nome do estado + city_name: Nome da cidade + user: Usuário para criação + errors: Lista para coletar erros + + Returns: + dict: {"country": Country obj, "state": State obj, "city": City obj} + """ + result = {"country": None, "state": None, "city": None} + + if country_name: + result["country"] = normalize_country(country_name, user, errors) + if state_name: + result["state"] = normalize_state(state_name, user, errors) + if city_name: + result["city"] = normalize_city(city_name, user, errors) + + return result + + def load_article(user, xml=None, file_path=None, v3=None, pp_xml=None): """ Carrega um artigo a partir de XML. @@ -736,11 +953,20 @@ def get_or_create_institution_authors(xmltree, user, item, errors): if collab := author.get("collab"): if affs := author.get("affs"): for aff in affs: - location = Location.create_or_update( - user=user, + # Normalizar dados antes de criar location + normalized_data = normalize_location_data( country_name=aff.get("country_name"), state_name=aff.get("state"), city_name=aff.get("city"), + user=user, + errors=errors + ) + + location = Location.create_or_update( + user=user, + country=normalized_data.get("country"), + state=normalized_data.get("state"), + city=normalized_data.get("city"), ) affiliation = Affiliation.get_or_create( name=aff.get("orgname"), diff --git a/journal/management/__init__.py b/journal/management/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/journal/management/commands/__init__.py b/journal/management/commands/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/journal/management/commands/reload_institutions.py b/journal/management/commands/reload_institutions.py new file mode 100644 index 000000000..9f451b40f --- /dev/null +++ b/journal/management/commands/reload_institutions.py @@ -0,0 +1,181 @@ +from locale import normalize +import logging + +from django.core.management.base import BaseCommand, CommandError +from rapidfuzz import fuzz, process + +from collection.models import Collection +from core.utils.rename_dictionary_keys import rename_dictionary_keys +from core.utils.utils import _get_user +from journal.models import AMJournal, SciELOJournal +from journal.sources.am_field_names import correspondencia_journal +from location import utils +from location.models import Country, State, City, Location +from django.db.models import Q +from journal.sources.am_data_extraction import extract_value +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = "Reexecuta instituições para obter localização normalizada." 
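+
+    # Usage sketch (flag names as defined in add_arguments below; <username> is illustrative):
+    #   python manage.py reload_institutions --update-am-journals <username>
+    #   python manage.py reload_institutions --reload-institutions <username>
+    #   python manage.py reload_institutions --delete-institutions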
+ + def add_arguments(self, parser): + parser.add_argument( + "--delete-locations-linked-with-institutions", + action="store_true", + help="Deleta localização de institutições (Publisher, Owner, Sponsor, Copyright)" + ) + parser.add_argument( + "--delete-institutions", + action="store_true", + help="Deleta instituições vinculado aos periódicos" + ) + parser.add_argument( + "--update-am-journals", + type=str, + help="Atualiza periódicos" + ) + parser.add_argument( + "--reload-institutions", + type=str, + help="Atualiza periódicos" + ) + + + def handle(self, *args, **options): + if not any(options.values()): + raise CommandError( + "Informe ao menos uma ação: " + "--delete-locations-linked-with-institutions, --delete-institutions" + ) + if options['delete_locations_linked_with_institutions']: + self.stdout.write("Excluindo localizações de instituições;") + + if options["delete_institutions"]: + self.stdout.write("Excluindo instituições para recarregar.") + if options["update_am_journals"]: + self.stdout.write("Excluindo instituições para recarregar.") + self.update_am_journal(username=options["update_am_journals"]) + if options["reload_institutions"]: + self.stdout.write("Atualizando localização de instituições e criando novas localizações officiais.") + self.replace_locations_official_linked_institutions(username=options["reload_institutions"]) + + + def update_am_journal(self, username): + from journal.tasks import process_journal_article_meta + user = _get_user(request=None, username=username, user_id=None) + items = Collection.objects.filter(collection_type="journals").iterator() + for item in items: + process_journal_article_meta(user=user, limit=None, collection=item.acron3) + + def replace_locations_official_linked_institutions(self, username): + am_journals = AMJournal.objects.all() + for am_journal in am_journals: + try: + scielo_journal = SciELOJournal.objects.filter(issn_scielo=am_journal.pid) + except SciELOJournal.DoesNotExist: + continue + if am_journal.data: + journal_dict = rename_dictionary_keys( + am_journal.data, correspondencia_journal + ) + country_name = extract_value(journal_dict.get("publisher_country")) + state_name = extract_value(journal_dict.get("publisher_state")) + city_name = extract_value(journal_dict.get("publisher_city")) + print(country_name, state_name, city_name) + + + def replace_location_publisher(self, scielo_journal, new_location): + journal = scielo_journal.journal + if publisher_history := journal.publisher_history.all(): + for ph in publisher_history: + if ph.institution and ph.institution.institution: + logging.info(f"Atualizando localização de {ph.institution.institution}") + ph.institution.institution.location = new_location + ph.institution.institution.save() + + def normalize_location(self, country, state, city): + normalize_country = self.normalize_country(country=country) + normalize_state = self.normalize_state(state=state) + normalize_city = self.normalize_city(city=city) + logging.info(normalize_country, normalize_state, normalize_city) + if normalize_country: + country = self.get_official_country(name=normalize_country) + if normalize_state: + state = self.get_official_state(name=normalize_state) + if normalize_city: + city = self.get_official_city(name=normalize_city) + return { + 'country': country, + 'state': state, + 'city': city, + } + + def create_location(self, country, state, city): + location, created = Location.objects.get_or_create( + country=country, + state=state, + city=city + ) + if all([ + country is not None and 
getattr(country, "status", None) == "OFFICIAL", + state is not None and getattr(state, "status", None) == "OFFICIAL", + city is not None and getattr(city, "status", None) == "OFFICIAL" + ]): + location.status = "OFFICIAL" + location.save() + return location, created + + + def get_official_country(self, name): + fuzz_match = self.fuzzy_match(name=name, type_obj_location=Country) + logging.info(f"Country name fuzz: {fuzz_match}. name: {name}") + try: + return Country.objects.get(Q(name=name) | Q(acronym__iexact=name), status="OFFICIAL") + except Country.DoesNotExist: + country, _ = Country.objects.get_or_create(Q(name=name) | Q(acronym__iexact=name)) + return country + + def get_official_state(self, name): + fuzz_match = self.fuzzy_match(name=name, type_obj_location=State) + name = fuzz_match + try: + return State.objects.get(Q(name=name) | Q(acronym__iexact=name), status="OFFICIAL") + except State.DoesNotExist: + state, _ = State.objects.get_or_create(Q(name=name) | Q(acronym__iexact=name)) + return state + + def get_official_city(self, name): + city, _ = City.objects.get_or_create(name=name) + return city + + def normalize_country(self, country): + country = utils.clean_name(country) + + def normalize_state(self, state): + state = utils.clean_name(state) + + def normalize_city(self, city): + city = utils.clean_name(city) + + def fuzzy_match(self, name, type_obj_location, threshold=85): + matches_found = [] + official = type_obj_location.objects.filter(status="OFFICIAL") + official_names = {c.name: c for c in official} + + result = process.extractOne( + name, + official_names.keys(), + scorer=fuzz.WRatio, + score_cutoff=threshold, + ) + if result: + matched_name, score, _ = result + official = official_names[matched_name] + matches_found.append({ + 'unmatched': name, + 'official': official, + 'score': score, + } + ) + return name diff --git a/journal/migrations/0055_remove_journal_url_oa_alter_journal_digital_pa.py b/journal/migrations/0055_remove_journal_url_oa_alter_journal_digital_pa.py new file mode 100644 index 000000000..405570896 --- /dev/null +++ b/journal/migrations/0055_remove_journal_url_oa_alter_journal_digital_pa.py @@ -0,0 +1,26 @@ +# Generated by Django 5.2.7 on 2025-12-05 14:53 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("journal", "0054_journaltableofcontents"), + ] + + operations = [ + migrations.RemoveField( + model_name="journal", + name="url_oa", + ), + migrations.AlterField( + model_name="journal", + name="digital_pa", + field=models.ManyToManyField( + blank=True, + to="journal.digitalpreservationagency", + verbose_name="Digital Preservation (SciELO)", + ), + ), + ] diff --git a/journal/models.py b/journal/models.py index d16b8d257..d0cbc262c 100755 --- a/journal/models.py +++ b/journal/models.py @@ -358,16 +358,6 @@ class Journal(CommonControlField, ClusterableModel): blank=True, ) - url_oa = models.URLField( - _("Open Science accordance form"), - null=True, - blank=True, - help_text=mark_safe( - _( - """Suggested form: https://wp.scielo.org/wp-content/uploads/Formulario-de-Conformidade-Ciencia-Aberta.docx""" - ) - ), - ) main_collection = models.ForeignKey( Collection, verbose_name=_("Main Collection"), @@ -579,7 +569,7 @@ class Journal(CommonControlField, ClusterableModel): digital_pa = models.ManyToManyField( "DigitalPreservationAgency", blank=True, - verbose_name=_("DigitalPreservationAgency"), + verbose_name=_("Digital Preservation (SciELO)"), ) doi_prefix = models.CharField(max_length=20, 
blank=True, null=True) valid = models.BooleanField(default=False, null=True, blank=True) @@ -622,6 +612,9 @@ def autocomplete_custom_queryset_filter(search_term): ] panels_scope_and_about = [ + InlinePanel("mission", label=_("Mission"), classname="collapsed"), + InlinePanel("history", label=_("Brief History"), classname="collapsed"), + InlinePanel("focus", label=_("Focus and Scope"), classname="collapsed"), AutocompletePanel("indexed_at"), AutocompletePanel("additional_indexed_at"), AutocompletePanel("subject"), @@ -629,9 +622,6 @@ def autocomplete_custom_queryset_filter(search_term): InlinePanel("thematic_area", label=_("Thematic Areas"), classname="collapsed"), AutocompletePanel("wos_db"), AutocompletePanel("wos_area"), - InlinePanel("mission", label=_("Mission"), classname="collapsed"), - InlinePanel("history", label=_("Brief History"), classname="collapsed"), - InlinePanel("focus", label=_("Focus and Scope"), classname="collapsed"), ] panels_institutions = [ @@ -669,7 +659,11 @@ def autocomplete_custom_queryset_filter(search_term): panels_open_science = [ FieldPanel("open_access"), - FieldPanel("url_oa"), + InlinePanel( + "open_science_compliance", + label=_("Open Science Compliance"), + classname="collapsed", + ), InlinePanel( "file_oa", label=_("Open Science accordance form"), classname="collapsed" ), @@ -677,11 +671,6 @@ def autocomplete_custom_queryset_filter(search_term): InlinePanel("open_data", label=_("Open data"), classname="collapsed"), InlinePanel("preprint", label=_("Preprint"), classname="collapsed"), InlinePanel("review", label=_("Peer review"), classname="collapsed"), - InlinePanel( - "open_science_compliance", - label=_("Open Science Compliance"), - classname="collapsed", - ), ] panels_notes = [InlinePanel("annotation", label=_("Notes"), classname="collapsed")] diff --git a/journal/proxys.py b/journal/proxys.py index 65313967d..58642b307 100644 --- a/journal/proxys.py +++ b/journal/proxys.py @@ -59,7 +59,11 @@ class JournalProxyEditor(Journal): panels_open_science = [ FieldPanel("open_access"), - FieldPanel("url_oa"), + InlinePanel( + "open_science_compliance", + label=_("Open Science Compliance"), + classname="collapsed", + ), InlinePanel( "file_oa", label=_("Open Science accordance form"), classname="collapsed" ), @@ -67,11 +71,6 @@ class JournalProxyEditor(Journal): InlinePanel("open_data", label=_("Open data"), classname="collapsed"), InlinePanel("preprint", label=_("Preprint"), classname="collapsed"), InlinePanel("review", label=_("Peer review"), classname="collapsed"), - InlinePanel( - "open_science_compliance", - label=_("Open Science Compliance"), - classname="collapsed", - ), ] panels_policy = [ diff --git a/journal/sources/am_to_core.py b/journal/sources/am_to_core.py index a5b787e29..f8ec4503b 100644 --- a/journal/sources/am_to_core.py +++ b/journal/sources/am_to_core.py @@ -1,15 +1,9 @@ -import logging import re import sys from datetime import datetime from urllib.parse import urlparse -from core.utils.utils import fetch_data from django.db.models import Q -from django.core.files.base import ContentFile -from wagtail.images.models import Image - -from collection.exceptions import MainCollectionNotFoundError from core.models import Language from institution.models import CopyrightHolder, Owner, Publisher, Sponsor from journal.models import ( @@ -39,12 +33,11 @@ JournalLicense, ) from journal import tasks -from location.models import City, CountryName, Location, State, Country +from location.models import City, Location, State, Country from vocabulary.models 
import Vocabulary from .am_data_extraction import ( get_issns, - extract_issn_print_electronic, extract_value, extract_value_from_journal_history, extract_value_mission, diff --git a/journal/tests.py b/journal/tests.py index 73a66d89c..021b547e2 100755 --- a/journal/tests.py +++ b/journal/tests.py @@ -1,6 +1,6 @@ import json from unittest.mock import patch -from deepdiff import DeepDiff + from django.test import TestCase from django_test_migrations.migrator import Migrator @@ -8,6 +8,7 @@ from core.models import Gender, Language, License from core.users.models import User from editorialboard.models import RoleModel +from journal.formats.articlemeta_format import get_articlemeta_format_title from journal.models import ( AMJournal, DigitalPreservationAgency, @@ -25,7 +26,6 @@ child_load_license_of_use_in_journal, load_license_of_use_in_journal, ) -from journal.formats.articlemeta_format import get_articlemeta_format_title from thematic_areas.models import ThematicArea from vocabulary.models import Vocabulary diff --git a/journalpage/models.py b/journalpage/models.py index 64c04f079..44f2f4ae7 100644 --- a/journalpage/models.py +++ b/journalpage/models.py @@ -71,10 +71,12 @@ def journal_bibliographic_info_page(self, request, collection_acron3, acron): financing_statement = journal.financing_statement.get_object_in_preferred_language(language=language) acknowledgements = journal.acknowledgements.get_object_in_preferred_language(language=language) additional_information = journal.additional_information.get_object_in_preferred_language(language=language) - digital_preservation = journal.digital_pa.all() + digital_pa = journal.digital_pa.all() + digital_preservation = journal.digital_preservation.get_object_in_preferred_language(language=language) ethics = journal.ethics.get_object_in_preferred_language(language=language) fee_charging = journal.fee_charging.get_object_in_preferred_language(language=language) sponsor_history = journal.sponsor_history.all() + open_science_compliance = journal.open_science_compliance.get_object_in_preferred_language(language=language) context = { "journal": journal, @@ -101,12 +103,14 @@ def journal_bibliographic_info_page(self, request, collection_acron3, acron): "acknowledgements": acknowledgements, "additional_information": additional_information, "digital_preservation": digital_preservation, - "digital_preservation_clockss": digital_preservation.filter(acronym="CLOCKSS"), + "digital_pa": digital_pa, + "digital_preservation_clockss": digital_pa.filter(acronym="CLOCKSS"), "ethics": ethics, "fee_charging": fee_charging, "sponsor_history": sponsor_history, "editorial_board": editorial_board, "role_editorial_board": ROLE, + "open_science_compliance": open_science_compliance, # Current e available language "language": str(self.locale), "translations": context["available_translations"], diff --git a/journalpage/static/journalpage/css/about.css b/journalpage/static/journalpage/css/about.css new file mode 100644 index 000000000..f9fc2ba18 --- /dev/null +++ b/journalpage/static/journalpage/css/about.css @@ -0,0 +1,41 @@ +/* About page styles */ +ul.scielo__menu-contexto, +ul.scielo__menu-contexto ul { + list-style: none; +} + +ul.scielo__menu-contexto ul { + margin-bottom: 1rem; +} + +ul.scielo__menu-contexto .nav-link { + padding: 0; + color: gray; +} + +ul.scielo__menu-contexto .nav-link.active { + color: #3867ce; +} + +.sticky-top { + top: 80px; +} + +.bd-example h5, +.bd-example h4 { + margin-top: 3rem; +} + +.bd-example hr { + margin-top: 3rem; +} + +.bd-example ul { 
+ margin-bottom: 3rem; +} + +a { + word-wrap: break-word; +} + + diff --git a/journalpage/templates/journalpage/about.html b/journalpage/templates/journalpage/about.html index c61a2d869..9d9c95632 100644 --- a/journalpage/templates/journalpage/about.html +++ b/journalpage/templates/journalpage/about.html @@ -4,567 +4,74 @@ {% load wagtailroutablepage_tags %} {% load wagtailcore_tags %} {% load custom_tags %} + {% block body_class %}journal about{% endblock %} + {% block content %} {% include "journalpage/includes/header.html" %} {% include "journalpage/includes/journal_info.html" %} {% include "journalpage/includes/levelMenu.html" %} - {% block main_content %} - - - - - - + +{% include "journalpage/includes/about/breadcrumbs.html" %} - - - {% include "journalpage/includes/contact_footer.html" %} - - {% include "journalpage/includes/footer.html" %} +{% endblock %} +{% include "journalpage/includes/contact_footer.html" %} +{% include "journalpage/includes/footer.html" %} {% endblock %} diff --git a/journalpage/templates/journalpage/includes/about/author_instructions.html b/journalpage/templates/journalpage/includes/about/author_instructions.html new file mode 100644 index 000000000..bff6f02c0 --- /dev/null +++ b/journalpage/templates/journalpage/includes/about/author_instructions.html @@ -0,0 +1,52 @@ +{% load i18n %} +{% load wagtailcore_tags %} + +
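+{# Expects in the template context: authors_contributions, digital_assets, citations_and_references, supp_docs_submission, financing_statement, acknowledgements and additional_information (collections of rich-text entries iterated below). #}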
+

{% trans 'INSTRUÇÕES PARA OS AUTORES' %}

+ +
{% trans 'Tipos de documentos aceitos' %}
+ + +
{% trans 'Contribuição dos Autores' %}
+{% for ac in authors_contributions %} + {{ ac.rich_text|richtext }} +{% endfor %} + +
{% trans 'Formato de envio dos artigos' %}
+ +
{% trans 'Ativos digitais' %}
+{% for da in digital_assets %} + {{ da.rich_text|richtext }} +{% endfor %} +
{% trans 'Citações e referências' %}
+{% for cr in citations_and_references %} + {{ cr.rich_text|richtext }} +{% endfor %} +
{% trans 'Documentos Suplementares Necessários para Submissão' %}
+{% for sds in supp_docs_submission %} + {{ sds.rich_text|richtext }} +{% endfor %} +
{% trans 'Declaração de Financiamento' %}
+{% for fs in financing_statement %} + {{ fs.rich_text|richtext }} +{% endfor %} +
{% trans 'Agradecimentos' %}
+{% for ak in acknowledgements %} + {{ ak.rich_text|richtext }} +{% endfor %} +
{% trans 'Informações adicionais' %}
+{% for ai in additional_information %} + {{ ai.rich_text|richtext }} +{% endfor %} +

+ {% trans '*dados precisam estar disponíveis em alfabeto romano' %} +

+ diff --git a/journalpage/templates/journalpage/includes/about/bibliographic_info.html b/journalpage/templates/journalpage/includes/about/bibliographic_info.html new file mode 100644 index 000000000..fae8a4387 --- /dev/null +++ b/journalpage/templates/journalpage/includes/about/bibliographic_info.html @@ -0,0 +1,116 @@ +{% load i18n %} +{% load wagtailcore_tags %} + + + +

{% trans 'Sobre o periódico' %}

+
{% trans 'Ficha Bibliográfica' %}
+ {% trans 'Título do periódico conforme registro do ISSN:' %} + {{ journal.official.title }} +
+ {% trans 'Título abreviado:' %} + {{ journal.short_title }} +
+ {% trans 'Publicação de:' %} + {% for publisher in journal.publisher_history.all %} + {% if publisher.organization %} + {{ publisher.organization.name }} + {% endif %} + {% endfor %} +
+ {% trans 'Modelo de publicação:' %} + {{ journal.get_publishing_model_display }} +
+ {% trans 'Ano de criação do periódico:' %} {% if journal.official.initial_year %} {{ journal.official.initial_year }} {% endif %} +
+ {% trans 'Área:' %} {% for study_area in journal.subject.all %} {{ study_area }} {% endfor %} +
+ {% if journal.official.issn_print %} + {% trans 'Versão impressa:' %} + {{ journal.official.issn_print }} +
+ {% endif %} + {% if journal.official.issn_electronic %} + {% trans 'Versão on-line ISSN:' %} + {{ journal.official.issn_electronic }} +
+ {% endif %} + +
{% trans 'Missão' %}
+{% for m in mission %} + {{ m.rich_text|richtext }} +{% endfor %} +
{% trans 'Breve Histórico' %}
+{% for bf in brief_history %} + {{ bf.rich_text|richtext }} +{% endfor %} +
{% trans 'Foco e escopo' %}
+{% for hs in focus_and_scope %} + {{ hs.rich_text|richtext }} +{% endfor %} +
{% trans 'Contato' %}
+ {% trans 'Endereço completo da unidade / instituição responsável pelo periódico:' %} + {{ journal.contact_address }}
+ {% trans 'Cidade:' %} + {{journal.contact_location.city}}
+ {% trans 'Estado:' %} + {{journal.contact_location.state}}
+ {% trans 'País:' %} + {{journal.contact_location.country}}
+ {% trans 'E-mail:' %} + {% for email in journal.journal_email.all %} + {{email.email}}
+ {% endfor%} +
{% trans 'Websites e Mídias Sociais' %}
+ +
{% trans 'Fontes de indexação' %}
+ +
{% trans 'Patrocinadores e agências de Fomento' %}
+ + + +
{% trans 'Preservação digital' %}
+ {% if digital_preservation_clockss %} + +
+ + + + + +
+ {% endif%} + {% trans 'Preservação digital' %} (SciELO): +
+ {% for dp in digital_pa %} + {{ dp.name }} {{ dp.acronym }} | {{ dp.url }} + {% endfor %} +
+ {% trans 'Outros tipos de preservação digital:' %} + {% for digital_preservation in digital_preservation %} + {{ digital_preservation.rich_text|richtext }} + {% endfor %} + + + diff --git a/journalpage/templates/journalpage/includes/about/breadcrumbs.html b/journalpage/templates/journalpage/includes/about/breadcrumbs.html new file mode 100644 index 000000000..83fcc1cca --- /dev/null +++ b/journalpage/templates/journalpage/includes/about/breadcrumbs.html @@ -0,0 +1,48 @@ +{% load i18n %} +{% load wagtailroutablepage_tags %} + + + + + + + diff --git a/journalpage/templates/journalpage/includes/about/editorial_board.html b/journalpage/templates/journalpage/includes/about/editorial_board.html new file mode 100644 index 000000000..982dcb09b --- /dev/null +++ b/journalpage/templates/journalpage/includes/about/editorial_board.html @@ -0,0 +1,34 @@ +{% load i18n %} +{% load custom_tags %} + +
+

{% trans 'CORPO EDITORIAL' %}

+ + +{% for role, role_display in role_editorial_board %} +

{{ role_display }}

+ +{% endfor %} + diff --git a/journalpage/templates/journalpage/includes/about/editorial_policy.html b/journalpage/templates/journalpage/includes/about/editorial_policy.html new file mode 100644 index 000000000..14e27c9db --- /dev/null +++ b/journalpage/templates/journalpage/includes/about/editorial_policy.html @@ -0,0 +1,84 @@ +{% load i18n %} +{% load wagtailcore_tags %} + +
+

{% trans 'Política editorial' %}

+ +
{% trans 'Conformidade com a Ciência Aberta' %}
+ {% for open_science in open_science_compliance %} + {{ open_science.rich_text|richtext }} + {% endfor %} +
{% trans 'Dados abertos' %}
+ {% for od in open_data %} + {{ od.rich_text|richtext }} + {% endfor %} +
{% trans 'Preprint' %}
+ {% for p in preprint %} + {{ p.rich_text|richtext }} + {% endfor %} +
{% trans 'Peer review informado' %}
+ + {% for r in review %} + {{ r.rich_text|richtext }} + {% endfor %} + + +
{% trans 'Ética' %}
+ +
{% trans 'Comitê de Ética' %}
+{% for ec in ecommittee %} + {{ ec.rich_text|richtext }} +{% endfor %} +
{% trans 'Direitos Autorais' %}
+{% for c in copyright %} + {{ c.rich_text|richtext }} +{% endfor %} +
{% trans 'Propriedade Intelectual' %}
+ +
{% trans 'Política de Ética e Más condutas' %}
+

+ {% trans 'Política de retratação:' %} +

+{% for p in policies %} + {{ p.rich_text|richtext }} +{% endfor %} +
{% trans 'Política sobre Conflito de Interesses' %}
+ +
{% trans 'Questões de gênero' %}
+ +
{% trans 'Licença' %}
+ +
{{ journal.journal_use_license.license_type }} + +
{% trans 'Cobrança de taxas' %}
+ + diff --git a/journalpage/templates/journalpage/includes/about/sidebar_menu.html b/journalpage/templates/journalpage/includes/about/sidebar_menu.html new file mode 100644 index 000000000..2090a4bb1 --- /dev/null +++ b/journalpage/templates/journalpage/includes/about/sidebar_menu.html @@ -0,0 +1,126 @@ +{% load i18n %} +{% load wagtailroutablepage_tags %} + + + diff --git a/location/management/__init__.py b/location/management/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/location/management/commands/__init__.py b/location/management/commands/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/location/management/commands/normalize_countries.py b/location/management/commands/normalize_countries.py new file mode 100644 index 000000000..08004f996 --- /dev/null +++ b/location/management/commands/normalize_countries.py @@ -0,0 +1,242 @@ +import json +import logging +import re + +import pycountry +from django.contrib.auth import get_user_model +from django.core.management.base import BaseCommand, CommandError +from django.db import transaction +from django.db.models import Count +from rapidfuzz import fuzz, process + +from location.models import Country, CountryMatched, State +from location.utils import ( + choose_canonical_country, + clean_name, + process_duplicates_countries, +) + +User = get_user_model() +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = "Normaliza dados de paises e carrega dados oficiais de paises" + + def add_arguments(self, parser) -> None: + parser.add_argument( + "--clean", + action="store_true", + help="Remove pontuação, acento, spaços extras" + ) + parser.add_argument( + "--unificate-country", + action="store_true", + help="Remove duplicidade de nomes de paises (Prioriza os registros mais completos)" + ) + parser.add_argument( + "--load-official-countries", + action="store_true", + help="Carrega nomes de países e atribuem eles como verificados." + ) + parser.add_argument( + "--load-official-states", + action="store_true", + help="Carrega nomes de países e atribuem eles como verificados." 
+ ) + parser.add_argument( + "--fuzzy-match-countries", + type=int, + help="Faz fuzzy matching entre países CLEANED e official" + ) + parser.add_argument( + "--reprocess", + action="store_true", + help="Reprocessa países já processados" + ) + def handle(self, *args, **options): + if not any(options.values()): + raise CommandError( + "Informe ao menos uma ação: " + "--clean, --unificate-country ou --load-official-countries" + ) + + if options['clean']: + self.stdout.write("Limpando nomes de países...") + self.clean_name_countries() + if options['unificate_country']: + self.stdout.write("Unificando países...") + self.unificate_countries() + if options['load_official_countries']: + self.stdout.write("Carregando países verificados...") + self.load_official_countries() + if options['load_official_states']: + self.stdout.write("Carregando Estados verificados...") + self.load_official_states() + if options['load_official_states']: + self.stdout.write("Carregando Cidades verificados...") + self.load_official_cities() + if options['fuzzy_match_countries']: + fuzzy_params = options["fuzzy_match_countries"] + reprocess = options["reprocess"] + self.stdout.write(f"Realizando matched dos paises que não sao verificados com verificados...threshold: {fuzzy_params}") + self.auto_create_fuzzy_matches(threshold=fuzzy_params, reprocess=reprocess) + + + def clean_name_countries(self): + countries = Country.objects.filter(name__isnull=False) + + for country in countries: + name_country = country.name + if clean_name(name_country) == country.name: + continue + country.name = clean_name(name_country) + country.status = "CLEANED" + country.save() + + def unificate_countries(self): + duplicate_names = ( + Country.objects.filter(status="CLEANED") + .values("name") + .annotate(count=Count('id')) + .filter(count__gt=1) + ) + total_merged = 0 + total_deleted = 0 + + for item in duplicate_names: + name = item['name'] + try: + with transaction.atomic(): + countries_with_same_name = Country.objects.filter(name=name, status="CLEANED").order_by('created', 'id') + if countries_with_same_name.count() <= 1: + continue + # Escolher o país canonical + # Prioridade: 1) com acronym e acron3, 2) com acronym, 3) mais antigo + canonical_country = choose_canonical_country(countries_with_same_name) + duplicates = countries_with_same_name.exclude(id=canonical_country.id) + logging.info(f"Duplicate IDs: {duplicates.values('names', 'id')}") + locations_moved = process_duplicates_countries(duplicates=duplicates, canonical_country=canonical_country, total_deleted=total_deleted) + canonical_country.save() + logging.info(f"'{name}': {duplicates.count()} duplicatas removidas, {locations_moved} locations atualizados") + total_merged += 1 + except Exception as e: + logging.error(f"Error ao processar {name}: {e}") + continue + + def load_official_countries(self): + countries = pycountry.countries + + for py_country in countries: + name = py_country.name + acron2 = py_country.alpha_2 + acron3 = py_country.alpha_3 + try: + country = Country.objects.get(name__iexact=name, acronym=acron2) + country.status = "OFFICIAL" + country.save() + except Country.DoesNotExist: + Country.objects.create( + name=name, + acronym=acron2, + acron3=acron3, + status="OFFICIAL" + ) + + def auto_create_fuzzy_matches(self, threshold, reprocess=None): + matches = self.fuzzy_match_countries(threshold=threshold, reprocess=reprocess) + created_count = 0 + high_confidence_count = 0 + + for match_data in matches: + unmatched = match_data['unmatched'] + official = 
match_data['official'] + score = match_data['score'] + # confidence = match_data['confidence'] + if score >= threshold: + country_match, created = CountryMatched.objects.get_or_create( + official=official, + ) + unmatched.status = "MATCHED" + unmatched.save() + country_match.matched.add(unmatched) + country_match.score = threshold + country_match.save() + + created_count += 1 + + logging.info(f"Total matches: {created_count}") + logging.info(f"Auto-applied: {high_confidence_count}") + + return matches + + def apply_fuzzy_matched_countries(self, name=None): + if name: + countries_official = Country.objects.filter(name=name, status="official") + else: + countries_official = Country.objects.filter(status="official") + + for country in countries_official: + logging.info(f"Apply fuzzy matched for {country.name}") + country_matched = CountryMatched.objects.get(official=country) + locations_count = country_matched.apply_to_locations() + matched = country_matched.matched.all() + matched.update(status="PROCESSED") + logging.info(f"Total processed matches: {locations_count} {matched.values_list('name', flat=True)}") + + def unset_matched_countries(self, name=None): + if name: + countries_official = Country.objects.filter(name=name, status="official") + + for country in countries_official: + country_matched = CountryMatched.objects.get(official=country) + unset_countries = country_matched.unset_matched_countries() + logging.info(f"unset matched countries {country}: {unset_countries}") + + + def fuzzy_match_countries(self, threshold=85, reprocess=None): + """ + Faz fuzzy matching entre países CLEANED e official + Args: + threshold: Score mínimo para considerar um match (0-100) + reprocess: Se True, incluir países com status "MATCHED" para reprocessamento, + senão considerar apenas "CLEANED" + + Returns: + list: Lista de matches encontrados. 
+ """ + official_countries = Country.objects.filter(status="OFFICIAL") + if reprocess: + CountryMatched.objects.all().delete() + status = ["MATCHED", "CLEANED"] + else: + status = ["CLEANED"] + unmatched_countries = Country.objects.filter(status__in=status) + + matches_found = [] + official_names = {c.name: c for c in official_countries} + for unmatched in unmatched_countries: + result = process.extractOne( + unmatched.name, + official_names.keys(), + scorer=fuzz.WRatio, + score_cutoff=threshold, + ) + if result: + matched_name, score, _ = result + official = official_names[matched_name] + + matches_found.append({ + 'unmatched': unmatched, + 'official': official, + 'score': score, + 'confidence': score / 100.0 + } + ) + #TODO + #REJECTED + logging.info( + f"Match: {unmatched.name} -> {official.name}" + f"(score: {score})" + ) + return matches_found \ No newline at end of file diff --git a/location/management/commands/normalize_states.py b/location/management/commands/normalize_states.py new file mode 100644 index 000000000..ac438133a --- /dev/null +++ b/location/management/commands/normalize_states.py @@ -0,0 +1,322 @@ +import json +import logging +import re + +import pycountry +from django.contrib.auth import get_user_model +from django.core.management.base import BaseCommand, CommandError +from django.db import IntegrityError, transaction +from django.db.models import Count +from rapidfuzz import fuzz, process + +from location.models import Country, State, StateMatched +from location.utils import choose_canonical_state, clean_name, process_duplicates_states, clean_acronym + +User = get_user_model() +logger = logging.getLogger(__name__) + + +class Command(BaseCommand): + help = "Normaliza dados de estados e carrega dados oficiais de estados" + + def add_arguments(self, parser) -> None: + parser.add_argument( + "--clean", + action="store_true", + help="Remove pontuação, acento, espaços extras dos estados" + ) + parser.add_argument( + "--unificate-states", + action="store_true", + help="Remove duplicidade de nomes de estados" + ) + parser.add_argument( + "--load-official-states", + action="store_true", + help="Carrega nomes de estados oficiais do pycountry" + ) + parser.add_argument( + "--fuzzy-match-states", + type=int, + help="Faz fuzzy matching entre estados CLEANED e OFFICIAL" + ) + parser.add_argument( + "--apply-matches", + action="store_true", + help="Aplica os matches aos locations" + ) + parser.add_argument( + "--reprocess", + action="store_true", + help="Reprocessa estados já processados" + ) + + def handle(self, *args, **options): + if not any(options.values()): + raise CommandError( + "Informe ao menos uma ação: " + "--clean, --unificate-states, --load-official-states, " + "--fuzzy-match-states, ou --apply-matches" + ) + + if options['clean']: + self.stdout.write("Limpando nomes de estados...") + self.clean_name_states() + + if options['unificate_states']: + self.stdout.write("Unificando estados...") + self.unificate_states() + + if options['load_official_states']: + self.stdout.write("Carregando estados verificados...") + self.load_official_states() + + if options['fuzzy_match_states']: + fuzzy_params = options["fuzzy_match_states"] + reprocess = options["reprocess"] + self.stdout.write(f"Realizando matched dos estados...threshold: {fuzzy_params}") + self.auto_create_fuzzy_matches_states(threshold=fuzzy_params, reprocess=reprocess) + + if options['apply_matches']: + self.stdout.write("Aplicando matches aos locations...") + self.apply_fuzzy_matched_states() + + def 
clean_name_states(self): + """Limpa nomes de estados (remove HTML, pontuação, normaliza espaços)""" + states = State.objects.filter(name__isnull=False) + count = 0 + deleted = 0 + + for state in states: + name_state = state.name + acronym_state = state.acronym + cleaned_name = clean_name(name_state) + cleaned_acronym = clean_acronym(acronym_state) + if cleaned_name == name_state and cleaned_acronym == acronym_state: + continue + + try: + with transaction.atomic(): + state.name = cleaned_name + state.acronym = cleaned_acronym + state.status = "CLEANED" + state.save() + logging.info(f"Nome de estado limpado {name_state} -> {state.name}") + count += 1 + except IntegrityError: + # Estado duplicado já existe com esse nome limpo + logging.info(f"Estado duplicado após limpeza: {name_state} -> {cleaned_name}, deletando...") + try: + state.delete() + deleted += 1 + except Exception as e: + logging.error(f"Erro ao deletar estado {state.id}: {e}") + + self.stdout.write(self.style.SUCCESS(f"✓ {count} estados limpos, {deleted} duplicados removidos")) + + self.stdout.write(self.style.SUCCESS(f"✓ {count} estados limpos")) + + def unificate_states(self): + """Unifica estados duplicados mantendo o mais completo""" + duplicate_names = ( + State.objects.filter(status="CLEANED") + .values("name") + .annotate(count=Count('id')) + .filter(count__gt=1) + ) + logging.info(f"Quantidade de estados duplicados: {duplicate_names.count()} Estados: {duplicate_names}") + total_merged = 0 + total_deleted = 0 + for item in duplicate_names: + name = item['name'] + try: + with transaction.atomic(): + states_with_same_name = State.objects.filter( + name=name, + status="CLEANED" + ).order_by('created', 'id') + + if states_with_same_name.count() <= 1: + continue + + canonical_state = choose_canonical_state(states_with_same_name) + + duplicates = states_with_same_name.exclude(id=canonical_state.id) + + logging.info(f"Duplicate IDs: {duplicates.values_list('name', 'id')}") + locations_moved = process_duplicates_states( + duplicates=duplicates, + canonical_state=canonical_state, + total_deleted=total_deleted + ) + canonical_state.save() + + logging.info( + f"'{name} ({canonical_state.acronym if canonical_state else None})': {duplicates.count()} duplicatas removidas, " + f"{locations_moved} locations atualizados" + ) + total_merged += 1 + except Exception as e: + logging.error(f"Erro ao processar {name} ({canonical_state.acronym if canonical_state else None}): {e}") + continue + + self.stdout.write(self.style.SUCCESS( + f"✓ {total_merged} grupos de estados unificados, {total_deleted} deletados" + )) + + def get_country_subdivision(self, country_code): + """Busca subdivisões (estados) de um país no pycountry""" + subdivisions = [] + + try: + for subdivision in pycountry.subdivisions.get(country_code=country_code): + subdivisions.append({ + 'code': subdivision.code, + 'name': subdivision.name, + 'type': subdivision.type, + 'country_code': subdivision.country_code, + }) + except KeyError: + subdivisions.append({ + 'country_code': country_code + }) + return subdivisions + + def load_official_states(self): + """Carrega estados oficiais do pycountry para países OFFICIAL""" + for country_official in Country.objects.filter(status="OFFICIAL"): + try: + subdivisions = self.get_country_subdivision(country_code=country_official.acronym) + logging.info(f"Carregando estados para {country_official}") + for sub in subdivisions: + if 'code' not in sub: + continue + # Extrair a sigla. 
Ex: PT-CE -> CE + acronym = sub['code'].split('-')[-1] + name = sub['name'] + state, created = State.objects.get_or_create( + name=name, + acronym=acronym, + defaults={'status': "OFFICIAL"}, + ) + if not created and state.status != "OFFICIAL": + state.status = "OFFICIAL" + state.save(update_fields=["status"]) + except Exception as e: + logging.error(e) + logging.error(f"Estado do País {country_official} não criado.") + continue + + def fuzzy_match_states(self, threshold=85, reprocess=None): + """Faz fuzzy matching entre estados CLEANED e OFFICIAL + + Args: + threshold: Score mínimo para considerar um match (0-100) + reprocess: Se True, reprocessa estados com status MATCHED + + Returns: + list: Lista de matches encontrados + """ + official_states = State.objects.filter(status="OFFICIAL") + + if reprocess: + StateMatched.objects.all().delete() + status = ["MATCHED", "CLEANED"] + else: + status = ["CLEANED"] + + unmatched_states = State.objects.filter(status__in=status) + matches_found = [] + + # Criar dict de estados oficiais por (name, acronym) + official_dict = { + f"{s.name}|{s.acronym}": s + for s in official_states + if s.name and s.acronym + } + + for unmatched in unmatched_states: + if not unmatched.name: + continue + + search_key = f"{unmatched.name}|{unmatched.acronym or ''}" + + result = process.extractOne( + search_key, + official_dict.keys(), + scorer=fuzz.WRatio, + score_cutoff=threshold, + ) + + if result: + matched_key, score, _ = result + official = official_dict[matched_key] + + matches_found.append({ + 'unmatched': unmatched, + 'official': official, + 'score': score, + 'confidence': score / 100.0 + }) + + logging.info( + f"Match: {unmatched.name} ({unmatched.acronym}) -> " + f"{official.name} ({official.acronym}) (score: {score})" + ) + + return matches_found + + def auto_create_fuzzy_matches_states(self, threshold, reprocess=None): + """Cria automaticamente matches entre estados não oficiais e oficiais""" + matches = self.fuzzy_match_states(threshold=threshold, reprocess=reprocess) + created_count = 0 + + for match_data in matches: + unmatched = match_data['unmatched'] + official = match_data['official'] + score = match_data['score'] + + if score >= threshold: + state_match, created = StateMatched.objects.get_or_create( + official=official, + ) + unmatched.status = "MATCHED" + unmatched.save() + state_match.matched.add(unmatched) + state_match.score = score + state_match.save() + + created_count += 1 + + logging.info(f"Total state matches: {created_count}") + self.stdout.write(self.style.SUCCESS(f"✓ {created_count} matches criados")) + + return matches + + def apply_fuzzy_matched_states(self, name=None): + """Aplica os matches, atualizando locations para usar estados oficiais""" + if name: + states_official = State.objects.filter(name=name, status="OFFICIAL") + else: + states_official = State.objects.filter(status="OFFICIAL") + + total_locations = 0 + for state in states_official: + try: + state_matched = StateMatched.objects.get(official=state) + locations_count = state_matched.apply_to_locations() + matched = state_matched.matched.all() + matched.update(status="PROCESSED") + total_locations += locations_count + + logging.info( + f"{state.name}: {locations_count} locations, " + f"matched: {list(matched.values_list('name', flat=True))}" + ) + except StateMatched.DoesNotExist: + continue + + self.stdout.write(self.style.SUCCESS( + f"✓ {total_locations} locations atualizados" + )) + diff --git a/location/migrations/0004_alter_city_unique_together_city_status_and_more.py 
b/location/migrations/0004_alter_city_unique_together_city_status_and_more.py new file mode 100644 index 000000000..25bcab701 --- /dev/null +++ b/location/migrations/0004_alter_city_unique_together_city_status_and_more.py @@ -0,0 +1,261 @@ +# Generated by Django 5.2.7 on 2026-01-08 21:11 + +import django.db.models.deletion +from django.conf import settings +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("location", "0003_alter_city_options_alter_country_options_and_more"), + migrations.swappable_dependency(settings.AUTH_USER_MODEL), + ] + + operations = [ + migrations.AlterUniqueTogether( + name="city", + unique_together=set(), + ), + migrations.AddField( + model_name="city", + name="status", + field=models.CharField( + blank=True, + choices=[ + ("RAW", "RAW"), + ("CLEANED", "CLEANED"), + ("MATCHED", "MATCHED"), + ("PROCESSED", "PROCESSED"), + ("OFFICIAL", "OFFICIAL"), + ("REJECTED", "REJECTED"), + ], + default="RAW", + max_length=9, + ), + ), + migrations.AddField( + model_name="country", + name="status", + field=models.CharField( + blank=True, + choices=[ + ("RAW", "RAW"), + ("CLEANED", "CLEANED"), + ("MATCHED", "MATCHED"), + ("PROCESSED", "PROCESSED"), + ("OFFICIAL", "OFFICIAL"), + ("REJECTED", "REJECTED"), + ], + default="RAW", + max_length=9, + ), + ), + migrations.AddField( + model_name="location", + name="status", + field=models.CharField( + blank=True, + choices=[ + ("RAW", "RAW"), + ("CLEANED", "CLEANED"), + ("MATCHED", "MATCHED"), + ("PROCESSED", "PROCESSED"), + ("OFFICIAL", "OFFICIAL"), + ("REJECTED", "REJECTED"), + ], + default="RAW", + max_length=9, + ), + ), + migrations.AddField( + model_name="state", + name="status", + field=models.CharField( + blank=True, + choices=[ + ("RAW", "RAW"), + ("CLEANED", "CLEANED"), + ("MATCHED", "MATCHED"), + ("PROCESSED", "PROCESSED"), + ("OFFICIAL", "OFFICIAL"), + ("REJECTED", "REJECTED"), + ], + default="RAW", + max_length=9, + ), + ), + migrations.AlterField( + model_name="state", + name="acronym", + field=models.CharField( + blank=True, max_length=3, null=True, verbose_name="State Acronym" + ), + ), + migrations.AlterUniqueTogether( + name="city", + unique_together={("name", "status")}, + ), + migrations.CreateModel( + name="CountryMatched", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "created", + models.DateTimeField( + auto_now_add=True, verbose_name="Creation date" + ), + ), + ( + "updated", + models.DateTimeField( + auto_now=True, verbose_name="Last update date" + ), + ), + ( + "score", + models.FloatField( + default=1.0, help_text="Confiança do match (0.0 a 1.0)" + ), + ), + ( + "creator", + models.ForeignKey( + editable=False, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="%(class)s_creator", + to=settings.AUTH_USER_MODEL, + verbose_name="Creator", + ), + ), + ( + "matched", + models.ManyToManyField( + blank=True, + help_text="Variações/duplicatas que correspondem a este país oficial", + limit_choices_to={"status__in": ["RAW", "CLEANED"]}, + related_name="official_match", + to="location.country", + verbose_name="Matched Countries", + ), + ), + ( + "official", + models.OneToOneField( + help_text="País oficial verificado (do pycountry)", + limit_choices_to={"status": "VERIFIED"}, + on_delete=django.db.models.deletion.CASCADE, + related_name="matched_countries", + to="location.country", + verbose_name="Official Country", + ), + ), + ( + 
"updated_by", + models.ForeignKey( + blank=True, + editable=False, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="%(class)s_last_mod_user", + to=settings.AUTH_USER_MODEL, + verbose_name="Updater", + ), + ), + ], + options={ + "verbose_name": "Country Match", + "verbose_name_plural": "Country Matches", + }, + ), + migrations.CreateModel( + name="StateMatched", + fields=[ + ( + "id", + models.BigAutoField( + auto_created=True, + primary_key=True, + serialize=False, + verbose_name="ID", + ), + ), + ( + "created", + models.DateTimeField( + auto_now_add=True, verbose_name="Creation date" + ), + ), + ( + "updated", + models.DateTimeField( + auto_now=True, verbose_name="Last update date" + ), + ), + ( + "score", + models.FloatField( + default=1.0, help_text="Confiança do match (0.0 a 1.0)" + ), + ), + ( + "creator", + models.ForeignKey( + editable=False, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="%(class)s_creator", + to=settings.AUTH_USER_MODEL, + verbose_name="Creator", + ), + ), + ( + "matched", + models.ManyToManyField( + blank=True, + help_text="Variações/duplicatas que correspondem a este estado oficial", + limit_choices_to={"status__in": ["RAW", "CLEANED"]}, + related_name="official_match_state", + to="location.state", + verbose_name="Matched States", + ), + ), + ( + "official", + models.OneToOneField( + help_text="Estado oficial verificado", + limit_choices_to={"status": "OFFICIAL"}, + on_delete=django.db.models.deletion.CASCADE, + related_name="matched_states", + to="location.state", + verbose_name="Official State", + ), + ), + ( + "updated_by", + models.ForeignKey( + blank=True, + editable=False, + null=True, + on_delete=django.db.models.deletion.SET_NULL, + related_name="%(class)s_last_mod_user", + to=settings.AUTH_USER_MODEL, + verbose_name="Updater", + ), + ), + ], + options={ + "verbose_name": "State Match", + "verbose_name_plural": "State Matches", + }, + ), + ] diff --git a/location/models.py b/location/models.py index 015e84b2f..fe298db5d 100755 --- a/location/models.py +++ b/location/models.py @@ -1,21 +1,33 @@ import csv import logging import os +import re -from django.db import models, IntegrityError +from django.db import IntegrityError, models from django.db.models import Q from django.utils.translation import gettext_lazy as _ from modelcluster.fields import ParentalKey from modelcluster.models import ClusterableModel -from wagtail.admin.panels import FieldPanel, InlinePanel, ObjectList, TabbedInterface -from wagtail.fields import RichTextField +from wagtail.admin.panels import FieldPanel, InlinePanel from wagtail.models import Orderable from wagtailautocomplete.edit_handlers import AutocompletePanel from core.forms import CoreAdminModelForm from core.models import CommonControlField, Language, TextWithLang -from core.utils.standardizer import standardize_name, standardize_code_and_name, remove_extra_spaces - +from core.utils.standardizer import ( + remove_extra_spaces, + standardize_code_and_name, + standardize_name, +) + +STATUS = [ + ("RAW", _("RAW")), + ("CLEANED", _("CLEANED")), + ("MATCHED", _("MATCHED")), # Foi correspondido com um pais oficiaul + ("PROCESSED", _("PROCESSED")), # Foi Sustituido por um país official em Location + ("OFFICIAL", _("OFFICIAL")), + ("REJECTED", _("REJECTED")), +] class City(CommonControlField): """ @@ -26,6 +38,7 @@ class City(CommonControlField): """ name = models.TextField(_("Name of the city"), unique=True) + status = models.CharField(max_length=9, choices=STATUS, 
default="RAW", blank=True) base_form_class = CoreAdminModelForm panels = [FieldPanel("name")] @@ -40,7 +53,7 @@ class Meta: indexes = [ models.Index(fields=["name"]), ] - unique_together = [("name",)] + unique_together = [("name", "status")] def __unicode__(self): return self.name @@ -50,13 +63,33 @@ def __str__(self): @classmethod def load(cls, user, file_path=None): + import csv + file_path = file_path or "./location/fixtures/cities.csv" - with open(file_path, "r") as fp: - for name in fp.readlines(): - try: - cls.get_or_create(name=name, user=user) - except Exception as e: - logging.exception(e) + try: + with open(file_path, newline="", encoding="utf-8") as fp: + reader = csv.reader(fp) + for row in reader: + if not row or not row[0].strip(): + continue + name = remove_extra_spaces(row[0]) + if not name: + continue + try: + obj, created = cls.objects.get_or_create(name=name) + updated = False + if obj.status != "OFFICIAL": + obj.status = "OFFICIAL" + updated = True + if user: + obj.creator = user + updated = True + if updated: + obj.save() + except Exception as e: + logging.exception(f"Failed to process city '{name}': {e}") + except Exception as e: + logging.exception(f"Could not open file {file_path}: {e}") @classmethod def get_or_create(cls, user=None, name=None): @@ -113,7 +146,8 @@ class State(CommonControlField): """ name = models.TextField(_("State name"), null=True, blank=True) - acronym = models.CharField(_("State Acronym"), max_length=2, null=True, blank=True) + acronym = models.CharField(_("State Acronym"), max_length=3, null=True, blank=True) + status = models.CharField(max_length=9, choices=STATUS, default="RAW", blank=True) base_form_class = CoreAdminModelForm panels = [FieldPanel("name"), FieldPanel("acronym")] @@ -121,11 +155,11 @@ class State(CommonControlField): @staticmethod def autocomplete_custom_queryset_filter(search_term): return State.objects.filter( - Q(name__icontains=search_term) | Q(acronym__icontains=search_term) + Q(name__icontains=search_term) | Q(acronym__icontains=search_term), status="OFFICIAL" ) def autocomplete_label(self): - return f"{self.acronym or self.name}" + return str(self) class Meta: verbose_name = _("State") @@ -145,10 +179,10 @@ class Meta: ] def __unicode__(self): - return f"{self.acronym or self.name}" + return f"{self.name} ({self.acronym})" def __str__(self): - return f"{self.acronym or self.name}" + return f"{self.name} ({self.acronym})" @classmethod def load(cls, user, file_path=None): @@ -341,6 +375,7 @@ class Country(CommonControlField, ClusterableModel): acron3 = models.CharField( _("Country Acronym (3 char)"), blank=True, null=True, max_length=3 ) + status = models.CharField(max_length=9, choices=STATUS, default="RAW", blank=True) base_form_class = CoreAdminModelForm panels = [ @@ -355,7 +390,7 @@ def autocomplete_custom_queryset_filter(search_term): return Country.objects.filter( Q(name__icontains=search_term) | Q(acronym__icontains=search_term) - | Q(acron3__icontains=search_term) + | Q(acron3__icontains=search_term), status="OFFICIAL" ) def autocomplete_label(self): @@ -378,10 +413,10 @@ class Meta: ] def __unicode__(self): - return self.name or self.acronym + return f"{self.name or self.acronym}" def __str__(self): - return self.name or self.acronym + return f"{self.name or self.acronym}" @classmethod def load(cls, user, file_path=None): @@ -506,6 +541,7 @@ class Location(CommonControlField): null=True, blank=True, ) + status = models.CharField(max_length=9, choices=STATUS, default="RAW", blank=True) base_form_class = 
CoreAdminModelForm @@ -518,11 +554,40 @@ class Location(CommonControlField): # autocomplete_search_field = "country__name" @staticmethod def autocomplete_custom_queryset_filter(search_term): - return Location.objects.filter( - Q(city__name__icontains=search_term) - | Q(state__name__icontains=search_term) - | Q(country__name__icontains=search_term) - ).prefetch_related("city", "state", "country") + """ + Permite pesquisar por termos livres ou por filtros específicos: + - country:Nome do país + - state:Nome do estado + - city:Nome da cidade + Exemplo: country:Brasil state:São Paulo + """ + # Expressão regular para capturar argumentos específicos no formato key:valor (sem aspas) + pattern = r'\b(?P<key>country|state|city):(?P<value>[^ ]+)' + + filters = {} + free_terms = search_term + for match in re.finditer(pattern, search_term): + key = match.group("key") + value = match.group("value") + filters[key] = value.strip() + free_terms = free_terms.replace(match.group(0), "") + + free_terms = free_terms.strip() + query = Q() + if filters.get("country"): + query &= Q(country__name__icontains=filters["country"]) + if filters.get("state"): + query &= Q(state__name__icontains=filters["state"], state__status="OFFICIAL") + if filters.get("city"): + query &= Q(city__name__icontains=filters["city"]) + if free_terms: + term = free_terms + query &= ( + Q(city__name__icontains=term) + | Q(state__name__icontains=term, country__status="OFFICIAL") + | Q(country__name__icontains=term, state__status="OFFICIAL") + ) + return Location.objects.filter(query).prefetch_related("city", "state", "country") def autocomplete_label(self): return str(self) @@ -679,3 +744,138 @@ def filename(self): return os.path.basename(self.attachment.name) panels = [FieldPanel("attachment")] + + +class CountryMatched(CommonControlField): + official = models.OneToOneField( + Country, + on_delete=models.CASCADE, + related_name='matched_countries', + limit_choices_to={'status': 'VERIFIED'}, + verbose_name=_("Official Country"), + help_text=_("País oficial verificado (do pycountry)") + ) + + matched = models.ManyToManyField( + Country, + related_name='official_match', + limit_choices_to={'status__in': ['RAW', 'CLEANED']}, + verbose_name=_("Matched Countries"), + help_text=_("Variações/duplicatas que correspondem a este país oficial"), + blank=True + ) + + score = models.FloatField( + default=1.0, + help_text=_("Confiança do match (0.0 a 1.0)"), + ) + + def matched_list(self): + # Retorna a lista de países correspondentes + matched_countries = self.matched.all() + if matched_countries: + return ", ".join([c.name for c in matched_countries]) + return "-" + + matched_list.short_description = "Matched Countries" + + panels = [ + AutocompletePanel("official", read_only=True), + AutocompletePanel("matched"), + FieldPanel("score") + ] + + class Meta: + verbose_name = _("Country Match") + verbose_name_plural = _("Country Matches") + + def __str__(self): + matched_count = self.matched.count() + return f"{self.official.name} ({matched_count} matches)" + + def apply_to_locations(self): + """ + Atualiza todos os Locations que usam países matched para usar o oficial. 
+ + Returns: + int: Número de locations atualizados + """ + matched_countries = self.matched.filter(status="MATCHED") + locations = Location.objects.filter(country__in=matched_countries) + + count = locations.update(country=self.official) + + return count + + def unset_matched_countries(self): + unset_countries = self.matched.filter(status="PROCESSED") + self.matched.remove(*unset_countries) + return list(unset_countries.values_list("id", flat=True)) + + +class StateMatched(CommonControlField): + official = models.OneToOneField( + State, + on_delete=models.CASCADE, + related_name='matched_states', + limit_choices_to={'status': 'OFFICIAL'}, + verbose_name=_("Official State"), + help_text=_("Estado oficial verificado") + ) + + matched = models.ManyToManyField( + State, + related_name='official_match_state', + limit_choices_to={'status__in': ['RAW', 'CLEANED']}, + verbose_name=_("Matched States"), + help_text=_("Variações/duplicatas que correspondem a este estado oficial"), + blank=True + ) + + score = models.FloatField( + default=1.0, + help_text=_("Confiança do match (0.0 a 1.0)"), + ) + + def matched_list(self): + """Retorna a lista de estados correspondentes""" + matched_states = self.matched.all() + if matched_states: + return ", ".join([s.name for s in matched_states]) + return "-" + + matched_list.short_description = "Matched States" + + panels = [ + AutocompletePanel("official", read_only=True), + AutocompletePanel("matched"), + FieldPanel("score") + ] + + class Meta: + verbose_name = _("State Match") + verbose_name_plural = _("State Matches") + + def __str__(self): + matched_count = self.matched.count() + return f"{self.official.name} ({matched_count} matches)" + + def apply_to_locations(self): + """ + Atualiza todos os Locations que usam estados matched para usar o oficial. + + Returns: + int: Número de locations atualizados + """ + matched_states = self.matched.filter(status="MATCHED") + locations = Location.objects.filter(state__in=matched_states) + + count = locations.update(state=self.official) + + return count + + def unset_matched_states(self): + """Remove states já processados da lista de matched""" + unset_states = self.matched.filter(status="PROCESSED") + self.matched.remove(*unset_states) + return list(unset_states.values_list("id", flat=True)) \ No newline at end of file diff --git a/location/tests/tests_normalize_countries.py b/location/tests/tests_normalize_countries.py new file mode 100644 index 000000000..2f575f5b1 --- /dev/null +++ b/location/tests/tests_normalize_countries.py @@ -0,0 +1,412 @@ +import logging +from unittest.mock import patch + +import pycountry +from django.contrib.auth import get_user_model +from django.test import TestCase + +from location import models +from location.management.commands import normalize_countries + +User = get_user_model() +logger = logging.getLogger(__name__) + +class NormalizeLocationsTest(TestCase): + """ + Testa a normalização e unificação de países duplicados. + + Simula o cenário real onde existem múltiplas variações do nome de um país + (com caracteres especiais, espaços, etc.) que devem ser normalizadas e + consolidadas em um único registro. 
+ """ + + def setUp(self) -> None: + """Configura o ambiente de teste com países duplicados e locations""" + self.name = 'Brasil' + self.user, _ = User.objects.get_or_create(username="test_user") + + # Criar países duplicados com variações de "Brasil" + # Simulando dados reais que podem vir de diferentes fontes + self.country1 = models.Country.objects.create( + name="Brasile", # Erro de digitação + creator=self.user + ) + + self.country2 = models.Country.objects.create( + name="Brasil", + acronym="IO", + creator=self.user + ) + + self.country3 = models.Country.objects.create( + name="- BRASIL", # Com prefixo e maiúsculas + creator=self.user + ) + + self.country4 = models.Country.objects.create( + name="Brasil", + acronym="BV", + creator=self.user + ) + + self.country5 = models.Country.objects.create( + name=", Brasil", # Com vírgula no início + creator=self.user + ) + + self.country6 = models.Country.objects.create( + name="Brasill", # Erro de digitação (duplo 'l') + creator=self.user + ) + + self.country7 = models.Country.objects.create( + name="Brasil.", # Com ponto final + creator=self.user + ) + + self.country8 = models.Country.objects.create( + name="Brasil", + acronym="BM", + creator=self.user + ) + + self.country9 = models.Country.objects.create( + name="- Brasil", # Com prefixo + creator=self.user + ) + + self.country10 = models.Country.objects.create( + name="Brasil", + acronym="BT", + acron3="BTN", + creator=self.user + ) + + self.country11 = models.Country.objects.create( + name="Brasil", + acronym="AF", + acron3="AFG", + creator=self.user + ) + + # Criar locations associados a diferentes países duplicados + self.location1 = models.Location.objects.create( + country=self.country1, # Brasile + creator=self.user, + ) + self.location2 = models.Location.objects.create( + country=self.country2, # Brasil (IO) + creator=self.user + ) + self.location3 = models.Location.objects.create( + country=self.country3, # - BRASIL + creator=self.user + ) + self.location4 = models.Location.objects.create( + country=self.country9, # - Brasil + creator=self.user + ) + + # Armazenar IDs originais para verificação posterior + self.original_country_ids = [ + self.country1.id, self.country2.id, self.country3.id, + self.country4.id, self.country5.id, self.country6.id, + self.country7.id, self.country8.id, self.country9.id, + self.country10.id, self.country11.id + ] + + def test_clean_country_name(self): + """Testa a normalização de nomes com diferentes variações""" + test_cases = [ + ("- BRASIL", "Brasil"), + ("- Brasil", "Brasil"), + ("Brasil.", "Brasil"), + (" BRASIL", "Brasil"), + (" BRASIL ", "Brasil"), + (", Brasil", "Brasil"), + ("BRASIL!!!", "Brasil"), + (" Brasil ", "Brasil"), + ] + + for input_name, expected_output in test_cases: + with self.subTest(input=input_name): + self.assertEqual(normalize_countries.clean_country_name(input_name), expected_output) + + def test_clean_model_country_name(self): + """Testa a normalização de todos os países no banco""" + # Verificar estado inicial + self.assertEqual(self.country1.name, "Brasile") + self.assertEqual(self.country3.name, "- BRASIL") + self.assertEqual(self.country7.name, "Brasil.") + + # Normalizar + normalize_countries.Command().clean_name_countries() + + # Recarregar e verificar + self.country1.refresh_from_db() + self.country3.refresh_from_db() + self.country7.refresh_from_db() + + self.assertEqual(self.country1.name, "Brasile") + self.assertEqual(self.country3.name, "Brasil") + self.assertEqual(self.country7.name, "Brasil") + + # Verificar que 
o status foi atualizado + self.assertEqual(self.country1.status, "CLEANED") + self.assertEqual(self.country3.status, "CLEANED") + + def test_unificate_country_full_workflow(self): + """Testa o fluxo completo de normalização e unificação""" + # 1. Estado inicial: múltiplos países com nomes diferentes + initial_count = models.Country.objects.count() + self.assertEqual(initial_count, 11) + + # 2. Normalizar nomes + normalize_countries.Command().clean_name_countries() + + # Verificar que todos foram normalizados para "Brasil" + brasil_count = models.Country.objects.filter(name="Brasil").count() + self.assertEqual(brasil_count, 9) + + # Mas ainda são registros separados + self.assertEqual(models.Country.objects.count(), 11) + + # 3. Unificar países duplicados + normalize_countries.Command().unificate_countries() + + # 4. Verificações após unificação + # Deve existir apenas 1 país "Brasil" + final_count = models.Country.objects.filter(name__exact="Brasil").count() + self.assertEqual(final_count, 1) + + # Total de países deve ser 3 + # ['Brasile', 'Brasil', 'Brasill'] + self.assertEqual(models.Country.objects.count(), 3) + + # 5. Verificar que todos os locations apontam para o mesmo país + self.location1.refresh_from_db() + self.location2.refresh_from_db() + self.location3.refresh_from_db() + self.location4.refresh_from_db() + + canonical_country = models.Country.objects.get(name="Brasil") + + # Todos devem apontar para o mesmo país + self.assertEqual(self.location2.country, canonical_country) + self.assertEqual(self.location3.country, canonical_country) + self.assertEqual(self.location4.country, canonical_country) + + # Verificar por ID também + self.assertEqual(self.location2.country.id, canonical_country.id) + self.assertEqual(self.location3.country.id, canonical_country.id) + self.assertEqual(self.location4.country.id, canonical_country.id) + + # O país canonical deve ter todos os 3 locations + self.assertEqual(canonical_country.location_set.count(), 3) + + def test_locations_point_to_same_country_after_unification(self): + """Testa especificamente que location2 e location3 apontam para o mesmo país""" + # Normalizar e unificar + normalize_countries.Command().clean_name_countries() + normalize_countries.Command().unificate_countries() + + # Recarregar locations + self.location2.refresh_from_db() + self.location3.refresh_from_db() + self.location4.refresh_from_db() + + # Verificar que são o mesmo objeto (mesmo ID) + self.assertEqual(self.location2.country, self.location3.country) + self.assertEqual(self.location2.country.id, self.location3.country.id) + + # Verificar com location4 também + self.assertEqual(self.location2.country, self.location4.country) + self.assertEqual(self.location3.country, self.location4.country) + + # Todos devem ter o mesmo nome normalizado + self.assertEqual(self.location2.country.name, "Brasil") + self.assertEqual(self.location3.country.name, "Brasil") + self.assertEqual(self.location4.country.name, "Brasil") + + def test_no_locations_lost_during_unification(self): + """Garante que nenhum location é perdido durante a unificação""" + # Contar locations antes + locations_before = models.Location.objects.count() + + # Normalizar e unificar + normalize_countries.Command().clean_name_countries() + normalize_countries.Command().unificate_countries() + + # Contar locations depois + locations_after = models.Location.objects.count() + + # Nenhum location deve ser perdido + self.assertEqual(locations_before, locations_after) + + # Todos os locations devem ter um país associado 
+ locations_without_country = models.Location.objects.filter(country__isnull=True).count() + self.assertEqual(locations_without_country, 0) + + def test_canonical_country_preserves_acronyms(self): + """Verifica se o país canonical preserva os acrônimos""" + normalize_countries.Command().clean_name_countries() + normalize_countries.Command().unificate_countries() + + canonical = models.Country.objects.get(name="Brasil") + + # Deve ter pelo menos um acrônimo (de algum dos países originais) + # O canonical escolhido deve ser um que tinha acrônimos + self.assertTrue( + canonical.acronym is not None or canonical.acron3 is not None, + "País canonical deveria preservar acrônimos" + ) + + +class VerifiedCountriesInDatabaseTest(TestCase): + def setUp(self): + self.user, _ = User.objects.get_or_create(username="test_user") + self.country1 = models.Country.objects.create( + name="Brazil", + acronym="BR", + creator=self.user + ) + self.country2 = models.Country.objects.create( + name="Colombia", + acronym="CO", + creator=self.user + ) + self.country3 = models.Country.objects.create( + name="United States", + acronym="US", + creator=self.user + ) + + def test_verified_countries_with_pycountry(self): + normalize_countries.Command().process_verified_countries() + self.country1.refresh_from_db() + self.country2.refresh_from_db() + self.country3.refresh_from_db() + self.assertEqual(models.Country.objects.all().count(), len(pycountry.countries)) + self.assertEqual(self.country1.status, "OFFICIAL") + self.assertEqual(self.country2.status, "OFFICIAL") + self.assertEqual(self.country3.status, "OFFICIAL") + + +class ProcessMatchedCountriesTest(TestCase): + def setUp(self) -> None: + """Configura o ambiente de teste com países duplicados e locations""" + self.user, _ = User.objects.get_or_create(username="test_user") + self.country1 = models.Country.objects.create( + name="Brasile", # Erro de digitação + creator=self.user + ) + + self.country2 = models.Country.objects.create( + name="Brasil", + acronym="IO", + creator=self.user + ) + + self.country3 = models.Country.objects.create( + name="- BRASIL", # Com prefixo e maiúsculas + creator=self.user + ) + + self.country4 = models.Country.objects.create( + name="Brasil", + acronym="BV", + creator=self.user + ) + + self.country5 = models.Country.objects.create( + name=", Brasil", # Com vírgula no início + creator=self.user + ) + + self.country6 = models.Country.objects.create( + name="Brasill", # Erro de digitação (duplo 'l') + creator=self.user + ) + + self.country7 = models.Country.objects.create( + name="Brasil.", # Com ponto final + creator=self.user + ) + + self.country8 = models.Country.objects.create( + name="Brasil", + acronym="BM", + creator=self.user + ) + + self.country9 = models.Country.objects.create( + name="- Brasil", # Com prefixo + creator=self.user + ) + + self.country10 = models.Country.objects.create( + name="Brasil", + acronym="BT", + acron3="BTN", + creator=self.user + ) + + self.country11 = models.Country.objects.create( + name="Brasil", + acronym="AF", + acron3="AFG", + creator=self.user + ) + self.country11 = models.Country.objects.create( + name="teste@gmail.com", + creator=self.user + ) + self.location1 = models.Location.objects.create( + country=self.country1, # Brasile + creator=self.user, + ) + self.location2 = models.Location.objects.create( + country=self.country2, # Brasil (IO) + creator=self.user + ) + self.location3 = models.Location.objects.create( + country=self.country3, # - BRASIL + creator=self.user + ) + 
normalize_countries.Command().clean_name_countries() # primeiro limpar os nomes, remove acento, spaco, pontuacao + normalize_countries.Command().unificate_countries() # Remove duplicidade de nomes de paises. ['Brasile', 'Brasil', 'Brasill'] + normalize_countries.Command().process_verified_countries() # Carrega nomes officiais de países em ingles + + def test_matched_countries(self): + matches = normalize_countries.Command().auto_create_fuzzy_matches(threshold=70) + country_matched = models.CountryMatched.objects.all() + self.assertEqual(country_matched.count(), 1) + self.assertEqual(country_matched.first().matched.all()[0].status, "MATCHED") + self.assertEqual(country_matched.first().matched.all()[1].status, "MATCHED") + self.assertEqual(country_matched.first().matched.all()[2].status, "MATCHED") + self.assertEqual(country_matched.first().official, models.Country.objects.get(name="Brazil", acronym="BR", status="OFFICIAL")) + self.assertEqual(country_matched.first().matched.count(), 3) + self.assertEqual(set(country_matched.first().matched.values_list("name", flat=True)), set(['Brasile', 'Brasil', 'Brasill'])) + + def test_apply_fuzzy_matched_countries(self): + matches = normalize_countries.Command().auto_create_fuzzy_matches(threshold=70) + official = models.Country.objects.get(name="Brazil", status="OFFICIAL") + normalize_countries.Command().apply_fuzzy_matched_countries(name="Brazil") + self.location1.refresh_from_db() + self.location2.refresh_from_db() + self.location3.refresh_from_db() + self.assertEqual(self.location1.country, official) + self.assertEqual(self.location2.country, official) + self.assertEqual(self.location3.country, official) + + def test_unset_matched_countries(self): + self.country_status_matched = models.Country.objects.create( + creator=self.user, + name="Pais teste", + status="MATCHED" + ) + matches = normalize_countries.Command().auto_create_fuzzy_matches(threshold=70) + official = models.Country.objects.get(name="Brazil", status="OFFICIAL") + normalize_countries.Command().apply_fuzzy_matched_countries(name="Brazil") + normalize_countries.Command().unset_matched_countries(name="Brazil") + self.assertEqual(models.CountryMatched.objects.first().matched.count(), 0) \ No newline at end of file diff --git a/location/tests/tests_normalize_states.py b/location/tests/tests_normalize_states.py new file mode 100644 index 000000000..f2a90d085 --- /dev/null +++ b/location/tests/tests_normalize_states.py @@ -0,0 +1,1106 @@ +""" +Testes para o comando normalize_states.py + +Cobre: +- Limpeza de nomes de estados (clean_name_states) +- Unificação de estados duplicados (unificate_states) +- Carregamento de estados oficiais (load_official_states) +- Fuzzy matching entre estados (fuzzy_match_states) +- Criação automática de matches (auto_create_fuzzy_matches_states) +- Aplicação de matches aos locations (apply_fuzzy_matched_states) +""" + +import logging +from unittest.mock import patch + +from django.contrib.auth import get_user_model +from django.test import TestCase + +from location.management.commands import normalize_states +from location.models import Country, Location, State, StateMatched + +User = get_user_model() +logger = logging.getLogger(__name__) + + +class CleanNameStatesTest(TestCase): + """Testes para a limpeza de nomes de estados""" + + def setUp(self): + self.user, _ = User.objects.get_or_create(username="test_user") + self.command = normalize_states.Command() + + def test_clean_name_removes_html_tags(self): + """Testa remoção de tags HTML""" + state = 
State.objects.create( + name="São Paulo", + acronym="SP", + creator=self.user + ) + + self.command.clean_name_states() + + state.refresh_from_db() + self.assertEqual(state.name, "São Paulo") + self.assertEqual(state.status, "CLEANED") + + def test_clean_name_handles_duplicate_after_cleaning(self): + """Testa que estados que ficam duplicados após limpeza são tratados""" + # Criar múltiplos estados que resultarão no mesmo nome após limpeza + State.objects.create( + name="- São Paulo", + acronym="SP>", + creator=self.user + ) + State.objects.create( + name="São Paulo.", + acronym="SP", + creator=self.user + ) + State.objects.create( + name="São Paulo!!!", + acronym="SP", + creator=self.user + ) + + initial_count = State.objects.count() + self.assertEqual(initial_count, 3) + + self.command.clean_name_states() + + # Após limpeza, deve haver apenas 1 estado (outros deletados por IntegrityError) + final_count = State.objects.filter(name="São Paulo", acronym="SP").count() + self.assertEqual(final_count, 1) + + # Verificar que o estado restante está limpo + remaining_state = State.objects.get(name="São Paulo", acronym="SP") + self.assertEqual(remaining_state.status, "CLEANED") + + def test_clean_name_normalizes_spaces(self): + """Testa normalização de espaços extras""" + state = State.objects.create( + name=" São Paulo ", + acronym="SP", + creator=self.user + ) + + self.command.clean_name_states() + + state.refresh_from_db() + self.assertEqual(state.name, "São Paulo") + self.assertEqual(state.status, "CLEANED") + + def test_clean_name_capitalizes(self): + """Testa capitalização de nomes""" + state = State.objects.create( + name="são paulo", + acronym="SP", + creator=self.user + ) + + self.command.clean_name_states() + + state.refresh_from_db() + self.assertEqual(state.name, "São Paulo") + self.assertEqual(state.status, "CLEANED") + + def test_clean_name_skips_already_clean(self): + """Testa que estados já limpos não são modificados""" + state = State.objects.create( + name="São Paulo", + acronym="SP", + creator=self.user, + status="CLEANED" + ) + + self.command.clean_name_states() + + state.refresh_from_db() + self.assertEqual(state.name, "São Paulo") + self.assertEqual(state.status, "CLEANED") + + def test_clean_name_multiple_states(self): + """Testa limpeza de múltiplos estados""" + states_data = [ + ("Rio de Janeiro", "RJ"), + ("- Minas Gerais", "MG"), + ("bahia", "BA"), + ] + + for name, acronym in states_data: + State.objects.create( + name=name, + acronym=acronym, + creator=self.user + ) + + self.command.clean_name_states() + + cleaned_states = State.objects.filter(status="CLEANED") + self.assertEqual(cleaned_states.count(), 3) + self.assertTrue(State.objects.filter(name="Rio De Janeiro", acronym="RJ").exists()) + self.assertTrue(State.objects.filter(name="Minas Gerais", acronym="MG").exists()) + self.assertTrue(State.objects.filter(name="Bahia", acronym="BA").exists()) + + +class UnificateStatesTest(TestCase): + """Testes para a unificação de estados duplicados""" + + def setUp(self): + self.user, _ = User.objects.get_or_create(username="test_user") + self.command = normalize_states.Command() + + def test_unificate_removes_duplicates(self): + """Testa que duplicatas são removidas""" + # Criar estados duplicados + state1 = State.objects.create( + name="São Paulo", + acronym="SP", + creator=self.user, + status="CLEANED" + ) + state2 = State.objects.create( + name="São Paulo", + creator=self.user, + status="CLEANED" + ) + state3 = State.objects.create( + name="São Paulo", + creator=self.user, + 
status="CLEANED" + ) + + initial_count = State.objects.filter(name="São Paulo").count() + self.assertEqual(initial_count, 3) + self.command.clean_name_states() + self.command.unificate_states() + + final_count = State.objects.filter(name="São Paulo", acronym="SP").count() + self.assertEqual(final_count, 1) + + def test_unificate_keeps_state_with_acronym(self): + """Testa que o estado com acronym é mantido como canonical""" + # Criar estados - um sem acronym, outro com + state_no_acronym = State.objects.create( + name="São Paulo", + creator=self.user, + status="CLEANED" + ) + state_with_acronym = State.objects.create( + name="São Paulo", + acronym="SP", + creator=self.user, + status="CLEANED" + ) + self.command.clean_name_states() + self.command.unificate_states() + + remaining_state = State.objects.get(name="São Paulo", acronym="SP") + # O estado com acronym deve ser mantido + self.assertIsNotNone(remaining_state.acronym) + + def test_unificate_moves_locations_to_canonical(self): + """Testa que locations são movidos para o estado canônico""" + # Criar estados duplicados + state1 = State.objects.create( + name="São Paulo", + acronym="SP", + creator=self.user, + status="CLEANED" + ) + state2 = State.objects.create( + name="São Paulo", + creator=self.user, + status="CLEANED" + ) + + # Criar locations associados a cada estado + location1 = Location.objects.create( + state=state1, + creator=self.user + ) + location2 = Location.objects.create( + state=state2, + creator=self.user + ) + self.command.clean_name_states() + self.command.unificate_states() + + # Recarregar locations + location1.refresh_from_db() + + # Ambos devem apontar para o mesmo estado + self.assertEqual(location1.state, state1) + + # Deve existir apenas um estado + self.assertEqual(State.objects.filter(name="São Paulo", acronym="SP").count(), 1) + + def test_unificate_no_locations_lost(self): + """Garante que nenhum location é perdido durante unificação""" + # Criar estados duplicados + state1 = State.objects.create( + name="Rio de Janeiro", + acronym="RJ", + creator=self.user, + status="CLEANED" + ) + state2 = State.objects.create( + name="Rio de Janeiro", + creator=self.user, + status="CLEANED" + ) + state3 = State.objects.create( + name="Rio de Janeiro", + creator=self.user, + status="CLEANED" + ) + + # Criar locations (sem city, então não haverá duplicatas) + location1 = Location.objects.create(state=state1, creator=self.user) + location2 = Location.objects.create(state=state2, creator=self.user) + location3 = Location.objects.create(state=state3, creator=self.user) + + locations_before = Location.objects.count() + self.assertEqual(locations_before, 3) + self.command.clean_name_states() + self.command.unificate_states() + + # Nenhum location deve ser perdido + locations_after = Location.objects.count() + self.assertEqual(locations_after, 3) + + # Deve existir apenas 1 estado + states_count = State.objects.filter(name="Rio de Janeiro", acronym="RJ").count() + self.assertEqual(states_count, 1) + + # Todos devem apontar para o mesmo estado + canonical_state = State.objects.get(name="Rio de Janeiro", acronym="RJ") + self.assertEqual(canonical_state.location_set.count(), 3) + + # Verificar que todos os locations apontam para o canonical + location1.refresh_from_db() + location2.refresh_from_db() + location3.refresh_from_db() + + self.assertEqual(location1.state, canonical_state) + self.assertEqual(location2.state, canonical_state) + self.assertEqual(location3.state, canonical_state) + + def 
test_unificate_handles_duplicate_locations(self): + """Testa tratamento de locations duplicados (mesmo country, state, city)""" + country = Country.objects.create( + name="Brasil", + acronym="BR", + creator=self.user + ) + + # Criar estados duplicados + state1 = State.objects.create( + name="Minas Gerais", + creator=self.user, + status="CLEANED" + ) + state2 = State.objects.create( + name="Minas Gerais", + acronym="MG", + creator=self.user, + status="CLEANED" + ) + + # Criar locations que seriam duplicados após unificação + location1 = Location.objects.create( + country=country, + state=state1, + creator=self.user + ) + location2 = Location.objects.create( + country=country, + state=state2, + creator=self.user + ) + + locations_before = Location.objects.count() + self.command.clean_name_states() + self.command.unificate_states() + + # Um dos locations deve ser deletado (pois seriam duplicados) + locations_after = Location.objects.count() + self.assertEqual(locations_after, 1) + + # Deve existir apenas um estado + self.assertEqual(State.objects.filter(name="Minas Gerais", acronym="MG").count(), 1) + + +class LoadOfficialStatesTest(TestCase): + """Testes para carregamento de estados oficiais do pycountry""" + + def setUp(self): + self.user, _ = User.objects.get_or_create(username="test_user") + self.command = normalize_states.Command() + + def test_load_official_states_from_brazil(self): + """Testa carregamento de estados brasileiros do pycountry""" + # Criar país oficial Brasil + country_br = Country.objects.create( + name="Brazil", + acronym="BR", + creator=self.user, + status="OFFICIAL" + ) + + self.command.load_official_states() + + # Verificar que estados foram criados + official_states = State.objects.filter(status="OFFICIAL") + self.assertGreater(official_states.count(), 0) + + # Verificar alguns estados específicos do Brasil + # BR tem 27 subdivisões (26 estados + 1 DF) + br_states = State.objects.filter(status="OFFICIAL") + self.assertGreaterEqual(br_states.count(), 20) + + def test_load_official_states_creates_with_acronym(self): + """Testa que estados são criados com sigla extraída do código""" + country_br = Country.objects.create( + name="Brazil", + acronym="BR", + creator=self.user, + status="OFFICIAL" + ) + + self.command.load_official_states() + + # Verificar que pelo menos um estado tem acronym + states_with_acronym = State.objects.filter( + status="OFFICIAL", + acronym__isnull=False + ).exclude(acronym='') + + self.assertGreater(states_with_acronym.count(), 0) + + def test_load_official_states_updates_existing(self): + """Testa que estados existentes são atualizados para OFFICIAL""" + country_br = Country.objects.create( + name="Brazil", + acronym="BR", + creator=self.user, + status="OFFICIAL" + ) + + # Criar um estado que existe no pycountry + state = State.objects.create( + name="São Paulo", + acronym="SP", + creator=self.user, + status="CLEANED" + ) + + self.command.load_official_states() + + state.refresh_from_db() + # O estado deve ter sido atualizado para OFFICIAL + self.assertEqual(state.status, "OFFICIAL") + + def test_load_official_states_only_for_official_countries(self): + """Testa que estados são carregados apenas para países OFFICIAL""" + # Criar país não oficial + country_non_official = Country.objects.create( + name="Fake Country", + acronym="FK", + creator=self.user, + status="CLEANED" + ) + + initial_count = State.objects.count() + + self.command.load_official_states() + + # Nenhum estado deve ser criado para país não oficial + final_count = 
State.objects.count() + # Count pode aumentar se houver outros países OFFICIAL, mas não para FK + states_for_fake = State.objects.filter(status="OFFICIAL") + # Não deve haver estados OFFICIAL se não há países OFFICIAL + self.assertEqual(states_for_fake.count(), 0) + + @patch('pycountry.subdivisions.get') + def test_load_official_states_handles_country_without_subdivisions(self, mock_get): + """Testa tratamento de países sem subdivisões""" + mock_get.side_effect = KeyError("No subdivisions") + + country = Country.objects.create( + name="Monaco", + acronym="MC", + creator=self.user, + status="OFFICIAL" + ) + + # Não deve lançar exceção + self.command.load_official_states() + + # Comando deve continuar normalmente + + +class FuzzyMatchStatesTest(TestCase): + """Testes para fuzzy matching de estados""" + + def setUp(self): + self.user, _ = User.objects.get_or_create(username="test_user") + self.command = normalize_states.Command() + + # Criar estados oficiais + self.official_sp = State.objects.create( + name="São Paulo", + acronym="SP", + creator=self.user, + status="OFFICIAL" + ) + self.official_rj = State.objects.create( + name="Rio de Janeiro", + acronym="RJ", + creator=self.user, + status="OFFICIAL" + ) + self.official_mg = State.objects.create( + name="Minas Gerais", + acronym="MG", + creator=self.user, + status="OFFICIAL" + ) + + def test_fuzzy_match_exact_match(self): + """Testa match exato""" + cleaned_state = State.objects.create( + name="São Paulo", + acronym="SP", + creator=self.user, + status="CLEANED" + ) + + matches = self.command.fuzzy_match_states(threshold=85) + + self.assertEqual(len(matches), 1) + self.assertEqual(matches[0]['unmatched'], cleaned_state) + self.assertEqual(matches[0]['official'], self.official_sp) + self.assertGreaterEqual(matches[0]['score'], 95) + + def test_fuzzy_match_similar_name(self): + """Testa match com nome similar""" + # Criar estado com erro de digitação + cleaned_state = State.objects.create( + name="Sao Paulo", # Sem acento + acronym="SP", + creator=self.user, + status="CLEANED" + ) + + matches = self.command.fuzzy_match_states(threshold=80) + + self.assertEqual(len(matches), 1) + self.assertEqual(matches[0]['unmatched'], cleaned_state) + self.assertEqual(matches[0]['official'], self.official_sp) + + def test_fuzzy_match_respects_threshold(self): + """Testa que threshold é respeitado""" + # Criar estado muito diferente + cleaned_state = State.objects.create( + name="Estado Completamente Diferente", + acronym="XX", + creator=self.user, + status="CLEANED" + ) + + matches = self.command.fuzzy_match_states(threshold=95) + + # Não deve haver match + self.assertEqual(len(matches), 0) + + def test_fuzzy_match_considers_acronym(self): + """Testa que acronym é considerado no matching""" + cleaned_state = State.objects.create( + name="Sao Paulo", + acronym="SP", + creator=self.user, + status="CLEANED" + ) + + matches = self.command.fuzzy_match_states(threshold=85) + + self.assertEqual(len(matches), 1) + # Deve fazer match com São Paulo (SP), não com outros + self.assertEqual(matches[0]['official'].acronym, "SP") + + def test_fuzzy_match_multiple_states(self): + """Testa matching de múltiplos estados""" + states_data = [ + ("Sao Paulo", "SP"), + ("Rio Janeiro", "RJ"), + ("Minas", "MG"), + ] + + for name, acronym in states_data: + State.objects.create( + name=name, + acronym=acronym, + creator=self.user, + status="CLEANED" + ) + + matches = self.command.fuzzy_match_states(threshold=70) + + # Deve encontrar matches para todos os 3 estados + 
self.assertEqual(len(matches), 3) + + def test_fuzzy_match_reprocess_option(self): + """Testa opção de reprocessamento""" + cleaned_state = State.objects.create( + name="São Paulo", + acronym="SP", + creator=self.user, + status="MATCHED" # Já foi matched antes + ) + + # Criar um StateMatched existente + state_match = StateMatched.objects.create( + official=self.official_sp, + creator=self.user + ) + state_match.matched.add(cleaned_state) + + # Sem reprocess, não deve encontrar nada + matches = self.command.fuzzy_match_states(threshold=85, reprocess=False) + self.assertEqual(len(matches), 0) + + # Com reprocess, deve encontrar + matches = self.command.fuzzy_match_states(threshold=85, reprocess=True) + self.assertEqual(len(matches), 1) + + +class AutoCreateFuzzyMatchesStatesTest(TestCase): + """Testes para criação automática de matches""" + + def setUp(self): + self.user, _ = User.objects.get_or_create(username="test_user") + self.command = normalize_states.Command() + + # Criar estados oficiais + self.official_sp = State.objects.create( + name="São Paulo", + acronym="SP", + creator=self.user, + status="OFFICIAL" + ) + self.official_rj = State.objects.create( + name="Rio de Janeiro", + acronym="RJ", + creator=self.user, + status="OFFICIAL" + ) + + def test_auto_create_creates_state_matched(self): + """Testa que StateMatched é criado""" + cleaned_state = State.objects.create( + name="Sao Paulo", + acronym="SP", + creator=self.user, + status="CLEANED" + ) + + self.command.auto_create_fuzzy_matches_states(threshold=80) + + # Verificar que StateMatched foi criado + self.assertEqual(StateMatched.objects.count(), 1) + + state_match = StateMatched.objects.first() + self.assertEqual(state_match.official, self.official_sp) + self.assertIn(cleaned_state, state_match.matched.all()) + + def test_auto_create_updates_state_status_to_matched(self): + """Testa que status do estado é atualizado para MATCHED""" + cleaned_state = State.objects.create( + name="Sao Paulo", + acronym="SP", + creator=self.user, + status="CLEANED" + ) + + self.command.auto_create_fuzzy_matches_states(threshold=80) + + cleaned_state.refresh_from_db() + self.assertEqual(cleaned_state.status, "MATCHED") + + def test_auto_create_stores_match_score(self): + """Testa que score do match é armazenado""" + cleaned_state = State.objects.create( + name="Sao Paulo", + acronym="SP", + creator=self.user, + status="CLEANED" + ) + + self.command.auto_create_fuzzy_matches_states(threshold=80) + + state_match = StateMatched.objects.first() + self.assertGreater(state_match.score, 0) + self.assertLessEqual(state_match.score, 100) + + def test_auto_create_multiple_states_same_official(self): + """Testa que múltiplos estados podem ser matched ao mesmo oficial""" + states_data = [ + ("Sao Paulo", "SP"), + ("S Paulo", "SP"), + ("Sao Paulo State", "SP"), + ] + + for name, acronym in states_data: + State.objects.create( + name=name, + acronym=acronym, + creator=self.user, + status="CLEANED" + ) + + self.command.auto_create_fuzzy_matches_states(threshold=70) + + # Deve criar apenas 1 StateMatched (para o oficial) + self.assertEqual(StateMatched.objects.count(), 1) + + # Mas deve ter múltiplos matched + state_match = StateMatched.objects.first() + self.assertEqual(state_match.matched.count(), 3) + + def test_auto_create_reprocess_deletes_old_matches(self): + """Testa que reprocess deleta matches antigos""" + cleaned_state = State.objects.create( + name="São Paulo", + acronym="SP", + creator=self.user, + status="MATCHED" + ) + + # Criar match existente + 
state_match = StateMatched.objects.create( + official=self.official_sp, + creator=self.user + ) + state_match.matched.add(cleaned_state) + + initial_count = StateMatched.objects.count() + + # Reprocessar + self.command.auto_create_fuzzy_matches_states(threshold=85, reprocess=True) + + # Matches antigos devem ter sido deletados e recriados + # Count pode ser igual se os mesmos matches forem recriados + self.assertGreaterEqual(StateMatched.objects.count(), 1) + + +class ApplyFuzzyMatchedStatesTest(TestCase): + """Testes para aplicação de matches aos locations""" + + def setUp(self): + self.user, _ = User.objects.get_or_create(username="test_user") + self.command = normalize_states.Command() + + # Criar país + self.country = Country.objects.create( + name="Brasil", + acronym="BR", + creator=self.user + ) + + # Criar estado oficial + self.official_sp = State.objects.create( + name="São Paulo", + acronym="SP", + creator=self.user, + status="OFFICIAL" + ) + + # Criar estados não oficiais (matched) + self.cleaned_sp1 = State.objects.create( + name="Sao Paulo", + acronym="SP", + creator=self.user, + status="MATCHED" + ) + self.cleaned_sp2 = State.objects.create( + name="S Paulo", + acronym="SP", + creator=self.user, + status="MATCHED" + ) + + # Criar StateMatched + self.state_match = StateMatched.objects.create( + official=self.official_sp, + creator=self.user, + score=95.0 + ) + self.state_match.matched.add(self.cleaned_sp1, self.cleaned_sp2) + + # Criar locations com estados não oficiais + self.location1 = Location.objects.create( + country=self.country, + state=self.cleaned_sp1, + creator=self.user + ) + self.location2 = Location.objects.create( + country=self.country, + state=self.cleaned_sp2, + creator=self.user + ) + + def test_apply_updates_locations_to_official_state(self): + """Testa que locations são atualizados para usar estado oficial""" + self.command.apply_fuzzy_matched_states() + + self.location1.refresh_from_db() + self.location2.refresh_from_db() + + # Ambos devem apontar para o estado oficial + self.assertEqual(self.location1.state, self.official_sp) + self.assertEqual(self.location2.state, self.official_sp) + + def test_apply_updates_matched_states_status_to_processed(self): + """Testa que estados matched têm status atualizado para PROCESSED""" + self.command.apply_fuzzy_matched_states() + + self.cleaned_sp1.refresh_from_db() + self.cleaned_sp2.refresh_from_db() + + self.assertEqual(self.cleaned_sp1.status, "PROCESSED") + self.assertEqual(self.cleaned_sp2.status, "PROCESSED") + + def test_apply_specific_state_by_name(self): + """Testa aplicação de match para estado específico""" + # Criar outro estado oficial e match + official_rj = State.objects.create( + name="Rio de Janeiro", + acronym="RJ", + creator=self.user, + status="OFFICIAL" + ) + cleaned_rj = State.objects.create( + name="Rio Janeiro", + acronym="RJ", + creator=self.user, + status="MATCHED" + ) + state_match_rj = StateMatched.objects.create( + official=official_rj, + creator=self.user + ) + state_match_rj.matched.add(cleaned_rj) + + location_rj = Location.objects.create( + country=self.country, + state=cleaned_rj, + creator=self.user + ) + + # Aplicar apenas para São Paulo + self.command.apply_fuzzy_matched_states(name="São Paulo") + + # Locations de SP devem ser atualizados + self.location1.refresh_from_db() + self.assertEqual(self.location1.state, self.official_sp) + + # Location de RJ não deve ser atualizado + location_rj.refresh_from_db() + self.assertEqual(location_rj.state, cleaned_rj) + + def 
test_apply_counts_updated_locations(self): + """Testa que número de locations atualizados é retornado corretamente""" + # Criar mais locations + for i in range(5): + Location.objects.create( + country=self.country, + state=self.cleaned_sp1, + creator=self.user + ) + + total_locations = Location.objects.filter( + state__in=[self.cleaned_sp1, self.cleaned_sp2] + ).count() + + self.command.apply_fuzzy_matched_states() + + # Verificar que todos foram atualizados + updated_locations = Location.objects.filter(state=self.official_sp).count() + self.assertEqual(updated_locations, total_locations) + + def test_apply_handles_state_without_match(self): + """Testa que estados sem match não causam erro""" + # Criar estado oficial sem matches + official_no_match = State.objects.create( + name="Bahia", + acronym="BA", + creator=self.user, + status="OFFICIAL" + ) + + # Não deve lançar exceção + self.command.apply_fuzzy_matched_states() + + # Locations originais devem continuar atualizados + self.location1.refresh_from_db() + self.assertEqual(self.location1.state, self.official_sp) + + def test_apply_preserves_other_location_fields(self): + """Testa que outros campos do location são preservados""" + # Adicionar city ao location + from location.models import City + city = City.objects.create(name="São Paulo", creator=self.user) + self.location1.city = city + self.location1.save() + + self.command.apply_fuzzy_matched_states() + + self.location1.refresh_from_db() + + # State deve ser atualizado + self.assertEqual(self.location1.state, self.official_sp) + # Mas city e country devem permanecer + self.assertEqual(self.location1.city, city) + self.assertEqual(self.location1.country, self.country) + + +class FullWorkflowTest(TestCase): + """Testes do fluxo completo de normalização de estados""" + + def setUp(self): + self.user, _ = User.objects.get_or_create(username="test_user") + self.command = normalize_states.Command() + + # Criar país oficial + self.country_br = Country.objects.create( + name="Brazil", + acronym="BR", + creator=self.user, + status="OFFICIAL" + ) + + def test_full_workflow_clean_unificate_load_match_apply(self): + """Testa o fluxo completo: limpar -> unificar -> carregar oficiais -> match -> aplicar""" + + # 1. Criar estados com nomes sujos e duplicados + states_raw = [ + ("São Paulo", "SP"), + ("- São Paulo", "SP"), + ("são paulo", "SP"), + ("Rio de Janeiro", "RJ"), + ("rio janeiro", "RJ"), + ] + + for name, acronym in states_raw: + State.objects.create( + name=name, + acronym=acronym, + creator=self.user, + status="RAW" + ) + + # Criar locations com estados não limpos + sp_dirty = State.objects.get(name="São Paulo", acronym="SP") + rj_dirty = State.objects.get(name="Rio de Janeiro", acronym="RJ") + + location_sp = Location.objects.create( + country=self.country_br, + state=sp_dirty, + creator=self.user + ) + location_rj = Location.objects.create( + country=self.country_br, + state=rj_dirty, + creator=self.user + ) + + initial_states = State.objects.count() + self.assertEqual(initial_states, 5) + + # 2. Limpar nomes + self.command.clean_name_states() + + cleaned_states = State.objects.filter(status="CLEANED") + self.assertGreater(cleaned_states.count(), 0) + + # 3. Unificar duplicados + self.command.unificate_states() + + # Deve ter menos estados agora (duplicados foram removidos) + after_unification = State.objects.count() + self.assertLess(after_unification, initial_states) + + # 4. 
Carregar estados oficiais do pycountry + self.command.load_official_states() + + official_states = State.objects.filter(status="OFFICIAL") + self.assertGreater(official_states.count(), 0) + + # Verificar que São Paulo oficial existe + sp_official = State.objects.filter( + name="São Paulo", + acronym="SP", + status="OFFICIAL" + ).first() + self.assertIsNotNone(sp_official) + + # 5. Fazer fuzzy matching + self.command.auto_create_fuzzy_matches_states(threshold=75, reprocess=False) + + # Verificar que matches foram criados + matches = StateMatched.objects.all() + self.assertGreater(matches.count(), 0) + + # 6. Aplicar matches aos locations + self.command.apply_fuzzy_matched_states() + + # Verificar que locations foram atualizados para estados oficiais + location_sp.refresh_from_db() + location_rj.refresh_from_db() + + self.assertEqual(location_sp.state.status, "OFFICIAL") + self.assertEqual(location_rj.state.status, "OFFICIAL") + + # Verificar que apontam para estados oficiais corretos + self.assertEqual(location_sp.state.acronym, "SP") + self.assertEqual(location_rj.state.acronym, "RJ") + + def test_workflow_preserves_data_integrity(self): + """Testa que integridade dos dados é preservada durante todo o fluxo""" + # Criar estrutura completa + from location.models import City + + city_sp = City.objects.create(name="São Paulo", creator=self.user) + city_rj = City.objects.create(name="Rio de Janeiro", creator=self.user) + + state_sp_dirty = State.objects.create( + name="São Paulo", + acronym="SP", + creator=self.user, + status="RAW" + ) + state_rj_dirty = State.objects.create( + name="- Rio de Janeiro", + acronym="RJ", + creator=self.user, + status="RAW" + ) + + location1 = Location.objects.create( + country=self.country_br, + state=state_sp_dirty, + city=city_sp, + creator=self.user + ) + location2 = Location.objects.create( + country=self.country_br, + state=state_rj_dirty, + city=city_rj, + creator=self.user + ) + + # Armazenar dados originais + original_city1 = location1.city + original_city2 = location2.city + original_country = location1.country + + # Executar fluxo completo + self.command.clean_name_states() + self.command.unificate_states() + self.command.load_official_states() + self.command.auto_create_fuzzy_matches_states(threshold=75) + self.command.apply_fuzzy_matched_states() + + # Recarregar locations + location1.refresh_from_db() + location2.refresh_from_db() + + # Verificar que apenas states foram alterados + self.assertEqual(location1.city, original_city1) + self.assertEqual(location2.city, original_city2) + self.assertEqual(location1.country, original_country) + self.assertEqual(location2.country, original_country) + + # Mas states devem ser oficiais + self.assertEqual(location1.state.status, "OFFICIAL") + self.assertEqual(location2.state.status, "OFFICIAL") + + +class CommandArgumentsTest(TestCase): + """Testes para argumentos do comando""" + + def setUp(self): + self.user, _ = User.objects.get_or_create(username="test_user") + self.command = normalize_states.Command() + + def test_handle_requires_at_least_one_action(self): + """Testa que pelo menos uma ação deve ser especificada""" + from django.core.management.base import CommandError + + options = { + 'clean': False, + 'unificate_states': False, + 'load_official_states': False, + 'fuzzy_match_states': None, + 'apply_matches': False, + 'reprocess': False, + } + + with self.assertRaises(CommandError): + self.command.handle(**options) + + def test_handle_clean_action(self): + """Testa que ação --clean funciona""" + 
State.objects.create( + name="Test", + acronym="TS", + creator=self.user + ) + + options = { + 'clean': True, + 'unificate_states': False, + 'load_official_states': False, + 'fuzzy_match_states': None, + 'apply_matches': False, + 'reprocess': False, + } + + # Não deve lançar exceção + self.command.handle(**options) + + # Estado deve estar limpo + state = State.objects.first() + self.assertEqual(state.status, "CLEANED") + + def test_handle_multiple_actions(self): + """Testa que múltiplas ações podem ser executadas juntas""" + State.objects.create( + name="Test", + acronym="TS", + creator=self.user, + status="RAW" + ) + State.objects.create( + name="Test", + acronym="TS", + creator=self.user, + status="RAW" + ) + + options = { + 'clean': True, + 'unificate_states': True, + 'load_official_states': False, + 'fuzzy_match_states': None, + 'apply_matches': False, + 'reprocess': False, + } + + # Não deve lançar exceção + self.command.handle(**options) + + # Deve haver apenas 1 estado (após unificação) + self.assertEqual(State.objects.filter(acronym="TS").count(), 1) + diff --git a/location/utils.py b/location/utils.py new file mode 100644 index 000000000..50bbb0bf5 --- /dev/null +++ b/location/utils.py @@ -0,0 +1,149 @@ + +import logging +import re + +from django.contrib.auth import get_user_model +from django.db import IntegrityError + +from location.models import Location + +User = get_user_model() +logger = logging.getLogger(__name__) + +def remove_html_tags(text): + """Remove tags HTML completas e resíduos de tags""" + # Remove tags HTML completas: ... ou + text = re.sub(r'<[^>]+>', '', text) + + # Remove resíduos de abertura de tags: + # Exemplo: "São Pauloi>" → "São Paulo" + text = re.sub(r'[a-zA-Z]>', '', text) + + return text + +def remove_unaccent(name): + if not name: + return name + + name = remove_html_tags(str(name)) + + # Se o nome for apenas números + if re.fullmatch(r'\s*\d+\s*', name): + return name + + # Remove caracteres especiais, mantendo acentos + name = re.sub(r'[^a-zA-ZÀ-ÿ\s]', '', name) + + name = ' '.join(name.split()) + + return name + +def capitalize(name): + return name.title() if name else name + +def clean_name(name): + name_clean = remove_unaccent(name) + return capitalize(name_clean) + +def clean_acronym(acronym): + """Limpa acronym preservando maiúsculas""" + if not acronym: + return acronym + # Remove apenas espaços e caracteres especiais, mantém maiúsculas + acronym = remove_html_tags(str(acronym)) + acronym = re.sub(r'[^A-Z0-9]', '', acronym.upper()) + return acronym if acronym else None + +def choose_canonical_country(countries): + canonical_country = ( + countries.filter( + acronym__isnull=False, + acron3__isnull=False + ).first() or + countries.filter( + acronym__isnull=False, + ).first() or + countries.first() + ) + logging.info(f"Canonicial chosen: {canonical_country.name} (ID: {canonical_country.id})") + return canonical_country + +def process_duplicates_countries(duplicates, canonical_country, total_deleted): + """Processa países duplicados, movendo locations e deletando""" + + locations_moved = 0 + + for duplicate in duplicates: + duplicate_locations = duplicate.location_set.all() + + for location in duplicate_locations: + try: + existing = Location.objects.filter( + country=canonical_country, + state=location.state, + city=location.city + ).first() + + if existing: + logging.info(f"Location já existe com país canônico: {location.id} -> {existing.id}") + location.delete() + else: + location.country = canonical_country + location.save() + locations_moved 
+= 1 + except IntegrityError as e: + logging.error(f"Erro ao atualizar location {location.id}: {e}") + continue + + duplicate.delete() + total_deleted += 1 + + return locations_moved + + +def choose_canonical_state(states): + """Escolhe o estado canônico entre duplicatas + Prioridade: 1) com acronym preenchido, 2) mais antigo + """ + canonical_state = ( + states.filter(acronym__isnull=False).first() or + states.first() + ) + logging.info(f"Canonical state chosen: {canonical_state.name} (ID: {canonical_state.id})") + return canonical_state + + +def process_duplicates_states(duplicates, canonical_state, total_deleted): + """Processa estados duplicados, movendo locations e deletando""" + locations_moved = 0 + + for duplicate in duplicates: + duplicate_locations = duplicate.location_set.all() + + for location in duplicate_locations: + try: + existing = Location.objects.filter( + country=location.country, + state=canonical_state, + city=location.city + ).first() + + if existing: + logging.info(f"Location já existe com estado canônico: {location.id} -> {existing.id}") + location.delete() + else: + location.state = canonical_state + location.save() + locations_moved += 1 + except IntegrityError as e: + logging.error(f"Erro ao atualizar location {location.id}: {e}") + continue + + duplicate.delete() + total_deleted += 1 + + return locations_moved diff --git a/location/wagtail_hooks.py b/location/wagtail_hooks.py index 47e076418..b4af95e71 100755 --- a/location/wagtail_hooks.py +++ b/location/wagtail_hooks.py @@ -2,17 +2,22 @@ from django.urls import include, path from django.utils.translation import gettext_lazy as _ from wagtail import hooks -from wagtail_modeladmin.options import ( - ModelAdmin, - ModelAdminGroup, - modeladmin_register, -) +from wagtail_modeladmin.options import ModelAdmin, ModelAdminGroup, modeladmin_register from wagtail_modeladmin.views import CreateView +from config.menu import get_menu_order + from .button_helpers import CountryHelper -from .models import City, Country, CountryFile, Location, State +from .models import ( + City, + Country, + CountryFile, + CountryMatched, + Location, + State, + StateMatched, +) from .views import import_file_country, validate_country -from config.menu import get_menu_order class LocationCreateView(CreateView): @@ -62,6 +67,7 @@ class CityAdmin(ModelAdmin): exclude_from_explorer = False list_display = ("name",) search_fields = ("name",) + list_filter = ("status",) list_export = ("name",) export_filename = "cities" @@ -86,6 +92,7 @@ class StateAdmin(ModelAdmin): "name", "acronym", ) + list_filter = ("status",) export_filename = "states" @@ -112,6 +119,7 @@ class CountryAdmin(ModelAdmin): "acronym", "acron3", ) + list_filter = ("status",) export_filename = "countries" @@ -159,3 +167,32 @@ def register_url(): name="import_file_country", ), ] + +from wagtail.snippets.models import register_snippet +from wagtail.snippets.views.snippets import ( + CreateView, + SnippetViewSet, + SnippetViewSetGroup, +) + + +@register_snippet +class CountryMatchedSnippetViewAdmin(SnippetViewSet): + model = CountryMatched + menu_label = "Correspondencia Country" + menu_icon = "folder" + search_fields = ( + "official__name", + ) + list_display = ("official", "matched_list", "score") + + +@register_snippet +class StateMatchedSnippetViewAdmin(SnippetViewSet): + model = StateMatched + menu_label = "Correspondencia State" + menu_icon = "folder" + search_fields = ( + "official__name", + ) + list_display = ("official", "matched_list", "score") diff --git 
a/requirements/base.txt b/requirements/base.txt index 7da57b9f6..6cd66b741 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -144,3 +144,11 @@ feedparser==6.0.12 # Xlwt # ------------------------------------------------------------------------------ xlwt==1.3.0 + +# pycountry +# ------------------------------------------------------------------------------ +pycountry==24.6.1 + +# RapidFuzz +# ------------------------------------------------------------------------------ +RapidFuzz==3.14.3 \ No newline at end of file
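Not part of the patch itself, but for reviewers: a quick illustration of the structured search syntax accepted by the new Location.autocomplete_custom_queryset_filter. The example values are made up; note that because the pattern uses [^ ]+, a key's value stops at the first space, so multi-word names such as "São Paulo" fall through to the free-term branch.

    from location.models import Location

    # "country:" narrows by country name; whatever is left over is treated as a
    # free term and searched against city, state and country names.
    qs = Location.autocomplete_custom_queryset_filter("country:Brasil Campinas")
    print(qs.query)  # inspect the generated SQL while reviewing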
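Also for review convenience, here is how the new helpers in location/utils.py behave on the kinds of raw values used in the tests above — a quick interactive check, assuming the functions exactly as defined in this diff:

    >>> from location.utils import clean_acronym, clean_name
    >>> clean_name("- BRASIL")        # punctuation stripped, then title-cased
    'Brasil'
    >>> clean_name("são paulo!!!")    # accented characters are preserved
    'São Paulo'
    >>> clean_acronym(" sp ")         # upper-cased, non-alphanumerics removed
    'SP'
    >>> clean_acronym("")             # falsy input is returned unchanged
    ''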
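Finally, the normalize_countries / normalize_states management commands driven by the new tests (clean_name_countries, unificate_countries, auto_create_fuzzy_matches, apply_fuzzy_matched_countries, ...) are not included in this diff. The sketch below is only a rough, assumed outline of how the pieces that are in the diff fit together for a single country: rapidfuzz scores a CLEANED Country against the OFFICIAL set, a CountryMatched row records the correspondence, and apply_to_locations() repoints the affected Location rows. The function name match_and_apply and the default threshold are illustrative, not taken from the source.

    from rapidfuzz import fuzz, process

    from location.models import Country, CountryMatched


    def match_and_apply(raw_country, user, threshold=85):
        """Match one CLEANED Country against the OFFICIAL ones and apply the result."""
        officials = list(Country.objects.filter(status="OFFICIAL"))
        names = [c.name for c in officials]

        result = process.extractOne(
            raw_country.name, names, scorer=fuzz.WRatio, score_cutoff=threshold
        )
        if result is None:
            return 0  # below the cutoff: leave the record as CLEANED

        _name, score, index = result
        official = officials[index]

        # One CountryMatched per official country; score here is rapidfuzz's
        # 0-100 value, as stored in the StateMatched tests above (e.g. 95.0).
        match, _created = CountryMatched.objects.get_or_create(
            official=official, defaults={"creator": user, "score": score}
        )
        match.matched.add(raw_country)
        raw_country.status = "MATCHED"
        raw_country.save()

        # Repoint every Location that still uses the raw variant.
        return match.apply_to_locations()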