diff --git a/README.md b/README.md
index d0e98447..4ccf251c 100644
--- a/README.md
+++ b/README.md
@@ -81,10 +81,11 @@ Commands:
   share                Fetch permissions to project
   share-add            Add permissions to [users] to project
   share-remove         Remove [users] permissions from project
-  show-file-changeset  Displays information about project changes.
-  show-file-history    Displays information about a single version of a...
-  show-version         Displays information about a single version of a...
+  show-file-changeset  Display information about project changes.
+  show-file-history    Display information about the history of a single...
+  show-version         Display information about a single version of a...
   status               Show all changes in project files - upstream and...
+  sync                 Synchronize the project.
 ```
 
 ### Examples
@@ -99,7 +100,7 @@ To download a specific version of a project:
 $ mergin --username john download --version v42 john/project1 ~/mergin/project1
 ```
 
-To download a sepecific version of a single file:
+To download a specific version of a single file:
 
 1. First you need to download the project:
    ```
diff --git a/examples/README.md b/examples/README.md
index b41a1979..e4e953ae 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -5,10 +5,10 @@ Here you can find some tailored Jupyter Notebooks examples on how to interact wi
 These examples are split into scenarios in order to highlight some of the core capabilities of the Mergin Maps API.
 
 ## [Scenario 1](01_users.ipynb) - Users Management
-On this scenario you'll create some random users from an existing CSV into Mergin Maps, change user's `ROLE` on a `WORKSPACE` and `PROJECT` perspective as well remove user's from a specific project.
+In this scenario you'll create some random users in Mergin Maps from an existing CSV, change a user's `ROLE` at the `WORKSPACE` and `PROJECT` level, as well as remove a user from a specific project.
 
-## [Scenario 2](02_sync.ipynb) - Synchronization
-On this scenario you'll learn on how to do basic synchronisation (`PUSH`, `PULL`) of projects with the Mergin Maps Python API client.
+## [Scenario 2](02_sync.ipynb) - Synchronisation
+In this scenario you'll learn how to do basic synchronisation (`PUSH`, `PULL`) of projects with the Mergin Maps Python API client.
 
 ## [Scenario 3](03_projects.ipynb) - Projects Management
-On this scenario you'll learn how to manage project with the Mergin Maps Python API client, namely how to clone projects and how to separate team members in isolated projects from the cloned template.
\ No newline at end of file
+In this scenario you'll learn how to manage projects with the Mergin Maps Python API client, namely how to clone projects and how to separate team members in isolated projects from the cloned template.
diff --git a/mergin/cli.py b/mergin/cli.py
index e479b886..3eb80d9d 100755
--- a/mergin/cli.py
+++ b/mergin/cli.py
@@ -33,8 +33,13 @@
     download_project_is_running,
 )
 from mergin.client_pull import pull_project_async, pull_project_is_running, pull_project_finalize, pull_project_cancel
-from mergin.client_push import push_project_async, push_project_is_running, push_project_finalize, push_project_cancel
-
+from mergin.client_push import (
+    push_project_is_running,
+    push_project_finalize,
+    push_project_cancel,
+    push_project_async,
+    total_upload_size,
+)
 from pygeodiff import GeoDiff
 
 
@@ -403,6 +408,42 @@ def status(ctx):
     pretty_summary(push_changes_summary)
 
 
+@cli.command()
+@click.pass_context
+def sync(ctx):
+    """Synchronize the project.
Pull latest project version from the server and push split changes.""" + mc = ctx.obj["client"] + if mc is None: + return + directory = os.getcwd() + + try: + size = total_upload_size(directory) + with click.progressbar(length=size, label="Syncing") as bar: + + def on_progress(increment): + bar.update(increment) + + # run pull & push cycles until there are no local changes + mc.sync_project_with_callback(directory, progress_callback=on_progress) + + click.secho("Sync complete.", fg="green") + + except InvalidProject as e: + click.secho("Invalid project directory ({})".format(str(e)), fg="red") + except ClientError as e: + click.secho("Error: " + str(e), fg="red") + return + except KeyboardInterrupt: + click.secho("Cancelling...") + if mc.pull_job: + pull_project_cancel(mc.pull_job) + if mc.push_job: + push_project_cancel(mc.push_job) + except Exception as e: + _print_unhandled_exception() + + @cli.command() @click.pass_context def push(ctx): @@ -422,7 +463,7 @@ def push(ctx): bar.update(new_transferred_size - last_transferred_size) # the update() needs increment only last_transferred_size = new_transferred_size push_project_finalize(job) - click.echo("Done") + click.secho("Done", fg="green") except InvalidProject as e: click.secho("Invalid project directory ({})".format(str(e)), fg="red") except ClientError as e: @@ -456,7 +497,7 @@ def pull(ctx): bar.update(new_transferred_size - last_transferred_size) # the update() needs increment only last_transferred_size = new_transferred_size pull_project_finalize(job) - click.echo("Done") + click.secho("Done", fg="green") except InvalidProject as e: click.secho("Invalid project directory ({})".format(str(e)), fg="red") except ClientError as e: @@ -473,7 +514,7 @@ def pull(ctx): @click.argument("version") @click.pass_context def show_version(ctx, version): - """Displays information about a single version of a project. `version` is 'v1', 'v2', etc.""" + """Display information about a single version of a project. 
    `version` is 'v1', 'v2', etc."""
    mc = ctx.obj["client"]
    if mc is None:
        return
@@ -492,7 +533,7 @@ def show_version(ctx, version):
 @click.argument("path")
 @click.pass_context
 def show_file_history(ctx, path):
-    """Displays information about a single version of a project."""
+    """Display information about the history of a single file."""
     mc = ctx.obj["client"]
     if mc is None:
         return
@@ -516,7 +557,7 @@ def show_file_history(ctx, path):
 @click.argument("version")
 @click.pass_context
 def show_file_changeset(ctx, path, version):
-    """Displays information about project changes."""
+    """Display information about project changes."""
     mc = ctx.obj["client"]
     if mc is None:
         return
diff --git a/mergin/client.py b/mergin/client.py
index f7ff4de9..4d8b75dd 100644
--- a/mergin/client.py
+++ b/mergin/client.py
@@ -1,4 +1,6 @@
 import logging
+from time import sleep
+
 import math
 import os
 import json
@@ -31,9 +33,17 @@
     download_project_finalize,
     download_project_wait,
     download_diffs_finalize,
+    pull_project_async,
+    pull_project_wait,
+    pull_project_finalize,
+)
+from .client_push import (
+    push_project_wait,
+    push_project_finalize,
+    push_project_async,
+    push_project_is_running,
+    get_change_batch,
 )
-from .client_pull import pull_project_async, pull_project_wait, pull_project_finalize
-from .client_push import push_project_async, push_project_wait, push_project_finalize
 from .utils import DateTimeEncoder, get_versions_with_file_changes, int_version, is_version_acceptable
 from .version import __version__
 
@@ -45,6 +55,10 @@ class TokenError(Exception):
     pass
 
 
+class AuthTokenExpiredError(Exception):
+    pass
+
+
 class ServerType(Enum):
     OLD = auto()  # Server is old and does not support workspaces
     CE = auto()  # Server is Community Edition
@@ -80,8 +94,16 @@ class MerginClient:
     Currently, only HTTP proxies are supported.
     """
 
-    def __init__(self, url=None, auth_token=None, login=None, password=None, plugin_version=None, proxy_config=None):
-        self.url = url if url is not None else MerginClient.default_url()
+    def __init__(
+        self,
+        url=None,
+        auth_token=None,
+        login=None,
+        password=None,
+        plugin_version=None,
+        proxy_config=None,
+    ):
+        self.url = (url if url is not None else MerginClient.default_url()).rstrip("/") + "/"
         self._auth_params = None
         self._auth_session = None
         self._user_info = None
@@ -91,6 +113,8 @@ def __init__
         if plugin_version is not None:  # this could be e.g. "Plugin/2020.1 QGIS/3.14"
"Plugin/2020.1 QGIS/3.14" self.client_version += " " + plugin_version self.setup_logging() + self.pull_job = None + self.push_job = None if auth_token: try: token_data = decode_token_data(auth_token) @@ -190,11 +214,19 @@ def wrapper(self, *args): delta = self._auth_session["expire"] - datetime.now(timezone.utc) if delta.total_seconds() < 5: self.log.info("Token has expired - refreshing...") - self.login(self._auth_params["login"], self._auth_params["password"]) + if self._auth_params.get("login", None) and self._auth_params.get("password", None): + self.log.info("Token has expired - refreshing...") + self.login(self._auth_params["login"], self._auth_params["password"]) + else: + raise AuthTokenExpiredError("Token has expired - please re-login") else: # Create a new authorization token self.log.info(f"No token - login user: {self._auth_params['login']}") - self.login(self._auth_params["login"], self._auth_params["password"]) + if self._auth_params.get("login", None) and self._auth_params.get("password", None): + self.login(self._auth_params["login"], self._auth_params["password"]) + else: + raise ClientError("Missing login or password") + return f(self, *args) return wrapper @@ -453,7 +485,8 @@ def create_project(self, project_name, is_public=False, namespace=None): def create_project_and_push(self, project_name, directory, is_public=False, namespace=None): """ - Convenience method to create project and push the initial version right after that. + Convenience method to create project and push the the files right after that. + Creates two versions when directory contains blocking and non-blocking changes. :param project_name: Project's full name (/) :type project_name: String @@ -877,7 +910,7 @@ def push_project(self, directory): :type directory: String """ job = push_project_async(self, directory) - if job is None: + if not job: return # there is nothing to push (or we only deleted some files) push_project_wait(job) push_project_finalize(job) @@ -895,6 +928,62 @@ def pull_project(self, directory): pull_project_wait(job) return pull_project_finalize(job) + def sync_project(self, project_dir): + """ + Syncs project by loop with these steps: + 1. Pull server version + 2. Get local changes + 3. Push first change batch + Repeat if there are more local changes. + The batch pushing makes use of the server ability to handle simultaneously exclusive upload (that blocks + other uploads) and non-exclusive upload (for adding assets) + """ + has_more_changes = True + while has_more_changes: + self.pull_job = pull_project_async(self, project_dir) + if self.pull_job is not None: + pull_project_wait(self.pull_job) + pull_project_finalize(self.pull_job) + + changes_batch, has_more_changes = get_change_batch(self, project_dir) + if not changes_batch: # no changes to upload, quit sync + return + + self.push_job = push_project_async(self, project_dir, changes_batch) + if self.push_job: + push_project_wait(self.push_job) + push_project_finalize(self.push_job) + + def sync_project_with_callback(self, project_dir, progress_callback=None, sleep_time=0.1): + """ + Syncs project while sending push progress info as callback. + Sync is done in this loop: + Pending changes? 
-> Pull -> Get changes batch -> Push the changes -> repeat + + :param progress_callback: updates the progress bar in CLI, on_progress(increment) + :param sleep_time: sleep time between calling the callback function + """ + has_more_changes = True + while has_more_changes: + self.pull_job = pull_project_async(self, project_dir) + if self.pull_job is not None: + pull_project_wait(self.pull_job) + pull_project_finalize(self.pull_job) + + changes_batch, has_more_changes = get_change_batch(self, project_dir) + if not changes_batch: # no changes to upload, quit sync + return + + self.push_job = push_project_async(self, project_dir, changes_batch) + if self.push_job: + last = 0 + while push_project_is_running(self.push_job): + sleep(sleep_time) + now = self.push_job.transferred_size + progress_callback(now - last) # update progressbar with transferred size increment + last = now + push_project_finalize(self.push_job) + def clone_project(self, source_project_path, cloned_project_name, cloned_project_namespace=None): """ Clone project on server. diff --git a/mergin/client_pull.py b/mergin/client_pull.py index a46e6f96..45fd696e 100644 --- a/mergin/client_pull.py +++ b/mergin/client_pull.py @@ -129,7 +129,7 @@ def download_project_async(mc, project_path, directory, project_version=None): """ if "/" not in project_path: - raise ClientError("Project name needs to be fully qualified, e.g. /") + raise ClientError("Project name needs to be fully qualified, e.g. /") if os.path.exists(directory): raise ClientError("Project directory already exists") os.makedirs(directory) diff --git a/mergin/client_push.py b/mergin/client_push.py index 885db9ac..2c9594db 100644 --- a/mergin/client_push.py +++ b/mergin/client_push.py @@ -15,16 +15,18 @@ import tempfile import concurrent.futures import os +from typing import Dict, List, Optional, Tuple from .common import UPLOAD_CHUNK_SIZE, ClientError from .merginproject import MerginProject -from .editor import filter_changes +from .editor import is_editor_enabled, _apply_editor_filters +from .utils import is_qgis_file, is_versioned_file class UploadJob: """Keeps all the important data about a pending upload job""" - def __init__(self, project_path, changes, transaction_id, mp, mc, tmp_dir): + def __init__(self, project_path, changes, transaction_id, mp, mc, tmp_dir, exclusive: bool): self.project_path = project_path # full project name ("username/projectname") self.changes = changes # dictionary of local changes to the project self.transaction_id = transaction_id # ID of the transaction assigned by the server @@ -34,6 +36,7 @@ def __init__(self, project_path, changes, transaction_id, mp, mc, tmp_dir): self.mp = mp # MerginProject instance self.mc = mc # MerginClient instance self.tmp_dir = tmp_dir # TemporaryDirectory instance for any temp file we need + self.exclusive = exclusive # flag whether this upload blocks other uploads self.is_cancelled = False # whether upload has been cancelled self.executor = None # ThreadPoolExecutor that manages background upload tasks self.futures = [] # list of futures submitted to the executor @@ -82,8 +85,109 @@ def upload_blocking(self, mc, mp): raise ClientError("Mismatch between uploaded file chunk {} and local one".format(self.chunk_id)) -def push_project_async(mc, directory): - """Starts push of a project and returns pending upload job""" +class ChangesHandler: + """ + Handles preparation of file changes to be uploaded to the server. + + This class is responsible for: + - Filtering project file changes. 
+    - Splitting changes into blocking and non-blocking groups.
+    - TODO: Applying limits such as max file count or size to break large uploads into smaller batches.
+    - Generating upload-ready change groups for asynchronous job creation.
+    """
+
+    def __init__(self, client, project_dir):
+        self.client = client
+        self.mp = MerginProject(project_dir)
+        self._raw_changes = self.mp.get_push_changes()
+
+    @staticmethod
+    def is_blocking_file(file):
+        return is_qgis_file(file["path"]) or is_versioned_file(file["path"])
+
+    def _split_by_type(self, changes: Dict[str, List[dict]]) -> List[Dict[str, List[dict]]]:
+        """
+        Split raw filtered changes into two batches:
+        1. Blocking: updated/removed files, plus added files that are blocking
+        2. Non-blocking: added files that are not blocking
+        """
+        blocking_changes = {"added": [], "updated": [], "removed": []}
+        non_blocking_changes = {"added": [], "updated": [], "removed": []}
+
+        for f in changes.get("added", []):
+            if self.is_blocking_file(f):
+                blocking_changes["added"].append(f)
+            else:
+                non_blocking_changes["added"].append(f)
+
+        for f in changes.get("updated", []):
+            blocking_changes["updated"].append(f)
+
+        for f in changes.get("removed", []):
+            blocking_changes["removed"].append(f)
+
+        result = []
+        if any(len(v) for v in blocking_changes.values()):
+            result.append(blocking_changes)
+        if any(len(v) for v in non_blocking_changes.values()):
+            result.append(non_blocking_changes)
+
+        return result
+
+    def split(self) -> List[Dict[str, List[dict]]]:
+        """
+        Apply all configured internal filters and return a list of change batches ready to be uploaded.
+        """
+        project_name = self.mp.project_full_name()
+        try:
+            project_info = self.client.project_info(project_name)
+        except ClientError as e:
+            self.mp.log.error(f"Failed to get project info for project {project_name}: {e}")
+            raise
+        changes = filter_changes(self.client, project_info, self._raw_changes)
+        changes_list = self._split_by_type(changes)
+        # TODO: apply limits; changes = self._limit_by_file_count(changes)
+        return changes_list
+
+
+def filter_changes(mc, project_info, changes: Dict[str, List[dict]]) -> Dict[str, List[dict]]:
+    """
+    Filter the given changes dictionary based on the editor's enabled state.
+
+    If the editor is not enabled, the changes dictionary is returned as-is.
+    Otherwise, the changes are passed through the `_apply_editor_filters` method to apply any configured filters.
+
+    Args:
+        changes (dict[str, list[dict]]): A dictionary mapping change types to lists of file metadata dictionaries.
+
+    Returns:
+        dict[str, list[dict]]: The filtered changes dictionary.
+    """
+    if is_editor_enabled(mc, project_info):
+        changes = _apply_editor_filters(changes)
+    return changes
+
+
+def get_change_batch(mc, project_dir) -> Tuple[Optional[Dict[str, List[dict]]], bool]:
+    """
+    Return the next batch of changes and a flag telling whether more changes remain (to be uploaded in the next upload job).
+    """
+    changes_list = ChangesHandler(mc, project_dir).split()
+    if not changes_list:
+        return None, False
+    non_empty_length = sum(any(v for v in d.values()) for d in changes_list)
+    return changes_list[0], non_empty_length > 1
+
+
+def push_project_async(mc, directory, changes=None) -> Optional[UploadJob]:
+    """
+    Start a push in the background and return the pending upload job.
+    Pushes all project changes unless `changes` is provided.
+    When a specific batch of changes is provided, the initial version check is skipped (a pull has just been done).
+
+    :param changes: The changes to upload: either (1) provided by the caller, already split into blocking
+        and non-blocking batches, or (2) omitted, in which case all local changes are retrieved for upload.
+        Pushing only non-blocking changes results in a non-exclusive upload, which the server allows to run concurrently.
+    """
 
     mp = MerginProject(directory)
 
     if mp.has_unfinished_pull():
@@ -95,34 +199,35 @@ def push_project_async(mc, directory):
     mp.log.info("--- version: " + mc.user_agent_info())
     mp.log.info(f"--- start push {project_path}")
 
-    try:
-        project_info = mc.project_info(project_path)
-    except ClientError as err:
-        mp.log.error("Error getting project info: " + str(err))
-        mp.log.info("--- push aborted")
-        raise
-    server_version = project_info["version"] if project_info["version"] else "v0"
-
-    mp.log.info(f"got project info: local version {local_version} / server version {server_version}")
-
-    username = mc.username()
-    # permissions field contains information about update, delete and upload privileges of the user
-    # on a specific project. This is more accurate information then "writernames" field, as it takes
-    # into account namespace privileges. So we have to check only "permissions", namely "upload" one
-    if not mc.has_writing_permissions(project_path):
-        mp.log.error(f"--- push {project_path} - username {username} does not have write access")
-        raise ClientError(f"You do not seem to have write access to the project (username '{username}')")
-
-    if local_version != server_version:
-        mp.log.error(f"--- push {project_path} - not up to date (local {local_version} vs server {server_version})")
-        raise ClientError(
-            "There is a new version of the project on the server. Please update your local copy."
-            + f"\n\nLocal version: {local_version}\nServer version: {server_version}"
-        )
+    # if a specific batch of changes was provided, we don't need the version check
+    if not changes:
+        try:
+            project_info = mc.project_info(project_path)
+        except ClientError as err:
+            mp.log.error("Error getting project info: " + str(err))
+            mp.log.info("--- push aborted")
+            raise
+        server_version = project_info["version"] if project_info["version"] else "v0"
+
+        mp.log.info(f"got project info: local version {local_version} / server version {server_version}")
+
+        username = mc.username()
+        # permissions field contains information about update, delete and upload privileges of the user
+        # on a specific project. This is more accurate information than the "writernames" field, as it takes
+        # into account namespace privileges. So we have to check only "permissions", namely the "upload" one
+        if not mc.has_writing_permissions(project_path):
+            mp.log.error(f"--- push {project_path} - username {username} does not have write access")
+            raise ClientError(f"You do not seem to have write access to the project (username '{username}')")
+
+        if local_version != server_version:
+            mp.log.error(f"--- push {project_path} - not up to date (local {local_version} vs server {server_version})")
+            raise ClientError(
+                "There is a new version of the project on the server. Please update your local copy."
+ + f"\n\nLocal version: {local_version}\nServer version: {server_version}" + ) + changes = filter_changes(mc, project_info, mp.get_push_changes()) - changes = mp.get_push_changes() - changes = filter_changes(mc, project_info, changes) - mp.log.debug("push changes:\n" + pprint.pformat(changes)) + mp.log.debug("push change:\n" + pprint.pformat(changes)) tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-") @@ -139,7 +244,7 @@ def push_project_async(mc, directory): if mp.is_versioned_file(f["path"]): mp.copy_versioned_file_for_upload(f, tmp_dir.name) - if not sum(len(v) for v in changes.values()): + if not any(len(v) for v in changes.values()): mp.log.info(f"--- push {project_path} - nothing to do") return @@ -163,7 +268,8 @@ def push_project_async(mc, directory): upload_files = data["changes"]["added"] + data["changes"]["updated"] transaction_id = server_resp["transaction"] if upload_files else None - job = UploadJob(project_path, changes, transaction_id, mp, mc, tmp_dir) + exclusive = server_resp.get("blocking", True) + job = UploadJob(project_path, changes, transaction_id, mp, mc, tmp_dir, exclusive) if not upload_files: mp.log.info("not uploading any files") @@ -171,7 +277,7 @@ def push_project_async(mc, directory): push_project_finalize(job) return None # all done - no pending job - mp.log.info(f"got transaction ID {transaction_id}") + mp.log.info(f"got transaction ID {transaction_id}, {'exclusive' if exclusive else 'non-exclusive'} upload") upload_queue_items = [] total_size = 0 @@ -264,7 +370,9 @@ def push_project_finalize(job): if with_upload_of_files: try: - job.mp.log.info(f"Finishing transaction {job.transaction_id}") + job.mp.log.info( + f"Finishing {'exclusive' if job.exclusive else 'non-exclusive'} transaction {job.transaction_id}" + ) resp = job.mc.post("/v1/project/push/finish/%s" % job.transaction_id) job.server_resp = json.load(resp) except ClientError as err: @@ -334,3 +442,15 @@ def remove_diff_files(job) -> None: diff_file = job.mp.fpath_meta(change["diff"]["path"]) if os.path.exists(diff_file): os.remove(diff_file) + + +def total_upload_size(directory) -> int: + """ + Total up the number of bytes that need to be uploaded. 
+ """ + mp = MerginProject(directory) + changes = mp.get_push_changes() + files = changes.get("added", []) + changes.get("updated", []) + size = sum(f.get("diff", {}).get("size", f.get("size", 0)) for f in files) + mp.log.info(f"Upload size of all files is {size}") + return size diff --git a/mergin/common.py b/mergin/common.py index 9e65a6ce..c6818417 100644 --- a/mergin/common.py +++ b/mergin/common.py @@ -15,7 +15,7 @@ this_dir = os.path.dirname(os.path.realpath(__file__)) -# Error code from the public API, add to the end of enum as we handle more eror +# Error code from the public API, add to the end of enum as we handle more error class ErrorCode(Enum): ProjectsLimitHit = "ProjectsLimitHit" StorageLimitHit = "StorageLimitHit" diff --git a/mergin/editor.py b/mergin/editor.py index 237b0ea1..aa6420c5 100644 --- a/mergin/editor.py +++ b/mergin/editor.py @@ -1,7 +1,7 @@ from itertools import filterfalse from typing import Callable, Dict, List -from .utils import is_mergin_config, is_qgis_file, is_versioned_file +from .utils import is_qgis_file EDITOR_ROLE_NAME = "editor" @@ -40,23 +40,6 @@ def _apply_editor_filters(changes: Dict[str, List[dict]]) -> Dict[str, List[dict return changes -def filter_changes(mc, project_info: dict, changes: Dict[str, List[dict]]) -> Dict[str, List[dict]]: - """ - Filters the given changes dictionary based on the editor's enabled state. - - If the editor is not enabled, the changes dictionary is returned as-is. Otherwise, the changes are passed through the `_apply_editor_filters` method to apply any configured filters. - - Args: - changes (dict[str, list[dict]]): A dictionary mapping file paths to lists of change dictionaries. - - Returns: - dict[str, list[dict]]: The filtered changes dictionary. - """ - if not is_editor_enabled(mc, project_info): - return changes - return _apply_editor_filters(changes) - - def prevent_conflicted_copy(path: str, mc, project_info: dict) -> bool: """ Decides whether a file path should be blocked from creating a conflicted copy. diff --git a/mergin/merginproject.py b/mergin/merginproject.py index 83d8510a..933aa408 100644 --- a/mergin/merginproject.py +++ b/mergin/merginproject.py @@ -1,5 +1,6 @@ import json import logging + import math import os import re @@ -7,6 +8,7 @@ import uuid import tempfile from datetime import datetime +from typing import List, Dict from dateutil.tz import tzlocal from .editor import prevent_conflicted_copy @@ -314,17 +316,14 @@ def inspect_files(self): ) return files_meta - def compare_file_sets(self, origin, current): + def compare_file_sets(self, origin, current) -> Dict[str, List[dict]]: """ - Helper function to calculate difference between two sets of files metadata using file names and checksums. - + Calculate difference between two sets of files metadata using file names and checksums. 
:Example: - >>> origin = [{'checksum': '08b0e8caddafe74bf5c11a45f65cedf974210fed', 'path': 'base.gpkg', 'size': 2793, 'mtime': '2019-08-26T11:08:34.051221+02:00'}] >>> current = [{'checksum': 'c9a4fd2afd513a97aba19d450396a4c9df8b2ba4', 'path': 'test.qgs', 'size': 31980, 'mtime': '2019-08-26T11:09:30.051221+02:00'}] >>> self.compare_file_sets(origin, current) - {"added": [{'checksum': 'c9a4fd2afd513a97aba19d450396a4c9df8b2ba4', 'path': 'test.qgs', 'size': 31980, 'mtime': '2019-08-26T11:09:30.051221+02:00'}], "removed": [[{'checksum': '08b0e8caddafe74bf5c11a45f65cedf974210fed', 'path': 'base.gpkg', 'size': 2793, 'mtime': '2019-08-26T11:08:34.051221+02:00'}]], "renamed": [], "updated": []} - + {"added": [{'checksum': 'c9a4fd2afd513a97aba19d450396a4c9df8b2ba4', 'path': 'test.qgs', 'size': 31980, 'mtime': '2019-08-26T11:09:30.051221+02:00'}], "removed": [[{'checksum': '08b0e8caddafe74bf5c11a45f65cedf974210fed', 'path': 'base.gpkg', 'size': 2793, 'mtime': '2019-08-26T11:08:34.051221+02:00'}]], "updated": []} :param origin: origin set of files metadata :type origin: list[dict] :param current: current set of files metadata to be compared against origin @@ -347,7 +346,7 @@ def compare_file_sets(self, origin, current): f["origin_checksum"] = origin_map[path]["checksum"] updated.append(f) - return {"renamed": [], "added": added, "removed": removed, "updated": updated} + return {"added": added, "removed": removed, "updated": updated} def get_pull_changes(self, server_files): """ @@ -405,7 +404,7 @@ def get_pull_changes(self, server_files): changes["updated"] = [f for f in changes["updated"] if f not in not_updated] return changes - def get_push_changes(self): + def get_push_changes(self) -> Dict[str, List[dict]]: """ Calculate changes needed to be pushed to server. @@ -427,7 +426,7 @@ def get_push_changes(self): file["checksum"] = checksum file["chunks"] = [str(uuid.uuid4()) for i in range(math.ceil(file["size"] / UPLOAD_CHUNK_SIZE))] - # need to check for for real changes in geodiff files using geodiff tool (comparing checksum is not enough) + # need to check for real changes in geodiff files using geodiff tool (comparing checksum is not enough) not_updated = [] for file in changes["updated"]: path = file["path"] @@ -688,7 +687,7 @@ def apply_push_changes(self, changes): """ For geodiff files update basefiles according to changes pushed to server. - :param changes: metadata for pulled files + :param changes: metadata for pushed files :type changes: dict[str, list[dict]] """ for k, v in changes.items(): diff --git a/mergin/test/test_client.py b/mergin/test/test_client.py index 31de795f..5d095e97 100644 --- a/mergin/test/test_client.py +++ b/mergin/test/test_client.py @@ -5,11 +5,13 @@ import tempfile import subprocess import shutil +from collections import defaultdict from datetime import datetime, timedelta, date import pytest import pytz import sqlite3 import glob +import concurrent.futures from .. 
import InvalidProject from ..client import ( @@ -21,7 +23,7 @@ TokenError, ServerType, ) -from ..client_push import push_project_async, push_project_cancel +from ..client_push import push_project_cancel, ChangesHandler, push_project_async, filter_changes from ..client_pull import ( download_project_async, download_project_wait, @@ -38,7 +40,7 @@ ) from ..merginproject import pygeodiff from ..report import create_report -from ..editor import EDITOR_ROLE_NAME, filter_changes, is_editor_enabled +from ..editor import EDITOR_ROLE_NAME, is_editor_enabled from ..common import ErrorCode, WorkspaceRole, ProjectRole SERVER_URL = os.environ.get("TEST_MERGIN_URL") @@ -311,7 +313,8 @@ def test_push_pull_changes(mc): # renamed file will result in removed + added file assert next((f for f in push_changes["removed"] if f["path"] == f_renamed), None) assert next((f for f in push_changes["added"] if f["path"] == "renamed.txt"), None) - assert not pull_changes["renamed"] # not supported + assert not pull_changes.get("renamed") # not supported + assert not push_changes.get("renamed") # not supported mc.push_project(project_dir) @@ -2586,15 +2589,6 @@ def test_editor(mc: MerginClient): project_info["role"] = EDITOR_ROLE_NAME assert is_editor_enabled(mc, project_info) is True - # unit test for editor methods - qgs_changeset = { - "added": [{"path": "/folder/project.new.Qgz"}], - "updated": [{"path": "/folder/project.updated.Qgs"}], - "removed": [{"path": "/folder/project.removed.qgs"}], - } - qgs_changeset = filter_changes(mc, project_info, qgs_changeset) - assert sum(len(v) for v in qgs_changeset.values()) == 2 - def test_editor_push(mc: MerginClient, mc2: MerginClient): """Test push with editor""" @@ -2871,3 +2865,140 @@ def server_config(self): with pytest.raises(ClientError, match="The requested URL was not found on the server"): mc.send_logs(logs_path) + + +def test_mc_without_login(): + + # client without login should be able to access server config + mc = MerginClient(SERVER_URL) + config = mc.server_config() + assert config + assert isinstance(config, dict) + assert "server_configured" in config + assert config["server_configured"] + + # without login should not be able to access workspaces + with pytest.raises(ClientError) as e: + mc.workspaces_list() + + assert e.value.http_error == 401 + assert e.value.detail == "Authentication information is missing or invalid." 
+ + +def _sort_dict_of_files_by_path(d): + return {k: sorted(v, key=lambda f: f["path"]) for k, v in d.items()} + + +def test_changes_handler(mc): + """ + Test methods of the ChangesHandler class + """ + # test _split_by_type + test_project = "test_changes_handler" + project = API_USER + "/" + test_project + project_dir = os.path.join(TMP_DIR, test_project) + cleanup(mc, project, [project_dir]) + shutil.copytree(TEST_DATA_DIR, project_dir) + mc.create_project(project) + project_info = mc.project_info(project) + mp = MerginProject(project_dir) + mp.write_metadata(project_dir, project_info) + + mixed_changes = mp.get_push_changes() + changes_handler = ChangesHandler(mc, project_dir) + split_changes = changes_handler._split_by_type(mixed_changes) + assert len(split_changes) == 2 + # all blocking files in the first dict and no blocking file in the second dict + assert all(ChangesHandler.is_blocking_file(f) for files in split_changes[0].values() for f in files) + assert all(not ChangesHandler.is_blocking_file(f) for files in split_changes[1].values() for f in files) + # merge the split changes dicts back into a single dict and check files are the same + merged = defaultdict(list) + for d in split_changes: + for k, v in d.items(): + merged[k].extend(v) + assert _sort_dict_of_files_by_path(merged) == _sort_dict_of_files_by_path(mixed_changes) + + # test filter_changes + project_info = {"role": EDITOR_ROLE_NAME} + qgs_changeset = { + "added": [{"path": "/folder/project.new.Qgz"}], + "updated": [{"path": "/folder/project.updated.Qgs"}], + "removed": [{"path": "/folder/project.removed.qgs"}], + } + qgs_changeset = filter_changes(mc, project_info, qgs_changeset) + assert sum(len(v) for v in qgs_changeset.values()) == 2 + + +def create_dummy_photos(dir_path, count=20, size_kb=5000): + """Create `count` dummy JPG files in `dir_path` with ~`size_kb` each.""" + os.makedirs(dir_path, exist_ok=True) + for i in range(count): + filename = os.path.join(dir_path, f"photo_{i:03}.jpg") + with open(filename, "wb") as f: + f.write(os.urandom(size_kb * 1024)) # Random bytes to simulate real file + + +files_to_push = [ + ( + "base.gpkg", + "inserted_1_A.gpkg", + False, + "another_process", + ), # both pushes are exclusive, the latter one is refused + # ( + # "inserted_1_A.gpkg", + # "test.txt", + # False, + # "version_conflict", + # ), # small files pushed at the same time might result in version conflict due to race condition + ("inserted_1_A.gpkg", "many_photos", True, None), # the upload of many photos does not block the other upload +] + + +@pytest.mark.parametrize("file1,file2,success,fail_reason", files_to_push) +def test_sync_project(mc, mc2, file1, file2, success, fail_reason): + """ + Test two clients pushing at the same time + """ + test_project_name = "test_sync_project" + project_dir1 = os.path.join(TMP_DIR, test_project_name + "_1") + project_dir2 = os.path.join(TMP_DIR, test_project_name + "_2") + + project_full_name = API_USER + "/" + test_project_name + cleanup(mc, project_full_name, [project_dir1, project_dir2]) + mc.create_project(test_project_name) + project_info = mc.project_info(project_full_name) + mc.download_project(project_full_name, project_dir1) + mc.add_project_collaborator(project_info["id"], API_USER2, ProjectRole.WRITER) + mc2.download_project(project_full_name, project_dir2) + + def sync1(): + mc.sync_project(project_dir1) + + def sync2(): + mc2.sync_project(project_dir2) + + shutil.copy(os.path.join(TEST_DATA_DIR, file1), project_dir1) + if file2 == "many_photos": + 
+        create_dummy_photos(project_dir2)
+    else:
+        shutil.copy(os.path.join(TEST_DATA_DIR, file2), project_dir2)
+
+    with concurrent.futures.ThreadPoolExecutor() as executor:
+        future1 = executor.submit(sync1)
+        future2 = executor.submit(sync2)
+        exc2 = future2.exception()
+        exc1 = future1.exception()
+
+    if not success:
+        error = exc1 if exc1 else exc2  # one of the uploads was lucky to pass while the other was too slow
+        assert exc1 is None or exc2 is None
+        assert isinstance(error, ClientError)
+        if fail_reason == "another_process":
+            assert error.http_error == 400
+            assert error.detail == "Another process is running. Please try later."
+        elif fail_reason == "version_conflict":
+            assert error.http_error == 409
+            assert error.detail == "There is already version with this name v1"
+    else:
+        assert not (exc1 or exc2)
diff --git a/scripts/update_version.py b/scripts/update_version.py
index 184d6a8a..20206593 100644
--- a/scripts/update_version.py
+++ b/scripts/update_version.py
@@ -4,7 +4,7 @@
 
 def replace_in_file(filepath, regex, sub):
-    with open(filepath, 'r') as f:
+    with open(filepath, "r") as f:
         content = f.read()
 
     content_new = re.sub(regex, sub, content, flags=re.M)
@@ -15,14 +15,14 @@
 dir_path = os.path.dirname(os.path.realpath(__file__))
 
 parser = argparse.ArgumentParser()
-parser.add_argument('--version', help='version to replace')
+parser.add_argument("--version", help="version to replace")
 args = parser.parse_args()
 ver = args.version
 print("using version " + ver)
 
 about_file = os.path.join(dir_path, os.pardir, "mergin", "version.py")
 print("patching " + about_file)
-replace_in_file(about_file, "__version__\s=\s\".*", "__version__ = \"" + ver + "\"")
+replace_in_file(about_file, r'__version__\s=\s".*', '__version__ = "' + ver + '"')
 
 setup_file = os.path.join(dir_path, os.pardir, "setup.py")
 print("patching " + setup_file)
diff --git a/setup.py b/setup.py
index 5bc34aca..2e3a4fb4 100644
--- a/setup.py
+++ b/setup.py
@@ -4,35 +4,31 @@
 from setuptools import setup, find_packages
 
 setup(
-    name='mergin-client',
-    version='0.10.0',
-    url='https://github.com/MerginMaps/python-api-client',
-    license='MIT',
-    author='Lutra Consulting Ltd.',
-    author_email='info@merginmaps.com',
-    description='Mergin Maps utils and client',
-    long_description='Mergin Maps utils and client',
-
+    name="mergin-client",
+    version="0.10.0",
+    url="https://github.com/MerginMaps/python-api-client",
+    license="MIT",
+    author="Lutra Consulting Ltd.",
+    author_email="info@merginmaps.com",
+    description="Mergin Maps utils and client",
+    long_description="Mergin Maps utils and client",
     packages=find_packages(),
-
-    platforms='any',
+    platforms="any",
     install_requires=[
-        'python-dateutil==2.8.2',
-        'pygeodiff==2.0.4',
-        'pytz==2022.1',
-        'click==8.1.3',
+        "python-dateutil==2.8.2",
+        "pygeodiff==2.0.4",
+        "pytz==2022.1",
+        "click==8.1.3",
     ],
-
     entry_points={
-        'console_scripts': ['mergin=mergin.cli:cli'],
+        "console_scripts": ["mergin=mergin.cli:cli"],
     },
-
     classifiers=[
-        'Development Status :: 5 - Production/Stable',
-        'Intended Audience :: Developers',
-        'License :: OSI Approved :: MIT License',
-        'Operating System :: OS Independent',
-        'Programming Language :: Python :: 3'
+        "Development Status :: 5 - Production/Stable",
+        "Intended Audience :: Developers",
+        "License :: OSI Approved :: MIT License",
+        "Operating System :: OS Independent",
+        "Programming Language :: Python :: 3",
     ],
-    package_data={'mergin': ['cert.pem']}
+    package_data={"mergin": ["cert.pem"]},
 )