@bisgaard-itis bisgaard-itis commented Oct 14, 2025

What do these changes do?

Motivated by https://git.speag.com/oSparc/osparc-infra/-/issues/incident/81, this PR batches rpc requests api-server -> wb-api-server to minimize the number of internal requests required to handle the map endpoint in the api-server.

  • With these changes the number of requests api-server->wb-api-server performed when calling the map endpoint does not depend on the number of inputs passed.
  • Previously, the map endpoint in the api-server was essentially a loop around the run endpoint, so the number of RPC requests api-server->wb-api-server triggered by a single call to the map endpoint was #inputs * #(requests in run endpoint). With these changes and "🐛 Ensure function execute permission check is performed only once in map endpoint" (#8499), the number of requests is #(requests in run endpoint).
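
The difference in request counts can be illustrated with a minimal sketch. `FakeRpcClient`, `register_job` and `register_jobs` are hypothetical stand-ins, not the actual wb-api-server RPC interface; only the call counting matters here.

```python
from dataclasses import dataclass


@dataclass
class FakeRpcClient:
    """Hypothetical stand-in for the api-server -> wb-api-server RPC client."""

    calls: int = 0

    def register_job(self, job_input: dict) -> str:
        self.calls += 1  # one RPC round trip per input
        return f"job-{job_input['x']}"

    def register_jobs(self, job_inputs: list) -> list:
        self.calls += 1  # one RPC round trip for the whole batch
        return [f"job-{job_input['x']}" for job_input in job_inputs]


def map_with_loop(client: FakeRpcClient, inputs: list) -> list:
    # Old behaviour: loop around the single-job call
    return [client.register_job(job_input) for job_input in inputs]


def map_batched(client: FakeRpcClient, inputs: list) -> list:
    # New behaviour: a single batched call, independent of len(inputs)
    return client.register_jobs(inputs)


inputs = [{"x": n} for n in range(5)]
looped, batched = FakeRpcClient(), FakeRpcClient()
assert map_with_loop(looped, inputs) == map_batched(batched, inputs)
print(looped.calls, batched.calls)  # 5 1
```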

Related issue/s

How to test

  • This only modifies already existing endpoints in the webserver. Tests have been modified and enhanced accordingly.

Dev-ops

codecov bot commented Oct 14, 2025

Codecov Report

❌ Patch coverage is 94.87179% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 89.05%. Comparing base (2f48e5d) to head (91fccb8).

Additional details and impacted files
@@            Coverage Diff             @@
##           master    #8514      +/-   ##
==========================================
+ Coverage   87.57%   89.05%   +1.48%     
==========================================
  Files        2007     1384     -623     
  Lines       78402    58381   -20021     
  Branches     1343      499     -844     
==========================================
- Hits        68659    51991   -16668     
+ Misses       9341     6260    -3081     
+ Partials      402      130     -272     
Flag Coverage Δ
integrationtests 64.00% <16.12%> (-0.04%) ⬇️
unittests 87.31% <94.87%> (+1.03%) ⬆️
Components Coverage Δ
pkg_aws_library ∅ <ø> (∅)
pkg_celery_library ∅ <ø> (∅)
pkg_dask_task_models_library ∅ <ø> (∅)
pkg_models_library 92.92% <100.00%> (+0.01%) ⬆️
pkg_notifications_library ∅ <ø> (∅)
pkg_postgres_database ∅ <ø> (∅)
pkg_service_integration 70.17% <ø> (ø)
pkg_service_library ∅ <ø> (∅)
pkg_settings_library ∅ <ø> (∅)
pkg_simcore_sdk 85.01% <ø> (+0.05%) ⬆️
agent 93.10% <ø> (ø)
api_server ∅ <ø> (∅)
autoscaling 95.00% <ø> (ø)
catalog 92.06% <ø> (ø)
clusters_keeper 99.14% <ø> (ø)
dask_sidecar 92.38% <ø> (ø)
datcore_adapter 97.95% <ø> (ø)
director 75.72% <ø> (ø)
director_v2 90.94% <ø> (-0.06%) ⬇️
dynamic_scheduler ∅ <ø> (∅)
dynamic_sidecar 90.44% <ø> (ø)
efs_guardian 89.83% <ø> (ø)
invitations 90.90% <ø> (ø)
payments 92.80% <ø> (ø)
resource_usage_tracker 92.22% <ø> (+0.05%) ⬆️
storage 86.50% <ø> (-0.29%) ⬇️
webclient ∅ <ø> (∅)
webserver 87.17% <93.54%> (+0.03%) ⬆️

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data

mergify bot commented Oct 14, 2025

🧪 CI Insights

Here's what we observed from your CI run for 91fccb8.

❌ Job Failures

Pipeline Job Health on master Retries 🔍 CI Insights 📄 Logs
CI unit-tests Healthy 0 View View

✅ Passed Jobs With Interesting Signals

Pipeline Job Signal Health on master Retries 🔍 CI Insights 📄 Logs
CI system-tests You had a 39% chance of failing… lucky you! 🎲 Flaky Configure an automatic retry View View

@odeimaiz odeimaiz added this to the Imparable milestone Oct 15, 2025
@bisgaard-itis bisgaard-itis changed the title 8506 batch db requests in map endpoint 🎨 Batch api-server->wb-apiserver requests in map endpoint in the api-server Oct 16, 2025
@bisgaard-itis bisgaard-itis changed the title 🎨 Batch api-server->wb-apiserver requests in map endpoint in the api-server 🎨 Batch api-server->wb-api-server requests in map endpoint in the api-server Oct 16, 2025
@bisgaard-itis bisgaard-itis marked this pull request as ready for review October 16, 2025 06:29


class FunctionUnrecoverableError(FunctionBaseError):
    msg_template = "Unrecoverable error."
Contributor

maybe this should also contain the name of the function that is not recoverable
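
A minimal sketch of what this suggestion could look like. The `FunctionBaseError` below is a simplified stand-in for the project's base error class (an assumption), and the `function_id` placeholder is a hypothetical name.

```python
class FunctionBaseError(Exception):
    """Simplified stand-in for the project's base error (assumption)."""

    msg_template = "Function error."

    def __init__(self, **ctx):
        self.ctx = ctx
        # Fill the template with whatever context the caller provided
        super().__init__(self.msg_template.format(**ctx))


class FunctionUnrecoverableError(FunctionBaseError):
    # Hypothetical template that names the function that cannot be recovered
    msg_template = "Unrecoverable error in function {function_id}."


err = FunctionUnrecoverableError(function_id="fn-123")
print(err)  # Unrecoverable error in function fn-123.
```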

) -> RegisteredFunctionJob:
return await wb_api_rpc.register_function_job(
function_job=function_job, user_id=user_id, product_name=product_name
registered_jobs = await wb_api_rpc.register_function_job(
Contributor

I am not a very big fan of this. Why don't we keep the existing 'register_function_job' in the rpc, and add 'register_function_jobs'? Now you need asserts etc below to catch possible errors.

Contributor Author

The assert is for documentation purposes. Perhaps one can use some meta programming way to specify that the length of a list output by a function is the same as the length of the list you input. But I wouldn't know exactly how to indicate that using type hints. Hence, the assert. But this is not meant to catch any errors. That's why there is the # nosec.

Contributor

'Documentation' is a big word here, i think the assert definitely makes sense, since you never know from within this code that you get a list of length 1 back. That's why i'd like to keep the single-function-job register, since that guarantees by type checking to get a single function job back.

Contributor Author

I will keep a single function job register if you want, but I don't see why this is so controversial. Essentially all the RPC endpoints I added here are like "map" endpoints: they act on each element of the list passed in the input and return a list with the results in the same order. So what I mean by "documenting" here is that I document that I rely on this property of that function. A property which I cannot capture in the type hints.

This is exactly the same as adding an assert in the osparc python client which checks that the length of the list of job_ids in the job collection is the same as the length of inputs passed to the map endpoint. I consider such asserts a good habit which document how you expect the function to work. My understanding is that that's how it is supposed to be done in python when you rely on features of functions which cannot be conveyed in the signature/type-hints/openapi-specs. They are not meant to fail at run time, they are meant to "enhance" the type hints so pylance can take that information into consideration when showing red squiggles.

Contributor

There are just a few things mixed up here.
First, it's about the naming. As you said, it's kind of a map, but that isn't reflected in the naming. I still see a use case for registering a single function job, so I would keep that, and create a new method, either with an 's' at the end for clarity, or 'map' or so. Wrt the assert: in this context the assert makes perfect sense, but my point is that ideally we wouldn't need that assert.
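
One way to reconcile both positions, sketched under assumptions: keep a single-job method as a thin wrapper around the batched one. The function bodies and the `dict`-based job representation are hypothetical; the real RPC signatures take more parameters.

```python
import asyncio


async def register_function_jobs(function_jobs: list) -> list:
    """Batched variant: one result per input, in the same order (hypothetical body)."""
    return [{**job, "uid": f"uid-{i}"} for i, job in enumerate(function_jobs)]


async def register_function_job(function_job: dict) -> dict:
    """Single-job variant kept for callers that register exactly one job."""
    # Tuple unpacking raises ValueError unless exactly one item comes back,
    # so the length invariant is checked even when asserts are disabled.
    (registered,) = await register_function_jobs([function_job])
    return registered


single = asyncio.run(register_function_job({"title": "a"}))
many = asyncio.run(register_function_jobs([{"title": "a"}, {"title": "b"}]))
print(single["uid"], [job["uid"] for job in many])
```

With this split the single-job return type is guaranteed by the signature, and the length check lives in one place instead of at every call site.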

function_job_id: FunctionJobID,
registered_function_job_patch: RegisteredFunctionJobPatch,
) -> RegisteredFunctionJob:
registered_function_job_patch_inputs: (
Contributor

This I also find quite artificial. As a user of the method I expect to specify the function job id; it's confusing that it's part of the inputs.

Contributor Author

Hmm, the point here is that I need to pass one job uuid per patch. I could change the signature so that it takes two lists as inputs: One containing the uuids and one containing the patches. But I find it more natural to pass a list of tuples (uuid, patch) which is what I try to do here. I can make this more explicit by not using a pydantic model, but using a Tuple or NamedTuple instead.

Contributor

i understood, but maybe also here naming could help. like function_job_ids_input_tuples or so.
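
The "list of (uuid, patch) tuples" idea discussed above could be sketched with a `NamedTuple`. The class name, field names and the in-memory `jobs` store are illustrative assumptions, not the models used in the PR.

```python
from typing import NamedTuple
from uuid import UUID, uuid4


class FunctionJobPatchInput(NamedTuple):
    """Hypothetical (job id, patch) pair; field names are illustrative."""

    function_job_id: UUID
    patch: dict


def patch_function_jobs(jobs: dict, patch_inputs: list) -> list:
    """Apply each patch to its job and return the patched jobs in input order."""
    patched = []
    for function_job_id, patch in patch_inputs:  # NamedTuple unpacks like a tuple
        jobs[function_job_id] = {**jobs[function_job_id], **patch}
        patched.append(jobs[function_job_id])
    return patched


job_a, job_b = uuid4(), uuid4()
jobs = {job_a: {"title": "a"}, job_b: {"title": "b"}}
result = patch_function_jobs(
    jobs,
    [
        FunctionJobPatchInput(function_job_id=job_b, patch={"title": "B"}),
        FunctionJobPatchInput(function_job_id=job_a, patch={"title": "A"}),
    ],
)
print(result)
```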

inputs: FunctionInputs,
) -> list[RegisteredFunctionJob] | None:
inputs: FunctionInputsList,
status_filter: list[FunctionJobStatus] | None,
Contributor

I would make this name more self-documenting. I 'assume' this is the statuses that are considered cached? Maybe cached_job_statuses or so?

function: RegisteredFunction,
job_inputs: JobInputs,
) -> PreRegisteredFunctionJobData:
job_inputs: list[JobInputs],
Contributor

I would use job_inputs_list here and below

)
],
)
assert len(registered_jobs) == 1
Contributor

Same here, I would keep patch_registered_function_job and add patch_registered_function_jobs

}
]
},
"minItems": 1,
Contributor

Is this also a result of using [0]? I would prefer not to have a minimum here, i.e. I don't see why a map can't be called on an empty list; it should just return an empty list.
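
A minimal sketch of the behaviour asked for here, assuming a hypothetical `register_batch` callable standing in for the batched RPC: an empty input list yields an empty result and no request is sent.

```python
def run_map(register_batch, inputs_list: list) -> list:
    """Hypothetical batched map with no minimum number of inputs."""
    if not inputs_list:
        return []  # nothing to register, so no batched request is needed
    return register_batch(inputs_list)


batch_sizes = []  # records the size of every batch that was actually sent


def fake_register_batch(inputs_list: list) -> list:
    batch_sizes.append(len(inputs_list))
    return [f"job-{n}" for n, _ in enumerate(inputs_list)]


empty_result = run_map(fake_register_batch, [])
one_result = run_map(fake_register_batch, [{"x": 1}])
print(empty_result, one_result, batch_sizes)
```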

Member

@sanderegg sanderegg left a comment

Thanks

)
async def validate_function_inputs(
function_id: FunctionID,
function_id: FunctionID, # pylint: disable=unused-argument
Member

why do you keep the argument if it is unused?

Member

my guess is that it is used in some of the dependencies injected function_id. but yes, this is not really needed here.

parent_node_id=parent_node_id,
)
assert len(jobs) == 1 # nosec
return jobs[0]
Member

just a thought but:

  • this assert is a sure thing?
  • could this happen in prod code? then you would end up with an IndexError instead..
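
For context: Python removes `assert` statements when run with `-O`, and indexing an empty list with `[0]` raises `IndexError`. If the length check is meant to hold in production, an explicit check is one option. The helper below is a hypothetical sketch, and `RuntimeError` is a placeholder for whatever domain error would fit.

```python
def first_and_only(jobs: list):
    """Return the single job, or raise an explicit error (hypothetical helper)."""
    if len(jobs) != 1:
        # Unlike an assert, this check is not stripped by `python -O`
        msg = f"expected exactly 1 registered job, got {len(jobs)}"
        raise RuntimeError(msg)
    return jobs[0]


print(first_and_only(["job-0"]))  # job-0
```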

@pcrespov pcrespov requested a review from Copilot October 16, 2025 18:43
Contributor

@Copilot Copilot AI left a comment

Pull Request Overview

Batching of function job operations to reduce RPC calls and support multi-input map execution with a fixed number of internal requests. Key changes introduce list-based registration/patching of function jobs and cached lookup returning positional lists aligned to requested inputs.

  • Added list types (FunctionJobList, RegisteredFunctionJobList, FunctionInputsList, patch input lists) and updated RPC/service layers to operate on batches.
  • Refactored job creation/patch flows (map & run endpoints) to batch register jobs and patch metadata (task IDs) in bulk; caching logic now returns a list of optional jobs aligned with input order.
  • Updated tests and routes to adapt to new batched interfaces and introduced a new FunctionJobPatch domain model.

Reviewed Changes

Copilot reviewed 21 out of 21 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
test_function_jobs_controller_rpc.py Adjusts tests to new batched register/patch and cached lookup behaviors.
test_function_job_collections_controller_rpc.py Refactors to batch-register multiple jobs for collections.
_functions_service.py Converts single job operations to batch (register/patch, cached find) and removes prior single-job patch logic.
_function_jobs_repository.py Implements batch insert, multi-job patching, and positional cached job retrieval with status filtering.
_functions_rpc.py Aligns RPC interface to list-based register/patch/find signatures.
dynamic_scheduler/main.py Minor import reordering (cosmetic).
test_api_routers_functions.py Updates function run tests to mock batched job registration and patching.
test_api_routers_function_jobs.py Adjusts registration test to batched register semantics.
test_functions_celery.py Adapts celery tests to new batched RPC and cached lookup interface.
wb_api_server.py Wraps new list-based RPC calls for function jobs.
functions domain models (api-server) Introduces FunctionJobPatch for internal patch orchestration.
api-server main.py Minor import reordering (cosmetic).
functions_routes.py Refactors run/map endpoints to use batched creation; input validation updated.
function_jobs_routes.py Adjusts single registration endpoint to unwrap batched return.
_service_function_jobs_task_client.py Adds batched job creation task logic with cached substitution; removes per-job cache helper.
_service_function_jobs.py Refactors pre-registration, validation, patching, run logic to batch operations.
openapi.json Adds minItems constraint to FunctionInputsList definition.
service-library RPC functions.py Client-side RPC adaptations for batched register/patch/find.
functions_rpc_interface.py Server-side RPC interface updated for batched operations and new cached lookup signature.
functions_errors.py Adds FunctionUnrecoverableError (unused here).
functions.py (models-library) Introduces list type aliases and patch input list models with length constraints.

product_name=osparc_product_name,
registered_function_job_patch=patch,
registered_function_job_patch_inputs=TypeAdapter(
RegisteredSolverFunctionJobPatchInputList
Copilot AI Oct 16, 2025

Union in TypeAdapter uses RegisteredSolverFunctionJobPatchInputList twice; this prevents validation of project patch inputs during the incompatible model test. Replace the first occurrence with RegisteredProjectFunctionJobPatchInputList so the union correctly represents both allowed list types.

Suggested change
RegisteredSolverFunctionJobPatchInputList
RegisteredProjectFunctionJobPatchInputList

Copilot uses AI. Check for mistakes.

Comment on lines +185 to +186
"status": "created",
}
Copilot AI Oct 16, 2025

Patching a job forces status='created', overwriting any existing status (e.g. RUNNING/SUCCESS) unintentionally. Preserve current status by omitting 'status' from update_values unless explicitly changed or fetch existing status before updating.

Suggested change
"status": "created",
}
}
# Only update status if explicitly set in the patch
if getattr(patch_input.patch, "status", None) is not None:
    update_values["status"] = patch_input.patch.status

Comment on lines +190 to +191
.where(function_jobs_table.c.uuid == f"{patch_input.uid}")
.values(**{k: v for k, v in update_values.items() if v is not None})
Copilot AI Oct 16, 2025

Patching a job forces status='created', overwriting any existing status (e.g. RUNNING/SUCCESS) unintentionally. Preserve current status by omitting 'status' from update_values unless explicitly changed or fetch existing status before updating.

) -> RegisteredFunctionJob: ...
patches: Annotated[
list[FunctionJobPatch],
Field(max_length=50, min_length=1),
Copilot AI Oct 16, 2025

The patches parameter enforces min_length=1, making it impossible to call this method with an empty list (e.g., when all jobs are fully cached and no patch is needed). Allow empty lists (remove min_length or set min_length=0) so callers can skip patching without error.

Suggested change
Field(max_length=50, min_length=1),
Field(max_length=50),

Comment on lines +374 to +382
patched_jobs = await self._function_job_service.patch_registered_function_job(
user_id=user_identity.user_id,
product_name=user_identity.product_name,
function_job_id=pre_registered_function_job_data.function_job_id,
function_class=function.function_class,
job_creation_task_id=TaskID(task_uuid),
patches=[
FunctionJobPatch(
function_class=function.function_class,
function_job_id=pre_registered_function_job_data.function_job_id,
job_creation_task_id=TaskID(task_uuid),
project_job_id=None,
Copilot AI Oct 16, 2025

This call may produce an empty patches list when all inputs are cached, causing a validation error (min_length=1) in patch_registered_function_job. Add a conditional to skip invoking patch_registered_function_job when the patches list is empty.

Comment on lines 36 to 38
from simcore_service_api_server.models.schemas.functions import (
FunctionJobCreationTaskStatus,
)
Copilot AI Oct 16, 2025

FunctionJobCreationTaskStatus is imported twice from the same module (absolute and relative). Remove one of these imports to avoid duplication.

from .models.domain.celery_models import ApiServerOwnerMetadata
from .models.schemas.jobs import JobInputs, JobPricingSpecification
from .models.domain.functions import FunctionJobPatch
from .models.schemas.functions import FunctionJobCreationTaskStatus
Copilot AI Oct 16, 2025

FunctionJobCreationTaskStatus is imported twice from the same module (absolute and relative). Remove one of these imports to avoid duplication.

Suggested change
from .models.schemas.functions import FunctionJobCreationTaskStatus

Comment on lines +391 to +392
_ = lambda job: job if job is not None else next(patched_jobs_iter)
return [_(job) for job in cached_jobs]
Copilot AI Oct 16, 2025

[nitpick] Using '_' as a lambda variable name here reduces readability and may conflict with conventional usage as a throwaway or translation variable. Rename '_' to a descriptive identifier (e.g. resolve_cached_job) and replace the lambda with a standard function for clarity.

Suggested change
_ = lambda job: job if job is not None else next(patched_jobs_iter)
return [_(job) for job in cached_jobs]
def resolve_cached_job(job):
    return job if job is not None else next(patched_jobs_iter)
return [resolve_cached_job(job) for job in cached_jobs]
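
As a self-contained illustration of the pattern in this snippet, the sketch below fills each `None` slot of a positional cache lookup with the next newly created job. The function name and the up-front count check are assumptions added for the example, not part of the PR.

```python
def merge_cached_and_new(cached_jobs: list, new_jobs: list) -> list:
    """Fill each None slot in cached_jobs with the next entry of new_jobs."""
    missing = sum(job is None for job in cached_jobs)
    if missing != len(new_jobs):
        # Guard added for the example: every cache miss needs exactly one new job
        msg = f"{missing} cache misses but {len(new_jobs)} new jobs"
        raise ValueError(msg)
    new_jobs_iter = iter(new_jobs)
    return [job if job is not None else next(new_jobs_iter) for job in cached_jobs]


merged = merge_cached_and_new(["c0", None, "c2", None], ["n1", "n3"])
print(merged)  # ['c0', 'n1', 'c2', 'n3']
```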

Member

@pcrespov pcrespov left a comment

Thx. Following our offline discussion, check the batch operations and see if you can reuse and even extend some of these concepts.
thx

function_job_uuid: FunctionJobID,
registered_function_job_patch: RegisteredFunctionJobPatch,
) -> RegisteredFunctionJob:
registered_function_job_patch_inputs: (
Member

This interface (i.e., the whole file) should be removed rather than updated, since it’s no longer used — only v1 is still in use, correct?
As for the rest of the sub-APIs, we can’t remove them because other services still depend on them. But the functions part is only used between the webserver and the API server, right?

create_logging_lifespan,
)
from servicelib.tracing import TracingConfig
from simcore_service_dynamic_scheduler._meta import APP_NAME
Member

Had a PR with these changes :-( ... @GitHK note these changes please

Member

This is the only rpc client that you should update :-)

function_job: FunctionJob,
) -> RegisteredFunctionJob:
function_jobs: FunctionJobList,
) -> RegisteredFunctionJobList:
Member

this is a batchCreate operation, consider using models_library.batch_operations

),
) -> list[RegisteredFunctionJob]:

return await _functions_service.patch_registered_function_job(
Member

this is a batchUpdate operation, consider using models_library.batch_operations validators and template models for the returned value

inputs: FunctionInputs,
) -> list[RegisteredFunctionJob] | None:
return await _functions_service.find_cached_function_jobs(
inputs: FunctionInputsList,
Member

this is a batchGet operation, consider using models_library.batch_operations models

Here, for instance, we could be flexible regarding the returned value and have something similar to asyncio.gather
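
A minimal sketch of what an `asyncio.gather`-like return value could mean for a batched get. It uses only `asyncio.gather(..., return_exceptions=True)` from the standard library; `get_job` and `batch_get` are hypothetical names, and this does not use `models_library.batch_operations`, whose API is not shown in this thread.

```python
import asyncio


async def get_job(job_id: str) -> dict:
    """Hypothetical single-item lookup that fails for unknown ids."""
    if job_id.startswith("missing"):
        raise KeyError(job_id)
    return {"uid": job_id}


async def batch_get(job_ids: list) -> list:
    # return_exceptions=True keeps the result positional: a failed lookup is
    # returned in place as an exception object instead of aborting the batch
    return await asyncio.gather(
        *(get_job(job_id) for job_id in job_ids), return_exceptions=True
    )


results = asyncio.run(batch_get(["a", "missing-b", "c"]))
for job_id, result in zip(["a", "missing-b", "c"], results):
    print(job_id, "failed" if isinstance(result, Exception) else result)
```

The caller can then decide per item whether a failure is fatal, which is the flexibility mentioned above.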

7 participants