Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions mergin/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,6 +468,37 @@ def pull(ctx):
except Exception as e:
_print_unhandled_exception()

@cli.command()
@click.pass_context
def sync(ctx):
"""Synchronize the project. Pull latest project version from the server and push split changes."""
mc = ctx.obj["client"]
if mc is None:
return
directory = os.getcwd()
upload_job = None
try:
def on_progress(increment, push_job):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As a user I would probably like to see individual steps, each with progress bar rather than one global adaptive bar.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can handle this progress bar is tricky to do... let me take a look at this after finish of logic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not possible to create addaptive bar in click

nonlocal upload_job
upload_job = push_job

# run pull & push cycles until there are no local changes
mc.sync_project(directory, progress_callback=on_progress)

click.secho("Sync complete.", fg="green")

except InvalidProject as e:
click.secho("Invalid project directory ({})".format(str(e)), fg="red")
except ClientError as e:
click.secho("Error: " + str(e), fg="red")
return
except KeyboardInterrupt:
click.secho("Cancelling...")
if upload_job:
push_project_cancel(upload_job)
except Exception as e:
_print_unhandled_exception()


@cli.command()
@click.argument("version")
Expand Down
81 changes: 74 additions & 7 deletions mergin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
import re
import typing
import warnings
from time import sleep

from typing import List

from .common import (
PUSH_ATTEMPT_WAIT,
PUSH_ATTEMPTS,
SYNC_CALLBACK_WAIT,
ClientError,
LoginError,
WorkspaceRole,
Expand All @@ -40,7 +44,14 @@
download_diffs_finalize,
)
from .client_pull import pull_project_async, pull_project_wait, pull_project_finalize
from .client_push import push_project_async, push_project_wait, push_project_finalize
from .client_push import (
get_push_changes_batch,
push_project_async,
push_project_is_running,
push_project_wait,
push_project_finalize,
UploadChunksCache,
)
from .utils import DateTimeEncoder, get_versions_with_file_changes, int_version, is_version_acceptable
from .version import __version__

Expand Down Expand Up @@ -106,6 +117,8 @@ def __init__(
self._user_info = None
self._server_type = None
self._server_version = None
self._server_features = {}
self.upload_chunks_cache = UploadChunksCache()
self.client_version = "Python-client/" + __version__
if plugin_version is not None: # this could be e.g. "Plugin/2020.1 QGIS/3.14"
self.client_version += " " + plugin_version
Expand Down Expand Up @@ -362,8 +375,7 @@ def server_type(self):
"""
if not self._server_type:
try:
resp = self.get("/config", validate_auth=False)
config = json.load(resp)
config = self.server_config()
if config["server_type"] == "ce":
self._server_type = ServerType.CE
elif config["server_type"] == "ee":
Expand All @@ -384,14 +396,26 @@ def server_version(self):
"""
if self._server_version is None:
try:
resp = self.get("/config", validate_auth=False)
config = json.load(resp)
config = self.server_config()
self._server_version = config["version"]
except (ClientError, KeyError):
self._server_version = ""

return self._server_version

def server_features(self):
"""
Returns feature flags of the server.
"""
if self._server_features:
return self._server_features
config = self.server_config()
self._server_features = {
"v2_push_enabled": config.get("v2_push_enabled", False),
"v2_pull_enabled": config.get("v2_pull_enabled", False),
}
return self._server_features

def workspaces_list(self):
"""
Find all available workspaces
Expand Down Expand Up @@ -782,7 +806,7 @@ def download_project(self, project_path, directory, version=None):
def user_info(self):
server_type = self.server_type()
if server_type == ServerType.OLD:
resp = self.get("/v1/user/" + self.username())
resp = self.get(f"/v1/user/{self.username()}")
else:
resp = self.get("/v1/user/profile")
return json.load(resp)
Expand Down Expand Up @@ -894,7 +918,7 @@ def push_project(self, directory):
:param directory: Project's directory
:type directory: String
"""
job = push_project_async(self, directory)
job = push_project_async(self, directory, check_version=True)
if job is None:
return # there is nothing to push (or we only deleted some files)
push_project_wait(job)
Expand Down Expand Up @@ -1471,3 +1495,46 @@ def create_invitation(self, workspace_id: int, email: str, workspace_role: Works
params = {"email": email, "role": workspace_role.value}
ws_inv = self.post(f"v2/workspaces/{workspace_id}/invitations", params, json_headers)
return json.load(ws_inv)

def sync_project(self, project_directory, progress_callback=None):
"""
Syncs project by loop with these steps:
1. Pull server version
2. Get local changes
3. Push first change batch
Repeat if there are more local changes.
"""
mp = MerginProject(project_directory)
has_changes = True
server_conflict_attempts = 0
while has_changes:
pull_job = pull_project_async(self, project_directory)
if pull_job:
pull_project_wait(pull_job)
pull_project_finalize(pull_job)

try:
job = push_project_async(self, project_directory)
if not job:
break
if not progress_callback:
push_project_wait(job)
else:
last_size = 0
while push_project_is_running(job):
sleep(SYNC_CALLBACK_WAIT)
current_size = job.transferred_size
progress_callback(current_size - last_size, job) # call callback with transferred size increment
last_size = current_size
push_project_finalize(job)
_, has_changes = get_push_changes_batch(self, mp, job.server_resp)
except ClientError as e:
if e.is_retryable_sync() and server_conflict_attempts < PUSH_ATTEMPTS - 1:
# retry on conflict, e.g. when server has changes that we do not have yet
mp.log.info(
f"Restarting sync process (conflict on server) - {server_conflict_attempts + 1}/{PUSH_ATTEMPTS}"
)
server_conflict_attempts += 1
sleep(PUSH_ATTEMPT_WAIT)
continue
raise e
2 changes: 1 addition & 1 deletion mergin/client_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ def dump(self):
print("--- END ---")


def pull_project_async(mc, directory):
def pull_project_async(mc, directory) -> PullJob:
"""
Starts project pull in background and returns handle to the pending job.
Using that object it is possible to watch progress or cancel the ongoing work.
Expand Down
Loading
Loading