Skip to content

Commit 054a818

Browse files
committed
fixes for missing schema
- fixing update chunks for files with the same checksum - added minimal version of sync command
1 parent a2cfe62 commit 054a818

File tree

7 files changed

+343
-67
lines changed

7 files changed

+343
-67
lines changed

mergin/cli.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -468,6 +468,38 @@ def pull(ctx):
468468
except Exception as e:
469469
_print_unhandled_exception()
470470

471+
@cli.command()
472+
@click.pass_context
473+
def sync(ctx):
474+
"""Synchronize the project. Pull latest project version from the server and push split changes."""
475+
mc = ctx.obj["client"]
476+
if mc is None:
477+
return
478+
directory = os.getcwd()
479+
upload_job = None
480+
length = 1
481+
try:
482+
def on_progress(increment, push_job):
483+
nonlocal upload_job
484+
upload_job = push_job
485+
486+
# run pull & push cycles until there are no local changes
487+
mc.sync_project_with_callback(directory, progress_callback=on_progress)
488+
489+
click.secho("Sync complete.", fg="green")
490+
491+
except InvalidProject as e:
492+
click.secho("Invalid project directory ({})".format(str(e)), fg="red")
493+
except ClientError as e:
494+
click.secho("Error: " + str(e), fg="red")
495+
return
496+
except KeyboardInterrupt:
497+
click.secho("Cancelling...")
498+
if upload_job:
499+
push_project_cancel(upload_job)
500+
except Exception as e:
501+
_print_unhandled_exception()
502+
471503

472504
@cli.command()
473505
@click.argument("version")

mergin/client.py

Lines changed: 59 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import os
44
import json
55
import shutil
6-
import time
76
import zlib
87
import base64
98
import urllib.parse
@@ -17,6 +16,7 @@
1716
import re
1817
import typing
1918
import warnings
19+
from time import sleep
2020

2121
from typing import List
2222

@@ -41,7 +41,7 @@
4141
download_diffs_finalize,
4242
)
4343
from .client_pull import pull_project_async, pull_project_wait, pull_project_finalize
44-
from .client_push import push_project_async, push_project_wait, push_project_finalize, UploadChunksCache
44+
from .client_push import get_push_changes_batch, push_project_async, push_project_is_running, push_project_wait, push_project_finalize, UploadChunksCache
4545
from .utils import DateTimeEncoder, get_versions_with_file_changes, int_version, is_version_acceptable
4646
from .version import __version__
4747

@@ -1486,32 +1486,72 @@ def create_invitation(self, workspace_id: int, email: str, workspace_role: Works
14861486
ws_inv = self.post(f"v2/workspaces/{workspace_id}/invitations", params, json_headers)
14871487
return json.load(ws_inv)
14881488

1489-
def sync_project(self, project_dir):
1489+
def sync_project(self, project_directory):
14901490
"""
14911491
Syncs project by loop with these steps:
14921492
1. Pull server version
14931493
2. Get local changes
14941494
3. Push first change batch
14951495
Repeat if there are more local changes.
1496-
The batch pushing makes use of the server ability to handle simultaneously exclusive upload (that blocks
1497-
other uploads) and non-exclusive upload (for adding assets)
14981496
"""
1499-
attempts = 2
1500-
for attempt in range(attempts):
1497+
mp = MerginProject(project_directory)
1498+
has_changes = True
1499+
server_conflict_attempts = 0
1500+
while has_changes:
1501+
pull_job = pull_project_async(self, project_directory)
1502+
if pull_job:
1503+
pull_project_wait(pull_job)
1504+
pull_project_finalize(pull_job)
1505+
15011506
try:
1502-
pull_job = pull_project_async(self, project_dir)
1503-
if pull_job:
1504-
pull_project_wait(pull_job)
1505-
pull_project_finalize(pull_job)
1506-
1507-
job = push_project_async(self, project_dir)
1508-
if job:
1509-
push_project_wait(job)
1510-
push_project_finalize(job)
1511-
break
1507+
job = push_project_async(self, project_directory)
1508+
if not job:
1509+
break
1510+
push_project_wait(job)
1511+
push_project_finalize(job)
1512+
_, has_changes = get_push_changes_batch(self, mp, job.server_resp)
1513+
except ClientError as e:
1514+
if e.http_error == 409 and server_conflict_attempts < 2:
1515+
# retry on conflict, e.g. when server has changes that we do not have yet
1516+
mp.log.info("Attempting sync process due to conflicts between server and local directory or another user is syncing.")
1517+
server_conflict_attempts += 1
1518+
sleep(5)
1519+
continue
1520+
raise e
1521+
1522+
def sync_project_with_callback(self, project_directory, progress_callback=None, sleep_time=0.1):
1523+
"""
1524+
Syncs project while sending push progress info as callback.
1525+
Sync is done in this loop:
1526+
Pending changes? -> Pull -> Get changes batch -> Push the changes -> repeat
1527+
:param progress_callback: updates the progress bar in CLI, on_progress(increment)
1528+
:param sleep_time: sleep time between calling the callback function
1529+
"""
1530+
mp = MerginProject(project_directory)
1531+
has_changes = True
1532+
server_conflict_attempts = 0
1533+
while has_changes:
1534+
pull_job = pull_project_async(self, project_directory)
1535+
if pull_job:
1536+
pull_project_wait(pull_job)
1537+
pull_project_finalize(pull_job)
1538+
try:
1539+
job = push_project_async(self, project_directory)
1540+
if not job:
1541+
break
1542+
last = 0
1543+
while push_project_is_running(job):
1544+
sleep(sleep_time)
1545+
now = job.transferred_size
1546+
progress_callback(now - last, job) # update progressbar with transferred size increment
1547+
last = now
1548+
push_project_finalize(job)
1549+
_, has_changes = get_push_changes_batch(self, mp, job.server_resp)
15121550
except ClientError as e:
1513-
if e.http_error == 409 and attempt < attempts - 1:
1551+
if e.http_error == 409 and server_conflict_attempts < 2:
15141552
# retry on conflict, e.g. when server has changes that we do not have yet
1515-
time.sleep(5)
1553+
mp.log.info("Attempting sync process due to conflicts between server and local directory or another user is syncing.")
1554+
server_conflict_attempts += 1
1555+
sleep(5)
15161556
continue
15171557
raise e

mergin/client_pull.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -373,7 +373,7 @@ def dump(self):
373373
print("--- END ---")
374374

375375

376-
def pull_project_async(mc, directory):
376+
def pull_project_async(mc, directory) -> PullJob:
377377
"""
378378
Starts project pull in background and returns handle to the pending job.
379379
Using that object it is possible to watch progress or cancel the ongoing work.

mergin/client_push.py

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,6 @@ def upload_chunk_v2_api(self, data: bytes, checksum: str):
109109
resp_dict = json.load(resp)
110110
self.server_chunk_id = resp_dict.get("id")
111111
self.mc.upload_chunks_cache.add(checksum, self.server_chunk_id)
112-
self.mp.log.debug(f"Upload chunk finished: {self.file_path}")
113112

114113
def upload_blocking(self):
115114
with open(self.file_path, "rb") as file_handle:
@@ -137,7 +136,7 @@ def upload_blocking(self):
137136
continue
138137
raise
139138

140-
self.mp.log.debug(f"Upload chunk finished: {self.file_path}")
139+
self.mp.log.debug(f"Upload chunk {self.chunk_id} finished: {self.file_path}")
141140

142141

143142
class UploadJob:
@@ -188,7 +187,9 @@ def add_items(self, items: List[UploadQueueItem], total_size: int):
188187

189188
def update_chunks_from_items(self):
190189
"""Update chunks in LocalChanges from the upload queue items."""
191-
self.changes.update_chunks([(item.file_checksum, item.server_chunk_id) for item in self.upload_queue_items])
190+
self.changes.update_chunks(
191+
[(item.file_checksum, item.server_chunk_id) for item in self.upload_queue_items]
192+
)
192193

193194

194195
def create_upload_chunks(mc, mp: MerginProject, local_changes: List[LocalChange]) -> Tuple[List[UploadQueueItem], int]:
@@ -348,11 +349,13 @@ def push_project_async(mc, directory) -> Optional[UploadJob]:
348349
+ f"\n\nLocal version: {local_version}\nServer version: {server_version}"
349350
)
350351

351-
changes = mp.get_push_changes()
352-
changes = filter_changes(mc, project_info, changes)
353-
mp.log.debug("push changes:\n" + pprint.pformat(changes))
354-
355352
tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-")
353+
changes, changes_len = get_push_changes_batch(mc, mp, project_info)
354+
if not changes_len:
355+
mp.log.info(f"--- push {project_path} - nothing to do")
356+
return
357+
358+
mp.log.debug("push changes:\n" + pprint.pformat(changes))
356359

357360
# If there are any versioned files (aka .gpkg) that are not updated through a diff,
358361
# we need to make a temporary copy somewhere to be sure that we are uploading full content.
@@ -367,10 +370,6 @@ def push_project_async(mc, directory) -> Optional[UploadJob]:
367370
if mp.is_versioned_file(f["path"]):
368371
mp.copy_versioned_file_for_upload(f, tmp_dir.name)
369372

370-
if not sum(len(v) for v in changes.values()):
371-
mp.log.info(f"--- push {project_path} - nothing to do")
372-
return
373-
374373
server_feaure_flags = mc.server_features()
375374
job = None
376375
local_changes = LocalChanges(
@@ -527,3 +526,13 @@ def remove_diff_files(job: UploadJob) -> None:
527526
diff_file = job.mp.fpath_meta(diff.path)
528527
if os.path.exists(diff_file):
529528
os.remove(diff_file)
529+
530+
531+
def get_push_changes_batch(mc, mp: MerginProject, project_info: dict) -> Tuple[dict, int]:
532+
"""
533+
Get changes that need to be pushed to the server.
534+
"""
535+
changes = mp.get_push_changes()
536+
changes = filter_changes(mc, project_info, changes)
537+
538+
return changes, sum(len(v) for v in changes.values())

mergin/local_changes.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@ class LocalChange(BaseLocalChange):
1515
chunks: List[str] = field(default_factory=list)
1616
diff: Optional[dict] = None
1717
upload_file: Optional[str] = None
18+
# some functions (MerginProject.compare_file_sets) are adding version to the change from project info
19+
version: Optional[str] = None
20+
# some functions (MerginProject.compare_file_sets) are adding history dict to the change from project info
21+
history: Optional[dict] = None
22+
# some functions (MerginProject.compare_file_sets) are adding location dict to the change from project info
23+
location: Optional[str] = None
1824

1925
def get_diff(self) -> Optional[BaseLocalChange]:
2026
if self.diff:
@@ -69,15 +75,15 @@ def update_chunks(self, chunks: List[Tuple[str, str]]) -> None:
6975
lists based on the provided `chunks` list, which contains tuples of (checksum, chunk_id).
7076
"""
7177
for change in self.added:
72-
change.chunks = [
78+
change.chunks = list({
7379
chunk[1]
7480
for chunk in chunks
7581
if chunk[0] == change.checksum
76-
]
82+
})
7783

7884
for change in self.updated:
79-
change.chunks = [
85+
change.chunks = list({
8086
chunk[1]
8187
for chunk in chunks
8288
if chunk[0] == change.checksum
83-
]
89+
})

0 commit comments

Comments
 (0)