Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .readthedocs.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ sphinx:
# Fail on all warnings to avoid broken references
fail_on_warning: true

formats: all

python:
install:
- method: pip
Expand Down
2 changes: 1 addition & 1 deletion DEVELOPING.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ This repository contains the Python SDK for INTERSECT.

## Documentation

Documentation for the INTERSECT Python SDK can be viewed at http://10.64.193.144:30002. The documentation is generated with [Sphinx](https://www.sphinx-doc.org) from the `docs` directory. See the documentation for more information about installation, usage, and examples of the Python SDK for INTERSECT.
Documentation for the INTERSECT Python SDK can be viewed at https://intersect-python-sdk.readthedocs.io/ . The documentation is generated with [Sphinx](https://www.sphinx-doc.org) from the `docs` directory. See the documentation for more information about installation, usage, and examples of the Python SDK for INTERSECT.

## Quickstart (developers)

Expand Down
52 changes: 51 additions & 1 deletion src/intersect_sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
from __future__ import annotations

import time
from typing import TYPE_CHECKING
from collections import defaultdict
from threading import Event, Thread
from typing import TYPE_CHECKING, Any
from uuid import uuid4

from pydantic import ValidationError
Expand Down Expand Up @@ -46,6 +48,7 @@
from .client_callback_definitions import (
INTERSECT_CLIENT_EVENT_CALLBACK_TYPE,
INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE,
INTERSECT_CLIENT_TIMEOUT_CALLBACK_TYPE,
)
from .shared_callback_definitions import IntersectDirectMessageParams

Expand All @@ -70,6 +73,7 @@ def __init__(
config: IntersectClientConfig,
user_callback: INTERSECT_CLIENT_RESPONSE_CALLBACK_TYPE | None = None,
event_callback: INTERSECT_CLIENT_EVENT_CALLBACK_TYPE | None = None,
timeout_callback: INTERSECT_CLIENT_TIMEOUT_CALLBACK_TYPE | None = None,
) -> None:
"""The constructor performs almost all validation checks necessary to function in the INTERSECT ecosystem, with the exception of checking connections/credentials to any backing services.

Expand All @@ -78,6 +82,7 @@ def __init__(
user_callback: The callback function you can use to handle response messages from Services.
If this is left empty, you can only send a single message
event_callback: The callback function you can use to handle events from any Service.
timeout_callback: The callback function you can use to handle request timeouts.
"""
# this is called here in case a user created the object using "IntersectClientConfig.model_construct()" to skip validation
config = IntersectClientConfig.model_validate(config)
Expand All @@ -86,6 +91,8 @@ def __init__(
die('user_callback function should be a callable function if defined')
if event_callback is not None and not callable(event_callback):
die('event_callback function should be a callable function if defined')
if timeout_callback is not None and not callable(timeout_callback):
die('timeout_callback function should be a callable function if defined')
if not user_callback and not event_callback:
die('must define at least one of user_callback or event_callback')
if not user_callback:
Expand Down Expand Up @@ -145,6 +152,10 @@ def __init__(
)
self._user_callback = user_callback
self._event_callback = event_callback
self._timeout_callback = timeout_callback
self._pending_requests: defaultdict[str, list[dict[str, Any]]] = defaultdict(list)
self._stop_timeout_thread = Event()
self._timeout_thread = Thread(target=self._check_timeouts, daemon=True)

@final
def startup(self) -> Self:
Expand All @@ -171,6 +182,8 @@ def startup(self) -> Self:
# and has nothing to do with the Service at all.
time.sleep(1.0)

self._timeout_thread.start()

if self._resend_initial_messages or not self._sent_initial_messages:
for message in self._initial_messages:
self._send_userspace_message(message)
Expand Down Expand Up @@ -199,11 +212,31 @@ def shutdown(self, reason: str | None = None) -> Self:
"""
logger.info(f'Client is shutting down (reason: {reason})')

self._stop_timeout_thread.set()
self._timeout_thread.join()
self._control_plane_manager.disconnect()

logger.info('Client shutdown complete')
return self

def _check_timeouts(self) -> None:
"""Periodically check for timed out requests."""
while not self._stop_timeout_thread.is_set():
now = time.time()
for operation_id, requests in list(self._pending_requests.items()):
for request in requests:
if now > request['timeout']:
try:
request['on_timeout'](operation_id)
except Exception as e: # noqa: BLE001
logger.warning(
f'Exception from timeout callback for operation {operation_id}:\n{e}'
)
requests.remove(request)
if not requests:
del self._pending_requests[operation_id]
time.sleep(0.1) # Sleep for a short duration

@final
def is_connected(self) -> bool:
"""Check if we're currently connected to the INTERSECT brokers.
Expand Down Expand Up @@ -257,6 +290,15 @@ def _handle_userspace_message(
send_os_signal()
return

# If not in pending requests, it already timed out, so ignore this response
if headers.operation_id in self._pending_requests:
del self._pending_requests[headers.operation_id]
else:
logger.debug(
f'Received response for operation {headers.operation_id} that already timed out, ignoring'
)
return
Comment on lines +293 to +300
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The E2E tests are failing and I believe it's partially related to this block of code, because the existing E2E tests do not explicitly specify a timeout. self._pending_requests is not necessarily the source of truth for requests which haven't timed out.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, you're right. In addition to it being an issue for when a timeout wasn't set, it was an issue for when a client gets restarted. I've changed the logic to account for this. Will push everything after we discuss the above.


# TWO: GET DATA FROM APPROPRIATE DATA STORE AND DESERIALIZE IT
try:
request_params = self._data_plane_manager.incoming_message_data_handler(
Expand Down Expand Up @@ -442,3 +484,11 @@ def _send_userspace_message(self, params: IntersectDirectMessageParams) -> None:
self._control_plane_manager.publish_message(
channel, payload, params.content_type, headers, persist=False
)

if params.timeout is not None and params.on_timeout is not None:
self._pending_requests[params.operation].append(
{
'timeout': time.time() + params.timeout,
'on_timeout': params.on_timeout,
}
)
Comment on lines +488 to +494
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another part of the reason I think the E2E tests are failing. If the user doesn't specify a timeout on a pending request, then by default the request should be handled.

I also don't think that params.on_timeout should explicitly be checked for validity purposes, it should just be an optional function called in the specific usecase a user's function times out. A function should be allowed to timeout even if this callback isn't specified.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. Fixed and will push once we have a chance to discuss the first issue.

9 changes: 9 additions & 0 deletions src/intersect_sdk/client_callback_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,15 @@
IntersectEventMessageParams,
)

INTERSECT_CLIENT_TIMEOUT_CALLBACK_TYPE = Callable[[str], None]
"""
This is a callable function type which should be defined by the user.

Params
The SDK will send the function one argument:
1) The operation ID of the request that timed out.
"""


@final
class IntersectClientCallback(BaseModel):
Expand Down
13 changes: 13 additions & 0 deletions src/intersect_sdk/shared_callback_definitions.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
"""Callback definitions shared between Services, Capabilities, and Clients."""

from __future__ import annotations

from collections.abc import Callable
from typing import Annotated, Any, TypeAlias

from pydantic import BaseModel, ConfigDict, Field
Expand Down Expand Up @@ -73,6 +76,16 @@ class IntersectDirectMessageParams(BaseModel):
# pydantic config
model_config = ConfigDict(revalidate_instances='always')

timeout: float | None = None
"""
The timeout in seconds for the request. If the request is not fulfilled within this time, the on_timeout callback will be called.
"""

on_timeout: Callable[[], None] | None = None
"""
The callback to call if the request times out.
"""


class IntersectEventMessageParams(BaseModel):
"""Public facing properties of events the Client/Service wants to listen to."""
Expand Down
Loading