diff --git a/server/.test.env b/server/.test.env
index bdaa7bfa..63294a3f 100644
--- a/server/.test.env
+++ b/server/.test.env
@@ -24,3 +24,4 @@ SECURITY_BEARER_SALT='bearer'
 SECURITY_EMAIL_SALT='email'
 SECURITY_PASSWORD_SALT='password'
 DIAGNOSTIC_LOGS_DIR=/tmp/diagnostic_logs
+GEVENT_WORKER=0
\ No newline at end of file
diff --git a/server/mergin/sync/commands.py b/server/mergin/sync/commands.py
index 21f5ef15..882b73b6 100644
--- a/server/mergin/sync/commands.py
+++ b/server/mergin/sync/commands.py
@@ -127,3 +127,50 @@ def remove(project_name):
         project.removed_by = None
         db.session.commit()
         click.secho("Project removed", fg="green")
+
+    @project.command()
+    @click.argument("project-name", callback=normalize_input(lowercase=False))
+    @click.option("--since", type=int, required=False)
+    @click.option("--to", type=int, required=False)
+    def create_checkpoint(project_name, since=None, to=None):
+        """Create a project delta checkpoint, corresponding lower checkpoints and merged diffs for the project"""
+        ws, name = split_project_path(project_name)
+        workspace = current_app.ws_handler.get_by_name(ws)
+        if not workspace:
+            click.secho("ERROR: Workspace does not exist", fg="red", err=True)
+            sys.exit(1)
+        project = (
+            Project.query.filter_by(workspace_id=workspace.id, name=name)
+            .filter(Project.storage_params.isnot(None))
+            .first()
+        )
+        if not project:
+            click.secho("ERROR: Project does not exist", fg="red", err=True)
+            sys.exit(1)
+
+        since = since if since is not None else 0
+        to = to if to is not None else project.latest_version
+        if since < 0 or to < 1:
+            click.secho(
+                "ERROR: Invalid version number, minimum version for 'since' is 0 and minimum version for 'to' is 1",
+                fg="red",
+                err=True,
+            )
+            sys.exit(1)
+
+        if to > project.latest_version:
+            click.secho(
+                "ERROR: 'to' version exceeds latest project version", fg="red", err=True
+            )
+            sys.exit(1)
+
+        if since >= to:
+            click.secho(
+                "ERROR: 'since' version must be less than 'to' version",
+                fg="red",
+                err=True,
+            )
+            sys.exit(1)
+
+        project.get_delta_changes(since, to)
+        click.secho("Project checkpoint(s) created", fg="green")
diff --git a/server/mergin/sync/models.py b/server/mergin/sync/models.py
index 985453c8..30e0901b 100644
--- a/server/mergin/sync/models.py
+++ b/server/mergin/sync/models.py
@@ -399,9 +399,6 @@ def get_delta_changes(
         )
         existing_delta_map = {(c.rank, c.version): c for c in expected_deltas}
 
-        # Cache all individual (rank 0) delta rows in the required range.
-        individual_deltas: List[ProjectVersionDelta] = []
-
         result: List[DeltaChange] = []
         for checkpoint in expected_checkpoints:
             existing_delta = existing_delta_map.get((checkpoint.rank, checkpoint.end))
@@ -411,29 +408,10 @@ def get_delta_changes(
                 result.extend(DeltaChangeSchema(many=True).load(existing_delta.changes))
                 continue
 
-            # If higher rank delta checkopoint does not exists, we are using rank=0 deltas to create checkopoint
+            # If higher rank delta checkpoint does not exist, we need to create it
             if checkpoint.rank > 0:
-                individual_deltas = (
-                    ProjectVersionDelta.query.filter(
-                        ProjectVersionDelta.project_id == project_id,
-                        ProjectVersionDelta.version >= since,
-                        ProjectVersionDelta.version <= to,
-                        ProjectVersionDelta.rank == 0,
-                    )
-                    .order_by(ProjectVersionDelta.version)
-                    .all()
-                    if not individual_deltas
-                    else individual_deltas
-                )
-
-                if not individual_deltas:
-                    logging.error(
-                        f"No individual deltas found for project {project_id} in range {since} / {to} to create checkpoint."
-                    )
-                    return
-
                 new_checkpoint = ProjectVersionDelta.create_checkpoint(
-                    project_id, checkpoint, individual_deltas
+                    project_id, checkpoint
                 )
                 if new_checkpoint:
                     result.extend(
@@ -443,6 +421,7 @@ def get_delta_changes(
                 logging.error(
                     f"Not possible to create checkpoint for project {project_id} in range {checkpoint.start}-{checkpoint.end}"
                 )
+                return
 
         return ProjectVersionDelta.merge_changes(result)
@@ -520,6 +499,10 @@ def __init__(self, project_id, path):
         self.project_id = project_id
         self.path = path
 
+    def generate_diff_name(self):
+        """Generate unique diff file name for server generated diff"""
+        return mergin_secure_filename(f"{self.path}-diff-{uuid.uuid4()}")
+
 
 class LatestProjectFiles(db.Model):
     """Store project latest version files history ids"""
@@ -775,7 +758,7 @@ def diffs_chain(
                 ),
                 None,
             )
-            if diff:
+            if diff and os.path.exists(diff.abs_path):
                 diffs.append(diff)
             elif item.rank > 0:
                 # fallback if checkpoint does not exist: replace merged diff with individual diffs
@@ -876,16 +859,58 @@ def abs_path(self) -> str:
         """
         return os.path.join(self.file.project.storage.project_dir, self.location)
 
+    @staticmethod
+    def can_create_checkpoint(file_path_id: int, checkpoint: Checkpoint) -> bool:
+        """Check if it makes sense to create a diff file for a checkpoint, i.e. there were relevant changes within the range and no breaking changes"""
+
+        basefile = FileHistory.get_basefile(file_path_id, checkpoint.end)
+        if not basefile:
+            return False
+
+        file_was_deleted = (
+            FileHistory.query.filter_by(file_path_id=file_path_id)
+            .filter(
+                FileHistory.project_version_name
+                >= max(basefile.project_version_name, checkpoint.start),
+                FileHistory.project_version_name <= checkpoint.end,
+                FileHistory.change == PushChangeType.DELETE.value,
+            )
+            .count()
+            > 0
+        )
+        if file_was_deleted:
+            return False
+
+        query = FileDiff.query.filter_by(basefile_id=basefile.id).filter(
+            FileDiff.rank == 0
+        )
+
+        # rank 0 is a special case, we only verify it exists
+        if checkpoint.rank == 0:
+            query = query.filter(FileDiff.version == checkpoint.end)
+        # for higher ranks we need to check if there were diff updates in that range
+        else:
+            query = query.filter(
+                FileDiff.version >= checkpoint.start,
+                FileDiff.version <= checkpoint.end,
+            )
+
+        return query.count() > 0
+
     def construct_checkpoint(self) -> bool:
         """Create a diff file checkpoint (aka. merged diff).
 
         Find all smaller diffs which are needed to create the final diff file and merge them.
-        In case of missing some lower rank checkpoint, use individual diffs instead.
+        If some lower rank checkpoints are missing, create them recursively.
         Once checkpoint is created, size and checksum are updated in the database.
 
         Returns:
             bool: True if checkpoint was successfully created or already present
         """
+        logging.debug(
+            f"Construct checkpoint for file {self.path} v{self.version} of rank {self.rank}"
+        )
+
         if os.path.exists(self.abs_path):
             return True
 
@@ -914,7 +939,7 @@ def construct_checkpoint(self) -> bool:
             return False
 
         diffs_paths = []
-        # let's confirm we have all intermediate diffs needed, if not, we need to use individual diffs instead
+        # let's confirm we have all intermediate diffs needed, if not, we need to create them (recursively) first
        cached_items = Checkpoint.get_checkpoints(
             cache_level.start, cache_level.end - 1
         )
@@ -936,6 +961,7 @@ def construct_checkpoint(self) -> bool:
                 continue
 
             # find diff in table and on disk
+            # diffs might not exist because they were not created yet or there were no changes (e.g. for zeroth rank diffs)
             diff = next(
                 (
                     d
@@ -944,27 +970,34 @@ def construct_checkpoint(self) -> bool:
                 ),
                 None,
             )
-            if diff and os.path.exists(diff.abs_path):
+
+            if not diff:
+                # lower rank diff not even in DB yet - create it and try to construct merged file
+                if item.rank > 0 and FileDiff.can_create_checkpoint(
+                    self.file_path_id, item
+                ):
+                    diff = FileDiff(
+                        basefile=basefile,
+                        version=item.end,
+                        rank=item.rank,
+                        path=basefile.file.generate_diff_name(),
+                        size=None,
+                        checksum=None,
+                    )
+                    db.session.add(diff)
+                    db.session.commit()
+                else:
+                    # such diff is not expected to exist
+                    continue
+
+            diff_exists = diff.construct_checkpoint()
+            if diff_exists:
                 diffs_paths.append(diff.abs_path)
             else:
-                individual_diffs = (
-                    FileDiff.query.filter_by(
-                        basefile_id=basefile.id,
-                        rank=0,
-                    )
-                    .filter(
-                        FileDiff.version >= item.start, FileDiff.version <= item.end
-                    )
-                    .order_by(FileDiff.version)
-                    .all()
+                logging.error(
+                    f"Unable to create checkpoint diff for {item} for file {self.file_path_id}"
                 )
-                if individual_diffs:
-                    diffs_paths.extend([i.abs_path for i in individual_diffs])
-                else:
-                    logging.error(
-                        f"Unable to find diffs for {item} for file {self.file_path_id}"
-                    )
-                    return False
+                return False
 
         # we apply latest change (if any) on previous version
         end_diff = FileDiff.query.filter_by(
@@ -1186,26 +1219,51 @@ def create_checkpoint(
         cls,
         project_id: str,
         checkpoint: Checkpoint,
-        from_deltas: List[ProjectVersionDelta] = [],
     ) -> Optional[ProjectVersionDelta]:
         """
-        Creates and caches new checkpoint and any required FileDiff checkpoints.
-        Use from_deltas to create checkpoint from existing individual deltas.
-        Returns created ProjectVersionDelta object with checkpoint.
+        Creates and caches a new checkpoint and any required FileDiff checkpoints, recursively if needed.
         """
-        delta_range = [
-            change
-            for change in from_deltas
-            if checkpoint.start <= change.version <= checkpoint.end
-        ]
+        delta_range = []
+        # our new checkpoint will be created by adding the last individual delta to previous checkpoints
+        expected_checkpoints = Checkpoint.get_checkpoints(
+            checkpoint.start, checkpoint.end - 1
+        )
+        expected_checkpoints.append(Checkpoint(rank=0, index=checkpoint.end))
+
+        expected_deltas = (
+            ProjectVersionDelta.query.filter(
+                ProjectVersionDelta.project_id == project_id,
+                tuple_(ProjectVersionDelta.rank, ProjectVersionDelta.version).in_(
+                    [(item.rank, item.end) for item in expected_checkpoints]
+                ),
+            )
+            .order_by(ProjectVersionDelta.version)
+            .all()
+        )
+
+        existing_delta_map = {(c.rank, c.version): c for c in expected_deltas}
+        # make sure we have all components, if not, create them (recursively)
+        for item in expected_checkpoints:
+            existing_delta = existing_delta_map.get((item.rank, item.end))
+            if not existing_delta:
+                existing_delta = cls.create_checkpoint(project_id, item)
+
+            if existing_delta:
+                delta_range.append(existing_delta)
+            else:
+                logging.error(
+                    f"Missing project delta checkpoint for {project_id} v{item.end} rank {item.rank} which could not be recreated"
+                )
+                return
 
         if not delta_range:
             logging.warning(
-                f"No individual changes found for project {project_id} in range v{checkpoint.start}-v{checkpoint.end} to create checkpoint."
+                f"No changes found for project {project_id} in range v{checkpoint.start}-v{checkpoint.end} to create checkpoint."
             )
             return None
 
         # dump changes lists from database and flatten list for merging
+        delta_range = sorted(delta_range, key=lambda x: x.version)
         changes = []
         for delta in delta_range:
             changes.extend(DeltaChangeSchema(many=True).load(delta.changes))
@@ -1213,7 +1271,7 @@ def create_checkpoint(
             d.to_data_delta() for d in cls.merge_changes(changes)
         ]
 
-        # Pre-fetch data for all versioned files to create FileDiff checkpoints
+        # Pre-fetch data for all versioned files to create FileDiff checkpoints where it makes sense
         versioned_delta_items = [
             item
             for item in merged_delta_items
@@ -1246,17 +1304,17 @@ def create_checkpoint(
                 if not base_file:
                     continue
 
-                diff_path = mergin_secure_filename(
-                    f"{item.path}-diff-{uuid.uuid4()}"
-                )
+                if not FileDiff.can_create_checkpoint(file_path_id, checkpoint):
+                    continue
+
                 checkpoint_diff = FileDiff(
                     basefile=base_file,
-                    path=diff_path,
+                    path=base_file.file.generate_diff_name(),
                     rank=checkpoint.rank,
                     version=checkpoint.end,
                 )
                 # Patch the delta with the path to the new diff checkpoint
-                item.diff = diff_path
+                item.diff = checkpoint_diff.path
                 db.session.add(checkpoint_diff)
 
         checkpoint_delta = ProjectVersionDelta(
diff --git a/server/mergin/tests/test_cli.py b/server/mergin/tests/test_cli.py
index d0b91717..d4197f86 100644
--- a/server/mergin/tests/test_cli.py
+++ b/server/mergin/tests/test_cli.py
@@ -13,7 +13,7 @@
 from mergin.auth.models import User
 from mergin.commands import _check_permissions, _check_celery
 from mergin.stats.models import MerginInfo
-from mergin.sync.models import Project, ProjectVersion
+from mergin.sync.models import FileDiff, Project, ProjectVersion, ProjectVersionDelta
 from mergin.tests import (
     test_project,
     test_workspace_id,
@@ -545,3 +545,70 @@ def test_check_celery(mock_ping, ping, result, output, capsys):
     out, err = capsys.readouterr()  # capture what was echoed to stdout
     assert ("Error: " not in out) == result
     assert output in out
+
+
+create_project_checkpoint_data = [
+    (
+        f"{test_workspace_name}/non-existing",
+        0,
+        1,
+        "ERROR: Project does not exist",
+    ),
+    (
+        f"{test_workspace_name}/{test_project}",
+        4,
+        1,
+        "ERROR: 'since' version must be less than 'to' version",
+    ),
+    (
+        f"{test_workspace_name}/{test_project}",
+        0,
+        100,
+        "ERROR: 'to' version exceeds latest project version",
+    ),
+    (
+        f"{test_workspace_name}/{test_project}",
+        0,
+        0,
+        "ERROR: Invalid version number, minimum version for 'since' is 0 and minimum version for 'to' is 1",
+    ),
+    (
+        f"{test_workspace_name}/{test_project}",
+        0,
+        4,
+        "Project checkpoint(s) created",
+    ),
+    (
+        f"{test_workspace_name}/{test_project}",
+        None,
+        None,
+        "Project checkpoint(s) created",
+    ),
+]
+
+
+@pytest.mark.parametrize("project_name,since,to,output", create_project_checkpoint_data)
+def test_create_checkpoint(runner, project_name, since, to, output, diff_project):
+    """Test 'project create-checkpoint' command"""
+    ProjectVersionDelta.query.filter_by(project_id=diff_project.id).filter(
+        ProjectVersionDelta.rank > 0
+    ).delete()
+    db.session.commit()
+
+    # build args conditionally - None values and raw ints cannot be passed to the CLI parser
+    args = ["project", "create-checkpoint", project_name]
+    if since is not None:
+        args.extend(["--since", str(since)])
+    if to is not None:
+        args.extend(["--to", str(to)])
+    result = runner.invoke(args=args)
+    assert output in result.output
+    checkpoints = ProjectVersionDelta.query.filter(ProjectVersionDelta.rank > 0).count()
+    if result.exit_code == 0:
+        assert checkpoints > 0
+    else:
+        assert checkpoints == 0
diff --git a/server/mergin/tests/test_middleware.py b/server/mergin/tests/test_middleware.py
index 82b9cf26..2f5cbe4f 100644
--- a/server/mergin/tests/test_middleware.py
+++ b/server/mergin/tests/test_middleware.py
@@ -6,6 +6,7 @@
 import psycogreen.gevent
 import pytest
 import sqlalchemy
+from unittest.mock import patch
 
 from ..app import create_simple_app, GeventTimeoutMiddleware, db
 from ..config import Configuration
@@ -14,58 +15,74 @@
 @pytest.mark.parametrize("use_middleware", [True, False])
 def test_use_middleware(use_middleware):
     """Test using middleware"""
-    Configuration.GEVENT_WORKER = use_middleware
-    Configuration.GEVENT_REQUEST_TIMEOUT = 1
-    application = create_simple_app()
+    with patch.object(
+        Configuration,
+        "GEVENT_WORKER",
+        use_middleware,
+    ), patch.object(
+        Configuration,
+        "GEVENT_REQUEST_TIMEOUT",
+        1,
+    ):
+        application = create_simple_app()
 
-    def ping():
-        gevent.sleep(Configuration.GEVENT_REQUEST_TIMEOUT + 1)
-        return "pong"
+        def ping():
+            gevent.sleep(Configuration.GEVENT_REQUEST_TIMEOUT + 1)
+            return "pong"
 
-    application.add_url_rule("/test", "ping", ping)
-    app_context = application.app_context()
-    app_context.push()
+        application.add_url_rule("/test", "ping", ping)
+        app_context = application.app_context()
+        app_context.push()
 
-    assert isinstance(application.wsgi_app, GeventTimeoutMiddleware) == use_middleware
-    # in case of gevent, dummy endpoint it set to time out
-    assert application.test_client().get("/test").status_code == (
-        504 if use_middleware else 200
-    )
+        assert (
+            isinstance(application.wsgi_app, GeventTimeoutMiddleware) == use_middleware
+        )
+        # in case of gevent, dummy endpoint is set to time out
+        assert application.test_client().get("/test").status_code == (
+            504 if use_middleware else 200
+        )
 
 
 def test_catch_timeout():
     """Test proper handling of gevent timeout with db.session.rollback"""
     psycogreen.gevent.patch_psycopg()
-    Configuration.GEVENT_WORKER = True
-    Configuration.GEVENT_REQUEST_TIMEOUT = 1
-    application = create_simple_app()
+    with patch.object(
+        Configuration,
+        "GEVENT_WORKER",
+        True,
+    ), patch.object(
+        Configuration,
+        "GEVENT_REQUEST_TIMEOUT",
+        1,
+    ):
+        application = create_simple_app()
 
-    def unhandled():
-        try:
-            db.session.execute("SELECT pg_sleep(1.1);")
-        finally:
-            db.session.execute("SELECT 1;")
-        return ""
+        def unhandled():
+            try:
+                db.session.execute("SELECT pg_sleep(1.1);")
+            finally:
+                db.session.execute("SELECT 1;")
+            return ""
 
-    def timeout():
-        try:
-            db.session.execute("SELECT pg_sleep(1.1);")
-        except gevent.timeout.Timeout:
-            db.session.rollback()
-            raise
-        finally:
-            db.session.execute("SELECT 1;")
-        return ""
+        def timeout():
+            try:
+                db.session.execute("SELECT pg_sleep(1.1);")
+            except gevent.timeout.Timeout:
+                db.session.rollback()
+                raise
+            finally:
+                db.session.execute("SELECT 1;")
+            return ""
 
-    application.add_url_rule("/unhandled", "unhandled", unhandled)
-    application.add_url_rule("/timeout", "timeout", timeout)
-    app_context = application.app_context()
-    app_context.push()
+        application.add_url_rule("/unhandled", "unhandled", unhandled)
+        application.add_url_rule("/timeout", "timeout", timeout)
+        app_context = application.app_context()
+        app_context.push()
 
-    assert application.test_client().get("/timeout").status_code == 504
+        assert application.test_client().get("/timeout").status_code == 504
 
-    # in case of missing rollback sqlalchemy would raise error
-    with pytest.raises(sqlalchemy.exc.PendingRollbackError):
-        application.test_client().get("/unhandled")
+        # in case of missing rollback sqlalchemy would raise error
+        with pytest.raises(sqlalchemy.exc.PendingRollbackError):
+            application.test_client().get("/unhandled")
 
-    db.session.rollback()
+        db.session.rollback()
diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py
index b71f465b..f30fe462 100644
--- a/server/mergin/tests/test_public_api_v2.py
+++ b/server/mergin/tests/test_public_api_v2.py
@@ -29,7 +29,7 @@
     ProjectVersionDelta,
 )
 from ..sync.files import DeltaChange, PushChangeType
-from ..sync.utils import is_versioned_file
+from ..sync.utils import Checkpoint, is_versioned_file
 from sqlalchemy.exc import IntegrityError
 import pytest
 from datetime import datetime, timedelta, timezone
@@ -260,6 +260,7 @@ def test_create_diff_checkpoint(diff_project):
     assert len(diffs) == 22
 
     # diff for v17-v20 from individual diffs
+    assert FileDiff.can_create_checkpoint(file_path_id, Checkpoint(1, 5)) is True
     diff = FileDiff(
         basefile=basefile, path=f"test.gpkg-diff-{uuid.uuid4()}", version=20, rank=1
     )
@@ -280,7 +281,10 @@ def test_create_diff_checkpoint(diff_project):
     diff.construct_checkpoint()
     assert mtime == os.path.getmtime(diff.abs_path)
 
-    # diff for v17-v32 with merged diffs (using one above)
+    # some lower rank diffs still missing
+    assert not FileDiff.query.filter_by(version=24, rank=1).count()
+
+    # diff for v17-v32 with merged diffs; this will also create missing lower ranks
     diff = FileDiff(
         basefile=basefile, path=f"test.gpkg-diff-{uuid.uuid4()}", version=32, rank=2
     )
@@ -288,6 +292,8 @@ def test_create_diff_checkpoint(diff_project):
     db.session.commit()
     diff.construct_checkpoint()
     assert os.path.exists(diff.abs_path)
+    lower_diff = FileDiff.query.filter_by(version=24, rank=1).first()
+    assert os.path.exists(lower_diff.abs_path)
 
     # assert gpkg diff is the same as it would be from merging all individual diffs
     individual_diffs = (
@@ -324,6 +330,38 @@ def test_create_diff_checkpoint(diff_project):
     assert not os.path.exists(diff.abs_path)
 
 
+def test_can_create_checkpoint(diff_project):
+    """Test if diff file checkpoint can be created"""
+    file_path_id = (
+        ProjectFilePath.query.filter_by(project_id=diff_project.id, path="base.gpkg")
+        .first()
+        .id
+    )
+
+    # we target v1 where file was uploaded => no diff
+    assert FileDiff.can_create_checkpoint(file_path_id, Checkpoint(0, 1)) is False
+
+    # for zero rank diffs we can always create a checkpoint (but that should already exist)
+    assert FileDiff.can_create_checkpoint(file_path_id, Checkpoint(0, 4)) is True
+
+    # there are diffs in both ranges, v1-v4 and v5-v8
+    assert FileDiff.can_create_checkpoint(file_path_id, Checkpoint(1, 1)) is True
+    assert FileDiff.can_create_checkpoint(file_path_id, Checkpoint(1, 2)) is True
+
+    # higher ranks cannot be created as file was removed at v9
+    assert FileDiff.can_create_checkpoint(file_path_id, Checkpoint(2, 1)) is False
+
+    # there is no diff for such file in this range
+    file_path_id = (
+        ProjectFilePath.query.filter_by(
+            project_id=diff_project.id, path="inserted_1_A.gpkg"
+        )
+        .first()
+        .id
+    )
+    assert FileDiff.can_create_checkpoint(file_path_id, Checkpoint(1, 1)) is False
+
+
 def test_delta_merge_changes():
     """Test merging of delta changes works as expected"""
 
@@ -501,6 +539,7 @@ def test_project_version_delta_changes(client, diff_project: Project):
     assert len(delta_base_gpkg.diffs) == 0
 
     # get data with multiple ranks = 1 level checkpoints 1-4, 5-8 + checkpoint 9 and 10
+    assert not ProjectVersionDelta.query.filter_by(rank=1, version=8).first()
     delta = diff_project.get_delta_changes(0, 10)
     assert len(delta) == len(latest_version.files)
     delta_test_gpkg = next((d for d in delta if d.path == "test.gpkg"), None)
@@ -510,6 +549,7 @@ def test_project_version_delta_changes(client, diff_project: Project):
     assert ProjectVersionDelta.query.filter_by(rank=2).count() == 0
     # check if version is having rank 1 checkpoint with proper end version
     assert ProjectVersionDelta.query.filter_by(rank=1, version=4).first()
+    # missing lower checkpoint is recreated
     assert ProjectVersionDelta.query.filter_by(rank=1, version=8).first()
     # base gpgk is transparent, bacause we are requesting from 0
     assert not next((c for c in delta if c.path == "base.gpkg"), None)
@@ -565,6 +605,24 @@ def test_project_version_delta_changes(client, diff_project: Project):
     )
     assert response.status_code == 200
 
+    # remove intermediate deltas and assert they would be recreated if needed for higher ranks
+    ProjectVersionDelta.query.filter_by(project_id=diff_project.id).filter(
+        ProjectVersionDelta.rank > 0
+    ).delete()
+    db.session.commit()
+    # v1-v16 would be created from v1-v4, v5-v8 and v9-v12 and 4 individual deltas
+    delta = diff_project.get_delta_changes(0, diff_project.latest_version)
+    assert (
+        ProjectVersionDelta.query.filter_by(project_id=diff_project.id, rank=1).count()
+        == 3
+    )
+    assert (
+        ProjectVersionDelta.query.filter_by(
+            project_id=diff_project.id, rank=2, version=16
+        ).count()
+        == 1
+    )
+
 
 push_data = [
     # success