diff --git a/veadk/agent.py b/veadk/agent.py index cc7701a3..a4d1f355 100644 --- a/veadk/agent.py +++ b/veadk/agent.py @@ -90,6 +90,9 @@ class Agent(LlmAgent): In VeADK, the `long_term_memory` refers to cross-session memory under the same user. """ + short_term_memory: Optional[ShortTermMemory] = None + """The short term memory provided to agent. This attribute is not used in agent directly, as it will be passed to Runner in VeADK.""" + tracers: list[BaseTracer] = [] """The tracers provided to agent.""" diff --git a/veadk/agents/loop_agent.py b/veadk/agents/loop_agent.py index 33d2494e..95b6538b 100644 --- a/veadk/agents/loop_agent.py +++ b/veadk/agents/loop_agent.py @@ -14,11 +14,14 @@ from __future__ import annotations +from typing import Optional + from google.adk.agents import LoopAgent as GoogleADKLoopAgent from google.adk.agents.base_agent import BaseAgent from pydantic import ConfigDict, Field from typing_extensions import Any +from veadk.memory.short_term_memory import ShortTermMemory from veadk.prompts.agent_default_prompt import DEFAULT_DESCRIPTION, DEFAULT_INSTRUCTION from veadk.tracing.base_tracer import BaseTracer from veadk.utils.logger import get_logger @@ -49,6 +52,9 @@ class LoopAgent(GoogleADKLoopAgent): tracers: list[BaseTracer] = [] """The tracers provided to agent.""" + short_term_memory: Optional[ShortTermMemory] = None + """The short term memory provided to agent. This attribute is not used in agent directly, as it will be passed to Runner in VeADK.""" + def model_post_init(self, __context: Any) -> None: super().model_post_init(None) # for sub_agents init diff --git a/veadk/agents/parallel_agent.py b/veadk/agents/parallel_agent.py index d13b3a95..ec3714b0 100644 --- a/veadk/agents/parallel_agent.py +++ b/veadk/agents/parallel_agent.py @@ -14,11 +14,14 @@ from __future__ import annotations +from typing import Optional + from google.adk.agents import ParallelAgent as GoogleADKParallelAgent from google.adk.agents.base_agent import BaseAgent from pydantic import ConfigDict, Field from typing_extensions import Any +from veadk.memory.short_term_memory import ShortTermMemory from veadk.prompts.agent_default_prompt import DEFAULT_DESCRIPTION, DEFAULT_INSTRUCTION from veadk.tracing.base_tracer import BaseTracer from veadk.utils.logger import get_logger @@ -49,6 +52,9 @@ class ParallelAgent(GoogleADKParallelAgent): tracers: list[BaseTracer] = [] """The tracers provided to agent.""" + short_term_memory: Optional[ShortTermMemory] = None + """The short term memory provided to agent. This attribute is not used in agent directly, as it will be passed to Runner in VeADK.""" + def model_post_init(self, __context: Any) -> None: super().model_post_init(None) # for sub_agents init diff --git a/veadk/agents/sequential_agent.py b/veadk/agents/sequential_agent.py index cf4f5146..7904cd83 100644 --- a/veadk/agents/sequential_agent.py +++ b/veadk/agents/sequential_agent.py @@ -14,11 +14,14 @@ from __future__ import annotations +from typing import Optional + from google.adk.agents import SequentialAgent as GoogleADKSequentialAgent from google.adk.agents.base_agent import BaseAgent from pydantic import ConfigDict, Field from typing_extensions import Any +from veadk.memory.short_term_memory import ShortTermMemory from veadk.prompts.agent_default_prompt import DEFAULT_DESCRIPTION, DEFAULT_INSTRUCTION from veadk.tracing.base_tracer import BaseTracer from veadk.utils.logger import get_logger @@ -49,6 +52,9 @@ class SequentialAgent(GoogleADKSequentialAgent): tracers: list[BaseTracer] = [] """The tracers provided to agent.""" + short_term_memory: Optional[ShortTermMemory] = None + """The short term memory provided to agent. This attribute is not used in agent directly, as it will be passed to Runner in VeADK.""" + def model_post_init(self, __context: Any) -> None: super().model_post_init(None) # for sub_agents init diff --git a/veadk/cli/cli.py b/veadk/cli/cli.py index 9f39187e..b97115a7 100644 --- a/veadk/cli/cli.py +++ b/veadk/cli/cli.py @@ -15,11 +15,12 @@ import click +from veadk.cli.cli_blog import blog from veadk.cli.cli_deploy import deploy from veadk.cli.cli_init import init +from veadk.cli.cli_pipeline import pipeline from veadk.cli.cli_prompt import prompt from veadk.cli.cli_web import web -from veadk.cli.cli_pipeline import pipeline from veadk.version import VERSION @@ -37,6 +38,7 @@ def veadk(): veadk.add_command(prompt) veadk.add_command(web) veadk.add_command(pipeline) +veadk.add_command(blog) if __name__ == "__main__": veadk() diff --git a/veadk/cli/cli_blog.py b/veadk/cli/cli_blog.py new file mode 100644 index 00000000..d8b88d51 --- /dev/null +++ b/veadk/cli/cli_blog.py @@ -0,0 +1,125 @@ +# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import warnings +from typing import Any + +import click + +from veadk.version import VERSION + +warnings.filterwarnings( + "ignore", category=UserWarning, module="pydantic._internal._fields" +) + + +def _render_prompts() -> dict[str, Any]: + vefaas_application_name = click.prompt( + "Volcengine FaaS application name", default="veadk-cloud-agent" + ) + + veapig_instance_name = click.prompt( + "Volcengine API Gateway instance name", default="", show_default=True + ) + + veapig_service_name = click.prompt( + "Volcengine API Gateway service name", default="", show_default=True + ) + + veapig_upstream_name = click.prompt( + "Volcengine API Gateway upstream name", default="", show_default=True + ) + + deploy_mode_options = { + "1": "A2A/MCP Server", + "2": "VeADK Web / Google ADK Web", + } + + click.echo("Choose a deploy mode:") + for key, value in deploy_mode_options.items(): + click.echo(f" {key}. {value}") + + deploy_mode = click.prompt( + "Enter your choice", type=click.Choice(deploy_mode_options.keys()) + ) + + return { + "vefaas_application_name": vefaas_application_name, + "veapig_instance_name": veapig_instance_name, + "veapig_service_name": veapig_service_name, + "veapig_upstream_name": veapig_upstream_name, + "use_adk_web": deploy_mode == "2", + "veadk_version": VERSION, + } + + +@click.command() +@click.option( + "--vefaas-template-type", default="template", help="Expected template type" +) +def blog( + vefaas_template_type: str, +) -> None: + """Init a veadk project that can be deployed to Volcengine VeFaaS. + + `template` is A2A/MCP/Web server template, `web_template` is for web applications (i.e., a simple blog). + """ + import shutil + from pathlib import Path + + from cookiecutter.main import cookiecutter + + import veadk.integrations.ve_faas as vefaas + + if vefaas_template_type == "web_template": + click.echo( + "Welcome use VeADK to create your project. We will generate a `simple-blog` web application for you." + ) + else: + click.echo( + "Welcome use VeADK to create your project. We will generate a `weather-reporter` application for you." + ) + + cwd = Path.cwd() + local_dir_name = click.prompt("Local directory name", default="veadk-cloud-proj") + target_dir_path = cwd / local_dir_name + + if target_dir_path.exists(): + click.confirm( + f"Directory '{target_dir_path}' already exists, do you want to overwrite it", + abort=True, + ) + shutil.rmtree(target_dir_path) + + settings = _render_prompts() + settings["local_dir_name"] = local_dir_name + + if not vefaas_template_type: + vefaas_template_type = "template" + + template_dir_path = Path(vefaas.__file__).parent / vefaas_template_type + + cookiecutter( + template=str(template_dir_path), + output_dir=str(cwd), + extra_context=settings, + no_input=True, + ) + + click.echo(f"Template project has been generated at {target_dir_path}") + click.echo(f"Edit {target_dir_path / 'src/'} to define your agents") + click.echo( + f"Edit {target_dir_path / 'deploy.py'} to define your deployment attributes" + ) + click.echo("Run python `deploy.py` for deployment on Volcengine FaaS platform.") diff --git a/veadk/cli/cli_deploy.py b/veadk/cli/cli_deploy.py index 567cd0fa..fd30c490 100644 --- a/veadk/cli/cli_deploy.py +++ b/veadk/cli/cli_deploy.py @@ -12,152 +12,130 @@ # See the License for the specific language governing permissions and # limitations under the License. +from pathlib import Path import click +from InquirerPy.resolver import prompt +from jinja2 import Template +from yaml import safe_load + +import veadk.integrations.ve_faas as vefaas +from veadk.integrations.ve_faas.ve_faas import VeFaaS +from veadk.configs.deploy_config import VeDeployConfig +from veadk.utils.logger import get_logger + +logger = get_logger(__name__) -from veadk.version import VERSION TEMP_PATH = "/tmp" +DEPLOY_CONFIGS = [ + { + "name": "vefaas_application_name", + "type": "input", + "message": "VeFaaS application name:", + "default": "veadk-cloud-application", + }, + { + "name": "veapig_instance_name", + "type": "input", + "message": "VeAPI Gateway instance name:", + }, + { + "name": "veapig_service_name", + "type": "input", + "message": "VeAPI Gateway service name:", + }, + { + "name": "veapig_upstream_name", + "type": "input", + "message": "VeAPI Gateway upstream name:", + }, + { + "name": "entrypoint_agent", + "type": "input", + "message": "The entrypoint agent (e.g. agent_dir.agent_module:agent)", + }, + { + "name": "deploy_mode", + "type": "list", + "message": "Deploy mode:", + "choices": ["A2A/MCP", "Web"], + }, +] + + +def _get_user_configs() -> dict: + user_configs = prompt(DEPLOY_CONFIGS) + return user_configs + + @click.command() @click.option( - "--access-key", + "--volcengine-access-key", + envvar="VOLCENGINE_ACCESS_KEY", default=None, help="Volcengine access key", ) @click.option( - "--secret-key", + "--volcengine-secret-key", + envvar="VOLCENGINE_SECRET_KEY", default=None, help="Volcengine secret key", ) -@click.option( - "--vefaas-app-name", required=True, help="Expected Volcengine FaaS application name" -) -@click.option( - "--veapig-instance-name", default="", help="Expected Volcengine APIG instance name" -) -@click.option( - "--veapig-service-name", default="", help="Expected Volcengine APIG service name" -) -@click.option( - "--veapig-upstream-name", default="", help="Expected Volcengine APIG upstream name" -) -@click.option( - "--short-term-memory-backend", - default="local", - type=click.Choice(["local", "mysql"]), - help="Backend for short-term memory", -) -@click.option("--use-adk-web", is_flag=True, help="Whether to use ADK Web") -@click.option("--path", default=".", help="Local project path") +@click.option("--project-path", default=".", help="Local project path") +@click.option("--deploy-config-file", default=None, help="Deploy config file path") def deploy( - access_key: str, - secret_key: str, - vefaas_app_name: str, - veapig_instance_name: str, - veapig_service_name: str, - veapig_upstream_name: str, - short_term_memory_backend: str, - use_adk_web: bool, - path: str, + volcengine_access_key: str, + volcengine_secret_key: str, + project_path: str, + deploy_config_file: str, ) -> None: """Deploy a user project to Volcengine FaaS application.""" - import asyncio - import shutil - from pathlib import Path - - from cookiecutter.main import cookiecutter - - import veadk.integrations.ve_faas as vefaas - from veadk.config import getenv - from veadk.utils.logger import get_logger - from veadk.utils.misc import formatted_timestamp, load_module_from_file - - logger = get_logger(__name__) - - if not access_key: - access_key = getenv("VOLCENGINE_ACCESS_KEY") - if not secret_key: - secret_key = getenv("VOLCENGINE_SECRET_KEY") - - user_proj_abs_path = Path(path).resolve() - template_dir_path = Path(vefaas.__file__).parent / "template" - - tmp_dir_name = f"{user_proj_abs_path.name}_{formatted_timestamp()}" - - settings = { - "local_dir_name": tmp_dir_name.replace("-", "_"), - "app_name": user_proj_abs_path.name.replace("-", "_"), - "agent_module_name": user_proj_abs_path.name, - "short_term_memory_backend": short_term_memory_backend, - "vefaas_application_name": vefaas_app_name, - "veapig_instance_name": veapig_instance_name, - "veapig_service_name": veapig_service_name, - "veapig_upstream_name": veapig_upstream_name, - "use_adk_web": use_adk_web, - "veadk_version": VERSION, - } - - cookiecutter( - template=str(template_dir_path), - output_dir=TEMP_PATH, - no_input=True, - extra_context=settings, - ) - logger.debug(f"Create a template project at {TEMP_PATH}/{tmp_dir_name}") - agent_dir = ( - Path(TEMP_PATH) - / tmp_dir_name - / "src" - / user_proj_abs_path.name.replace("-", "_") - ) + if not volcengine_access_key or not volcengine_secret_key: + raise Exception("Volcengine access key and secret key must be set.") - # remove /tmp/tmp_dir_name/src/user_proj_abs_path.name - shutil.rmtree(agent_dir) - agent_dir.mkdir(parents=True, exist_ok=True) + deploy_config_file = ( + deploy_config_file if deploy_config_file else Path(project_path) / "deploy.yaml" + ) + if not Path(deploy_config_file).exists(): + click.echo(f"Deployment configuration file not found in {deploy_config_file}.") - # copy - shutil.copytree(user_proj_abs_path, agent_dir, dirs_exist_ok=True) - logger.debug(f"Remove agent module from {user_proj_abs_path} to {agent_dir}") + user_configs = _get_user_configs() - # copy requirements.txt - if (user_proj_abs_path / "requirements.txt").exists(): - logger.debug( - f"Find a requirements.txt in {user_proj_abs_path}/requirements.txt, copy it to temp project." - ) - shutil.copy( - user_proj_abs_path / "requirements.txt", - Path(TEMP_PATH) / tmp_dir_name / "src" / "requirements.txt", - ) - else: - logger.warning( - "No requirements.txt found in the user project, we will use a default one." + deploy_config_template_file_path = ( + Path(vefaas.__file__).parent + / "template" + / "{{cookiecutter.local_dir_name}}" + / "deploy.yaml" ) - # avoid upload user's config.yaml - if (user_proj_abs_path / "config.yaml").exists(): - logger.warning( - f"Find a config.yaml in {user_proj_abs_path}/config.yaml, we will not upload it by default." - ) - shutil.move(agent_dir / "config.yaml", Path(TEMP_PATH) / tmp_dir_name) - else: - logger.info( - "No config.yaml found in the user project. Some environment variables may not be set." - ) + with open(deploy_config_template_file_path, "r", encoding="utf-8") as f: + template_content = f.read() - # load - logger.debug( - f"Load deploy module from {Path(TEMP_PATH) / tmp_dir_name / 'deploy.py'}" - ) - deploy_module = load_module_from_file( - module_name="deploy_module", - file_path=str(Path(TEMP_PATH) / tmp_dir_name / "deploy.py"), - ) - logger.info(f"Begin deploy from {Path(TEMP_PATH) / tmp_dir_name / 'src'}") - asyncio.run(deploy_module.main()) + template = Template(template_content) + + rendered_content = template.render({"cookiecutter": user_configs}) + + output_path = Path.cwd() / "deploy.yaml" + with open(output_path, "w", encoding="utf-8") as f: + f.write(rendered_content) + + click.echo("Deployment configuration file generated.") + + # read deploy.yaml + # with open(output_path, "r", encoding="utf-8") as yaml_file: + # deploy_config_dict = safe_load(yaml_file) + with open(deploy_config_file, "r", encoding="utf-8") as yaml_file: + deploy_config_dict = safe_load(yaml_file) + + deploy_config = VeDeployConfig(**deploy_config_dict) + import veadk.config + + deploy_config.vefaas.function_envs.update(veadk.config.veadk_environments) + + vefaas_client = VeFaaS(deploy_config=deploy_config) - # remove tmp file - logger.info("Deploy done. Delete temp dir.") - shutil.rmtree(Path(TEMP_PATH) / tmp_dir_name) + vefaas_client.deploy() diff --git a/veadk/cli/cli_init.py b/veadk/cli/cli_init.py index 7fb858a3..42638be1 100644 --- a/veadk/cli/cli_init.py +++ b/veadk/cli/cli_init.py @@ -11,90 +11,67 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import shutil import warnings -from typing import Any +from pathlib import Path import click +from cookiecutter.main import cookiecutter +from InquirerPy.resolver import prompt +from InquirerPy.utils import InquirerPySessionResult +import veadk.integrations.ve_faas as vefaas from veadk.version import VERSION warnings.filterwarnings( "ignore", category=UserWarning, module="pydantic._internal._fields" ) - -def _render_prompts() -> dict[str, Any]: - vefaas_application_name = click.prompt( - "Volcengine FaaS application name", default="veadk-cloud-agent" - ) - - veapig_instance_name = click.prompt( - "Volcengine API Gateway instance name", default="", show_default=True - ) - - veapig_service_name = click.prompt( - "Volcengine API Gateway service name", default="", show_default=True - ) - - veapig_upstream_name = click.prompt( - "Volcengine API Gateway upstream name", default="", show_default=True - ) - - deploy_mode_options = { - "1": "A2A/MCP Server", - "2": "VeADK Web / Google ADK Web", - } - - click.echo("Choose a deploy mode:") - for key, value in deploy_mode_options.items(): - click.echo(f" {key}. {value}") - - deploy_mode = click.prompt( - "Enter your choice", type=click.Choice(deploy_mode_options.keys()) - ) - - return { - "vefaas_application_name": vefaas_application_name, - "veapig_instance_name": veapig_instance_name, - "veapig_service_name": veapig_service_name, - "veapig_upstream_name": veapig_upstream_name, - "use_adk_web": deploy_mode == "2", - "veadk_version": VERSION, - } - - -@click.command() -@click.option( - "--vefaas-template-type", default="template", help="Expected template type" -) -def init( - vefaas_template_type: str, -) -> None: - """Init a veadk project that can be deployed to Volcengine VeFaaS. - - `template` is A2A/MCP/Web server template, `web_template` is for web applications (i.e., a simple blog). - """ - import shutil - from pathlib import Path - - from cookiecutter.main import cookiecutter - - import veadk.integrations.ve_faas as vefaas - - if vefaas_template_type == "web_template": - click.echo( - "Welcome use VeADK to create your project. We will generate a `simple-blog` web application for you." - ) - else: - click.echo( - "Welcome use VeADK to create your project. We will generate a `weather-reporter` application for you." - ) - - cwd = Path.cwd() - local_dir_name = click.prompt("Local directory name", default="veadk-cloud-proj") - target_dir_path = cwd / local_dir_name - +DEPLOY_CONFIGS = [ + { + "name": "local_dir_name", + "type": "input", + "message": "Local project name:", + "default": "veadk-cloud-proj", + }, + { + "name": "vefaas_application_name", + "type": "input", + "message": "VeFaaS application name:", + "default": "veadk-cloud-application", + }, + { + "name": "veapig_instance_name", + "type": "input", + "message": "VeAPI Gateway instance name:", + }, + { + "name": "veapig_service_name", + "type": "input", + "message": "VeAPI Gateway service name:", + }, + { + "name": "veapig_upstream_name", + "type": "input", + "message": "VeAPI Gateway upstream name:", + }, + { + "name": "deploy_mode", + "type": "list", + "message": "Deploy mode:", + "choices": ["A2A/MCP", "Web"], + }, +] + + +def _get_user_configs() -> dict: + user_configs = prompt(DEPLOY_CONFIGS) + user_configs["veadk_version"] = VERSION + return user_configs + + +def _check_local_dir_exists(configs: InquirerPySessionResult) -> None: + target_dir_path = Path.cwd() / str(configs["local_dir_name"]) if target_dir_path.exists(): click.confirm( f"Directory '{target_dir_path}' already exists, do you want to overwrite it", @@ -102,24 +79,34 @@ def init( ) shutil.rmtree(target_dir_path) - settings = _render_prompts() - settings["local_dir_name"] = local_dir_name - if not vefaas_template_type: - vefaas_template_type = "template" +@click.command() +def init() -> None: + """Init a veadk project that can be deployed to Volcengine VeFaaS.""" + + click.echo( + "Welcome use VeADK to create your project. We will generate a `weather-reporter` application for you." + ) + + # 1. get user configurations by rendering prompts + user_configs = _get_user_configs() + _check_local_dir_exists(user_configs) - template_dir_path = Path(vefaas.__file__).parent / vefaas_template_type + # 2. copy template files + template_path = Path(vefaas.__file__).parent / "template" cookiecutter( - template=str(template_dir_path), - output_dir=str(cwd), - extra_context=settings, + template=str(template_path), + output_dir=str(Path.cwd()), + extra_context=user_configs, no_input=True, ) - click.echo(f"Template project has been generated at {target_dir_path}") - click.echo(f"Edit {target_dir_path / 'src/'} to define your agents") click.echo( - f"Edit {target_dir_path / 'deploy.py'} to define your deployment attributes" + f"🎉 Template project has been generated at {Path.cwd() / str(user_configs['local_dir_name'])}" ) - click.echo("Run python `deploy.py` for deployment on Volcengine FaaS platform.") + + click.echo(f"""Run: + - cd {user_configs["local_dir_name"]} + - veadk deploy +for deployment on Volcengine FaaS platform.""") diff --git a/veadk/cloud/app.py b/veadk/cloud/app.py new file mode 100644 index 00000000..4198f107 --- /dev/null +++ b/veadk/cloud/app.py @@ -0,0 +1,282 @@ +import os +from contextlib import asynccontextmanager +from typing import Callable +from fastapi import FastAPI +from starlette.applications import Starlette +import threading +from fastapi.routing import APIRoute +from starlette.routing import Route +from google.adk.agents import BaseAgent +from google.adk.a2a.utils.agent_card_builder import AgentCardBuilder +from google.adk.a2a.executor.a2a_agent_executor import A2aAgentExecutor +from uvicorn.importer import import_from_string +from google.adk.a2a.utils.agent_to_a2a import to_a2a +from fastmcp import FastMCP +from a2a.types import AgentProvider +from veadk.tracing.telemetry.exporters.apmplus_exporter import APMPlusExporter +from veadk.tracing.telemetry.exporters.cozeloop_exporter import CozeloopExporter +from veadk.tracing.telemetry.exporters.tls_exporter import TLSExporter +from veadk.tracing.telemetry.opentelemetry_tracer import OpentelemetryTracer +from veadk.utils.logger import get_logger +from veadk.runner import Runner +from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator +from opentelemetry import context +import asyncio +from veadk.memory.short_term_memory import ShortTermMemory + + +logger = get_logger(__name__) + +VEFAAS_REGION = os.getenv("APP_REGION", "cn-beijing") +VEFAAS_FUNC_ID = os.getenv("_FAAS_FUNC_ID", "") + +# Get entrypoint agent +entrypoint_agent_string = os.getenv("VEADK_ENTRYPOINT_AGENT", None) +entrypoint_agent = import_from_string(entrypoint_agent_string) +HOST = "0.0.0.0" +PORT = 8000 + +assert isinstance(entrypoint_agent, BaseAgent), ( + "The entrypoint agent must be an instance of BaseAgent." +) + +# Get app_name from environment variable +app_name = os.getenv("APP_NAME", "veadk-app") + + +def veadk_to_a2a( + agent: BaseAgent, *, host: str = "localhost", port: int = 8000 +) -> Starlette: + # Patch A2aAgentExecutor + original_init = A2aAgentExecutor.__init__ + + def patched_init(self, *, runner=None, config=None): + original_init( + self, + runner=Runner( + agent=agent, + app_name="veadk_agent", + user_id="user_123", + short_term_memory=ShortTermMemory(), + ), + config=config, + ) + + A2aAgentExecutor.__init__ = patched_init + app = to_a2a(agent=agent, host=host, port=port) + A2aAgentExecutor.__init__ = original_init + + return app + + +def load_tracer(agent: BaseAgent) -> None: + EXPORTER_REGISTRY = { + "VEADK_TRACER_APMPLUS": APMPlusExporter, + "VEADK_TRACER_COZELOOP": CozeloopExporter, + "VEADK_TRACER_TLS": TLSExporter, + } + + exporters = [] + for env_var, exporter_cls in EXPORTER_REGISTRY.items(): + if os.getenv(env_var, "").lower() == "true": + if ( + agent.tracers + and isinstance(agent.tracers[0], OpentelemetryTracer) + and any(isinstance(e, exporter_cls) for e in agent.tracers[0].exporters) + ): + logger.warning( + f"Exporter {exporter_cls.__name__} is already defined in agent.tracers[0].exporters. These two exporters will be used at the same time. As a result, your data may be uploaded twice." + ) + else: + exporters.append(exporter_cls()) + + tracer = OpentelemetryTracer(name="veadk_tracer", exporters=exporters) + agent.tracers.extend([tracer]) + + +def build_run_agent_func( + agent: BaseAgent, app_name: str, short_term_memory=None +) -> Callable: + runner = Runner( + agent=agent, + short_term_memory=short_term_memory, + app_name=app_name, + user_id="", + ) + + async def run_agent( + user_input: str, + user_id: str = "mcp_user", + session_id: str = "mcp_session", + ) -> str: + runner.user_id = user_id + final_output = await runner.run( + messages=user_input, + session_id=session_id, + ) + return final_output + + run_agent_doc = f"""{agent.description} + Args: + user_input: User's input message (required). + user_id: User identifier. Defaults to "mcp_user". + session_id: Session identifier. Defaults to "mcp_session". + Returns: + Final agent response as a string.""" + + run_agent.__doc__ = run_agent_doc + return run_agent + + +def _to_a2a(agent: BaseAgent) -> FastAPI: + # Use to_a2a to create Starlette app, then convert to FastAPI + starlette_app = veadk_to_a2a(agent=agent, host=HOST, port=PORT) + + # Create FastAPI app and import Starlette routes + fastapi_app = FastAPI() + + # Import A2A routes to FastAPI app + # Execute startup event to generate routes + + # async def trigger_startup(): + # for handler in starlette_app.router.on_startup: + # await handler() if asyncio.iscoroutinefunction(handler) else handler() + # asyncio.run(trigger_startup()) + + def run_async_startup(): + """Run startup handlers in a new event loop""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + async def trigger_startup(): + for handler in starlette_app.router.on_startup: + if asyncio.iscoroutinefunction(handler): + await handler() + else: + handler() + + loop.run_until_complete(trigger_startup()) + loop.close() + + try: + asyncio.get_running_loop() + # if in event loop, run in another thread + thread = threading.Thread(target=run_async_startup) + thread.start() + thread.join() + except RuntimeError: + # if not in event loop, run in current thread + run_async_startup() + + # Copy routes to FastAPI + fastapi_app.router.routes.extend(starlette_app.routes) + return fastapi_app + + +def _to_mcp(a2a_app: FastAPI, app_name: str): + # Build mcp server + mcp = FastMCP.from_fastapi(app=a2a_app, name=app_name, include_tags={"mcp"}) + + # Create MCP ASGI app + mcp_app = mcp.http_app(path="/", transport="streamable-http") + + return mcp_app + + +def _custom_routes(a2a_app: FastAPI, agent: BaseAgent, run_agent_func: Callable): + agent_card_builder = AgentCardBuilder( + agent=agent, + provider=AgentProvider( + organization="Volcengine Agent Development Kit (VeADK)", + url=f"https://console.volcengine.com/vefaas/region:vefaas+{VEFAAS_REGION}/function/detail/{VEFAAS_FUNC_ID}", + ), + ) + + async def agent_card() -> dict: + agent_card = await agent_card_builder.build() + return agent_card.model_dump() + + async def get_cozeloop_space_id() -> dict: + return { + "space_id": os.getenv( + "OBSERVABILITY_OPENTELEMETRY_COZELOOP_SERVICE_NAME", default="" + ) + } + + a2a_app.post("/run_agent", operation_id="run_agent", tags=["mcp"])(run_agent_func) + a2a_app.get("/agent_card", operation_id="agent_card", tags=["mcp"])(agent_card) + a2a_app.get( + "/get_cozeloop_space_id", operation_id="get_cozeloop_space_id", tags=["mcp"] + )(get_cozeloop_space_id) + + +def agent_to_server(agent: BaseAgent, app_name: str) -> FastAPI: + load_tracer(agent) + + # Build a run_agent function for building MCP server + run_agent_func = build_run_agent_func(agent, app_name, short_term_memory=None) + + # a2a_app + a2a_app = _to_a2a(agent) + + # Add custom routes to a2a_app + _custom_routes(a2a_app, agent, run_agent_func) + + # Build mcp server + mcp_app = _to_mcp(a2a_app, app_name) + + @asynccontextmanager + async def combined_lifespan(app: FastAPI): + async with mcp_app.lifespan(app): + yield + + # Create main FastAPI app with combined lifespan + app = FastAPI( + title=a2a_app.title, + version=a2a_app.version, + lifespan=combined_lifespan, + openapi_url=None, + docs_url=None, + redoc_url=None, + ) + + # Add otel context middleware + @app.middleware("http") + async def otel_context_middleware(request, call_next): + carrier = { + "traceparent": request.headers.get("Traceparent"), + "tracestate": request.headers.get("Tracestate"), + } + logger.debug(f"carrier: {carrier}") + if carrier["traceparent"] is None: + return await call_next(request) + else: + ctx = TraceContextTextMapPropagator().extract(carrier=carrier) + logger.debug(f"ctx: {ctx}") + token = context.attach(ctx) + try: + response = await call_next(request) + finally: + context.detach(token) + return response + + # Mount A2A routes to app + for route in a2a_app.routes: + app.routes.append(route) + + # Mount MCP server at /mcp endpoint of app + app.mount("/mcp", mcp_app) + + # Remove openapi routes + paths = ["/openapi.json", "/docs", "/redoc"] + new_routes = [] + for route in app.router.routes: + if isinstance(route, (APIRoute, Route)) and route.path in paths: + continue + new_routes.append(route) + app.router.routes = new_routes + + return app + + +app = agent_to_server(agent=entrypoint_agent, app_name=app_name) diff --git a/veadk/cloud/cloud_agent_engine.py b/veadk/cloud/cloud_agent_engine.py index 872fd1c1..e1a10090 100644 --- a/veadk/cloud/cloud_agent_engine.py +++ b/veadk/cloud/cloud_agent_engine.py @@ -22,25 +22,19 @@ from pydantic import BaseModel from veadk.cloud.cloud_app import CloudApp -from veadk.config import getenv from veadk.integrations.ve_faas.ve_faas import VeFaaS from veadk.utils.logger import get_logger from veadk.utils.misc import formatted_timestamp +from veadk.configs.deploy_config import VeDeployConfig logger = get_logger(__name__) class CloudAgentEngine(BaseModel): - volcengine_access_key: str = getenv("VOLCENGINE_ACCESS_KEY") - volcengine_secret_key: str = getenv("VOLCENGINE_SECRET_KEY") - region: str = "cn-beijing" + deploy_config: VeDeployConfig = VeDeployConfig() def model_post_init(self, context: Any, /) -> None: - self._vefaas_service = VeFaaS( - access_key=self.volcengine_access_key, - secret_key=self.volcengine_secret_key, - region=self.region, - ) + self._vefaas_service = VeFaaS(deploy_config=self.deploy_config) def _prepare(self, path: str, name: str): # basic check diff --git a/veadk/cloud/cloud_app.py b/veadk/cloud/cloud_app.py index e737c7f1..7e5df202 100644 --- a/veadk/cloud/cloud_app.py +++ b/veadk/cloud/cloud_app.py @@ -14,7 +14,7 @@ import json import time -from typing import Any +from typing import Any, Optional from uuid import uuid4 import httpx @@ -48,6 +48,12 @@ def __init__( self.vefaas_application_name = vefaas_application_name self.use_agent_card = use_agent_card + from veadk.configs.deploy_config import VeDeployConfig + import veadk.config + + self.deploy_config = VeDeployConfig() + self.deploy_config.vefaas.function_envs.update(veadk.config.veadk_environments) + # vefaas must be set one of three if ( not vefaas_endpoint @@ -79,12 +85,12 @@ def __init__( def _get_vefaas_endpoint( self, - volcengine_ak: str = getenv("VOLCENGINE_ACCESS_KEY"), - volcengine_sk: str = getenv("VOLCENGINE_SECRET_KEY"), + volcengine_ak: Optional[str] = getenv("VOLCENGINE_ACCESS_KEY"), + volcengine_sk: Optional[str] = getenv("VOLCENGINE_SECRET_KEY"), ) -> str: from veadk.integrations.ve_faas.ve_faas import VeFaaS - vefaas_client = VeFaaS(access_key=volcengine_ak, secret_key=volcengine_sk) + vefaas_client = VeFaaS(self.deploy_config) app = vefaas_client.get_application_details( app_id=self.vefaas_application_id, @@ -111,10 +117,7 @@ def _get_vefaas_application_id_by_name(self) -> str: ) from veadk.integrations.ve_faas.ve_faas import VeFaaS - vefaas_client = VeFaaS( - access_key=getenv("VOLCENGINE_ACCESS_KEY"), - secret_key=getenv("VOLCENGINE_SECRET_KEY"), - ) + vefaas_client = VeFaaS(self.deploy_config) vefaas_application_id = vefaas_client.find_app_id_by_name( self.vefaas_application_name ) @@ -151,8 +154,8 @@ def update_self( def delete_self( self, - volcengine_ak: str = getenv("VOLCENGINE_ACCESS_KEY"), - volcengine_sk: str = getenv("VOLCENGINE_SECRET_KEY"), + volcengine_ak: Optional[str] = getenv("VOLCENGINE_ACCESS_KEY"), + volcengine_sk: Optional[str] = getenv("VOLCENGINE_SECRET_KEY"), ): if not volcengine_ak or not volcengine_sk: raise ValueError("Volcengine access key and secret key must be set.") @@ -169,7 +172,7 @@ def delete_self( else: from veadk.integrations.ve_faas.ve_faas import VeFaaS - vefaas_client = VeFaaS(access_key=volcengine_ak, secret_key=volcengine_sk) + vefaas_client = VeFaaS(self.deploy_config) vefaas_client.delete(self.vefaas_application_id) print( f"Cloud app {self.vefaas_application_id} delete request has been sent to VeFaaS" diff --git a/veadk/configs/deploy_config.py b/veadk/configs/deploy_config.py new file mode 100644 index 00000000..c0257fcd --- /dev/null +++ b/veadk/configs/deploy_config.py @@ -0,0 +1,56 @@ +from typing import Any, Optional, Union + +from pydantic import BaseModel, Field +from typing_extensions import Literal + + +class CreateVeFaaSConfig(BaseModel): + region: Optional[str] = "cn-beijing" + + application_name: Optional[str] = "" + + function_name: Optional[str] = "" + + function_description: Optional[str] = ( + "Created by Volcengine Agent Development Kit (VeADK)" + ) + + function_startup_command: str = "bash run.sh" + + function_envs: dict[str, Any] = Field(default_factory=dict) + """Environment variables for the function instance.""" + + function_tags: dict[str, str] = {"provider": "veadk"} + + function_runtime: str = "native-python3.10/v1" + + function_memory_in_mb: int = 2048 + """Memory size in MB. Default is 2GB. CPU core is allocated based on memory size / 2.""" + + +class CreateVeApigConfig(BaseModel): + instance_name: Optional[str] = "" + + service_name: Optional[str] = "" + + upstream_name: Optional[str] = "" + + enable_key_auth: bool = True + + enable_mcp_session_keepalive: bool = True + + +class VeDeployConfig(BaseModel): + vefaas: CreateVeFaaSConfig = Field(default_factory=CreateVeFaaSConfig) + + veapig: CreateVeApigConfig = Field(default_factory=CreateVeApigConfig) + + user_project_path: str = "." + """Always use current dir as the working directory.""" + + entrypoint_agent: Optional[str] = "" + + ignore_files: list[str] = Field(default_factory=list) + + deploy_mode: Union[Literal["A2A/MCP", "WEB"], int] = "A2A/MCP" + """0 for `A2A/MCP` mode, 1 for `WEB` mode. Or, use literal to define this attribute.""" diff --git a/veadk/integrations/ve_faas/template/cookiecutter.json b/veadk/integrations/ve_faas/template/cookiecutter.json index 4ee809b5..51eb1246 100644 --- a/veadk/integrations/ve_faas/template/cookiecutter.json +++ b/veadk/integrations/ve_faas/template/cookiecutter.json @@ -1,12 +1,10 @@ { "local_dir_name": "veadk_vefaas_proj", - "app_name": "weather_report", - "agent_module_name": "weather_agent", - "short_term_memory_backend": "local", - "vefaas_application_name": "weather-reporter", + "vefaas_application_name": "", "veapig_instance_name": "", "veapig_service_name": "", "veapig_upstream_name": "", - "use_adk_web": false, + "entrypoint_agent": "weather_reporter.agent:root_agent", + "deploy_mode": "", "veadk_version": "" } \ No newline at end of file diff --git a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/__init__.py b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/__init__.py deleted file mode 100644 index 7f463206..00000000 --- a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/clean.py b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/clean.py deleted file mode 100644 index ceba3f39..00000000 --- a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/clean.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from veadk.cloud.cloud_app import CloudApp - -def main() -> None: - cloud_app = CloudApp(vefaas_application_name="{{cookiecutter.vefaas_application_name}}") - cloud_app.delete_self() - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/config.yaml.example b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/config.yaml.example index 7f2fdc08..9c51dc78 100644 --- a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/config.yaml.example +++ b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/config.yaml.example @@ -1,6 +1,3 @@ -model: - agent: - provider: openai - name: doubao-1-5-pro-256k-250115 - api_base: https://ark.cn-beijing.volces.com/api/v3/ - api_key: +volcengine: + access_key: + secret_key: \ No newline at end of file diff --git a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/deploy.py b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/deploy.py deleted file mode 100644 index 7ceb5bf4..00000000 --- a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/deploy.py +++ /dev/null @@ -1,103 +0,0 @@ -# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import asyncio -from pathlib import Path - -from a2a.types import TextPart -from fastmcp.client import Client - -from veadk.cloud.cloud_agent_engine import CloudAgentEngine -from veadk.cloud.cloud_app import CloudApp, get_message_id - -SESSION_ID = "cloud_app_test_session" -USER_ID = "cloud_app_test_user" - - -async def _send_msg_with_a2a(cloud_app: CloudApp, message: str) -> None: - print("===== A2A example =====") - - response_message = await cloud_app.message_send(message, SESSION_ID, USER_ID) - - if not response_message or not response_message.parts: - print( - "No response from VeFaaS application. Something wrong with cloud application." - ) - return - - print(f"Message ID: {get_message_id(response_message)}") - - if isinstance(response_message.parts[0].root, TextPart): - print( - f"Response from {cloud_app.vefaas_endpoint}: {response_message.parts[0].root.text}" - ) - else: - print( - f"Response from {cloud_app.vefaas_endpoint}: {response_message.parts[0].root}" - ) - - -async def _send_msg_with_mcp(cloud_app: CloudApp, message: str) -> None: - print("===== MCP example =====") - - endpoint = cloud_app._get_vefaas_endpoint() - print(f"MCP server endpoint: {endpoint}/mcp") - - # Connect to MCP server - client = Client(f"{endpoint}/mcp") - - async with client: - # List available tools - tools = await client.list_tools() - print(f"Available tools: {tools}") - - # Call run_agent tool, pass user input and session information - res = await client.call_tool( - "run_agent", - { - "user_input": message, - "session_id": SESSION_ID, - "user_id": USER_ID, - }, - ) - print(f"Response from {cloud_app.vefaas_endpoint}: {res}") - - -async def main(): - engine = CloudAgentEngine() - - cloud_app = engine.deploy( - path=str(Path(__file__).parent / "src"), - application_name="{{cookiecutter.vefaas_application_name}}", - gateway_name="{{cookiecutter.veapig_instance_name}}", - gateway_service_name="{{cookiecutter.veapig_service_name}}", - gateway_upstream_name="{{cookiecutter.veapig_upstream_name}}", - use_adk_web={{cookiecutter.use_adk_web}}, - local_test=False, # Set to True for local testing before deploy to VeFaaS - ) - print(f"VeFaaS application ID: {cloud_app.vefaas_application_id}") - - if {{cookiecutter.use_adk_web}}: - print(f"Web is running at: {cloud_app.vefaas_endpoint}") - else: - # Test with deployed cloud application - message = "How is the weather like in Beijing?" - print(f"Test message: {message}") - - # await _send_msg_with_a2a(cloud_app=cloud_app, message=message) - # await _send_msg_with_mcp(cloud_app=cloud_app, message=message) - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/deploy.yaml b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/deploy.yaml new file mode 100644 index 00000000..e3b103f4 --- /dev/null +++ b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/deploy.yaml @@ -0,0 +1,17 @@ +vefaas: + region: cn-beijing + application_name: {{ cookiecutter.vefaas_application_name }} + +veapig: + instance_name: {{ cookiecutter.veapig_instance_name }} + service_name: {{ cookiecutter.veapig_service_name }} + upstream_name: {{ cookiecutter.veapig_upstream_name }} + enable_key_auth: true + enable_mcp_session_keepalive: true + +ignore_files: + - "config.yaml" + +entrypoint_agent: {{ cookiecutter.entrypoint_agent }} + +deploy_mode: {{ cookiecutter.deploy_mode }} # 0 for `A2A/MCP`, 1 for `Web` \ No newline at end of file diff --git a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/requirements.txt b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/requirements.txt new file mode 100644 index 00000000..0a9f13d9 --- /dev/null +++ b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/requirements.txt @@ -0,0 +1 @@ +veadk-python=={{ cookiecutter.veadk_version }} \ No newline at end of file diff --git a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/__init__.py b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/__init__.py deleted file mode 100644 index 7f463206..00000000 --- a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/__init__.py +++ /dev/null @@ -1,13 +0,0 @@ -# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. diff --git a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/agent.py b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/agent.py deleted file mode 100644 index 9d2f65df..00000000 --- a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/agent.py +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from {{ cookiecutter.app_name }}.agent import agent # type: ignore - -from veadk.memory.short_term_memory import ShortTermMemory -from veadk.types import AgentRunConfig - -# [required] instantiate the agent run configuration -agent_run_config = AgentRunConfig( - app_name="{{ cookiecutter.app_name }}", - agent=agent, # type: ignore - short_term_memory=ShortTermMemory(backend="{{ cookiecutter.short_term_memory_backend }}"), # type: ignore -) diff --git a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/app.py b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/app.py deleted file mode 100644 index 9807c944..00000000 --- a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/app.py +++ /dev/null @@ -1,202 +0,0 @@ -# Copyright (c) 2025 Beijing Volcano Engine Technology Co., Ltd. and/or its affiliates. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -from contextlib import asynccontextmanager -from typing import Callable - -from agent import agent_run_config - -from fastapi import FastAPI -from fastapi.routing import APIRoute - -from fastmcp import FastMCP - -from starlette.routing import Route - -from google.adk.a2a.utils.agent_card_builder import AgentCardBuilder -from a2a.types import AgentProvider - -from veadk.a2a.ve_a2a_server import init_app -from veadk.runner import Runner -from veadk.tracing.telemetry.exporters.apmplus_exporter import APMPlusExporter -from veadk.tracing.telemetry.exporters.cozeloop_exporter import CozeloopExporter -from veadk.tracing.telemetry.exporters.tls_exporter import TLSExporter -from veadk.tracing.telemetry.opentelemetry_tracer import OpentelemetryTracer -from veadk.types import AgentRunConfig -from veadk.utils.logger import get_logger -from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator -from opentelemetry import context - -logger = get_logger(__name__) - -assert isinstance(agent_run_config, AgentRunConfig), ( - f"Invalid agent_run_config type: {type(agent_run_config)}, expected `AgentRunConfig`" -) - -app_name = agent_run_config.app_name -agent = agent_run_config.agent -short_term_memory = agent_run_config.short_term_memory - -VEFAAS_REGION = os.getenv("APP_REGION", "cn-beijing") -VEFAAS_FUNC_ID = os.getenv("_FAAS_FUNC_ID", "") -agent_card_builder = AgentCardBuilder(agent=agent, provider=AgentProvider(organization="Volcengine Agent Development Kit (VeADK)", url=f"https://console.volcengine.com/vefaas/region:vefaas+{VEFAAS_REGION}/function/detail/{VEFAAS_FUNC_ID}")) - - -def load_tracer() -> None: - EXPORTER_REGISTRY = { - "VEADK_TRACER_APMPLUS": APMPlusExporter, - "VEADK_TRACER_COZELOOP": CozeloopExporter, - "VEADK_TRACER_TLS": TLSExporter, - } - - exporters = [] - for env_var, exporter_cls in EXPORTER_REGISTRY.items(): - if os.getenv(env_var, "").lower() == "true": - if ( - agent.tracers - and isinstance(agent.tracers[0], OpentelemetryTracer) - and any(isinstance(e, exporter_cls) for e in agent.tracers[0].exporters) - ): - logger.warning( - f"Exporter {exporter_cls.__name__} is already defined in agent.tracers[0].exporters. These two exporters will be used at the same time. As a result, your data may be uploaded twice." - ) - else: - exporters.append(exporter_cls()) - - tracer = OpentelemetryTracer(name="veadk_tracer", exporters=exporters) - agent_run_config.agent.tracers.extend([tracer]) - - -def build_mcp_run_agent_func() -> Callable: - runner = Runner( - agent=agent, - short_term_memory=short_term_memory, - app_name=app_name, - user_id="", - ) - - async def run_agent( - user_input: str, - user_id: str = "mcp_user", - session_id: str = "mcp_session", - ) -> str: - # Set user_id for runner - runner.user_id = user_id - - # Running agent and get final output - final_output = await runner.run( - messages=user_input, - session_id=session_id, - ) - return final_output - - run_agent_doc = f"""{agent.description} - Args: - user_input: User's input message (required). - user_id: User identifier. Defaults to "mcp_user". - session_id: Session identifier. Defaults to "mcp_session". - Returns: - Final agent response as a string.""" - - run_agent.__doc__ = run_agent_doc - - return run_agent - - -async def agent_card() -> dict: - agent_card = await agent_card_builder.build() - return agent_card.model_dump() - -async def get_cozeloop_space_id() -> dict: - return {"space_id": os.getenv("OBSERVABILITY_OPENTELEMETRY_COZELOOP_SERVICE_NAME", default="")} - -load_tracer() - -# Build a run_agent function for building MCP server -run_agent_func = build_mcp_run_agent_func() - -a2a_app = init_app( - server_url="0.0.0.0", - app_name=app_name, - agent=agent, - short_term_memory=short_term_memory, -) - -a2a_app.post("/run_agent", operation_id="run_agent", tags=["mcp"])(run_agent_func) -a2a_app.get("/agent_card", operation_id="agent_card", tags=["mcp"])(agent_card) -a2a_app.get("/get_cozeloop_space_id", operation_id="get_cozeloop_space_id", tags=["mcp"])(get_cozeloop_space_id) - -# === Build mcp server === - -mcp = FastMCP.from_fastapi(app=a2a_app, name=app_name, include_tags={"mcp"}) - -# Create MCP ASGI app -mcp_app = mcp.http_app(path="/", transport="streamable-http") - - -# Combined lifespan management -@asynccontextmanager -async def combined_lifespan(app: FastAPI): - async with mcp_app.lifespan(app): - yield - - -# Create main FastAPI app with combined lifespan -app = FastAPI( - title=a2a_app.title, - version=a2a_app.version, - lifespan=combined_lifespan, - openapi_url=None, - docs_url=None, - redoc_url=None -) - -@app.middleware("http") -async def otel_context_middleware(request, call_next): - carrier = { - "traceparent": request.headers.get("Traceparent"), - "tracestate": request.headers.get("Tracestate"), - } - logger.debug(f"carrier: {carrier}") - if carrier["traceparent"] is None: - return await call_next(request) - else: - ctx = TraceContextTextMapPropagator().extract(carrier=carrier) - logger.debug(f"ctx: {ctx}") - token = context.attach(ctx) - try: - response = await call_next(request) - finally: - context.detach(token) - return response - -# Mount A2A routes to main app -for route in a2a_app.routes: - app.routes.append(route) - -# Mount MCP server at /mcp endpoint -app.mount("/mcp", mcp_app) - - -# remove openapi routes -paths = ["/openapi.json", "/docs", "/redoc"] -new_routes = [] -for route in app.router.routes: - if isinstance(route, (APIRoute, Route)) and route.path in paths: - continue - new_routes.append(route) -app.router.routes = new_routes - -# === Build mcp server end === diff --git a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/requirements.txt b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/requirements.txt deleted file mode 100644 index 3b50313a..00000000 --- a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -veadk-python=={{ cookiecutter.veadk_version }} -fastapi -uvicorn[standard] diff --git a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/run.sh b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/run.sh deleted file mode 100755 index b0f80137..00000000 --- a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/run.sh +++ /dev/null @@ -1,49 +0,0 @@ -#!/bin/bash -set -ex -cd `dirname $0` - -# A special check for CLI users (run.sh should be located at the 'root' dir) -if [ -d "output" ]; then - cd ./output/ -fi - -# Default values for host and port -HOST="0.0.0.0" -PORT=${_FAAS_RUNTIME_PORT:-8000} -TIMEOUT=${_FAAS_FUNC_TIMEOUT} - -export SERVER_HOST=$HOST -export SERVER_PORT=$PORT - -export PYTHONPATH=$PYTHONPATH:./site-packages - -# Parse arguments -while [[ $# -gt 0 ]]; do - case $1 in - --port) - PORT="$2" - shift 2 - ;; - --host) - HOST="$2" - shift 2 - ;; - *) - shift - ;; - esac -done - - -USE_ADK_WEB=${USE_ADK_WEB:-False} - -export SHORT_TERM_MEMORY_BACKEND= # can be `mysql` -export LONG_TERM_MEMORY_BACKEND= # can be `opensearch` - -if [ "$USE_ADK_WEB" = "True" ]; then - echo "USE_ADK_WEB is True, running veadk web" - exec python3 -m veadk.cli.cli web --host $HOST -else - echo "USE_ADK_WEB is False, running A2A and MCP server" - exec python3 -m uvicorn app:app --host $HOST --port $PORT --timeout-graceful-shutdown $TIMEOUT --loop asyncio -fi diff --git a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/{{ cookiecutter.app_name }}/__init__.py b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/weather_reporter/__init__.py similarity index 100% rename from veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/{{ cookiecutter.app_name }}/__init__.py rename to veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/weather_reporter/__init__.py diff --git a/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/{{ cookiecutter.app_name }}/agent.py b/veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/weather_reporter/agent.py similarity index 100% rename from veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/src/{{ cookiecutter.app_name }}/agent.py rename to veadk/integrations/ve_faas/template/{{cookiecutter.local_dir_name}}/weather_reporter/agent.py diff --git a/veadk/integrations/ve_faas/ve_faas.py b/veadk/integrations/ve_faas/ve_faas.py index aee36f8c..c6541c45 100644 --- a/veadk/integrations/ve_faas/ve_faas.py +++ b/veadk/integrations/ve_faas/ve_faas.py @@ -13,9 +13,8 @@ # limitations under the License. import json -import os import time - +from typing import Optional import requests import volcenginesdkcore import volcenginesdkvefaas @@ -35,20 +34,30 @@ from veadk.utils.logger import get_logger from veadk.utils.misc import formatted_timestamp from veadk.utils.volcengine_sign import ve_request +from veadk.configs.deploy_config import VeDeployConfig logger = get_logger(__name__) class VeFaaS: - def __init__(self, access_key: str, secret_key: str, region: str = "cn-beijing"): - self.ak = access_key - self.sk = secret_key - self.region = region - + def __init__(self, deploy_config: VeDeployConfig): + self.deploy_config = deploy_config + self.ak = ( + veadk.config.getenv("VOLCENGINE_ACCESS_KEY", "", True) + or deploy_config.vefaas.function_envs["VOLCENGINE_ACCESS_KEY"] + ) + self.sk = ( + veadk.config.getenv("VOLCENGINE_SECRET_KEY", "", True) + or deploy_config.vefaas.function_envs["VOLCENGINE_SECRET_KEY"] + ) + self.region = ( + veadk.config.getenv("VOLCENGINE_REGION", "", True) + or deploy_config.vefaas.region + ) configuration = volcenginesdkcore.Configuration() configuration.ak = self.ak configuration.sk = self.sk - configuration.region = region + configuration.region = self.region configuration.client_side_validation = True volcenginesdkcore.Configuration.set_default(configuration) @@ -61,15 +70,18 @@ def __init__(self, access_key: str, secret_key: str, region: str = "cn-beijing") self.template_id = "6874f3360bdbc40008ecf8c7" - def _upload_and_mount_code(self, function_id: str, path: str): + def _upload_and_mount_code(self, function_id: str, path: Optional[str] = None): """Upload code to VeFaaS temp bucket and mount to function instance. Args: function_id (str): Target function ID. path (str): Local project path. """ + path = path or self.deploy_config.user_project_path # Get zipped code data - code_zip_data, code_zip_size, error = zip_and_encode_folder(path) + code_zip_data, code_zip_size, error = zip_and_encode_folder( + path, self.deploy_config.ignore_files + ) logger.info( f"Zipped project size: {code_zip_size / 1024 / 1024:.2f} MB", ) @@ -99,11 +111,29 @@ def _upload_and_mount_code(self, function_id: str, path: str): return res - def _create_function(self, function_name: str, path: str): + def _create_function( + self, function_name: Optional[str] = None, path: Optional[str] = None + ): + function_name = function_name or self.deploy_config.vefaas.function_name + path = path or self.deploy_config.user_project_path # Read envs - envs = [] - for key, value in veadk.config.veadk_environments.items(): - envs.append(EnvForCreateFunctionInput(key=key, value=value)) + # first veadk.config.yaml, then deploy.yaml update + envs = [ + EnvForCreateFunctionInput(key=k, value=v) + for k, v in { + **veadk.config.veadk_environments, + **self.deploy_config.vefaas.function_envs, + "VEADK_ENTRYPOINT_AGENT": self.deploy_config.entrypoint_agent or "", + }.items() + ] + # first default, then deploy.yaml update + tags = [ + TagForCreateFunctionInput(key=k, value=v) + for k, v in { + "provider": "veadk", + **self.deploy_config.vefaas.function_tags, + }.items() + ] logger.info( f"Fetch {len(envs)} environment variables.", ) @@ -111,14 +141,14 @@ def _create_function(self, function_name: str, path: str): # Create function res = self.client.create_function( volcenginesdkvefaas.CreateFunctionRequest( - command="./run.sh", + command=self.deploy_config.vefaas.function_startup_command, name=function_name, - description="Created by VeADK (Volcengine Agent Development Kit)", - tags=[TagForCreateFunctionInput(key="provider", value="veadk")], - runtime="native-python3.10/v1", + description=self.deploy_config.vefaas.function_description, + tags=tags, + runtime=self.deploy_config.vefaas.function_runtime, request_timeout=1800, envs=envs, - memory_mb=2048, + memory_mb=self.deploy_config.vefaas.function_memory_in_mb, ) ) @@ -136,13 +166,19 @@ def _create_function(self, function_name: str, path: str): def _create_application( self, - application_name: str, - function_name: str, - gateway_name: str, - upstream_name: str, - service_name: str, + application_name: Optional[str] = None, + function_name: Optional[str] = None, + gateway_name: Optional[str] = None, + upstream_name: Optional[str] = None, + service_name: Optional[str] = None, ): - enable_key_auth = os.getenv("VEFAAS_ENABLE_KEY_AUTH", "true").lower() == "true" + application_name = ( + application_name or self.deploy_config.vefaas.application_name + ) + function_name = function_name or self.deploy_config.vefaas.function_name + gateway_name = gateway_name or self.deploy_config.veapig.instance_name + upstream_name = upstream_name or self.deploy_config.veapig.upstream_name + service_name = service_name or self.deploy_config.veapig.service_name response = ve_request( request_body={ @@ -155,8 +191,8 @@ def _create_application( "GatewayName": gateway_name, "ServiceName": service_name, "UpstreamName": upstream_name, - "EnableKeyAuth": enable_key_auth, - "EnableMcpSession": True, + "EnableKeyAuth": self.deploy_config.veapig.enable_key_auth, + "EnableMcpSession": self.deploy_config.veapig.enable_mcp_session_keepalive, }, "TemplateId": self.template_id, }, @@ -165,7 +201,7 @@ def _create_application( sk=self.sk, service="vefaas", version="2021-03-03", - region="cn-beijing", + region=self.deploy_config.vefaas.region, host="open.volcengineapi.com", ) @@ -185,7 +221,7 @@ def _release_application(self, app_id: str): sk=self.sk, service="vefaas", version="2021-03-03", - region="cn-beijing", + region=self.deploy_config.vefaas.region, host="open.volcengineapi.com", ) @@ -224,7 +260,7 @@ def _get_application_status(self, app_id: str): sk=self.sk, service="vefaas", version="2021-03-03", - region="cn-beijing", + region=self.deploy_config.vefaas.region, host="open.volcengineapi.com", ) return response["Result"]["Status"], response @@ -255,7 +291,7 @@ def _list_application(self, app_id: str = None, app_name: str = None): sk=self.sk, service="vefaas", version="2021-03-03", - region="cn-beijing", + region=self.deploy_config.vefaas.region, host="open.volcengineapi.com", ) result = response.get("Result", {}) @@ -277,8 +313,8 @@ def _list_application(self, app_id: str = None, app_name: str = None): def _update_function_code( self, - application_name: str, # application name - path: str, + application_name: Optional[str] = None, + path: Optional[str] = None, ) -> tuple[str, str, str]: """Update existing application function code while preserving URL. @@ -289,6 +325,10 @@ def _update_function_code( Returns: tuple[str, str, str]: URL, app_id, function_id """ + application_name = ( + application_name or self.deploy_config.vefaas.application_name + ) + path = path or self.deploy_config.user_project_path # Naming check if "_" in application_name: raise ValueError("Function or Application name cannot contain '_'.") @@ -356,7 +396,8 @@ def get_application_details(self, app_id: str = None, app_name: str = None): if app["Name"] == app_name: return app - def find_app_id_by_name(self, name: str): + def find_app_id_by_name(self, name: Optional[str] = None): + name = name or self.deploy_config.vefaas.application_name apps = self._list_application(app_name=name) for app in apps: if app["Name"] == name: @@ -381,11 +422,11 @@ def delete(self, app_id: str): def deploy( self, - name: str, - path: str, - gateway_name: str = "", - gateway_service_name: str = "", - gateway_upstream_name: str = "", + name: Optional[str] = None, + path: Optional[str] = None, + gateway_name: Optional[str] = None, + gateway_service_name: Optional[str] = None, + gateway_upstream_name: Optional[str] = None, ) -> tuple[str, str, str]: """Deploy an agent project to VeFaaS service. @@ -399,6 +440,16 @@ def deploy( Returns: tuple[str, str, str]: (url, app_id, function_id) """ + name = name or self.deploy_config.vefaas.application_name + path = path or self.deploy_config.user_project_path + gateway_name = gateway_name or self.deploy_config.veapig.instance_name + gateway_service_name = ( + gateway_service_name or self.deploy_config.veapig.service_name + ) + gateway_upstream_name = ( + gateway_upstream_name or self.deploy_config.veapig.upstream_name + ) + # Naming check if "_" in name: raise ValueError("Function or Application name cannot contain '_'.") @@ -453,9 +504,21 @@ def deploy( def _create_image_function(self, function_name: str, image: str): """Create function using container image instead of code upload.""" # Read environment variables from veadk configuration - envs = [] - for key, value in veadk.config.veadk_environments.items(): - envs.append(EnvForCreateFunctionInput(key=key, value=value)) + envs = [ + EnvForCreateFunctionInput(key=k, value=v) + for k, v in { + **veadk.config.veadk_environments, + **self.deploy_config.vefaas.function_envs, + "VEADK_ENTRYPOINT_AGENT": self.deploy_config.entrypoint_agent or "", + }.items() + ] + tags = [ + TagForCreateFunctionInput(key=k, value=v) + for k, v in { + "provider": "veadk", + **self.deploy_config.vefaas.function_tags, + }.items() + ] logger.info( f"Fetch {len(envs)} environment variables for image function.", ) @@ -463,10 +526,10 @@ def _create_image_function(self, function_name: str, image: str): # Create function with container image source configuration res = self.client.create_function( volcenginesdkvefaas.CreateFunctionRequest( - command="bash ./run.sh", # Custom startup command + command=self.deploy_config.vefaas.function_startup_command, # Custom startup command name=function_name, description="Created by VeADK (Volcengine Agent Development Kit)", - tags=[TagForCreateFunctionInput(key="provider", value="veadk")], + tags=tags, runtime="native/v1", # Native runtime required for container images source_type="image", # Set source type to container image source=image, # Container image URL @@ -573,51 +636,14 @@ def query_user_cr_vpc_tunnel( return False - def _create_image_function(self, function_name: str, image: str): - """Create function using container image instead of code upload.""" - # Read environment variables from veadk configuration - envs = [] - for key, value in veadk.config.veadk_environments.items(): - envs.append(EnvForCreateFunctionInput(key=key, value=value)) - logger.info( - f"Fetch {len(envs)} environment variables for image function.", - ) - - # Create function with container image source configuration - res = self.client.create_function( - volcenginesdkvefaas.CreateFunctionRequest( - command="bash ./run.sh", # Custom startup command - name=function_name, - description="Created by VeADK (Volcengine Agent Development Kit)", - tags=[TagForCreateFunctionInput(key="provider", value="veadk")], - runtime="native/v1", # Native runtime required for container images - source_type="image", # Set source type to container image - source=image, # Container image URL - request_timeout=1800, # Request timeout in seconds - envs=envs, # Environment variables from configuration - ) - ) - - # Log function creation success without exposing sensitive information - logger.debug( - f"Function creation in {res.project_name} project with ID {res.id}" - ) - - function_id = res.id - logger.info( - f"Function {function_name} created with image {image} and ID {function_id}" - ) - - return function_name, function_id - def deploy_image( self, - name: str, - image: str, - registry_name: str, - gateway_name: str = "", - gateway_service_name: str = "", - gateway_upstream_name: str = "", + name: Optional[str] = None, + image: str = None, + registry_name: str = None, + gateway_name: Optional[str] = None, + gateway_service_name: Optional[str] = None, + gateway_upstream_name: Optional[str] = None, ) -> tuple[str, str, str]: """Deploy application using container image. @@ -631,6 +657,14 @@ def deploy_image( Returns: tuple[str, str, str]: (url, app_id, function_id) """ + name = name or self.deploy_config.vefaas.application_name + gateway_name = gateway_name or self.deploy_config.veapig.instance_name + gateway_service_name = ( + gateway_service_name or self.deploy_config.veapig.service_name + ) + gateway_upstream_name = ( + gateway_upstream_name or self.deploy_config.veapig.upstream_name + ) # Validate application name format is_ready = self.query_user_cr_vpc_tunnel(registry_name) if not is_ready: diff --git a/veadk/integrations/ve_faas/ve_faas_utils.py b/veadk/integrations/ve_faas/ve_faas_utils.py index f04af8a6..41edd1a8 100644 --- a/veadk/integrations/ve_faas/ve_faas_utils.py +++ b/veadk/integrations/ve_faas/ve_faas_utils.py @@ -46,10 +46,14 @@ def ensure_executable_permissions(folder_path: str): os.chmod(full_path, 0o755) -def python_zip_implementation(folder_path: str) -> bytes: +def python_zip_implementation( + folder_path: str, ignore_files: list[str] = None +) -> bytes: """Pure Python zip implementation with permissions support""" + if ignore_files is None: + ignore_files = [] buffer = BytesIO() - + all_ignore_files = [".git", ".venv", "__pycache__", ".pyc"] + ignore_files with zipfile.ZipFile(buffer, "w", compression=zipfile.ZIP_DEFLATED) as zipf: for root, dirs, files in os.walk(folder_path): for file in files: @@ -57,9 +61,7 @@ def python_zip_implementation(folder_path: str) -> bytes: arcname = os.path.relpath(file_path, folder_path) # Skip excluded paths and binary/cache files - if any( - excl in arcname for excl in [".git", ".venv", "__pycache__", ".pyc"] - ): + if any(excl in arcname for excl in all_ignore_files): continue try: @@ -87,16 +89,20 @@ def python_zip_implementation(folder_path: str) -> bytes: return buffer.getvalue() -def zip_and_encode_folder(folder_path: str) -> Tuple[bytes, int, Exception]: +def zip_and_encode_folder( + folder_path: str, ignore_files: list[str] = None +) -> Tuple[bytes, int, Exception]: """ Zips a folder with system zip command (if available) or falls back to Python implementation. Returns (zip_data, size_in_bytes, error) tuple. """ + if ignore_files is None: + ignore_files = [] # Check for system zip first if not shutil.which("zip"): print("System zip command not found, using Python implementation") try: - data = python_zip_implementation(folder_path) + data = python_zip_implementation(folder_path, ignore_files) return data, len(data), None except Exception as e: return None, 0, e @@ -104,6 +110,12 @@ def zip_and_encode_folder(folder_path: str) -> Tuple[bytes, int, Exception]: # print(f"Zipping folder: {folder_path}") try: ensure_executable_permissions(folder_path) + all_ignore_files = [".git", ".venv", "__pycache__", ".pyc"] + [ + f"*{pattern}*" for pattern in ignore_files + ] + exclude_args = [] + for pattern in all_ignore_files: + exclude_args.extend(["-x", pattern]) # Create zip process with explicit arguments proc = subprocess.Popen( [ @@ -112,15 +124,8 @@ def zip_and_encode_folder(folder_path: str) -> Tuple[bytes, int, Exception]: "-q", "-", ".", - "-x", - "*.git*", - "-x", - "*.venv*", - "-x", - "*__pycache__*", - "-x", - "*.pyc", - ], + ] + + exclude_args, cwd=folder_path, stdout=subprocess.PIPE, stderr=subprocess.PIPE, diff --git a/veadk/runner.py b/veadk/runner.py index b04909b6..251d3162 100644 --- a/veadk/runner.py +++ b/veadk/runner.py @@ -198,7 +198,11 @@ def __init__( logger.warning( "No short term memory or session service provided, use an in-memory one instead." ) - short_term_memory = ShortTermMemory() + if hasattr(agent, "short_term_memory") and agent.short_term_memory: # type: ignore + short_term_memory = agent.short_term_memory # type: ignore + else: + short_term_memory = ShortTermMemory() + self.short_term_memory = short_term_memory session_service = short_term_memory.session_service