From cac45a8880b5f690c9cb379cf2e949177ab988cd Mon Sep 17 00:00:00 2001 From: "marcel.kocisek" Date: Mon, 24 Nov 2025 10:26:19 +0100 Subject: [PATCH 1/5] remove bottlnech with removing chunks --- server/mergin/sync/public_api_controller.py | 8 -------- server/mergin/sync/public_api_v2_controller.py | 9 --------- 2 files changed, 17 deletions(-) diff --git a/server/mergin/sync/public_api_controller.py b/server/mergin/sync/public_api_controller.py index f0c9047c..0b487874 100644 --- a/server/mergin/sync/public_api_controller.py +++ b/server/mergin/sync/public_api_controller.py @@ -1028,14 +1028,6 @@ def push_finish(transaction_id): # let's move uploaded files where they are expected to be os.renames(files_dir, version_dir) - # remove used chunks - for file in upload.changes["added"] + upload.changes["updated"]: - file_chunks = file.get("chunks", []) - for chunk_id in file_chunks: - chunk_file = os.path.join(upload.upload_dir, "chunks", chunk_id) - if os.path.exists(chunk_file): - move_to_tmp(chunk_file) - logging.info( f"Push finished for project: {project.id}, project version: {v_next_version}, transaction id: {transaction_id}." ) diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index 27e0355a..d317e273 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -318,14 +318,6 @@ def create_project_version(id): temp_files_dir = os.path.join(upload.upload_dir, "files", v_next_version) os.renames(temp_files_dir, version_dir) - # remove used chunks - for file in to_be_added_files + to_be_updated_files: - file_chunks = file.get("chunks", []) - for chunk_id in file_chunks: - chunk_file = get_chunk_location(chunk_id) - if os.path.exists(chunk_file): - move_to_tmp(chunk_file) - logging.info( f"Push finished for project: {project.id}, project version: {v_next_version}, upload id: {upload.id}." ) @@ -377,7 +369,6 @@ def upload_chunk(id: str): # we could have used request.data here, but it could eventually cause OOM issue save_to_file(request.stream, dest_file, current_app.config["MAX_CHUNK_SIZE"]) except IOError: - move_to_tmp(dest_file, chunk_id) return BigChunkError().response(413) except Exception as e: return UploadError(error="Error saving chunk").response(400) From 85a58341b66958c9a53aec5f747a3a3f17992a0d Mon Sep 17 00:00:00 2001 From: "marcel.kocisek" Date: Mon, 24 Nov 2025 10:33:07 +0100 Subject: [PATCH 2/5] update tests for chunks - they exist --- server/mergin/tests/test_public_api_v2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index dda0bc53..d6d3150a 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -378,7 +378,7 @@ def test_create_version(client, data, expected, err_code): if expected == 201: assert response.json["version"] == "v2" assert project.latest_version == 2 - assert all(not os.path.exists(chunk) for chunk in chunks) + assert all(os.path.exists(chunk) for chunk in chunks) else: assert project.latest_version == 1 if err_code: From 88e5490992ccda615eca1856dc43b625a7400bf6 Mon Sep 17 00:00:00 2001 From: "marcel.kocisek" Date: Mon, 24 Nov 2025 11:13:14 +0100 Subject: [PATCH 3/5] update tests --- server/mergin/tests/test_public_api_v2.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index d6d3150a..bfb705f3 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -2,6 +2,7 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial +from mergin.sync.tasks import remove_unused_chunks from . import DEFAULT_USER from .utils import ( add_user, @@ -22,6 +23,7 @@ from mergin.app import db from mergin.config import Configuration +from mergin.sync.config import Configuration as SyncConfiguration from mergin.sync.errors import ( BigChunkError, ProjectLocked, @@ -375,10 +377,15 @@ def test_create_version(client, data, expected, err_code): response = client.post(f"v2/projects/{project.id}/versions", json=data) assert response.status_code == expected + # mock chunks expiration to check if removed if expected == 201: assert response.json["version"] == "v2" assert project.latest_version == 2 + # chunks exists after upload, cleanup job did not remove them assert all(os.path.exists(chunk) for chunk in chunks) + with patch.object(SyncConfiguration, "UPLOAD_CHUNKS_EXPIRATION", 0): + remove_unused_chunks() + assert all(not os.path.exists(chunk) for chunk in chunks) else: assert project.latest_version == 1 if err_code: From 2149c0c4a4fe720525e9d73232e27cec533d319f Mon Sep 17 00:00:00 2001 From: "marcel.kocisek" Date: Mon, 24 Nov 2025 17:10:09 +0100 Subject: [PATCH 4/5] call remove unused chunks async job --- server/mergin/sync/public_api_v2_controller.py | 10 ++++++++++ server/mergin/sync/tasks.py | 11 ++++++++++- server/mergin/tests/test_public_api_v2.py | 15 ++++++++++----- 3 files changed, 30 insertions(+), 6 deletions(-) diff --git a/server/mergin/sync/public_api_v2_controller.py b/server/mergin/sync/public_api_v2_controller.py index d317e273..217204c1 100644 --- a/server/mergin/sync/public_api_v2_controller.py +++ b/server/mergin/sync/public_api_v2_controller.py @@ -14,6 +14,8 @@ from marshmallow import ValidationError from sqlalchemy.exc import IntegrityError +from mergin.sync.tasks import remove_transaction_chunks + from .schemas_v2 import ProjectSchema as ProjectSchemaV2 from ..app import db from ..auth import auth_required @@ -318,6 +320,14 @@ def create_project_version(id): temp_files_dir = os.path.join(upload.upload_dir, "files", v_next_version) os.renames(temp_files_dir, version_dir) + # remove used chunks + # get chunks from added and updated files + chunks_ids = [] + for file in to_be_added_files + to_be_updated_files: + file_chunks = file.get("chunks", []) + chunks_ids.extend(file_chunks) + remove_transaction_chunks.delay(chunks_ids) + logging.info( f"Push finished for project: {project.id}, project version: {v_next_version}, upload id: {upload.id}." ) diff --git a/server/mergin/sync/tasks.py b/server/mergin/sync/tasks.py index 7688a3ee..bc1ae044 100644 --- a/server/mergin/sync/tasks.py +++ b/server/mergin/sync/tasks.py @@ -13,7 +13,7 @@ from .models import Project, ProjectVersion, FileHistory from .storages.disk import move_to_tmp from .config import Configuration -from .utils import remove_outdated_files +from .utils import get_chunk_location, remove_outdated_files from ..celery import celery from ..app import db @@ -169,3 +169,12 @@ def remove_unused_chunks(): if not os.path.isdir(dir): continue remove_outdated_files(dir, time_delta) + + +@celery.task +def remove_transaction_chunks(chunks=[]): + """Remove chunks related to a specific sync transaction""" + for chunk in chunks: + chunk_path = get_chunk_location(chunk) + if os.path.exists(chunk_path): + os.remove(chunk_path) diff --git a/server/mergin/tests/test_public_api_v2.py b/server/mergin/tests/test_public_api_v2.py index bfb705f3..6a4243fd 100644 --- a/server/mergin/tests/test_public_api_v2.py +++ b/server/mergin/tests/test_public_api_v2.py @@ -2,7 +2,7 @@ # # SPDX-License-Identifier: AGPL-3.0-only OR LicenseRef-MerginMaps-Commercial -from mergin.sync.tasks import remove_unused_chunks +from mergin.sync.tasks import remove_transaction_chunks, remove_unused_chunks from . import DEFAULT_USER from .utils import ( add_user, @@ -358,6 +358,7 @@ def test_create_version(client, data, expected, err_code): assert project.latest_version == 1 chunks = [] + chunk_ids = [] if expected == 201: # mimic chunks were uploaded for f in data["changes"]["added"] + data["changes"]["updated"]: @@ -374,17 +375,21 @@ def test_create_version(client, data, expected, err_code): out_file.write(in_file.read(CHUNK_SIZE)) chunks.append(chunk_location) + chunk_ids.append(chunk) - response = client.post(f"v2/projects/{project.id}/versions", json=data) + with patch( + "mergin.sync.public_api_v2_controller.remove_transaction_chunks.delay" + ) as mock_remove: + response = client.post(f"v2/projects/{project.id}/versions", json=data) assert response.status_code == expected - # mock chunks expiration to check if removed if expected == 201: assert response.json["version"] == "v2" assert project.latest_version == 2 # chunks exists after upload, cleanup job did not remove them assert all(os.path.exists(chunk) for chunk in chunks) - with patch.object(SyncConfiguration, "UPLOAD_CHUNKS_EXPIRATION", 0): - remove_unused_chunks() + if chunk_ids: + assert mock_remove.called_once_with(chunk_ids) + remove_transaction_chunks(chunk_ids) assert all(not os.path.exists(chunk) for chunk in chunks) else: assert project.latest_version == 1 From 05fb028eba5baf26c67b25d6ca5f4acafc63a364 Mon Sep 17 00:00:00 2001 From: "marcel.kocisek" Date: Tue, 25 Nov 2025 12:10:46 +0100 Subject: [PATCH 5/5] type for transaction chunks --- server/mergin/sync/tasks.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/mergin/sync/tasks.py b/server/mergin/sync/tasks.py index bc1ae044..9392997c 100644 --- a/server/mergin/sync/tasks.py +++ b/server/mergin/sync/tasks.py @@ -7,6 +7,7 @@ import os import time from datetime import datetime, timedelta, timezone +from typing import List, Optional from zipfile import ZIP_DEFLATED, ZipFile from flask import current_app @@ -172,8 +173,10 @@ def remove_unused_chunks(): @celery.task -def remove_transaction_chunks(chunks=[]): +def remove_transaction_chunks(chunks: Optional[List[str]] = None): """Remove chunks related to a specific sync transaction""" + if not chunks: + return for chunk in chunks: chunk_path = get_chunk_location(chunk) if os.path.exists(chunk_path):