Skip to content

Commit 8ceee04

Browse files
committed
Address discussions:
- update new table schema for project version delta - add tests and upgrade existing to handle order of diffs
1 parent d0ef271 commit 8ceee04

File tree

5 files changed

+396
-233
lines changed

5 files changed

+396
-233
lines changed

server/mergin/sync/models.py

Lines changed: 96 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ def get_delta_changes(
359359
"""
360360
Get changes between two versions, merging them if needed.
361361
- create FileDiff checkpoints if needed
362-
- create ProjectVersionDelta checkpoints if needed with delta json
362+
- create ProjectVersionDelta checkpoints if needed with changes json
363363
"""
364364
if since > to:
365365
logging.error(
@@ -371,19 +371,18 @@ def get_delta_changes(
371371
project_id = self.id
372372
expected_checkpoints = Checkpoint.get_checkpoints(since + 1, to)
373373
expected_deltas: List[ProjectVersionDelta] = (
374-
ProjectVersionDelta.query.join(ProjectVersion)
375-
.filter(
376-
ProjectVersion.project_id == project_id,
377-
ProjectVersion.name > since,
378-
ProjectVersion.name <= to,
379-
tuple_(ProjectVersionDelta.rank, ProjectVersion.name).in_(
374+
ProjectVersionDelta.query.filter(
375+
ProjectVersionDelta.project_id == project_id,
376+
ProjectVersionDelta.version > since,
377+
ProjectVersionDelta.version <= to,
378+
tuple_(ProjectVersionDelta.rank, ProjectVersionDelta.version).in_(
380379
[(item.rank, item.end) for item in expected_checkpoints]
381380
),
382381
)
383-
.order_by(ProjectVersion.name)
382+
.order_by(ProjectVersionDelta.version)
384383
.all()
385384
)
386-
existing_delta_map = {(c.rank, c.version.name): c for c in expected_deltas}
385+
existing_delta_map = {(c.rank, c.version): c for c in expected_deltas}
387386

388387
# Cache all individual (rank 0) delta rows in the required range.
389388
individual_deltas: List[ProjectVersionDelta] = []
@@ -400,14 +399,13 @@ def get_delta_changes(
400399
# If a higher-rank delta checkpoint does not exist, we use rank=0 deltas to create the checkpoint
401400
if checkpoint.rank > 0:
402401
individual_deltas = (
403-
ProjectVersionDelta.query.join(ProjectVersion)
404-
.filter(
405-
ProjectVersion.project_id == project_id,
406-
ProjectVersion.name >= since,
407-
ProjectVersion.name <= to,
402+
ProjectVersionDelta.query.filter(
403+
ProjectVersionDelta.project_id == project_id,
404+
ProjectVersionDelta.version >= since,
405+
ProjectVersionDelta.version <= to,
408406
ProjectVersionDelta.rank == 0,
409407
)
410-
.order_by(ProjectVersion.name)
408+
.order_by(ProjectVersionDelta.version)
411409
.all()
412410
if not individual_deltas
413411
else individual_deltas
@@ -431,7 +429,7 @@ def get_delta_changes(
431429
f"Not possible to create checkpoint for project {project_id} in range {checkpoint.start}-{checkpoint.end}"
432430
)
433431

434-
return ProjectVersionDelta.merge_delta_changes(result)
432+
return ProjectVersionDelta.merge_changes(result)
435433

436434

437435
class ProjectRole(Enum):
@@ -987,82 +985,105 @@ def construct_checkpoint(self) -> bool:
987985

988986
class ProjectVersionDelta(db.Model):
989987
id = db.Column(db.BigInteger, primary_key=True, autoincrement=True)
988+
version = db.Column(db.Integer, nullable=False, index=True)
990989
# exponential order of changes json
991990
rank = db.Column(db.Integer, nullable=False, index=True)
992991
# to which project version is this linked
993-
version_id = db.Column(
994-
db.Integer,
995-
db.ForeignKey("project_version.id", ondelete="CASCADE"),
992+
project_id = db.Column(
993+
UUID(as_uuid=True),
994+
db.ForeignKey("project.id", ondelete="CASCADE"),
996995
index=True,
997996
nullable=False,
998997
)
999998
# cached changes for versions from start to end (inclusive)
1000999
changes = db.Column(JSONB, nullable=False)
10011000

10021001
__table_args__ = (
1003-
db.UniqueConstraint("version_id", "rank", name="unique_deltas"),
1002+
db.UniqueConstraint("project_id", "version", "rank", name="unique_deltas"),
10041003
db.Index(
1005-
"ix_project_version_delta_version_id_rank",
1006-
version_id,
1004+
"ix_project_version_delta_project_id_version_rank",
1005+
project_id,
1006+
version,
10071007
rank,
10081008
),
10091009
)
1010-
version = db.relationship(
1011-
"ProjectVersion",
1010+
project = db.relationship(
1011+
"Project",
10121012
uselist=False,
10131013
)
10141014

10151015
@staticmethod
1016-
def merge_delta_changes(
1016+
def merge_changes(
10171017
items: List[DeltaChange],
10181018
) -> List[DeltaChangeMerged]:
10191019
"""
1020-
Merge multiple changes json array objects into one list of changes.
1020+
Merge changes json array objects into one list of changes.
10211021
Changes are merged based on file path and change type.
10221022
"""
10231023
result: Dict[str, DeltaChangeMerged] = {}
1024+
# sorting changes by version to apply them in correct order
1025+
items.sort(key=lambda x: x.version)
1026+
1027+
def handle_replace(result, path, current, previous):
1028+
result[path] = current
1029+
1030+
def handle_delete(result, path, current, previous):
1031+
del result[path]
1032+
1033+
def handle_update(result, path, current, previous):
1034+
# handle update case, when previous change was create - just revert to create with new metadata
1035+
current.change = previous.change
1036+
current.version = previous.version
1037+
current.diffs = []
1038+
result[path] = current
1039+
1040+
def handle_update_diff(result, path, current, previous):
1041+
current.diffs = (previous.diffs or []) + (current.diffs or [])
1042+
result[path] = current
1043+
1044+
dispatch = {
1045+
# create + delete = file is transparent for current changes -> delete it
1046+
(PushChangeType.CREATE, PushChangeType.DELETE): handle_delete,
1047+
# create + update = create with updated info
1048+
(PushChangeType.CREATE, PushChangeType.UPDATE): handle_update,
1049+
(PushChangeType.CREATE, PushChangeType.UPDATE_DIFF): handle_update,
1050+
(PushChangeType.CREATE, PushChangeType.CREATE): None,
1051+
# update + update_diff = update with latest info
1052+
(
1053+
PushChangeType.UPDATE,
1054+
PushChangeType.UPDATE_DIFF,
1055+
): handle_update,
1056+
(PushChangeType.UPDATE, PushChangeType.UPDATE): handle_replace,
1057+
(PushChangeType.UPDATE, PushChangeType.DELETE): handle_replace,
1058+
(PushChangeType.UPDATE, PushChangeType.CREATE): handle_replace,
1059+
# update_diff + update_diff = update_diff with latest info with proper order of diffs
1060+
(
1061+
PushChangeType.UPDATE_DIFF,
1062+
PushChangeType.UPDATE_DIFF,
1063+
): handle_update_diff,
1064+
(PushChangeType.UPDATE_DIFF, PushChangeType.UPDATE): handle_replace,
1065+
(PushChangeType.UPDATE_DIFF, PushChangeType.DELETE): handle_replace,
1066+
(PushChangeType.UPDATE_DIFF, PushChangeType.CREATE): None,
1067+
(PushChangeType.DELETE, PushChangeType.CREATE): handle_replace,
1068+
# delete + update = invalid sequence, keep delete
1069+
(PushChangeType.DELETE, PushChangeType.UPDATE): None,
1070+
(PushChangeType.DELETE, PushChangeType.UPDATE_DIFF): None,
1071+
(PushChangeType.DELETE, PushChangeType.DELETE): None,
1072+
}
1073+
10241074
for item in items:
10251075
current = item.to_merged_delta()
1026-
previous = result.get(current.path)
10271076
path = current.path
1028-
if previous:
1029-
# merge changes data jsons
1030-
if previous.change == PushChangeType.CREATE:
1031-
if current.change == PushChangeType.DELETE:
1032-
# create + delete = file is transparent for current changes -> delete it
1033-
del result[path]
1034-
elif current.change in (
1035-
PushChangeType.UPDATE,
1036-
PushChangeType.UPDATE_DIFF,
1037-
):
1038-
# create + update = create with with the most recent metadata
1039-
current.change = previous.change
1040-
current.diffs = []
1041-
else:
1042-
result[path] = current
1043-
elif previous.change == PushChangeType.UPDATE:
1044-
if current.change == PushChangeType.UPDATE_DIFF:
1045-
# update + update_diff = update with latest info
1046-
current.change = previous.change
1047-
current.diffs = []
1048-
result[path] = current
1049-
elif previous.change == PushChangeType.UPDATE_DIFF:
1050-
if current.change == PushChangeType.UPDATE_DIFF:
1051-
# update_diff + update_diff = update_diff with latest info
1052-
current.diffs.extend(previous.diffs or [])
1053-
result[path] = current
1054-
elif previous.change == PushChangeType.DELETE:
1055-
if current.change == PushChangeType.CREATE:
1056-
# delete + create = create
1057-
result[path] = current
1058-
elif current.change in (
1059-
PushChangeType.UPDATE,
1060-
PushChangeType.UPDATE_DIFF,
1061-
):
1062-
# delete + update = invalid sequence, keep delete
1063-
continue
1064-
else:
1065-
result[current.path] = current
1077+
previous = result.get(path)
1078+
1079+
if not previous:
1080+
result[path] = current
1081+
continue
1082+
1083+
handler = dispatch.get((previous.change, current.change))
1084+
if handler:
1085+
handler(result, path, current, previous)
1086+
10661087
return list(result.values())
10671088

10681089
@classmethod
@@ -1078,7 +1099,7 @@ def create_checkpoint(
10781099
delta_range = [
10791100
change
10801101
for change in changes
1081-
if checkpoint.start <= change.version.name <= checkpoint.end
1102+
if checkpoint.start <= change.version <= checkpoint.end
10821103
]
10831104

10841105
if not delta_range:
@@ -1092,7 +1113,7 @@ def create_checkpoint(
10921113
for delta in delta_range:
10931114
changes.extend(DeltaChangeSchema(many=True).load(delta.changes))
10941115
merged_delta_items: List[DeltaChange] = [
1095-
d.to_data_delta() for d in cls.merge_delta_changes(changes)
1116+
d.to_data_delta() for d in cls.merge_changes(changes)
10961117
]
10971118

10981119
# Pre-fetch data for all versioned files to create FileDiff checkpoints
@@ -1104,6 +1125,7 @@ def create_checkpoint(
11041125
]
11051126
versioned_file_paths = [delta.path for delta in versioned_delta_items]
11061127
if versioned_file_paths:
1128+
# get versioned files from DB and look up their paths for further processing
11071129
file_paths = ProjectFilePath.query.filter(
11081130
ProjectFilePath.project_id == project_id,
11091131
ProjectFilePath.path.in_(versioned_file_paths),
@@ -1121,7 +1143,7 @@ def create_checkpoint(
11211143
rank=checkpoint.rank,
11221144
version=checkpoint.end,
11231145
).first()
1124-
1146+
# If it does not exist, create a diff with a higher rank and a generated path (name of the diff file)
11251147
if not existing_diff_checkpoint:
11261148
base_file = FileHistory.get_basefile(file_path_id, checkpoint.end)
11271149
if not base_file:
@@ -1141,7 +1163,8 @@ def create_checkpoint(
11411163
db.session.add(checkpoint_diff)
11421164

11431165
checkpoint_delta = ProjectVersionDelta(
1144-
version_id=delta_range[-1].version_id,
1166+
project_id=project_id,
1167+
version=checkpoint.end,
11451168
rank=checkpoint.rank,
11461169
changes=DeltaChangeSchema(many=True).dump(merged_delta_items),
11471170
)
@@ -1257,13 +1280,14 @@ def __init__(
12571280
)
12581281
for c in changes
12591282
]
1260-
pvc = ProjectVersionDelta(
1261-
version=self,
1283+
pvd = ProjectVersionDelta(
1284+
project_id=project.id,
1285+
version=name,
12621286
rank=0,
12631287
changes=DeltaChangeSchema(many=True).dump(delta_data),
12641288
)
12651289

1266-
db.session.add(pvc)
1290+
db.session.add(pvd)
12671291
db.session.flush()
12681292

12691293
# update cached values in project and push to transaction buffer so that self.files is up-to-date

server/mergin/sync/public_api_v2_controller.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -409,12 +409,15 @@ def get_project_delta(id: str, since: int, to: Optional[int] = None):
409409
"""Get project changes (delta) between two versions"""
410410

411411
project: Project = require_project_by_uuid(id, ProjectPermissions.Read)
412-
if to is None:
413-
to = project.latest_version
412+
to = project.latest_version if to is None else to
413+
if to > project.latest_version:
414+
abort(400, "'to' version exceeds latest project version")
414415

415-
if since > to:
416-
abort(400, "'since' version must be less than 'to' version")
416+
if since >= to:
417+
abort(400, "'since' version must be less than or equal to 'to' version")
417418

418419
delta_changes = project.get_delta_changes(since, to)
420+
if delta_changes is None:
421+
abort(404)
419422

420423
return DeltaChangeRespSchema(many=True).dump(delta_changes), 200

0 commit comments

Comments
 (0)