Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 34 additions & 14 deletions mergin/client_push.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,14 @@

from .local_changes import LocalChange, LocalChanges

from .common import UPLOAD_CHUNK_ATTEMPT_WAIT, UPLOAD_CHUNK_ATTEMPTS, UPLOAD_CHUNK_SIZE, ClientError, ErrorCode
from .common import (
MAX_UPLOAD_VERSIONED_SIZE,
UPLOAD_CHUNK_ATTEMPT_WAIT,
UPLOAD_CHUNK_ATTEMPTS,
UPLOAD_CHUNK_SIZE,
MAX_UPLOAD_MEDIA_SIZE,
ClientError,
)
from .merginproject import MerginProject
from .editor import filter_changes
from .utils import get_data_checksum
Expand Down Expand Up @@ -296,28 +303,23 @@ def push_project_async(mc, directory) -> Optional[UploadJob]:
mp.log.info(f"--- push {project_path} - nothing to do")
return

mp.log.debug("push changes:\n" + pprint.pformat(changes))
mp.log.debug("push changes:\n" + pprint.pformat(asdict(changes)))
tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-")

# If there are any versioned files (aka .gpkg) that are not updated through a diff,
# we need to make a temporary copy somewhere to be sure that we are uploading full content.
# That's because if there are pending transactions, checkpointing or switching from WAL mode
# won't work, and we would end up with some changes left in -wal file which do not get
# uploaded. The temporary copy using geodiff uses sqlite backup API and should copy everything.
for f in changes["updated"]:
if mp.is_versioned_file(f["path"]) and "diff" not in f:
for f in changes.updated:
if mp.is_versioned_file(f.path) and not f.diff:
mp.copy_versioned_file_for_upload(f, tmp_dir.name)

for f in changes["added"]:
if mp.is_versioned_file(f["path"]):
for f in changes.added:
if mp.is_versioned_file(f.path):
mp.copy_versioned_file_for_upload(f, tmp_dir.name)

local_changes = LocalChanges(
added=[LocalChange(**change) for change in changes["added"]],
updated=[LocalChange(**change) for change in changes["updated"]],
removed=[LocalChange(**change) for change in changes["removed"]],
)
job = create_upload_job(mc, mp, local_changes, tmp_dir)
job = create_upload_job(mc, mp, changes, tmp_dir)
return job


Expand Down Expand Up @@ -471,12 +473,30 @@ def remove_diff_files(job: UploadJob) -> None:
os.remove(diff_file)


def get_push_changes_batch(mc, mp: MerginProject) -> Tuple[dict, int]:
def get_push_changes_batch(mc, mp: MerginProject) -> Tuple[LocalChanges, int]:
"""
Get changes that need to be pushed to the server.
"""
changes = mp.get_push_changes()
project_role = mp.project_role()
changes = filter_changes(mc, project_role, changes)

return changes, sum(len(v) for v in changes.values())
local_changes = LocalChanges(
added=[LocalChange(**change) for change in changes["added"]],
updated=[LocalChange(**change) for change in changes["updated"]],
removed=[LocalChange(**change) for change in changes["removed"]],
)

over_limit_media = local_changes.get_media_upload_over_size(MAX_UPLOAD_MEDIA_SIZE)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I liked your post_init approach — how about moving these checks into the class as its first validation step? We could also do both checks in a single loop and raise a custom validation error, which would then trigger the client error here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check it

if over_limit_media:
raise ClientError(
f"File {over_limit_media.path} to upload exceeds the maximum allowed size of {MAX_UPLOAD_MEDIA_SIZE / (1024**3)} GB."
)

over_limit_gpkg = local_changes.get_gpgk_upload_over_size(MAX_UPLOAD_VERSIONED_SIZE)
if over_limit_gpkg:
raise ClientError(
f"Geopackage {over_limit_gpkg.path} to upload exceeds the maximum allowed size of {MAX_UPLOAD_VERSIONED_SIZE / (1024**3)} GB."
)

return local_changes, sum(len(v) for v in changes.values())
10 changes: 9 additions & 1 deletion mergin/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@
# seconds to wait between sync callback calls
SYNC_CALLBACK_WAIT = 0.01

# maximum size of media file able to upload in one push (in bytes)
MAX_UPLOAD_MEDIA_SIZE = 10 * (1024**3)

# maximum size of GPKG file able to upload in one push (in bytes)
MAX_UPLOAD_VERSIONED_SIZE = 5 * (1024**3)

# default URL for submitting logs
MERGIN_DEFAULT_LOGS_URL = "https://g4pfq226j0.execute-api.eu-west-1.amazonaws.com/mergin_client_log_submit"

Expand All @@ -39,7 +45,9 @@ class ErrorCode(Enum):


class ClientError(Exception):
def __init__(self, detail: str, url=None, server_code=None, server_response=None, http_error=None, http_method=None):
def __init__(
self, detail: str, url=None, server_code=None, server_response=None, http_error=None, http_method=None
):
self.detail = detail
self.url = url
self.http_error = http_error
Expand Down
2 changes: 1 addition & 1 deletion mergin/editor.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from itertools import filterfalse
from typing import Callable, Dict, List

from .utils import is_mergin_config, is_qgis_file, is_versioned_file
from .utils import is_qgis_file

EDITOR_ROLE_NAME = "editor"

Expand Down
38 changes: 37 additions & 1 deletion mergin/local_changes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Optional, List, Tuple
from typing import Optional, List, Tuple

from .utils import is_versioned_file

MAX_UPLOAD_CHANGES = 100


@dataclass
Expand Down Expand Up @@ -55,6 +59,18 @@ class LocalChanges:
updated: List[LocalChange] = field(default_factory=list)
removed: List[LocalChange] = field(default_factory=list)

def __post_init__(self):
    """
    Cap the combined number of `added` and `updated` changes at MAX_UPLOAD_CHANGES.

    `added` entries take priority; `updated` only fills whatever room is left.
    Entries beyond the cap are dropped from this object.
    """
    if len(self.get_upload_changes()) <= MAX_UPLOAD_CHANGES:
        return
    # Keep as many `added` entries as fit, then fill the remainder with `updated`.
    keep_added = min(len(self.added), MAX_UPLOAD_CHANGES)
    self.added = self.added[:keep_added]
    self.updated = self.updated[: MAX_UPLOAD_CHANGES - keep_added]

def to_server_payload(self) -> dict:
return {
"added": [change.to_server_data() for change in self.added],
Expand Down Expand Up @@ -96,3 +112,23 @@ def update_chunks(self, server_chunks: List[Tuple[str, str]]) -> None:

for change in self.updated:
change.chunks = self._map_unique_chunks(change.chunks, server_chunks)

def get_media_upload_over_size(self, size_limit: int) -> Optional[LocalChange]:
    """
    Find the first media (non-versioned) file among added/updated changes whose
    size exceeds the limit.

    :param size_limit: The size limit in bytes.
    :return: The first LocalChange over the limit, or None if every file fits.
    """
    oversized = (
        change
        for change in self.get_upload_changes()
        if not is_versioned_file(change.path) and change.size > size_limit
    )
    return next(oversized, None)

def get_gpgk_upload_over_size(self, size_limit: int) -> Optional[LocalChange]:
    """
    Find the first GPKG file among added/updated changes that exceeds the size
    limit. Diff-based updates are skipped — only new or fully overwritten files
    are considered.

    :param size_limit: The size limit in bytes.
    :return: The first LocalChange over the limit, or None if every file fits.
    """
    # NOTE(review): "gpgk" looks like a typo for "gpkg"; name kept as-is for
    # compatibility with existing callers.
    oversized = (
        change
        for change in self.get_upload_changes()
        if is_versioned_file(change.path) and not change.diff and change.size > size_limit
    )
    return next(oversized, None)
14 changes: 7 additions & 7 deletions mergin/merginproject.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
conflicted_copy_file_name,
edit_conflict_file_name,
)

from .local_changes import LocalChange

this_dir = os.path.dirname(os.path.realpath(__file__))

Expand Down Expand Up @@ -470,20 +470,20 @@ def get_push_changes(self):
changes["updated"] = [f for f in changes["updated"] if f not in not_updated]
return changes

def copy_versioned_file_for_upload(self, f, tmp_dir):
def copy_versioned_file_for_upload(self, f: LocalChange, tmp_dir: str) -> str:
"""
Make a temporary copy of the versioned file using geodiff, to make sure that we have full
content in a single file (nothing left in WAL journal)
"""
path = f["path"]
path = f.path
self.log.info("Making a temporary copy (full upload): " + path)
tmp_file = os.path.join(tmp_dir, path)
os.makedirs(os.path.dirname(tmp_file), exist_ok=True)
self.geodiff.make_copy_sqlite(self.fpath(path), tmp_file)
f["size"] = os.path.getsize(tmp_file)
f["checksum"] = generate_checksum(tmp_file)
f["chunks"] = [str(uuid.uuid4()) for i in range(math.ceil(f["size"] / UPLOAD_CHUNK_SIZE))]
f["upload_file"] = tmp_file
f.size = os.path.getsize(tmp_file)
f.checksum = generate_checksum(tmp_file)
f.chunks = [str(uuid.uuid4()) for i in range(math.ceil(f.size / UPLOAD_CHUNK_SIZE))]
f.upload_file = tmp_file
return tmp_file

def get_list_of_push_changes(self, push_changes):
Expand Down
19 changes: 19 additions & 0 deletions mergin/test/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3211,3 +3211,22 @@ def test_client_project_sync_retry(mc):
with pytest.raises(ClientError):
mc.sync_project(project_dir)
assert mock_push_project_async.call_count == 2


def test_push_file_limits(mc):
    """Pushing a file over the (patched) size limit raises ClientError with the offending path."""
    project_name = "test_push_file_limits"
    full_project_name = API_USER + "/" + project_name
    work_dir = os.path.join(TMP_DIR, project_name)
    cleanup(mc, full_project_name, [work_dir])
    mc.create_project(project_name)
    mc.download_project(full_project_name, work_dir)

    # limits are patched down to 1 byte so any real file triggers the check;
    # the error message reports the limit converted to GB
    limit_gb = 1 / 1024**3

    # versioned (.gpkg) file exceeding the versioned-file limit
    shutil.copy(os.path.join(TEST_DATA_DIR, "base.gpkg"), work_dir)
    with patch("mergin.client_push.MAX_UPLOAD_VERSIONED_SIZE", 1):
        with pytest.raises(ClientError, match=f"base.gpkg to upload exceeds the maximum allowed size of {limit_gb}"):
            mc.push_project(work_dir)

    # plain media file exceeding the media limit
    shutil.copy(os.path.join(TEST_DATA_DIR, "test.txt"), work_dir)
    with patch("mergin.client_push.MAX_UPLOAD_MEDIA_SIZE", 1):
        with pytest.raises(ClientError, match=f"test.txt to upload exceeds the maximum allowed size of {limit_gb}"):
            mc.push_project(work_dir)
5 changes: 4 additions & 1 deletion mergin/test/test_common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from ..common import ClientError, ErrorCode


def test_client_error_is_blocked_sync():
"""Test the is_blocked_sync method of ClientError."""
error = ClientError(detail="", server_code=None)
Expand All @@ -12,6 +13,7 @@ def test_client_error_is_blocked_sync():
error.server_code = ErrorCode.ProjectVersionExists.value
assert error.is_blocking_sync() is True


def test_client_error_is_rate_limit():
"""Test the is_rate_limit method of ClientError."""
error = ClientError(detail="", http_error=None)
Expand All @@ -21,6 +23,7 @@ def test_client_error_is_rate_limit():
error.http_error = 429
assert error.is_rate_limit() is True


def test_client_error_is_retryable_sync():
"""Test the is_retryable_sync method of ClientError."""
error = ClientError(detail="", server_code=None, http_error=None)
Expand All @@ -43,4 +46,4 @@ def test_client_error_is_retryable_sync():
error.http_error = 500
assert error.is_retryable_sync() is False
error.http_error = 429
assert error.is_retryable_sync() is True
assert error.is_retryable_sync() is True
92 changes: 91 additions & 1 deletion mergin/test/test_local_changes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime

from ..local_changes import LocalChange, LocalChanges
from ..local_changes import LocalChange, LocalChanges, MAX_UPLOAD_CHANGES


def test_local_changes_from_dict():
Expand Down Expand Up @@ -118,3 +118,93 @@ def test_local_changes_get_upload_changes():
assert len(upload_changes) == 2 # Only added and updated should be included
assert upload_changes[0].path == "file1.txt" # First change is from added
assert upload_changes[1].path == "file2.txt" # Second change is from updated


def test_local_changes_get_media_upload_over_size():
    """get_media_upload_over_size returns the first non-versioned file above the limit."""
    limit = 10 * 1024 * 1024
    small = 1024
    large = 15 * 1024 * 1024
    now = datetime.now()

    changes = LocalChanges(
        added=[
            LocalChange(path="file1.txt", checksum="abc123", size=small, mtime=now),
            LocalChange(path="file2.jpg", checksum="xyz789", size=large, mtime=now),  # over limit
        ],
        updated=[
            LocalChange(path="file3.mp4", checksum="lmn456", size=5 * 1024 * 1024, mtime=now),
            LocalChange(path="file4.gpkg", checksum="opq123", size=small, mtime=now),
        ],
    )

    offender = changes.get_media_upload_over_size(limit)

    # the first file over the limit is reported, versioned files are ignored
    assert offender is not None
    assert offender.path == "file2.jpg"
    assert offender.size == large


def test_local_changes_get_gpgk_upload_over_size():
    """get_gpgk_upload_over_size returns the first .gpkg file above the limit, ignoring diffs."""
    limit = 10 * 1024 * 1024
    small = 1024
    large = 15 * 1024 * 1024
    now = datetime.now()

    changes = LocalChanges(
        added=[
            LocalChange(path="file1.gpkg", checksum="abc123", size=small, mtime=now),
            # over limit, full upload (diff=None)
            LocalChange(path="file2.gpkg", checksum="xyz789", size=large, mtime=now, diff=None),
        ],
        updated=[
            LocalChange(path="file3.gpkg", checksum="lmn456", size=5 * 1024 * 1024, mtime=now),
            LocalChange(path="file4.txt", checksum="opq123", size=small, mtime=now),
        ],
    )

    offender = changes.get_gpgk_upload_over_size(limit)

    # the first GPKG over the limit is reported; non-gpkg files are ignored
    assert offender is not None
    assert offender.path == "file2.gpkg"
    assert offender.size == large
    assert offender.diff is None  # diff-based updates are never reported


def test_local_changes_post_init():
    """__post_init__ trims added/updated so their combined count fits MAX_UPLOAD_CHANGES."""
    n_added = 80
    n_updated = 21  # 80 + 21 = 101, one over the limit of 100
    now = datetime.now()

    changes = LocalChanges(
        added=[
            LocalChange(path=f"file{i}.txt", checksum="abc123", size=1024, mtime=now) for i in range(n_added)
        ],
        updated=[
            LocalChange(path=f"file{i}.txt", checksum="xyz789", size=2048, mtime=now) for i in range(n_updated)
        ],
    )

    # `added` fits entirely under the limit, so it is untouched;
    # `updated` absorbs the whole cut
    assert len(changes.added) == n_added
    assert len(changes.updated) == MAX_UPLOAD_CHANGES - n_added
    assert len(changes.added) + len(changes.updated) == MAX_UPLOAD_CHANGES
Loading