From 71d00f083fb59bda34c82b82eea85602c1710265 Mon Sep 17 00:00:00 2001
From: tgasser-nv <200644301+tgasser-nv@users.noreply.github.com>
Date: Tue, 2 Sep 2025 11:17:40 -0500
Subject: [PATCH 1/2] Dummy commit to set up the chore/type-clean-guardrails
 PR and branch

---
 nemoguardrails/actions/llm/generation.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/nemoguardrails/actions/llm/generation.py b/nemoguardrails/actions/llm/generation.py
index 2a57e1c26..cd11e70a7 100644
--- a/nemoguardrails/actions/llm/generation.py
+++ b/nemoguardrails/actions/llm/generation.py
@@ -137,7 +137,7 @@ async def init(self):
             self._init_flows_index(),
         )
 
-    def _extract_user_message_example(self, flow: Flow):
+    def _extract_user_message_example(self, flow: Flow) -> None:
         """Heuristic to extract user message examples from a flow."""
         elements = [
             item

From eb83d1bacb5489fd8d6e7c45781ff819cd41147b Mon Sep 17 00:00:00 2001
From: tgasser-nv <200644301+tgasser-nv@users.noreply.github.com>
Date: Sun, 14 Sep 2025 21:04:58 -0500
Subject: [PATCH 2/2] Cleaned llm/ type errors

---
 nemoguardrails/llm/filters.py                 |  8 +--
 nemoguardrails/llm/helpers.py                 | 23 +++----
 nemoguardrails/llm/models/initializer.py      |  7 +-
 nemoguardrails/llm/params.py                  | 15 +++--
 .../llm/providers/huggingface/pipeline.py     | 39 ++++++++---
 .../llm/providers/huggingface/streamers.py    | 26 ++++++--
 nemoguardrails/llm/providers/trtllm/client.py | 66 ++++++++++++-------
 nemoguardrails/llm/providers/trtllm/llm.py    | 10 ++-
 nemoguardrails/llm/taskmanager.py             | 21 ++++--
 9 files changed, 149 insertions(+), 66 deletions(-)

diff --git a/nemoguardrails/llm/filters.py b/nemoguardrails/llm/filters.py
index a0d80bb5d..5a9b333aa 100644
--- a/nemoguardrails/llm/filters.py
+++ b/nemoguardrails/llm/filters.py
@@ -150,7 +150,7 @@ def to_messages(colang_history: str) -> List[dict]:
     # a message from the user, and the rest gets translated to messages from the assistant.
     lines = colang_history.split("\n")
 
-    bot_lines = []
+    bot_lines: list[str] = []
     for i, line in enumerate(lines):
         if line.startswith('user "'):
             # If we have bot lines in the buffer, we first add a bot message.
@@ -191,8 +191,8 @@ def to_messages_v2(colang_history: str) -> List[dict]:
     # a message from the user, and the rest gets translated to messages from the assistant.
     lines = colang_history.split("\n")
 
-    user_lines = []
-    bot_lines = []
+    user_lines: list[str] = []
+    bot_lines: list[str] = []
     for line in lines:
         if line.startswith("user action:"):
             if len(bot_lines) > 0:
@@ -285,7 +285,7 @@ def verbose_v1(colang_history: str) -> str:
     return "\n".join(lines)
 
 
-def to_chat_messages(events: List[dict]) -> str:
+def to_chat_messages(events: List[dict]) -> List[dict]:
     """Filter that turns an array of events into a sequence of user/assistant messages.
 
     Properly handles multimodal content by preserving the structure when the content
diff --git a/nemoguardrails/llm/helpers.py b/nemoguardrails/llm/helpers.py
index 04a81a175..a0dc1ec1a 100644
--- a/nemoguardrails/llm/helpers.py
+++ b/nemoguardrails/llm/helpers.py
@@ -13,18 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import List, Optional, Type, Union
+from typing import List, Optional, Type
 
 from langchain.callbacks.manager import (
     AsyncCallbackManagerForLLMRun,
     CallbackManagerForLLMRun,
 )
-from langchain_core.language_models.llms import LLM, BaseLLM
+from langchain_core.language_models.llms import LLM
 
 
-def get_llm_instance_wrapper(
-    llm_instance: Union[LLM, BaseLLM], llm_type: str
-) -> Type[LLM]:
+def get_llm_instance_wrapper(llm_instance: LLM, llm_type: str) -> Type[LLM]:
     """Wraps an LLM instance in a class that can be registered with LLMRails.
 
     This is useful to create specific types of LLMs using a generic LLM provider
@@ -47,7 +45,7 @@ def model_kwargs(self):
             These are needed to allow changes to the arguments of the LLM calls.
             """
             if hasattr(llm_instance, "model_kwargs"):
-                return llm_instance.model_kwargs
+                return getattr(llm_instance, "model_kwargs")
             return {}
 
         @property
@@ -66,26 +64,29 @@ def _modify_instance_kwargs(self):
             """
 
             if hasattr(llm_instance, "model_kwargs"):
-                if isinstance(llm_instance.model_kwargs, dict):
-                    llm_instance.model_kwargs["temperature"] = self.temperature
-                    llm_instance.model_kwargs["streaming"] = self.streaming
+                model_kwargs = getattr(llm_instance, "model_kwargs")
+                if isinstance(model_kwargs, dict):
+                    model_kwargs["temperature"] = self.temperature
+                    model_kwargs["streaming"] = self.streaming
 
         def _call(
             self,
             prompt: str,
             stop: Optional[List[str]] = None,
             run_manager: Optional[CallbackManagerForLLMRun] = None,
+            **kwargs,
         ) -> str:
             self._modify_instance_kwargs()
-            return llm_instance._call(prompt, stop, run_manager)
+            return llm_instance._call(prompt, stop, run_manager, **kwargs)
 
         async def _acall(
             self,
             prompt: str,
             stop: Optional[List[str]] = None,
             run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
+            **kwargs,
        ) -> str:
             self._modify_instance_kwargs()
-            return await llm_instance._acall(prompt, stop, run_manager)
+            return await llm_instance._acall(prompt, stop, run_manager, **kwargs)
 
     return WrapperLLM
diff --git a/nemoguardrails/llm/models/initializer.py b/nemoguardrails/llm/models/initializer.py
index 81a427b96..6d49e456f 100644
--- a/nemoguardrails/llm/models/initializer.py
+++ b/nemoguardrails/llm/models/initializer.py
@@ -20,12 +20,15 @@
 from langchain_core.language_models import BaseChatModel
 from langchain_core.language_models.llms import BaseLLM
 
-from .langchain_initializer import ModelInitializationError, init_langchain_model
+from nemoguardrails.llm.models.langchain_initializer import (
+    ModelInitializationError,
+    init_langchain_model,
+)
 
 
 # later we can easily conver it to a class
 def init_llm_model(
-    model_name: Optional[str],
+    model_name: str,
     provider_name: str,
     mode: Literal["chat", "text"],
     kwargs: Dict[str, Any],
diff --git a/nemoguardrails/llm/params.py b/nemoguardrails/llm/params.py
index 90b5d6a15..2b6f5c376 100644
--- a/nemoguardrails/llm/params.py
+++ b/nemoguardrails/llm/params.py
@@ -20,7 +20,7 @@
 """
 
 import logging
-from typing import Dict, Type
+from typing import Any, Dict, Type
 
 from langchain.base_language import BaseLanguageModel
 
@@ -33,18 +33,18 @@ class LLMParams:
     def __init__(self, llm: BaseLanguageModel, **kwargs):
         self.llm = llm
         self.altered_params = kwargs
-        self.original_params = {}
+        self.original_params: dict[str, Any] = {}
 
     def __enter__(self):
         # Here we can access and modify the global language model parameters.
-        self.original_params = {}
 
         for param, value in self.altered_params.items():
             if hasattr(self.llm, param):
                 self.original_params[param] = getattr(self.llm, param)
                 setattr(self.llm, param, value)
             elif hasattr(self.llm, "model_kwargs"):
-                if param not in self.llm.model_kwargs:
+                model_kwargs = getattr(self.llm, "model_kwargs", {})
+                if param not in model_kwargs:
                     log.warning(
                         "Parameter %s does not exist for %s. Passing to model_kwargs",
                         param,
@@ -53,9 +53,10 @@ def __enter__(self):
                     self.original_params[param] = None
 
                 else:
-                    self.original_params[param] = self.llm.model_kwargs[param]
+                    self.original_params[param] = model_kwargs[param]
 
-                self.llm.model_kwargs[param] = value
+                model_kwargs[param] = value
+                setattr(self.llm, "model_kwargs", model_kwargs)
 
             else:
                 log.warning(
@@ -64,7 +65,7 @@ def __enter__(self):
                     self.llm.__class__.__name__,
                 )
 
-    def __exit__(self, type, value, traceback):
+    def __exit__(self, exc_type, value, traceback):
         # Restore original parameters when exiting the context
         for param, value in self.original_params.items():
             if hasattr(self.llm, param):
diff --git a/nemoguardrails/llm/providers/huggingface/pipeline.py b/nemoguardrails/llm/providers/huggingface/pipeline.py
index c0dd83a4f..e99f5e499 100644
--- a/nemoguardrails/llm/providers/huggingface/pipeline.py
+++ b/nemoguardrails/llm/providers/huggingface/pipeline.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import asyncio
 from typing import Any, List, Optional
 
 from langchain.callbacks.manager import (
@@ -20,7 +21,25 @@
     CallbackManagerForLLMRun,
 )
 from langchain.schema.output import GenerationChunk
-from langchain_community.llms import HuggingFacePipeline
+
+# Import HuggingFacePipeline with fallbacks for different LangChain versions
+HuggingFacePipeline = None  # type: ignore[assignment]
+
+try:
+    from langchain_community.llms import (
+        HuggingFacePipeline,  # type: ignore[attr-defined,no-redef]
+    )
+except ImportError:
+    # Fallback for older versions of langchain
+    try:
+        from langchain.llms import (
+            HuggingFacePipeline,  # type: ignore[attr-defined,no-redef]
+        )
+    except ImportError:
+        # Create a dummy class if HuggingFacePipeline is not available
+        class HuggingFacePipeline:  # type: ignore[misc,no-redef]
+            def __init__(self, *args, **kwargs):
+                raise ImportError("HuggingFacePipeline is not available")
 
 
 class HuggingFacePipelineCompatible(HuggingFacePipeline):
@@ -47,12 +66,13 @@ def _call(
         )
 
         # Streaming for NeMo Guardrails is not supported in sync calls.
-        if self.model_kwargs and self.model_kwargs.get("streaming"):
-            raise Exception(
+        model_kwargs = getattr(self, "model_kwargs", {})
+        if model_kwargs and model_kwargs.get("streaming"):
+            raise NotImplementedError(
                 "Streaming mode not supported for HuggingFacePipeline in NeMo Guardrails!"
             )
 
-        llm_result = self._generate(
+        llm_result = getattr(self, "_generate")(
             [prompt],
             stop=stop,
             run_manager=run_manager,
@@ -78,11 +98,12 @@ async def _acall(
         )
 
         # Handle streaming, if the flag is set
-        if self.model_kwargs and self.model_kwargs.get("streaming"):
+        model_kwargs = getattr(self, "model_kwargs", {})
+        if model_kwargs and model_kwargs.get("streaming"):
             # Retrieve the streamer object, needs to be set in model_kwargs
-            streamer = self.model_kwargs.get("streamer")
+            streamer = model_kwargs.get("streamer")
             if not streamer:
-                raise Exception(
+                raise ValueError(
                     "Cannot stream, please add HuggingFace streamer object to model_kwargs!"
                 )
 
@@ -99,7 +120,7 @@ async def _acall(
                 run_manager=run_manager,
                 **kwargs,
             )
-            loop.create_task(self._agenerate(**generation_kwargs))
+            loop.create_task(getattr(self, "_agenerate")(**generation_kwargs))
 
             # And start waiting for the chunks to come in.
             completion = ""
@@ -111,7 +132,7 @@ async def _acall(
 
             return completion
 
-        llm_result = await self._agenerate(
+        llm_result = await getattr(self, "_agenerate")(
             [prompt],
             stop=stop,
             run_manager=run_manager,
diff --git a/nemoguardrails/llm/providers/huggingface/streamers.py b/nemoguardrails/llm/providers/huggingface/streamers.py
index d81288fae..fff35ba12 100644
--- a/nemoguardrails/llm/providers/huggingface/streamers.py
+++ b/nemoguardrails/llm/providers/huggingface/streamers.py
@@ -14,11 +14,27 @@
 # limitations under the License.
 
 import asyncio
+from typing import TYPE_CHECKING, Optional
 
-from transformers.generation.streamers import TextStreamer
+TRANSFORMERS_AVAILABLE = True
+try:
+    from transformers.generation.streamers import (  # type: ignore[import-untyped]
+        TextStreamer,
+    )
+except ImportError:
+    # Fallback if transformers is not available
+    TRANSFORMERS_AVAILABLE = False
+
+    class TextStreamer:  # type: ignore[no-redef]
+        def __init__(self, *args, **kwargs):
+            pass
 
 
-class AsyncTextIteratorStreamer(TextStreamer):
+if TYPE_CHECKING:
+    from transformers import AutoTokenizer  # type: ignore[import-untyped]
+
+
+class AsyncTextIteratorStreamer(TextStreamer):  # type: ignore[misc]
     """
     Simple async implementation for HuggingFace Transformers streamers.
 
@@ -30,12 +46,14 @@ def __init__(
         self, tokenizer: "AutoTokenizer", skip_prompt: bool = False, **decode_kwargs
     ):
         super().__init__(tokenizer, skip_prompt, **decode_kwargs)
-        self.text_queue = asyncio.Queue()
+        self.text_queue: asyncio.Queue[str] = asyncio.Queue()
         self.stop_signal = None
-        self.loop = None
+        self.loop: Optional[asyncio.AbstractEventLoop] = None
 
     def on_finalized_text(self, text: str, stream_end: bool = False):
         """Put the new text in the queue.
         If the stream is ending, also put a stop signal in the queue."""
+        if self.loop is None:
+            return
         if len(text) > 0:
             asyncio.run_coroutine_threadsafe(self.text_queue.put(text), self.loop)
diff --git a/nemoguardrails/llm/providers/trtllm/client.py b/nemoguardrails/llm/providers/trtllm/client.py
index b3a9e2c9a..10066492f 100644
--- a/nemoguardrails/llm/providers/trtllm/client.py
+++ b/nemoguardrails/llm/providers/trtllm/client.py
@@ -19,7 +19,25 @@
 import queue
 import time
 from functools import partial
-from typing import Any, Dict, List, Optional, Union
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
+
+# Try to import tritonclient dependencies, with fallbacks for type checking
+try:
+    import tritonclient.grpc as grpcclient
+    from tritonclient.grpc.service_pb2 import (
+        ModelInferResponse,  # type: ignore[attr-defined]
+    )
+
+    TRITONCLIENT_AVAILABLE = True
+except ImportError:
+    # Create dummy types when tritonclient is not available
+    grpcclient = Any  # type: ignore
+    ModelInferResponse = Any  # type: ignore
+    TRITONCLIENT_AVAILABLE = False
+
+if TYPE_CHECKING and not TRITONCLIENT_AVAILABLE:
+    import tritonclient.grpc as grpcclient  # type: ignore
+    from tritonclient.grpc.service_pb2 import ModelInferResponse  # type: ignore
 
 STOP_WORDS = [""]
 BAD_WORDS = [""]
@@ -31,11 +49,11 @@ class TritonClient:
 
     def __init__(self, server_url: str) -> None:
         """Initialize the client."""
-        # pylint: disable-next=import-outside-toplevel
-        import tritonclient.grpc as grpcclient
+        if not TRITONCLIENT_AVAILABLE:
+            raise ImportError("tritonclient is required for TensorRT-LLM support")
 
         self.server_url = server_url
-        self.client = grpcclient.InferenceServerClient(server_url)
+        self.client = grpcclient.InferenceServerClient(server_url)  # type: ignore
 
     def load_model(self, model_name: str, timeout: int = 1000) -> None:
         """Load a model into the server."""
@@ -54,29 +72,33 @@ def load_model(self, model_name: str, timeout: int = 1000) -> None:
     def get_model_list(self) -> List[str]:
         """Get a list of models loaded in the triton server."""
         res = self.client.get_model_repository_index(as_json=True)
+        if res is None or "models" not in res:
+            return []
         return [model["name"] for model in res["models"]]
 
     def get_model_concurrency(self, model_name: str, timeout: int = 1000) -> int:
         """Get the modle concurrency."""
         self.load_model(model_name, timeout)
-        instances = self.client.get_model_config(model_name, as_json=True)["config"][
-            "instance_group"
-        ]
+        config_result = self.client.get_model_config(model_name, as_json=True)
+        if config_result is None or "config" not in config_result:
+            return 0
+        instances = config_result["config"].get("instance_group", [])
         return sum(instance["count"] * len(instance["gpus"]) for instance in instances)
 
     @staticmethod
     def process_result(result: Dict[str, str]) -> Dict[str, str]:
         """Post-process the result from the server."""
-        import google.protobuf.json_format  # pylint: disable=import-outside-toplevel
-        import tritonclient.grpc as grpcclient  # pylint: disable=import-outside-toplevel
+        if not TRITONCLIENT_AVAILABLE:
+            raise ImportError("tritonclient is required for TensorRT-LLM support")
 
-        # pylint: disable-next=import-outside-toplevel
-        from tritonclient.grpc.service_pb2 import ModelInferResponse
+        import google.protobuf.json_format
 
-        message = ModelInferResponse()
+        message = ModelInferResponse()  # type: ignore[misc]
         google.protobuf.json_format.Parse(json.dumps(result), message)
-        infer_result = grpcclient.InferResult(message)
+        infer_result = grpcclient.InferResult(message)  # type: ignore
         np_res = infer_result.as_numpy("OUTPUT_0")
+        if np_res is None:
+            return {"OUTPUT_0": ""}
         if np_res.ndim == 2:
             generated_text = np_res[0, 0].decode()
         else:
@@ -140,21 +162,21 @@ def close_streaming(self) -> None:
         self.client.stop_stream()
 
     @staticmethod
-    def generate_outputs() -> List["grpcclient.InferRequestedOutput"]:
+    def generate_outputs() -> List[Any]:
         """Generate the expected output structure."""
-        import tritonclient.grpc as grpcclient  # pylint: disable=import-outside-toplevel
-
-        return [grpcclient.InferRequestedOutput("OUTPUT_0")]
+        if not TRITONCLIENT_AVAILABLE:
+            raise ImportError("tritonclient is required for TensorRT-LLM support")
+        return [grpcclient.InferRequestedOutput("OUTPUT_0")]  # type: ignore
 
     @staticmethod
-    def prepare_tensor(name: str, input_data: Any) -> "grpcclient.InferInput":
+    def prepare_tensor(name: str, input_data: Any) -> Any:
         """Prepare an input data structure."""
-        import tritonclient.grpc as grpcclient  # pylint: disable=import-outside-toplevel
+        if not TRITONCLIENT_AVAILABLE:
+            raise ImportError("tritonclient is required for TensorRT-LLM support")
 
-        # pylint: disable-next=import-outside-toplevel
         from tritonclient.utils import np_to_triton_dtype
 
-        t = grpcclient.InferInput(
+        t = grpcclient.InferInput(  # type: ignore
             name, input_data.shape, np_to_triton_dtype(input_data.dtype)
         )
         t.set_data_from_numpy(input_data)
@@ -170,7 +192,7 @@ def generate_inputs(  # pylint: disable=too-many-arguments,too-many-locals
         beam_width: int = 1,
         repetition_penalty: float = 1,
         length_penalty: float = 1.0,
-    ) -> List["grpcclient.InferInput"]:
+    ) -> List[Any]:
        """Create the input for the triton inference server."""
         import numpy as np  # pylint: disable=import-outside-toplevel
 
diff --git a/nemoguardrails/llm/providers/trtllm/llm.py b/nemoguardrails/llm/providers/trtllm/llm.py
index aa57b2df6..a54bed809 100644
--- a/nemoguardrails/llm/providers/trtllm/llm.py
+++ b/nemoguardrails/llm/providers/trtllm/llm.py
@@ -18,7 +18,13 @@
 
 import queue
 from functools import partial
-from typing import Any, Dict, List, Optional
+from typing import TYPE_CHECKING, Any, Dict, List, Optional
+
+if TYPE_CHECKING:
+    try:
+        from tritonclient.utils import InferenceServerException
+    except ImportError:
+        InferenceServerException = Exception
 
 from langchain.callbacks.manager import CallbackManagerForLLMRun
 from langchain_core.language_models.llms import BaseLLM
@@ -107,7 +113,7 @@ def _llm_type(self) -> str:
     def _call(
         self,
         prompt: str,
-        stop: Optional[List[str]] = None,
+        stop: Optional[List[str]] = None,  # pylint: disable=unused-argument
         run_manager: Optional[CallbackManagerForLLMRun] = None,
         **kwargs: Any,
     ) -> str:
diff --git a/nemoguardrails/llm/taskmanager.py b/nemoguardrails/llm/taskmanager.py
index 3651676db..268eb7b6d 100644
--- a/nemoguardrails/llm/taskmanager.py
+++ b/nemoguardrails/llm/taskmanager.py
@@ -147,6 +147,8 @@ def __init__(self, config: RailsConfig):
     def _get_general_instructions(self):
         """Helper to extract the general instructions."""
         text = ""
+        if self.config.instructions is None:
+            return text
         for instruction in self.config.instructions:
             if instruction.type == "general":
                 text = instruction.content
@@ -385,7 +387,9 @@ def render_task_prompt(
             task_prompt = self._render_string(
                 prompt.content, context=context, events=events
             )
-            while len(task_prompt) > prompt.max_length:
+            while (
+                prompt.max_length is not None and len(task_prompt) > prompt.max_length
+            ):
                 if not events:
                     raise Exception(
                         f"Prompt exceeds max length of {prompt.max_length} characters even without history"
                     )
@@ -407,20 +411,27 @@ def render_task_prompt(
 
             return task_prompt
         else:
+            if prompt.messages is None:
+                return []
             task_messages = self._render_messages(
                 prompt.messages, context=context, events=events
             )
             task_prompt_length = self._get_messages_text_length(task_messages)
-            while task_prompt_length > prompt.max_length:
+            while (
+                prompt.max_length is not None and task_prompt_length > prompt.max_length
+            ):
                 if not events:
                     raise Exception(
                         f"Prompt exceeds max length of {prompt.max_length} characters even without history"
                     )
 
                 # Remove events from the beginning of the history until the prompt fits.
                 events = events[1:]
-                task_messages = self._render_messages(
-                    prompt.messages, context=context, events=events
-                )
+                if prompt.messages is not None:
+                    task_messages = self._render_messages(
+                        prompt.messages, context=context, events=events
+                    )
+                else:
+                    task_messages = []
                 task_prompt_length = self._get_messages_text_length(task_messages)
             return task_messages