Split push for blocking and non-blocking #244

Open
wants to merge 31 commits into
base: master
Commits
9fa6c04
Split push for blocking and non-blocking
harminius Jun 18, 2025
dd03a38
Introduce UploadChanges class and split method
harminius Jun 20, 2025
ff3afac
Introduce UploadChangesHandler
harminius Jun 23, 2025
7081687
Put changes dicts back
harminius Jun 23, 2025
87f072e
Fix legacy changes
harminius Jun 23, 2025
ed47d58
Make ChnagesHandler cleaner
harminius Jun 24, 2025
aab4c7d
Remove blocking flag
harminius Jun 25, 2025
a577092
Replace sum() with faster any() in boolean expressions
harminius Jun 25, 2025
4500d0b
Fix tests and codestyle
harminius Jun 26, 2025
b326fba
login type and checks
JanCaha May 16, 2025
5484eee
raise auth token error
JanCaha May 16, 2025
4a57c86
remove enum
JanCaha Jun 25, 2025
0b1049f
remove
JanCaha Jun 25, 2025
be9c23b
update
JanCaha Jun 25, 2025
75059b9
raise error
JanCaha Jun 25, 2025
c42bcc7
Jupyter notebooks - Update README.md
alex-cit Jun 22, 2025
cd3b55a
allow client creation without authorization
JanCaha Jun 25, 2025
2c81a14
add test
JanCaha Jun 25, 2025
51127ab
simplify
JanCaha Jun 25, 2025
296de65
change error
JanCaha Jun 25, 2025
dfa8868
fix catch error
JanCaha Jun 25, 2025
e175f0b
Add test for ChangesHandler
harminius Jun 26, 2025
513a06a
Wait till change upload is done before uploading next change
harminius Jul 1, 2025
4bfa65c
Syncer
harminius Jul 9, 2025
d74c7b3
sync methods
harminius Jul 10, 2025
da39507
Cleanup and docstrings
harminius Jul 11, 2025
5ba7535
formatting
harminius Jul 11, 2025
df1a1c4
cleanup
harminius Jul 11, 2025
bafd30f
Get rid of renamed key in changes
harminius Jul 11, 2025
e886dfc
Fix test and filter_changes()
harminius Jul 11, 2025
81e7909
Fix tests
harminius Jul 11, 2025
9 changes: 5 additions & 4 deletions README.md
@@ -81,10 +81,11 @@ Commands:
share Fetch permissions to project
share-add Add permissions to [users] to project
share-remove Remove [users] permissions from project
show-file-changeset Displays information about project changes.
show-file-history Displays information about a single version of a...
show-version Displays information about a single version of a...
show-file-changeset Display information about project changes.
show-file-history Display information about a single version of a...
show-version Display information about a single version of a...
status Show all changes in project files - upstream and...
sync Synchronize the project.
```

### Examples
@@ -99,7 +100,7 @@ To download a specific version of a project:
$ mergin --username john download --version v42 john/project1 ~/mergin/project1
```

To download a sepecific version of a single file:
To download a specific version of a single file:

1. First you need to download the project:
```
8 changes: 4 additions & 4 deletions examples/README.md
@@ -5,10 +5,10 @@ Here you can find some tailored Jupyter Notebooks examples on how to interact wi
These examples are split into scenarios in order to highlight some of the core capabilities of the Mergin Maps API.

## [Scenario 1](01_users.ipynb) - Users Management
On this scenario you'll create some random users from an existing CSV into Mergin Maps, change user's `ROLE` on a `WORKSPACE` and `PROJECT` perspective as well remove user's from a specific project.
In this scenario you'll create some random users from an existing CSV into Mergin Maps, change user's `ROLE` on a `WORKSPACE` and `PROJECT` perspective as well as remove a user from a specific project.

## [Scenario 2](02_sync.ipynb) - Synchronization
On this scenario you'll learn on how to do basic synchronisation (`PUSH`, `PULL`) of projects with the Mergin Maps Python API client.
## [Scenario 2](02_sync.ipynb) - Synchronisation
In this scenario you'll learn how to do basic synchronisation (`PUSH`, `PULL`) of projects with the Mergin Maps Python API client.

## [Scenario 3](03_projects.ipynb) - Projects Management
On this scenario you'll learn how to manage project with the Mergin Maps Python API client, namely how to clone projects and how to separate team members in isolated projects from the cloned template.
In this scenario you'll learn how to manage projects with Mergin Maps Python API client, namely how to clone projects and how to separate team members in isolated projects from the cloned template.
55 changes: 48 additions & 7 deletions mergin/cli.py
@@ -33,8 +33,13 @@
download_project_is_running,
)
from mergin.client_pull import pull_project_async, pull_project_is_running, pull_project_finalize, pull_project_cancel
from mergin.client_push import push_project_async, push_project_is_running, push_project_finalize, push_project_cancel

from mergin.client_push import (
push_project_is_running,
push_project_finalize,
push_project_cancel,
push_project_async,
total_upload_size,
)

from pygeodiff import GeoDiff

@@ -403,6 +408,42 @@ def status(ctx):
pretty_summary(push_changes_summary)


@cli.command()
@click.pass_context
def sync(ctx):
"""Synchronize the project. Pull latest project version from the server and push split changes."""
mc = ctx.obj["client"]
if mc is None:
return
directory = os.getcwd()

try:
size = total_upload_size(directory)
with click.progressbar(length=size, label="Syncing") as bar:

def on_progress(increment):
bar.update(increment)

# run pull & push cycles until there are no local changes
mc.sync_project_with_callback(directory, progress_callback=on_progress)

click.secho("Sync complete.", fg="green")

except InvalidProject as e:
click.secho("Invalid project directory ({})".format(str(e)), fg="red")
except ClientError as e:
click.secho("Error: " + str(e), fg="red")
return
except KeyboardInterrupt:
click.secho("Cancelling...")
if mc.pull_job:
pull_project_cancel(mc.pull_job)
if mc.push_job:
push_project_cancel(mc.push_job)
except Exception as e:
_print_unhandled_exception()
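
The `on_progress(increment)` callback in the `sync` command receives increments rather than running totals, which is what `bar.update()` expects. A minimal sketch of that polling pattern follows, assuming a hypothetical `FakeJob` stand-in (not the mergin upload job) and a hypothetical `report_progress` helper. It also guards against a missing callback and reports whatever was transferred after the last poll, two details the loop in this PR may or may not need.

```python
import time


class FakeJob:
    """Hypothetical stand-in for an upload job; not the mergin job class."""

    def __init__(self, chunks):
        self._chunks = list(chunks)  # sizes transferred between polls
        self.transferred_size = 0

    def is_running(self):
        # Each poll simulates one more chunk having been transferred.
        if self._chunks:
            self.transferred_size += self._chunks.pop(0)
        return bool(self._chunks)


def report_progress(job, on_progress=None, sleep_time=0.0):
    """Poll the job and pass size increments (not totals) to on_progress."""
    last = 0
    while job.is_running():
        time.sleep(sleep_time)
        now = job.transferred_size
        if on_progress is not None:
            on_progress(now - last)
        last = now
    # Report anything transferred between the last poll and completion.
    remaining = job.transferred_size - last
    if remaining and on_progress is not None:
        on_progress(remaining)


increments = []
report_progress(FakeJob([10, 20, 5]), increments.append)
print(increments, sum(increments))
```

With the final flush in place, the increments add up to the total transferred size, so a progress bar sized from the total would end full.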


@cli.command()
@click.pass_context
def push(ctx):
@@ -422,7 +463,7 @@ def push(ctx):
bar.update(new_transferred_size - last_transferred_size) # the update() needs increment only
last_transferred_size = new_transferred_size
push_project_finalize(job)
click.echo("Done")
click.secho("Done", fg="green")
except InvalidProject as e:
click.secho("Invalid project directory ({})".format(str(e)), fg="red")
except ClientError as e:
@@ -456,7 +497,7 @@ def pull(ctx):
bar.update(new_transferred_size - last_transferred_size) # the update() needs increment only
last_transferred_size = new_transferred_size
pull_project_finalize(job)
click.echo("Done")
click.secho("Done", fg="green")
except InvalidProject as e:
click.secho("Invalid project directory ({})".format(str(e)), fg="red")
except ClientError as e:
@@ -473,7 +514,7 @@ def pull(ctx):
@click.argument("version")
@click.pass_context
def show_version(ctx, version):
"""Displays information about a single version of a project. `version` is 'v1', 'v2', etc."""
"""Display information about a single version of a project. `version` is 'v1', 'v2', etc."""
mc = ctx.obj["client"]
if mc is None:
return
@@ -492,7 +533,7 @@ def show_version(ctx, version):
@click.argument("path")
@click.pass_context
def show_file_history(ctx, path):
"""Displays information about a single version of a project."""
"""Display information about a single version of a project."""
mc = ctx.obj["client"]
if mc is None:
return
@@ -516,7 +557,7 @@ def show_file_history(ctx, path):
@click.argument("version")
@click.pass_context
def show_file_changeset(ctx, path, version):
"""Displays information about project changes."""
"""Display information about project changes."""
mc = ctx.obj["client"]
if mc is None:
return
105 changes: 97 additions & 8 deletions mergin/client.py
@@ -1,4 +1,6 @@
import logging
from time import sleep

import math
import os
import json
@@ -31,9 +33,17 @@
download_project_finalize,
download_project_wait,
download_diffs_finalize,
pull_project_async,
pull_project_wait,
pull_project_finalize,
)
from .client_push import (
push_project_wait,
push_project_finalize,
push_project_async,
push_project_is_running,
get_change_batch,
)
from .client_pull import pull_project_async, pull_project_wait, pull_project_finalize
from .client_push import push_project_async, push_project_wait, push_project_finalize
from .utils import DateTimeEncoder, get_versions_with_file_changes, int_version, is_version_acceptable
from .version import __version__

@@ -45,6 +55,10 @@ class TokenError(Exception):
pass


class AuthTokenExpiredError(Exception):
pass


class ServerType(Enum):
OLD = auto() # Server is old and does not support workspaces
CE = auto() # Server is Community Edition
@@ -80,8 +94,16 @@ class MerginClient:
Currently, only HTTP proxies are supported.
"""

def __init__(self, url=None, auth_token=None, login=None, password=None, plugin_version=None, proxy_config=None):
self.url = url if url is not None else MerginClient.default_url()
def __init__(
self,
url=None,
auth_token=None,
login=None,
password=None,
plugin_version=None,
proxy_config=None,
):
self.url = (url if url is not None else MerginClient.default_url()).rstrip("/") + "/"
Contributor

What about this change - it does not seem to be relevant?

Contributor Author

It is not related to the rest of this PR. Should it be moved to a separate PR?
I hit this when I didn't include the trailing slash in a localhost URL.
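
For context on the trailing-slash change discussed in this thread, here is a minimal sketch of the normalization and of one way a missing slash can matter. It assumes request paths are joined onto the base URL with `urllib.parse.urljoin`, which this diff does not show; `normalize_base_url` and the example URL are hypothetical names used for illustration only.

```python
from urllib.parse import urljoin


def normalize_base_url(url):
    """Return the URL with exactly one trailing slash (same idea as the diff)."""
    return url.rstrip("/") + "/"


# Hypothetical server mounted under a path prefix.
base = "http://localhost:5000/mergin"

# Without the trailing slash, urljoin replaces the last path segment.
print(urljoin(base, "v1/project"))
# With the slash, the relative path is appended under the prefix.
print(urljoin(normalize_base_url(base), "v1/project"))
```

The first call yields `http://localhost:5000/v1/project`, the second `http://localhost:5000/mergin/v1/project`.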

self._auth_params = None
self._auth_session = None
self._user_info = None
@@ -91,6 +113,8 @@ def __init__(self, url=None, auth_token=None, login=None, password=None, plugin_
if plugin_version is not None: # this could be e.g. "Plugin/2020.1 QGIS/3.14"
self.client_version += " " + plugin_version
self.setup_logging()
self.pull_job = None
self.push_job = None
Contributor

I don't think these need to be member variables of MerginClient

Contributor Author

It's because I want to have an option to cancel the job on keyboard interruption in CLI. I need an instance that is aware of the jobs.
Could be a different class (Syncer) but in MerginClient it is the easiest.
It doesn't need to be defined here but I think it is a good practice.
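
The alternative mentioned in this reply, keeping the jobs on a separate object instead of on `MerginClient`, could look roughly like the sketch below. `SyncJobs` and its `cancel_all` method are hypothetical and not part of the mergin package; the cancel functions are passed in so the sketch stays self-contained.

```python
class SyncJobs:
    """Hypothetical holder for in-flight jobs so a CLI can cancel them."""

    def __init__(self):
        self.pull_job = None
        self.push_job = None

    def cancel_all(self, cancel_pull, cancel_push):
        """Cancel whichever jobs are set and return the names of those cancelled."""
        cancelled = []
        if self.pull_job:
            cancel_pull(self.pull_job)
            cancelled.append("pull")
        if self.push_job:
            cancel_push(self.push_job)
            cancelled.append("push")
        return cancelled


jobs = SyncJobs()
jobs.push_job = object()  # pretend a push is in flight
seen = []
print(jobs.cancel_all(seen.append, seen.append))
```

A CLI handler for `KeyboardInterrupt` would then only need a reference to this object, at the cost of threading it through the sync calls.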

if auth_token:
try:
token_data = decode_token_data(auth_token)
@@ -190,11 +214,19 @@ def wrapper(self, *args):
delta = self._auth_session["expire"] - datetime.now(timezone.utc)
if delta.total_seconds() < 5:
self.log.info("Token has expired - refreshing...")
self.login(self._auth_params["login"], self._auth_params["password"])
if self._auth_params.get("login", None) and self._auth_params.get("password", None):
self.log.info("Token has expired - refreshing...")
self.login(self._auth_params["login"], self._auth_params["password"])
else:
raise AuthTokenExpiredError("Token has expired - please re-login")
else:
# Create a new authorization token
self.log.info(f"No token - login user: {self._auth_params['login']}")
self.login(self._auth_params["login"], self._auth_params["password"])
if self._auth_params.get("login", None) and self._auth_params.get("password", None):
self.login(self._auth_params["login"], self._auth_params["password"])
else:
raise ClientError("Missing login or password")

return f(self, *args)

return wrapper
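
The branching added to the auth wrapper in this hunk can be summarized as a small decision function. This is a sketch under assumptions: `decide_auth_action` is a hypothetical helper, the exception classes are redefined locally, and the surrounding checks that the hunk does not show are left out.

```python
from datetime import datetime, timedelta, timezone


class ClientError(Exception):
    pass


class AuthTokenExpiredError(Exception):
    pass


def decide_auth_action(auth_session, auth_params, now=None, margin_seconds=5):
    """Return "ok" or "login", or raise when a re-login is impossible."""
    now = now or datetime.now(timezone.utc)
    has_credentials = bool(auth_params.get("login") and auth_params.get("password"))
    if auth_session:
        delta = auth_session["expire"] - now
        if delta.total_seconds() >= margin_seconds:
            return "ok"  # token still valid
        if has_credentials:
            return "login"  # expired, but credentials allow a refresh
        raise AuthTokenExpiredError("Token has expired - please re-login")
    if has_credentials:
        return "login"  # no token yet, create one
    raise ClientError("Missing login or password")


now = datetime(2025, 1, 1, tzinfo=timezone.utc)
stale = {"expire": now + timedelta(seconds=1)}
print(decide_auth_action(stale, {"login": "john", "password": "secret"}, now))
```

A client created from an `auth_token` alone has no stored credentials, which is the case where the expired-token error is raised instead of a silent refresh.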
@@ -453,7 +485,8 @@ def create_project(self, project_name, is_public=False, namespace=None):

def create_project_and_push(self, project_name, directory, is_public=False, namespace=None):
"""
Convenience method to create project and push the initial version right after that.
Convenience method to create project and push the files right after that.
Creates two versions when directory contains blocking and non-blocking changes.

:param project_name: Project's full name (<namespace>/<name>)
:type project_name: String
@@ -877,7 +910,7 @@ def push_project(self, directory):
:type directory: String
"""
job = push_project_async(self, directory)
if job is None:
if not job:
return # there is nothing to push (or we only deleted some files)
push_project_wait(job)
push_project_finalize(job)
@@ -895,6 +928,62 @@ def pull_project(self, directory):
pull_project_wait(job)
return pull_project_finalize(job)

def sync_project(self, project_dir):
"""
Syncs project by loop with these steps:
1. Pull server version
2. Get local changes
3. Push first change batch
Repeat if there are more local changes.
The batch pushing makes use of the server ability to handle simultaneously exclusive upload (that blocks
other uploads) and non-exclusive upload (for adding assets)
"""
has_more_changes = True
while has_more_changes:
self.pull_job = pull_project_async(self, project_dir)
if self.pull_job is not None:
pull_project_wait(self.pull_job)
pull_project_finalize(self.pull_job)

changes_batch, has_more_changes = get_change_batch(self, project_dir)
if not changes_batch: # no changes to upload, quit sync
return

self.push_job = push_project_async(self, project_dir, changes_batch)
if self.push_job:
push_project_wait(self.push_job)
push_project_finalize(self.push_job)

def sync_project_with_callback(self, project_dir, progress_callback=None, sleep_time=0.1):
"""
Syncs project while sending push progress info as callback.
Sync is done in this loop:
Pending changes? -> Pull -> Get changes batch -> Push the changes -> repeat

:param progress_callback: updates the progress bar in CLI, on_progress(increment)
:param sleep_time: sleep time between calling the callback function
"""
has_more_changes = True
while has_more_changes:
self.pull_job = pull_project_async(self, project_dir)
if self.pull_job is not None:
pull_project_wait(self.pull_job)
pull_project_finalize(self.pull_job)

changes_batch, has_more_changes = get_change_batch(self, project_dir)
if not changes_batch: # no changes to upload, quit sync
return
Contributor

Is it correct to have the early return here? If there are multiple batches, in theory one batch could contain only file removals (and so upload no data) while later batches contain data, and this early return would skip those later batches.


self.push_job = push_project_async(self, project_dir, changes_batch)
if self.push_job:
last = 0
while push_project_is_running(self.push_job):
sleep(sleep_time)
now = self.push_job.transferred_size
if progress_callback: progress_callback(now - last)  # update progressbar with transferred size increment (callback defaults to None)
last = now
push_project_finalize(self.push_job)
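
The early-return question raised in the review comment on this method can be illustrated with a small simulation of the loop's control flow. This is a sketch only: `run_sync` is hypothetical, batches are plain lists, and whether `get_change_batch` can actually return an empty batch while more changes are pending is an assumption not confirmed by this diff.

```python
def run_sync(batches, stop_on_empty):
    """Simulate the pull / get batch / push loop over (batch, has_more) pairs."""
    queue = list(batches)
    pushed = []
    has_more = True
    while has_more:
        # A pull would happen here before each batch.
        batch, has_more = queue.pop(0)
        if not batch:
            if stop_on_empty:
                return pushed  # early return: later batches are skipped
            continue  # nothing to upload now; keep going while more is pending
        pushed.append(batch)  # stands in for push + finalize
    return pushed


# An empty batch followed by a batch with data.
batches = [([], True), (["data.gpkg"], False)]
print(run_sync(batches, stop_on_empty=True))
print(run_sync(batches, stop_on_empty=False))
```

With the early return the second batch is never pushed; with `continue` it is. The `continue` variant relies on the batch source eventually reporting no more changes, otherwise the loop would not terminate.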

def clone_project(self, source_project_path, cloned_project_name, cloned_project_namespace=None):
"""
Clone project on server.
2 changes: 1 addition & 1 deletion mergin/client_pull.py
@@ -129,7 +129,7 @@ def download_project_async(mc, project_path, directory, project_version=None):
"""

if "/" not in project_path:
raise ClientError("Project name needs to be fully qualified, e.g. <username>/<projectname>")
raise ClientError("Project name needs to be fully qualified, e.g. <workspacename>/<projectname>")
if os.path.exists(directory):
raise ClientError("Project directory already exists")
os.makedirs(directory)