.. DO NOT EDIT. .. THIS FILE WAS AUTOMATICALLY GENERATED BY SPHINX-GALLERY. .. TO MAKE CHANGES, EDIT THE SOURCE PYTHON FILE: .. "tutorial/task_middleware.py" .. LINE NUMBERS ARE GIVEN BELOW. .. only:: html .. note:: :class: sphx-glr-download-link-note :ref:`Go to the end ` to download the full example code. .. rst-class:: sphx-glr-example-title .. _sphx_glr_tutorial_task_middleware.py: .. _middleware: Middleware =========================== AgentScope provides a flexible middleware system that allows developers to intercept and modify the execution of various operations. Currently, middleware support is available for **tool execution** in the ``Toolkit`` class. The middleware system follows an **onion model**, where each middleware wraps around the previous one, forming layers. This allows developers to: - Perform **pre-processing** before the operation - **Intercept and modify** responses during execution - Perform **post-processing** after the operation completes - **Skip** the operation execution entirely based on conditions .. tip:: Future versions of AgentScope will expand middleware support to other components such as agents and models. .. GENERATED FROM PYTHON SOURCE LINES 22-29 .. code-block:: Python import asyncio from typing import AsyncGenerator, Callable from agentscope.message import TextBlock, ToolUseBlock from agentscope.tool import ToolResponse, Toolkit .. GENERATED FROM PYTHON SOURCE LINES 30-79 Tool Execution Middleware ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The ``Toolkit`` class supports middleware for tool execution via the ``register_middleware`` method. Each middleware can intercept the tool call and modify the input or output. Middleware Signature ------------------------------ A middleware function should have the following signature: .. code-block:: python async def middleware( kwargs: dict, next_handler: Callable, ) -> AsyncGenerator[ToolResponse, None]: # Access parameters from kwargs tool_call = kwargs["tool_call"] # Pre-processing # ... # Call the next middleware or tool function async for response in await next_handler(**kwargs): # Post-processing yield response .. list-table:: Middleware Parameters :header-rows: 1 * - Parameter - Type - Description * - ``kwargs`` - ``dict`` - Context parameters. Currently, includes ``tool_call`` (ToolUseBlock). May include additional parameters in future versions. * - ``next_handler`` - ``Callable`` - A callable that accepts kwargs dict and returns a coroutine yielding AsyncGenerator of ToolResponse objects * - **Returns** - ``AsyncGenerator[ToolResponse, None]`` - An async generator that yields ToolResponse objects Basic Example ------------------------------ Here is a simple middleware that logs tool calls: .. GENERATED FROM PYTHON SOURCE LINES 79-103 .. code-block:: Python async def logging_middleware( kwargs: dict, next_handler: Callable, ) -> AsyncGenerator[ToolResponse, None]: """A middleware that logs tool execution.""" # Access the tool call from kwargs tool_call = kwargs["tool_call"] # Pre-processing: log before tool execution print(f"[Middleware] Calling tool: {tool_call['name']}") print(f"[Middleware] Input: {tool_call['input']}") # Call the next handler (either another middleware or the actual tool) async for response in await next_handler(**kwargs): # Post-processing: log the response print(f"[Middleware] Response: {response.content[0]['text']}") yield response # This will execute after all responses are yielded print(f"[Middleware] Tool {tool_call['name']} completed") .. GENERATED FROM PYTHON SOURCE LINES 104-106 Let's register this middleware with a toolkit and test it: .. GENERATED FROM PYTHON SOURCE LINES 106-157 .. code-block:: Python async def search_tool(query: str) -> ToolResponse: """A simple search tool. Args: query (`str`): The search query. Returns: `ToolResponse`: The search result. """ return ToolResponse( content=[ TextBlock( type="text", text=f"Search results for '{query}'", ), ], ) async def example_logging_middleware() -> None: """Example of using logging middleware.""" # Create a toolkit and register the tool toolkit = Toolkit() toolkit.register_tool_function(search_tool) # Register the middleware toolkit.register_middleware(logging_middleware) # Call the tool result = await toolkit.call_tool_function( ToolUseBlock( type="tool_use", id="1", name="search_tool", input={"query": "AgentScope"}, ), ) async for response in result: print(f"\n[Final] {response.content[0]['text']}\n") print("=" * 60) print("Example 1: Logging Middleware") print("=" * 60) asyncio.run(example_logging_middleware()) .. rst-class:: sphx-glr-script-out .. code-block:: none ============================================================ Example 1: Logging Middleware ============================================================ [Middleware] Calling tool: search_tool [Middleware] Input: {'query': 'AgentScope'} [Middleware] Response: Search results for 'AgentScope' [Final] Search results for 'AgentScope' [Middleware] Tool search_tool completed .. GENERATED FROM PYTHON SOURCE LINES 158-163 Modifying Input and Output ------------------------------ Middleware can also modify the tool call input and the response content: .. GENERATED FROM PYTHON SOURCE LINES 163-208 .. code-block:: Python async def transform_middleware( kwargs: dict, next_handler: Callable, ) -> AsyncGenerator[ToolResponse, None]: """A middleware that transforms input and output.""" # Access the tool call from kwargs tool_call = kwargs["tool_call"] # Pre-processing: modify the input original_query = tool_call["input"]["query"] tool_call["input"]["query"] = f"[TRANSFORMED] {original_query}" async for response in await next_handler(**kwargs): # Post-processing: modify the response original_text = response.content[0]["text"] response.content[0]["text"] = f"{original_text} [MODIFIED]" yield response async def example_transform_middleware() -> None: """Example of transforming middleware.""" toolkit = Toolkit() toolkit.register_tool_function(search_tool) toolkit.register_middleware(transform_middleware) result = await toolkit.call_tool_function( ToolUseBlock( type="tool_use", id="2", name="search_tool", input={"query": "middleware"}, ), ) async for response in result: print(f"Result: {response.content[0]['text']}") print("\n" + "=" * 60) print("Example 2: Transform Middleware") print("=" * 60) asyncio.run(example_transform_middleware()) .. rst-class:: sphx-glr-script-out .. code-block:: none ============================================================ Example 2: Transform Middleware ============================================================ Result: Search results for '[TRANSFORMED] middleware' [MODIFIED] .. GENERATED FROM PYTHON SOURCE LINES 209-214 Authorization Middleware ------------------------------ You can use middleware to implement authorization checks and skip tool execution if not authorized: .. GENERATED FROM PYTHON SOURCE LINES 214-301 .. code-block:: Python async def authorization_middleware( kwargs: dict, next_handler: Callable, ) -> AsyncGenerator[ToolResponse, None]: """A middleware that checks authorization.""" # Access the tool call from kwargs tool_call = kwargs["tool_call"] # Check if the tool is authorized (simple example) authorized_tools = {"search_tool"} if tool_call["name"] not in authorized_tools: # Skip execution and return error directly print(f"[Auth] Tool {tool_call['name']} is not authorized") yield ToolResponse( content=[ TextBlock( type="text", text=f"Error: Tool '{tool_call['name']}' is not authorized", # noqa: E501 ), ], ) return # Tool is authorized, proceed print(f"[Auth] Tool {tool_call['name']} is authorized") async for response in await next_handler(**kwargs): yield response async def unauthorized_tool(data: str) -> ToolResponse: """An unauthorized tool. Args: data (`str`): Some data. Returns: `ToolResponse`: The result. """ return ToolResponse( content=[TextBlock(type="text", text=f"Processing {data}")], ) async def example_authorization_middleware() -> None: """Example of authorization middleware.""" toolkit = Toolkit() toolkit.register_tool_function(search_tool) toolkit.register_tool_function(unauthorized_tool) toolkit.register_middleware(authorization_middleware) # Try authorized tool print("\nCalling authorized tool:") result = await toolkit.call_tool_function( ToolUseBlock( type="tool_use", id="3", name="search_tool", input={"query": "test"}, ), ) async for response in result: print(f"Result: {response.content[0]['text']}") # Try unauthorized tool print("\nCalling unauthorized tool:") result = await toolkit.call_tool_function( ToolUseBlock( type="tool_use", id="4", name="unauthorized_tool", input={"data": "test"}, ), ) async for response in result: print(f"Result: {response.content[0]['text']}") print("\n" + "=" * 60) print("Example 3: Authorization Middleware") print("=" * 60) asyncio.run(example_authorization_middleware()) .. rst-class:: sphx-glr-script-out .. code-block:: none ============================================================ Example 3: Authorization Middleware ============================================================ Calling authorized tool: [Auth] Tool search_tool is authorized Result: Search results for 'test' Calling unauthorized tool: [Auth] Tool unauthorized_tool is not authorized Result: Error: Tool 'unauthorized_tool' is not authorized .. GENERATED FROM PYTHON SOURCE LINES 302-314 Multiple Middleware (Onion Model) ------------------------------ When multiple middleware are registered, they form an onion-like structure. The execution order follows the onion model: - **Pre-processing**: Executes in the order middleware are registered - **Post-processing**: Executes in reverse order (inner to outer) This is because the actual tool response object is passed through the middleware chain, and each middleware modifies it in place. .. GENERATED FROM PYTHON SOURCE LINES 314-384 .. code-block:: Python async def middleware_1( kwargs: dict, next_handler: Callable, ) -> AsyncGenerator[ToolResponse, None]: """First middleware.""" # Access the tool call from kwargs tool_call = kwargs["tool_call"] # Pre-processing print("[M1] Pre-processing") tool_call["input"]["query"] += " [M1]" async for response in await next_handler(**kwargs): # Post-processing response.content[0]["text"] += " [M1]" print("[M1] Post-processing") yield response async def middleware_2( kwargs: dict, next_handler: Callable, ) -> AsyncGenerator[ToolResponse, None]: """Second middleware.""" # Access the tool call from kwargs tool_call = kwargs["tool_call"] # Pre-processing print("[M2] Pre-processing") tool_call["input"]["query"] += " [M2]" async for response in await next_handler(**kwargs): # Post-processing response.content[0]["text"] += " [M2]" print("[M2] Post-processing") yield response async def example_multiple_middleware() -> None: """Example of multiple middleware.""" toolkit = Toolkit() toolkit.register_tool_function(search_tool) # Register middleware in order toolkit.register_middleware(middleware_1) toolkit.register_middleware(middleware_2) result = await toolkit.call_tool_function( ToolUseBlock( type="tool_use", id="5", name="search_tool", input={"query": "test"}, ), ) async for response in result: print(f"\nFinal result: {response.content[0]['text']}") print("\n" + "=" * 60) print("Example 4: Multiple Middleware (Onion Model)") print("=" * 60) print("\nExecution flow:") print("M1 Pre → M2 Pre → Tool → M2 Post → M1 Post") print() asyncio.run(example_multiple_middleware()) .. rst-class:: sphx-glr-script-out .. code-block:: none ============================================================ Example 4: Multiple Middleware (Onion Model) ============================================================ Execution flow: M1 Pre → M2 Pre → Tool → M2 Post → M1 Post [M1] Pre-processing [M2] Pre-processing [M2] Post-processing [M1] Post-processing Final result: Search results for 'test [M1] [M2]' [M2] [M1] .. GENERATED FROM PYTHON SOURCE LINES 385-404 Use Cases ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ The middleware system is useful for various scenarios: - **Logging and Monitoring**: Track tool usage and performance - **Authorization**: Control access to specific tools - **Rate Limiting**: Limit the frequency of tool calls - **Caching**: Cache tool responses for repeated calls - **Error Handling**: Add retry logic or graceful degradation - **Input Validation**: Validate and sanitize tool inputs - **Output Transformation**: Format or filter tool outputs - **Metrics Collection**: Collect statistics about tool usage .. note:: - Middleware are applied in the order they are registered - The same ``ToolResponse`` object is passed through the middleware chain and modified in place - Middleware can completely skip tool execution by not calling ``next_handler`` - All middleware must be async generator functions that yield ``ToolResponse`` objects .. rst-class:: sphx-glr-timing **Total running time of the script:** (0 minutes 0.011 seconds) .. _sphx_glr_download_tutorial_task_middleware.py: .. only:: html .. container:: sphx-glr-footer sphx-glr-footer-example .. container:: sphx-glr-download sphx-glr-download-jupyter :download:`Download Jupyter notebook: task_middleware.ipynb ` .. container:: sphx-glr-download sphx-glr-download-python :download:`Download Python source code: task_middleware.py ` .. container:: sphx-glr-download sphx-glr-download-zip :download:`Download zipped: task_middleware.zip ` .. only:: html .. rst-class:: sphx-glr-signature `Gallery generated by Sphinx-Gallery `_