diff --git a/.github/workflows/autotests.yml b/.github/workflows/autotests.yml index fc658a58..8848e103 100644 --- a/.github/workflows/autotests.yml +++ b/.github/workflows/autotests.yml @@ -19,7 +19,7 @@ jobs: - uses: actions/setup-python@v2 with: - python-version: '3.x' + python-version: '3.8' - name: Install python package dependencies run: | diff --git a/mergin/client.py b/mergin/client.py index 72edf8ba..14606c6b 100644 --- a/mergin/client.py +++ b/mergin/client.py @@ -1554,6 +1554,8 @@ def sync_project_generator(self, project_directory): def sync_project(self, project_directory): """ + Syncs project by pulling server changes and pushing local changes. A retry mechanism is introduced + to handle server conflicts (when the server has changes that we do not have yet or somebody else is syncing). See description of _sync_project_generator(). :param project_directory: Project's directory diff --git a/mergin/client_push.py b/mergin/client_push.py index c8fb4205..814b5ec0 100644 --- a/mergin/client_push.py +++ b/mergin/client_push.py @@ -22,9 +22,16 @@ import time from typing import List, Tuple, Optional, ByteString -from .local_changes import FileChange, LocalProjectChanges - -from .common import UPLOAD_CHUNK_ATTEMPT_WAIT, UPLOAD_CHUNK_ATTEMPTS, UPLOAD_CHUNK_SIZE, ClientError, ErrorCode +from .local_changes import ChangesValidationError, FileChange, LocalProjectChanges + +from .common import ( + MAX_UPLOAD_VERSIONED_SIZE, + UPLOAD_CHUNK_ATTEMPT_WAIT, + UPLOAD_CHUNK_ATTEMPTS, + UPLOAD_CHUNK_SIZE, + MAX_UPLOAD_MEDIA_SIZE, + ClientError, +) from .merginproject import MerginProject from .editor import filter_changes from .utils import get_data_checksum @@ -176,7 +183,7 @@ def start(self, items: List[UploadQueueItem]): def update_chunks_from_items(self): """ - Update chunks in LocalChanges from the upload queue items. + Update chunks in LocalProjectChanges from the upload queue items. Used just before finalizing the transaction to set the server_chunk_id in v2 API. 
""" self.changes.update_chunk_ids([(item.chunk_id, item.server_chunk_id) for item in self.upload_queue_items]) @@ -301,7 +308,7 @@ def push_project_async(mc, directory) -> Optional[UploadJob]: mp.log.info(f"--- push {project_path} - nothing to do") return - mp.log.debug("push changes:\n" + pprint.pformat(changes)) + mp.log.debug("push changes:\n" + pprint.pformat(asdict(changes))) tmp_dir = tempfile.TemporaryDirectory(prefix="python-api-client-") # If there are any versioned files (aka .gpkg) that are not updated through a diff, @@ -309,20 +316,15 @@ def push_project_async(mc, directory) -> Optional[UploadJob]: # That's because if there are pending transactions, checkpointing or switching from WAL mode # won't work, and we would end up with some changes left in -wal file which do not get # uploaded. The temporary copy using geodiff uses sqlite backup API and should copy everything. - for f in changes["updated"]: - if mp.is_versioned_file(f["path"]) and "diff" not in f: + for f in changes.updated: + if mp.is_versioned_file(f.path) and not f.diff: mp.copy_versioned_file_for_upload(f, tmp_dir.name) - for f in changes["added"]: - if mp.is_versioned_file(f["path"]): + for f in changes.added: + if mp.is_versioned_file(f.path): mp.copy_versioned_file_for_upload(f, tmp_dir.name) - local_changes = LocalProjectChanges( - added=[FileChange(**change) for change in changes["added"]], - updated=[FileChange(**change) for change in changes["updated"]], - removed=[FileChange(**change) for change in changes["removed"]], - ) - job = create_upload_job(mc, mp, local_changes, tmp_dir) + job = create_upload_job(mc, mp, changes, tmp_dir) return job @@ -477,7 +479,7 @@ def remove_diff_files(job: UploadJob) -> None: os.remove(diff_file) -def get_push_changes_batch(mc, mp: MerginProject) -> Tuple[dict, int]: +def get_push_changes_batch(mc, mp: MerginProject) -> Tuple[LocalProjectChanges, int]: """ Get changes that need to be pushed to the server. 
""" @@ -485,4 +487,14 @@ def get_push_changes_batch(mc, mp: MerginProject) -> Tuple[dict, int]: project_role = mp.project_role() changes = filter_changes(mc, project_role, changes) - return changes, sum(len(v) for v in changes.values()) + try: + local_changes = LocalProjectChanges( + added=[FileChange(**change) for change in changes["added"]], + updated=[FileChange(**change) for change in changes["updated"]], + removed=[FileChange(**change) for change in changes["removed"]], + ) + except ChangesValidationError as e: + raise ClientError( + f"Some files exceeded maximum upload size. Files: {', '.join([c.path for c in e.invalid_changes])}. Maximum size for media files is {MAX_UPLOAD_MEDIA_SIZE / (1024**3)} GB and for geopackage files {MAX_UPLOAD_VERSIONED_SIZE / (1024**3)} GB." + ) + return local_changes, sum(len(v) for v in changes.values()) diff --git a/mergin/common.py b/mergin/common.py index 296f7de0..25df4f4d 100644 --- a/mergin/common.py +++ b/mergin/common.py @@ -24,6 +24,12 @@ # seconds to wait between sync callback calls SYNC_CALLBACK_WAIT = 0.01 +# maximum size of media file able to upload in one push (in bytes) +MAX_UPLOAD_MEDIA_SIZE = 10 * (1024**3) + +# maximum size of GPKG file able to upload in one push (in bytes) +MAX_UPLOAD_VERSIONED_SIZE = 5 * (1024**3) + # default URL for submitting logs MERGIN_DEFAULT_LOGS_URL = "https://g4pfq226j0.execute-api.eu-west-1.amazonaws.com/mergin_client_log_submit" diff --git a/mergin/editor.py b/mergin/editor.py index 759991e3..65d42f8b 100644 --- a/mergin/editor.py +++ b/mergin/editor.py @@ -1,7 +1,7 @@ from itertools import filterfalse from typing import Callable, Dict, List -from .utils import is_mergin_config, is_qgis_file, is_versioned_file +from .utils import is_qgis_file EDITOR_ROLE_NAME = "editor" diff --git a/mergin/local_changes.py b/mergin/local_changes.py index c0fb980f..d67e91f5 100644 --- a/mergin/local_changes.py +++ b/mergin/local_changes.py @@ -2,6 +2,18 @@ from datetime import datetime from typing 
import Optional, List, Tuple +from .utils import is_versioned_file +from .common import MAX_UPLOAD_MEDIA_SIZE, MAX_UPLOAD_VERSIONED_SIZE + +MAX_UPLOAD_CHANGES = 100 + + +# Raised when some local changes fail validation (e.g. exceed the maximum upload size) +class ChangesValidationError(Exception): + def __init__(self, message, invalid_changes=[]): + super().__init__(message) + self.invalid_changes = invalid_changes if invalid_changes is not None else [] + @dataclass class FileDiffChange: @@ -68,6 +80,29 @@ class LocalProjectChanges: updated: List[FileChange] = field(default_factory=list) removed: List[FileChange] = field(default_factory=list) + def __post_init__(self): + """ + Validate upload file sizes and enforce a limit of changes combined from `added` and `updated`. + """ + upload_changes = self.get_upload_changes() + total_changes = len(upload_changes) + oversize_changes = [] + for change in upload_changes: + if not is_versioned_file(change.path) and change.size > MAX_UPLOAD_MEDIA_SIZE: + oversize_changes.append(change) + elif not change.diff and change.size > MAX_UPLOAD_VERSIONED_SIZE: + oversize_changes.append(change) + if oversize_changes: + error = ChangesValidationError("Some files exceed the maximum upload size", oversize_changes) + raise error + + if total_changes > MAX_UPLOAD_CHANGES: + # Calculate how many changes to keep from `added` and `updated` + added_limit = min(len(self.added), MAX_UPLOAD_CHANGES) + updated_limit = MAX_UPLOAD_CHANGES - added_limit + self.added = self.added[:added_limit] + self.updated = self.updated[:updated_limit] + def to_server_payload(self) -> dict: return { "added": [change.to_server_data() for change in self.added], diff --git a/mergin/merginproject.py b/mergin/merginproject.py index 72b1449c..cd519131 100644 --- a/mergin/merginproject.py +++ b/mergin/merginproject.py @@ -21,7 +21,7 @@ conflicted_copy_file_name, edit_conflict_file_name, ) - +from .local_changes import FileChange this_dir = os.path.dirname(os.path.realpath(__file__)) @@ -470,20 +470,20 @@ def get_push_changes(self): changes["updated"] = [f for f in 
changes["updated"] if f not in not_updated] return changes - def copy_versioned_file_for_upload(self, f, tmp_dir): + def copy_versioned_file_for_upload(self, f: FileChange, tmp_dir: str) -> str: """ Make a temporary copy of the versioned file using geodiff, to make sure that we have full content in a single file (nothing left in WAL journal) """ - path = f["path"] + path = f.path self.log.info("Making a temporary copy (full upload): " + path) tmp_file = os.path.join(tmp_dir, path) os.makedirs(os.path.dirname(tmp_file), exist_ok=True) self.geodiff.make_copy_sqlite(self.fpath(path), tmp_file) - f["size"] = os.path.getsize(tmp_file) - f["checksum"] = generate_checksum(tmp_file) - f["chunks"] = [str(uuid.uuid4()) for i in range(math.ceil(f["size"] / UPLOAD_CHUNK_SIZE))] - f["upload_file"] = tmp_file + f.size = os.path.getsize(tmp_file) + f.checksum = generate_checksum(tmp_file) + f.chunks = [str(uuid.uuid4()) for i in range(math.ceil(f.size / UPLOAD_CHUNK_SIZE))] + f.upload_file = tmp_file return tmp_file def get_list_of_push_changes(self, push_changes): diff --git a/mergin/test/test_client.py b/mergin/test/test_client.py index d366b25a..1c55933e 100644 --- a/mergin/test/test_client.py +++ b/mergin/test/test_client.py @@ -2328,8 +2328,10 @@ def test_clean_diff_files(mc): shutil.copy(mp.fpath("inserted_1_A.gpkg"), mp.fpath(f_updated)) mc.push_project(project_dir) - diff_files = glob.glob("*-diff-*", root_dir=os.path.split(mp.fpath_meta("inserted_1_A.gpkg"))[0]) + directory = os.path.split(mp.fpath_meta("inserted_1_A.gpkg"))[0] + diff_files = [f for f in os.listdir(directory) if "-diff-" in f] + # Assert that no matching files are found assert diff_files == [] @@ -3214,3 +3216,22 @@ def test_client_project_sync_retry(mc): with pytest.raises(ClientError): mc.sync_project(project_dir) assert mock_push_project_async.call_count == 2 + + +def test_push_file_limits(mc): + test_project = "test_push_file_limits" + project = API_USER + "/" + test_project + project_dir = 
os.path.join(TMP_DIR, test_project) + cleanup(mc, project, [project_dir]) + mc.create_project(test_project) + mc.download_project(project, project_dir) + shutil.copy(os.path.join(TEST_DATA_DIR, "base.gpkg"), project_dir) + # setting to some minimal value to mock limit hit + with patch("mergin.local_changes.MAX_UPLOAD_VERSIONED_SIZE", 1): + with pytest.raises(ClientError, match=f"Some files exceeded maximum upload size. Files: base.gpkg."): + mc.push_project(project_dir) + + shutil.copy(os.path.join(TEST_DATA_DIR, "test.txt"), project_dir) + with patch("mergin.local_changes.MAX_UPLOAD_MEDIA_SIZE", 1): + with pytest.raises(ClientError, match=f"Some files exceeded maximum upload size. Files: test.txt."): + mc.push_project(project_dir) diff --git a/mergin/test/test_local_changes.py b/mergin/test/test_local_changes.py index 32b195d8..2c3d878b 100644 --- a/mergin/test/test_local_changes.py +++ b/mergin/test/test_local_changes.py @@ -1,6 +1,8 @@ from datetime import datetime +import pytest +from unittest.mock import patch -from ..local_changes import FileChange, LocalProjectChanges +from ..local_changes import ChangesValidationError, FileChange, LocalProjectChanges, MAX_UPLOAD_CHANGES def test_local_changes_from_dict(): @@ -118,3 +120,93 @@ def test_local_changes_get_upload_changes(): assert len(upload_changes) == 2 # Only added and updated should be included assert upload_changes[0].path == "file1.txt" # First change is from added assert upload_changes[1].path == "file2.txt" # Second change is from updated + + +def test_local_changes_post_init_validation_media(): + """Test that __post_init__ raises ChangesValidationError for oversized media files.""" + # Define constants + SIZE_LIMIT_MB = 5 + SIZE_LIMIT_BYTES = SIZE_LIMIT_MB * 1024 * 1024 + SMALL_FILE_SIZE = 1024 + LARGE_FILE_SIZE = 15 * 1024 * 1024 + + # Create sample LocalChange instances + added = [ + FileChange(path="file1.txt", checksum="abc123", size=SMALL_FILE_SIZE, mtime=datetime.now()), + FileChange(path="file2.jpg", 
checksum="xyz789", size=LARGE_FILE_SIZE, mtime=datetime.now()), # Over limit + ] + updated = [ + FileChange(path="file3.mp4", checksum="lmn456", size=5 * 1024 * 1024, mtime=datetime.now()), + FileChange(path="file4.gpkg", checksum="opq123", size=SMALL_FILE_SIZE, mtime=datetime.now()), + ] + + # Initialize LocalProjectChanges + with patch("mergin.local_changes.MAX_UPLOAD_MEDIA_SIZE", SIZE_LIMIT_BYTES): + with pytest.raises(ChangesValidationError, match="Some files exceed") as err: + LocalProjectChanges(added=added, updated=updated) + print(err.value.invalid_changes) + assert len(err.value.invalid_changes) == 1 + assert "file2.jpg" == err.value.invalid_changes[0].path + assert err.value.invalid_changes[0].size == LARGE_FILE_SIZE + + +def test_local_changes_post_init_validation_gpgkg(): + """Test that __post_init__ raises ChangesValidationError for oversized geopackage files.""" + # Define constants + SIZE_LIMIT_MB = 10 + SIZE_LIMIT_BYTES = SIZE_LIMIT_MB * 1024 * 1024 + SMALL_FILE_SIZE = 1024 + LARGE_FILE_SIZE = 15 * 1024 * 1024 + + # Create sample LocalChange instances + added = [ + FileChange(path="file1.gpkg", checksum="abc123", size=SMALL_FILE_SIZE, mtime=datetime.now()), + FileChange( + path="file2.gpkg", checksum="xyz789", size=LARGE_FILE_SIZE, mtime=datetime.now(), diff=None + ), # Over limit + ] + updated = [ + FileChange( + path="file3.gpkg", + checksum="lmn456", + size=SIZE_LIMIT_BYTES + 1, + mtime=datetime.now(), + diff={"path": "file3-diff.gpkg", "checksum": "diff123", "size": 1024, "mtime": datetime.now()}, + ), + FileChange(path="file4.txt", checksum="opq123", size=SMALL_FILE_SIZE, mtime=datetime.now()), + ] + + # Initialize LocalProjectChanges + with patch("mergin.local_changes.MAX_UPLOAD_VERSIONED_SIZE", SIZE_LIMIT_BYTES): + with pytest.raises(ChangesValidationError) as err: + LocalProjectChanges(added=added, updated=updated) + assert len(err.value.invalid_changes) == 1 + assert "file2.gpkg" == err.value.invalid_changes[0].path + assert err.value.invalid_changes[0].size == 
LARGE_FILE_SIZE + + +def test_local_changes_post_init(): + """Test the __post_init__ method of LocalProjectChanges.""" + # Define constants + ADDED_COUNT = 80 + UPDATED_COUNT = 21 + SMALL_FILE_SIZE = 1024 + LARGE_FILE_SIZE = 2048 + + # Create more than MAX_UPLOAD_CHANGES changes + added = [ + FileChange(path=f"file{i}.txt", checksum="abc123", size=SMALL_FILE_SIZE, mtime=datetime.now()) + for i in range(ADDED_COUNT) + ] + updated = [ + FileChange(path=f"file{i}.txt", checksum="xyz789", size=LARGE_FILE_SIZE, mtime=datetime.now()) + for i in range(UPDATED_COUNT) + ] + + # Initialize LocalProjectChanges + local_changes = LocalProjectChanges(added=added, updated=updated) + + # Assertions + assert len(local_changes.added) == ADDED_COUNT # All added changes are included + assert len(local_changes.updated) == MAX_UPLOAD_CHANGES - ADDED_COUNT # Only enough updated changes are included + assert len(local_changes.added) + len(local_changes.updated) == MAX_UPLOAD_CHANGES # Total is limited