From 6f8739fc114fabb656695972120b1b0b8d0ae27d Mon Sep 17 00:00:00 2001 From: assada Date: Sun, 10 Aug 2025 18:31:54 +0200 Subject: [PATCH 1/2] feat: add thread management functionality with database integration and DI container --- alembic.ini | 147 ++++++++++++++++++ alembic/README | 1 + alembic/env.py | 82 ++++++++++ alembic/script.py.mako | 28 ++++ .../4c750f1a48e6_create_threads_table.py | 45 ++++++ app/agent/factory.py | 18 ++- app/agent/langgraph/checkpoint/__init__.py | 2 - app/agent/langgraph/checkpoint/factory.py | 31 ---- app/agent/langgraph/checkpoint/memory.py | 2 - app/agent/langgraph/checkpoint/postgres.py | 5 +- .../langgraph/demo/tools/weather_tool.py | 2 +- .../langgraph/langgraph_agent_instance.py | 35 +++-- app/agent/prompt.py | 14 +- app/agent/services/agent_service.py | 10 +- app/bootstrap/__init__.py | 3 +- app/bootstrap/app_factory.py | 2 + app/container.py | 109 +++++++++++++ app/http/controllers/thread_controller.py | 59 +++---- app/http/middleware/auth.py | 13 +- app/http/routes/runs_routes.py | 12 +- app/http/routes/thread_routes.py | 115 +++++++++----- app/infrastructure/database/__init__.py | 8 +- .../database/postgresql_connection.py | 6 +- app/infrastructure/database/session.py | 9 ++ .../database/sqlmodel_manager.py | 39 +++++ app/models/thread.py | 31 ++-- app/repositories/thread_repository.py | 76 ++++++--- app/repositories/user_repository.py | 7 +- app/services/__init__.py | 0 app/services/thread_service.py | 23 +++ app/services/user_service.py | 20 +++ frontend/src/components/ChatApp.jsx | 23 ++- frontend/src/components/Message.jsx | 3 +- frontend/src/components/MessageActions.jsx | 5 +- frontend/src/components/ThinkingMessage.jsx | 3 - frontend/src/components/ThreadList.jsx | 34 ++++ frontend/src/components/index.js | 3 +- frontend/src/hooks/useSSE.js | 94 +++++++++-- frontend/src/store/chatStore.js | 49 +++++- frontend/src/styles.css | 81 ++++++++-- pyproject.toml | 6 + tests/agent/services/test_agent_service.py | 4 +- uv.lock | 119 +++++++++++++- 43 files changed, 1145 insertions(+), 233 deletions(-) create mode 100644 alembic.ini create mode 100644 alembic/README create mode 100644 alembic/env.py create mode 100644 alembic/script.py.mako create mode 100644 alembic/versions/4c750f1a48e6_create_threads_table.py delete mode 100644 app/agent/langgraph/checkpoint/factory.py create mode 100644 app/container.py create mode 100644 app/infrastructure/database/session.py create mode 100644 app/infrastructure/database/sqlmodel_manager.py create mode 100644 app/services/__init__.py create mode 100644 app/services/thread_service.py create mode 100644 app/services/user_service.py create mode 100644 frontend/src/components/ThreadList.jsx diff --git a/alembic.ini b/alembic.ini new file mode 100644 index 0000000..f1cd9bb --- /dev/null +++ b/alembic.ini @@ -0,0 +1,147 @@ +# A generic, single database configuration. + +[alembic] +# path to migration scripts. +# this is typically a path given in POSIX (e.g. 
forward slashes) +# format, relative to the token %(here)s which refers to the location of this +# ini file +script_location = %(here)s/alembic + +# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s +# Uncomment the line below if you want the files to be prepended with date and time +# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file +# for all available tokens +# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s + +# sys.path path, will be prepended to sys.path if present. +# defaults to the current working directory. for multiple paths, the path separator +# is defined by "path_separator" below. +prepend_sys_path = . + + +# timezone to use when rendering the date within the migration file +# as well as the filename. +# If specified, requires the python>=3.9 or backports.zoneinfo library and tzdata library. +# Any required deps can installed by adding `alembic[tz]` to the pip requirements +# string value is passed to ZoneInfo() +# leave blank for localtime +# timezone = + +# max length of characters to apply to the "slug" field +# truncate_slug_length = 40 + +# set to 'true' to run the environment during +# the 'revision' command, regardless of autogenerate +# revision_environment = false + +# set to 'true' to allow .pyc and .pyo files without +# a source .py file to be detected as revisions in the +# versions/ directory +# sourceless = false + +# version location specification; This defaults +# to /versions. When using multiple version +# directories, initial revisions must be specified with --version-path. +# The path separator used here should be the separator specified by "path_separator" +# below. +# version_locations = %(here)s/bar:%(here)s/bat:%(here)s/alembic/versions + +# path_separator; This indicates what character is used to split lists of file +# paths, including version_locations and prepend_sys_path within configparser +# files such as alembic.ini. +# The default rendered in new alembic.ini files is "os", which uses os.pathsep +# to provide os-dependent path splitting. +# +# Note that in order to support legacy alembic.ini files, this default does NOT +# take place if path_separator is not present in alembic.ini. If this +# option is omitted entirely, fallback logic is as follows: +# +# 1. Parsing of the version_locations option falls back to using the legacy +# "version_path_separator" key, which if absent then falls back to the legacy +# behavior of splitting on spaces and/or commas. +# 2. Parsing of the prepend_sys_path option falls back to the legacy +# behavior of splitting on spaces, commas, or colons. +# +# Valid values for path_separator are: +# +# path_separator = : +# path_separator = ; +# path_separator = space +# path_separator = newline +# +# Use os.pathsep. Default configuration used for new projects. +path_separator = os + +# set to 'true' to search source files recursively +# in each "version_locations" directory +# new in Alembic version 1.10 +# recursive_version_locations = false + +# the output encoding used when revision files +# are written from script.py.mako +# output_encoding = utf-8 + +# database URL. This is consumed by the user-maintained env.py script only. +# other means of configuring database URLs may be customized within the env.py +# file. +#sqlalchemy.url = driver://user:pass@localhost/dbname + + +[post_write_hooks] +# post_write_hooks defines scripts or Python functions that are run +# on newly generated revision scripts. 
See the documentation for further +# detail and examples + +# format using "black" - use the console_scripts runner, against the "black" entrypoint +# hooks = black +# black.type = console_scripts +# black.entrypoint = black +# black.options = -l 79 REVISION_SCRIPT_FILENAME + +# lint with attempts to fix using "ruff" - use the module runner, against the "ruff" module +# hooks = ruff +# ruff.type = module +# ruff.module = ruff +# ruff.options = check --fix REVISION_SCRIPT_FILENAME + +# Alternatively, use the exec runner to execute a binary found on your PATH +# hooks = ruff +# ruff.type = exec +# ruff.executable = ruff +# ruff.options = check --fix REVISION_SCRIPT_FILENAME + +# Logging configuration. This is also consumed by the user-maintained +# env.py script only. +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARNING +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARNING +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/alembic/README b/alembic/README new file mode 100644 index 0000000..98e4f9c --- /dev/null +++ b/alembic/README @@ -0,0 +1 @@ +Generic single-database configuration. \ No newline at end of file diff --git a/alembic/env.py b/alembic/env.py new file mode 100644 index 0000000..d8ebcb1 --- /dev/null +++ b/alembic/env.py @@ -0,0 +1,82 @@ +import os +from logging.config import fileConfig + +from dotenv import load_dotenv +from sqlalchemy import engine_from_config, pool + +from alembic import context + +load_dotenv() + +# this is the Alembic Config object, which provides +# access to the values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +# add your model's MetaData object here +# for 'autogenerate' support +# from myapp import mymodel +# target_metadata = mymodel.Base.metadata +target_metadata = None + +# other values from the config, defined by the needs of env.py, +# can be acquired: +# my_important_option = config.get_main_option("my_important_option") +# ... etc. + +database_url = os.getenv("DATABASE_URL") +config.set_main_option("sqlalchemy.url", database_url) + + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode. + + This configures the context with just a URL + and not an Engine, though an Engine is acceptable + here as well. By skipping the Engine creation + we don't even need a DBAPI to be available. + + Calls to context.execute() here emit the given string to the + script output. + + """ + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + + +def run_migrations_online() -> None: + """Run migrations in 'online' mode. + + In this scenario we need to create an Engine + and associate a connection with the context. 
+ + """ + connectable = engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure(connection=connection, target_metadata=target_metadata) + + with context.begin_transaction(): + context.run_migrations() + + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/alembic/script.py.mako b/alembic/script.py.mako new file mode 100644 index 0000000..1101630 --- /dev/null +++ b/alembic/script.py.mako @@ -0,0 +1,28 @@ +"""${message} + +Revision ID: ${up_revision} +Revises: ${down_revision | comma,n} +Create Date: ${create_date} + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision: str = ${repr(up_revision)} +down_revision: Union[str, Sequence[str], None] = ${repr(down_revision)} +branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)} +depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)} + + +def upgrade() -> None: + """Upgrade schema.""" + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + """Downgrade schema.""" + ${downgrades if downgrades else "pass"} diff --git a/alembic/versions/4c750f1a48e6_create_threads_table.py b/alembic/versions/4c750f1a48e6_create_threads_table.py new file mode 100644 index 0000000..0f1b80c --- /dev/null +++ b/alembic/versions/4c750f1a48e6_create_threads_table.py @@ -0,0 +1,45 @@ +"""create threads table + +Revision ID: 4c750f1a48e6 +Revises: +Create Date: 2025-08-10 00:28:27.618485 + +""" + +from collections.abc import Sequence + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. 
+revision: str = "4c750f1a48e6"
+down_revision: str | Sequence[str] | None = None
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+def upgrade() -> None:
+    op.create_table(
+        "thread",
+        sa.Column("id", sa.UUID(as_uuid=True), nullable=False, primary_key=True),
+        sa.Column(
+            "created_at", sa.DateTime(), server_default=sa.text("now()"), nullable=False
+        ),
+        sa.Column(
+            "updated_at", sa.DateTime(), onupdate=sa.text("now()"), nullable=True
+        ),
+        sa.Column("user_id", sa.String(), nullable=False),
+        sa.Column("agent_id", sa.String(), nullable=False),
+        sa.Column("meta", sa.JSON(), server_default=sa.text("'{}'"), nullable=False),
+        sa.Column(
+            "status",
+            sa.Enum("idle", "busy", "interrupted", "error", name="threadstatus"),
+            server_default="idle",
+            nullable=True,
+        ),
+    )
+
+
+def downgrade() -> None:
+    op.drop_table("thread")
diff --git a/app/agent/factory.py b/app/agent/factory.py
index d2dc3d9..a0d6f6d 100644
--- a/app/agent/factory.py
+++ b/app/agent/factory.py
@@ -4,13 +4,12 @@
 import logging
 from typing import Any
 
-from langfuse import Langfuse # type: ignore[attr-defined]
+from dependency_injector.wiring import Provide, inject
+from langfuse import Langfuse
 
 from app.agent.config import AgentConfig
 from app.agent.interfaces import AgentInstance
-from app.agent.langgraph.checkpoint.factory import (
-    CheckpointerFactory,
-)
+from app.agent.langgraph.checkpoint.base import BaseCheckpointer
 from app.agent.prompt import create_prompt_provider
 from app.bootstrap.config import AppConfig
 
@@ -32,7 +31,7 @@ def __init__(self, global_config: AppConfig, langfuse_client: Langfuse):
 
     @classmethod
     def register_agent(
-        cls, agent_id: str, agent_class_path: str, config: AgentConfig
+        cls, agent_id: str, agent_class_path: str, config: AgentConfig
     ) -> None:
         cls._registered_agents[agent_id] = AgentRegistry(agent_class_path, config)
         logger.info(
@@ -67,7 +66,12 @@ def _load_agent_class(self, agent_id: str) -> type[Any]:
                 f"Failed to import agent class '{class_path}': {e}"
             ) from e
 
-    async def create_agent(self, agent_id: str) -> AgentInstance:
+    @inject
+    async def create_agent(
+        self,
+        agent_id: str,
+        checkpointer_provider: BaseCheckpointer = Provide["checkpointer_provider"],
+    ) -> AgentInstance:
         if agent_id not in self._registered_agents:
             raise ValueError(
                 f"Agent '{agent_id}' not found.
Available agents: {self.list_agents()}" @@ -76,7 +80,7 @@ async def create_agent(self, agent_id: str) -> AgentInstance: registry_entry = self._registered_agents[agent_id] agent_config = registry_entry.config - checkpointer_provider = await CheckpointerFactory.create(self.global_config) + await checkpointer_provider.initialize() checkpointer = await checkpointer_provider.get_checkpointer() prompt_provider = create_prompt_provider( diff --git a/app/agent/langgraph/checkpoint/__init__.py b/app/agent/langgraph/checkpoint/__init__.py index 74a642c..db6e0b6 100644 --- a/app/agent/langgraph/checkpoint/__init__.py +++ b/app/agent/langgraph/checkpoint/__init__.py @@ -1,11 +1,9 @@ from .base import BaseCheckpointer -from .factory import CheckpointerFactory from .memory import MemoryCheckpointer from .postgres import PostgresCheckpointer __all__ = [ "BaseCheckpointer", - "CheckpointerFactory", "MemoryCheckpointer", "PostgresCheckpointer", ] diff --git a/app/agent/langgraph/checkpoint/factory.py b/app/agent/langgraph/checkpoint/factory.py deleted file mode 100644 index 69bb441..0000000 --- a/app/agent/langgraph/checkpoint/factory.py +++ /dev/null @@ -1,31 +0,0 @@ -import logging - -from app.agent.langgraph.checkpoint.base import BaseCheckpointer -from app.agent.langgraph.checkpoint.memory import MemoryCheckpointer -from app.agent.langgraph.checkpoint.postgres import PostgresCheckpointer -from app.bootstrap.config import AppConfig -from app.infrastructure.database.connection import DatabaseConnectionFactory - -logger = logging.getLogger(__name__) - - -class CheckpointerFactory: - @classmethod - async def create(cls, config: AppConfig) -> BaseCheckpointer: - checkpoint_type = config.checkpoint_type.lower() - instance: BaseCheckpointer - - if checkpoint_type == "memory": - instance = MemoryCheckpointer() - elif checkpoint_type == "postgres": - database_connection = DatabaseConnectionFactory.create_connection(config) - instance = PostgresCheckpointer(database_connection) - else: - raise ValueError(f"Unsupported checkpointer type: {checkpoint_type}") - - await instance.initialize() - return instance - - @classmethod - def get_supported_types(cls) -> list[str]: - return ["memory", "postgres"] diff --git a/app/agent/langgraph/checkpoint/memory.py b/app/agent/langgraph/checkpoint/memory.py index 4f7cff6..6b696d4 100644 --- a/app/agent/langgraph/checkpoint/memory.py +++ b/app/agent/langgraph/checkpoint/memory.py @@ -1,5 +1,4 @@ import logging -from functools import lru_cache from langgraph.checkpoint.memory import InMemorySaver @@ -8,7 +7,6 @@ logger = logging.getLogger(__name__) -@lru_cache() class MemoryCheckpointer(BaseCheckpointer): """Memory implementation of the checkpointer.""" diff --git a/app/agent/langgraph/checkpoint/postgres.py b/app/agent/langgraph/checkpoint/postgres.py index 5663f9a..06a55cb 100644 --- a/app/agent/langgraph/checkpoint/postgres.py +++ b/app/agent/langgraph/checkpoint/postgres.py @@ -1,5 +1,4 @@ import logging -from functools import lru_cache from langgraph.checkpoint.postgres.aio import AsyncPostgresSaver @@ -9,7 +8,6 @@ logger = logging.getLogger(__name__) -@lru_cache() class PostgresCheckpointer(BaseCheckpointer): """PostgreSQL implementation of the checkpointer.""" @@ -29,8 +27,7 @@ async def initialize(self) -> None: async def cleanup(self) -> None: """Clean up PostgreSQL resources.""" - if self.database_connection: - self.database_connection.close() + self._checkpointer = None async def get_checkpointer(self) -> AsyncPostgresSaver: """Get the PostgreSQL checkpointer 
instance.""" diff --git a/app/agent/langgraph/demo/tools/weather_tool.py b/app/agent/langgraph/demo/tools/weather_tool.py index 9aacbeb..27e3427 100644 --- a/app/agent/langgraph/demo/tools/weather_tool.py +++ b/app/agent/langgraph/demo/tools/weather_tool.py @@ -47,4 +47,4 @@ def _run(self, city: str, **kwargs: Any) -> str: temperature = random.randint(-5, 35) condition = random.choice(weather_conditions) - return f"The weather in {city} is {condition} and {temperature}°C!" \ No newline at end of file + return f"The weather in {city} is {condition} and {temperature}°C!" diff --git a/app/agent/langgraph/langgraph_agent_instance.py b/app/agent/langgraph/langgraph_agent_instance.py index 55a13e2..910127f 100644 --- a/app/agent/langgraph/langgraph_agent_instance.py +++ b/app/agent/langgraph/langgraph_agent_instance.py @@ -9,7 +9,7 @@ from langchain_core.messages import HumanMessage from langchain_core.runnables import RunnableConfig -from langfuse import Langfuse # type: ignore[attr-defined] +from langfuse import Langfuse from langfuse.langchain import CallbackHandler from langgraph.graph.state import CompiledStateGraph @@ -27,11 +27,11 @@ class LangGraphAgentInstance(AgentInstance): def __init__( - self, - agent_id: str, - graph: CompiledStateGraph[Any, Any, Any], - langfuse: Langfuse, - config: AppConfig | None = None, + self, + agent_id: str, + graph: CompiledStateGraph[Any, Any, Any], + langfuse: Langfuse, + config: AppConfig | None = None, ): super().__init__(agent_id, config or AppConfig()) self.graph = graph @@ -39,10 +39,10 @@ def __init__( self.stream_processor = StreamProcessor() async def stream_response( # type: ignore[override] - self, message: str, thread: Thread, user: User + self, message: str, thread: Thread, user: User ) -> AsyncGenerator[dict[str, Any]]: with self.langfuse.start_as_current_span( - name=self.graph.name, input=message + name=self.graph.name, input=message ) as span: run_id = uuid4() @@ -69,13 +69,24 @@ async def stream_response( # type: ignore[override] ) try: + yield BaseEvent.from_payload( + event="thread", + payload={ + "id": str(thread.id), + "agent_id": thread.agent_id, + "user_id": str(user.id), + "status": thread.status.value if thread.status else None, + }, + source="stream", + ).model_dump() + stream = self.graph.astream( inputs, stream_mode=["updates", "messages", "custom"], config=config ) async for event in self.stream_processor.process_stream( - stream, # type: ignore[arg-type] - run_id, - span, + stream, # type: ignore[arg-type] + run_id, + span, ): thread.status = ThreadStatus.idle yield event.model_dump() @@ -86,7 +97,7 @@ async def stream_response( # type: ignore[override] ).model_dump() async def load_history( # type: ignore[override] - self, thread: Thread, user: User + self, thread: Thread, user: User ) -> AsyncGenerator[dict[str, Any]]: try: state_snapshot = await self.graph.aget_state( diff --git a/app/agent/prompt.py b/app/agent/prompt.py index 7ae1b27..123f498 100644 --- a/app/agent/prompt.py +++ b/app/agent/prompt.py @@ -10,7 +10,7 @@ from pydantic import BaseModel, Field try: - from langfuse import Langfuse as _LF # type: ignore[attr-defined] + from langfuse import Langfuse as _LF except ImportError: _LF = None # type: ignore[misc, assignment] @@ -111,15 +111,15 @@ def get_prompt(self, prompt_name: str, label: str, fallback: Prompt) -> Prompt: def create_prompt_provider( - prompt_source: str, - langfuse_client: object | None = None, - prompt_dir: str | Path | None = None, + prompt_source: str, + langfuse_client: object | None = None, + 
prompt_dir: str | Path | None = None, ) -> PromptProvider: if prompt_source.lower() == "langfuse": if ( - langfuse_client is None - or _LF is None - or not isinstance(langfuse_client, _LF) + langfuse_client is None + or _LF is None + or not isinstance(langfuse_client, _LF) ): raise ValueError("Langfuse client is required for langfuse prompt type") return LangfusePromptProvider(langfuse_client) diff --git a/app/agent/services/agent_service.py b/app/agent/services/agent_service.py index 37720d3..85e6168 100644 --- a/app/agent/services/agent_service.py +++ b/app/agent/services/agent_service.py @@ -1,7 +1,7 @@ import logging from datetime import UTC, datetime -from langfuse import Langfuse # type: ignore[attr-defined] +from langfuse import Langfuse from app.models import Thread, User @@ -9,14 +9,14 @@ class AgentService: - def __init__(self, langfuse: Langfuse): - self.langfuse = langfuse + def __init__(self, langfuse_client: Langfuse): + self._langfuse_client = langfuse_client async def add_feedback( - self, trace: str, feedback: float, thread: Thread, user: User + self, trace: str, feedback: float, thread: Thread, user: User ) -> dict[str, str]: try: - self.langfuse.create_score( + self._langfuse_client.create_score( trace_id=trace, name="user_feedback", value=feedback, diff --git a/app/bootstrap/__init__.py b/app/bootstrap/__init__.py index 121c5cb..7c725c5 100644 --- a/app/bootstrap/__init__.py +++ b/app/bootstrap/__init__.py @@ -1,8 +1,7 @@ from .app_factory import create_app -from .config import AppConfig, get_config +from .config import AppConfig __all__ = [ "create_app", - "get_config", "AppConfig", ] diff --git a/app/bootstrap/app_factory.py b/app/bootstrap/app_factory.py index c05b944..2c57d3d 100644 --- a/app/bootstrap/app_factory.py +++ b/app/bootstrap/app_factory.py @@ -7,6 +7,7 @@ # from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor from prometheus_fastapi_instrumentator import Instrumentator +from app.container import Container from app.http.middleware.cors_middleware import CORSConfig, setup_cors_middleware from app.utils.utils import is_valid_uuid4 @@ -16,6 +17,7 @@ def create_app(config: AppConfig) -> FastAPI: + Container() register_agents() app = FastAPI( diff --git a/app/container.py b/app/container.py new file mode 100644 index 0000000..6ac842e --- /dev/null +++ b/app/container.py @@ -0,0 +1,109 @@ +from typing import Any + +from dependency_injector import containers, providers +from langfuse import Langfuse +from sqlmodel import Session + +from app.bootstrap.config import get_config + +from .agent.langgraph.checkpoint.base import BaseCheckpointer +from .agent.langgraph.checkpoint.memory import MemoryCheckpointer +from .agent.langgraph.checkpoint.postgres import PostgresCheckpointer +from .bootstrap.config import AppConfig +from .infrastructure import DatabaseConnection +from .infrastructure.database import PostgreSQLConnection, SQLModelManager +from .infrastructure.database.session import SessionManager + + +def create_checkpointer( + config: AppConfig, + memory_checkpointer: MemoryCheckpointer, + postgres_checkpointer: PostgresCheckpointer, +) -> BaseCheckpointer: + checkpoint_type = config.checkpoint_type.lower() + + if checkpoint_type == "memory": + return memory_checkpointer + elif checkpoint_type == "postgres": + return postgres_checkpointer + else: + raise ValueError(f"Unsupported checkpointer type: {checkpoint_type}") + + +def create_session(sqlmodel_manager: SQLModelManager) -> Session: + return sqlmodel_manager.get_session() + + +class 
Container(containers.DeclarativeContainer): + wiring_config = containers.WiringConfiguration( + modules=[ + "app.http.routes.thread_routes", + "app.http.routes.runs_routes", + "app.http.middleware", + "app.http.controllers", + "app.agent.factory", + ] + ) + + config = providers.Singleton(get_config) + + db_connection: providers.Singleton[DatabaseConnection] = providers.Singleton(PostgreSQLConnection, config=config) + sqlmodel_manager: providers.Singleton[SessionManager] = providers.Singleton( + SQLModelManager, + db_connection=db_connection + ) + + user_repository: providers.Factory[Any] = providers.Factory( + "app.repositories.UserRepository", + session_manager=sqlmodel_manager, + ) + thread_repository: providers.Factory[Any] = providers.Factory( + "app.repositories.ThreadRepository", + session_manager=sqlmodel_manager, + ) + + thread_service: providers.Factory[Any] = providers.Factory( + "app.services.thread_service.ThreadService", thread_repository=thread_repository + ) + user_service: providers.Factory[Any] = providers.Factory( + "app.services.user_service.UserService", user_repository=user_repository + ) + + memory_checkpointer: providers.Singleton[Any] = providers.Singleton( + "app.agent.langgraph.checkpoint.memory.MemoryCheckpointer" + ) + postgres_checkpointer: providers.Singleton[Any] = providers.Singleton( + "app.agent.langgraph.checkpoint.postgres.PostgresCheckpointer", + database_connection=db_connection, + ) + + checkpointer_provider: providers.Singleton[Any] = providers.Singleton( ## TODO: Remove this + create_checkpointer, + config=config, + memory_checkpointer=memory_checkpointer, + postgres_checkpointer=postgres_checkpointer, + ) + + langfuse_client: providers.Singleton[Any] = providers.Singleton( + Langfuse, + debug=False, + ) + + agent_factory: providers.Singleton[Any] = providers.Singleton( + "app.agent.factory.AgentFactory", + global_config=config, + langfuse_client=langfuse_client, + ) + + agent_service: providers.Singleton[Any] = providers.Singleton( + "app.agent.services.AgentService", + langfuse_client=langfuse_client, + ) + + thread_controller: providers.Singleton[Any] = providers.Singleton( + "app.http.controllers.ThreadController", + config=config, + agent_factory=agent_factory, + agent_service=agent_service, + thread_service=thread_service, + ) diff --git a/app/http/controllers/thread_controller.py b/app/http/controllers/thread_controller.py index bc6b3b6..c5e6c7f 100644 --- a/app/http/controllers/thread_controller.py +++ b/app/http/controllers/thread_controller.py @@ -2,8 +2,7 @@ from typing import Any from uuid import UUID -from fastapi import Depends, HTTPException -from langfuse import Langfuse # type: ignore[attr-defined] +from fastapi import HTTPException from sse_starlette.sse import EventSourceResponse from app.agent.factory import AgentFactory @@ -14,24 +13,30 @@ from app.http.requests import FeedbackRequest from app.models import Thread, User from app.models.thread import ThreadStatus -from app.repositories import ThreadRepository, UserRepository +from app.services.thread_service import ThreadService logger = logging.getLogger(__name__) class ThreadController: - def __init__(self, config: AppConfig): + def __init__( + self, + config: AppConfig, + agent_factory: AgentFactory, + agent_service: AgentService, + thread_service: ThreadService, + ): self.config = config - self.langfuse_client = Langfuse(debug=False) - self.agent_factory = AgentFactory(config, self.langfuse_client) + self._agent_factory = agent_factory + self._agent_service = agent_service + 
self._thread_service = thread_service self._agent_instances: dict[str, AgentInstance] = {} - self._agent_service = AgentService(self.langfuse_client) async def _get_agent_instance(self, agent_id: str) -> AgentInstance: if agent_id in self._agent_instances: return self._agent_instances[agent_id] - agent_instance = await self.agent_factory.create_agent(agent_id) + agent_instance = await self._agent_factory.create_agent(agent_id) self._agent_instances[agent_id] = agent_instance logger.debug(f"Created and cached agent instance: {agent_id}") @@ -39,12 +44,12 @@ async def _get_agent_instance(self, agent_id: str) -> AgentInstance: return agent_instance async def stream( - self, - agent_id: str | None, - query: dict[str, Any] | list[Any] | str | float | bool | None, - thread_id: UUID | None, - metadata: dict[str, Any] | None, - user: User, + self, + agent_id: str | None, + query: dict[str, Any] | list[Any] | str | float | bool | None, + thread_id: UUID | None, + metadata: dict[str, Any] | None, + user: User, ) -> EventSourceResponse: try: effective_agent_id = validate_agent_id(agent_id) @@ -56,20 +61,20 @@ async def stream( from uuid import uuid4 thread = Thread( - id=str(uuid4()), - user_id="1437ade37359488e95c0727a1cdf1786d24edce3", + id=uuid4(), + user_id=str(user.id), status=ThreadStatus.idle, created_at=datetime.now(UTC), updated_at=datetime.now(UTC), agent_id=effective_agent_id, - metadata=metadata or {}, + meta=metadata or {}, ) - thread = await ThreadRepository.create_thread(thread) + thread = await self._thread_service.create_thread(thread) else: - thread = await ThreadRepository.get_thread_by_id(str(thread_id)) + thread = self._thread_service.get_thread(str(thread_id)) if agent_id and thread.agent_id != effective_agent_id: thread.agent_id = effective_agent_id - thread = await ThreadRepository.update_thread(thread) + thread = await self._thread_service.update_thread(thread) try: agent_instance = await self._get_agent_instance(thread.agent_id) @@ -90,9 +95,9 @@ async def stream( raise HTTPException(status_code=500, detail="Internal server error") from e async def get_thread_history( - self, - user: User = Depends(UserRepository.get_user_by_id), # noqa: B008 - thread: Thread = Depends(ThreadRepository.get_thread_by_id), # noqa: B008 + self, + thread: Thread, + user: User, ) -> EventSourceResponse: try: agent_instance = await self._get_agent_instance(thread.agent_id) @@ -111,10 +116,10 @@ async def get_thread_history( raise HTTPException(status_code=500, detail="Internal server error") from e async def feedback( - self, - request: FeedbackRequest, - user: User = Depends(UserRepository.get_user_by_id), # noqa: B008 - thread: Thread = Depends(ThreadRepository.get_thread_by_id), # noqa: B008 + self, + thread: Thread, + request: FeedbackRequest, + user: User, ) -> dict[str, str]: return await self._agent_service.add_feedback( trace=request.trace_id, feedback=request.feedback, thread=thread, user=user diff --git a/app/http/middleware/auth.py b/app/http/middleware/auth.py index 65088a9..07b7ee4 100644 --- a/app/http/middleware/auth.py +++ b/app/http/middleware/auth.py @@ -1,25 +1,34 @@ import base64 import json +import logging +from typing import Annotated +from dependency_injector.wiring import Provide, inject from fastapi import Depends, HTTPException from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer +from app.container import Container from app.models import User -from app.repositories import UserRepository +from app.services.user_service import UserService security = 
HTTPBearer() +logger = logging.getLogger(__name__) + +@inject async def get_current_user( + user_service: Annotated[UserService, Depends(Provide[Container.user_service])], # noqa: B008 creds: HTTPAuthorizationCredentials = Depends(security), # noqa: B008 ) -> User: try: token = creds.credentials payload = json.loads(base64.b64decode(token).decode("utf-8")) user_id = payload.get("user_id") - user = await UserRepository.get_user_by_id(user_id) + user = user_service.get_user_by_id(user_id) if not user: raise HTTPException(status_code=401, detail="Invalid user") return user except Exception as e: + logger.error(e) raise HTTPException(status_code=401, detail="Authentication failed") from e diff --git a/app/http/routes/runs_routes.py b/app/http/routes/runs_routes.py index 78082f4..ccf262a 100644 --- a/app/http/routes/runs_routes.py +++ b/app/http/routes/runs_routes.py @@ -1,19 +1,25 @@ +from typing import Annotated + +from dependency_injector.wiring import Provide, inject from fastapi import APIRouter, Depends from sse_starlette import EventSourceResponse -from app.bootstrap.config import get_config +from app.container import Container from app.http.controllers import ThreadController -from app.http.middleware import get_current_user +from app.http.middleware.auth import get_current_user from app.http.requests import Run from app.models import User runs_router = APIRouter(tags=["runs"]) -thread_controller = ThreadController(get_config()) @runs_router.post("/runs/stream") +@inject async def run_stream( request: Run, + thread_controller: Annotated[ + ThreadController, Depends(Provide[Container.thread_controller]) + ], user: User = Depends(get_current_user), # noqa: B008 ) -> EventSourceResponse: return await thread_controller.stream( diff --git a/app/http/routes/thread_routes.py b/app/http/routes/thread_routes.py index 47a8077..a702af4 100644 --- a/app/http/routes/thread_routes.py +++ b/app/http/routes/thread_routes.py @@ -1,39 +1,54 @@ -from datetime import UTC, datetime -from typing import Any +from typing import Annotated, Any -from fastapi import APIRouter, Depends, Request +from dependency_injector.wiring import Provide, inject +from fastapi import APIRouter, Depends +from langgraph_sdk.auth.exceptions import HTTPException from sse_starlette import EventSourceResponse -from app.bootstrap.config import get_config +from app.container import Container from app.http.controllers import ThreadController -from app.http.middleware import get_current_user +from app.http.middleware.auth import get_current_user from app.http.requests import FeedbackRequest from app.http.responses import ErrorResponse -from app.models import Thread, User -from app.repositories import ThreadRepository +from app.models import User +from app.services.thread_service import ThreadService thread_router = APIRouter(tags=["threads"]) -thread_controller = ThreadController(get_config()) + + +@thread_router.get( + "/threads", + responses={"422": {"model": ErrorResponse}}, +) +@inject +def list_threads( ## TODO: POST method to search threads + thread_service: Annotated[ + ThreadService, Depends(Provide[Container.thread_service]) + ], + user: User = Depends(get_current_user), # noqa: B008 +) -> list[dict[str, str | Any]]: + threads = thread_service.list_threads(user.id) + return [t.model_dump() for t in threads] @thread_router.get( "/threads/{thread_id}", responses={"404": {"model": ErrorResponse}, "422": {"model": ErrorResponse}}, ) -async def get_thread( - user: User = Depends(get_current_user), # noqa: B008 - thread: Thread = 
Depends(ThreadRepository.get_thread_by_id), # noqa: B008 +@inject +def get_thread( + thread_id: str, + thread_service: Annotated[ + ThreadService, Depends(Provide[Container.thread_service]) + ], + user: User = Depends(get_current_user), # noqa: B008 ) -> dict[str, str | Any]: - return { - "thread_id": thread.id, - "created_at": datetime.now(tz=UTC).isoformat(), - "updated_at": datetime.now(tz=UTC).isoformat(), - "metadata": { - "title": "Thread Title", - "user_id": user.id, - }, - "status": "idle", - } + thread = thread_service.get_thread(thread_id) + if thread.user_id != user.id: + raise HTTPException( + status_code=403, detail="Forbidden: You do not have access to this thread." + ) + return thread.model_dump() @thread_router.delete( @@ -42,12 +57,20 @@ async def get_thread( status_code=204, responses={"404": {"model": ErrorResponse}, "422": {"model": ErrorResponse}}, ) -async def delete_thread( - request: Request, - user: User = Depends(get_current_user), # noqa: B008 - thread: Thread = Depends(ThreadRepository.get_thread_by_id), # noqa: B008 +@inject +def delete_thread( + thread_id: str, + thread_service: Annotated[ + ThreadService, Depends(Provide[Container.thread_service]) + ], + user: User = Depends(get_current_user), # noqa: B008 ) -> ErrorResponse | None: - # await ThreadRepository.delete_thread(thread.id) + thread = thread_service.get_thread(thread_id) + if thread.user_id != user.id: + raise HTTPException( + status_code=403, detail="Forbidden: You do not have access to this thread." + ) + thread_service.delete_thread(thread) return None @@ -55,19 +78,41 @@ async def delete_thread( "/threads/{thread_id}/history", responses={"404": {"model": ErrorResponse}, "422": {"model": ErrorResponse}}, ) +@inject async def get_thread_history( - request: Request, - user: User = Depends(get_current_user), # noqa: B008 - thread: Thread = Depends(ThreadRepository.get_thread_by_id), # noqa: B008 + thread_id: str, + thread_service: Annotated[ + ThreadService, Depends(Provide[Container.thread_service]) + ], + thread_controller: Annotated[ + ThreadController, Depends(Provide[Container.thread_controller]) + ], + user: User = Depends(get_current_user), # noqa: B008 ) -> EventSourceResponse: - return await thread_controller.get_thread_history(user, thread) + thread = thread_service.get_thread(thread_id) + if thread.user_id != user.id: + raise HTTPException( + status_code=403, detail="Forbidden: You do not have access to this thread." + ) + return await thread_controller.get_thread_history(thread, user) @thread_router.post("/threads/{thread_id}/feedback") +@inject async def post_thread_feedback( - request: Request, - request_body: FeedbackRequest, - user: User = Depends(get_current_user), # noqa: B008 - thread: Thread = Depends(ThreadRepository.get_thread_by_id), # noqa: B008 + thread_id: str, + thread_service: Annotated[ + ThreadService, Depends(Provide[Container.thread_service]) + ], + thread_controller: Annotated[ + ThreadController, Depends(Provide[Container.thread_controller]) + ], + request_body: FeedbackRequest, + user: User = Depends(get_current_user), # noqa: B008 ) -> dict[str, str]: - return await thread_controller.feedback(request_body, user, thread) + thread = thread_service.get_thread(thread_id) + if thread.user_id != user.id: + raise HTTPException( + status_code=403, detail="Forbidden: You do not have access to this thread." 
+ ) + return await thread_controller.feedback(thread, request_body, user) diff --git a/app/infrastructure/database/__init__.py b/app/infrastructure/database/__init__.py index 9f7a457..28c1126 100644 --- a/app/infrastructure/database/__init__.py +++ b/app/infrastructure/database/__init__.py @@ -1,4 +1,10 @@ from .connection import DatabaseConnection, DatabaseConnectionFactory from .postgresql_connection import PostgreSQLConnection +from .sqlmodel_manager import SQLModelManager -__all__ = ["DatabaseConnection", "DatabaseConnectionFactory", "PostgreSQLConnection"] +__all__ = [ + "DatabaseConnection", + "DatabaseConnectionFactory", + "PostgreSQLConnection", + "SQLModelManager", +] diff --git a/app/infrastructure/database/postgresql_connection.py b/app/infrastructure/database/postgresql_connection.py index 406b65a..1c313e5 100644 --- a/app/infrastructure/database/postgresql_connection.py +++ b/app/infrastructure/database/postgresql_connection.py @@ -34,7 +34,7 @@ def get_sync_connection(self) -> None: async def get_async_connection(self) -> AsyncConnection: pool = await self.get_pool() try: - return await pool.getconn() + return await pool.getconn() ## pool.connection() except Exception as e: logger.error(f"Failed to get connection from pool: {e}") raise @@ -44,8 +44,8 @@ async def get_pool(self) -> AsyncConnectionPool: try: self._pool = AsyncConnectionPool( conninfo=self.get_connection_string(), - min_size=1, - max_size=20, + min_size=20, + max_size=50, kwargs=self._connection_kwargs, ) await self._pool.wait() diff --git a/app/infrastructure/database/session.py b/app/infrastructure/database/session.py new file mode 100644 index 0000000..06edf02 --- /dev/null +++ b/app/infrastructure/database/session.py @@ -0,0 +1,9 @@ +from abc import ABC, abstractmethod + +from sqlmodel import Session + + +class SessionManager(ABC): + @abstractmethod + def get_session(self) -> Session: + pass diff --git a/app/infrastructure/database/sqlmodel_manager.py b/app/infrastructure/database/sqlmodel_manager.py new file mode 100644 index 0000000..c4c7c41 --- /dev/null +++ b/app/infrastructure/database/sqlmodel_manager.py @@ -0,0 +1,39 @@ +from sqlalchemy import Engine, create_engine +from sqlalchemy.orm import sessionmaker +from sqlmodel import Session + +from .connection import DatabaseConnection +from .session import SessionManager + + +class SQLModelManager(SessionManager): + def __init__(self, db_connection: DatabaseConnection): + self._db_connection = db_connection + self._engine: Engine | None = None + self._session_factory: sessionmaker[Session] | None = None + + def _get_engine(self) -> Engine: + if self._engine is None: + self._engine = create_engine( + self._db_connection.get_connection_string().replace( + "postgresql://", "postgresql+psycopg://" + ), + pool_pre_ping=True, + pool_recycle=3600, + pool_size=10, + max_overflow=20, + pool_timeout=30, + ) + return self._engine + + def _get_session_factory(self) -> sessionmaker[Session]: + if self._session_factory is None: + engine = self._get_engine() + self._session_factory = sessionmaker( + engine, class_=Session, expire_on_commit=False + ) + return self._session_factory + + def get_session(self) -> Session: + session_factory = self._get_session_factory() + return session_factory() diff --git a/app/models/thread.py b/app/models/thread.py index 102deb5..703b69b 100644 --- a/app/models/thread.py +++ b/app/models/thread.py @@ -1,8 +1,10 @@ -from datetime import UTC, datetime +import datetime +import uuid from enum import Enum from typing import Any -from pydantic 
import AwareDatetime, BaseModel, Field +from sqlalchemy import JSON, DateTime, func +from sqlmodel import Column, Field, SQLModel class ThreadStatus(Enum): @@ -12,32 +14,21 @@ class ThreadStatus(Enum): error = "error" -class Thread(BaseModel): - id: str = Field( - description="Thread ID.", - examples=["edd5a53c-da04-4db4-84e0-a9f3592eef45"], +class Thread(SQLModel, table=True): + id: uuid.UUID = Field(default_factory=uuid.uuid4, primary_key=True) + created_at: datetime.datetime = Field( + default_factory=datetime.datetime.utcnow, ) - created_at: AwareDatetime = Field( - default_factory=lambda: datetime.now(UTC), - description="The time the thread was created.", - examples=["2023-10-01T12:00:00Z"], - ) - updated_at: AwareDatetime = Field( - default_factory=lambda: datetime.now(UTC), - description="The last time the thread was updated.", - examples=["2023-10-01T12:00:00Z"], + updated_at: datetime.datetime = Field( + sa_column=Column(DateTime(), onupdate=func.now()) ) user_id: str = Field( description="The ID of the user that owns this thread.", - examples=["user-12345"], ) agent_id: str = Field( description="The ID of the agent that owns this thread.", - examples=["agent-12345"], - ) - metadata: dict[str, Any] = Field( - ..., description="The thread metadata.", title="Metadata" ) + meta: dict[str, Any] = Field(default_factory=dict, sa_column=Column(JSON)) status: ThreadStatus | None = Field( default=ThreadStatus.idle, description="Thread status to filter on.", diff --git a/app/repositories/thread_repository.py b/app/repositories/thread_repository.py index 0d69e75..5818071 100644 --- a/app/repositories/thread_repository.py +++ b/app/repositories/thread_repository.py @@ -2,34 +2,31 @@ from datetime import UTC, datetime from fastapi import HTTPException, Path +from sqlalchemy import desc, nullslast +from sqlmodel import select +from app.infrastructure.database.session import SessionManager from app.models import Thread -from app.models.thread import ThreadStatus logger = logging.getLogger(__name__) class ThreadRepository: - @staticmethod - async def get_thread_by_id( - thread_id: str = Path(..., description="Thread ID"), + def __init__(self, session_manager: SessionManager): + self.session_manager = session_manager + + def get_thread_by_id( + self, + thread_id: str = Path(..., description="Thread ID"), ) -> Thread: try: if thread_id is None or not isinstance(thread_id, str): raise HTTPException(status_code=400, detail="Invalid thread ID") - # TODO: get thread from database - thread = Thread( - id=thread_id, - user_id="1437ade37359488e95c0727a1cdf1786d24edce3", - status=ThreadStatus.idle, - created_at=datetime.now(UTC), - updated_at=datetime.now(UTC), - agent_id="demo_agent", - metadata={ - "title": "Sample Thread", - }, - ) + with self.session_manager.get_session() as session: + statement = select(Thread).where(Thread.id == thread_id) + results = session.exec(statement) + thread = results.first() if not thread: raise HTTPException(status_code=404, detail="Thread not found") @@ -47,23 +44,58 @@ async def get_thread_by_id( logger.error(f"Error resolving thread {thread_id}: {str(e)}") raise HTTPException(status_code=500, detail="Internal server error") from e - @staticmethod - async def create_thread(thread: Thread) -> Thread: + async def create_thread(self, thread: Thread) -> Thread: try: - # TODO: save thread to database + with self.session_manager.get_session() as session: + session.add(thread) + session.commit() + session.refresh(thread) logger.debug(f"Created thread: {thread.id}") return 
thread except Exception as e: logger.error(f"Error creating thread {thread.id}: {str(e)}") raise HTTPException(status_code=500, detail="Internal server error") from e - @staticmethod - async def update_thread(thread: Thread) -> Thread: + async def update_thread(self, thread: Thread) -> Thread: try: - # TODO: update thread in database thread.updated_at = datetime.now(UTC) + with self.session_manager.get_session() as session: + session.add(thread) + session.commit() + session.refresh(thread) logger.debug(f"Updated thread: {thread.id}") return thread except Exception as e: logger.error(f"Error updating thread {thread.id}: {str(e)}") raise HTTPException(status_code=500, detail="Internal server error") from e + + def list_threads_by_user(self, user_id: str) -> list[Thread]: + try: + with self.session_manager.get_session() as session: + statement = ( + select(Thread) + .where(Thread.user_id == user_id) + .order_by( + nullslast(desc("updated_at")), + desc("created_at"), + ) + ) + results = session.exec(statement) + threads = list(results.all()) + logger.debug(f"Found {len(threads)} threads for user {user_id}") + return threads + except Exception as e: + logger.error(f"Error listing threads for user {user_id}: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") from e + + def delete_thread(self, thread: Thread) -> None: + try: + with self.session_manager.get_session() as session: + session.delete(thread) + session.commit() + logger.debug(f"Deleted thread: {thread.id}") + except HTTPException: + raise + except Exception as e: + logger.error(f"Error deleting thread {thread.id}: {str(e)}") + raise HTTPException(status_code=500, detail="Internal server error") from e diff --git a/app/repositories/user_repository.py b/app/repositories/user_repository.py index 94c3421..4b80bdb 100644 --- a/app/repositories/user_repository.py +++ b/app/repositories/user_repository.py @@ -2,14 +2,17 @@ from fastapi import HTTPException +from app.infrastructure.database.session import SessionManager from app.models import User logger = logging.getLogger(__name__) class UserRepository: - @staticmethod - async def get_user_by_id(user_id: str) -> User | None: + def __init__(self, session_manager: SessionManager): + self.session_manager = session_manager + + def get_user_by_id(self, user_id: str) -> User | None: try: if user_id is None: raise HTTPException(status_code=400, detail="Invalid user ID") diff --git a/app/services/__init__.py b/app/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/services/thread_service.py b/app/services/thread_service.py new file mode 100644 index 0000000..41d016d --- /dev/null +++ b/app/services/thread_service.py @@ -0,0 +1,23 @@ +from app.models import Thread +from app.repositories import ThreadRepository + + +class ThreadService: + def __init__(self, thread_repository: ThreadRepository) -> None: + self._repository: ThreadRepository = thread_repository + + def get_thread(self, thread_id: str) -> Thread: + thread = self._repository.get_thread_by_id(thread_id) + return thread + + async def create_thread(self, thread: Thread) -> Thread: + return await self._repository.create_thread(thread) + + async def update_thread(self, thread: Thread) -> Thread: + return await self._repository.update_thread(thread) + + def list_threads(self, user_id: str) -> list[Thread]: + return self._repository.list_threads_by_user(user_id) + + def delete_thread(self, thread: Thread) -> None: + return self._repository.delete_thread(thread) \ No newline at end of 
file diff --git a/app/services/user_service.py b/app/services/user_service.py new file mode 100644 index 0000000..6921a7e --- /dev/null +++ b/app/services/user_service.py @@ -0,0 +1,20 @@ +from app.models import User +from app.repositories import UserRepository + + +class UserService: + def __init__(self, user_repository: UserRepository) -> None: + self._repository: UserRepository = user_repository + + # def get_users(self) -> Iterator[User]: + # return self._repository.get_all() + + def get_user_by_id(self, user_id: str) -> User | None: + return self._repository.get_user_by_id(user_id) + + # def create_user(self) -> User: + # uid = uuid4() + # return self._repository.add(email=f"{uid}@email.com", password="pwd") + # + # def delete_user_by_id(self, user_id: int) -> None: + # return self._repository.delete_by_id(user_id) diff --git a/frontend/src/components/ChatApp.jsx b/frontend/src/components/ChatApp.jsx index 42ab50f..a0bd468 100644 --- a/frontend/src/components/ChatApp.jsx +++ b/frontend/src/components/ChatApp.jsx @@ -4,6 +4,7 @@ import {useSSE} from '../hooks/index.js'; import {Message} from './Message.jsx'; import {ThinkingMessage} from './ThinkingMessage.jsx'; import {ConnectionStatus} from './ConnectionStatus.jsx'; +import {ThreadList} from './ThreadList.jsx'; import {MESSAGE_SUBTYPES, SENDER_TYPES, THINKING_STATES} from '../constants/constants.js'; export const ChatApp = () => { @@ -23,7 +24,7 @@ export const ChatApp = () => { toggleThinkingMessageExpanded } = useChatStore(); - const {sendMessage, closeConnection, loadChatHistory} = useSSE(); + const {sendMessage, closeConnection, loadChatHistory, fetchThreads, setCurrentThreadId, newChat} = useSSE(); const messagesEndRef = useRef(null); const inputRef = useRef(null); @@ -65,14 +66,15 @@ export const ChatApp = () => { inputRef.current.focus(); } - loadChatHistory(); + fetchThreads().then(() => { + loadChatHistory(); + }); return () => { closeConnection(); }; - }, [closeConnection, loadChatHistory]); + }, [closeConnection, loadChatHistory, fetchThreads]); - // Reset textarea height when input is cleared useEffect(() => { if (inputRef.current && !input) { inputRef.current.style.height = 'auto'; @@ -99,7 +101,6 @@ export const ChatApp = () => { const handleInputChange = useCallback((e) => { setInput(e.target.value); - // Auto-resize textarea const textarea = e.target; textarea.style.height = 'auto'; textarea.style.height = Math.min(textarea.scrollHeight, 200) + 'px'; @@ -117,8 +118,13 @@ export const ChatApp = () => { -
-
+
+
+
+
 {messages.length === 0 && !currentAssistantMessage && (
@@ -155,7 +161,8 @@ export const ChatApp = () => {
 />
 )}
-
+
+
diff --git a/frontend/src/components/Message.jsx b/frontend/src/components/Message.jsx index 2f7e172..2b6d0fb 100644 --- a/frontend/src/components/Message.jsx +++ b/frontend/src/components/Message.jsx @@ -14,8 +14,7 @@ export const Message = ({ message, showActions = true, onToggleThinkingExpanded const isCurrentAssistant = id === 'current-assistant'; const isRegularMessage = messageType === MESSAGE_SUBTYPES.MESSAGE; const isUIMessage = messageType === MESSAGE_SUBTYPES.UI; - - // If this is a thinking message, display it as ThinkingMessage + if (thinkingData) { const thinkingProcess = { isActive: false, diff --git a/frontend/src/components/MessageActions.jsx b/frontend/src/components/MessageActions.jsx index 88852fe..90ec4b5 100644 --- a/frontend/src/components/MessageActions.jsx +++ b/frontend/src/components/MessageActions.jsx @@ -55,7 +55,7 @@ export const MessageActions = ({ content, messageId, traceId, isUser = false }) }, [traceId, getUserId, getThreadId, isSubmittingFeedback]); const handleRating = useCallback(async (ratingValue) => { - if (rating === ratingValue) return; // Don't submit if already rated with same value + if (rating === ratingValue) return; setRating(ratingValue); @@ -63,7 +63,6 @@ export const MessageActions = ({ content, messageId, traceId, isUser = false }) await submitFeedback(feedbackValue); }, [rating, submitFeedback]); - // Don't render if no trace_id if (!traceId) { return null; } @@ -80,7 +79,6 @@ export const MessageActions = ({ content, messageId, traceId, isUser = false }) {isUser ? ( - // Actions for user messages ) : ( - // Actions for assistant messages <> + ))} +
+ ); +}; + +export default ThreadList; + + diff --git a/frontend/src/components/index.js b/frontend/src/components/index.js index 2dcbff0..5ba0d0c 100644 --- a/frontend/src/components/index.js +++ b/frontend/src/components/index.js @@ -3,4 +3,5 @@ export { Message } from './Message.jsx'; export { MessageActions } from './MessageActions.jsx'; export { ThinkingMessage } from './ThinkingMessage.jsx'; export { ConnectionStatus } from './ConnectionStatus.jsx'; -export { default as UIMessage } from './UIMessage.jsx'; \ No newline at end of file +export { default as UIMessage } from './UIMessage.jsx'; +export { ThreadList } from './ThreadList.jsx'; \ No newline at end of file diff --git a/frontend/src/hooks/useSSE.js b/frontend/src/hooks/useSSE.js index f0bd275..383b57a 100644 --- a/frontend/src/hooks/useSSE.js +++ b/frontend/src/hooks/useSSE.js @@ -10,13 +10,17 @@ import { THINKING_STATES } from '../constants/constants.js'; -// TODO: Replace with actual thread_id const USER_ID = '1437ade37359488e95c0727a1cdf1786d24edce3'; -const THREAD_ID = 'edd5a53c-da04-4db4-84e0-a9f3592eef45'; export const useSSE = () => { const sseRef = useRef(null); + const sseVersionRef = useRef(0); const { + threads, + currentThreadId, + setThreads, + setCurrentThreadId, + addThread, setConnectionStatus, setLoading, setSending, @@ -24,13 +28,15 @@ export const useSSE = () => { addMessage, finalizeAssistantMessage, clearCurrentAssistantMessage, + clearMessages, setCurrentAssistantMessage, loadHistory, startThinkingProcess, setThinkingState, addThinkingEvent, completeThinkingProcess, - clearThinkingProcess + clearThinkingProcess, + resetChatContext } = useChatStore(); const loadChatHistory = useCallback(async () => { @@ -42,9 +48,17 @@ export const useSSE = () => { } const authToken = localStorage.getItem('authToken') || 'eyJ1c2VyX2lkIjogIjE0MzdhZGUzNzM1OTQ4OGU5NWMwNzI3YTFjZGYxNzg2ZDI0ZWRjZTMiLCAiZW1haWwiOiAidGVzdEBnbWFpbC5jb20ifQ=='; + if (!currentThreadId) { + setLoading(false); + return; + } + + const version = ++sseVersionRef.current; + + clearMessages(); const historicalMessages = []; - sseRef.current = new SSE(`/api/v1/threads/${THREAD_ID}/history`, { + sseRef.current = new SSE(`/api/v1/threads/${currentThreadId}/history`, { headers: { 'Authorization': 'Bearer ' + authToken }, @@ -53,16 +67,19 @@ export const useSSE = () => { }); const handleHistoryMessage = (e) => { + if (version !== sseVersionRef.current) return; const data = parseEventData(e); if (!data) return; historicalMessages.push(data); }; const handleHistoryOpen = () => { + if (version !== sseVersionRef.current) return; console.log('History SSE connection opened'); }; const handleHistoryError = (e) => { + if (version !== sseVersionRef.current) return; console.error('History SSE Error:', e); const errorMessage = extractErrorMessage(e); addMessage(errorMessage, SENDER_TYPES.SYSTEM, MESSAGE_SUBTYPES.ERROR, CSS_CLASSES.ERROR); @@ -70,6 +87,7 @@ export const useSSE = () => { }; const handleHistoryEnd = () => { + if (version !== sseVersionRef.current) return; console.log('History loading completed'); if (historicalMessages.length > 0) { @@ -92,6 +110,7 @@ export const useSSE = () => { sseRef.current.addEventListener('error', handleHistoryError); sseRef.current.addEventListener('stream_end', handleHistoryEnd); sseRef.current.addEventListener('readystatechange', (e) => { + if (version !== sseVersionRef.current) return; if (e.readyState === 2) { setLoading(false); } @@ -104,7 +123,7 @@ export const useSSE = () => { addMessage('Error loading chat history', 
SENDER_TYPES.SYSTEM, MESSAGE_SUBTYPES.ERROR, CSS_CLASSES.ERROR); setLoading(false); } - }, [setLoading, addMessage, loadHistory, parseEventData, extractErrorMessage]); + }, [currentThreadId, setLoading, addMessage, loadHistory, parseEventData, extractErrorMessage, clearMessages]); const parseEventData = useCallback((e) => { try { @@ -231,6 +250,17 @@ export const useSSE = () => { const handlers = { 'open': handleSSEOpen, + 'thread': (e) => { + const data = parseEventData(e); + if (data?.id) { + const id = String(data.id); + setCurrentThreadId(id); + const exists = (useChatStore.getState().threads || []).some(t => String(t.id) === id); + if (!exists) { + addThread({ id, agent_id: data.agent_id, meta: data.meta || {}, status: data.status }); + } + } + }, 'ai_message': handleAIMessage, 'tool_call': handleToolCall, 'tool_result': handleToolResult, @@ -271,8 +301,8 @@ export const useSSE = () => { startThinkingProcess(); try { - // TODO: Mocked data, replace with actual API call const authToken = localStorage.getItem('authToken') || 'eyJ1c2VyX2lkIjogIjE0MzdhZGUzNzM1OTQ4OGU5NWMwNzI3YTFjZGYxNzg2ZDI0ZWRjZTMiLCAiZW1haWwiOiAidGVzdEBnbWFpbC5jb20ifQ=='; + const version = ++sseVersionRef.current; sseRef.current = new SSE(`/api/v1/runs/stream`, { headers: { 'Content-Type': 'application/json', @@ -280,7 +310,7 @@ export const useSSE = () => { }, payload: JSON.stringify({ input: message, - thread_id: THREAD_ID, + thread_id: currentThreadId, metadata: { user_id: USER_ID, }, @@ -290,7 +320,25 @@ export const useSSE = () => { start: false }); - setupSSEListeners(); + const guardedSetup = () => { + if (!sseRef.current) return; + const handlers = { + 'open': handleSSEOpen, + 'thread': (e) => { if (version !== sseVersionRef.current) return; const data = parseEventData(e); if (data?.id) { const id = String(data.id); setCurrentThreadId(id); const exists = (useChatStore.getState().threads || []).some(t => String(t.id) === id); if (!exists) { addThread({ id, agent_id: data.agent_id, meta: data.meta || {}, status: data.status }); } } }, + 'ai_message': (e) => { if (version !== sseVersionRef.current) return; handleAIMessage(e); }, + 'tool_call': (e) => { if (version !== sseVersionRef.current) return; handleToolCall(e); }, + 'tool_result': (e) => { if (version !== sseVersionRef.current) return; handleToolResult(e); }, + 'token': (e) => { if (version !== sseVersionRef.current) return; handleToken(e); }, + 'ui': (e) => { if (version !== sseVersionRef.current) return; handleUIMessage(e); }, + 'error': (e) => { if (version !== sseVersionRef.current) return; handleSSEError(e); }, + 'abort': (e) => { if (version !== sseVersionRef.current) return; handleSSEAbort(e); }, + 'readystatechange': (e) => { if (version !== sseVersionRef.current) return; handleReadyStateChange(e); }, + 'stream_end': (e) => { if (version !== sseVersionRef.current) return; handleStreamEnd(e); }, + }; + Object.entries(handlers).forEach(([event, handler]) => sseRef.current.addEventListener(event, handler)); + }; + + guardedSetup(); sseRef.current.stream(); } catch (error) { @@ -305,7 +353,8 @@ export const useSSE = () => { clearCurrentAssistantMessage, startThinkingProcess, setupSSEListeners, - addMessage + addMessage, + currentThreadId ]); const closeConnection = useCallback(() => { @@ -315,11 +364,36 @@ export const useSSE = () => { } }, []); + const fetchThreads = useCallback(async () => { + try { + const authToken = localStorage.getItem('authToken') || 
'eyJ1c2VyX2lkIjogIjE0MzdhZGUzNzM1OTQ4OGU5NWMwNzI3YTFjZGYxNzg2ZDI0ZWRjZTMiLCAiZW1haWwiOiAidGVzdEBnbWFpbC5jb20ifQ=='; + const res = await fetch('/api/v1/threads', { + headers: { 'Authorization': 'Bearer ' + authToken } + }); + const data = await res.json(); + setThreads(data); + } catch (e) { + console.error('Failed to fetch threads', e); + } + }, [setThreads]); + + const newChat = useCallback(async () => { + if (sseRef.current) { + sseRef.current.close(); + sseRef.current = null; + } + setCurrentThreadId(null); + resetChatContext(); + }, [setCurrentThreadId, resetChatContext]); + return { sendMessage, closeConnection, loadChatHistory, + fetchThreads, + newChat, getUserId: () => USER_ID, - getThreadId: () => THREAD_ID + getThreadId: () => currentThreadId, + setCurrentThreadId, }; }; \ No newline at end of file diff --git a/frontend/src/store/chatStore.js b/frontend/src/store/chatStore.js index 5955253..784c8fc 100644 --- a/frontend/src/store/chatStore.js +++ b/frontend/src/store/chatStore.js @@ -2,6 +2,9 @@ import { create } from 'zustand'; import { STATUSES, MESSAGES, SENDER_TYPES, MESSAGE_SUBTYPES, CSS_CLASSES, THINKING_STATES } from '../constants/constants.js'; export const useChatStore = create((set, get) => ({ + threads: [], + currentThreadId: null, + isSidebarOpen: true, messages: [], input: '', isLoading: false, @@ -18,11 +21,25 @@ export const useChatStore = create((set, get) => ({ startTime: null, endTime: null, duration: 0, - history: [], // Array of { type, timestamp, content } + history: [], isExpanded: false }, setInput: (input) => set({ input }), + + setThreads: (threads) => set({ threads }), + + setCurrentThreadId: (threadId) => set({ currentThreadId: threadId }), + + addThread: (thread) => set((state) => ({ + threads: [thread, ...state.threads], + currentThreadId: thread.id, + })), + + removeThread: (threadId) => set((state) => ({ + threads: state.threads.filter(t => String(t.id) !== String(threadId)), + currentThreadId: state.currentThreadId === threadId ? 
null : state.currentThreadId, + })), setLoading: (isLoading) => set({ isLoading }), @@ -136,6 +153,12 @@ export const useChatStore = create((set, get) => ({ clearCurrentAssistantMessage: () => set({ currentAssistantMessage: '', currentAssistantTraceId: null }), + clearMessages: () => set({ + messages: [], + currentAssistantMessage: '', + currentAssistantTraceId: null, + }), + startThinkingProcess: () => set((state) => ({ thinkingProcess: { ...state.thinkingProcess, @@ -233,6 +256,30 @@ export const useChatStore = create((set, get) => ({ })), reset: () => set({ + threads: [], + currentThreadId: null, + messages: [], + input: '', + isLoading: false, + isSending: false, + connectionStatus: { + status: STATUSES.DISCONNECTED, + message: MESSAGES.READY + }, + currentAssistantMessage: '', + currentAssistantTraceId: null, + thinkingProcess: { + isActive: false, + state: THINKING_STATES.THINKING, + startTime: null, + endTime: null, + duration: 0, + history: [], + isExpanded: false + } + }), + + resetChatContext: () => set({ messages: [], input: '', isLoading: false, diff --git a/frontend/src/styles.css b/frontend/src/styles.css index 61f7cfd..7df35b0 100644 --- a/frontend/src/styles.css +++ b/frontend/src/styles.css @@ -302,19 +302,71 @@ p { margin: 0; } +.app-content { + display: grid; + grid-template-columns: 280px 1fr; + gap: 0; + align-items: stretch; + height: calc(100vh - 60px); + margin-top: 60px; +} + +.sidebar { + border-right: 1px solid var(--border-color); + padding: 8px; + display: flex; + flex-direction: column; + gap: 8px; + overflow-y: auto; +} .chat-container { - flex: 1; display: flex; flex-direction: column; overflow: hidden; - margin-top: 60px; - max-width: 900px; - width: 100%; - margin-left: auto; - margin-right: auto; + max-width: 100%; transition: background-color 0.3s ease, border-color 0.3s ease; } +.new-chat-btn { + padding: 8px 10px; + border-radius: 6px; + background: var(--surface-color); + color: var(--text-primary); + border: 1px solid var(--border-color); + cursor: pointer; +} + +.thread-list { + display: flex; + flex-direction: column; + gap: 6px; + overflow-y: auto; +} + +.thread-item { + text-align: left; + padding: 8px; + border: 1px solid var(--border-color); + border-radius: 6px; + background: transparent; + color: inherit; + cursor: pointer; +} + +.thread-item.active { + background: rgba(128, 128, 128, 0.12); +} + +.thread-title { + font-weight: 600; + font-size: 14px; +} + +.thread-subtitle { + opacity: 0.7; + font-size: 12px; +} + .theme-toggle { /*background: rgba(255, 255, 255, 0.1);*/ /*border: 1px solid rgba(255, 255, 255, 0.2);*/ @@ -912,7 +964,7 @@ p { .input-container { position: fixed; bottom: 15px; - left: 0; + left: 280px; /* Align with chat column (sidebar is 280px) */ right: 0; display: flex; flex-direction: column; @@ -1150,15 +1202,18 @@ p { } @media (max-width: 768px) { + .app-content { + grid-template-columns: 1fr; + } + .sidebar { + display: none; + } .app-header { height: 50px; padding: 0 16px; } - .chat-container { - margin-top: 50px; - max-width: none; - } + .chat-container { margin-top: 50px; max-width: none; } .empty-chat-title { font-size: 28px; @@ -1183,6 +1238,8 @@ p { .input-container { padding: 16px 24px; + left: 0; /* Sidebar hidden on mobile */ + right: 0; } .chat-messages { @@ -1263,6 +1320,8 @@ p { .input-container { padding: 12px 16px; + left: 0; /* Ensure full width on small screens */ + right: 0; } .chat-messages { diff --git a/pyproject.toml b/pyproject.toml index 90a9ea6..6a007af 100644 --- a/pyproject.toml +++ 
b/pyproject.toml @@ -9,10 +9,13 @@ authors = [ { name = "Oleksii Ilienko", email = "assada.ua@gmail.com" } ] dependencies = [ + "alembic>=1.16.4", "asgi-correlation-id>=4.3.4", "asyncpg>=0.30.0", "black>=25.1.0", + "dependency-injector>=4.48.1", "fastapi>=0.115.14", + "greenlet>=3.2.3", "langchain>=0.3.26", "langchain-core==0.3.69", "langchain-openai>=0.3.27", @@ -22,7 +25,9 @@ dependencies = [ "mypy>=1.17.0", "opentelemetry-instrumentation-fastapi>=0.55b1", "prometheus-fastapi-instrumentator>=7.1.0", + "psycopg2-binary>=2.9.10", "psycopg[binary,pool]>=3.2.9", + "pydantic-core>=2.33.2", "pyright>=1.1.403", "pytest>=8.4.1", "pytest-asyncio>=1.1.0", @@ -30,6 +35,7 @@ dependencies = [ "python-dotenv>=1.1.1", "python-multipart>=0.0.20", "ruff>=0.12.5", + "sqlmodel>=0.0.24", "sse-starlette==2.4.1", "uvicorn>=0.35.0", ] diff --git a/tests/agent/services/test_agent_service.py b/tests/agent/services/test_agent_service.py index 569d545..f44a269 100644 --- a/tests/agent/services/test_agent_service.py +++ b/tests/agent/services/test_agent_service.py @@ -34,7 +34,7 @@ def mock_thread(): id="test_thread_id", user_id="test_user_id", agent_id="test_agent_id", - metadata={"key": "value"}, + meta={"key": "value"}, status=ThreadStatus.idle, created_at=datetime.now(UTC), updated_at=datetime.now(UTC), @@ -53,4 +53,4 @@ async def test_add_feedback_success(self, agent_service, mock_thread, mock_user) r = await agent_service.add_feedback("tr", 1.0, mock_thread, mock_user) assert r["status"] == "success" assert mock_thread.updated_at > t0 - agent_service.langfuse.create_score.assert_called_once() + agent_service._langfuse_client.create_score.assert_called_once() diff --git a/uv.lock b/uv.lock index 7647ab3..994b2a7 100644 --- a/uv.lock +++ b/uv.lock @@ -2,6 +2,20 @@ version = 1 revision = 1 requires-python = ">=3.13" +[[package]] +name = "alembic" +version = "1.16.4" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "mako" }, + { name = "sqlalchemy" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/83/52/72e791b75c6b1efa803e491f7cbab78e963695e76d4ada05385252927e76/alembic-1.16.4.tar.gz", hash = "sha256:efab6ada0dd0fae2c92060800e0bf5c1dc26af15a10e02fb4babff164b4725e2", size = 1968161 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/c2/62/96b5217b742805236614f05904541000f55422a6060a90d7fd4ce26c172d/alembic-1.16.4-py3-none-any.whl", hash = "sha256:b05e51e8e82efc1abd14ba2af6392897e145930c3e0a2faf2b0da2f7f7fd660d", size = 247026 }, +] + [[package]] name = "annotated-types" version = "0.7.0" @@ -218,6 +232,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0f/64/922899cff2c0fd3496be83fa8b81230f5a8d82a2ad30f98370b133c2c83b/coverage-7.10.1-py3-none-any.whl", hash = "sha256:fa2a258aa6bf188eb9a8948f7102a83da7c430a0dce918dbd8b60ef8fcb772d7", size = 206597 }, ] +[[package]] +name = "dependency-injector" +version = "4.48.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/26/7c/5062c4a7ffd32bf210ff55fab9d279a5beeae350fb09533d3536811e13b6/dependency_injector-4.48.1.tar.gz", hash = "sha256:1805185e4522effad6d5e348c255d27e80d3f8adc89701daf13d743367392978", size = 1100885 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/92/f9/c9b77652f724aece8856e281f7a71e5af544049b3c068df70c68868e43be/dependency_injector-4.48.1-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:a6f73011d532f3ea59689aad85c7999be6da3f30393041a745d5861cdcdc02e4", size = 1631637 }, + { 
url = "https://files.pythonhosted.org/packages/ea/f0/d91c9cdabb1f2354762aca588757d1aa341f3cbccbc8636dd2c06acac10b/dependency_injector-4.48.1-cp38-abi3-manylinux1_x86_64.manylinux_2_28_x86_64.manylinux_2_5_x86_64.whl", hash = "sha256:ac09f508fa9aee06a036ebf3e3d3b2a210276aba1993e9993cec7f1fdc5fd89e", size = 1855944 }, + { url = "https://files.pythonhosted.org/packages/57/ee/d69c4758a12653edbe6ee15c0bf4195981c9820650a1cfa762cbb838485b/dependency_injector-4.48.1-cp38-abi3-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1b05a4a980096b53ad90a87965c5450183bfbb8bbe36615d7cea97537086d622", size = 1811989 }, + { url = "https://files.pythonhosted.org/packages/cf/6d/d2a257402c8c3f7a9c61f1b8a0482ec4373f1ef7fdfe784a91e883506e3b/dependency_injector-4.48.1-cp38-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:0506e98440ee6c48fe660016d602961b1b3ecc0a8227838a2221048ed11e2fca", size = 1826408 }, + { url = "https://files.pythonhosted.org/packages/65/f9/2a408d460eedb264f7ea919754c526c8f3a18c026496cacb7dd6960766d2/dependency_injector-4.48.1-cp38-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:1994622eae8917138626303b176cba4c74e625ba1e588cb09d673ca175d299a2", size = 1863948 }, + { url = "https://files.pythonhosted.org/packages/6e/8a/2edaef77e725dd8b1a625c84cbccb0f445afe58277c7b243cbf58784826a/dependency_injector-4.48.1-cp38-abi3-win32.whl", hash = "sha256:58d4d81f92e3267c331f160cbbb517fd7644b95ee57a0d6e4b01f53a7e437a4a", size = 1516768 }, + { url = "https://files.pythonhosted.org/packages/8c/41/4bf523af7e1b7f367499f8b8709e0e807e9a14c7d1674b0442d7f84403c8/dependency_injector-4.48.1-cp38-abi3-win_amd64.whl", hash = "sha256:572b22b7db9b103718ea52634b5ca1ef763278338310254334f4633a57c9f0e7", size = 1639850 }, +] + [[package]] name = "distro" version = "1.9.0" @@ -483,7 +512,7 @@ wheels = [ [[package]] name = "langfuse" -version = "3.2.1" +version = "3.2.3" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "backoff" }, @@ -496,9 +525,9 @@ dependencies = [ { name = "requests" }, { name = "wrapt" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/61/0d/8fc51099cf337fb3b56cb7d305074bc0223c62e1ccabf80cc6285ccf5b31/langfuse-3.2.1.tar.gz", hash = "sha256:f79b0380dfcf52c7525bb5d7f8e9d8786a6fc8b37867def047bb388930a7beb3", size = 153369 } +sdist = { url = "https://files.pythonhosted.org/packages/b4/80/af4ff6ce10103484eba8f69f1846a62e51a4cbb9d9c84032a566381cda60/langfuse-3.2.3.tar.gz", hash = "sha256:38706d24bf6b53827edc0c54d84ad50d31d0e4dff6cf6a39b450896e7f1cc15e", size = 153521 } wheels = [ - { url = "https://files.pythonhosted.org/packages/92/b0/8f08df3f0fa584c4132937690c6dd33e0a116f963ecf2b35567f614e0ca7/langfuse-3.2.1-py3-none-any.whl", hash = "sha256:07a84e8c1eed6ac8e149bdda1431fd866e4aee741b66124316336fb2bc7e6a32", size = 299315 }, + { url = "https://files.pythonhosted.org/packages/df/5b/eb860b3ef9ddf2732aec8e6b8547fa8e8851ff43fc9d88144d6ff2f1a2d8/langfuse-3.2.3-py3-none-any.whl", hash = "sha256:93ce315ac056dd171fb4fe01c0b64db22549e890556d422afeb14d4d48a97d6d", size = 299755 }, ] [[package]] @@ -590,6 +619,46 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/19/4f/481324462c44ce21443b833ad73ee51117031d41c16fec06cddbb7495b26/langsmith-0.4.8-py3-none-any.whl", hash = "sha256:ca2f6024ab9d2cd4d091b2e5b58a5d2cb0c354a0c84fe214145a89ad450abae0", size = 367975 }, ] +[[package]] +name = "mako" +version = "1.3.10" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "markupsafe" }, +] +sdist = { url = 
"https://files.pythonhosted.org/packages/9e/38/bd5b78a920a64d708fe6bc8e0a2c075e1389d53bef8413725c63ba041535/mako-1.3.10.tar.gz", hash = "sha256:99579a6f39583fa7e5630a28c3c1f440e4e97a414b80372649c0ce338da2ea28", size = 392474 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/87/fb/99f81ac72ae23375f22b7afdb7642aba97c00a713c217124420147681a2f/mako-1.3.10-py3-none-any.whl", hash = "sha256:baef24a52fc4fc514a0887ac600f9f1cff3d82c61d4d700a1fa84d597b88db59", size = 78509 }, +] + +[[package]] +name = "markupsafe" +version = "3.0.2" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/b2/97/5d42485e71dfc078108a86d6de8fa46db44a1a9295e89c5d6d4a06e23a62/markupsafe-3.0.2.tar.gz", hash = "sha256:ee55d3edf80167e48ea11a923c7386f4669df67d7994554387f84e7d8b0a2bf0", size = 20537 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/83/0e/67eb10a7ecc77a0c2bbe2b0235765b98d164d81600746914bebada795e97/MarkupSafe-3.0.2-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:ba9527cdd4c926ed0760bc301f6728ef34d841f405abf9d4f959c478421e4efd", size = 14274 }, + { url = "https://files.pythonhosted.org/packages/2b/6d/9409f3684d3335375d04e5f05744dfe7e9f120062c9857df4ab490a1031a/MarkupSafe-3.0.2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:f8b3d067f2e40fe93e1ccdd6b2e1d16c43140e76f02fb1319a05cf2b79d99430", size = 12352 }, + { url = "https://files.pythonhosted.org/packages/d2/f5/6eadfcd3885ea85fe2a7c128315cc1bb7241e1987443d78c8fe712d03091/MarkupSafe-3.0.2-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:569511d3b58c8791ab4c2e1285575265991e6d8f8700c7be0e88f86cb0672094", size = 24122 }, + { url = "https://files.pythonhosted.org/packages/0c/91/96cf928db8236f1bfab6ce15ad070dfdd02ed88261c2afafd4b43575e9e9/MarkupSafe-3.0.2-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:15ab75ef81add55874e7ab7055e9c397312385bd9ced94920f2802310c930396", size = 23085 }, + { url = "https://files.pythonhosted.org/packages/c2/cf/c9d56af24d56ea04daae7ac0940232d31d5a8354f2b457c6d856b2057d69/MarkupSafe-3.0.2-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:f3818cb119498c0678015754eba762e0d61e5b52d34c8b13d770f0719f7b1d79", size = 22978 }, + { url = "https://files.pythonhosted.org/packages/2a/9f/8619835cd6a711d6272d62abb78c033bda638fdc54c4e7f4272cf1c0962b/MarkupSafe-3.0.2-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:cdb82a876c47801bb54a690c5ae105a46b392ac6099881cdfb9f6e95e4014c6a", size = 24208 }, + { url = "https://files.pythonhosted.org/packages/f9/bf/176950a1792b2cd2102b8ffeb5133e1ed984547b75db47c25a67d3359f77/MarkupSafe-3.0.2-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:cabc348d87e913db6ab4aa100f01b08f481097838bdddf7c7a84b7575b7309ca", size = 23357 }, + { url = "https://files.pythonhosted.org/packages/ce/4f/9a02c1d335caabe5c4efb90e1b6e8ee944aa245c1aaaab8e8a618987d816/MarkupSafe-3.0.2-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:444dcda765c8a838eaae23112db52f1efaf750daddb2d9ca300bcae1039adc5c", size = 23344 }, + { url = "https://files.pythonhosted.org/packages/ee/55/c271b57db36f748f0e04a759ace9f8f759ccf22b4960c270c78a394f58be/MarkupSafe-3.0.2-cp313-cp313-win32.whl", hash = "sha256:bcf3e58998965654fdaff38e58584d8937aa3096ab5354d493c77d1fdd66d7a1", size = 15101 }, + { url = "https://files.pythonhosted.org/packages/29/88/07df22d2dd4df40aba9f3e402e6dc1b8ee86297dddbad4872bd5e7b0094f/MarkupSafe-3.0.2-cp313-cp313-win_amd64.whl", hash 
= "sha256:e6a2a455bd412959b57a172ce6328d2dd1f01cb2135efda2e4576e8a23fa3b0f", size = 15603 }, + { url = "https://files.pythonhosted.org/packages/62/6a/8b89d24db2d32d433dffcd6a8779159da109842434f1dd2f6e71f32f738c/MarkupSafe-3.0.2-cp313-cp313t-macosx_10_13_universal2.whl", hash = "sha256:b5a6b3ada725cea8a5e634536b1b01c30bcdcd7f9c6fff4151548d5bf6b3a36c", size = 14510 }, + { url = "https://files.pythonhosted.org/packages/7a/06/a10f955f70a2e5a9bf78d11a161029d278eeacbd35ef806c3fd17b13060d/MarkupSafe-3.0.2-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:a904af0a6162c73e3edcb969eeeb53a63ceeb5d8cf642fade7d39e7963a22ddb", size = 12486 }, + { url = "https://files.pythonhosted.org/packages/34/cf/65d4a571869a1a9078198ca28f39fba5fbb910f952f9dbc5220afff9f5e6/MarkupSafe-3.0.2-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:4aa4e5faecf353ed117801a068ebab7b7e09ffb6e1d5e412dc852e0da018126c", size = 25480 }, + { url = "https://files.pythonhosted.org/packages/0c/e3/90e9651924c430b885468b56b3d597cabf6d72be4b24a0acd1fa0e12af67/MarkupSafe-3.0.2-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:c0ef13eaeee5b615fb07c9a7dadb38eac06a0608b41570d8ade51c56539e509d", size = 23914 }, + { url = "https://files.pythonhosted.org/packages/66/8c/6c7cf61f95d63bb866db39085150df1f2a5bd3335298f14a66b48e92659c/MarkupSafe-3.0.2-cp313-cp313t-manylinux_2_5_i686.manylinux1_i686.manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:d16a81a06776313e817c951135cf7340a3e91e8c1ff2fac444cfd75fffa04afe", size = 23796 }, + { url = "https://files.pythonhosted.org/packages/bb/35/cbe9238ec3f47ac9a7c8b3df7a808e7cb50fe149dc7039f5f454b3fba218/MarkupSafe-3.0.2-cp313-cp313t-musllinux_1_2_aarch64.whl", hash = "sha256:6381026f158fdb7c72a168278597a5e3a5222e83ea18f543112b2662a9b699c5", size = 25473 }, + { url = "https://files.pythonhosted.org/packages/e6/32/7621a4382488aa283cc05e8984a9c219abad3bca087be9ec77e89939ded9/MarkupSafe-3.0.2-cp313-cp313t-musllinux_1_2_i686.whl", hash = "sha256:3d79d162e7be8f996986c064d1c7c817f6df3a77fe3d6859f6f9e7be4b8c213a", size = 24114 }, + { url = "https://files.pythonhosted.org/packages/0d/80/0985960e4b89922cb5a0bac0ed39c5b96cbc1a536a99f30e8c220a996ed9/MarkupSafe-3.0.2-cp313-cp313t-musllinux_1_2_x86_64.whl", hash = "sha256:131a3c7689c85f5ad20f9f6fb1b866f402c445b220c19fe4308c0b147ccd2ad9", size = 24098 }, + { url = "https://files.pythonhosted.org/packages/82/78/fedb03c7d5380df2427038ec8d973587e90561b2d90cd472ce9254cf348b/MarkupSafe-3.0.2-cp313-cp313t-win32.whl", hash = "sha256:ba8062ed2cf21c07a9e295d5b8a2a5ce678b913b45fdf68c32d95d6c1291e0b6", size = 15208 }, + { url = "https://files.pythonhosted.org/packages/4f/65/6079a46068dfceaeabb5dcad6d674f5f5c61a6fa5673746f42a9f4c233b3/MarkupSafe-3.0.2-cp313-cp313t-win_amd64.whl", hash = "sha256:e444a31f8db13eb18ada366ab3cf45fd4b31e4db1236a4448f68778c1d1a5a2f", size = 15739 }, +] + [[package]] name = "mypy" version = "1.17.0" @@ -977,6 +1046,25 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/47/fd/4feb52a55c1a4bd748f2acaed1903ab54a723c47f6d0242780f4d97104d4/psycopg_pool-3.2.6-py3-none-any.whl", hash = "sha256:5887318a9f6af906d041a0b1dc1c60f8f0dda8340c2572b74e10907b51ed5da7", size = 38252 }, ] +[[package]] +name = "psycopg2-binary" +version = "2.9.10" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/cb/0e/bdc8274dc0585090b4e3432267d7be4dfbfd8971c0fa59167c711105a6bf/psycopg2-binary-2.9.10.tar.gz", hash = 
"sha256:4b3df0e6990aa98acda57d983942eff13d824135fe2250e6522edaa782a06de2", size = 385764 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/3e/30/d41d3ba765609c0763505d565c4d12d8f3c79793f0d0f044ff5a28bf395b/psycopg2_binary-2.9.10-cp313-cp313-macosx_12_0_x86_64.whl", hash = "sha256:26540d4a9a4e2b096f1ff9cce51253d0504dca5a85872c7f7be23be5a53eb18d", size = 3044699 }, + { url = "https://files.pythonhosted.org/packages/35/44/257ddadec7ef04536ba71af6bc6a75ec05c5343004a7ec93006bee66c0bc/psycopg2_binary-2.9.10-cp313-cp313-macosx_14_0_arm64.whl", hash = "sha256:e217ce4d37667df0bc1c397fdcd8de5e81018ef305aed9415c3b093faaeb10fb", size = 3275245 }, + { url = "https://files.pythonhosted.org/packages/1b/11/48ea1cd11de67f9efd7262085588790a95d9dfcd9b8a687d46caf7305c1a/psycopg2_binary-2.9.10-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:245159e7ab20a71d989da00f280ca57da7641fa2cdcf71749c193cea540a74f7", size = 2851631 }, + { url = "https://files.pythonhosted.org/packages/62/e0/62ce5ee650e6c86719d621a761fe4bc846ab9eff8c1f12b1ed5741bf1c9b/psycopg2_binary-2.9.10-cp313-cp313-manylinux_2_17_i686.manylinux2014_i686.whl", hash = "sha256:3c4ded1a24b20021ebe677b7b08ad10bf09aac197d6943bfe6fec70ac4e4690d", size = 3082140 }, + { url = "https://files.pythonhosted.org/packages/27/ce/63f946c098611f7be234c0dd7cb1ad68b0b5744d34f68062bb3c5aa510c8/psycopg2_binary-2.9.10-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:3abb691ff9e57d4a93355f60d4f4c1dd2d68326c968e7db17ea96df3c023ef73", size = 3264762 }, + { url = "https://files.pythonhosted.org/packages/43/25/c603cd81402e69edf7daa59b1602bd41eb9859e2824b8c0855d748366ac9/psycopg2_binary-2.9.10-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8608c078134f0b3cbd9f89b34bd60a943b23fd33cc5f065e8d5f840061bd0673", size = 3020967 }, + { url = "https://files.pythonhosted.org/packages/5f/d6/8708d8c6fca531057fa170cdde8df870e8b6a9b136e82b361c65e42b841e/psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:230eeae2d71594103cd5b93fd29d1ace6420d0b86f4778739cb1a5a32f607d1f", size = 2872326 }, + { url = "https://files.pythonhosted.org/packages/ce/ac/5b1ea50fc08a9df82de7e1771537557f07c2632231bbab652c7e22597908/psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_i686.whl", hash = "sha256:bb89f0a835bcfc1d42ccd5f41f04870c1b936d8507c6df12b7737febc40f0909", size = 2822712 }, + { url = "https://files.pythonhosted.org/packages/c4/fc/504d4503b2abc4570fac3ca56eb8fed5e437bf9c9ef13f36b6621db8ef00/psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_ppc64le.whl", hash = "sha256:f0c2d907a1e102526dd2986df638343388b94c33860ff3bbe1384130828714b1", size = 2920155 }, + { url = "https://files.pythonhosted.org/packages/b2/d1/323581e9273ad2c0dbd1902f3fb50c441da86e894b6e25a73c3fda32c57e/psycopg2_binary-2.9.10-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:f8157bed2f51db683f31306aa497311b560f2265998122abe1dce6428bd86567", size = 2959356 }, + { url = "https://files.pythonhosted.org/packages/08/50/d13ea0a054189ae1bc21af1d85b6f8bb9bbc5572991055d70ad9006fe2d6/psycopg2_binary-2.9.10-cp313-cp313-win_amd64.whl", hash = "sha256:27422aa5f11fbcd9b18da48373eb67081243662f9b46e6fd07c3eb46e4535142", size = 2569224 }, +] + [[package]] name = "pycparser" version = "2.22" @@ -1133,10 +1221,13 @@ name = "raw-langchain" version = "0.0.1" source = { virtual = "." 
} dependencies = [ + { name = "alembic" }, { name = "asgi-correlation-id" }, { name = "asyncpg" }, { name = "black" }, + { name = "dependency-injector" }, { name = "fastapi" }, + { name = "greenlet" }, { name = "langchain" }, { name = "langchain-core" }, { name = "langchain-openai" }, @@ -1147,6 +1238,8 @@ dependencies = [ { name = "opentelemetry-instrumentation-fastapi" }, { name = "prometheus-fastapi-instrumentator" }, { name = "psycopg", extra = ["binary", "pool"] }, + { name = "psycopg2-binary" }, + { name = "pydantic-core" }, { name = "pyright" }, { name = "pytest" }, { name = "pytest-asyncio" }, @@ -1154,16 +1247,20 @@ dependencies = [ { name = "python-dotenv" }, { name = "python-multipart" }, { name = "ruff" }, + { name = "sqlmodel" }, { name = "sse-starlette" }, { name = "uvicorn" }, ] [package.metadata] requires-dist = [ + { name = "alembic", specifier = ">=1.16.4" }, { name = "asgi-correlation-id", specifier = ">=4.3.4" }, { name = "asyncpg", specifier = ">=0.30.0" }, { name = "black", specifier = ">=25.1.0" }, + { name = "dependency-injector", specifier = ">=4.48.1" }, { name = "fastapi", specifier = ">=0.115.14" }, + { name = "greenlet", specifier = ">=3.2.3" }, { name = "langchain", specifier = ">=0.3.26" }, { name = "langchain-core", specifier = "==0.3.69" }, { name = "langchain-openai", specifier = ">=0.3.27" }, @@ -1174,6 +1271,8 @@ requires-dist = [ { name = "opentelemetry-instrumentation-fastapi", specifier = ">=0.55b1" }, { name = "prometheus-fastapi-instrumentator", specifier = ">=7.1.0" }, { name = "psycopg", extras = ["binary", "pool"], specifier = ">=3.2.9" }, + { name = "psycopg2-binary", specifier = ">=2.9.10" }, + { name = "pydantic-core", specifier = ">=2.33.2" }, { name = "pyright", specifier = ">=1.1.403" }, { name = "pytest", specifier = ">=8.4.1" }, { name = "pytest-asyncio", specifier = ">=1.1.0" }, @@ -1181,6 +1280,7 @@ requires-dist = [ { name = "python-dotenv", specifier = ">=1.1.1" }, { name = "python-multipart", specifier = ">=0.0.20" }, { name = "ruff", specifier = ">=0.12.5" }, + { name = "sqlmodel", specifier = ">=0.0.24" }, { name = "sse-starlette", specifier = "==2.4.1" }, { name = "uvicorn", specifier = ">=0.35.0" }, ] @@ -1290,6 +1390,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/1c/fc/9ba22f01b5cdacc8f5ed0d22304718d2c758fce3fd49a5372b886a86f37c/sqlalchemy-2.0.41-py3-none-any.whl", hash = "sha256:57df5dc6fdb5ed1a88a1ed2195fd31927e705cad62dedd86b46972752a80f576", size = 1911224 }, ] +[[package]] +name = "sqlmodel" +version = "0.0.24" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "pydantic" }, + { name = "sqlalchemy" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/86/4b/c2ad0496f5bdc6073d9b4cef52be9c04f2b37a5773441cc6600b1857648b/sqlmodel-0.0.24.tar.gz", hash = "sha256:cc5c7613c1a5533c9c7867e1aab2fd489a76c9e8a061984da11b4e613c182423", size = 116780 } +wheels = [ + { url = "https://files.pythonhosted.org/packages/16/91/484cd2d05569892b7fef7f5ceab3bc89fb0f8a8c0cde1030d383dbc5449c/sqlmodel-0.0.24-py3-none-any.whl", hash = "sha256:6778852f09370908985b667d6a3ab92910d0d5ec88adcaf23dbc242715ff7193", size = 28622 }, +] + [[package]] name = "sse-starlette" version = "2.4.1" From 0e57492b51831380b0707b5f30283ea43d1e0b9a Mon Sep 17 00:00:00 2001 From: assada Date: Tue, 12 Aug 2025 13:55:49 +0200 Subject: [PATCH 2/2] chore: fix migration --- alembic/versions/4c750f1a48e6_create_threads_table.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/alembic/versions/4c750f1a48e6_create_threads_table.py b/alembic/versions/4c750f1a48e6_create_threads_table.py
index 0f1b80c..b01c41f 100644
--- a/alembic/versions/4c750f1a48e6_create_threads_table.py
+++ b/alembic/versions/4c750f1a48e6_create_threads_table.py
@@ -42,4 +42,4 @@ def upgrade() -> None:
 
 
 def downgrade() -> None:
-    op.drop_table("threads")
+    op.drop_table("thread")
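
Note on the migration fix above: downgrade() now drops "thread" rather than "threads", which only makes sense if upgrade() creates a table of that name — SQLModel derives the default table name from the lowercased class name, so a Thread model maps to "thread" unless __tablename__ says otherwise. The actual revision body is not part of this excerpt; the following is only a sketch of a symmetric upgrade/downgrade pair, with assumed columns and the revision identifiers omitted:

# Sketch only - not the project's actual revision. Revision identifiers and
# most columns are omitted/assumed; the point is that upgrade() and
# downgrade() must target the same table name.
import sqlalchemy as sa
from alembic import op


def upgrade() -> None:
    op.create_table(
        "thread",  # SQLModel's default name for a `Thread` model
        sa.Column("id", sa.Uuid(), primary_key=True),        # assumed column
        sa.Column("user_id", sa.String(), nullable=False),   # assumed column
    )


def downgrade() -> None:
    # Dropping the same name keeps the revision reversible.
    op.drop_table("thread")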
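
The test hunk earlier in this patch switches the Thread fixture from metadata= to meta=. That rename is forced by the ORM layer: SQLAlchemy's declarative base (which SQLModel builds on) reserves the metadata attribute for the table registry, so a JSON payload column has to live under a different attribute name. The project's real model in app/models/thread.py is not reproduced here; the sketch below is illustrative only, with assumed field types (the tests suggest status is actually a ThreadStatus enum):

# Illustrative model only - app/models/thread.py is not shown in this excerpt.
# `metadata` is reserved on SQLAlchemy declarative classes (it holds the
# MetaData registry), hence the `meta` attribute for the JSON payload.
from datetime import datetime

from sqlalchemy import JSON, Column
from sqlmodel import Field, SQLModel


class Thread(SQLModel, table=True):
    id: str = Field(primary_key=True)
    user_id: str
    agent_id: str
    meta: dict = Field(default_factory=dict, sa_column=Column(JSON))
    status: str = "idle"  # the real model appears to use a ThreadStatus enum
    created_at: datetime | None = None
    updated_at: datetime | None = None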
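
pyproject.toml now pulls in dependency-injector, which backs the new DI container added by this patch. The container module itself is outside this excerpt, so the sketch below only shows the general wiring pattern; the class and provider names are placeholders, not necessarily what app/container.py declares:

# Wiring sketch for dependency-injector; names are placeholders rather than
# what the real container declares.
from dependency_injector import containers, providers


class ThreadRepository:  # placeholder - the real repository wraps a DB session
    def list_for_user(self, user_id: str) -> list[dict]:
        return []


class ThreadService:  # placeholder service depending on the repository
    def __init__(self, repository: ThreadRepository) -> None:
        self.repository = repository


class Container(containers.DeclarativeContainer):
    config = providers.Configuration()

    thread_repository = providers.Singleton(ThreadRepository)
    thread_service = providers.Factory(ThreadService, repository=thread_repository)


# Usage: Container().thread_service() builds the service with its repository injected.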
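
The new fetchThreads() helper in useSSE.js expects GET /api/v1/threads to return a JSON array that can be handed straight to setThreads(), with each item carrying at least id, agent_id, meta and status (the same fields the 'thread' SSE handler reads). The server route lives in app/http/routes/thread_routes.py, which is not shown here; the FastAPI sketch below is a guess at that shape, with a stand-in auth dependency:

# Hypothetical route shape - app/http/routes/thread_routes.py is not part of
# this excerpt. The response mirrors what fetchThreads()/setThreads() and the
# 'thread' SSE handler expect: objects with id, agent_id, meta and status.
from fastapi import APIRouter, Depends

router = APIRouter(prefix="/api/v1")


async def get_current_user_id() -> str:
    # Stand-in for the bearer-token auth dependency used by the real app.
    return "demo-user"


@router.get("/threads")
async def list_threads(user_id: str = Depends(get_current_user_id)) -> list[dict]:
    # In the real service this would come from the thread repository for `user_id`.
    return [{"id": "example-thread-id", "agent_id": "demo", "meta": {}, "status": "idle"}]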