2 changes: 1 addition & 1 deletion .github/workflows/autotests.yml
@@ -19,7 +19,7 @@ jobs:

- uses: actions/setup-python@v2
with:
python-version: '3.x'
python-version: '3.8'

- name: Install python package dependencies
run: |
44 changes: 28 additions & 16 deletions mergin/client_push.py
@@ -22,9 +22,16 @@
import time
from typing import List, Tuple, Optional, ByteString

from .local_changes import LocalChange, LocalChanges

from .common import UPLOAD_CHUNK_ATTEMPT_WAIT, UPLOAD_CHUNK_ATTEMPTS, UPLOAD_CHUNK_SIZE, ClientError, ErrorCode
from .local_changes import ChangesValidationError, LocalChange, LocalChanges

from .common import (
MAX_UPLOAD_VERSIONED_SIZE,
UPLOAD_CHUNK_ATTEMPT_WAIT,
UPLOAD_CHUNK_ATTEMPTS,
UPLOAD_CHUNK_SIZE,
MAX_UPLOAD_MEDIA_SIZE,
ClientError,
)
from .merginproject import MerginProject
from .editor import filter_changes
from .utils import get_data_checksum
@@ -296,28 +303,23 @@ def push_project_async(mc, directory) -> Optional[UploadJob]:
mp.log.info(f"--- push {project_path} - nothing to do")
return

mp.log.debug("push changes:\n" + pprint.pformat(changes))
mp.log.debug("push changes:\n" + pprint.pformat(asdict(changes)))
tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-")

# If there are any versioned files (aka .gpkg) that are not updated through a diff,
# we need to make a temporary copy somewhere to be sure that we are uploading full content.
# That's because if there are pending transactions, checkpointing or switching from WAL mode
# won't work, and we would end up with some changes left in -wal file which do not get
# uploaded. The temporary copy using geodiff uses sqlite backup API and should copy everything.
for f in changes["updated"]:
if mp.is_versioned_file(f["path"]) and "diff" not in f:
for f in changes.updated:
if mp.is_versioned_file(f.path) and not f.diff:
mp.copy_versioned_file_for_upload(f, tmp_dir.name)

for f in changes["added"]:
if mp.is_versioned_file(f["path"]):
for f in changes.added:
if mp.is_versioned_file(f.path):
mp.copy_versioned_file_for_upload(f, tmp_dir.name)

local_changes = LocalChanges(
added=[LocalChange(**change) for change in changes["added"]],
updated=[LocalChange(**change) for change in changes["updated"]],
removed=[LocalChange(**change) for change in changes["removed"]],
)
job = create_upload_job(mc, mp, local_changes, tmp_dir)
job = create_upload_job(mc, mp, changes, tmp_dir)
return job


@@ -471,12 +473,22 @@ def remove_diff_files(job: UploadJob) -> None:
os.remove(diff_file)


def get_push_changes_batch(mc, mp: MerginProject) -> Tuple[dict, int]:
def get_push_changes_batch(mc, mp: MerginProject) -> Tuple[LocalChanges, int]:
"""
Get changes that need to be pushed to the server.
"""
changes = mp.get_push_changes()
project_role = mp.project_role()
changes = filter_changes(mc, project_role, changes)

return changes, sum(len(v) for v in changes.values())
try:
local_changes = LocalChanges(
added=[LocalChange(**change) for change in changes["added"]],
updated=[LocalChange(**change) for change in changes["updated"]],
removed=[LocalChange(**change) for change in changes["removed"]],
)
except ChangesValidationError as e:
raise ClientError(
f"Some files exceeded maximum upload size. Files: {', '.join([c.path for c in e.invalid_changes])}. Maximum size for media files is {e.max_media_upload_size / (1024**3)} GB and for geopackage files {e.max_versioned_upload_size / (1024**3)} GB."
)
return local_changes, sum(len(v) for v in changes.values())
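For readers of the hunk above: `get_push_changes_batch` now converts the plain dicts returned by `MerginProject.get_push_changes()` into the `LocalChanges` dataclass and re-raises a `ChangesValidationError` as the client-facing `ClientError`. A minimal standalone sketch of that path (not part of the PR), assuming each change dict carries the keyword arguments accepted by `LocalChange` (`path`, `checksum`, `size`, `mtime`, optional `diff`); the helper name `build_local_changes` is illustrative only:

```python
from mergin.common import ClientError
from mergin.local_changes import ChangesValidationError, LocalChange, LocalChanges


def build_local_changes(changes: dict) -> LocalChanges:
    """Map raw change dicts to LocalChanges, translating validation errors to ClientError."""
    try:
        return LocalChanges(
            added=[LocalChange(**c) for c in changes["added"]],
            updated=[LocalChange(**c) for c in changes["updated"]],
            removed=[LocalChange(**c) for c in changes["removed"]],
        )
    except ChangesValidationError as e:
        # Report the offending paths and convert the byte limits to GB for the message.
        files = ", ".join(c.path for c in e.invalid_changes)
        raise ClientError(
            f"Some files exceeded maximum upload size. Files: {files}. "
            f"Maximum size for media files is {e.max_media_upload_size / (1024**3)} GB "
            f"and for geopackage files {e.max_versioned_upload_size / (1024**3)} GB."
        )
```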
10 changes: 9 additions & 1 deletion mergin/common.py
@@ -24,6 +24,12 @@
# seconds to wait between sync callback calls
SYNC_CALLBACK_WAIT = 0.01

# maximum size of a media file that can be uploaded in one push (in bytes)
MAX_UPLOAD_MEDIA_SIZE = 10 * (1024**3)

# maximum size of a GPKG file that can be uploaded in one push (in bytes)
MAX_UPLOAD_VERSIONED_SIZE = 5 * (1024**3)

# default URL for submitting logs
MERGIN_DEFAULT_LOGS_URL = "https://g4pfq226j0.execute-api.eu-west-1.amazonaws.com/mergin_client_log_submit"

@@ -39,7 +45,9 @@ class ErrorCode(Enum):


class ClientError(Exception):
def __init__(self, detail: str, url=None, server_code=None, server_response=None, http_error=None, http_method=None):
def __init__(
self, detail: str, url=None, server_code=None, server_response=None, http_error=None, http_method=None
):
self.detail = detail
self.url = url
self.http_error = http_error
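The `ClientError` constructor in this hunk is only re-wrapped across lines; its behaviour is unchanged. A short sketch (based on the assertions in `mergin/test/test_common.py` further below) of how the error is constructed with keyword arguments and inspected:

```python
from mergin.common import ClientError, ErrorCode

# A server code marking an already-existing project version blocks further sync.
blocked = ClientError(detail="project version already exists", server_code=ErrorCode.ProjectVersionExists.value)
assert blocked.is_blocking_sync() is True

# HTTP 429 is treated as rate limiting.
rate_limited = ClientError(detail="too many requests", http_error=429)
assert rate_limited.is_rate_limit() is True
```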
2 changes: 1 addition & 1 deletion mergin/editor.py
@@ -1,7 +1,7 @@
from itertools import filterfalse
from typing import Callable, Dict, List

from .utils import is_mergin_config, is_qgis_file, is_versioned_file
from .utils import is_qgis_file

EDITOR_ROLE_NAME = "editor"

41 changes: 40 additions & 1 deletion mergin/local_changes.py
@@ -1,6 +1,20 @@
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, Optional, List, Tuple
from typing import Optional, List, Tuple

from .utils import is_versioned_file
from .common import MAX_UPLOAD_MEDIA_SIZE, MAX_UPLOAD_VERSIONED_SIZE

MAX_UPLOAD_CHANGES = 100


# Raised when local changes fail pre-upload size validation
class ChangesValidationError(Exception):
def __init__(self, message, invalid_changes=None, max_media_upload_size=None, max_versioned_upload_size=None):
super().__init__(message)
self.invalid_changes = invalid_changes if invalid_changes is not None else []
self.max_media_upload_size = max_media_upload_size
self.max_versioned_upload_size = max_versioned_upload_size


@dataclass
@@ -55,6 +69,31 @@ class LocalChanges:
updated: List[LocalChange] = field(default_factory=list)
removed: List[LocalChange] = field(default_factory=list)

def __post_init__(self):
"""
Validate file sizes and enforce a limit on the number of changes combined from `added` and `updated`.
"""
upload_changes = self.get_upload_changes()
total_changes = len(upload_changes)
oversize_changes = []
for change in upload_changes:
if not is_versioned_file(change.path) and change.size > MAX_UPLOAD_MEDIA_SIZE:
oversize_changes.append(change)
elif not change.diff and change.size > MAX_UPLOAD_VERSIONED_SIZE:
oversize_changes.append(change)
if oversize_changes:
error = ChangesValidationError("Some files exceed the maximum upload size", oversize_changes)
error.max_media_upload_size = MAX_UPLOAD_MEDIA_SIZE
error.max_versioned_upload_size = MAX_UPLOAD_VERSIONED_SIZE
raise error

if total_changes > MAX_UPLOAD_CHANGES:
# Calculate how many changes to keep from `added` and `updated`
added_limit = min(len(self.added), MAX_UPLOAD_CHANGES)
updated_limit = MAX_UPLOAD_CHANGES - added_limit
self.added = self.added[:added_limit]
self.updated = self.updated[:updated_limit]

def to_server_payload(self) -> dict:
return {
"added": [change.to_server_data() for change in self.added],
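A minimal usage sketch (not part of the PR) of the validation added in `__post_init__`, assuming the `LocalChange` fields used in the tests below (`path`, `checksum`, `size`, `mtime`, optional `diff`):

```python
from datetime import datetime

from mergin.local_changes import ChangesValidationError, LocalChange, LocalChanges

small = LocalChange(path="notes.txt", checksum="abc", size=1024, mtime=datetime.now())
# A 20 GiB media file exceeds MAX_UPLOAD_MEDIA_SIZE (10 GiB), so construction raises.
huge = LocalChange(path="video.mp4", checksum="def", size=20 * (1024**3), mtime=datetime.now())

try:
    LocalChanges(added=[small, huge])
except ChangesValidationError as e:
    assert [c.path for c in e.invalid_changes] == ["video.mp4"]

# With more than MAX_UPLOAD_CHANGES (100) added/updated files, the lists are trimmed
# so that at most 100 changes go into a single push.
many = [LocalChange(path=f"f{i}.txt", checksum="x", size=1, mtime=datetime.now()) for i in range(150)]
assert len(LocalChanges(added=many).added) == 100
```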
14 changes: 7 additions & 7 deletions mergin/merginproject.py
@@ -21,7 +21,7 @@
conflicted_copy_file_name,
edit_conflict_file_name,
)

from .local_changes import LocalChange

this_dir = os.path.dirname(os.path.realpath(__file__))

@@ -470,20 +470,20 @@ def get_push_changes(self):
changes["updated"] = [f for f in changes["updated"] if f not in not_updated]
return changes

def copy_versioned_file_for_upload(self, f, tmp_dir):
def copy_versioned_file_for_upload(self, f: LocalChange, tmp_dir: str) -> str:
"""
Make a temporary copy of the versioned file using geodiff, to make sure that we have full
content in a single file (nothing left in WAL journal)
"""
path = f["path"]
path = f.path
self.log.info("Making a temporary copy (full upload): " + path)
tmp_file = os.path.join(tmp_dir, path)
os.makedirs(os.path.dirname(tmp_file), exist_ok=True)
self.geodiff.make_copy_sqlite(self.fpath(path), tmp_file)
f["size"] = os.path.getsize(tmp_file)
f["checksum"] = generate_checksum(tmp_file)
f["chunks"] = [str(uuid.uuid4()) for i in range(math.ceil(f["size"] / UPLOAD_CHUNK_SIZE))]
f["upload_file"] = tmp_file
f.size = os.path.getsize(tmp_file)
f.checksum = generate_checksum(tmp_file)
f.chunks = [str(uuid.uuid4()) for i in range(math.ceil(f.size / UPLOAD_CHUNK_SIZE))]
f.upload_file = tmp_file
return tmp_file

def get_list_of_push_changes(self, push_changes):
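The chunk ids assigned above drive the chunked upload. A small sketch of just that arithmetic, assuming only that `UPLOAD_CHUNK_SIZE` from `mergin.common` is a chunk size in bytes (its value is not shown in this diff):

```python
import math
import uuid

from mergin.common import UPLOAD_CHUNK_SIZE


def plan_chunks(file_size: int) -> list:
    """Return one placeholder chunk id per UPLOAD_CHUNK_SIZE-sized slice of the file."""
    n_chunks = math.ceil(file_size / UPLOAD_CHUNK_SIZE)
    return [str(uuid.uuid4()) for _ in range(n_chunks)]
```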
23 changes: 22 additions & 1 deletion mergin/test/test_client.py
@@ -2328,8 +2328,10 @@ def test_clean_diff_files(mc):
shutil.copy(mp.fpath("inserted_1_A.gpkg"), mp.fpath(f_updated))
mc.push_project(project_dir)

diff_files = glob.glob("*-diff-*", root_dir=os.path.split(mp.fpath_meta("inserted_1_A.gpkg"))[0])
directory = os.path.split(mp.fpath_meta("inserted_1_A.gpkg"))[0]
diff_files = [f for f in os.listdir(directory) if "-diff-" in f]

# Assert that no matching files are found
assert diff_files == []


@@ -3214,3 +3216,22 @@ def test_client_project_sync_retry(mc):
with pytest.raises(ClientError):
mc.sync_project(project_dir)
assert mock_push_project_async.call_count == 2


def test_push_file_limits(mc):
test_project = "test_push_file_limits"
project = API_USER + "/" + test_project
project_dir = os.path.join(TMP_DIR, test_project)
cleanup(mc, project, [project_dir])
mc.create_project(test_project)
mc.download_project(project, project_dir)
shutil.copy(os.path.join(TEST_DATA_DIR, "base.gpkg"), project_dir)
# set the limit to a minimal value to simulate exceeding it
with patch("mergin.local_changes.MAX_UPLOAD_VERSIONED_SIZE", 1):
with pytest.raises(ClientError, match="Some files exceeded maximum upload size. Files: base.gpkg."):
mc.push_project(project_dir)

shutil.copy(os.path.join(TEST_DATA_DIR, "test.txt"), project_dir)
with patch("mergin.local_changes.MAX_UPLOAD_MEDIA_SIZE", 1):
with pytest.raises(ClientError, match="Some files exceeded maximum upload size. Files: test.txt."):
mc.push_project(project_dir)
5 changes: 4 additions & 1 deletion mergin/test/test_common.py
@@ -1,5 +1,6 @@
from ..common import ClientError, ErrorCode


def test_client_error_is_blocked_sync():
"""Test the is_blocked_sync method of ClientError."""
error = ClientError(detail="", server_code=None)
@@ -12,6 +13,7 @@ def test_client_error_is_blocked_sync():
error.server_code = ErrorCode.ProjectVersionExists.value
assert error.is_blocking_sync() is True


def test_client_error_is_rate_limit():
"""Test the is_rate_limit method of ClientError."""
error = ClientError(detail="", http_error=None)
@@ -21,6 +23,7 @@ def test_client_error_is_rate_limit():
error.http_error = 429
assert error.is_rate_limit() is True


def test_client_error_is_retryable_sync():
"""Test the is_retryable_sync method of ClientError."""
error = ClientError(detail="", server_code=None, http_error=None)
@@ -43,4 +46,4 @@ def test_client_error_is_retryable_sync():
error.http_error = 500
assert error.is_retryable_sync() is False
error.http_error = 429
assert error.is_retryable_sync() is True
assert error.is_retryable_sync() is True
94 changes: 93 additions & 1 deletion mergin/test/test_local_changes.py
@@ -1,6 +1,8 @@
from datetime import datetime
import pytest
from unittest.mock import patch

from ..local_changes import LocalChange, LocalChanges
from ..local_changes import ChangesValidationError, LocalChange, LocalChanges, MAX_UPLOAD_CHANGES


def test_local_changes_from_dict():
@@ -118,3 +120,93 @@ def test_local_changes_get_upload_changes():
assert len(upload_changes) == 2 # Only added and updated should be included
assert upload_changes[0].path == "file1.txt" # First change is from added
assert upload_changes[1].path == "file2.txt" # Second change is from updated


def test_local_changes_post_init_validation_media():
"""Test that __post_init__ flags media files larger than MAX_UPLOAD_MEDIA_SIZE."""
# Define constants
SIZE_LIMIT_MB = 5
SIZE_LIMIT_BYTES = SIZE_LIMIT_MB * 1024 * 1024
SMALL_FILE_SIZE = 1024
LARGE_FILE_SIZE = 15 * 1024 * 1024

# Create sample LocalChange instances
added = [
LocalChange(path="file1.txt", checksum="abc123", size=SMALL_FILE_SIZE, mtime=datetime.now()),
LocalChange(path="file2.jpg", checksum="xyz789", size=LARGE_FILE_SIZE, mtime=datetime.now()), # Over limit
]
updated = [
LocalChange(path="file3.mp4", checksum="lmn456", size=5 * 1024 * 1024, mtime=datetime.now()),
LocalChange(path="file4.gpkg", checksum="opq123", size=SMALL_FILE_SIZE, mtime=datetime.now()),
]

# Initialize LocalChanges
with patch("mergin.local_changes.MAX_UPLOAD_MEDIA_SIZE", SIZE_LIMIT_BYTES):
with pytest.raises(ChangesValidationError, match="Some files exceed") as err:
LocalChanges(added=added, updated=updated)
assert len(err.value.invalid_changes) == 1
assert "file2.jpg" == err.value.invalid_changes[0].path
assert err.value.invalid_changes[0].size == LARGE_FILE_SIZE


def test_local_changes_post_init_validation_versioned():
"""Test that __post_init__ flags versioned (GPKG) files over MAX_UPLOAD_VERSIONED_SIZE when pushed without a diff."""
# Define constants
SIZE_LIMIT_MB = 10
SIZE_LIMIT_BYTES = SIZE_LIMIT_MB * 1024 * 1024
SMALL_FILE_SIZE = 1024
LARGE_FILE_SIZE = 15 * 1024 * 1024

# Create sample LocalChange instances
added = [
LocalChange(path="file1.gpkg", checksum="abc123", size=SMALL_FILE_SIZE, mtime=datetime.now()),
LocalChange(
path="file2.gpkg", checksum="xyz789", size=LARGE_FILE_SIZE, mtime=datetime.now(), diff=None
), # Over limit
]
updated = [
LocalChange(
path="file3.gpkg",
checksum="lmn456",
size=SIZE_LIMIT_BYTES + 1,
mtime=datetime.now(),
diff={"path": "file3-diff.gpkg", "checksum": "diff123", "size": 1024, "mtime": datetime.now()},
),
LocalChange(path="file4.txt", checksum="opq123", size=SMALL_FILE_SIZE, mtime=datetime.now()),
]

# Initialize LocalChanges
with patch("mergin.local_changes.MAX_UPLOAD_VERSIONED_SIZE", SIZE_LIMIT_BYTES):
with pytest.raises(ChangesValidationError) as err:
LocalChanges(added=added, updated=updated)
assert len(err.value.invalid_changes) == 1
assert "file2.gpkg" == err.value.invalid_changes[0].path
assert err.value.invalid_changes[0].size == LARGE_FILE_SIZE


def test_local_changes_post_init():
"""Test that __post_init__ trims combined added/updated changes to MAX_UPLOAD_CHANGES."""
# Define constants
ADDED_COUNT = 80
UPDATED_COUNT = 21
SMALL_FILE_SIZE = 1024
LARGE_FILE_SIZE = 2048

# Create more than MAX_UPLOAD_CHANGES changes
added = [
LocalChange(path=f"file{i}.txt", checksum="abc123", size=SMALL_FILE_SIZE, mtime=datetime.now())
for i in range(ADDED_COUNT)
]
updated = [
LocalChange(path=f"file{i}.txt", checksum="xyz789", size=LARGE_FILE_SIZE, mtime=datetime.now())
for i in range(UPDATED_COUNT)
]

# Initialize LocalChanges
local_changes = LocalChanges(added=added, updated=updated)

# Assertions
assert len(local_changes.added) == ADDED_COUNT # All added changes are included
assert len(local_changes.updated) == MAX_UPLOAD_CHANGES - ADDED_COUNT # Only enough updated changes are included
assert len(local_changes.added) + len(local_changes.updated) == MAX_UPLOAD_CHANGES # Total is limited