Split push for blocking and non-blocking #244
Follows up MerginMaps/server#464. Separates push changes into blocking and non-blocking changes that the server can handle concurrently.
Good stuff here. I am not sure if we want to run each job, wait for it, then pull changes and run it again. If so, we could tweak the logic a bit.
    k: sorted(v, key=lambda f: f["path"]) for k, v in d.items()
}

def test_changes_handler(mc):
This looks fine. Just add an integration test that makes 2 pushes in 2 threads (?). I think we could use mc and mc2 for that :)
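A sketch of the shape such a test could take. A real test would use the `mc` and `mc2` fixtures against a server; `FakeServer` and its `push` method below are hypothetical in-memory stand-ins so that the snippet runs on its own.

```python
import threading
from typing import List


class FakeServer:
    """Hypothetical in-memory server that serializes pushes with a lock."""

    def __init__(self) -> None:
        self.version = 0
        self.files: List[str] = []
        self._lock = threading.Lock()

    def push(self, files: List[str]) -> int:
        # Each push is applied atomically and creates one new version.
        with self._lock:
            self.files.extend(files)
            self.version += 1
            return self.version


def test_two_concurrent_pushes() -> None:
    server = FakeServer()
    results: List[int] = []

    def worker(files: List[str]) -> None:
        results.append(server.push(files))

    threads = [
        threading.Thread(target=worker, args=(["a.jpg"],)),
        threading.Thread(target=worker, args=(["b.jpg"],)),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    # Both pushes must land, each creating its own version.
    assert sorted(results) == [1, 2]
    assert sorted(server.files) == ["a.jpg", "b.jpg"]
```

The assertions check the final state only, since the order in which the two threads reach the server is not deterministic.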
mergin/test/test_client.py
Outdated
@@ -2440,7 +2446,7 @@ def test_project_rename(mc: MerginClient):

     # validate project info
     project_info = mc.project_info(project_renamed)
-    assert project_info["version"] == "v1"
+    assert project_info["version"] == "v2"  # two versions created in initial push
I think we can stop testing the exact version number in some places in the tests. I do not think we need to test it in project_rename. We could keep it in tests that are directly related to push. Just an idea :)
mergin/test/test_client.py
Outdated
     assert "Wrong version parameters" in str(e.value)
-    assert "Available versions: [1, 2, 3, 4]" in str(e.value)
+    assert "Available versions: [1, 3, 4, 5]" in str(e.value)
I probably do not understand, but why is 2 missing here? :)
mergin/client_push.py
Outdated
    1. Blocking: updated/removed and added files that are blocking
    2. Non-blocking: added files that are not blocking
    """
    blocking_changes = {"added": [], "updated": [], "removed": [], "renamed": []}
Can we get rid of the legacy "renamed" key? It has not been used for a while.
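A minimal sketch of the split described in the docstring above, without the legacy "renamed" key. How a file is classified as blocking is not shown in this thread, so the `is_blocking` predicate, the `BLOCKING_EXTENSIONS` list and the `split_changes` name are assumptions for illustration only.

```python
from typing import Callable, Dict, List, Tuple

Changes = Dict[str, List[dict]]

# ASSUMPTION: hypothetical rule, not taken from the PR; the real criteria may differ.
BLOCKING_EXTENSIONS = (".gpkg", ".qgz", ".qgs")


def is_blocking(file: dict) -> bool:
    """Hypothetical predicate: treat project and data files as blocking."""
    return file["path"].lower().endswith(BLOCKING_EXTENSIONS)


def split_changes(
    changes: Changes, blocking: Callable[[dict], bool] = is_blocking
) -> Tuple[Changes, Changes]:
    """Split changes into (blocking, non_blocking) dictionaries."""
    blocking_changes: Changes = {"added": [], "updated": [], "removed": []}
    non_blocking_changes: Changes = {"added": [], "updated": [], "removed": []}

    # Updated and removed files always go to the blocking group.
    blocking_changes["updated"] = list(changes.get("updated", []))
    blocking_changes["removed"] = list(changes.get("removed", []))

    # Added files are blocking only when the predicate says so.
    for f in changes.get("added", []):
        target = blocking_changes if blocking(f) else non_blocking_changes
        target["added"].append(f)

    return blocking_changes, non_blocking_changes
```

Passing the predicate as a parameter keeps the classification rule swappable, which matters here because the real rule is not stated in the conversation.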
mergin/client_push.py
Outdated
    mp.log.info(f"will upload {len(upload_queue_items)} items with total size {total_size}")

    # start uploads in background
    job.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)
This would already start sending some chunks from the second upload before the first one is finalized.
I would rather keep push_project_async for a single job only (including start/upload/finalize) and process the split changes sequentially, as a series of push_project_async calls, with or without a pull before each (I do not know; in principle, a non-blocking push should not need a pull).
When two clients push at the same time, one client may remove changes uploaded by the other one. See the merged logs from client A and client B.
We want to make some changes to the current PR. I believe these changes will not significantly break the existing logic.
from time import sleep
from typing import Dict, Tuple

def get_next_push_batch(project_dir) -> Tuple[Dict[...], bool]:
    """
    Return the next dictionary with changes, similar to changes[0] in push_project_async,
    and a flag telling whether more changes remain.
    """
    # TODO
    return {"added": [], "updated": [], "removed": []}, True

def sync_project(self, project_dir):  # in MerginClient
    has_more_changes = True
    while has_more_changes:
        # changes_batch = dict ("added"/"updated"/"removed" with list of file names)
        changes_batch, has_more_changes = get_next_push_batch(project_dir)
        pull_job = pull_project_async(project_dir)
        pull_project_wait(pull_job)
        pull_project_finalize(pull_job)
        push_job = push_project_async(project_dir, changes_batch)
        push_project_wait(push_job)
        push_project_finalize(push_job)

def sync_project_with_callback(self, project_dir, callback=None, sleep_time=100):
    has_more_changes = True
    while has_more_changes:
        pull_job = pull_project_async(project_dir)
        while pull_project_is_running(pull_job):
            sleep(sleep_time)
            if callback:  # callback is optional
                callback()
        pull_project_finalize(pull_job)
        changes_batch, has_more_changes = get_next_push_batch(project_dir)
        push_job = push_project_async(project_dir, changes_batch)
        while push_project_is_running(push_job):
            sleep(sleep_time)
            if callback:
                callback()
        push_project_finalize(push_job)
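The pull, get batch, push loop can be exercised without a server by stubbing the individual steps. In this sketch `FakeProject`, `pull`, `push` and the list of pre-computed batches are placeholders made up for illustration; they are not the client's API.

```python
from typing import Dict, List, Tuple

Batch = Dict[str, List[str]]


class FakeProject:
    """Stand-in for a local project directory (hypothetical)."""

    def __init__(self, pending: List[Batch]) -> None:
        self.pending = list(pending)  # batches still waiting to be pushed
        self.log: List[str] = []      # order of operations, for inspection


def get_next_push_batch(project: FakeProject) -> Tuple[Batch, bool]:
    """Return the next batch and whether more batches remain after it."""
    if project.pending:
        batch = project.pending.pop(0)
    else:
        batch = {"added": [], "updated": [], "removed": []}
    return batch, bool(project.pending)


def pull(project: FakeProject) -> None:
    project.log.append("pull")


def push(project: FakeProject, batch: Batch) -> None:
    count = sum(len(files) for files in batch.values())
    project.log.append(f"push {count}")


def sync_project(project: FakeProject) -> None:
    has_more_changes = True
    while has_more_changes:
        pull(project)  # always start from the latest server version
        batch, has_more_changes = get_next_push_batch(project)
        push(project, batch)
```

Each iteration handles exactly one job of each kind from start to finish, so no upload from a later batch can begin before the previous push is finalized.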
Plugin-related notes
We do not want to implement any magic function for sync that can be used in the plugin, because there is different logic with dialogs, etc.
Why can't we use the magic |
Yeah, we discussed that together with @wonder-sk. Since the plugin has completely custom logic for handling pull and push jobs, rewriting all of it won't be simple.
I'd prefer creating a new push function which would be called only from the sync method.
If we create test cases for the new sync_project..., I am approving this 👍 Good job, guys (@harminius)
Add tests and probably revert the old ones. Make small tweaks to the progress bar here and to the variables which we discussed together.
mergin/client.py
Outdated
    if self.pull_job is None:
        return
    pull_project_wait(self.pull_job)
    pull_project_finalize(self.pull_job)
I think we should use callback also for pull - it can take a while
A callback to update a progress bar? Which bar? Do you want a common bar for pull and push, two separate bars for each, or multiple bars for every job? We can estimate the total push size quite precisely, but the total pull size might change significantly over time.
This class is responsible for:
- Filtering project file changes.
- Splitting changes into blocking and non-blocking groups.
- TODO: Applying limits such as max file count or size to break large uploads into smaller batches.
are we going to do that in this PR? it may affect the class API as well...
not in this PR

self.push_job = push_project_async(self, project_dir, changes_batch)
if not self.push_job:
    return
is it correct to have the early return there? I guess if there are multiple batches, in theory there could be a single batch with just file removals (thus not uploading any data), and later some batches with data - and this early return would skip the other batches
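The concern can be reproduced in a small sketch. `start_push` is a hypothetical stand-in that returns `None` when a batch has no data to upload; whether the real push_project_async behaves this way for a removal-only batch is exactly the open question here, so treat that as an assumption.

```python
from typing import List, Optional


def start_push(batch: dict) -> Optional[dict]:
    """Hypothetical stand-in: returns None when the batch uploads no data."""
    uploads = batch["added"] + batch["updated"]
    return {"uploads": uploads} if uploads else None


def push_with_return(batches: List[dict]) -> List[str]:
    pushed: List[str] = []
    for batch in batches:
        job = start_push(batch)
        if not job:
            return pushed  # leaves the loop, so later batches are skipped
        pushed.extend(job["uploads"])
    return pushed


def push_with_continue(batches: List[dict]) -> List[str]:
    pushed: List[str] = []
    for batch in batches:
        job = start_push(batch)
        if not job:
            continue  # skips only this batch
        pushed.extend(job["uploads"])
    return pushed
```

With a removal-only batch followed by a batch that adds a file, the first variant uploads nothing while the second still uploads the added file.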
mergin/client.py
Outdated
    """
    Syncs project while sending push progress info as callback.
    Sync is done in this loop:
    Pending changes? -> Pull -> Push change batch -> repeat
Can you please add a docstring describing what the progress_callback function should look like (and what sleep_time is)?
mergin/client.py
Outdated
    1. Pull server version
    2. Get local changes
    3. Push first change batch
    Repeat if there are more changes pending.
It would be useful to explain a bit what a "change batch" is and what the philosophy behind doing multiple batches is.
@@ -103,6 +109,8 @@ def __init__(
         if plugin_version is not None:  # this could be e.g. "Plugin/2020.1 QGIS/3.14"
             self.client_version += " " + plugin_version
         self.setup_logging()
+        self.pull_job = None
+        self.push_job = None
I don't think these need to be member variables of MerginClient
It's because I want to have an option to cancel the job on keyboard interruption in CLI. I need an instance that is aware of the jobs.
Could be a different class (Syncer) but in MerginClient it is the easiest.
It doesn't need to be defined here but I think it is a good practice.
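A rough sketch of the alternative mentioned above, a separate holder class. `Syncer`, `FakeJob` and their `cancel` methods are hypothetical names used only to show why something has to keep a reference to the running jobs; they are not part of the client.

```python
from typing import Callable, Optional


class FakeJob:
    """Hypothetical job handle; a real job would stop its worker threads."""

    def __init__(self) -> None:
        self.cancelled = False

    def cancel(self) -> None:
        self.cancelled = True


class Syncer:
    """Keeps references to the running jobs so they can be cancelled."""

    def __init__(self) -> None:
        self.pull_job: Optional[FakeJob] = None
        self.push_job: Optional[FakeJob] = None

    def cancel(self) -> None:
        # Cancel whichever jobs have been started so far.
        for job in (self.pull_job, self.push_job):
            if job is not None:
                job.cancel()

    def run(self, work: Callable[["Syncer"], None]) -> bool:
        """Run the work; on Ctrl+C cancel the known jobs and return False."""
        try:
            work(self)
            return True
        except KeyboardInterrupt:
            self.cancel()
            return False
```

Keeping the attributes on MerginClient instead achieves the same thing with one class fewer, which is the trade-off described in the comment.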
@@ -93,7 +99,7 @@ def __init__(
         plugin_version=None,
         proxy_config=None,
     ):
-        self.url = url if url is not None else MerginClient.default_url()
+        self.url = (url if url is not None else MerginClient.default_url()).rstrip("/") + "/"
What about this change - it does not seem to be relevant?
Not relevant to the rest. Shall it be moved to a separate PR?
I just hit this when I didn't include the trailing slash in the localhost URL.
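For reference, the normalization in the changed line behaves as follows. The helper name `normalize_url` and the example URLs are made up for this sketch.

```python
def normalize_url(url: str) -> str:
    """Ensure the URL ends with exactly one trailing slash."""
    return url.rstrip("/") + "/"


print(normalize_url("http://localhost:5000"))    # http://localhost:5000/
print(normalize_url("http://localhost:5000/"))   # http://localhost:5000/
print(normalize_url("http://localhost:5000//"))  # http://localhost:5000/
```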
README.md
Outdated
status Show all changes in project files - upstream and...
sync Syncronize the project.
typo - Syncronize
mergin/client_push.py
Outdated
-    job.total_size = total_size
-    job.upload_queue_items = upload_queue_items
+        job.total_size = total_size
+        job.upload_queue_items = upload_queue_items
🤔 why are these lines indented? they look like they become a part of the for-cycle, but they shouldn't be...
sure, residue from previous PR version
85fe64d to e886dfc
Pull Request Test Coverage Report for Build 16218706270 (Coveralls)
Resolves #240