From 6bd90ee9b008da12e6acc2a5b1f29229b28a5906 Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Mon, 21 Jul 2025 13:10:17 +0300 Subject: [PATCH 01/20] ADd missing env var in the template --- packages/examples/cvat/recording-oracle/src/.env.template | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/examples/cvat/recording-oracle/src/.env.template b/packages/examples/cvat/recording-oracle/src/.env.template index a5851b0fb6..d63b8d41e3 100644 --- a/packages/examples/cvat/recording-oracle/src/.env.template +++ b/packages/examples/cvat/recording-oracle/src/.env.template @@ -77,6 +77,7 @@ ENABLE_CUSTOM_CLOUD_HOST= MIN_AVAILABLE_GT_THRESHOLD= MAX_USABLE_GT_SHARE= +ENABLE_GT_BANS= UNVERIFIABLE_ASSIGNMENTS_THRESHOLD= MAX_ESCROW_ITERATIONS= WARMUP_ITERATIONS= From 07b0ac23085bd1de31c3c69ad6fd9f9b6a620635 Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Mon, 21 Jul 2025 13:15:38 +0300 Subject: [PATCH 02/20] Use default chunk size --- .../examples/cvat/exchange-oracle/src/cvat/api_calls.py | 8 +++++--- .../cvat/exchange-oracle/src/handlers/job_creation.py | 7 ------- 2 files changed, 5 insertions(+), 10 deletions(-) diff --git a/packages/examples/cvat/exchange-oracle/src/cvat/api_calls.py b/packages/examples/cvat/exchange-oracle/src/cvat/api_calls.py index 06934225b4..02f442f4c2 100644 --- a/packages/examples/cvat/exchange-oracle/src/cvat/api_calls.py +++ b/packages/examples/cvat/exchange-oracle/src/cvat/api_calls.py @@ -359,7 +359,7 @@ def put_task_data( task_id: int, cloudstorage_id: int, *, - chunk_size: int, + chunk_size: int | None = None, filenames: list[str] | None = None, sort_images: bool | None = None, validation_params: dict[str, str | float | list[str]] | None = None, @@ -404,8 +404,10 @@ def put_task_data( else models.SortingMethod("predefined") ) + if chunk_size is not None: + kwargs["chunk_size"] = chunk_size + data_request = models.DataRequest( - chunk_size=chunk_size, cloud_storage_id=cloudstorage_id, image_quality=Config.cvat_config.image_quality, use_cache=True, @@ -414,7 +416,7 @@ def put_task_data( **kwargs, ) try: - (_, response) = api_client.tasks_api.create_data(task_id, data_request=data_request) + api_client.tasks_api.create_data(task_id, data_request=data_request) return except exceptions.ApiException as e: diff --git a/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py b/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py index 64645992f4..66fea34f40 100644 --- a/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py +++ b/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py @@ -166,10 +166,6 @@ def _task_segment_size(self) -> int: def _job_val_frames_count(self) -> int: return self.manifest.validation.val_size - @property - def _task_chunk_size(self) -> int: - return self._task_segment_size + self._job_val_frames_count - def __enter__(self): return self @@ -434,7 +430,6 @@ def build(self): cvat_task.id, cloud_storage.id, filenames=data_subset, - chunk_size=self._task_chunk_size, validation_params={ "gt_filenames": gt_filenames, # include whole GT dataset into each task "gt_frames_per_job_count": self._job_val_frames_count, @@ -1614,7 +1609,6 @@ def _create_on_cvat(self): cvat_task.id, cvat_cloud_storage.id, filenames=filenames, - chunk_size=self._task_chunk_size, validation_params={ "gt_filenames": gt_filenames, "gt_frames_per_job_count": self._job_val_frames_count, @@ -2942,7 +2936,6 @@ def _task_params_label_key(ts): cvat_task.id, cvat_cloud_storage.id, filenames=point_label_filenames + gt_point_label_filenames, - chunk_size=self._task_chunk_size, validation_params={ "gt_filenames": gt_point_label_filenames, "gt_frames_per_job_count": self._job_val_frames_count, From f00e7f80d8424b0f969a62f8f01ec9b402c47bcb Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Mon, 21 Jul 2025 14:14:15 +0300 Subject: [PATCH 03/20] Remove dead code --- .../cvat/exchange-oracle/src/services/cvat.py | 40 ------------------- 1 file changed, 40 deletions(-) diff --git a/packages/examples/cvat/exchange-oracle/src/services/cvat.py b/packages/examples/cvat/exchange-oracle/src/services/cvat.py index 5b2bbc96e0..06f3ef5aa7 100644 --- a/packages/examples/cvat/exchange-oracle/src/services/cvat.py +++ b/packages/examples/cvat/exchange-oracle/src/services/cvat.py @@ -329,46 +329,6 @@ def create_escrow_validations(session: Session, *, limit: int = 100) -> list[tup return session.execute(insert_stmt).all() -def get_available_projects(session: Session, *, limit: int = 10) -> list[Project]: - return ( - session.query(Project) - .where( - (Project.status == ProjectStatuses.annotation.value) - & Project.jobs.any( - (Job.status == JobStatuses.new) - & ~Job.assignments.any(Assignment.status == AssignmentStatuses.created.value) - ) - ) - .distinct() - .limit(limit) - .all() - ) - - -def get_projects_by_assignee( - session: Session, - wallet_address: str | None = None, - *, - limit: int = 10, - for_update: bool | ForUpdateParams = False, -) -> list[Project]: - return ( - _maybe_for_update(session.query(Project), enable=for_update) - .where( - Project.jobs.any( - Job.assignments.any( - (Assignment.user_wallet_address == wallet_address) - & (Assignment.status == AssignmentStatuses.created) - & (utcnow() < Assignment.expires_at) - ) - ) - ) - .distinct() - .limit(limit) - .all() - ) - - def update_project_status(session: Session, project_id: str, status: ProjectStatuses) -> None: upd = update(Project).where(Project.id == project_id).values(status=status.value) session.execute(upd) From 95ed47f19fbd06b3b52183deccd0959f2edb08f8 Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Mon, 21 Jul 2025 16:02:12 +0300 Subject: [PATCH 04/20] Fix build deps --- packages/examples/cvat/exchange-oracle/poetry.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/examples/cvat/exchange-oracle/poetry.lock b/packages/examples/cvat/exchange-oracle/poetry.lock index 8af6c9bc2d..3566ac5494 100644 --- a/packages/examples/cvat/exchange-oracle/poetry.lock +++ b/packages/examples/cvat/exchange-oracle/poetry.lock @@ -1155,13 +1155,13 @@ files = [ [[package]] name = "distlib" -version = "0.3.8" +version = "0.4.0" description = "Distribution utilities" optional = false python-versions = "*" files = [ - {file = "distlib-0.3.8-py2.py3-none-any.whl", hash = "sha256:034db59a0b96f8ca18035f36290806a9a6e6bd9d1ff91e45a7f172eb17e51784"}, - {file = "distlib-0.3.8.tar.gz", hash = "sha256:1530ea13e350031b6312d8580ddb6b27a104275a31106523b8f123787f494f64"}, + {file = "distlib-0.4.0-py2.py3-none-any.whl", hash = "sha256:9659f7d87e46584a30b5780e43ac7a2143098441670ff0a49d5f9034c54a6c16"}, + {file = "distlib-0.4.0.tar.gz", hash = "sha256:feec40075be03a04501a973d81f633735b4b69f98b05450592310c0f401a4e0d"}, ] [[package]] From 7abeea67a40345e7e8a24179a0f1a3b36461cd7b Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Wed, 23 Jul 2025 14:57:13 +0300 Subject: [PATCH 05/20] Allow extra GT labels in skeletons_from_boxes --- .../src/handlers/job_creation.py | 22 +++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py b/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py index 66fea34f40..566d69a959 100644 --- a/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py +++ b/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py @@ -1786,19 +1786,33 @@ def _validate_gt_labels(self): for node_label in skeleton_label.nodes: manifest_labels.add((node_label, skeleton_label.name)) - if gt_labels - manifest_labels: + if manifest_labels - gt_labels: raise DatasetValidationError( - "GT labels do not match job labels. Unknown labels: {}".format( + "Could not find GT for labels {}".format( format_sequence( [ label_name if not parent_name else f"{parent_name}.{label_name}" - for label_name, parent_name in gt_labels - manifest_labels + for label_name, parent_name in manifest_labels - gt_labels ] ), ) ) - # Reorder labels to match the manifest + # It should not be an issue that there are some extra GT labels - they should + # just be skipped. + if gt_labels - manifest_labels: + self.logger.info( + "Skipping unknown GT labels: {}".format( + format_sequence( + [ + label_name if not parent_name else f"{parent_name}.{label_name}" + for label_name, parent_name in gt_labels - manifest_labels + ] + ) + ) + ) + + # Reorder and filter labels to match the manifest self._input_gt_dataset.transform( ProjectLabels, dst_labels=[label.name for label in self.manifest.annotation.labels] ) From fea9cff292c1eada46b99ad8ccf6d922dd7a2b88 Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Wed, 23 Jul 2025 14:58:18 +0300 Subject: [PATCH 06/20] Simplify webhook sending in debug mode --- .../examples/cvat/exchange-oracle/debug.py | 79 ++++++++++++++++--- .../examples/cvat/recording-oracle/debug.py | 31 ++++++++ 2 files changed, 100 insertions(+), 10 deletions(-) diff --git a/packages/examples/cvat/exchange-oracle/debug.py b/packages/examples/cvat/exchange-oracle/debug.py index 1b0deab363..4efb07e978 100644 --- a/packages/examples/cvat/exchange-oracle/debug.py +++ b/packages/examples/cvat/exchange-oracle/debug.py @@ -7,6 +7,8 @@ from typing import Any from unittest import mock +import inspect +import uuid import uvicorn from httpx import URL @@ -14,9 +16,10 @@ from src.core.config import Config from src.db import SessionLocal from src.services import cloud -from src.services import cvat as cvat_service +from src.services import cvat as cvat_db_service from src.services.cloud import BucketAccessInfo from src.utils.logging import format_sequence, get_function_logger +from src.utils.time import utcnow @contextmanager @@ -28,7 +31,9 @@ def _mock_cvat_cloud_storage_params(logger: Logger) -> Generator[None, None, Non def patched_make_cvat_cloud_storage_params(bucket_info: BucketAccessInfo) -> dict: original_host_url = bucket_info.host_url - if Config.development_config.cvat_in_docker: + if Config.development_config.cvat_in_docker and ( + "localhost" in original_host_url or "127.0.0.1" in original_host_url + ): bucket_info.host_url = str( URL(original_host_url).copy_with( host=Config.development_config.exchange_oracle_host @@ -110,6 +115,8 @@ def _mock_webhook_signature_checking(_: Logger) -> Generator[None, None, None]: - from reputation oracle - encoded with Config.localhost.reputation_oracle_address wallet address or signature "reputation_oracle" + + is optional in all cases. """ from src.chain.escrow import ( @@ -133,6 +140,33 @@ def patched_get_available_webhook_types(chain_id, escrow_address): d[Config.localhost.reputation_oracle_address.lower()] = OracleWebhookTypes.reputation_oracle return d + from src.services.webhook import inbox as original_inbox + + class PatchedInbox: + def __init__(self): + pass + + def __getattr__(self, name: str): + return getattr(original_inbox, name) + + def create_webhook( + self, + session, + escrow_address, + chain_id, + type: OracleWebhookTypes, + signature = None, + event_type = None, + event_data = None, + event = None, + ): + if signature in OracleWebhookTypes: + signature = f"{type.value}-{utcnow().isoformat(sep='T')}-{uuid.uuid4()}" + + _orig_params = inspect.signature(original_inbox.create_webhook).parameters + _args = {k: v for k, v in locals().items() if k in _orig_params} + return original_inbox.create_webhook(**_args) + with ( mock.patch("src.schemas.webhook.validate_address", lambda x: x), mock.patch( @@ -143,6 +177,7 @@ def patched_get_available_webhook_types(chain_id, escrow_address): "src.endpoints.webhook.validate_oracle_webhook_signature", patched_validate_oracle_webhook_signature, ), + mock.patch("src.services.webhook.inbox", PatchedInbox()) ): yield @@ -165,7 +200,7 @@ def decode_plain_json_token(self, token) -> dict[str, Any]: if (user_wallet := token_data.get("wallet_address")) and not token_data.get("email"): with SessionLocal.begin() as session: - user = cvat_service.get_user_by_id(session, user_wallet) + user = cvat_db_service.get_user_by_id(session, user_wallet) if not user: raise Exception(f"Could not find user with wallet address '{user_wallet}'") @@ -236,7 +271,30 @@ def apply_local_development_patches() -> Generator[None, None, None]: yield -if __name__ == "__main__": +def run_server(): + uvicorn.run( + app="src:app", + host="0.0.0.0", # noqa: S104 + port=int(Config.port), + workers=Config.workers_amount, + ) + + +# def fix_escrow(): +# from src.handlers.job_fixing import entrypoint + +# return entrypoint() + + +import sys +from argparse import ArgumentParser + + +def main(args: list[str] | None = None) -> int: + parser = ArgumentParser() + parser.add_argument("-e", "--entrypoint", default=run_server) + parsed_args = parser.parse_args(args) + with ExitStack() as es: is_dev = Config.environment == "development" if is_dev: @@ -245,9 +303,10 @@ def apply_local_development_patches() -> Generator[None, None, None]: Config.validate() register_in_kvstore() - uvicorn.run( - app="src:app", - host="0.0.0.0", # noqa: S104 - port=int(Config.port), - workers=Config.workers_amount, - ) + parsed_args.entrypoint() + + return 0 + + +if __name__ == "__main__": + sys.exit(main(sys.argv[1:])) diff --git a/packages/examples/cvat/recording-oracle/debug.py b/packages/examples/cvat/recording-oracle/debug.py index 5d21b1029f..1d6fe702cf 100644 --- a/packages/examples/cvat/recording-oracle/debug.py +++ b/packages/examples/cvat/recording-oracle/debug.py @@ -4,6 +4,8 @@ from logging import Logger from pathlib import PurePosixPath from unittest import mock +import inspect +import uuid import uvicorn @@ -12,6 +14,7 @@ from src.services import cloud from src.services.cloud import BucketAccessInfo from src.utils.logging import format_sequence, get_function_logger +from src.utils.time import utcnow @contextmanager @@ -109,6 +112,33 @@ def patched_get_available_webhook_types(chain_id, escrow_address): d[Config.localhost.exchange_oracle_address.lower()] = OracleWebhookTypes.exchange_oracle return d + from src.services.webhook import inbox as original_inbox + + class PatchedInbox: + def __init__(self): + pass + + def __getattr__(self, name: str): + return getattr(original_inbox, name) + + def create_webhook( + self, + session, + escrow_address, + chain_id, + type: OracleWebhookTypes, + signature = None, + event_type = None, + event_data = None, + event = None, + ): + if signature in OracleWebhookTypes: + signature = f"{type.value}-{utcnow().isoformat(sep='T')}-{uuid.uuid4()}" + + _orig_params = inspect.signature(original_inbox.create_webhook).parameters + _args = {k: v for k, v in locals().items() if k in _orig_params} + return original_inbox.create_webhook(**_args) + with ( mock.patch("src.schemas.webhook.validate_address", lambda x: x), mock.patch( @@ -119,6 +149,7 @@ def patched_get_available_webhook_types(chain_id, escrow_address): "src.endpoints.webhook.validate_oracle_webhook_signature", patched_validate_oracle_webhook_signature, ), + mock.patch("src.services.webhook.inbox", PatchedInbox()) ): yield From ca3c67d7822d2f3fc72bb48048fd91d1a0f614e0 Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Wed, 23 Jul 2025 15:05:13 +0300 Subject: [PATCH 07/20] Update build deps in ro --- packages/examples/cvat/recording-oracle/poetry.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/examples/cvat/recording-oracle/poetry.lock b/packages/examples/cvat/recording-oracle/poetry.lock index 52dd95468a..22d120c3b0 100644 --- a/packages/examples/cvat/recording-oracle/poetry.lock +++ b/packages/examples/cvat/recording-oracle/poetry.lock @@ -1124,13 +1124,13 @@ files = [ [[package]] name = "distlib" -version = "0.3.8" +version = "0.4.0" description = "Distribution utilities" optional = false python-versions = "*" files = [ - {file = "distlib-0.3.8-py2.py3-none-any.whl", hash = "sha256:034db59a0b96f8ca18035f36290806a9a6e6bd9d1ff91e45a7f172eb17e51784"}, - {file = "distlib-0.3.8.tar.gz", hash = "sha256:1530ea13e350031b6312d8580ddb6b27a104275a31106523b8f123787f494f64"}, + {file = "distlib-0.4.0-py2.py3-none-any.whl", hash = "sha256:9659f7d87e46584a30b5780e43ac7a2143098441670ff0a49d5f9034c54a6c16"}, + {file = "distlib-0.4.0.tar.gz", hash = "sha256:feec40075be03a04501a973d81f633735b4b69f98b05450592310c0f401a4e0d"}, ] [[package]] From c98d220f49d24af4906b97749fcf6083a4b7cfbd Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Wed, 23 Jul 2025 15:11:22 +0300 Subject: [PATCH 08/20] revert extra change --- .../examples/cvat/exchange-oracle/debug.py | 39 ++++--------------- 1 file changed, 7 insertions(+), 32 deletions(-) diff --git a/packages/examples/cvat/exchange-oracle/debug.py b/packages/examples/cvat/exchange-oracle/debug.py index 4efb07e978..c04a0ac439 100644 --- a/packages/examples/cvat/exchange-oracle/debug.py +++ b/packages/examples/cvat/exchange-oracle/debug.py @@ -270,31 +270,7 @@ def apply_local_development_patches() -> Generator[None, None, None]: yield - -def run_server(): - uvicorn.run( - app="src:app", - host="0.0.0.0", # noqa: S104 - port=int(Config.port), - workers=Config.workers_amount, - ) - - -# def fix_escrow(): -# from src.handlers.job_fixing import entrypoint - -# return entrypoint() - - -import sys -from argparse import ArgumentParser - - -def main(args: list[str] | None = None) -> int: - parser = ArgumentParser() - parser.add_argument("-e", "--entrypoint", default=run_server) - parsed_args = parser.parse_args(args) - +if __name__ == "__main__": with ExitStack() as es: is_dev = Config.environment == "development" if is_dev: @@ -303,10 +279,9 @@ def main(args: list[str] | None = None) -> int: Config.validate() register_in_kvstore() - parsed_args.entrypoint() - - return 0 - - -if __name__ == "__main__": - sys.exit(main(sys.argv[1:])) + uvicorn.run( + app="src:app", + host="0.0.0.0", # noqa: S104 + port=int(Config.port), + workers=Config.workers_amount, + ) From 8c2f5ae3548486d2d3a4210d1153c7b7af1b8a14 Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Wed, 23 Jul 2025 15:53:54 +0300 Subject: [PATCH 09/20] Update tests --- .../tests/integration/services/test_cvat.py | 85 ------------------- 1 file changed, 85 deletions(-) diff --git a/packages/examples/cvat/exchange-oracle/tests/integration/services/test_cvat.py b/packages/examples/cvat/exchange-oracle/tests/integration/services/test_cvat.py index af25b2fb43..0863ae43d6 100644 --- a/packages/examples/cvat/exchange-oracle/tests/integration/services/test_cvat.py +++ b/packages/examples/cvat/exchange-oracle/tests/integration/services/test_cvat.py @@ -17,7 +17,6 @@ ) from src.db import SessionLocal from src.models.cvat import Assignment, DataUpload, Image, Job, Project, Task, User -from src.utils.time import utcnow from tests.utils.db_helper import ( create_project, @@ -347,90 +346,6 @@ def test_get_projects_by_status(self): assert len(projects) == 1 - def test_get_available_projects(self): - cvat_id_1 = 456 - (cvat_project, cvat_task, cvat_job) = create_project_task_and_job( - self.session, "0x86e83d346041E8806e352681f3F14549C0d2BC67", cvat_id_1 - ) - - projects = cvat_service.get_available_projects(self.session) - - assert len(projects) == 1 - - cvat_id_2 = 457 - (cvat_project, cvat_task) = create_project_and_task( - self.session, "0x86e83d346041E8806e352681f3F14549C0d2BC68", cvat_id_2 - ) - - cvat_task_id = cvat_task.cvat_id - cvat_project_id = cvat_project.cvat_id - - cvat_service.create_job( - session=self.session, - cvat_id=cvat_id_2, - cvat_task_id=cvat_task_id, - cvat_project_id=cvat_project_id, - status=JobStatuses.in_progress, - start_frame=0, - stop_frame=1, - ) - - cvat_id_3 = 458 - (cvat_project, cvat_task, _) = create_project_task_and_job( - self.session, "0x86e83d346041E8806e352681f3F14549C0d2BC69", cvat_id_3 - ) - - projects = cvat_service.get_available_projects(self.session) - assert len(projects) == 2 - assert any(project.cvat_id == cvat_id_1 for project in projects) - assert any(project.cvat_id == cvat_id_3 for project in projects) - - def test_get_projects_by_assignee(self): - wallet_address_1 = "0x86e83d346041E8806e352681f3F14549C0d2BC60" - cvat_id_1 = 456 - - create_project_task_and_job( - self.session, "0x86e83d346041E8806e352681f3F14549C0d2BC67", cvat_id_1 - ) - - user = User(wallet_address=wallet_address_1, cvat_id=cvat_id_1, cvat_email="test@hmt.ai") - self.session.add(user) - - cvat_service.create_assignment( - session=self.session, - wallet_address=wallet_address_1, - cvat_job_id=cvat_id_1, - expires_at=datetime.now() + timedelta(days=1), - ) - - wallet_address_2 = "0x86e83d346041E8806e352681f3F14549C0d2BC61" - cvat_id_2 = 457 - - create_project_task_and_job( - self.session, "0x86e83d346041E8806e352681f3F14549C0d2BC68", cvat_id_2 - ) - - user = User(wallet_address=wallet_address_2, cvat_id=cvat_id_2, cvat_email="test2@hmt.ai") - self.session.add(user) - - cvat_service.create_assignment( - session=self.session, - wallet_address=wallet_address_2, - cvat_job_id=cvat_id_2, - expires_at=utcnow(), - ) - - projects = cvat_service.get_projects_by_assignee(self.session, wallet_address_1) - - assert len(projects) == 1 - assert projects[0].cvat_id == cvat_id_1 - - projects = cvat_service.get_projects_by_assignee(self.session, wallet_address_2) - - assert ( - len(projects) == 0 - ) # expired should not be shown, https://github.com/humanprotocol/human-protocol/pull/1879 - def test_update_project_status(self): cvat_id = 1 cvat_cloudstorage_id = 1 From a7991b0f1e9aa784c21118a0a473e1d56b46a55e Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Wed, 23 Jul 2025 16:00:43 +0300 Subject: [PATCH 10/20] fix imports --- .../examples/cvat/exchange-oracle/debug.py | 19 +++++++++---------- .../examples/cvat/recording-oracle/debug.py | 14 +++++++------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/packages/examples/cvat/exchange-oracle/debug.py b/packages/examples/cvat/exchange-oracle/debug.py index c04a0ac439..566dc74416 100644 --- a/packages/examples/cvat/exchange-oracle/debug.py +++ b/packages/examples/cvat/exchange-oracle/debug.py @@ -1,5 +1,7 @@ import datetime +import inspect import json +import uuid from collections.abc import Generator from contextlib import ExitStack, contextmanager from logging import Logger @@ -7,8 +9,6 @@ from typing import Any from unittest import mock -import inspect -import uuid import uvicorn from httpx import URL @@ -31,9 +31,7 @@ def _mock_cvat_cloud_storage_params(logger: Logger) -> Generator[None, None, Non def patched_make_cvat_cloud_storage_params(bucket_info: BucketAccessInfo) -> dict: original_host_url = bucket_info.host_url - if Config.development_config.cvat_in_docker and ( - "localhost" in original_host_url or "127.0.0.1" in original_host_url - ): + if Config.development_config.cvat_in_docker: bucket_info.host_url = str( URL(original_host_url).copy_with( host=Config.development_config.exchange_oracle_host @@ -155,10 +153,10 @@ def create_webhook( escrow_address, chain_id, type: OracleWebhookTypes, - signature = None, - event_type = None, - event_data = None, - event = None, + signature=None, + event_type=None, + event_data=None, + event=None, ): if signature in OracleWebhookTypes: signature = f"{type.value}-{utcnow().isoformat(sep='T')}-{uuid.uuid4()}" @@ -177,7 +175,7 @@ def create_webhook( "src.endpoints.webhook.validate_oracle_webhook_signature", patched_validate_oracle_webhook_signature, ), - mock.patch("src.services.webhook.inbox", PatchedInbox()) + mock.patch("src.services.webhook.inbox", PatchedInbox()), ): yield @@ -270,6 +268,7 @@ def apply_local_development_patches() -> Generator[None, None, None]: yield + if __name__ == "__main__": with ExitStack() as es: is_dev = Config.environment == "development" diff --git a/packages/examples/cvat/recording-oracle/debug.py b/packages/examples/cvat/recording-oracle/debug.py index 1d6fe702cf..8bf711ecd6 100644 --- a/packages/examples/cvat/recording-oracle/debug.py +++ b/packages/examples/cvat/recording-oracle/debug.py @@ -1,11 +1,11 @@ import datetime +import inspect +import uuid from collections.abc import Generator from contextlib import ExitStack, contextmanager from logging import Logger from pathlib import PurePosixPath from unittest import mock -import inspect -import uuid import uvicorn @@ -127,10 +127,10 @@ def create_webhook( escrow_address, chain_id, type: OracleWebhookTypes, - signature = None, - event_type = None, - event_data = None, - event = None, + signature=None, + event_type=None, + event_data=None, + event=None, ): if signature in OracleWebhookTypes: signature = f"{type.value}-{utcnow().isoformat(sep='T')}-{uuid.uuid4()}" @@ -149,7 +149,7 @@ def create_webhook( "src.endpoints.webhook.validate_oracle_webhook_signature", patched_validate_oracle_webhook_signature, ), - mock.patch("src.services.webhook.inbox", PatchedInbox()) + mock.patch("src.services.webhook.inbox", PatchedInbox()), ): yield From 3e2e7bb01a39e82d2c36a5b5321500ee37c2f032 Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Wed, 23 Jul 2025 16:03:30 +0300 Subject: [PATCH 11/20] Update description --- packages/examples/cvat/recording-oracle/debug.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/examples/cvat/recording-oracle/debug.py b/packages/examples/cvat/recording-oracle/debug.py index 8bf711ecd6..e250c6974e 100644 --- a/packages/examples/cvat/recording-oracle/debug.py +++ b/packages/examples/cvat/recording-oracle/debug.py @@ -90,6 +90,8 @@ def _mock_webhook_signature_checking(_: Logger) -> Generator[None, None, None]: - from exchange oracle - signed with Config.localhost.exchange_oracle_address or with signature "exchange_oracle" + + is optional. """ from src.chain.escrow import ( From e9d5869f9643be861b38594603a2dd0095cc5141 Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Wed, 23 Jul 2025 16:29:39 +0300 Subject: [PATCH 12/20] Split JobStatuses into CvatJobStatuses and EO JobStatuses --- packages/examples/cvat/exchange-oracle/src/core/types.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/examples/cvat/exchange-oracle/src/core/types.py b/packages/examples/cvat/exchange-oracle/src/core/types.py index 2b67fe9fab..b22c92986c 100644 --- a/packages/examples/cvat/exchange-oracle/src/core/types.py +++ b/packages/examples/cvat/exchange-oracle/src/core/types.py @@ -32,6 +32,12 @@ class TaskStatuses(str, Enum, metaclass=BetterEnumMeta): class JobStatuses(str, Enum, metaclass=BetterEnumMeta): + new = "new" + in_progress = "in progress" + completed = "completed" + + +class CvatJobStatuses(str, Enum, metaclass=BetterEnumMeta): new = "new" in_progress = "in progress" rejected = "rejected" From d57eeceef10eb3a862332b05a900c3bb2b9f6d00 Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Wed, 23 Jul 2025 16:41:40 +0300 Subject: [PATCH 13/20] Refactor cvat enums --- .../cvat/exchange-oracle/src/core/types.py | 20 ------------------- .../exchange-oracle/src/cvat/api_calls.py | 20 +++++++++++++++++++ .../src/handlers/cvat_events.py | 11 +++++----- .../src/handlers/job_creation.py | 14 ++++++------- 4 files changed, 33 insertions(+), 32 deletions(-) diff --git a/packages/examples/cvat/exchange-oracle/src/core/types.py b/packages/examples/cvat/exchange-oracle/src/core/types.py index b22c92986c..20a04c0b59 100644 --- a/packages/examples/cvat/exchange-oracle/src/core/types.py +++ b/packages/examples/cvat/exchange-oracle/src/core/types.py @@ -10,12 +10,6 @@ class Networks(int, Enum, metaclass=BetterEnumMeta): localhost = Config.localhost.chain_id -class CvatEventTypes(str, Enum, metaclass=BetterEnumMeta): - update_job = "update:job" - create_job = "create:job" - ping = "ping" - - class ProjectStatuses(str, Enum, metaclass=BetterEnumMeta): creation = "creation" annotation = "annotation" @@ -37,13 +31,6 @@ class JobStatuses(str, Enum, metaclass=BetterEnumMeta): completed = "completed" -class CvatJobStatuses(str, Enum, metaclass=BetterEnumMeta): - new = "new" - in_progress = "in progress" - rejected = "rejected" - completed = "completed" - - class TaskTypes(str, Enum, metaclass=BetterEnumMeta): image_label_binary = "image_label_binary" image_points = "image_points" @@ -53,13 +40,6 @@ class TaskTypes(str, Enum, metaclass=BetterEnumMeta): image_polygons = "image_polygons" -class CvatLabelTypes(str, Enum, metaclass=BetterEnumMeta): - tag = "tag" - points = "points" - rectangle = "rectangle" - polygon = "polygon" - - class OracleWebhookTypes(str, Enum, metaclass=BetterEnumMeta): exchange_oracle = "exchange_oracle" job_launcher = "job_launcher" diff --git a/packages/examples/cvat/exchange-oracle/src/cvat/api_calls.py b/packages/examples/cvat/exchange-oracle/src/cvat/api_calls.py index 02f442f4c2..5f9f780043 100644 --- a/packages/examples/cvat/exchange-oracle/src/cvat/api_calls.py +++ b/packages/examples/cvat/exchange-oracle/src/cvat/api_calls.py @@ -38,6 +38,26 @@ class RequestStatus(str, Enum, metaclass=BetterEnumMeta): FAILED = "Failed" +class JobStatus(str, Enum, metaclass=BetterEnumMeta): + new = "new" + in_progress = "in progress" + rejected = "rejected" + completed = "completed" + + +class LabelType(str, Enum, metaclass=BetterEnumMeta): + tag = "tag" + points = "points" + rectangle = "rectangle" + polygon = "polygon" + + +class WebhookEventType(str, Enum, metaclass=BetterEnumMeta): + update_job = "update:job" + create_job = "create:job" + ping = "ping" + + def _request_annotations(endpoint: Endpoint, cvat_id: int, format_name: str) -> str: """ Requests annotations export. diff --git a/packages/examples/cvat/exchange-oracle/src/handlers/cvat_events.py b/packages/examples/cvat/exchange-oracle/src/handlers/cvat_events.py index 0378f6c9b4..6ec2699dc1 100644 --- a/packages/examples/cvat/exchange-oracle/src/handlers/cvat_events.py +++ b/packages/examples/cvat/exchange-oracle/src/handlers/cvat_events.py @@ -4,10 +4,11 @@ import src.models.cvat as models import src.services.cvat as cvat_service from src import db -from src.core.types import AssignmentStatuses, CvatEventTypes, JobStatuses, ProjectStatuses +from src.core.types import AssignmentStatuses, JobStatuses, ProjectStatuses from src.db import SessionLocal from src.db import errors as db_errors from src.log import ROOT_LOGGER_NAME +from src.schemas.cvat import CvatWebhook from src.utils.logging import get_function_logger module_logger_name = f"{ROOT_LOGGER_NAME}.cron.handler" @@ -167,11 +168,11 @@ def handle_create_job_event(payload: dict) -> None: ) -def cvat_webhook_handler(cvat_webhook: dict) -> None: +def cvat_webhook_handler(cvat_webhook: CvatWebhook) -> None: match cvat_webhook.event: - case CvatEventTypes.update_job.value: + case cvat_api.WebhookEventType.update_job.value: handle_update_job_event(cvat_webhook) - case CvatEventTypes.create_job.value: + case cvat_api.WebhookEventType.create_job.value: handle_create_job_event(cvat_webhook) - case CvatEventTypes.ping.value: + case cvat_api.WebhookEventType.ping.value: pass diff --git a/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py b/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py index 566d69a959..9093257d86 100644 --- a/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py +++ b/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py @@ -33,7 +33,7 @@ from src.chain.escrow import get_escrow_manifest from src.core.config import Config from src.core.storage import compose_data_bucket_filename, compose_data_bucket_prefix -from src.core.types import CvatLabelTypes, TaskStatuses, TaskTypes +from src.core.types import TaskStatuses, TaskTypes from src.db import SessionLocal from src.log import ROOT_LOGGER_NAME from src.models.cvat import Project @@ -54,12 +54,12 @@ module_logger = f"{ROOT_LOGGER_NAME}.cron.cvat" LABEL_TYPE_MAPPING = { - TaskTypes.image_label_binary: CvatLabelTypes.tag, - TaskTypes.image_points: CvatLabelTypes.points, - TaskTypes.image_boxes: CvatLabelTypes.rectangle, - TaskTypes.image_polygons: CvatLabelTypes.polygon, - TaskTypes.image_boxes_from_points: CvatLabelTypes.rectangle, - TaskTypes.image_skeletons_from_boxes: CvatLabelTypes.points, + TaskTypes.image_label_binary: cvat_api.LabelType.tag, + TaskTypes.image_points: cvat_api.LabelType.points, + TaskTypes.image_boxes: cvat_api.LabelType.rectangle, + TaskTypes.image_polygons: cvat_api.LabelType.polygon, + TaskTypes.image_boxes_from_points: cvat_api.LabelType.rectangle, + TaskTypes.image_skeletons_from_boxes: cvat_api.LabelType.points, } DM_DATASET_FORMAT_MAPPING = { From 849da9d6440bc797f63720811ffa20a5181bbbe5 Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Wed, 23 Jul 2025 16:43:47 +0300 Subject: [PATCH 14/20] Refactor param --- .../cvat/exchange-oracle/src/services/exchange.py | 8 +++++--- .../examples/cvat/exchange-oracle/src/utils/requests.py | 2 +- .../examples/cvat/recording-oracle/src/utils/requests.py | 2 +- 3 files changed, 7 insertions(+), 5 deletions(-) diff --git a/packages/examples/cvat/exchange-oracle/src/services/exchange.py b/packages/examples/cvat/exchange-oracle/src/services/exchange.py index 23a7b8e1cc..b7d1bfca4c 100644 --- a/packages/examples/cvat/exchange-oracle/src/services/exchange.py +++ b/packages/examples/cvat/exchange-oracle/src/services/exchange.py @@ -23,7 +23,7 @@ def create_assignment(escrow_address: str, chain_id: Networks, wallet_address: s user = get_or_404( cvat_service.get_user_by_id(session, wallet_address, for_update=True), wallet_address, - "user", + object_type_name="user", ) if cvat_service.has_active_user_assignments( @@ -43,7 +43,7 @@ def create_assignment(escrow_address: str, chain_id: Networks, wallet_address: s session, escrow_address, status_in=[ProjectStatuses.annotation] ), escrow_address, - "job", + object_type_name="job", ) unassigned_job = cvat_service.get_free_job( @@ -91,7 +91,9 @@ class NoAccessError(Exception): async def resign_assignment(assignment_id: str, wallet_address: str) -> None: with SessionLocal.begin() as session: assignments = cvat_service.get_assignments_by_id(session, [assignment_id], for_update=True) - assignment = get_or_404(next(iter(assignments), None), assignment_id, "assignment") + assignment = get_or_404( + next(iter(assignments), None), assignment_id, object_type_name="assignment" + ) # Can only resign from an active assignment in a job # TODO: maybe optimize to a single DB request diff --git a/packages/examples/cvat/exchange-oracle/src/utils/requests.py b/packages/examples/cvat/exchange-oracle/src/utils/requests.py index ef2174f9b9..73519f3947 100644 --- a/packages/examples/cvat/exchange-oracle/src/utils/requests.py +++ b/packages/examples/cvat/exchange-oracle/src/utils/requests.py @@ -9,8 +9,8 @@ def get_or_404( obj: T | None, object_id: V, - object_type_name: str, *, + object_type_name: str, reason: str | None = None, ) -> T: if obj is None: diff --git a/packages/examples/cvat/recording-oracle/src/utils/requests.py b/packages/examples/cvat/recording-oracle/src/utils/requests.py index ef2174f9b9..73519f3947 100644 --- a/packages/examples/cvat/recording-oracle/src/utils/requests.py +++ b/packages/examples/cvat/recording-oracle/src/utils/requests.py @@ -9,8 +9,8 @@ def get_or_404( obj: T | None, object_id: V, - object_type_name: str, *, + object_type_name: str, reason: str | None = None, ) -> T: if obj is None: From dbe8ddd78ab9982001eb2c076c0be9ece735fbbc Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Wed, 23 Jul 2025 17:12:17 +0300 Subject: [PATCH 15/20] - fix potential deadlock in /resign; - use job status instead of assignment to sync (fix possible double assignment); - fix CVAT job (un-)assignment races between /assign, /resign, cvat webhook handler and expired assignment status tracker - cvat webhook handler will only check the last assignment for better performance - optimized some db requests for jobs and assignments --- .../src/crons/cvat/state_trackers.py | 56 ++++--- .../src/handlers/cvat_events.py | 138 +++++++++--------- .../cvat/exchange-oracle/src/services/cvat.py | 44 ++++-- .../exchange-oracle/src/services/exchange.py | 23 ++- 4 files changed, 158 insertions(+), 103 deletions(-) diff --git a/packages/examples/cvat/exchange-oracle/src/crons/cvat/state_trackers.py b/packages/examples/cvat/exchange-oracle/src/crons/cvat/state_trackers.py index 04252e82bc..213d129a7a 100644 --- a/packages/examples/cvat/exchange-oracle/src/crons/cvat/state_trackers.py +++ b/packages/examples/cvat/exchange-oracle/src/crons/cvat/state_trackers.py @@ -49,8 +49,23 @@ def track_assignments(logger: logging.Logger) -> None: Tracks assignments: 1. Checks time for each active assignment 2. If an assignment is timed out, expires it - 3. If a project or task state is not "annotation", cancels assignments + 3. If an assignment is canceled, resets it + 4. If a project or task state is not "annotation", cancels assignments """ + + def _reset_job_after_assignment(session: Session, assignment: cvat_models.Assignment): + latest_assignment = cvat_service.get_latest_assignment_by_cvat_job_id( + session, assignment.cvat_job_id + ) + if latest_assignment.id == assignment.id: + # Avoid un-assigning if it's not the latest assignment + + cvat_api.update_job_assignee( + assignment.cvat_job_id, assignee_id=None + ) # note that calling it in a loop can take too much time + + cvat_service.update_job_status(session, assignment.job.id, status=JobStatuses.new) + with SessionLocal.begin() as session: assignments = cvat_service.get_unprocessed_expired_assignments( session, @@ -67,17 +82,27 @@ def track_assignments(logger: logging.Logger) -> None: ) ) - latest_assignment = cvat_service.get_latest_assignment_by_cvat_job_id( - session, assignment.cvat_job_id - ) - if latest_assignment.id == assignment.id: - # Avoid un-assigning if it's not the latest assignment + cvat_service.expire_assignment(session, assignment.id) + _reset_job_after_assignment(session, assignment) - cvat_api.update_job_assignee( - assignment.cvat_job_id, assignee_id=None - ) # note that calling it in a loop can take too much time + cvat_service.touch(session, cvat_models.Job, [a.job.id for a in assignments]) - cvat_service.expire_assignment(session, assignment.id) + with SessionLocal.begin() as session: + assignments = cvat_service.get_unprocessed_cancelled_assignments( + session, + limit=CronConfig.track_assignments_chunk_size, + for_update=ForUpdateParams(skip_locked=True), + ) + + for assignment in assignments: + logger.info( + "Finalizing the canceled assignment {} (user {}, job id {})".format( + assignment.id, + assignment.user_wallet_address, + assignment.cvat_job_id, + ) + ) + _reset_job_after_assignment(session, assignment) cvat_service.touch(session, cvat_models.Job, [a.job.id for a in assignments]) @@ -99,17 +124,8 @@ def track_assignments(logger: logging.Logger) -> None: ) ) - latest_assignment = cvat_service.get_latest_assignment_by_cvat_job_id( - session, assignment.cvat_job_id - ) - if latest_assignment.id == assignment.id: - # Avoid un-assigning if it's not the latest assignment - - cvat_api.update_job_assignee( - assignment.cvat_job_id, assignee_id=None - ) # note that calling it in a loop can take too much time - cvat_service.cancel_assignment(session, assignment.id) + _reset_job_after_assignment(session, assignment) cvat_service.touch(session, cvat_models.Job, [a.job.id for a in assignments]) diff --git a/packages/examples/cvat/exchange-oracle/src/handlers/cvat_events.py b/packages/examples/cvat/exchange-oracle/src/handlers/cvat_events.py index 6ec2699dc1..ecec61ff7e 100644 --- a/packages/examples/cvat/exchange-oracle/src/handlers/cvat_events.py +++ b/packages/examples/cvat/exchange-oracle/src/handlers/cvat_events.py @@ -17,6 +17,11 @@ def handle_update_job_event(payload: dict) -> None: logger = get_function_logger(module_logger_name) + if "state" not in payload.before_update: + return + + new_cvat_status = cvat_api.JobStatus(payload.job["state"]) + with SessionLocal.begin() as session: job_id = payload.job["id"] jobs = cvat_service.get_jobs_by_cvat_id(session, [job_id], for_update=True) @@ -28,77 +33,80 @@ def handle_update_job_event(payload: dict) -> None: job = jobs[0] - if "state" in payload.before_update: - job_assignments = job.assignments - new_status = JobStatuses(payload.job["state"]) + if job.status != JobStatuses.in_progress: + logger.warning( + f"Received a job update webhook for a job id {job_id} " + f"in the status {job.status}, ignoring " + ) + return - if not job_assignments: - logger.warning( - f"Received job #{job.cvat_id} status update: {new_status.value}. " - "No assignments for this job, ignoring the update" - ) - else: - webhook_time = parse_aware_datetime(payload.job["updated_date"]) - webhook_assignee_id = (payload.job["assignee"] or {}).get("id") + # ignore updates for any assignments except the last one + latest_assignment = cvat_service.get_latest_assignment_by_cvat_job_id( + session, job_id, for_update=True + ) + if not latest_assignment: + logger.warning( + f"Received job #{job.cvat_id} status update: {new_cvat_status.value}. " + "No assignments for this job, ignoring the update" + ) + return - job_assignments: list[models.Assignment] = sorted( - job_assignments, key=lambda a: a.created_at, reverse=True - ) - latest_assignment = job.assignments[0] - matching_assignment = next( - ( - a - for a in job_assignments - if a.user.cvat_id == webhook_assignee_id - if a.created_at < webhook_time - ), - None, - ) + webhook_time = parse_aware_datetime(payload.job["updated_date"]) + webhook_assignee_id = (payload.job["assignee"] or {}).get("id") + + matching_assignment = next( + ( + a + for a in [latest_assignment] + if a.user.cvat_id == webhook_assignee_id + if a.created_at < webhook_time + ), + None, + ) - if not matching_assignment: - logger.warning( - f"Received job #{job.cvat_id} status update: {new_status.value}. " - "Can't find a matching assignment, ignoring the update" - ) - elif matching_assignment.is_finished: - if matching_assignment.status == AssignmentStatuses.created: - logger.warning( - f"Received job #{job.cvat_id} status update: {new_status.value}. " - "Assignment is expired, rejecting the update" - ) - cvat_service.expire_assignment(session, matching_assignment.id) - cvat_service.touch(session, models.Job, [matching_assignment.job.id]) - - if matching_assignment.id == latest_assignment.id: - cvat_api.update_job_assignee(job.cvat_id, assignee_id=None) - - else: - logger.info( - f"Received job #{job.cvat_id} status update: {new_status.value}. " - "Assignment is already finished, ignoring the update" - ) - elif ( - new_status == JobStatuses.completed - and matching_assignment.id == latest_assignment.id - and matching_assignment.status == AssignmentStatuses.created - ): - logger.info( - f"Received job #{job.cvat_id} status update: {new_status.value}. " - "Completing the assignment" - ) - cvat_service.complete_assignment( - session, matching_assignment.id, completed_at=webhook_time - ) - cvat_service.update_job_status(session, job.id, new_status) - cvat_service.touch(session, models.Job, [job.id]) + if not matching_assignment: + logger.warning( + f"Received job #{job.cvat_id} status update: {new_cvat_status.value}. " + "No matching assignment or the assignment is too old, ignoring the update" + ) + elif matching_assignment.is_finished: + if matching_assignment.status == AssignmentStatuses.created: + logger.warning( + f"Received job #{job.cvat_id} status update: {new_cvat_status.value}. " + "Assignment is expired, rejecting the update" + ) + cvat_service.expire_assignment(session, matching_assignment.id) + if matching_assignment.id == latest_assignment.id: cvat_api.update_job_assignee(job.cvat_id, assignee_id=None) + cvat_service.update_job_status(session, job.id, status=JobStatuses.new) - else: - logger.info( - f"Received job #{job.cvat_id} status update: {new_status.value}. " - "Ignoring the update" - ) + cvat_service.touch(session, models.Job, [job.id]) + else: + logger.info( + f"Received job #{job.cvat_id} status update: {new_cvat_status.value}. " + "Assignment is already finished, ignoring the update" + ) + elif ( + new_cvat_status == cvat_api.JobStatus.completed + and matching_assignment.id == latest_assignment.id + and matching_assignment.is_finished == False + ): + logger.info( + f"Received job #{job.cvat_id} status update: {new_cvat_status.value}. " + "Completing the assignment" + ) + cvat_service.complete_assignment( + session, matching_assignment.id, completed_at=webhook_time + ) + cvat_api.update_job_assignee(job.cvat_id, assignee_id=None) + cvat_service.update_job_status(session, job.id, status=JobStatuses.completed) + cvat_service.touch(session, models.Job, [job.id]) + else: + logger.info( + f"Received job #{job.cvat_id} status update: {new_cvat_status.value}. " + "Ignoring the update" + ) def handle_create_job_event(payload: dict) -> None: diff --git a/packages/examples/cvat/exchange-oracle/src/services/cvat.py b/packages/examples/cvat/exchange-oracle/src/services/cvat.py index 06f3ef5aa7..09768227dd 100644 --- a/packages/examples/cvat/exchange-oracle/src/services/cvat.py +++ b/packages/examples/cvat/exchange-oracle/src/services/cvat.py @@ -12,6 +12,7 @@ from sqlalchemy import delete, func, literal, select, update from sqlalchemy.dialects.postgresql import insert from sqlalchemy.orm import Session +from sqlalchemy.sql.functions import coalesce from src.core.types import ( AssignmentStatuses, @@ -761,7 +762,7 @@ def get_free_job( for_update: bool | ForUpdateParams = False, ) -> Job | None: """ - Returns the first available job that wasn't previously assigned to that user_walled_address. + Returns the first available job that wasn't previously assigned to that user_wallet_address. """ return ( _maybe_for_update(session.query(Job), enable=for_update) @@ -772,14 +773,7 @@ def get_free_job( & (Project.status == ProjectStatuses.annotation) ), Job.status == JobStatuses.new, - ~Job.assignments.any( - ( - (Assignment.status == AssignmentStatuses.created.value) - & (Assignment.completed_at == None) - & (utcnow() < Assignment.expires_at) - ) - | (Assignment.user_wallet_address == user_wallet_address), - ), + ~Job.assignments.any(Assignment.user_wallet_address == user_wallet_address), ) .first() ) @@ -881,13 +875,28 @@ def get_unprocessed_expired_assignments( ) +def get_unprocessed_cancelled_assignments( + session: Session, *, limit: int = 10, for_update: bool | ForUpdateParams = False +) -> list[Assignment]: + return ( + _maybe_for_update(session.query(Assignment), enable=for_update) + .where( + (Assignment.job.has(Job.status == JobStatuses.in_progress.value)) + & (Assignment.status == AssignmentStatuses.canceled.value) + ) + .limit(limit) + .all() + ) + + def get_active_assignments( session: Session, *, limit: int = 10, for_update: bool | ForUpdateParams = False ) -> list[Assignment]: return ( _maybe_for_update(session.query(Assignment), enable=for_update) .where( - (Assignment.status == AssignmentStatuses.created.value) + (Assignment.job.has(Job.status == JobStatuses.in_progress.value)) + & (Assignment.status == AssignmentStatuses.created.value) & (Assignment.completed_at == None) & (Assignment.expires_at <= utcnow()) ) @@ -1020,7 +1029,14 @@ def touch( if time is None: time = utcnow() - session.execute(update(cls).where(cls.id.in_(ids)).values({cls.updated_at: time})) + session.execute( + update(cls) + .where( + cls.id.in_(ids), + coalesce(cls.updated_at, datetime.min) < time, + ) + .values({cls.updated_at: time}) + ) if touch_parents: touch_parent_objects(session, cls, ids, time=time) @@ -1033,6 +1049,9 @@ def touch_parent_objects( *, time: datetime | None = None, ): + if time is None: + time = utcnow() + while issubclass(cls, ChildOf): parent_cls = cls.parent_cls foreign_key_column = next(iter(cls.parent.property.local_columns)) @@ -1044,7 +1063,8 @@ def touch_parent_objects( select(foreign_key_column) .where(cls.id.in_(ids)) .where(foreign_key_column.is_not(None)) - ) + ), + coalesce(parent_cls.updated_at, datetime.min) < time, ) .values({parent_cls.updated_at: time}) .returning(parent_cls.id) diff --git a/packages/examples/cvat/exchange-oracle/src/services/exchange.py b/packages/examples/cvat/exchange-oracle/src/services/exchange.py index b7d1bfca4c..7d514597a0 100644 --- a/packages/examples/cvat/exchange-oracle/src/services/exchange.py +++ b/packages/examples/cvat/exchange-oracle/src/services/exchange.py @@ -1,9 +1,11 @@ +from contextlib import suppress from datetime import timedelta import src.cvat.api_calls as cvat_api import src.services.cvat as cvat_service -from src.core.types import Networks, ProjectStatuses, TaskTypes +from src.core.types import JobStatuses, Networks, ProjectStatuses, TaskTypes from src.db import SessionLocal +from src.db.utils import ForUpdateParams from src.models.cvat import Job from src.utils.assignments import get_default_assignment_timeout from src.utils.requests import get_or_404 @@ -51,7 +53,7 @@ def create_assignment(escrow_address: str, chain_id: Networks, wallet_address: s escrow_address=escrow_address, chain_id=chain_id.value, user_wallet_address=wallet_address, - for_update=True, + for_update=ForUpdateParams(skip_locked=True), # lock the job to be able to make a rollback if CVAT requests fail # can potentially be optimized to make less DB requests # and rely only on assignment expiration @@ -73,6 +75,7 @@ def create_assignment(escrow_address: str, chain_id: Networks, wallet_address: s ), ) + cvat_service.update_job_status(session, unassigned_job.id, status=JobStatuses.in_progress) cvat_service.touch(session, Job, [unassigned_job.id]) with cvat_api.api_client_context(cvat_api.get_api_client()): @@ -105,12 +108,20 @@ async def resign_assignment(assignment_id: str, wallet_address: str) -> None: raise NoAccessError last_job_assignment = cvat_service.get_latest_assignment_by_cvat_job_id( - session, assignment.cvat_job_id, for_update=True + session, + assignment.cvat_job_id, + for_update=ForUpdateParams(skip_locked=True), ) - if assignment.id != last_job_assignment.id: + if not last_job_assignment or assignment.id != last_job_assignment.id: raise NoAccessError cvat_service.cancel_assignment(session, assignment_id) - job = assignment.job - cvat_service.touch(session, Job, [job.id]) # project|task rows are locked for update + # Try to update the status, but don't insist + with suppress(cvat_api.exceptions.ApiException): + cvat_api.update_job_assignee(assignment.cvat_job_id, assignee_id=None) + + # Update the job only if assignee was unset + cvat_service.update_job_status(session, assignment.job.id, status=JobStatuses.new) + + cvat_service.touch(session, Job, [assignment.job.id]) From ad8b9e02364f9f998449ced4c67a1a15b385a17b Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Wed, 23 Jul 2025 19:53:56 +0300 Subject: [PATCH 16/20] Update container builds and test launch --- .../examples/cvat/exchange-oracle/Dockerfile | 12 ++++++++---- packages/examples/cvat/exchange-oracle/README.md | 16 ++++++++-------- .../exchange-oracle/dockerfiles/test.Dockerfile | 14 +++++++++----- .../examples/cvat/exchange-oracle/pyproject.toml | 1 + .../examples/cvat/exchange-oracle/pytest.ini | 10 ++++++++++ .../examples/cvat/recording-oracle/Dockerfile | 12 ++++++++---- .../recording-oracle/{README.MD => README.md} | 15 ++++++++------- .../recording-oracle/dockerfiles/test.Dockerfile | 14 +++++++++----- .../cvat/recording-oracle/pyproject.toml | 1 + .../examples/cvat/recording-oracle/pytest.ini | 10 ++++++++++ .../services/test_validation_service.py | 6 +++--- 11 files changed, 75 insertions(+), 36 deletions(-) create mode 100644 packages/examples/cvat/exchange-oracle/pytest.ini rename packages/examples/cvat/recording-oracle/{README.MD => README.md} (87%) create mode 100644 packages/examples/cvat/recording-oracle/pytest.ini diff --git a/packages/examples/cvat/exchange-oracle/Dockerfile b/packages/examples/cvat/exchange-oracle/Dockerfile index 86c54f1441..69e547480d 100644 --- a/packages/examples/cvat/exchange-oracle/Dockerfile +++ b/packages/examples/cvat/exchange-oracle/Dockerfile @@ -4,13 +4,17 @@ WORKDIR /app RUN apt-get update -y && \ apt-get install -y jq ffmpeg libsm6 libxext6 && \ - pip install --no-cache poetry + rm -rf /var/lib/apt/lists/* + +RUN pip install --no-cache poetry COPY pyproject.toml poetry.lock ./ -RUN poetry config virtualenvs.create false \ - && poetry install --no-interaction --no-ansi --no-root \ - && poetry cache clear pypi --all +RUN --mount=type=cache,target=/root/.cache \ + poetry config virtualenvs.create false && \ + poetry install --no-interaction --no-ansi --no-root + +RUN python -m pip uninstall -y poetry pip COPY . . diff --git a/packages/examples/cvat/exchange-oracle/README.md b/packages/examples/cvat/exchange-oracle/README.md index 7d70eeac82..d6d91ce62b 100644 --- a/packages/examples/cvat/exchange-oracle/README.md +++ b/packages/examples/cvat/exchange-oracle/README.md @@ -18,14 +18,14 @@ For deployment it is required to have PostgreSQL(v14.4) ### Run the oracle locally: -``` +```sh docker compose -f docker-compose.dev.yml up -d ./bin/start_dev.sh ``` or -``` +```sh docker compose -f docker-compose.dev.yml up -d ./bin/start_debug.sh ``` @@ -48,17 +48,17 @@ Example: [Alembic env file](https://github.com/humanprotocol/human-protocol/blob Adding new migration: -``` +```sh alembic revision --autogenerate -m "your-migration-name" ``` Upgrade: -``` +```sh alembic upgrade head ``` Downgrade: -``` +```sh alembic downgrade -{number of migrations} ``` @@ -72,7 +72,7 @@ Available at `/docs` route ### Tests To run tests -``` -docker compose -f docker-compose.test.yml up --build test --attach test --exit-code-from test && \ - docker compose -f docker-compose.test.yml down +```sh +docker compose -p "test" -f docker-compose.test.yml up --build test --attach test --exit-code-from test; \ + docker compose -p "test" -f docker-compose.test.yml down ``` \ No newline at end of file diff --git a/packages/examples/cvat/exchange-oracle/dockerfiles/test.Dockerfile b/packages/examples/cvat/exchange-oracle/dockerfiles/test.Dockerfile index f2341c8d22..eaa436f5d0 100644 --- a/packages/examples/cvat/exchange-oracle/dockerfiles/test.Dockerfile +++ b/packages/examples/cvat/exchange-oracle/dockerfiles/test.Dockerfile @@ -4,16 +4,20 @@ WORKDIR /app RUN apt-get update -y && \ apt-get install -y jq ffmpeg libsm6 libxext6 && \ - pip install --no-cache poetry + rm -rf /var/lib/apt/lists/* + +RUN pip install --no-cache poetry COPY pyproject.toml poetry.lock ./ -RUN poetry config virtualenvs.create false \ - && poetry install --no-interaction --no-ansi --no-root \ - && poetry cache clear pypi --all +RUN --mount=type=cache,target=/root/.cache \ + poetry config virtualenvs.create false && \ + poetry install --no-interaction --no-ansi --no-root + +RUN python -m pip uninstall -y poetry pip COPY . . RUN rm -f ./src/.env -CMD ["pytest", "-W", "ignore::DeprecationWarning", "-v"] \ No newline at end of file +CMD ["pytest"] \ No newline at end of file diff --git a/packages/examples/cvat/exchange-oracle/pyproject.toml b/packages/examples/cvat/exchange-oracle/pyproject.toml index 5dd04c1813..2170018394 100644 --- a/packages/examples/cvat/exchange-oracle/pyproject.toml +++ b/packages/examples/cvat/exchange-oracle/pyproject.toml @@ -134,6 +134,7 @@ ignore = [ "ANN001", # | "ANN003", # | "ARG001", # | + "FBT001", # Allow bool-annotated positional args in functions "SLF001", # Allow private attrs access "PLR2004", # Allow magic values "S", # security diff --git a/packages/examples/cvat/exchange-oracle/pytest.ini b/packages/examples/cvat/exchange-oracle/pytest.ini new file mode 100644 index 0000000000..bdf7142e2b --- /dev/null +++ b/packages/examples/cvat/exchange-oracle/pytest.ini @@ -0,0 +1,10 @@ +[pytest] +addopts = --verbose +filterwarnings = + ignore::DeprecationWarning:cvat_sdk.core + ignore::DeprecationWarning:human_protocol_sdk.storage + ignore:Field name \"sort\" shadows:UserWarning:pydantic._internal._fields + +python_files = test_*.py +python_classes = *Test +python_functions = test_* \ No newline at end of file diff --git a/packages/examples/cvat/recording-oracle/Dockerfile b/packages/examples/cvat/recording-oracle/Dockerfile index 86c54f1441..69e547480d 100644 --- a/packages/examples/cvat/recording-oracle/Dockerfile +++ b/packages/examples/cvat/recording-oracle/Dockerfile @@ -4,13 +4,17 @@ WORKDIR /app RUN apt-get update -y && \ apt-get install -y jq ffmpeg libsm6 libxext6 && \ - pip install --no-cache poetry + rm -rf /var/lib/apt/lists/* + +RUN pip install --no-cache poetry COPY pyproject.toml poetry.lock ./ -RUN poetry config virtualenvs.create false \ - && poetry install --no-interaction --no-ansi --no-root \ - && poetry cache clear pypi --all +RUN --mount=type=cache,target=/root/.cache \ + poetry config virtualenvs.create false && \ + poetry install --no-interaction --no-ansi --no-root + +RUN python -m pip uninstall -y poetry pip COPY . . diff --git a/packages/examples/cvat/recording-oracle/README.MD b/packages/examples/cvat/recording-oracle/README.md similarity index 87% rename from packages/examples/cvat/recording-oracle/README.MD rename to packages/examples/cvat/recording-oracle/README.md index 55faf0c916..6b412f5081 100644 --- a/packages/examples/cvat/recording-oracle/README.MD +++ b/packages/examples/cvat/recording-oracle/README.md @@ -18,14 +18,14 @@ For deployment it is required to have PostgreSQL(v14.4) ### Run the oracle locally: -``` +```sh docker compose -f docker-compose.dev.yml up -d ./bin/start_dev.sh ``` or -``` +```sh docker compose -f docker-compose.dev.yml up -d ./bin/start_debug.sh ``` @@ -46,17 +46,17 @@ Config file: `/src/config.py` To simplify the process and use `--autogenerate` flag, you need to import a new model to `/alembic/env.py` Adding new migration: -``` +```sh alembic revision --autogenerate -m "your-migration-name" ``` Upgrade: -``` +```sh alembic upgrade head ``` Downgrade: -``` +```sh alembic downgrade -{number of migrations} ``` @@ -69,6 +69,7 @@ Available at `/docs` route ### Tests To run tests -``` -docker compose -f docker-compose.test.yml up --build test --attach test --exit-code-from test +```sh +docker compose -p "test" -f docker-compose.test.yml up --build test --attach test --exit-code-from test; \ + docker compose -p "test" -f docker-compose.test.yml down ``` diff --git a/packages/examples/cvat/recording-oracle/dockerfiles/test.Dockerfile b/packages/examples/cvat/recording-oracle/dockerfiles/test.Dockerfile index 591d0cb769..eaa436f5d0 100644 --- a/packages/examples/cvat/recording-oracle/dockerfiles/test.Dockerfile +++ b/packages/examples/cvat/recording-oracle/dockerfiles/test.Dockerfile @@ -4,16 +4,20 @@ WORKDIR /app RUN apt-get update -y && \ apt-get install -y jq ffmpeg libsm6 libxext6 && \ - pip install --no-cache poetry + rm -rf /var/lib/apt/lists/* + +RUN pip install --no-cache poetry COPY pyproject.toml poetry.lock ./ -RUN poetry config virtualenvs.create false \ - && poetry install --no-interaction --no-ansi --no-root \ - && poetry cache clear pypi --all +RUN --mount=type=cache,target=/root/.cache \ + poetry config virtualenvs.create false && \ + poetry install --no-interaction --no-ansi --no-root + +RUN python -m pip uninstall -y poetry pip COPY . . RUN rm -f ./src/.env -CMD ["pytest", "-W", "ignore::DeprecationWarning", "-W", "ignore::RuntimeWarning", "-W", "ignore::UserWarning", "-v"] \ No newline at end of file +CMD ["pytest"] \ No newline at end of file diff --git a/packages/examples/cvat/recording-oracle/pyproject.toml b/packages/examples/cvat/recording-oracle/pyproject.toml index 194543a267..e12f60a4ce 100644 --- a/packages/examples/cvat/recording-oracle/pyproject.toml +++ b/packages/examples/cvat/recording-oracle/pyproject.toml @@ -123,6 +123,7 @@ ignore = [ "ANN001", # | "ANN003", # | "ARG001", # | + "FBT001", # Allow bool-annotated positional args in functions "SLF001", # Allow private attrs access "PLR2004", # Allow magic values "S", # security diff --git a/packages/examples/cvat/recording-oracle/pytest.ini b/packages/examples/cvat/recording-oracle/pytest.ini new file mode 100644 index 0000000000..bdf7142e2b --- /dev/null +++ b/packages/examples/cvat/recording-oracle/pytest.ini @@ -0,0 +1,10 @@ +[pytest] +addopts = --verbose +filterwarnings = + ignore::DeprecationWarning:cvat_sdk.core + ignore::DeprecationWarning:human_protocol_sdk.storage + ignore:Field name \"sort\" shadows:UserWarning:pydantic._internal._fields + +python_files = test_*.py +python_classes = *Test +python_functions = test_* \ No newline at end of file diff --git a/packages/examples/cvat/recording-oracle/tests/integration/services/test_validation_service.py b/packages/examples/cvat/recording-oracle/tests/integration/services/test_validation_service.py index 1ba1e17052..e901f89c14 100644 --- a/packages/examples/cvat/recording-oracle/tests/integration/services/test_validation_service.py +++ b/packages/examples/cvat/recording-oracle/tests/integration/services/test_validation_service.py @@ -103,7 +103,7 @@ def test_create_and_get_validation_result(self): assert vrs[0] == vr -class TestManifestChange: +class ManifestChangeTest: def test_can_handle_lowered_quality_requirements_in_manifest(self, session: Session): escrow_address = ESCROW_ADDRESS chain_id = Networks.localhost @@ -282,7 +282,7 @@ def test_can_handle_lowered_quality_requirements_in_manifest(self, session: Sess ) -class TestValidationLogic: +class ValidationLogicTest: @pytest.mark.parametrize("seed", range(25)) def test_can_change_bad_honeypots_in_jobs(self, session: Session, seed: int): escrow_address = ESCROW_ADDRESS @@ -1134,7 +1134,7 @@ def patched_get_jobs_quality_reports(task_id: int): mock_update_task_validation_layout.assert_not_called() -class TestAnnotationMerging: +class AnnotationMergingTest: def test_can_prepare_final_results_in_validated_escrow(self, session: Session): escrow_address = ESCROW_ADDRESS chain_id = Networks.localhost.value From d21d290c5660335ab7fd07b71a78aa4de4d5f293 Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Wed, 23 Jul 2025 19:55:27 +0300 Subject: [PATCH 17/20] Fix deprecation message --- .../examples/cvat/exchange-oracle/src/handlers/job_creation.py | 2 +- .../examples/cvat/exchange-oracle/src/handlers/job_export.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py b/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py index 9093257d86..1f94262a3f 100644 --- a/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py +++ b/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py @@ -230,7 +230,7 @@ def _setup_gt_job_for_cvat_task( with TemporaryDirectory() as tmp_dir: export_dir = Path(tmp_dir) / "export" - gt_dataset.export(save_dir=str(export_dir), save_images=False, format=dm_export_format) + gt_dataset.export(save_dir=str(export_dir), save_media=False, format=dm_export_format) annotations_archive_path = Path(tmp_dir) / "annotations.zip" with annotations_archive_path.open("wb") as annotations_archive: diff --git a/packages/examples/cvat/exchange-oracle/src/handlers/job_export.py b/packages/examples/cvat/exchange-oracle/src/handlers/job_export.py index 6b93567965..a3d67c7eb6 100644 --- a/packages/examples/cvat/exchange-oracle/src/handlers/job_export.py +++ b/packages/examples/cvat/exchange-oracle/src/handlers/job_export.py @@ -130,7 +130,7 @@ def _parse_dataset(self, ann_descriptor: FileDescriptor, dataset_dir: str) -> dm return dm.Dataset.import_from(dataset_dir, self.input_format) def _export_dataset(self, dataset: dm.Dataset, output_dir: str): - dataset.export(output_dir, self.output_format, save_images=False) + dataset.export(output_dir, self.output_format, save_media=False) def _process_dataset( self, dataset: dm.Dataset, *, ann_descriptor: FileDescriptor From 7f837a95c8f5f6ba4be0cd3b2d5e9461430caeda Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Wed, 23 Jul 2025 19:57:29 +0300 Subject: [PATCH 18/20] Update tests --- .../tests/api/test_cvat_webhook_api.py | 137 ++++++++------ .../tests/api/test_exchange_api.py | 112 ++++++------ .../state_trackers/test_track_assignments.py | 146 +++++++-------- .../tests/integration/services/test_cvat.py | 170 +++++++++++++++--- .../integration/services/test_exchange.py | 35 +++- .../exchange-oracle/tests/utils/setup_cvat.py | 103 +++++++---- 6 files changed, 456 insertions(+), 247 deletions(-) diff --git a/packages/examples/cvat/exchange-oracle/tests/api/test_cvat_webhook_api.py b/packages/examples/cvat/exchange-oracle/tests/api/test_cvat_webhook_api.py index 2cfff389c8..3c54224f69 100644 --- a/packages/examples/cvat/exchange-oracle/tests/api/test_cvat_webhook_api.py +++ b/packages/examples/cvat/exchange-oracle/tests/api/test_cvat_webhook_api.py @@ -1,12 +1,15 @@ -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from unittest.mock import patch +import pytest from fastapi.testclient import TestClient from src.core.types import AssignmentStatuses, JobStatuses +from src.utils.time import utcnow +from tests.utils.constants import WALLET_ADDRESS1, WALLET_ADDRESS2 from tests.utils.setup_cvat import ( - add_asignment_to_db, + add_assignment_to_db, add_cvat_job_to_db, add_cvat_project_to_db, add_cvat_task_to_db, @@ -14,20 +17,19 @@ get_cvat_job_from_db, ) -api_url = "http://localhost:8080/api/" +API_URL = "http://localhost:8080/api/" +PING_EVENT_DATA = { + "event": "ping", +} -def test_ping_incoming_webhook(client: TestClient) -> None: - data = { - "event": "ping", - } - signature = generate_cvat_signature(data) +def test_ping_incoming_webhook(client: TestClient) -> None: # Should respond with 200 status to a "ping" event response = client.post( "/cvat-webhook", - headers={"X-Signature-256": signature}, - json=data, + headers={"X-Signature-256": generate_cvat_signature(PING_EVENT_DATA)}, + json=PING_EVENT_DATA, ) assert response.status_code == 200 @@ -36,13 +38,13 @@ def test_ping_incoming_webhook(client: TestClient) -> None: def test_incoming_webhook_200(client: TestClient) -> None: # Create some entities in test DB add_cvat_project_to_db(cvat_id=1) - add_cvat_task_to_db(cvat_id=1, cvat_project_id=1, status="annotation") + add_cvat_task_to_db(cvat_id=1, cvat_project_id=1) # Payload for "create:job" event data = { "event": "create:job", "job": { - "url": api_url + "jobs/1", + "url": API_URL + "jobs/1", "id": 1, "task_id": 1, "project_id": 1, @@ -71,19 +73,30 @@ def test_incoming_webhook_200(client: TestClient) -> None: assert job.cvat_project_id == 1 -def test_incoming_webhook_200_update_expired_assignmets(client: TestClient) -> None: +@pytest.mark.parametrize("is_last_assignment", [True, False]) +def test_incoming_webhook_can_update_expired_assignment( + client: TestClient, is_last_assignment: bool +): + # Check if an "update:job" event can update an expired assignment, + # if the assignment is the last one for the job. Updates to other assignments should be ignored. + add_cvat_project_to_db(cvat_id=1) - add_cvat_task_to_db(cvat_id=1, cvat_project_id=1, status="annotation") - add_cvat_job_to_db(cvat_id=1, cvat_task_id=1, cvat_project_id=1, status="new") - (job, _) = get_cvat_job_from_db(1) - # Check if "update:job" event works with expired assignments - wallet_address = "0x86e83d346041E8806e352681f3F14549C0d2BC68" - add_asignment_to_db(wallet_address, 1, job.cvat_id, datetime.now(tz=timezone.utc)) + add_cvat_task_to_db(cvat_id=1, cvat_project_id=1) + job = add_cvat_job_to_db( + cvat_id=1, cvat_task_id=1, cvat_project_id=1, status=JobStatuses.in_progress + ) + + user_cvat_id = 1 + add_assignment_to_db(WALLET_ADDRESS1, user_cvat_id, job.cvat_id, expires_at=utcnow()) + + if not is_last_assignment: + user_cvat_id += 1 + add_assignment_to_db(WALLET_ADDRESS2, user_cvat_id, job.cvat_id, expires_at=utcnow()) data = { "event": "update:job", "job": { - "url": api_url + "jobs/1", + "url": API_URL + "jobs/1", "id": 1, "task_id": 1, "project_id": 1, @@ -91,44 +104,57 @@ def test_incoming_webhook_200_update_expired_assignmets(client: TestClient) -> N "start_frame": 0, "stop_frame": 1, "assignee": { - "url": api_url + "users/1", - "id": 1, + "url": API_URL + f"users/{user_cvat_id}", + "id": user_cvat_id, }, - "updated_date": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z", + "updated_date": (utcnow() + timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z", }, "before_update": {"state": "new", "assignee": None}, "webhook_id": 1, } - signature = generate_cvat_signature(data) - - with patch("src.handlers.cvat_events.cvat_api"): + with patch("src.handlers.cvat_events.cvat_api.update_job_assignee") as mock_update_job_assignee: response = client.post( "/cvat-webhook", - headers={"X-Signature-256": signature}, + headers={"X-Signature-256": generate_cvat_signature(data)}, json=data, ) assert response.status_code == 200 - (job, asignees) = get_cvat_job_from_db(1) - assert job.status == JobStatuses.new.value - assert asignees[0].status == AssignmentStatuses.expired.value + (job, assignments) = get_cvat_job_from_db(1) + assert job.status == JobStatuses.new + assert assignments[-1].status == AssignmentStatuses.expired + mock_update_job_assignee.assert_called_once_with(job.cvat_id, assignee_id=None) + + if not is_last_assignment: + for assignment in assignments[:-1]: + assert assignment.status == AssignmentStatuses.created -def test_incoming_webhook_200_update(client: TestClient) -> None: +@pytest.mark.parametrize("assignment_status", AssignmentStatuses) +def test_incoming_webhook_can_update_active_assignment( + client: TestClient, assignment_status: AssignmentStatuses +): add_cvat_project_to_db(cvat_id=1) - add_cvat_task_to_db(cvat_id=1, cvat_project_id=1, status="annotation") - add_cvat_job_to_db(cvat_id=1, cvat_task_id=1, cvat_project_id=1, status="new") - (job, _) = get_cvat_job_from_db(1) - # Check if "update:job" event works correctly - wallet_address = "0x86e83d346041E8806e352681f3F14549C0d2BC69" - add_asignment_to_db(wallet_address, 2, job.cvat_id, datetime.now() + timedelta(hours=1)) + add_cvat_task_to_db(cvat_id=1, cvat_project_id=1) + job = add_cvat_job_to_db( + cvat_id=1, cvat_task_id=1, cvat_project_id=1, status=JobStatuses.in_progress + ) + add_assignment_to_db( + WALLET_ADDRESS1, + 1, + job.cvat_id, + status=assignment_status, + expires_at=datetime.now() + if assignment_status == AssignmentStatuses.expired + else datetime.now() + timedelta(hours=1), + ) data = { "event": "update:job", "job": { - "url": api_url + "jobs/1", + "url": API_URL + "jobs/1", "id": 1, "task_id": 1, "project_id": 1, @@ -136,34 +162,33 @@ def test_incoming_webhook_200_update(client: TestClient) -> None: "start_frame": 0, "stop_frame": 1, "assignee": { - "url": api_url + "users/1", - "id": 2, + "url": API_URL + "users/1", + "id": 1, }, - "updated_date": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z", + "updated_date": utcnow().strftime("%Y-%m-%dT%H:%M:%S.%f") + "Z", }, - "before_update": {"state": "new", "assignee": None}, + "before_update": {"state": "in_progress", "assignee": None}, "webhook_id": 1, } - signature = generate_cvat_signature(data) - - with patch("src.handlers.cvat_events.cvat_api"): + with patch("src.handlers.cvat_events.cvat_api.update_job_assignee") as mock_update_job_assignee: response = client.post( "/cvat-webhook", - headers={"X-Signature-256": signature}, + headers={"X-Signature-256": generate_cvat_signature(data)}, json=data, ) assert response.status_code == 200 - (job, asignees) = get_cvat_job_from_db(1) - assert job.status == JobStatuses.completed.value - assert asignees[0].status == AssignmentStatuses.completed.value - - -data = { - "event": "ping", -} + (job, assignments) = get_cvat_job_from_db(1) + if assignment_status == AssignmentStatuses.created: + assert job.status == JobStatuses.completed + assert assignments[0].status == AssignmentStatuses.completed + mock_update_job_assignee.assert_called_once_with(job.cvat_id, assignee_id=None) + else: + assert job.status == JobStatuses.in_progress + assert assignments[0].status == assignment_status + mock_update_job_assignee.assert_not_called() def test_incoming_webhook_401_bad_signature(client: TestClient) -> None: @@ -171,7 +196,7 @@ def test_incoming_webhook_401_bad_signature(client: TestClient) -> None: response = client.post( "/cvat-webhook", headers={"X-Signature-256": "dummy_signature"}, - json=data, + json=PING_EVENT_DATA, ) assert response.status_code == 401 assert response.json() == {"message": "Unauthorized"} @@ -180,7 +205,7 @@ def test_incoming_webhook_401_bad_signature(client: TestClient) -> None: def test_incoming_webhook_401_without_signature(client: TestClient) -> None: response = client.post( "/cvat-webhook", - json=data, + json=PING_EVENT_DATA, ) # Send a request without a signature diff --git a/packages/examples/cvat/exchange-oracle/tests/api/test_exchange_api.py b/packages/examples/cvat/exchange-oracle/tests/api/test_exchange_api.py index ef6e0b02c4..a310398829 100644 --- a/packages/examples/cvat/exchange-oracle/tests/api/test_exchange_api.py +++ b/packages/examples/cvat/exchange-oracle/tests/api/test_exchange_api.py @@ -15,13 +15,14 @@ from sqlalchemy.orm import Session from src.core.config import Config -from src.core.types import AssignmentStatuses, ProjectStatuses, TaskTypes +from src.core.types import AssignmentStatuses, JobStatuses, ProjectStatuses, TaskTypes from src.models.cvat import Assignment, Job, Project, Task, User from src.schemas.exchange import AssignmentStatuses as APIAssignmentStatuses from src.schemas.exchange import JobStatuses as APIJobStatuses -from src.services import cvat +from src.services import cvat as cvat_service from src.utils.time import utcnow +from tests.utils.constants import WALLET_ADDRESS1 from tests.utils.db_helper import ( create_job, create_project, @@ -30,8 +31,6 @@ create_task, ) -escrow_address = "0x12E66A452f95bff49eD5a30b0d06Ebc37C5A94B6" -user_address = "0x86e83d346041E8806e352681f3F14549C0d2BC60" cvat_email = "test@hmt.ai" @@ -68,7 +67,7 @@ def generate_jwt_token( return jwt.encode(data, private_key, algorithm="ES256") -def get_auth_header(token: str = generate_jwt_token(wallet_address=user_address)) -> dict: +def get_auth_header(token: str = generate_jwt_token(wallet_address=WALLET_ADDRESS1)) -> dict: return {"Authorization": f"Bearer {token}"} @@ -148,7 +147,7 @@ def validate_result( session.begin() user = User( - wallet_address=user_address, + wallet_address=WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) @@ -162,7 +161,7 @@ def validate_result( ) assignment = Assignment( id=str(uuid.uuid4()), - user_wallet_address=user_address, + user_wallet_address=WALLET_ADDRESS1, cvat_job_id=cvat_job.cvat_id, expires_at=utcnow() + timedelta(days=1), ) @@ -208,7 +207,7 @@ def test_can_list_jobs_200_without_escrows_in_hidden_states( ) -> None: session.begin() user = User( - wallet_address=user_address, + wallet_address=WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) @@ -271,7 +270,7 @@ def test_can_list_jobs_200_with_only_one_entry_per_escrow_address_if_several_pro session.begin() user = User( - wallet_address=user_address, + wallet_address=WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) @@ -303,7 +302,7 @@ def test_can_list_jobs_200_with_only_one_entry_per_escrow_address_if_several_pro def test_can_list_jobs_200_with_fields(client: TestClient, session: Session) -> None: session.begin() user = User( - wallet_address=user_address, + wallet_address=WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) @@ -314,7 +313,7 @@ def test_can_list_jobs_200_with_fields(client: TestClient, session: Session) -> ) assignment = Assignment( id=str(uuid.uuid4()), - user_wallet_address=user_address, + user_wallet_address=WALLET_ADDRESS1, cvat_job_id=cvat_job.cvat_id, expires_at=utcnow() + timedelta(days=1), ) @@ -361,7 +360,7 @@ def test_can_list_jobs_200_with_sorting(client: TestClient, session: Session) -> # sort: ASC, DESC; sort_field: chain_id|job_type|created_at|updated_at session.begin() user = User( - wallet_address=user_address, + wallet_address=WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) @@ -375,14 +374,14 @@ def test_can_list_jobs_200_with_sorting(client: TestClient, session: Session) -> cvat_project, cvat_task, cvat_job = create_project_task_and_job( session, f"0x86e83d346041E8806e352681f3F14549C0d2BC6{i}", i + 1 ) - cvat.touch(session, Job, [cvat_job.id]) + cvat_service.touch(session, Job, [cvat_job.id]) cvat_projects.append(cvat_project) cvat_tasks.append(cvat_task) cvat_jobs.append(cvat_job) assignment = Assignment( id=str(uuid.uuid4()), - user_wallet_address=user_address, + user_wallet_address=WALLET_ADDRESS1, cvat_job_id=cvat_job.cvat_id, expires_at=utcnow() + timedelta(hours=i + 1), status=AssignmentStatuses.created if i % 2 else AssignmentStatuses.completed, @@ -392,7 +391,7 @@ def test_can_list_jobs_200_with_sorting(client: TestClient, session: Session) -> session.commit() last_updated_job = cvat_jobs[1] - cvat.touch(session, Job, [last_updated_job.id]) + cvat_service.touch(session, Job, [last_updated_job.id]) session.commit() assert { @@ -444,7 +443,7 @@ def test_can_list_jobs_200_with_sorting(client: TestClient, session: Session) -> def test_can_list_jobs_200_with_filters(client: TestClient, session: Session): session.begin() user_1 = User( - wallet_address=user_address, + wallet_address=WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) @@ -490,7 +489,7 @@ def test_can_list_jobs_200_with_filters(client: TestClient, session: Session): session.add(assignment) assignments.append(assignment) - cvat.touch(session, Job, [cvat_job.id]) + cvat_service.touch(session, Job, [cvat_job.id]) session.commit() # TODO: imitate different created_dates visible_projects_ids = set( @@ -510,7 +509,7 @@ def test_can_list_jobs_200_with_filters(client: TestClient, session: Session): updated_cvat_project_ids = set() for job in cvat_jobs[len(cvat_jobs) // 2 :]: - cvat.touch(session, Job, [job.id]) + cvat_service.touch(session, Job, [job.id]) updated_cvat_project_ids.add(job.task.cvat_project_id) session.commit() @@ -568,7 +567,7 @@ def test_can_list_jobs_200_with_filters(client: TestClient, session: Session): def test_can_list_jobs_200_check_values(client: TestClient, session: Session) -> None: session.begin() user = User( - wallet_address=user_address, + wallet_address=WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) @@ -584,12 +583,12 @@ def test_can_list_jobs_200_check_values(client: TestClient, session: Session) -> for job in (cvat_second_job, cvat_first_job): assignment = Assignment( id=str(uuid.uuid4()), - user_wallet_address=user_address, + user_wallet_address=WALLET_ADDRESS1, cvat_job_id=job.cvat_id, expires_at=utcnow() + timedelta(days=1), ) session.add(assignment) - cvat.touch(session, Job, [job.id]) + cvat_service.touch(session, Job, [job.id]) session.commit() with ( @@ -631,7 +630,7 @@ def test_can_list_jobs_200_without_address(client: TestClient, session: Session) create_project_task_and_job(session, "0x86e83d346041E8806e352681f3F14549C0d2BC69", 3) user = User( - wallet_address=user_address, + wallet_address=WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) @@ -639,7 +638,7 @@ def test_can_list_jobs_200_without_address(client: TestClient, session: Session) assignment = Assignment( id=str(uuid.uuid4()), - user_wallet_address=user_address, + user_wallet_address=WALLET_ADDRESS1, cvat_job_id=cvat_job_1.cvat_id, expires_at=utcnow() + timedelta(days=1), ) @@ -705,15 +704,15 @@ def test_can_register_200(client: TestClient, session: Session) -> None: assert response.status_code == 200 user = response.json() - db_user = session.query(User).where(User.wallet_address == user_address).first() - assert user["wallet_address"] == db_user.wallet_address == user_address + db_user = session.query(User).where(User.wallet_address == WALLET_ADDRESS1).first() + assert user["wallet_address"] == db_user.wallet_address == WALLET_ADDRESS1 assert user["email"] == db_user.cvat_email == cvat_email def test_cannot_register_400_with_duplicated_address(client: TestClient, session: Session) -> None: session.begin() user = User( - wallet_address=user_address, + wallet_address=WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) @@ -724,7 +723,7 @@ def test_cannot_register_400_with_duplicated_address(client: TestClient, session response = client.post( "/register", headers=get_auth_header( - generate_jwt_token(wallet_address=user_address, email=new_cvat_email) + generate_jwt_token(wallet_address=WALLET_ADDRESS1, email=new_cvat_email) ), ) assert response.status_code == 400 @@ -733,9 +732,9 @@ def test_cannot_register_400_with_duplicated_address(client: TestClient, session def test_cannot_register_400_with_duplicated_user(client: TestClient, session: Session) -> None: session.begin() - new_user_address = "0x86e83d346041E8806e352681f3F14549C0d2BC61" + new_WALLET_ADDRESS1 = "0x86e83d346041E8806e352681f3F14549C0d2BC61" user = User( - wallet_address=new_user_address, + wallet_address=new_WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) @@ -747,7 +746,7 @@ def test_cannot_register_400_with_duplicated_user(client: TestClient, session: S ) assert response.status_code == 400 assert response.json() == {"message": "User already exists"} - assert new_user_address != user_address + assert new_WALLET_ADDRESS1 != WALLET_ADDRESS1 def test_cannot_register_401(client: TestClient) -> None: @@ -771,12 +770,13 @@ def test_can_create_assignment_200(client: TestClient, session: Session) -> None session, "0x86e83d346041E8806e352681f3F14549C0d2BC67", 1 ) user = User( - wallet_address=user_address, + wallet_address=WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) session.add(user) session.commit() + with ( open("tests/utils/manifest.json") as data, patch("src.endpoints.serializers.get_escrow_manifest") as mock_get_manifest, @@ -815,7 +815,7 @@ def test_can_create_assignment_200(client: TestClient, session: Session) -> None } db_assignment = ( - session.query(Assignment).filter_by(user_wallet_address=user_address).first() + session.query(Assignment).filter_by(user_wallet_address=WALLET_ADDRESS1).first() ) assert assignment["escrow_address"] == cvat_project.escrow_address assert assignment["chain_id"] == cvat_project.chain_id @@ -841,7 +841,7 @@ def test_cannot_create_assignment_401(client: TestClient) -> None: response = client.post( "/assignment", headers=get_auth_header(token) if token else None, - json={"wallet_address": user_address, "cvat_email": cvat_email}, + json={"wallet_address": WALLET_ADDRESS1, "cvat_email": cvat_email}, ) assert response.status_code == 401 @@ -859,7 +859,7 @@ def test_cannot_create_assignment_400_when_has_unfinished_assignments( create_job(session, 2, cvat_task.cvat_id, cvat_project.cvat_id) user = User( - wallet_address=user_address, + wallet_address=WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) @@ -868,7 +868,7 @@ def test_cannot_create_assignment_400_when_has_unfinished_assignments( assignment = Assignment( created_at=utcnow(), expires_at=utcnow() + timedelta(hours=1), - user_wallet_address=user_address, + user_wallet_address=WALLET_ADDRESS1, cvat_job_id=cvat_job1.cvat_id, status=AssignmentStatuses.created.value, ) @@ -892,7 +892,7 @@ def test_cannot_create_assignment_400_when_has_unfinished_assignments( def test_can_list_assignments_200(client: TestClient, session: Session) -> None: session.begin() user_1 = User( - wallet_address=user_address, + wallet_address=WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) @@ -1020,7 +1020,7 @@ def test_can_list_assignments_200_with_sorting(client: TestClient, session: Sess # sort: ASC, DESC session.begin() user = User( - wallet_address=user_address, + wallet_address=WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) @@ -1033,7 +1033,7 @@ def test_can_list_assignments_200_with_sorting(client: TestClient, session: Sess assignment = Assignment( id=str(uuid.uuid4()), - user_wallet_address=user_address, + user_wallet_address=WALLET_ADDRESS1, cvat_job_id=cvat_job.cvat_id, expires_at=utcnow() + timedelta(hours=i + 1), status=AssignmentStatuses.created if i % 2 else AssignmentStatuses.completed, @@ -1086,33 +1086,42 @@ def test_can_resign_assignment_200(client: TestClient, session: Session) -> None cvat_project, cvat_task, cvat_job = create_project_task_and_job( session, "0x86e83d346041E8806e352681f3F14549C0d2BC67", 1 ) + cvat_job.status = JobStatuses.in_progress + cvat_job.updated_at = None + user = User( - wallet_address=user_address, + wallet_address=WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) assignment = Assignment( id=str(uuid.uuid4()), - user_wallet_address=user_address, + user_wallet_address=WALLET_ADDRESS1, cvat_job_id=cvat_job.cvat_id, expires_at=utcnow() + timedelta(hours=1), + status=AssignmentStatuses.created, ) - session.add_all([user, assignment]) + session.add_all([cvat_job, user, assignment]) + session.commit() assert {cvat_job.updated_at, cvat_task.updated_at, cvat_job.updated_at} == {None} - response = client.post( - "/assignment/resign", - headers=get_auth_header(), - json={"assignment_id": assignment.id}, - ) + + with patch("src.services.exchange.cvat_api.update_job_assignee") as mock_update_job_assignee: + response = client.post( + "/assignment/resign", + headers=get_auth_header(), + json={"assignment_id": assignment.id}, + ) + + mock_update_job_assignee.assert_called_once_with(cvat_job.cvat_id, assignee_id=None) assert response.status_code == 200 session.refresh(assignment) assert assignment.status == AssignmentStatuses.canceled - for obj in cvat_project, cvat_task, cvat_job: + for obj in (cvat_project, cvat_task, cvat_job): session.refresh(obj) assert obj.updated_at is not None assert cvat_project.updated_at == cvat_task.updated_at == cvat_job.updated_at @@ -1144,14 +1153,14 @@ def test_cannot_resign_assignment_400_when_assignment_is_finished( session, "0x86e83d346041E8806e352681f3F14549C0d2BC67", 1 ) user = User( - wallet_address=user_address, + wallet_address=WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) session.add(user) assignment = Assignment( id=str(uuid.uuid4()), - user_wallet_address=user_address, + user_wallet_address=WALLET_ADDRESS1, cvat_job_id=cvat_job_1.cvat_id, expires_at=utcnow() + timedelta(hours=1), status=AssignmentStatuses.completed.value, @@ -1229,7 +1238,7 @@ def test_can_get_assignment_stats_by_worker_200(client: TestClient, session: Ses cvat_jobs.append(cvat_job) user = User( - wallet_address=user_address, + wallet_address=WALLET_ADDRESS1, cvat_email=cvat_email, cvat_id=1, ) @@ -1245,7 +1254,7 @@ def test_can_get_assignment_stats_by_worker_200(client: TestClient, session: Ses ): assignment = Assignment( id=str(uuid.uuid4()), - user_wallet_address=user_address, + user_wallet_address=WALLET_ADDRESS1, cvat_job_id=cvat_jobs[i].cvat_id, expires_at=utcnow() + timedelta(hours=1), status=status, @@ -1376,7 +1385,6 @@ def test_can_list_jobs_200_check_updated_at(client: TestClient, session: Session ] session.add_all(users) - utcnow() cvat_project = create_project(session, "0x86e83d346041E8806e352681f3F14549C0d2BC66", 1) cvat_tasks: list[Task] = [] cvat_jobs: list[Job] = [] diff --git a/packages/examples/cvat/exchange-oracle/tests/integration/cron/state_trackers/test_track_assignments.py b/packages/examples/cvat/exchange-oracle/tests/integration/cron/state_trackers/test_track_assignments.py index 2acae81df7..b4b2679dc1 100644 --- a/packages/examples/cvat/exchange-oracle/tests/integration/cron/state_trackers/test_track_assignments.py +++ b/packages/examples/cvat/exchange-oracle/tests/integration/cron/state_trackers/test_track_assignments.py @@ -3,17 +3,15 @@ from datetime import datetime, timedelta from unittest.mock import patch -import pytest -from sqlalchemy import update - from src.core.types import ( AssignmentStatuses, - ProjectStatuses, + JobStatuses, ) from src.crons.cvat.state_trackers import track_assignments from src.db import SessionLocal -from src.models.cvat import Assignment, Project, User +from src.models.cvat import Assignment, Job, User +from tests.utils.constants import ESCROW_ADDRESS, WALLET_ADDRESS1, WALLET_ADDRESS2 from tests.utils.db_helper import create_project_task_and_job @@ -24,128 +22,118 @@ def setUp(self): def tearDown(self): self.session.close() - def test_track_expired_assignments(self): - (_, _, cvat_job) = create_project_task_and_job( - self.session, "0x86e83d346041E8806e352681f3F14549C0d2BC67", 1 - ) - wallet_address_1 = "0x86e83d346041E8806e352681f3F14549C0d2BC67" + def test_can_track_expired_assignments(self): + (_, _, cvat_job) = create_project_task_and_job(self.session, ESCROW_ADDRESS, 1) + cvat_job.status = JobStatuses.in_progress + self.session.add(cvat_job) + user = User( - wallet_address=wallet_address_1, + wallet_address=WALLET_ADDRESS1, cvat_email="test@hmt.ai", cvat_id=1, ) self.session.add(user) - wallet_address_2 = "0x86e83d346041E8806e352681f3F14549C0d2BC68" user = User( - wallet_address=wallet_address_2, + wallet_address=WALLET_ADDRESS2, cvat_email="test2@hmt.ai", cvat_id=2, ) self.session.add(user) - assignment = Assignment( + + assignment1 = Assignment( id=str(uuid.uuid4()), - user_wallet_address=wallet_address_1, + user_wallet_address=WALLET_ADDRESS1, cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now() + timedelta(days=1), + created_at=datetime.now() - timedelta(hours=2), + expires_at=datetime.now() - timedelta(hours=1), + status=AssignmentStatuses.created, ) - assignment_2 = Assignment( + self.session.add(assignment1) + + assignment2 = Assignment( id=str(uuid.uuid4()), - user_wallet_address=wallet_address_2, + user_wallet_address=WALLET_ADDRESS2, cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now() - timedelta(days=1), - created_at=datetime.now() + timedelta(hours=1), + created_at=datetime.now() - timedelta(hours=1), + expires_at=datetime.now(), + status=AssignmentStatuses.created, ) - self.session.add(assignment) - self.session.add(assignment_2) - self.session.commit() + self.session.add(assignment2) - db_assignments = sorted( - self.session.query(Assignment).all(), key=lambda assignment: assignment.user.cvat_id - ) - assert db_assignments[0].status == AssignmentStatuses.created.value - assert db_assignments[1].status == AssignmentStatuses.created.value + self.session.commit() - with patch("src.crons.cvat.state_trackers.cvat_api.update_job_assignee") as mock_cvat_api: + with patch( + "src.crons.cvat.state_trackers.cvat_api.update_job_assignee" + ) as update_job_assignee: track_assignments() - mock_cvat_api.assert_called_once_with(assignment_2.cvat_job_id, assignee_id=None) - self.session.commit() + update_job_assignee.assert_called_once_with(assignment2.cvat_job_id, assignee_id=None) db_assignments = sorted( self.session.query(Assignment).all(), key=lambda assignment: assignment.user.cvat_id ) - assert db_assignments[0].status == AssignmentStatuses.created.value - assert db_assignments[1].status == AssignmentStatuses.expired.value - - @pytest.mark.xfail( - strict=True, - reason=""" -Fix src.crons.cvat.state_trackers.py -Where in `cvat_service.get_active_assignments()` return value will be empty -because it actually looking for the expired assignments -""", - ) - def test_track_canceled_assignments(self): - (_, _, cvat_job) = create_project_task_and_job( - self.session, "0x86e83d346041E8806e352681f3F14549C0d2BC67", 1 - ) - (cvat_project_2, _, cvat_job_2) = create_project_task_and_job( - self.session, "0x86e83d346041E8806e352681f3F14549C0d2BC68", 2 + assert db_assignments[0].status == AssignmentStatuses.expired + assert db_assignments[1].status == AssignmentStatuses.expired + + assert ( + self.session.query(Job).filter(Job.id == cvat_job.id).first().status == JobStatuses.new ) - wallet_address_1 = "0x86e83d346041E8806e352681f3F14549C0d2BC67" + + def test_can_track_canceled_assignments(self): + (_, _, cvat_job) = create_project_task_and_job(self.session, ESCROW_ADDRESS, 1) + cvat_job.status = JobStatuses.in_progress + self.session.add(cvat_job) + user = User( - wallet_address=wallet_address_1, + wallet_address=WALLET_ADDRESS1, cvat_email="test@hmt.ai", cvat_id=1, ) self.session.add(user) - wallet_address_2 = "0x86e83d346041E8806e352681f3F14549C0d2BC68" user = User( - wallet_address=wallet_address_2, + wallet_address=WALLET_ADDRESS2, cvat_email="test2@hmt.ai", cvat_id=2, ) self.session.add(user) - assignment = Assignment( + + assignment1 = Assignment( id=str(uuid.uuid4()), - user_wallet_address=wallet_address_1, + user_wallet_address=WALLET_ADDRESS1, cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now() + timedelta(days=1), - ) - assignment_2 = Assignment( - id=str(uuid.uuid4()), - user_wallet_address=wallet_address_2, - cvat_job_id=cvat_job_2.cvat_id, - expires_at=datetime.now() + timedelta(days=1), - created_at=datetime.now() + timedelta(hours=1), + created_at=datetime.now() - timedelta(hours=2), + expires_at=datetime.now() - timedelta(hours=1), + status=AssignmentStatuses.canceled, ) - self.session.add(assignment) - self.session.add(assignment_2) + self.session.add(assignment1) - self.session.execute( - update(Project) - .where(Project.id == cvat_project_2.id) - .values(status=ProjectStatuses.completed.value) + assignment2 = Assignment( + id=str(uuid.uuid4()), + user_wallet_address=WALLET_ADDRESS2, + cvat_job_id=cvat_job.cvat_id, + created_at=datetime.now() - timedelta(hours=1), + expires_at=datetime.now() + timedelta(hours=1), + status=AssignmentStatuses.canceled, ) + self.session.add(assignment2) self.session.commit() - db_assignments = sorted( - self.session.query(Assignment).all(), key=lambda assignment: assignment.user.cvat_id - ) - assert db_assignments[0].status == AssignmentStatuses.created.value - assert db_assignments[1].status == AssignmentStatuses.created.value - - with patch("src.crons.cvat.state_trackers.cvat_api.update_job_assignee") as mock_cvat_api: + with patch( + "src.crons.cvat.state_trackers.cvat_api.update_job_assignee" + ) as update_job_assignee: track_assignments() - mock_cvat_api.assert_called_once_with(assignment_2.cvat_job_id, assignee_id=None) - self.session.commit() + update_job_assignee.assert_called_once_with(assignment2.cvat_job_id, assignee_id=None) db_assignments = sorted( self.session.query(Assignment).all(), key=lambda assignment: assignment.user.cvat_id ) - assert db_assignments[0].status == AssignmentStatuses.created.value - assert db_assignments[1].status == AssignmentStatuses.canceled.value + assert db_assignments[0].status == AssignmentStatuses.canceled + assert db_assignments[1].status == AssignmentStatuses.canceled + + assert ( + self.session.query(Job).filter(Job.id == cvat_job.id).first().status == JobStatuses.new + ) diff --git a/packages/examples/cvat/exchange-oracle/tests/integration/services/test_cvat.py b/packages/examples/cvat/exchange-oracle/tests/integration/services/test_cvat.py index 0863ae43d6..f5dd8a6ca1 100644 --- a/packages/examples/cvat/exchange-oracle/tests/integration/services/test_cvat.py +++ b/packages/examples/cvat/exchange-oracle/tests/integration/services/test_cvat.py @@ -1,4 +1,3 @@ -import unittest import uuid from datetime import datetime, timedelta @@ -17,7 +16,9 @@ ) from src.db import SessionLocal from src.models.cvat import Assignment, DataUpload, Image, Job, Project, Task, User +from src.utils.time import utcnow +from tests.utils.constants import WALLET_ADDRESS1, WALLET_ADDRESS2 from tests.utils.db_helper import ( create_project, create_project_and_task, @@ -25,12 +26,18 @@ ) -class ServiceIntegrationTest(unittest.TestCase): +class ServiceIntegrationTest: + @pytest.fixture(autouse=True) def setUp(self): self.session = SessionLocal() - def tearDown(self): - self.session.close() + try: + self.session.begin() + + yield + finally: + self.session.rollback() + self.session.close() def test_create_project(self): cvat_id = 1 @@ -346,6 +353,127 @@ def test_get_projects_by_status(self): assert len(projects) == 1 + def test_can_get_free_job_if_exists(self): + escrow_address = "0x86e83d346041E8806e352681f3F14549C0d2BC67" + + (cvat_project, cvat_task, cvat_job) = create_project_task_and_job( + self.session, escrow_address, cvat_id=1 + ) + chain_id = cvat_project.chain_id + + user = User(wallet_address=WALLET_ADDRESS1, cvat_email="test1@hmt.ai", cvat_id=1) + self.session.add(user) + + self.session.commit() + + free_job = cvat_service.get_free_job( + self.session, escrow_address, chain_id, user_wallet_address=WALLET_ADDRESS1 + ) + assert free_job.id == cvat_job.id + + def test_cannot_get_free_job_if_all_completed_and_not_project_checked_yet(self): + escrow_address = "0x86e83d346041E8806e352681f3F14549C0d2BC67" + + (cvat_project, cvat_task, cvat_job) = create_project_task_and_job( + self.session, escrow_address, cvat_id=1 + ) + chain_id = cvat_project.chain_id + + cvat_job.status = JobStatuses.completed.value + cvat_job.updated_at = utcnow() + self.session.add(cvat_job) + + user1 = User(wallet_address=WALLET_ADDRESS1, cvat_email="test1@hmt.ai", cvat_id=1) + self.session.add(user1) + + user2 = User(wallet_address=WALLET_ADDRESS2, cvat_email="test2@hmt.ai", cvat_id=2) + self.session.add(user2) + + assignment = Assignment( + id=str(uuid.uuid4()), + user_wallet_address=WALLET_ADDRESS2, + cvat_job_id=cvat_job.cvat_id, + expires_at=utcnow() + timedelta(days=1), + completed_at=utcnow(), + status=AssignmentStatuses.completed.value, + ) + self.session.add(assignment) + + self.session.commit() + + free_job = cvat_service.get_free_job( + self.session, escrow_address, chain_id, user_wallet_address=WALLET_ADDRESS1 + ) + assert free_job is None + + @pytest.mark.parametrize("previous_assignment_status", AssignmentStatuses) + def test_cannot_get_free_job_if_was_assigned_to_this_user( + self, previous_assignment_status: AssignmentStatuses + ): + escrow_address = "0x86e83d346041E8806e352681f3F14549C0d2BC67" + + (cvat_project, _, cvat_job) = create_project_task_and_job( + self.session, escrow_address, cvat_id=1 + ) + chain_id = cvat_project.chain_id + + user1 = User(wallet_address=WALLET_ADDRESS1, cvat_email="test1@hmt.ai", cvat_id=1) + self.session.add(user1) + + user2 = User(wallet_address=WALLET_ADDRESS2, cvat_email="test2@hmt.ai", cvat_id=2) + self.session.add(user2) + + assignment = Assignment( + id=str(uuid.uuid4()), + user_wallet_address=WALLET_ADDRESS1, + cvat_job_id=cvat_job.cvat_id, + expires_at=utcnow() + timedelta(days=1), + status=previous_assignment_status.value, + ) + if previous_assignment_status == AssignmentStatuses.completed: + assignment.completed_at = utcnow() + self.session.add(assignment) + + self.session.commit() + + free_job = cvat_service.get_free_job( + self.session, escrow_address, chain_id, user_wallet_address=WALLET_ADDRESS1 + ) + assert free_job is None + + def test_cannot_get_free_job_if_assigned_to_other_user(self): + escrow_address = "0x86e83d346041E8806e352681f3F14549C0d2BC67" + + (cvat_project, _, cvat_job) = create_project_task_and_job( + self.session, escrow_address, cvat_id=1 + ) + chain_id = cvat_project.chain_id + + cvat_job.status = JobStatuses.in_progress + self.session.add(cvat_job) + + user1 = User(wallet_address=WALLET_ADDRESS1, cvat_email="test1@hmt.ai", cvat_id=1) + self.session.add(user1) + + user2 = User(wallet_address=WALLET_ADDRESS2, cvat_email="test2@hmt.ai", cvat_id=2) + self.session.add(user2) + + assignment = Assignment( + id=str(uuid.uuid4()), + user_wallet_address=WALLET_ADDRESS2, + cvat_job_id=cvat_job.cvat_id, + expires_at=utcnow() + timedelta(days=1), + status=AssignmentStatuses.created.value, + ) + self.session.add(assignment) + + self.session.commit() + + free_job = cvat_service.get_free_job( + self.session, escrow_address, chain_id, user_wallet_address=WALLET_ADDRESS1 + ) + assert free_job is None + def test_update_project_status(self): cvat_id = 1 cvat_cloudstorage_id = 1 @@ -1215,7 +1343,7 @@ def test_create_assignment(self): session=self.session, wallet_address=wallet_address, cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now(), + expires_at=utcnow(), ) assignment_count = self.session.query(Assignment).count() @@ -1237,7 +1365,7 @@ def test_create_assignment_invalid_address(self): session=self.session, wallet_address="invalid_address", cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now(), + expires_at=utcnow(), ) with pytest.raises(IntegrityError): self.session.commit() @@ -1255,7 +1383,7 @@ def test_create_assignment_invalid_address(self): session=self.session, wallet_address=wallet_address, cvat_job_id=0, - expires_at=datetime.now(), + expires_at=utcnow(), ) with pytest.raises(IntegrityError): self.session.commit() @@ -1283,13 +1411,13 @@ def test_get_assignments_by_id(self): session=self.session, wallet_address=wallet_address_1, cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now(), + expires_at=utcnow(), ) assignment_2 = cvat_service.create_assignment( session=self.session, wallet_address=wallet_address_2, cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now(), + expires_at=utcnow(), ) self.session.commit() @@ -1329,14 +1457,14 @@ def test_get_latest_assignment_by_cvat_job_id(self): id=str(uuid.uuid4()), user_wallet_address=wallet_address_1, cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now(), - created_at=datetime.now() - timedelta(days=1), + expires_at=utcnow(), + created_at=utcnow() - timedelta(days=1), ) assignment_2 = Assignment( id=str(uuid.uuid4()), user_wallet_address=wallet_address_2, cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now(), + expires_at=utcnow(), ) self.session.add(assignment) self.session.add(assignment_2) @@ -1372,13 +1500,13 @@ def test_get_unprocessed_expired_assignments(self): id=str(uuid.uuid4()), user_wallet_address=wallet_address_1, cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now() + timedelta(days=1), + expires_at=utcnow() + timedelta(days=1), ) assignment_2 = Assignment( id=str(uuid.uuid4()), user_wallet_address=wallet_address_2, cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now() - timedelta(days=1), + expires_at=utcnow() - timedelta(days=1), ) self.session.add(assignment) self.session.add(assignment_2) @@ -1406,7 +1534,7 @@ def test_update_assignment(self): id=str(uuid.uuid4()), user_wallet_address=wallet_address_1, cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now() + timedelta(days=1), + expires_at=utcnow() + timedelta(days=1), ) self.session.add(assignment) self.session.commit() @@ -1436,7 +1564,7 @@ def test_cancel_assignment(self): id=str(uuid.uuid4()), user_wallet_address=wallet_address_1, cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now() + timedelta(days=1), + expires_at=utcnow() + timedelta(days=1), ) self.session.add(assignment) self.session.commit() @@ -1464,7 +1592,7 @@ def test_expire_assignment(self): id=str(uuid.uuid4()), user_wallet_address=wallet_address_1, cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now() + timedelta(days=1), + expires_at=utcnow() + timedelta(days=1), ) self.session.add(assignment) self.session.commit() @@ -1492,11 +1620,11 @@ def test_complete_assignment(self): id=str(uuid.uuid4()), user_wallet_address=wallet_address_1, cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now() + timedelta(days=1), + expires_at=utcnow() + timedelta(days=1), ) self.session.add(assignment) self.session.commit() - completed_date = datetime.now() + timedelta(days=1) + completed_date = utcnow() + timedelta(days=1) cvat_service.complete_assignment(self.session, assignment.id, completed_date) db_assignment = self.session.query(Assignment).filter_by(id=assignment.id).first() @@ -1528,13 +1656,13 @@ def test_test_add_project_images(self): id=str(uuid.uuid4()), user_wallet_address=wallet_address_1, cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now(), + expires_at=utcnow(), ) assignment_2 = Assignment( id=str(uuid.uuid4()), user_wallet_address=wallet_address_2, cvat_job_id=cvat_job.cvat_id, - expires_at=datetime.now(), + expires_at=utcnow(), ) self.session.add(assignment) self.session.add(assignment_2) diff --git a/packages/examples/cvat/exchange-oracle/tests/integration/services/test_exchange.py b/packages/examples/cvat/exchange-oracle/tests/integration/services/test_exchange.py index 7d074d12e9..311cd8b9b2 100644 --- a/packages/examples/cvat/exchange-oracle/tests/integration/services/test_exchange.py +++ b/packages/examples/cvat/exchange-oracle/tests/integration/services/test_exchange.py @@ -76,7 +76,13 @@ def test_serialize_task_invalid_manifest(self): serialize_job(cvat_project) def test_create_assignment(self): - cvat_project_1, _, cvat_job_1 = create_project_task_and_job(self.session, ESCROW_ADDRESS, 1) + cvat_project, cvat_task, cvat_job = create_project_task_and_job( + self.session, ESCROW_ADDRESS, 1 + ) + initial_job_updated_at = cvat_job.updated_at + initial_task_updated_at = cvat_task.updated_at + initial_project_updated_at = cvat_project.updated_at + user_address = WALLET_ADDRESS1 user = User( wallet_address=user_address, @@ -84,18 +90,28 @@ def test_create_assignment(self): cvat_id=1, ) self.session.add(user) + self.session.commit() with patch("src.services.exchange.cvat_api"): assignment_id = create_assignment( - cvat_project_1.escrow_address, Networks(cvat_project_1.chain_id), user_address + cvat_project.escrow_address, Networks(cvat_project.chain_id), user_address ) - assignment = self.session.query(Assignment).filter_by(id=assignment_id).first() + assignment = self.session.query(Assignment).filter_by(id=assignment_id).first() + + assert assignment.cvat_job_id == cvat_job.cvat_id + assert assignment.user_wallet_address == user_address + assert assignment.status == AssignmentStatuses.created + + self.session.refresh(cvat_job) + assert cvat_job.updated_at != initial_job_updated_at + + self.session.refresh(cvat_task) + assert cvat_task.updated_at != initial_task_updated_at - assert assignment.cvat_job_id == cvat_job_1.cvat_id - assert assignment.user_wallet_address == user_address - assert assignment.status == AssignmentStatuses.created + self.session.refresh(cvat_project) + assert cvat_project.updated_at != initial_project_updated_at def test_create_assignment_many_jobs_1_completed(self): cvat_project, _, cvat_job_1 = create_project_task_and_job(self.session, ESCROW_ADDRESS, 1) @@ -163,6 +179,10 @@ def test_create_assignment_invalid_project(self): def test_create_assignment_unfinished_assignment(self): _, _, cvat_job = create_project_task_and_job(self.session, ESCROW_ADDRESS, 1) + + cvat_job.status = JobStatuses.in_progress + self.session.add(cvat_job) + user_address = WALLET_ADDRESS1 user = User( wallet_address=user_address, @@ -266,6 +286,9 @@ def test_create_assignment_no_available_jobs_completed_assignment(self): def test_create_assignment_no_available_jobs_active_foreign_assignment(self): cvat_project, _, cvat_job_1 = create_project_task_and_job(self.session, ESCROW_ADDRESS, 1) + cvat_job_1.status = JobStatuses.in_progress + self.session.add(cvat_job_1) + user_address1 = WALLET_ADDRESS1 user1 = User( wallet_address=user_address1, diff --git a/packages/examples/cvat/exchange-oracle/tests/utils/setup_cvat.py b/packages/examples/cvat/exchange-oracle/tests/utils/setup_cvat.py index 8bc1a01515..77f657f98b 100644 --- a/packages/examples/cvat/exchange-oracle/tests/utils/setup_cvat.py +++ b/packages/examples/cvat/exchange-oracle/tests/utils/setup_cvat.py @@ -1,16 +1,21 @@ import hmac import json import uuid +from collections.abc import Generator, Sequence +from contextlib import ExitStack, contextmanager from datetime import datetime from hashlib import sha256 +from sqlalchemy.orm import Session from sqlalchemy.sql import select from src.core.config import CvatConfig -from src.core.types import ProjectStatuses, TaskTypes +from src.core.types import AssignmentStatuses, JobStatuses, ProjectStatuses, TaskStatuses, TaskTypes from src.db import SessionLocal from src.models.cvat import Assignment, Job, Project, Task, User +from tests.utils.constants import ESCROW_ADDRESS + def generate_cvat_signature(data: dict): b_data = json.dumps(data).encode("utf-8") @@ -25,89 +30,121 @@ def generate_cvat_signature(data: dict): ) -def add_cvat_project_to_db(cvat_id: int) -> str: - with SessionLocal.begin() as session: +def add_cvat_project_to_db(cvat_id: int, *, session: Session | None = None) -> Project: + with get_session(session) as session_: project_id = str(uuid.uuid4()) project = Project( id=project_id, cvat_id=cvat_id, cvat_cloudstorage_id=1, - status=ProjectStatuses.annotation.value, - job_type=TaskTypes.image_label_binary.value, - escrow_address="0x86e83d346041E8806e352681f3F14549C0d2BC67", + status=ProjectStatuses.annotation, + job_type=TaskTypes.image_label_binary, + escrow_address=ESCROW_ADDRESS, chain_id=80002, bucket_url="https://test.storage.googleapis.com/", ) - session.add(project) + session_.add(project) - return project_id + return project -def add_cvat_task_to_db(cvat_id: int, cvat_project_id: int, status: str) -> str: - with SessionLocal.begin() as session: +def add_cvat_task_to_db( + cvat_id: int, + cvat_project_id: int, + *, + status: TaskStatuses | str = TaskStatuses.annotation, + session: Session | None = None, +) -> Task: + with get_session(session) as session_: task_id = str(uuid.uuid4()) task = Task( id=task_id, cvat_id=cvat_id, cvat_project_id=cvat_project_id, - status=status, + status=TaskStatuses(status) if not isinstance(status, TaskStatuses) else status, ) - session.add(task) + session_.add(task) - return task_id + return task -# FUTURE-FIXME: a lot of ways to create a test job -def add_cvat_job_to_db(cvat_id: int, cvat_task_id: int, cvat_project_id: int, status: str) -> str: - with SessionLocal.begin() as session: +def add_cvat_job_to_db( + cvat_id: int, + cvat_task_id: int, + cvat_project_id: int, + *, + status: JobStatuses | str = JobStatuses.new, + session: Session | None = None, +) -> Job: + with get_session(session) as session_: job_id = str(uuid.uuid4()) job = Job( id=job_id, cvat_id=cvat_id, cvat_task_id=cvat_task_id, cvat_project_id=cvat_project_id, - status=status, + status=JobStatuses(status) if not isinstance(status, JobStatuses) else status, start_frame=0, stop_frame=1, ) - session.add(job) + session_.add(job) - return job_id + return job -def add_asignment_to_db( - wallet_address: str, cvat_id: int, cvat_job_id: int, expires_at: datetime -) -> str: - with SessionLocal.begin() as session: +def add_assignment_to_db( + wallet_address: str, + cvat_id: int, + cvat_job_id: int, + expires_at: datetime, + *, + status: AssignmentStatuses | str = AssignmentStatuses.created, + session: Session | None = None, +) -> Assignment: + with get_session(session) as session_: user = User( wallet_address=wallet_address, cvat_email="test" + str(cvat_id) + "@hmt.ai", cvat_id=cvat_id, ) - session.add(user) + session_.add(user) assignment_id = str(uuid.uuid4()) assignment = Assignment( id=assignment_id, user_wallet_address=wallet_address, cvat_job_id=cvat_job_id, expires_at=expires_at, + status=AssignmentStatuses(status) + if not isinstance(status, AssignmentStatuses) + else status, ) - session.add(assignment) + session_.add(assignment) - return assignment_id + return assignment -def get_cvat_job_from_db(cvat_id: int) -> tuple: - with SessionLocal.begin() as session: - session.expire_on_commit = False +def get_cvat_job_from_db( + cvat_id: int, *, session: Session | None = None +) -> tuple[Job, Sequence[Assignment]]: + with get_session(session) as session_: job_query = select(Job).where(Job.cvat_id == cvat_id) - job = session.execute(job_query).scalars().first() + job = session_.execute(job_query).scalars().first() + + assignments_query = select(Assignment).where(Assignment.cvat_job_id == cvat_id) + assignments = session_.execute(assignments_query).scalars().all() + + return job, assignments + - asignments_query = select(Assignment).where(Assignment.cvat_job_id == cvat_id) - asignments = session.execute(asignments_query).scalars().all() +@contextmanager +def get_session(session: Session | None = None) -> Generator[Session, None, None]: + with ExitStack() as es: + if not session: + session = es.enter_context(SessionLocal.begin()) + session.expire_on_commit = False - return job, asignments + yield session From 1c30f3e33690d998480a516eacda899a8f5a3bf1 Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Wed, 23 Jul 2025 19:53:56 +0300 Subject: [PATCH 19/20] Update container builds and test launch --- .../examples/cvat/exchange-oracle/Dockerfile | 12 ++++++++---- packages/examples/cvat/exchange-oracle/README.md | 16 ++++++++-------- .../exchange-oracle/dockerfiles/test.Dockerfile | 14 +++++++++----- .../examples/cvat/exchange-oracle/pyproject.toml | 1 + .../examples/cvat/exchange-oracle/pytest.ini | 10 ++++++++++ .../examples/cvat/recording-oracle/Dockerfile | 12 ++++++++---- .../recording-oracle/{README.MD => README.md} | 15 ++++++++------- .../recording-oracle/dockerfiles/test.Dockerfile | 14 +++++++++----- .../cvat/recording-oracle/pyproject.toml | 1 + .../examples/cvat/recording-oracle/pytest.ini | 10 ++++++++++ .../services/test_validation_service.py | 6 +++--- 11 files changed, 75 insertions(+), 36 deletions(-) create mode 100644 packages/examples/cvat/exchange-oracle/pytest.ini rename packages/examples/cvat/recording-oracle/{README.MD => README.md} (87%) create mode 100644 packages/examples/cvat/recording-oracle/pytest.ini diff --git a/packages/examples/cvat/exchange-oracle/Dockerfile b/packages/examples/cvat/exchange-oracle/Dockerfile index 86c54f1441..69e547480d 100644 --- a/packages/examples/cvat/exchange-oracle/Dockerfile +++ b/packages/examples/cvat/exchange-oracle/Dockerfile @@ -4,13 +4,17 @@ WORKDIR /app RUN apt-get update -y && \ apt-get install -y jq ffmpeg libsm6 libxext6 && \ - pip install --no-cache poetry + rm -rf /var/lib/apt/lists/* + +RUN pip install --no-cache poetry COPY pyproject.toml poetry.lock ./ -RUN poetry config virtualenvs.create false \ - && poetry install --no-interaction --no-ansi --no-root \ - && poetry cache clear pypi --all +RUN --mount=type=cache,target=/root/.cache \ + poetry config virtualenvs.create false && \ + poetry install --no-interaction --no-ansi --no-root + +RUN python -m pip uninstall -y poetry pip COPY . . diff --git a/packages/examples/cvat/exchange-oracle/README.md b/packages/examples/cvat/exchange-oracle/README.md index 7d70eeac82..d6d91ce62b 100644 --- a/packages/examples/cvat/exchange-oracle/README.md +++ b/packages/examples/cvat/exchange-oracle/README.md @@ -18,14 +18,14 @@ For deployment it is required to have PostgreSQL(v14.4) ### Run the oracle locally: -``` +```sh docker compose -f docker-compose.dev.yml up -d ./bin/start_dev.sh ``` or -``` +```sh docker compose -f docker-compose.dev.yml up -d ./bin/start_debug.sh ``` @@ -48,17 +48,17 @@ Example: [Alembic env file](https://github.com/humanprotocol/human-protocol/blob Adding new migration: -``` +```sh alembic revision --autogenerate -m "your-migration-name" ``` Upgrade: -``` +```sh alembic upgrade head ``` Downgrade: -``` +```sh alembic downgrade -{number of migrations} ``` @@ -72,7 +72,7 @@ Available at `/docs` route ### Tests To run tests -``` -docker compose -f docker-compose.test.yml up --build test --attach test --exit-code-from test && \ - docker compose -f docker-compose.test.yml down +```sh +docker compose -p "test" -f docker-compose.test.yml up --build test --attach test --exit-code-from test; \ + docker compose -p "test" -f docker-compose.test.yml down ``` \ No newline at end of file diff --git a/packages/examples/cvat/exchange-oracle/dockerfiles/test.Dockerfile b/packages/examples/cvat/exchange-oracle/dockerfiles/test.Dockerfile index f2341c8d22..eaa436f5d0 100644 --- a/packages/examples/cvat/exchange-oracle/dockerfiles/test.Dockerfile +++ b/packages/examples/cvat/exchange-oracle/dockerfiles/test.Dockerfile @@ -4,16 +4,20 @@ WORKDIR /app RUN apt-get update -y && \ apt-get install -y jq ffmpeg libsm6 libxext6 && \ - pip install --no-cache poetry + rm -rf /var/lib/apt/lists/* + +RUN pip install --no-cache poetry COPY pyproject.toml poetry.lock ./ -RUN poetry config virtualenvs.create false \ - && poetry install --no-interaction --no-ansi --no-root \ - && poetry cache clear pypi --all +RUN --mount=type=cache,target=/root/.cache \ + poetry config virtualenvs.create false && \ + poetry install --no-interaction --no-ansi --no-root + +RUN python -m pip uninstall -y poetry pip COPY . . RUN rm -f ./src/.env -CMD ["pytest", "-W", "ignore::DeprecationWarning", "-v"] \ No newline at end of file +CMD ["pytest"] \ No newline at end of file diff --git a/packages/examples/cvat/exchange-oracle/pyproject.toml b/packages/examples/cvat/exchange-oracle/pyproject.toml index 5dd04c1813..2170018394 100644 --- a/packages/examples/cvat/exchange-oracle/pyproject.toml +++ b/packages/examples/cvat/exchange-oracle/pyproject.toml @@ -134,6 +134,7 @@ ignore = [ "ANN001", # | "ANN003", # | "ARG001", # | + "FBT001", # Allow bool-annotated positional args in functions "SLF001", # Allow private attrs access "PLR2004", # Allow magic values "S", # security diff --git a/packages/examples/cvat/exchange-oracle/pytest.ini b/packages/examples/cvat/exchange-oracle/pytest.ini new file mode 100644 index 0000000000..bdf7142e2b --- /dev/null +++ b/packages/examples/cvat/exchange-oracle/pytest.ini @@ -0,0 +1,10 @@ +[pytest] +addopts = --verbose +filterwarnings = + ignore::DeprecationWarning:cvat_sdk.core + ignore::DeprecationWarning:human_protocol_sdk.storage + ignore:Field name \"sort\" shadows:UserWarning:pydantic._internal._fields + +python_files = test_*.py +python_classes = *Test +python_functions = test_* \ No newline at end of file diff --git a/packages/examples/cvat/recording-oracle/Dockerfile b/packages/examples/cvat/recording-oracle/Dockerfile index 86c54f1441..69e547480d 100644 --- a/packages/examples/cvat/recording-oracle/Dockerfile +++ b/packages/examples/cvat/recording-oracle/Dockerfile @@ -4,13 +4,17 @@ WORKDIR /app RUN apt-get update -y && \ apt-get install -y jq ffmpeg libsm6 libxext6 && \ - pip install --no-cache poetry + rm -rf /var/lib/apt/lists/* + +RUN pip install --no-cache poetry COPY pyproject.toml poetry.lock ./ -RUN poetry config virtualenvs.create false \ - && poetry install --no-interaction --no-ansi --no-root \ - && poetry cache clear pypi --all +RUN --mount=type=cache,target=/root/.cache \ + poetry config virtualenvs.create false && \ + poetry install --no-interaction --no-ansi --no-root + +RUN python -m pip uninstall -y poetry pip COPY . . diff --git a/packages/examples/cvat/recording-oracle/README.MD b/packages/examples/cvat/recording-oracle/README.md similarity index 87% rename from packages/examples/cvat/recording-oracle/README.MD rename to packages/examples/cvat/recording-oracle/README.md index 55faf0c916..6b412f5081 100644 --- a/packages/examples/cvat/recording-oracle/README.MD +++ b/packages/examples/cvat/recording-oracle/README.md @@ -18,14 +18,14 @@ For deployment it is required to have PostgreSQL(v14.4) ### Run the oracle locally: -``` +```sh docker compose -f docker-compose.dev.yml up -d ./bin/start_dev.sh ``` or -``` +```sh docker compose -f docker-compose.dev.yml up -d ./bin/start_debug.sh ``` @@ -46,17 +46,17 @@ Config file: `/src/config.py` To simplify the process and use `--autogenerate` flag, you need to import a new model to `/alembic/env.py` Adding new migration: -``` +```sh alembic revision --autogenerate -m "your-migration-name" ``` Upgrade: -``` +```sh alembic upgrade head ``` Downgrade: -``` +```sh alembic downgrade -{number of migrations} ``` @@ -69,6 +69,7 @@ Available at `/docs` route ### Tests To run tests -``` -docker compose -f docker-compose.test.yml up --build test --attach test --exit-code-from test +```sh +docker compose -p "test" -f docker-compose.test.yml up --build test --attach test --exit-code-from test; \ + docker compose -p "test" -f docker-compose.test.yml down ``` diff --git a/packages/examples/cvat/recording-oracle/dockerfiles/test.Dockerfile b/packages/examples/cvat/recording-oracle/dockerfiles/test.Dockerfile index 591d0cb769..eaa436f5d0 100644 --- a/packages/examples/cvat/recording-oracle/dockerfiles/test.Dockerfile +++ b/packages/examples/cvat/recording-oracle/dockerfiles/test.Dockerfile @@ -4,16 +4,20 @@ WORKDIR /app RUN apt-get update -y && \ apt-get install -y jq ffmpeg libsm6 libxext6 && \ - pip install --no-cache poetry + rm -rf /var/lib/apt/lists/* + +RUN pip install --no-cache poetry COPY pyproject.toml poetry.lock ./ -RUN poetry config virtualenvs.create false \ - && poetry install --no-interaction --no-ansi --no-root \ - && poetry cache clear pypi --all +RUN --mount=type=cache,target=/root/.cache \ + poetry config virtualenvs.create false && \ + poetry install --no-interaction --no-ansi --no-root + +RUN python -m pip uninstall -y poetry pip COPY . . RUN rm -f ./src/.env -CMD ["pytest", "-W", "ignore::DeprecationWarning", "-W", "ignore::RuntimeWarning", "-W", "ignore::UserWarning", "-v"] \ No newline at end of file +CMD ["pytest"] \ No newline at end of file diff --git a/packages/examples/cvat/recording-oracle/pyproject.toml b/packages/examples/cvat/recording-oracle/pyproject.toml index 194543a267..e12f60a4ce 100644 --- a/packages/examples/cvat/recording-oracle/pyproject.toml +++ b/packages/examples/cvat/recording-oracle/pyproject.toml @@ -123,6 +123,7 @@ ignore = [ "ANN001", # | "ANN003", # | "ARG001", # | + "FBT001", # Allow bool-annotated positional args in functions "SLF001", # Allow private attrs access "PLR2004", # Allow magic values "S", # security diff --git a/packages/examples/cvat/recording-oracle/pytest.ini b/packages/examples/cvat/recording-oracle/pytest.ini new file mode 100644 index 0000000000..bdf7142e2b --- /dev/null +++ b/packages/examples/cvat/recording-oracle/pytest.ini @@ -0,0 +1,10 @@ +[pytest] +addopts = --verbose +filterwarnings = + ignore::DeprecationWarning:cvat_sdk.core + ignore::DeprecationWarning:human_protocol_sdk.storage + ignore:Field name \"sort\" shadows:UserWarning:pydantic._internal._fields + +python_files = test_*.py +python_classes = *Test +python_functions = test_* \ No newline at end of file diff --git a/packages/examples/cvat/recording-oracle/tests/integration/services/test_validation_service.py b/packages/examples/cvat/recording-oracle/tests/integration/services/test_validation_service.py index 1ba1e17052..e901f89c14 100644 --- a/packages/examples/cvat/recording-oracle/tests/integration/services/test_validation_service.py +++ b/packages/examples/cvat/recording-oracle/tests/integration/services/test_validation_service.py @@ -103,7 +103,7 @@ def test_create_and_get_validation_result(self): assert vrs[0] == vr -class TestManifestChange: +class ManifestChangeTest: def test_can_handle_lowered_quality_requirements_in_manifest(self, session: Session): escrow_address = ESCROW_ADDRESS chain_id = Networks.localhost @@ -282,7 +282,7 @@ def test_can_handle_lowered_quality_requirements_in_manifest(self, session: Sess ) -class TestValidationLogic: +class ValidationLogicTest: @pytest.mark.parametrize("seed", range(25)) def test_can_change_bad_honeypots_in_jobs(self, session: Session, seed: int): escrow_address = ESCROW_ADDRESS @@ -1134,7 +1134,7 @@ def patched_get_jobs_quality_reports(task_id: int): mock_update_task_validation_layout.assert_not_called() -class TestAnnotationMerging: +class AnnotationMergingTest: def test_can_prepare_final_results_in_validated_escrow(self, session: Session): escrow_address = ESCROW_ADDRESS chain_id = Networks.localhost.value From 8cf73741cca572ce9dedbd50df08d0ef9a64cecd Mon Sep 17 00:00:00 2001 From: Maxim Zhiltsov Date: Wed, 23 Jul 2025 19:55:27 +0300 Subject: [PATCH 20/20] Fix deprecation message --- .../examples/cvat/exchange-oracle/src/handlers/job_creation.py | 2 +- .../examples/cvat/exchange-oracle/src/handlers/job_export.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py b/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py index 64645992f4..4faf954a86 100644 --- a/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py +++ b/packages/examples/cvat/exchange-oracle/src/handlers/job_creation.py @@ -234,7 +234,7 @@ def _setup_gt_job_for_cvat_task( with TemporaryDirectory() as tmp_dir: export_dir = Path(tmp_dir) / "export" - gt_dataset.export(save_dir=str(export_dir), save_images=False, format=dm_export_format) + gt_dataset.export(save_dir=str(export_dir), save_media=False, format=dm_export_format) annotations_archive_path = Path(tmp_dir) / "annotations.zip" with annotations_archive_path.open("wb") as annotations_archive: diff --git a/packages/examples/cvat/exchange-oracle/src/handlers/job_export.py b/packages/examples/cvat/exchange-oracle/src/handlers/job_export.py index 6b93567965..a3d67c7eb6 100644 --- a/packages/examples/cvat/exchange-oracle/src/handlers/job_export.py +++ b/packages/examples/cvat/exchange-oracle/src/handlers/job_export.py @@ -130,7 +130,7 @@ def _parse_dataset(self, ann_descriptor: FileDescriptor, dataset_dir: str) -> dm return dm.Dataset.import_from(dataset_dir, self.input_format) def _export_dataset(self, dataset: dm.Dataset, output_dir: str): - dataset.export(output_dir, self.output_format, save_images=False) + dataset.export(output_dir, self.output_format, save_media=False) def _process_dataset( self, dataset: dm.Dataset, *, ann_descriptor: FileDescriptor