
Split push for blocking and non-blocking #244


Open
wants to merge 31 commits into
base: master

Conversation

harminius
Contributor

Resolves #240

@harminius
Contributor Author

Follows up MerginMaps/server#464

Separates push changes into blocking changes and non-blocking changes, which the server can handle concurrently.

@harminius harminius changed the title Draft: Split push for blocking and non-blocking Split push for blocking and non-blocking Jun 26, 2025
@harminius harminius requested a review from MarcelGeo June 26, 2025 13:21
Contributor

@MarcelGeo MarcelGeo left a comment


Good stuff here. I am not sure whether we want to run each job, wait for it, then pull changes and run it again. If so, we could tweak the logic a bit.

k: sorted(v, key=lambda f: f["path"]) for k, v in d.items()
}

def test_changes_handler(mc):
Contributor

This looks fine. Just add an integration test that makes 2 pushes in 2 threads (?). I think we could use mc and mc2 for that :)

@@ -2440,7 +2446,7 @@ def test_project_rename(mc: MerginClient):

# validate project info
project_info = mc.project_info(project_renamed)
- assert project_info["version"] == "v1"
+ assert project_info["version"] == "v2"  # two versions created in initial push
Contributor

I think we can stop testing the exact version number in some places in the tests. We do not need to test it in project_rename; we could keep it in tests that cover push directly. Just an idea :)

assert "Wrong version parameters" in str(e.value)
- assert "Available versions: [1, 2, 3, 4]" in str(e.value)
+ assert "Available versions: [1, 3, 4, 5]" in str(e.value)
Contributor

I probably do not understand, but why is 2 missing here? :)

1. Blocking: updated/removed and added files that are blocking
2. Non-blocking: added files that are not blocking
"""
blocking_changes = {"added": [], "updated": [], "removed": [], "renamed": []}
Contributor

Can we get rid of the legacy "renamed" key? It has not been used for a while.
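
For readers following the thread, the split described in the docstring above could look roughly like the sketch below. It is an illustration only, not the PR's code. The name `split_changes` and the `is_blocking_file` rule (treating GeoPackage and QGIS project files as blocking) are assumptions; the PR may decide blocking status differently. The "renamed" key is left out, as suggested in this comment.

```python
from typing import Callable, Dict, List, Tuple

Changes = Dict[str, List[dict]]


def is_blocking_file(file: dict) -> bool:
    # Hypothetical rule, assumed for illustration only.
    return file["path"].lower().endswith((".gpkg", ".qgz", ".qgs"))


def split_changes(
    changes: Changes, is_blocking: Callable[[dict], bool] = is_blocking_file
) -> Tuple[Changes, Changes]:
    blocking: Changes = {"added": [], "updated": [], "removed": []}
    non_blocking: Changes = {"added": [], "updated": [], "removed": []}
    # Updates and removals always go to the blocking group.
    blocking["updated"] = list(changes.get("updated", []))
    blocking["removed"] = list(changes.get("removed", []))
    # Added files are blocking only if the predicate says so.
    for f in changes.get("added", []):
        target = blocking if is_blocking(f) else non_blocking
        target["added"].append(f)
    return blocking, non_blocking


changes = {
    "added": [{"path": "survey.gpkg"}, {"path": "photos/img1.jpg"}],
    "updated": [{"path": "project.qgz"}],
    "removed": [],
}
blocking, non_blocking = split_changes(changes)
```

With this input, the GeoPackage and the updated project file end up in the blocking group, and only the photo is left in the non-blocking group.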

mp.log.info(f"will upload {len(upload_queue_items)} items with total size {total_size}")

# start uploads in background
job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
Contributor

This would already start sending chunks from the second upload before the first one is finalized.

I would rather keep push_project_async for a single job only (including start/upload/finalize) and process the split changes sequentially, as a series of push_project_async calls with or without a pull before each. (I am not sure about the pull; in principle, a non-blocking push should not need one.)
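
The sequential approach suggested here can be sketched as follows. The `push_one` callable stands in for one complete start/upload/finalize cycle; it and `push_batches_sequentially` are hypothetical names, and the optional `pull` hook reflects the open question of whether a pull is needed before each batch.

```python
from typing import Callable, Dict, List, Optional

Changes = Dict[str, List[dict]]


def push_batches_sequentially(
    batches: List[Changes],
    push_one: Callable[[Changes], None],
    pull: Optional[Callable[[], None]] = None,
) -> int:
    """Push each non-empty batch to completion before starting the next one."""
    pushed = 0
    for batch in batches:
        if not any(batch.values()):
            continue  # nothing to send in this batch
        if pull is not None:
            pull()  # optional: refresh local state before pushing
        push_one(batch)  # assumed to block until this batch is finalized
        pushed += 1
    return pushed


log = []
batches = [
    {"added": [], "updated": [{"path": "data.gpkg"}], "removed": []},
    {"added": [], "updated": [], "removed": []},
    {"added": [{"path": "photo.jpg"}], "updated": [], "removed": []},
]
count = push_batches_sequentially(
    batches,
    push_one=lambda b: log.append(("push", sum(len(v) for v in b.values()))),
    pull=lambda: log.append(("pull", 0)),
)
```

Because each call finishes before the next begins, no chunk from a later batch can be sent while an earlier batch is still being finalized.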

@harminius
Contributor Author

When two clients push at the same time, one client may remove changes uploaded by the other one. See the merged logs from client A and client B.
We should probably introduce a sync method that pulls before pushing:

merged_logs.txt

@MarcelGeo
Contributor

MarcelGeo commented Jul 8, 2025

We want to make some changes to the current PR. I believe these changes will not significantly break the existing logic.

  • Implement a function that will be used in both the plugin and client for splitting changes, which can call the UploadChanges class:
def get_next_batch(project_dir) -> Tuple[Dict[...], bool]:
    """
    Return the next dictionary with changes, similar to changes[0] in push_project_async.
    """
    # TODO
    return {"added": [], "updated": [], "removed": []}, True
  • Do not rename the push_project_async method.

  • Add a new argument changes: Dict{updated, removed, added} to push_project_async. If not provided, use the method to get current changes as before. We can then use this argument in the plugin and Python API client methods.

  • Introduce the MerginClient.sync_project() method with logic for incrementally pulling and pushing changes (to resolve the problem described in an earlier comment on #244):

def sync_project(self, project_dir):   # in MerginClient
    has_more_changes = True
    while has_more_changes:
        # changes_batch = dict ("added"/"updated"/"removed" with list of file names)
        changes_batch, has_more_changes = get_next_batch(project_dir)

        # the pull/push helpers take the client as their first argument
        pull_job = pull_project_async(self, project_dir)
        pull_project_wait(pull_job)
        pull_project_finalize(pull_job)

        push_job = push_project_async(self, project_dir, changes_batch)
        push_project_wait(push_job)
        push_project_finalize(push_job)
  • Implement a similar method as above, with the possibility of passing a callback argument to get the current job state for a progress bar:
def sync_project_with_callback(self, project_dir, callback=None, sleep_time=0.1):
    # sleep_time is the polling interval in seconds (time.sleep takes seconds)
    has_more_changes = True
    while has_more_changes:

        pull_job = pull_project_async(self, project_dir)
        while pull_project_is_running(pull_job):
            sleep(sleep_time)
            if callback:
                callback()
        pull_project_finalize(pull_job)

        changes_batch, has_more_changes = get_next_batch(project_dir)

        push_job = push_project_async(self, project_dir, changes_batch)
        while push_project_is_running(push_job):
            sleep(sleep_time)
            if callback:
                callback()
        push_project_finalize(push_job)
  • Introduce a simple CLI command mergin sync which will perform the above sync logic.

  • @wonder-sk suggests that we do not need to implement a mergin push command with this new sync logic and should leave it as it was before (@tomasMizera). Users can use mergin sync instead.
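
To see how the polling loop and the callback in the proposal above interact, here is a self-contained simulation. `FakeJob`, `wait_with_progress`, and the callback signature (receiving the number of newly transferred bytes) are assumptions made for this sketch; they are not the mergin client API.

```python
import time
from typing import Callable, Optional


class FakeJob:
    """Stand-in for a pull/push job that transfers `total` bytes in fixed steps."""

    def __init__(self, total: int, step: int):
        self.total = total
        self.step = step
        self.transferred = 0

    def poll(self) -> bool:
        # Advance the fake transfer by one step; return True while still running.
        self.transferred = min(self.total, self.transferred + self.step)
        return self.transferred < self.total


def wait_with_progress(
    job: FakeJob,
    callback: Optional[Callable[[int], None]] = None,
    sleep_time: float = 0.0,
) -> None:
    """Poll the job, reporting newly transferred bytes after each poll."""
    last = 0
    while job.poll():
        time.sleep(sleep_time)  # polling interval in seconds
        if callback is not None:
            callback(job.transferred - last)
            last = job.transferred
    # Report whatever was transferred in the final step.
    if callback is not None and job.transferred > last:
        callback(job.transferred - last)


deltas = []
wait_with_progress(FakeJob(total=10, step=4), callback=deltas.append)
```

Reporting deltas rather than absolute values lets a single progress bar be shared across several consecutive jobs, which is one possible answer to the "which bar?" question raised later in the thread.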

Plugin-related notes

  • Use similar logic, which will be implemented in MerginClient.sync_project, for pushing changes in the plugin (reference).
  • Use get_next_batch and loop until the change is None.

@JanCaha

We do not want to implement any magic function for sync that can be used in the plugin, because there is different logic with dialogs, etc.

@tomasMizera
Contributor

Why can't we use the magic sync_project_with_callback method from the plugin? We know when things fail, succeed and with the callback, we should know about the progress too. @MarcelGeo @wonder-sk

@MarcelGeo
Contributor

> Why can't we use the magic sync_project_with_callback method from the plugin? We know when things fail, succeed and with the callback, we should know about the progress too. @MarcelGeo @wonder-sk

Yeah, we discussed that together with @wonder-sk. Since the plugin has completely custom logic for handling pull and push jobs, rewriting all of it won't be simple.

@harminius
Contributor Author

I'd prefer creating a new push function that would be called only from the sync method. push_project_async is designed for push-only use (e.g. it throws an error when server_version != local_version); however, we need a function that is aware of the sync context (e.g. one that calls pull, or itself, on a version mismatch).

@MarcelGeo
Contributor

MarcelGeo commented Jul 10, 2025

If we create test cases for the new sync_project, I am approving this 👍 Good job, guys (@harminius).

Contributor

@MarcelGeo MarcelGeo left a comment


Add tests and probably revert the old ones. Make small tweaks to the progress bar and the variables we discussed together.

mergin/client.py Outdated
if self.pull_job is None:
    return
pull_project_wait(self.pull_job)
pull_project_finalize(self.pull_job)
Contributor

I think we should use callback also for pull - it can take a while

Contributor Author

A callback to update a progress bar? Which bar? Do you want a common bar for pull and push, two separate bars, or a bar for every job? We can estimate the total push size quite precisely, but the total pull size might change significantly over time.

This class is responsible for:
- Filtering project file changes.
- Splitting changes into blocking and non-blocking groups.
- TODO: Applying limits such as max file count or size to break large uploads into smaller batches.
Contributor

are we going to do that in this PR? it may affect the class API as well...

Contributor Author

not in this PR


self.push_job = push_project_async(self, project_dir, changes_batch)
if not self.push_job:
    return
Contributor

is it correct to have the early return there? I guess if there are multiple batches, in theory there could be a single batch with just file removals (thus not uploading any data), and later some batches with data - and this early return would skip the other batches
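
The concern can be reproduced with a small simulation. Here `start_push` is a hypothetical stand-in that, as described in this comment, returns no job when a batch has no data to upload; `sync_batches` is likewise a name made up for the sketch. The alternative shown is to move on to the next batch instead of returning.

```python
from typing import Dict, List, Optional

Changes = Dict[str, List[str]]


def start_push(batch: Changes) -> Optional[str]:
    """Hypothetical: returns a job handle only if there is data to upload."""
    has_uploads = bool(batch["added"] or batch["updated"])
    return "job" if has_uploads else None


def sync_batches(batches: List[Changes], stop_on_empty_job: bool) -> int:
    """Return how many batches were visited before the loop ended."""
    visited = 0
    for batch in batches:
        visited += 1
        job = start_push(batch)
        if job is None:
            if stop_on_empty_job:
                return visited  # early return: later batches are skipped
            continue  # removal-only batch is done; move on to the next one
        # ... wait for and finalize the job here ...
    return visited


batches = [
    {"added": [], "updated": [], "removed": ["old.txt"]},
    {"added": ["new.jpg"], "updated": [], "removed": []},
]
```

With the early return, only the first (removal-only) batch is visited; with `continue`, both batches are processed.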

mergin/client.py Outdated
"""
Syncs project while sending push progress info as callback.
Sync is done in this loop:
Pending changes? -> Pull -> Push change batch -> repeat
Contributor

Can you please add a docstring describing what the progress_callback function should look like (and what sleep_time is)?

mergin/client.py Outdated
1. Pull server version
2. Get local changes
3. Push first change batch
Repeat if there are more changes pending.
Contributor

It would be useful to explain a bit what a "change batch" is and what the philosophy behind doing multiple batches is.

@@ -103,6 +109,8 @@ def __init__(
if plugin_version is not None: # this could be e.g. "Plugin/2020.1 QGIS/3.14"
self.client_version += " " + plugin_version
self.setup_logging()
+ self.pull_job = None
+ self.push_job = None
Contributor

I don't think these need to be member variables of MerginClient

Contributor Author

It's because I want the option to cancel the job on a keyboard interrupt in the CLI. I need an instance that is aware of the jobs.
It could be a different class (Syncer), but putting it in MerginClient is the easiest.
They don't need to be defined here, but I think it is good practice.
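
One way to keep the jobs out of MerginClient while still supporting cancellation is a small holder object, along the lines of the Syncer idea mentioned here. Everything in this sketch (`Syncer`, `FakeJob`, `cancel()`) is hypothetical and only illustrates the pattern; it is not code from the PR.

```python
class FakeJob:
    """Hypothetical job that can be cancelled."""

    def __init__(self, name: str):
        self.name = name
        self.cancelled = False

    def cancel(self) -> None:
        self.cancelled = True


class Syncer:
    """Tracks the job currently in flight so it can be cancelled on Ctrl+C."""

    def __init__(self):
        self.current_job = None

    def run(self, job: FakeJob, work) -> bool:
        self.current_job = job
        try:
            work(job)
            return True
        except KeyboardInterrupt:
            job.cancel()  # clean up the job that was running
            return False
        finally:
            self.current_job = None


def interrupted_work(job):
    raise KeyboardInterrupt  # simulate the user pressing Ctrl+C


syncer = Syncer()
job = FakeJob("push")
finished = syncer.run(job, interrupted_work)
```

The trade-off is one more class to maintain; keeping the job references on the client, as the PR does, avoids that at the cost of extra state on MerginClient.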

@@ -93,7 +99,7 @@ def __init__(
plugin_version=None,
proxy_config=None,
):
- self.url = url if url is not None else MerginClient.default_url()
+ self.url = (url if url is not None else MerginClient.default_url()).rstrip("/") + "/"
Contributor

What about this change - it does not seem to be relevant?

Contributor Author

Not related to the rest. Shall I move it to a separate PR?
I hit this when I didn't include the trailing slash in a localhost URL.
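
For context, the expression in this diff makes the base URL end in exactly one slash regardless of how it was entered. A quick check of that expression follows; the `normalize_url` wrapper and the example localhost port are introduced here purely for illustration.

```python
def normalize_url(url: str) -> str:
    # Same expression as in the diff: strip any trailing slashes, then add one.
    return url.rstrip("/") + "/"


no_slash = normalize_url("http://localhost:5000")
one_slash = normalize_url("http://localhost:5000/")
two_slashes = normalize_url("http://localhost:5000//")
```

All three inputs normalize to the same string, so later path joins behave the same whether or not the user typed the trailing slash.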

README.md Outdated
status Show all changes in project files - upstream and...
sync Syncronize the project.
Contributor

typo - Syncronize

- job.total_size = total_size
- job.upload_queue_items = upload_queue_items
+     job.total_size = total_size
+     job.upload_queue_items = upload_queue_items
Contributor

🤔 Why are these lines indented? They look like they have become part of the for loop, but they shouldn't be...

Contributor Author

@harminius harminius Jul 10, 2025


sure, residue from previous PR version

@harminius harminius force-pushed the non_blocking_push branch from 85fe64d to e886dfc Compare July 11, 2025 10:14
@coveralls

Pull Request Test Coverage Report for Build 16218706270

Details

  • 175 of 253 (69.17%) changed or added relevant lines in 7 files are covered.
  • 93 unchanged lines in 2 files lost coverage.
  • Overall coverage decreased (-0.7%) to 79.478%

| Changes Missing Coverage | Covered Lines | Changed/Added Lines | % |
|---|---|---|---|
| mergin/client_pull.py | 0 | 1 | 0.0% |
| mergin/test/test_client.py | 83 | 87 | 95.4% |
| mergin/client_push.py | 63 | 81 | 77.78% |
| mergin/client.py | 24 | 49 | 48.98% |
| mergin/cli.py | 0 | 30 | 0.0% |

| Files with Coverage Reduction | New Missed Lines | % |
|---|---|---|
| mergin/test/test_client.py | 19 | 97.76% |
| mergin/client.py | 74 | 83.81% |

| Totals | Coverage Status |
|---|---|
| Change from base Build 16073839998: | -0.7% |
| Covered Lines: | 3563 |
| Relevant Lines: | 4483 |

💛 - Coveralls

Development

Successfully merging this pull request may close these issues.

Add support for blocking/non-blocking syncs
8 participants