Skip to content

Commit 3fa4545

Browse files
committed
Syncer
1 parent 88cea53 commit 3fa4545

File tree

7 files changed

+327
-46
lines changed

7 files changed

+327
-46
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ Commands:
8585
show-file-history Display information about a single version of a...
8686
show-version Display information about a single version of a...
8787
status Show all changes in project files - upstream and...
88+
sync Synchronize the project.
8889
```
8990

9091
### Examples

mergin/cli.py

Lines changed: 48 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,8 @@
3333
download_project_is_running,
3434
)
3535
from mergin.client_pull import pull_project_async, pull_project_is_running, pull_project_finalize, pull_project_cancel
36-
from mergin.client_push import push_next_change, push_project_is_running, push_project_finalize, push_project_cancel
37-
36+
from mergin.client_push import push_next_change, push_project_is_running, push_project_finalize, push_project_cancel, push_project_async
37+
from mergin.client_sync import Syncer, calculate_uploads_size
3838

3939
from pygeodiff import GeoDiff
4040

@@ -403,6 +403,47 @@ def status(ctx):
403403
pretty_summary(push_changes_summary)
404404

405405

406+
@cli.command()
@click.pass_context
def sync(ctx):
    """Synchronize the project. Pull latest project version from the server and push split changes."""
    mc = ctx.obj["client"]
    if mc is None:
        return
    directory = os.getcwd()
    # Bound before the try block so the KeyboardInterrupt handler can tell
    # whether a Syncer exists that may have a job to cancel.
    syncer = None
    try:
        # Constructed inside the try block: Syncer builds a MerginProject for
        # the directory, so a failure there is reported by the handlers below
        # instead of escaping as a traceback.
        syncer = Syncer(mc, directory)
        size = syncer.estimate_total_upload()

        if size == 0:
            # Nothing to upload as data, but the server may still have a newer
            # version and local deletions carry no upload size - run the
            # pull/push cycle anyway, just without a progress bar.
            syncer.sync_loop()
        else:
            with click.progressbar(length=size, label="Uploading changes") as bar:

                def on_progress(job, last, now):
                    # Syncer.sync_loop calls back with (upload_job, last, now);
                    # click's bar.update() expects the increment only.
                    bar.update(now - last)

                # run pull -> push cycles
                syncer.sync_loop(progress_callback=on_progress)

        click.secho("Sync complete.", fg="green")

    except InvalidProject as e:
        click.secho("Invalid project directory ({})".format(str(e)), fg="red")
    except ClientError as e:
        click.secho("Error: " + str(e), fg="red")
        return
    except KeyboardInterrupt:
        click.secho("Cancelling...")
        if syncer is not None:
            syncer.cancel()
    except Exception:
        _print_unhandled_exception()
446+
406447
@cli.command()
407448
@click.pass_context
408449
def push(ctx):
@@ -412,24 +453,17 @@ def push(ctx):
412453
return
413454
directory = os.getcwd()
414455
try:
415-
# keep going until there are no more changes
416-
while True:
417-
job = push_next_change(mc, directory)
418-
if job is None:
419-
click.echo("All changes uploaded.")
420-
break
421-
422-
# show progress for this single change upload
423-
with click.progressbar(length=job.total_size, label="Uploading change") as bar:
456+
job = push_project_async(mc, directory)
457+
if job is not None: # if job is none, we don't upload any files, and the transaction is finished already
458+
with click.progressbar(length=job.total_size) as bar:
424459
last_transferred_size = 0
425460
while push_project_is_running(job):
426461
time.sleep(1 / 10) # 100ms
427462
new_transferred_size = job.transferred_size
428463
bar.update(new_transferred_size - last_transferred_size) # the update() needs increment only
429464
last_transferred_size = new_transferred_size
430-
# finalize this change upload (bump versions on server & locally)
431465
push_project_finalize(job)
432-
click.echo("Change pushed, checking for more…")
466+
click.secho("Done", fg="green")
433467
except InvalidProject as e:
434468
click.secho("Invalid project directory ({})".format(str(e)), fg="red")
435469
except ClientError as e:
@@ -463,7 +497,7 @@ def pull(ctx):
463497
bar.update(new_transferred_size - last_transferred_size) # the update() needs increment only
464498
last_transferred_size = new_transferred_size
465499
pull_project_finalize(job)
466-
click.echo("Done")
500+
click.secho("Done", fg="green")
467501
except InvalidProject as e:
468502
click.secho("Invalid project directory ({})".format(str(e)), fg="red")
469503
except ClientError as e:

mergin/client.py

Lines changed: 32 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
from typing import List
2121

22+
from .client_sync import Syncer
2223
from .common import ClientError, LoginError, WorkspaceRole, ProjectRole, LOG_FILE_SIZE_TO_SEND, MERGIN_DEFAULT_LOGS_URL
2324
from .merginproject import MerginProject
2425
from .client_pull import (
@@ -33,7 +34,7 @@
3334
download_diffs_finalize,
3435
)
3536
from .client_pull import pull_project_async, pull_project_wait, pull_project_finalize
36-
from .client_push import push_next_change, push_project_wait, push_project_finalize
37+
from .client_push import push_next_change, push_project_wait, push_project_finalize, push_project_async
3738
from .utils import DateTimeEncoder, get_versions_with_file_changes, int_version, is_version_acceptable
3839
from .version import __version__
3940

@@ -890,19 +891,24 @@ def project_user_permissions(self, project_path):
890891
result["readers"] = access.get("readersnames", [])
891892
return result
892893

894+
895+
896+
897+
898+
893899
def push_project(self, directory):
894900
"""
895901
Upload local changes to the repository.
896902
897903
:param directory: Project's directory
898904
:type directory: String
899905
"""
900-
while True:
901-
job = push_next_change(self, directory)
902-
if not job:
903-
return # there is nothing to push (or we only deleted some files)
904-
push_project_wait(job)
905-
push_project_finalize(job)
906+
# while True:
907+
job = push_project_async(self, directory)
908+
if not job:
909+
return # there is nothing to push (or we only deleted some files)
910+
push_project_wait(job)
911+
push_project_finalize(job)
906912

907913
def pull_project(self, directory):
908914
"""
@@ -917,6 +923,25 @@ def pull_project(self, directory):
917923
pull_project_wait(job)
918924
return pull_project_finalize(job)
919925

926+
927+
def sync_project(self, directory):
    """
    Synchronize the project: delegates to Syncer.sync_loop(), which repeats
    pull -> push cycles until there is nothing left to push.

    :param directory: Project's directory
    :type directory: String
    """
    syncer = Syncer(self, directory)
    syncer.sync_loop()
943+
944+
920945
def clone_project(self, source_project_path, cloned_project_name, cloned_project_namespace=None):
921946
"""
922947
Clone project on server.

mergin/client_push.py

Lines changed: 22 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@
1515
import tempfile
1616
import concurrent.futures
1717
import os
18-
from typing import Dict, List, Optional
19-
import click
18+
from typing import Dict, List, Optional, Tuple
2019

2120
from .common import UPLOAD_CHUNK_SIZE, ClientError
2221
from .merginproject import MerginProject
@@ -27,7 +26,7 @@
2726
class UploadJob:
2827
"""Keeps all the important data about a pending upload job"""
2928

30-
def __init__(self, project_path, changes, transaction_id, mp, mc, tmp_dir):
29+
def __init__(self, project_path, changes, transaction_id, mp, mc, tmp_dir, exclusive: bool):
3130
self.project_path = project_path # full project name ("username/projectname")
3231
self.changes = changes # dictionary of local changes to the project
3332
self.transaction_id = transaction_id # ID of the transaction assigned by the server
@@ -37,6 +36,7 @@ def __init__(self, project_path, changes, transaction_id, mp, mc, tmp_dir):
3736
self.mp = mp # MerginProject instance
3837
self.mc = mc # MerginClient instance
3938
self.tmp_dir = tmp_dir # TemporaryDirectory instance for any temp file we need
39+
self.exclusive = exclusive # flag whether this upload blocks other uploads
4040
self.is_cancelled = False # whether upload has been cancelled
4141
self.executor = None # ThreadPoolExecutor that manages background upload tasks
4242
self.futures = [] # list of futures submitted to the executor
@@ -160,7 +160,7 @@ def split(self) -> List[Dict[str, List[dict]]]:
160160
return changes_list
161161

162162

163-
def push_next_change(mc, directory) -> Optional[UploadJob]:
163+
def push_project_async(mc, directory) -> Optional[UploadJob]:
164164
"""Starts push of a change of a project and returns pending upload job"""
165165

166166
mp = MerginProject(directory)
@@ -198,14 +198,8 @@ def push_next_change(mc, directory) -> Optional[UploadJob]:
198198
+ f"\n\nLocal version: {local_version}\nServer version: {server_version}"
199199
)
200200

201-
all_changes = mp.get_push_changes()
202-
changes_list = ChangesHandler(mc, project_info, all_changes).split()
203-
if not changes_list:
204-
return None
205-
206-
# take only the first change
207-
change = changes_list[0]
208-
mp.log.debug("push change:\n" + pprint.pformat(change))
201+
changes = change_batch or mp.get_push_changes()
202+
mp.log.debug("push change:\n" + pprint.pformat(changes))
209203

210204
tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-")
211205

@@ -214,22 +208,22 @@ def push_next_change(mc, directory) -> Optional[UploadJob]:
214208
# That's because if there are pending transactions, checkpointing or switching from WAL mode
215209
# won't work, and we would end up with some changes left in -wal file which do not get
216210
# uploaded. The temporary copy using geodiff uses sqlite backup API and should copy everything.
217-
for f in change["updated"]:
211+
for f in changes["updated"]:
218212
if mp.is_versioned_file(f["path"]) and "diff" not in f:
219213
mp.copy_versioned_file_for_upload(f, tmp_dir.name)
220214

221-
for f in change["added"]:
215+
for f in changes["added"]:
222216
if mp.is_versioned_file(f["path"]):
223217
mp.copy_versioned_file_for_upload(f, tmp_dir.name)
224218

225-
if not any(len(v) for v in change.values()):
219+
if not any(len(v) for v in changes.values()):
226220
mp.log.info(f"--- push {project_path} - nothing to do")
227221
return
228222

229223
# drop internal info from being sent to server
230-
for item in change["updated"]:
224+
for item in changes["updated"]:
231225
item.pop("origin_checksum", None)
232-
data = {"version": local_version, "changes": change}
226+
data = {"version": local_version, "changes": changes}
233227

234228
try:
235229
resp = mc.post(
@@ -246,15 +240,16 @@ def push_next_change(mc, directory) -> Optional[UploadJob]:
246240
upload_files = data["changes"]["added"] + data["changes"]["updated"]
247241

248242
transaction_id = server_resp["transaction"] if upload_files else None
249-
job = UploadJob(project_path, change, transaction_id, mp, mc, tmp_dir)
243+
exclusive = server_resp.get("exclusive", True)
244+
job = UploadJob(project_path, changes, transaction_id, mp, mc, tmp_dir, exclusive)
250245

251246
if not upload_files:
252247
mp.log.info("not uploading any files")
253248
job.server_resp = server_resp
254249
push_project_finalize(job)
255250
return None # all done - no pending job
256251

257-
mp.log.info(f"got transaction ID {transaction_id}")
252+
mp.log.info(f"got transaction ID {transaction_id}, {'exclusive' if exclusive else 'non-exclusive'} upload")
258253

259254
upload_queue_items = []
260255
total_size = 0
@@ -347,7 +342,7 @@ def push_project_finalize(job):
347342

348343
if with_upload_of_files:
349344
try:
350-
job.mp.log.info(f"Finishing transaction {job.transaction_id}")
345+
job.mp.log.info(f"Finishing {'exclusive' if job.exclusive else 'non-exclusive'} transaction {job.transaction_id}")
351346
resp = job.mc.post("/v1/project/push/finish/%s" % job.transaction_id)
352347
job.server_resp = json.load(resp)
353348
except ClientError as err:
@@ -417,3 +412,10 @@ def remove_diff_files(job) -> None:
417412
diff_file = job.mp.fpath_meta(change["diff"]["path"])
418413
if os.path.exists(diff_file):
419414
os.remove(diff_file)
415+
416+
def get_next_batch(project_dir) -> Tuple[Dict[str, List[dict]], bool]:
    """
    Return the next batch of changes to push together with a boolean flag.

    Not implemented yet: always returns an empty change set and True.

    :param project_dir: Project's directory (currently unused)
    :return: tuple of (changes dictionary with "added"/"updated"/"removed" lists, flag)
    """
    # NOTE(review): the meaning of the boolean is not defined anywhere yet -
    # presumably whether the batch needs an exclusive upload; confirm when implementing.
    # TODO
    return {"added": [], "updated": [], "removed": []}, True

mergin/client_sync.py

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import time
2+
from typing import Dict, Any
3+
4+
from .merginproject import MerginProject
5+
from .client_pull import DownloadJob, pull_project_async, pull_project_wait, pull_project_finalize, pull_project_cancel
6+
from .client_push import UploadJob, push_project_async, push_project_wait, push_project_finalize, push_next_change, \
7+
push_project_cancel, push_project_is_running
8+
9+
10+
class Syncer:
    """
    Runs pull -> push cycles for one project directory.

    The job that is currently in flight is stored on the instance
    (download_job / upload_job) so that cancel() can stop it.
    """

    def __init__(self, client, directory, download_job: DownloadJob = None, upload_job: UploadJob = None, total_upload_size: int = 0, uploaded_size: int = 0):
        """
        :param client: MerginClient instance used for all server calls
        :param directory: Project's directory
        :param download_job: pull job already in progress, if any
        :param upload_job: push job already in progress, if any
        :param total_upload_size: last estimated size of data to upload
        :param uploaded_size: amount of data uploaded so far
        """
        self.client = client
        self.directory = directory
        self.mp = MerginProject(directory)
        self.download_job = download_job
        self.upload_job = upload_job
        self.total_upload_size = total_upload_size
        self.uploaded_size = uploaded_size

    def _one_cycle(self):
        """
        Pull remote changes, then kick off an upload if there are local changes.

        :return: the pending UploadJob (exposes .transferred_size), or None when
                 there is nothing left to upload
        """
        # Keep the job on the instance while it runs so cancel() can reach it.
        self.download_job = pull_project_async(self.client, self.directory)
        if self.download_job:
            pull_project_wait(self.download_job)
            pull_project_finalize(self.download_job)
        self.download_job = None

        changes = self.mp.get_push_changes()
        # The changes dictionary maps change kinds to lists; a dictionary whose
        # lists are all empty is truthy, so test the lists themselves.
        if not changes or not any(changes.values()):
            return None

        # push_project_async takes (client, directory) and works out the changes
        # itself; it returns None when no file upload is pending.
        self.upload_job = push_project_async(self.client, self.directory)
        return self.upload_job

    def sync_loop(self, progress_callback=None):
        """
        Repeatedly do pull -> push cycles until no more changes.

        If progress_callback is provided, it is called as:
            progress_callback(upload_job, last_transferred, new_transferred)

        :param progress_callback: optional callable reporting upload progress
        """
        while True:
            upload_job = self._one_cycle()
            if upload_job is None:
                break

            last = 0
            if progress_callback is not None:
                # drive the callback while the upload is running
                while push_project_is_running(upload_job):
                    time.sleep(0.1)
                    now = upload_job.transferred_size
                    progress_callback(upload_job, last, now)
                    last = now

            push_project_wait(upload_job)

            if progress_callback is not None:
                # Report whatever was transferred between the last poll and
                # completion, otherwise the caller's progress never reaches the end.
                now = upload_job.transferred_size
                if now != last:
                    progress_callback(upload_job, last, now)

            push_project_finalize(upload_job)
            # Finished successfully - nothing left for cancel() to stop.
            self.upload_job = None

    def cancel(self):
        """Cancel the pull and/or push job that is currently in flight, if any."""
        if self.download_job is not None:
            pull_project_cancel(self.download_job)
        if self.upload_job is not None:
            push_project_cancel(self.upload_job)

    def estimate_total_upload(self):
        """
        One-shot estimate of the _current_ total upload size.

        :return: summed size of the files to be added or updated, 0 if there are no changes
        """
        changes = self.mp.get_push_changes()
        self.total_upload_size = calculate_uploads_size(changes) if changes else 0
        return self.total_upload_size
76+
def calculate_uploads_size(changes: Dict[str, Any]) -> int:
    """
    Sum the number of bytes that would be uploaded for the given changes.

    Only "added" and "updated" entries are counted. For each entry the size of
    its "diff" is used when the diff carries one, otherwise the entry's own
    "size" (0 if that is missing too).
    """
    total = 0
    for entry in changes.get("added", []) + changes.get("updated", []):
        diff_info = entry.get("diff", {})
        total += diff_info.get("size", entry.get("size", 0))
    return total

0 commit comments

Comments
 (0)