import datetime
import time

from django.db.models import OuterRef

from trove.util.django import pk_chunked

from share import models as share_db
from share.management.commands import BaseShareCommand
from trove import models as trove_db


class Command(BaseShareCommand):
    '''copy all non-null values from `RawDatum.expiration_date` to `SupplementaryIndexcardRdf.expiration_date`

    (while being overly cautious to avoid joins on `RawDatum` or `SourceUniqueIdentifier`)
    meant to be run after trove migration 0008_expiration_dates, before share.RawDatum is deleted
    '''

    def add_arguments(self, parser):
        parser.add_argument('--chunk-size', type=int, default=666)
        # NOTE: `--today` default is resolved at run time in `handle` --
        # a default of `datetime.date.today()` here would be captured once,
        # when the parser is built, and could go stale in a long-lived process
        parser.add_argument('--today', type=datetime.date.fromisoformat, default=None)
        parser.add_argument('--continue-after', type=str, default=None)

    def handle(self, *args, chunk_size: int, today=None, continue_after=None, **kwargs):
        _today = today if (today is not None) else datetime.date.today()
        _before = time.perf_counter()
        _total_updated = 0
        _raw_qs = (
            share_db.RawDatum.objects.latest_for_each_suid()
            .filter(expiration_date__gt=_today)  # ignore the expired (and the non-expiring)
        )
        if continue_after is not None:
            _raw_qs = _raw_qs.filter(pk__gt=continue_after)
        for _raw_pk_chunk in pk_chunked(_raw_qs, chunk_size):
            _supp_qs = trove_db.SupplementaryIndexcardRdf.objects.filter(
                from_raw_datum_id__in=_raw_pk_chunk,
                expiration_date__isnull=True,  # avoid overwriting non-null values
            )
            _updated_count = _supp_qs.update(
                # subquery pulls each datum's expiration_date without a join
                expiration_date=share_db.RawDatum.objects.filter(
                    id=OuterRef('from_raw_datum_id'),
                ).values('expiration_date'),
            )
            _total_updated += _updated_count
            _last_pk = _raw_pk_chunk[-1]
            _elapsed = time.perf_counter() - _before
            # per-chunk progress line doubles as a resume hint (--continue-after)
            self.stdout.write(
                f'{_elapsed:.2f}: migrated {_updated_count} of {len(_raw_pk_chunk)} --continue-after={_last_pk}',
            )
        _total_seconds = time.perf_counter() - _before
        self.stdout.write(
            self.style.SUCCESS(f'done! migrated {_total_updated} in {_total_seconds}s'),
        )
field=models.DateField(blank=True, help_text='An (optional) date when this description will no longer be valid.', null=True), + ), + migrations.AddIndex( + model_name='latestindexcardrdf', + index=models.Index(fields=['expiration_date'], name='trove_lates_expirat_92ac89_idx'), + ), + migrations.AddIndex( + model_name='supplementaryindexcardrdf', + index=models.Index(fields=['expiration_date'], name='trove_suppl_expirat_3ea6e1_idx'), + ), + ] diff --git a/trove/models/indexcard.py b/trove/models/indexcard.py index 42c3dc29c..6ae24b4b0 100644 --- a/trove/models/indexcard.py +++ b/trove/models/indexcard.py @@ -248,6 +248,7 @@ def update_rdf( defaults={ 'rdf_as_turtle': _rdf_as_turtle, 'focus_iri': focus_iri, + 'expiration_date': from_raw_datum.expiration_date, }, ) if (not _archived_created) and (_archived.rdf_as_turtle != _rdf_as_turtle): @@ -260,6 +261,7 @@ def update_rdf( 'turtle_checksum_iri': _turtle_checksum_iri, 'rdf_as_turtle': _rdf_as_turtle, 'focus_iri': focus_iri, + 'expiration_date': from_raw_datum.expiration_date, }, ) return _latest_indexcard_rdf @@ -282,6 +284,7 @@ def update_supplementary_rdf( 'turtle_checksum_iri': _turtle_checksum_iri, 'rdf_as_turtle': _rdf_as_turtle, 'focus_iri': focus_iri, + 'expiration_date': from_raw_datum.expiration_date, }, ) return _supplement_rdf @@ -307,6 +310,13 @@ class IndexcardRdf(models.Model): focus_iri = models.TextField() # exact iri used in rdf_as_turtle rdf_as_turtle = models.TextField() # TODO: store elsewhere by checksum + # optional: + expiration_date = models.DateField( + null=True, + blank=True, + help_text='An (optional) date when this description will no longer be valid.', + ) + def as_rdf_tripledict(self) -> rdf.RdfTripleDictionary: return rdf.tripledict_from_turtle(self.rdf_as_turtle) @@ -344,6 +354,7 @@ class Meta: ] indexes = [ models.Index(fields=('modified',)), # for OAI-PMH selective harvest + models.Index(fields=['expiration_date']), # for expiring ] @@ -373,6 +384,9 @@ class Meta: 
from __future__ import annotations
from collections.abc import Iterator


__all__ = ('pk_chunked',)


def pk_chunked(queryset, chunksize: int) -> Iterator[list]:
    '''pk_chunked: yield the queryset's primary keys as successive lists

    each yielded list is non-empty and holds at most `chunksize` pks in
    ascending order; pagination is keyset-based (`pk__gt` on the last pk
    seen) rather than offset-based, so it stays cheap on large tables
    '''
    _by_pk = queryset.order_by('pk')
    _last_seen = None
    while True:
        # resume after the last pk yielded (no filter on the first pass)
        _page = _by_pk if _last_seen is None else _by_pk.filter(pk__gt=_last_seen)
        # load primary key values only
        _chunk = list(_page.values_list('pk', flat=True)[:chunksize])
        if not _chunk:
            return  # exhausted
        _chunk_end = _chunk[-1]
        # sanity check: keyset pagination requires strictly ascending pks
        if (_last_seen is not None) and (_chunk_end <= _last_seen):
            raise RuntimeError(f'sentinel pks not ascending?? got {_chunk_end} after {_last_seen}')
        _last_seen = _chunk_end
        yield _chunk