diff --git a/server/mergin/sync/files.py b/server/mergin/sync/files.py
index 316c1579..63c387fd 100644
--- a/server/mergin/sync/files.py
+++ b/server/mergin/sync/files.py
@@ -4,11 +4,18 @@
 import datetime
 from enum import Enum
 import os
-from dataclasses import dataclass
+from dataclasses import dataclass, field
 from typing import Optional, List
 import uuid
 from flask import current_app
-from marshmallow import ValidationError, fields, EXCLUDE, post_dump, validates_schema
+from marshmallow import (
+    ValidationError,
+    fields,
+    EXCLUDE,
+    post_dump,
+    validates_schema,
+    post_load,
+)
 from pathvalidate import sanitize_filename
 from .utils import (
@@ -234,3 +241,113 @@ def patch_field(self, data, **kwargs):
         if not data.get("diff"):
             data.pop("diff")
         return data
+
+
+@dataclass
+class DeltaDiffFile:
+    """Diff file path in diffs list"""
+
+    path: str
+
+
+class DeltaChangeDiffFileSchema(ma.Schema):
+    """Schema for diff file path in diffs list"""
+
+    path = fields.String(required=True)
+
+
+@dataclass
+class DeltaChangeBase(File):
+    """Base class for changes stored in json list or returned from delta endpoint"""
+
+    change: PushChangeType
+    version: int
+
+
+@dataclass
+class DeltaChangeMerged(DeltaChangeBase):
+    """Delta item whose diffs are merged into a list of multiple diff files"""
+
+    diffs: List[DeltaDiffFile] = field(default_factory=list)
+
+    def to_data_delta(self):
+        """Convert DeltaChangeMerged to DeltaChange with a single diff"""
+        result = DeltaChange(
+            path=self.path,
+            size=self.size,
+            checksum=self.checksum,
+            change=self.change,
+            version=self.version,
+        )
+        if self.diffs:
+            result.diff = self.diffs[0].path
+        return result
+
+
+@dataclass
+class DeltaChange(DeltaChangeBase):
+    """Change item stored in the database as a list of these items, each with a single diff file"""
+
+    diff: Optional[str] = None
+
+    def to_merged(self) -> DeltaChangeMerged:
+        """Convert to DeltaChangeMerged with multiple diffs"""
+        result = DeltaChangeMerged(
+            path=self.path,
+            size=self.size,
+            checksum=self.checksum,
+            change=self.change,
+            version=self.version,
+        )
+        if self.diff:
+            result.diffs = [DeltaDiffFile(path=self.diff)]
+        return result
+
+
+class DeltaChangeBaseSchema(ma.Schema):
+    """Base schema for delta json and response from delta endpoint"""
+
+    path = fields.String(required=True)
+    size = fields.Integer(required=True)
+    checksum = fields.String(required=True)
+    version = fields.Integer(required=True)
+    change = fields.Enum(PushChangeType, by_value=True, required=True)
+
+
+class DeltaChangeSchema(DeltaChangeBaseSchema):
+    """Schema for change data in changes column"""
+
+    diff = fields.String(required=False)
+
+    @post_load
+    def make_object(self, data, **kwargs):
+        return DeltaChange(**data)
+
+    @post_dump
+    def patch_field(self, data, **kwargs):
+        # drop the 'diff' key entirely if empty or None, as the database expects
+        if not data.get("diff"):
+            data.pop("diff", None)
+        return data
+
+
+class DeltaChangeItemSchema(DeltaChangeBaseSchema):
+    """Schema for delta changes response"""
+
+    diffs = fields.List(fields.Nested(DeltaChangeDiffFileSchema()))
+
+    @post_dump
+    def patch_field(self, data, **kwargs):
+        # drop the 'diffs' key entirely if empty or None, as clients expect
+        if not data.get("diffs"):
+            data.pop("diffs", None)
+        return data
+
+
+class DeltaChangeRespSchema(ma.Schema):
+    """Schema for list of delta changes wrapped in items field"""
+
+    items = fields.List(fields.Nested(DeltaChangeItemSchema()))
+
+    class Meta:
+        unknown = EXCLUDE
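For orientation, a minimal sketch (outside the patch) of how the schemas above round-trip the JSONB payload; it assumes an interpreter where the mergin package is importable, and the import path is the one the tests below use.

from mergin.sync.files import DeltaChange, DeltaChangeSchema, PushChangeType

change = DeltaChange(
    path="survey.gpkg",
    size=1024,
    checksum="9adb76bf81a34880209040ffe5ee262a090b62ab",
    change=PushChangeType.CREATE,
    version=1,
)
row = DeltaChangeSchema(many=True).dump([change])
# post_dump dropped the empty 'diff' key, so the stored JSON stays compact
assert "diff" not in row[0]
# post_load rebuilds DeltaChange dataclasses from the stored dicts
loaded = DeltaChangeSchema(many=True).load(row)
assert loaded[0].change is PushChangeType.CREATE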
diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py
index 4a6c00ea..985453c8 100644
--- a/server/mergin/sync/models.py
+++ b/server/mergin/sync/models.py
@@ -24,7 +24,10 @@
 from flask import current_app
 from .files import (
-    File,
+    DeltaChangeMerged,
+    DeltaDiffFile,
+    DeltaChange,
+    DeltaChangeSchema,
     ProjectDiffFile,
     ProjectFileChange,
     ChangesSchema,
@@ -62,6 +65,16 @@ class FileSyncErrorType(Enum):
     SYNC_ERROR = "sync error"
 
 
+class ChangeComparisonAction(Enum):
+    """Actions to take when comparing two changes"""
+
+    REPLACE = "replace"
+    DELETE = "delete"
+    UPDATE = "update"
+    UPDATE_DIFF = "update_diff"
+    EXCLUDE = "exclude"  # Return None to exclude the file
+
+
 class Project(db.Model):
     id = db.Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
     name = db.Column(db.String, index=True)
@@ -272,6 +285,11 @@ def delete(self, removed_by: int = None):
         db.session.execute(
             upload_table.delete().where(upload_table.c.project_id == self.id)
         )
+        # remove project version deltas related to the project
+        delta_table = ProjectVersionDelta.__table__
+        db.session.execute(
+            delta_table.delete().where(delta_table.c.project_id == self.id)
+        )
         self.project_users.clear()
         access_requests = (
             AccessRequest.query.filter_by(project_id=self.id)
@@ -350,6 +368,84 @@ def bulk_roles_update(self, access: Dict) -> Set[int]:
 
         return set(id_diffs)
 
+    def get_delta_changes(
+        self, since: int, to: int
+    ) -> Optional[List[DeltaChangeMerged]]:
+        """
+        Get changes between two versions, merging them if needed.
+        - create FileDiff checkpoints if needed
+        - create ProjectVersionDelta checkpoints if needed with changes json
+        """
+        if since > to:
+            logging.error(
+                f"Start version {since} is higher than end version {to} - broken history"
+            )
+            return
+        if since == to:
+            return None
+        project_id = self.id
+        expected_checkpoints = Checkpoint.get_checkpoints(since + 1, to)
+        expected_deltas: List[ProjectVersionDelta] = (
+            ProjectVersionDelta.query.filter(
+                ProjectVersionDelta.project_id == project_id,
+                ProjectVersionDelta.version > since,
+                ProjectVersionDelta.version <= to,
+                tuple_(ProjectVersionDelta.rank, ProjectVersionDelta.version).in_(
+                    [(item.rank, item.end) for item in expected_checkpoints]
+                ),
+            )
+            .order_by(ProjectVersionDelta.version)
+            .all()
+        )
+        existing_delta_map = {(c.rank, c.version): c for c in expected_deltas}
+
+        # Cache all individual (rank 0) delta rows in the required range.
+        individual_deltas: List[ProjectVersionDelta] = []
+
+        result: List[DeltaChange] = []
+        for checkpoint in expected_checkpoints:
+            existing_delta = existing_delta_map.get((checkpoint.rank, checkpoint.end))
+
+            # the delta is already in the database, just use its data
+            if existing_delta:
+                result.extend(DeltaChangeSchema(many=True).load(existing_delta.changes))
+                continue
+
+            # if a higher-rank delta checkpoint does not exist, use rank=0 deltas to create the checkpoint
+            if checkpoint.rank > 0:
+                individual_deltas = (
+                    ProjectVersionDelta.query.filter(
+                        ProjectVersionDelta.project_id == project_id,
+                        ProjectVersionDelta.version >= since,
+                        ProjectVersionDelta.version <= to,
+                        ProjectVersionDelta.rank == 0,
+                    )
+                    .order_by(ProjectVersionDelta.version)
+                    .all()
+                    if not individual_deltas
+                    else individual_deltas
+                )
+
+                if not individual_deltas:
+                    logging.error(
+                        f"No individual deltas found for project {project_id} in range {since} / {to} to create checkpoint."
+                    )
+                    return
+
+            new_checkpoint = ProjectVersionDelta.create_checkpoint(
+                project_id, checkpoint, individual_deltas
+            )
+            if new_checkpoint:
+                result.extend(
+                    DeltaChangeSchema(many=True).load(new_checkpoint.changes)
+                )
+            else:
+                logging.error(
+                    f"Not possible to create checkpoint for project {project_id} in range {checkpoint.start}-{checkpoint.end}"
+                )
+
+        return ProjectVersionDelta.merge_changes(result)
+
 
 class ProjectRole(Enum):
     """Project roles ordered by rank (do not change)"""
@@ -902,6 +998,278 @@ def construct_checkpoint(self) -> bool:
         return True
 
 
+class ProjectVersionDelta(db.Model):
+    id = db.Column(db.BigInteger, primary_key=True, autoincrement=True)
+    version = db.Column(db.Integer, nullable=False, index=True)
+    # exponential order of changes json
+    rank = db.Column(db.Integer, nullable=False, index=True)
+    # to which project is this linked
+    project_id = db.Column(
+        UUID(as_uuid=True),
+        db.ForeignKey("project.id", ondelete="CASCADE"),
+        index=True,
+        nullable=False,
+    )
+    # cached changes for versions from start to end (inclusive)
+    changes = db.Column(JSONB, nullable=False)
+
+    __table_args__ = (
+        db.UniqueConstraint("project_id", "version", "rank", name="unique_deltas"),
+        db.Index(
+            "ix_project_version_delta_project_id_version_rank",
+            project_id,
+            version,
+            rank,
+        ),
+    )
+    project = db.relationship(
+        "Project",
+        uselist=False,
+    )
+
+    @staticmethod
+    def merge_changes(
+        items: List[DeltaChange],
+    ) -> List[DeltaChangeMerged]:
+        """
+        Merge changes json array objects into one list of changes.
+        Changes are merged based on file path and change type.
+        """
+        updating_files: Set[str] = set()
+        # sort changes by version to apply them in the correct order
+        items.sort(key=lambda x: x.version)
+
+        # Merge changes for each file in a single pass
+        result: Dict[str, DeltaChangeMerged] = {}
+        for item in items:
+            path = item.path
+            current = item.to_merged()
+
+            # First change for this file
+            if path not in result:
+                result[path] = current
+                # track paths of files that already existed in history, so their delete changes are not dropped
+                if current.change != PushChangeType.CREATE:
+                    updating_files.add(path)
+                continue
+
+            # Compare and merge with previous change for this file
+            can_delete = path in updating_files
+            new_change = ProjectVersionDelta._compare_changes(
+                result[path], current, can_delete
+            )
+
+            # Update result (or remove if no change is detected)
+            if new_change is not None:
+                result[path] = new_change
+            else:
+                del result[path]
+
+        return list(result.values())
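A note on the checkpoint layout: get_delta_changes above delegates the split of a version range to Checkpoint.get_checkpoints, which is outside this patch, so the decomposition rule is an inference from the tests below (a v1-v10 range yields rank-1 checkpoints ending at v4 and v8 plus rank-0 deltas for v9 and v10). A plausible reading is that a rank r checkpoint covers 4**r consecutive versions and ends on a multiple of 4**r; the hypothetical helper below reproduces that split under this assumption.

from typing import List, Tuple

def decompose(since: int, to: int, max_rank: int = 3) -> List[Tuple[int, int, int]]:
    """Greedy split of versions since..to (inclusive) into (rank, start, end) spans."""
    spans = []
    version = since
    while version <= to:
        rank = max_rank
        # take the highest rank whose span starts on a 4**rank boundary and fits in the range
        while rank > 0 and not (
            (version - 1) % 4**rank == 0 and version + 4**rank - 1 <= to
        ):
            rank -= 1
        spans.append((rank, version, version + 4**rank - 1))
        version += 4**rank
    return spans

assert decompose(1, 10) == [(1, 1, 4), (1, 5, 8), (0, 9, 9), (0, 10, 10)]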
+    @staticmethod
+    def _compare_changes(
+        previous: DeltaChangeMerged,
+        new: DeltaChangeMerged,
+        prevent_delete_change: bool,
+    ) -> Optional[DeltaChangeMerged]:
+        """
+        Compare and merge two changes for the same file.
+
+        Args:
+            previous: Previously accumulated change
+            new: New change to compare
+            prevent_delete_change: If True, keep the DELETE change instead of dropping
+                a create+delete pair (the file already existed in history before this range)
+
+        Returns:
+            Merged change or None if the file should be excluded
+        """
+
+        # Map change type pairs to actions
+        action_map = {
+            # create + delete = file is transparent for current changes -> drop it
+            (
+                PushChangeType.CREATE,
+                PushChangeType.DELETE,
+            ): ChangeComparisonAction.DELETE,
+            # create + update = create with updated info
+            (
+                PushChangeType.CREATE,
+                PushChangeType.UPDATE,
+            ): ChangeComparisonAction.UPDATE,
+            (
+                PushChangeType.CREATE,
+                PushChangeType.UPDATE_DIFF,
+            ): ChangeComparisonAction.UPDATE,
+            (
+                PushChangeType.CREATE,
+                PushChangeType.CREATE,
+            ): ChangeComparisonAction.EXCLUDE,
+            # update + update_diff = update with latest info
+            (
+                PushChangeType.UPDATE,
+                PushChangeType.UPDATE_DIFF,
+            ): ChangeComparisonAction.UPDATE,
+            (
+                PushChangeType.UPDATE,
+                PushChangeType.UPDATE,
+            ): ChangeComparisonAction.REPLACE,
+            (
+                PushChangeType.UPDATE,
+                PushChangeType.DELETE,
+            ): ChangeComparisonAction.REPLACE,
+            (
+                PushChangeType.UPDATE,
+                PushChangeType.CREATE,
+            ): ChangeComparisonAction.REPLACE,
+            # update_diff + update_diff = update_diff with latest info and diffs in proper order
+            (
+                PushChangeType.UPDATE_DIFF,
+                PushChangeType.UPDATE_DIFF,
+            ): ChangeComparisonAction.UPDATE_DIFF,
+            (
+                PushChangeType.UPDATE_DIFF,
+                PushChangeType.UPDATE,
+            ): ChangeComparisonAction.REPLACE,
+            (
+                PushChangeType.UPDATE_DIFF,
+                PushChangeType.DELETE,
+            ): ChangeComparisonAction.REPLACE,
+            (
+                PushChangeType.UPDATE_DIFF,
+                PushChangeType.CREATE,
+            ): ChangeComparisonAction.EXCLUDE,
+            (
+                PushChangeType.DELETE,
+                PushChangeType.CREATE,
+            ): ChangeComparisonAction.REPLACE,
+            # delete + update = invalid sequence
+            (
+                PushChangeType.DELETE,
+                PushChangeType.UPDATE,
+            ): ChangeComparisonAction.EXCLUDE,
+            (
+                PushChangeType.DELETE,
+                PushChangeType.UPDATE_DIFF,
+            ): ChangeComparisonAction.EXCLUDE,
+            (
+                PushChangeType.DELETE,
+                PushChangeType.DELETE,
+            ): ChangeComparisonAction.EXCLUDE,
+        }
+
+        action = action_map.get((previous.change, new.change))
+        result = None
+        if action == ChangeComparisonAction.REPLACE:
+            result = new
+
+        elif action == ChangeComparisonAction.DELETE:
+            # create + delete: drop the change from accumulated changes entirely,
+            # unless the file existed before this range - then keep the delete
+            if prevent_delete_change:
+                result = new
+
+        elif action == ChangeComparisonAction.UPDATE:
+            # previous change was create - keep it as create, just with the new metadata
+            new.change = previous.change
+            new.diffs = []
+            result = new
+
+        elif action == ChangeComparisonAction.UPDATE_DIFF:
+            new.diffs = (previous.diffs or []) + (new.diffs or [])
+            result = new
+
+        return result
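A small review suggestion before the next method: create_checkpoint below declares a mutable default argument (from_deltas: List[ProjectVersionDelta] = []). The list is only iterated, never mutated, so it is harmless today, but the conventional spelling avoids the shared-default trap altogether; a sketch of the alternative signature:

from typing import List, Optional

def create_checkpoint(project_id, checkpoint, from_deltas: Optional[List] = None):
    from_deltas = from_deltas if from_deltas is not None else []
    ...  # body unchanged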
+    @classmethod
+    def create_checkpoint(
+        cls,
+        project_id: str,
+        checkpoint: Checkpoint,
+        from_deltas: List[ProjectVersionDelta] = [],
+    ) -> Optional[ProjectVersionDelta]:
+        """
+        Creates and caches a new checkpoint and any required FileDiff checkpoints.
+        Use from_deltas to create the checkpoint from existing individual deltas.
+        Returns the created ProjectVersionDelta object with the checkpoint.
+        """
+        delta_range = [
+            change
+            for change in from_deltas
+            if checkpoint.start <= change.version <= checkpoint.end
+        ]
+
+        if not delta_range:
+            logging.warning(
+                f"No individual changes found for project {project_id} in range v{checkpoint.start}-v{checkpoint.end} to create checkpoint."
+            )
+            return None
+
+        # load the changes lists from the database rows and flatten them for merging
+        changes = []
+        for delta in delta_range:
+            changes.extend(DeltaChangeSchema(many=True).load(delta.changes))
+        merged_delta_items: List[DeltaChange] = [
+            d.to_data_delta() for d in cls.merge_changes(changes)
+        ]
+
+        # Pre-fetch data for all versioned files to create FileDiff checkpoints
+        versioned_delta_items = [
+            item
+            for item in merged_delta_items
+            if is_versioned_file(item.path)
+            and item.change == PushChangeType.UPDATE_DIFF
+        ]
+        versioned_file_paths = [delta.path for delta in versioned_delta_items]
+        if versioned_file_paths:
+            # get versioned files from the DB and look up their path ids for further processing
+            file_paths = ProjectFilePath.query.filter(
+                ProjectFilePath.project_id == project_id,
+                ProjectFilePath.path.in_(versioned_file_paths),
+            ).all()
+            file_path_map = {fp.path: fp.id for fp in file_paths}
+
+            for item in versioned_delta_items:
+                file_path_id = file_path_map.get(item.path)
+                if not file_path_id:
+                    continue
+
+                # Check if a FileDiff checkpoint already exists
+                existing_diff_checkpoint = FileDiff.query.filter_by(
+                    file_path_id=file_path_id,
+                    rank=checkpoint.rank,
+                    version=checkpoint.end,
+                ).first()
+                # If it does not exist, create a diff with higher rank and a generated path (the diff file name)
+                if not existing_diff_checkpoint:
+                    base_file = FileHistory.get_basefile(file_path_id, checkpoint.end)
+                    if not base_file:
+                        continue
+
+                    diff_path = mergin_secure_filename(
+                        f"{item.path}-diff-{uuid.uuid4()}"
+                    )
+                    checkpoint_diff = FileDiff(
+                        basefile=base_file,
+                        path=diff_path,
+                        rank=checkpoint.rank,
+                        version=checkpoint.end,
+                    )
+                    # Patch the delta with the path to the new diff checkpoint
+                    item.diff = diff_path
+                    db.session.add(checkpoint_diff)
+
+        checkpoint_delta = ProjectVersionDelta(
+            project_id=project_id,
+            version=checkpoint.end,
+            rank=checkpoint.rank,
+            changes=DeltaChangeSchema(many=True).dump(merged_delta_items),
+        )
+        db.session.add(checkpoint_delta)
+        db.session.commit()
+        return checkpoint_delta
+
+
 class ProjectVersion(db.Model):
     id = db.Column(db.Integer, primary_key=True, autoincrement=True)
     name = db.Column(db.Integer, index=True)
@@ -968,7 +1336,6 @@ def __init__(
             .filter(ProjectFilePath.path.in_(changed_files_paths))
             .all()
         }
-
         for item in changes:
             # get existing DB file reference or create a new one (for added files)
             db_file = existing_files_map.get(
@@ -997,6 +1364,29 @@
             else:
                 latest_files_map[fh.path] = fh.id
 
+        # cache this version's changes as json for later delta checkpoints;
+        # a rank 0 delta stores the changes of this single version
+        delta_data = [
+            DeltaChange(
+                path=c.path,
+                change=c.change,
+                size=c.size,
+                checksum=c.checksum,
+                version=name,
+                diff=c.diff.path if c.diff else None,
+            )
+            for c in changes
+        ]
+        pvd = ProjectVersionDelta(
+            project_id=project.id,
+            version=name,
+            rank=0,
+            changes=DeltaChangeSchema(many=True).dump(delta_data),
+        )
+
+        db.session.add(pvd)
+        db.session.flush()
+
         # update cached values in project and push to transaction buffer so that self.files is up-to-date
         self.project.latest_project_files.file_history_ids = latest_files_map.values()
         db.session.flush()
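Before the API spec, a concrete picture of what one rank-0 row's changes column holds after a push that updated a geopackage via a geodiff file; the key set follows DeltaChangeSchema above, while the concrete values here are invented for illustration.

rank0_changes = [
    {
        "path": "survey.gpkg",
        "size": 98304,
        "checksum": "9adb76bf81a34880209040ffe5ee262a090b62ab",
        "version": 7,
        "change": "update_diff",  # a PushChangeType value
        "diff": "survey.gpkg-diff-1",  # key omitted entirely for non-diff changes
    }
]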
diff --git a/server/mergin/sync/public_api_v2.yaml b/server/mergin/sync/public_api_v2.yaml
index d4c87016..7436e71d 100644
--- a/server/mergin/sync/public_api_v2.yaml
+++ b/server/mergin/sync/public_api_v2.yaml
@@ -381,6 +381,41 @@ paths:
             $ref: "#/components/schemas/ProjectLocked"
       x-openapi-router-controller: mergin.sync.public_api_v2_controller
+  /projects/{id}/delta:
+    get:
+      tags:
+        - project
+      summary: Get project changes (delta) between two versions
+      operationId: get_project_delta
+      parameters:
+        - $ref: "#/components/parameters/ProjectId"
+        - name: since
+          in: query
+          required: true
+          schema:
+            type: integer
+            example: 0
+            minimum: 0
+          description: Start version (exclusive)
+        - name: to
+          in: query
+          schema:
+            type: integer
+            example: 2
+            minimum: 1
+          description: End version (inclusive)
+      responses:
+        "200":
+          description: Project changes between two versions
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/ProjectDeltaResponse"
+        "400":
+          $ref: "#/components/responses/BadRequest"
+        "404":
+          $ref: "#/components/responses/NotFound"
+      x-openapi-router-controller: mergin.sync.public_api_v2_controller
 components:
   responses:
     NoContent:
@@ -800,3 +835,48 @@
         - editor
         - writer
         - owner
+    ProjectChangeType:
+      type: string
+      enum: [create, update, delete, update_diff]
+      example: update
+    ProjectDeltaChange:
+      type: object
+      required:
+        - path
+        - size
+        - checksum
+        - version
+        - change
+      properties:
+        path:
+          type: string
+          example: survey.gpkg
+        size:
+          type: integer
+          example: 1024
+        checksum:
+          type: string
+          example: 9adb76bf81a34880209040ffe5ee262a090b62ab
+        version:
+          type: integer
+          example: 1
+        change:
+          $ref: "#/components/schemas/ProjectChangeType"
+        diffs:
+          type: array
+          nullable: true
+          items:
+            type: object
+            properties:
+              path:
+                type: string
+                example: survey.gpkg-diff-1
+    ProjectDeltaResponse:
+      type: object
+      required:
+        - items
+      properties:
+        items:
+          type: array
+          items:
+            $ref: "#/components/schemas/ProjectDeltaChange"
diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py
index 6bac0ff6..dd1802df 100644
--- a/server/mergin/sync/public_api_v2_controller.py
+++ b/server/mergin/sync/public_api_v2_controller.py
@@ -4,6 +4,7 @@
 import os
 from datetime import datetime
+from typing import Optional
 import uuid
 import gevent
 import logging
@@ -22,7 +23,7 @@
 from ..app import db
 from ..auth import auth_required
 from ..auth.models import User
-from .models import FileDiff, Project, ProjectRole, ProjectMember
+from .models import FileDiff, Project, ProjectRole, ProjectMember, ProjectVersionDelta
 from .permissions import ProjectPermissions, require_project_by_uuid
 from .utils import prepare_download_response
 from ..app import db
@@ -38,7 +39,7 @@
     StorageLimitHit,
     UploadError,
 )
-from .files import ChangesSchema
+from .files import ChangesSchema, DeltaChangeRespSchema
 from .forms import project_name_validation
 from .models import (
     Project,
@@ -402,3 +403,19 @@ def upload_chunk(id: str):
         UploadChunkSchema().dump({"id": chunk_id, "valid_until": valid_until}),
         200,
     )
+
+
+def get_project_delta(id: str, since: int, to: Optional[int] = None):
+    """Get project changes (delta) between two versions"""
+
+    project: Project = require_project_by_uuid(id, ProjectPermissions.Read)
+    to = project.latest_version if to is None else to
+    if to > project.latest_version:
+        abort(400, "'to' version exceeds latest project version")
+
+    if since >= to:
+        abort(400, "'since' version must be less than 'to' version")
+
+    delta_changes = project.get_delta_changes(since, to) or []
+
+    return DeltaChangeRespSchema().dump({"items": delta_changes}), 200
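A client-side sketch of the new endpoint (outside the patch); the server URL, project id and session cookie are placeholders, and the URL shape and payload follow public_api_v2.yaml above.

import requests

SERVER = "https://example.merginmaps.com"  # hypothetical server URL
PROJECT_ID = "00000000-0000-0000-0000-000000000000"  # hypothetical project id

resp = requests.get(
    f"{SERVER}/v2/projects/{PROJECT_ID}/delta",
    params={"since": 1, "to": 4},  # 'to' may be omitted to default to the latest version
    cookies={"session": "..."},  # whatever auth the deployment uses
)
resp.raise_for_status()
for item in resp.json()["items"]:
    # 'diffs' is present only for update_diff changes
    print(item["version"], item["change"], item["path"], item.get("diffs"))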
diff --git a/server/mergin/tests/test_project_controller.py b/server/mergin/tests/test_project_controller.py
index 5b34bfba..916c2bc4 100644
--- a/server/mergin/tests/test_project_controller.py
+++ b/server/mergin/tests/test_project_controller.py
@@ -28,6 +28,7 @@
 from ..sync.models import (
     FileDiff,
     Project,
+    ProjectVersionDelta,
     Upload,
     ProjectVersion,
     SyncFailuresHistory,
@@ -536,6 +537,7 @@ def test_delete_project(client):
     assert not Project.query.filter_by(
         workspace_id=test_workspace_id, name=test_project
     ).count()
+    assert not ProjectVersionDelta.query.filter_by(project_id=project.id).count()
     assert not os.path.exists(project_dir)
     rm_project = Project.query.get(project.id)
     assert rm_project.removed_at and not rm_project.storage_params
@@ -1781,6 +1783,8 @@ def test_optimize_storage(app, client, diff_project):
     diff_project.latest_version = 8
     ProjectVersion.query.filter_by(project_id=diff_project.id, name=9).delete()
     ProjectVersion.query.filter_by(project_id=diff_project.id, name=10).delete()
+    ProjectVersionDelta.query.filter_by(project_id=diff_project.id, version=9).delete()
+    ProjectVersionDelta.query.filter_by(project_id=diff_project.id, version=10).delete()
     db.session.commit()
     diff_project.cache_latest_files()
     assert diff_project.latest_version == 8
@@ -2337,6 +2341,10 @@ def _get_user_agent():
         .order_by(desc(ProjectVersion.created))
         .first()
     )
+    # remove project version delta entries
+    ProjectVersionDelta.query.filter_by(
+        project_id=upload.project_id, version=pv.name
+    ).delete()
     db.session.delete(pv)
     db.session.commit()
     upload.project.cache_latest_files()
diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py
index 762f5a59..d9efe7f1 100644
--- a/server/mergin/tests/test_public_api_v2.py
+++ b/server/mergin/tests/test_public_api_v2.py
@@ -3,15 +3,33 @@
 # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial
 import os
 import shutil
+from typing import List
 from unittest.mock import patch
 import uuid
 from pygeodiff import GeoDiffLibError
-from .utils import add_user, diffs_are_equal, execute_query, push_change
+from .utils import (
+    add_user,
+    create_project,
+    create_workspace,
+    diffs_are_equal,
+    execute_query,
+    login_as_admin,
+    push_change,
+)
 from ..app import db
 from tests import test_project, test_workspace_id
 from ..config import Configuration
-from ..sync.models import FileDiff, FileHistory, Project, ProjectFilePath, ProjectRole
+from ..sync.models import (
+    FileDiff,
+    FileHistory,
+    Project,
+    ProjectFilePath,
+    ProjectRole,
+    ProjectVersionDelta,
+)
+from ..sync.files import DeltaChange, PushChangeType
+from ..sync.utils import is_versioned_file
 from sqlalchemy.exc import IntegrityError
 import pytest
 from datetime import datetime, timedelta, timezone
@@ -306,6 +324,248 @@ def test_create_diff_checkpoint(diff_project):
     assert not os.path.exists(diff.abs_path)
 
 
+def test_delta_merge_changes():
+    """Test merging of delta changes works as expected"""
+
+    create = DeltaChange(
+        path="file1.gpkg",
+        change=PushChangeType.CREATE,
+        version=1,
+        size=100,
+        checksum="abc",
+    )
+    update = DeltaChange(
+        path="file1.gpkg",
+        change=PushChangeType.UPDATE,
+        version=2,
+        size=120,
+        checksum="def",
+    )
+    delete = DeltaChange(
+        path="file1.gpkg",
+        change=PushChangeType.DELETE,
+        version=3,
+        size=0,
+        checksum="ghi",
+    )
+    update_diff1 = DeltaChange(
+        path="file1.gpkg",
+        change=PushChangeType.UPDATE_DIFF,
+        version=4,
+        size=130,
+        checksum="xyz",
+        diff="diff1",
+    )
+    update_diff2 = DeltaChange(
+        path="file1.gpkg",
+        change=PushChangeType.UPDATE_DIFF,
+        version=5,
+        size=140,
+        checksum="uvw",
+        diff="diff2",
+    )
+
+    # CREATE + UPDATE -> CREATE
+    merged = ProjectVersionDelta.merge_changes([create, update])
+    assert len(merged) == 1
+    assert merged[0].change == PushChangeType.CREATE
+    assert merged[0].version == update.version
+    # check reverse order as well
+    merged = ProjectVersionDelta.merge_changes([update, create])
+    assert len(merged) == 1
+    assert merged[0].change == PushChangeType.CREATE
+    assert merged[0].version == update.version
+
+    # CREATE + DELETE -> removed
+    merged = ProjectVersionDelta.merge_changes([create, delete])
+    assert len(merged) == 0
+
+    # UPDATE + DELETE -> DELETE
+    merged = ProjectVersionDelta.merge_changes([update, delete])
+    assert len(merged) == 1
+    assert merged[0].change == PushChangeType.DELETE
+
+    # CREATE + UPDATE_DIFF -> CREATE
+    merged = ProjectVersionDelta.merge_changes([create, update_diff1])
+    assert len(merged) == 1
+    assert merged[0].change == PushChangeType.CREATE
+    assert merged[0].diffs == []
+
+    # UPDATE + UPDATE_DIFF -> UPDATE
+    merged = ProjectVersionDelta.merge_changes([update, update_diff1])
+    assert len(merged) == 1
+    assert merged[0].change == PushChangeType.UPDATE
+    assert merged[0].diffs == []
+
+    # UPDATE_DIFF + UPDATE_DIFF -> merged diffs
+    merged = ProjectVersionDelta.merge_changes([update_diff1, update_diff2])
+    assert len(merged) == 1
+    assert merged[0].change == PushChangeType.UPDATE_DIFF
+    assert merged[0].version == update_diff2.version
+    assert merged[0].size == update_diff2.size
+    assert merged[0].checksum == update_diff2.checksum
+    assert [d.path for d in merged[0].diffs] == ["diff1", "diff2"]
+
+    # case when trying to delete a file that already exists in history;
+    # copies of create/delete with new version numbers
+    delete = DeltaChange(
+        path="file1.gpkg",
+        change=PushChangeType.DELETE,
+        version=6,
+        size=0,
+        checksum="ghi",
+    )
+    create = DeltaChange(
+        path="file1.gpkg",
+        change=PushChangeType.CREATE,
+        version=7,
+        size=100,
+        checksum="abc",
+    )
+    delete8 = DeltaChange(
+        path="file1.gpkg",
+        change=PushChangeType.DELETE,
+        version=8,
+        size=0,
+        checksum="abc2",
+    )
+    merged = ProjectVersionDelta.merge_changes([delete, create, delete8])
+    assert len(merged) == 1
+    assert merged[0].change == PushChangeType.DELETE
+    assert merged[0].version == delete8.version
+    assert merged[0].size == delete8.size
+    assert merged[0].checksum == delete8.checksum
+
+
+def test_project_version_delta_changes(client, diff_project: Project):
+    """Test that get_delta_changes and its schema work as expected"""
+    latest_version = diff_project.get_latest_version()
+    project_id = diff_project.id
+    assert latest_version.name == 10
+    assert diff_project.get_delta_changes(2, 1) is None
+    assert diff_project.get_delta_changes(2, 2) is None
+    deltas: List[ProjectVersionDelta] = (
+        ProjectVersionDelta.query.filter_by(project_id=project_id)
+        .order_by(ProjectVersionDelta.version)
+        .all()
+    )
+    # check that deltas are created on push, within ProjectVersion creation
+    assert len(deltas) == 10
+    initial_delta = deltas[0]
+    initial_version = ProjectVersion.query.filter_by(
+        project_id=project_id, name=initial_delta.version
+    ).first()
+    assert initial_version
+    assert initial_delta.version
+    assert initial_delta.rank == 0
+    assert initial_delta.version == 1
+
+    # delete file
+    delta = diff_project.get_delta_changes(1, 2)
+    assert len(delta) == 1
+    assert delta[0].change == PushChangeType.DELETE
+
+    # delete + create version
+    delta = diff_project.get_delta_changes(1, 3)
+    assert len(delta) == 1
+    assert delta[0].change == PushChangeType.CREATE
+    # file was created in v3
+    assert delta[0].version == 3
+    assert delta[0].checksum == deltas[3].changes[0]["checksum"]
+
+    # get_delta with update diff
+    delta = diff_project.get_delta_changes(1, 4)
+    assert len(delta) == 1
+    assert delta[0].change == PushChangeType.CREATE
+    assert ProjectVersionDelta.query.filter_by(rank=1).count() == 0
+
+    # create rank 1 checkpoint for v4
+    delta = diff_project.get_delta_changes(0, 4)
+    checkpoint = ProjectVersionDelta.query.filter_by(rank=1)
+    filediff_checkpoints = FileDiff.query.filter_by(rank=1)
+    checkpoint_change = checkpoint.first()
+    assert checkpoint.count() == 1
+    assert checkpoint_change.version == deltas[3].version
+    assert filediff_checkpoints.count() == 0
+    # check that the filediff basefile is correctly set
+    file_history = FileHistory.query.filter_by(project_version_name=4).first()
+    assert len(delta) == len(initial_version.files)
+    delta_base_gpkg = next((d for d in delta if d.path == "base.gpkg"), None)
+    assert delta_base_gpkg
+    # from the history it is clear that we are just creating the geopackage in this range
+    assert delta_base_gpkg.change == PushChangeType.CREATE
+    assert delta_base_gpkg.version == 4
+    assert delta_base_gpkg.path == file_history.path
+    assert delta_base_gpkg.size == file_history.size
+    assert delta_base_gpkg.checksum == file_history.checksum
+    assert len(delta_base_gpkg.diffs) == 0
+
+    # get data across multiple ranks = rank 1 checkpoints 1-4 and 5-8, plus rank 0 deltas 9 and 10
+    delta = diff_project.get_delta_changes(0, 10)
+    assert len(delta) == len(latest_version.files)
+    delta_test_gpkg = next((d for d in delta if d.path == "test.gpkg"), None)
+    assert delta_test_gpkg
+    assert delta_test_gpkg.change == PushChangeType.CREATE
+    assert ProjectVersionDelta.query.filter_by(rank=1).count() == 2
+    assert ProjectVersionDelta.query.filter_by(rank=2).count() == 0
+    # check that each range has a rank 1 checkpoint with the proper end version
+    assert ProjectVersionDelta.query.filter_by(rank=1, version=4).first()
+    assert ProjectVersionDelta.query.filter_by(rank=1, version=8).first()
+    # base.gpkg is transparent, because we are requesting from 0
+    assert not next((c for c in delta if c.path == "base.gpkg"), None)
+
+    delta = diff_project.get_delta_changes(latest_version.name - 3, latest_version.name)
+    delta_base_gpkg = next((c for c in delta if c.path == "base.gpkg"), None)
+    assert delta_base_gpkg.change == PushChangeType.DELETE
+
+    # create just update_diff versions with checkpoint
+    base_gpkg = os.path.join(diff_project.storage.project_dir, "test.gpkg")
+    shutil.copy(
+        os.path.join(diff_project.storage.project_dir, "v9", "test.gpkg"), base_gpkg
+    )
+    for i in range(6):
+        sql = f"UPDATE simple SET rating={i}"
+        execute_query(base_gpkg, sql)
+        push_change(
+            diff_project, "updated", "test.gpkg", diff_project.storage.project_dir
+        )
+    delta = diff_project.get_delta_changes(8, latest_version.name + 6)
+    assert len(delta) == 2
+    # the file history in the 9th version is the basefile
+    fh = FileHistory.query.filter_by(
+        project_version_name=latest_version.name - 1
+    ).first()
+    # testing consistency of the FileDiff and ProjectVersionDelta db entries
+    test_gpkg_checkpoint = FileDiff.query.filter_by(basefile_id=fh.id, rank=1).first()
+    assert test_gpkg_checkpoint
+    assert test_gpkg_checkpoint.version == latest_version.name + 6
+    delta_checkpoint = ProjectVersionDelta.query.filter_by(
+        project_id=diff_project.id, version=latest_version.name + 6, rank=1
+    ).first()
+    assert delta_checkpoint
+    assert len(delta_checkpoint.changes) == 1
+    assert delta_checkpoint.changes[0]["version"] == latest_version.name + 6
+    assert delta_checkpoint.changes[0]["change"] == PushChangeType.UPDATE_DIFF.value
+    assert delta_checkpoint.changes[0]["diff"] == test_gpkg_checkpoint.path
+
+    fh = FileHistory.query.filter_by(
+        project_version_name=latest_version.name + 6
+    ).first()
+    delta = diff_project.get_delta_changes(12, latest_version.name + 6)
+    assert len(delta) == 1
+    assert len(delta[0].diffs) == 1
+    assert delta[0].diffs[0].path == test_gpkg_checkpoint.path
+    assert delta[0].change == PushChangeType.UPDATE_DIFF
+    assert delta[0].checksum == fh.checksum
+    assert delta[0].size == fh.size
+
+    # check that the checkpoint diff file can actually be downloaded
+    response = client.get(
+        f"v2/projects/{diff_project.id}/raw/diff/{delta[0].diffs[0].path}"
+    )
+    assert response.status_code == 200
+
+
 push_data = [
     # success
     (
@@ -647,3 +907,100 @@ def test_full_push(client):
         os.path.join(project.storage.project_dir, "v2", test_file["path"])
     )
     assert not Upload.query.filter_by(project_id=project.id).first()
+
+
+def test_project_delta(client, diff_project):
+    """Test project delta endpoint"""
+    login_as_admin(client)
+    user = add_user()
+    workspace = create_workspace()
+    initial_project = create_project("empty_project", workspace=workspace, user=user)
+    working_dir = os.path.join(TMP_DIR, "empty_work_dir")
+    os.makedirs(os.path.join(TMP_DIR, "empty_work_dir"), exist_ok=True)
+    # add basefile
+    shutil.copy(
+        os.path.join(test_project_dir, "base.gpkg"),
+        os.path.join(working_dir, "base.gpkg"),
+    )
+    push_change(initial_project, "added", "base.gpkg", working_dir)
+    response = client.get(f"v2/projects/{initial_project.id}/delta?since=0")
+    assert response.status_code == 200
+    changes = response.json["items"]
+    assert len(changes) == 1
+    assert changes[0]["change"] == PushChangeType.CREATE.value
+    assert changes[0]["version"] == 1
+
+    # remove the file and get changes from 0 -> 2, where base.gpkg is removed -> transparent
+    push_change(initial_project, "removed", "base.gpkg", working_dir)
+    response = client.get(f"v2/projects/{initial_project.id}/delta?since=0")
+    assert response.status_code == 200
+    changes = response.json["items"]
+    assert len(changes) == 0
+
+    # invalid cases
+    response = client.get(f"v2/projects/{diff_project.id}/delta")
+    assert response.status_code == 400
+    response = client.get(f"v2/projects/{diff_project.id}/delta?since=2&to=1")
+    assert response.status_code == 400
+    response = client.get(f"v2/projects/{diff_project.id}/delta?since=-2")
+    assert response.status_code == 400
+    response = client.get(f"v2/projects/{diff_project.id}/delta?since=-2&to=-1")
+    assert response.status_code == 400
+    # exceeding latest version
+    response = client.get(f"v2/projects/{diff_project.id}/delta?since=0&to=2000")
+    assert response.status_code == 400
+    # no changes between equal version numbers
+    response = client.get(f"v2/projects/{diff_project.id}/delta?since=1&to=1")
+    assert response.status_code == 400
+
+    # since 1 to latest version
+    response = client.get(f"v2/projects/{diff_project.id}/delta?since=1")
+    assert response.status_code == 200
+    changes = response.json["items"]
+    # create of test.gpkg and delete of base.gpkg
+    assert len(changes) == 2
+    assert changes[0]["change"] == PushChangeType.DELETE.value
+    assert changes[0]["version"] == 9
+    assert changes[0]["path"] == "base.gpkg"
+    assert changes[0]["size"] == 98304
+
+    assert changes[1]["change"] == PushChangeType.CREATE.value
+    assert changes[1]["version"] == 9
+    assert changes[1]["path"] == "test.gpkg"
+    assert changes[1]["size"] == 98304
+
+    # simple update
client.get(f"v2/projects/{diff_project.id}/delta?since=4&to=8") + assert response.status_code == 200 + changes = response.json["items"] + assert len(changes) == 1 + assert changes[0]["change"] == PushChangeType.UPDATE.value + # version is new latest version of the change + assert changes[0]["version"] == 7 + assert not changes[0].get("diffs") + + +def test_project_pull_diffs(client, diff_project): + """Test project pull mechanisom in v2 with diff files. Integration test for pull mechanism""" + since = 5 + to = 7 + # check diff files in database where we can get them with right order and metadata + current_diffs = ( + FileDiff.query.filter(FileDiff.version > since, FileDiff.version <= to) + .order_by(FileDiff.version) + .all() + ) + response = client.get(f"v2/projects/{diff_project.id}/delta?since={since}&to={to}") + assert response.status_code == 200 + delta = response.json["items"] + assert len(delta) == 1 + assert delta[0]["change"] == PushChangeType.UPDATE_DIFF.value + assert delta[0]["version"] == 7 + first_diff = delta[0]["diffs"][0] + second_diff = delta[0]["diffs"][1] + assert first_diff["path"] == current_diffs[0].path + assert second_diff["path"] == current_diffs[1].path + response = client.get( + f"v2/projects/{diff_project.id}/raw/diff/{first_diff['path']}" + ) + assert response.status_code == 200 diff --git a/server/mergin/tests/utils.py b/server/mergin/tests/utils.py index fc7e4240..89766d24 100644 --- a/server/mergin/tests/utils.py +++ b/server/mergin/tests/utils.py @@ -303,9 +303,12 @@ def push_change(project, action, path, src_dir): current_files = project.files new_version = ProjectVersion.to_v_name(project.next_version()) changes = {"added": [], "updated": [], "removed": []} - metadata = {**file_info(src_dir, path), "location": os.path.join(new_version, path)} if action == "added": + metadata = { + **file_info(src_dir, path), + "location": os.path.join(new_version, path), + } new_file = os.path.join(project.storage.project_dir, metadata["location"]) os.makedirs(os.path.dirname(new_file), exist_ok=True) shutil.copy(os.path.join(src_dir, metadata["path"]), new_file) @@ -349,6 +352,7 @@ def push_change(project, action, path, src_dir): changes["updated"].append(metadata) elif action == "removed": f_removed = next(f for f in current_files if f.path == path) + os.remove(os.path.join(project.storage.project_dir, f_removed.location)) changes["removed"].append(asdict(f_removed)) else: return diff --git a/server/migrations/community/4b4648483770_add_project_version_delta.py b/server/migrations/community/4b4648483770_add_project_version_delta.py new file mode 100644 index 00000000..9f13eced --- /dev/null +++ b/server/migrations/community/4b4648483770_add_project_version_delta.py @@ -0,0 +1,129 @@ +"""Add project version delta + +Revision ID: 4b4648483770 +Revises: bd1ec73db389 +Create Date: 2025-10-24 09:55:18.286286 + +""" + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "4b4648483770" +down_revision = "bd1ec73db389" +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! 
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.create_table(
+        "project_version_delta",
+        sa.Column("id", sa.BigInteger(), autoincrement=True, nullable=False),
+        sa.Column("version", sa.Integer(), nullable=False),
+        sa.Column("rank", sa.Integer(), nullable=False),
+        sa.Column("project_id", postgresql.UUID(as_uuid=True), nullable=False),
+        sa.Column("changes", postgresql.JSONB(astext_type=sa.Text()), nullable=False),
+        sa.ForeignKeyConstraint(
+            ["project_id"],
+            ["project.id"],
+            name=op.f("fk_project_version_delta_project_id_project"),
+            ondelete="CASCADE",
+        ),
+        sa.PrimaryKeyConstraint("id", name=op.f("pk_project_version_delta")),
+        sa.UniqueConstraint("project_id", "version", "rank", name="unique_deltas"),
+    )
+    op.create_index(
+        op.f("ix_project_version_delta_project_id"),
+        "project_version_delta",
+        ["project_id"],
+        unique=False,
+    )
+    op.create_index(
+        "ix_project_version_delta_project_id_version_rank",
+        "project_version_delta",
+        ["project_id", "version", "rank"],
+        unique=False,
+    )
+    op.create_index(
+        op.f("ix_project_version_delta_rank"),
+        "project_version_delta",
+        ["rank"],
+        unique=False,
+    )
+    op.create_index(
+        op.f("ix_project_version_delta_version"),
+        "project_version_delta",
+        ["version"],
+        unique=False,
+    )
+    # ### end Alembic commands ###
+
+    op.execute(
+        """
+        INSERT INTO project_version_delta (project_id, version, rank, changes)
+        WITH delta AS (
+            SELECT
+                h.version_id,
+                jsonb_agg(
+                    jsonb_strip_nulls(
+                        jsonb_build_object(
+                            'path', fp.path,
+                            'size', h.size,
+                            'change', h.change,
+                            'version', h.project_version_name,
+                            'checksum', h.checksum,
+                            'diff', fdj.diff_path
+                        )
+                    )
+                ) AS changes
+            FROM
+                file_history h
+            JOIN
+                project_file_path fp ON h.file_path_id = fp.id
+            LEFT JOIN LATERAL (
+                SELECT
+                    fd.path AS diff_path
+                FROM
+                    file_diff fd
+                WHERE
+                    fd.file_path_id = fp.id
+                    AND fd.version = h.project_version_name
+                    AND fd.rank = 0
+            ) fdj ON TRUE
+            GROUP BY
+                -- group by the single unique version identifier
+                h.version_id
+        )
+        SELECT
+            pv.project_id,
+            pv.name,
+            0 AS rank,
+            d.changes
+        FROM
+            delta AS d
+        JOIN project_version AS pv ON d.version_id = pv.id
+        ;
+        """
+    )
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    op.drop_index(
+        op.f("ix_project_version_delta_version"), table_name="project_version_delta"
+    )
+    op.drop_index(
+        op.f("ix_project_version_delta_rank"), table_name="project_version_delta"
+    )
+    op.drop_index(
+        "ix_project_version_delta_project_id_version_rank",
+        table_name="project_version_delta",
+    )
+    op.drop_index(
+        op.f("ix_project_version_delta_project_id"), table_name="project_version_delta"
+    )
+    op.drop_table("project_version_delta")
+    # ### end Alembic commands ###
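Finally, a post-migration sanity-check sketch (not part of the migration): it lists project versions that lack a rank-0 delta row. Versions whose pushes carried no file changes are legitimately absent from the backfill, so treat this as a report rather than an assertion; the `pv.name > 0` filter for skipping initial empty versions is an assumption.

from sqlalchemy import text
from mergin.app import db

missing = db.session.execute(
    text(
        """
        SELECT pv.project_id, pv.name
        FROM project_version pv
        LEFT JOIN project_version_delta d
          ON d.project_id = pv.project_id
         AND d.version = pv.name
         AND d.rank = 0
        WHERE pv.name > 0 AND d.id IS NULL
        """
    )
).fetchall()
print(f"{len(missing)} versions without a rank-0 delta")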