Streaming and Performance Module¶
Directory: src/selectools/agent/
Key Types: StreamChunk, AgentResult (from selectools.types)
Table of Contents¶
- Overview
- Quick Start
- E2E Streaming (v0.11.0)
- Parallel Tool Execution (v0.11.0)
- Native Function Calling (v0.10.0)
- Routing Mode (v0.10.0)
- Context Propagation (v0.10.0)
- AgentResult (v0.9.0)
- Custom System Prompt (v0.9.0)
- Agent.reset() (v0.9.0)
- Performance Comparison
- Practical Examples
- Best Practices
- Troubleshooting
- Further Reading
Overview¶
The selectools library provides a rich set of streaming and performance features that enable real-time token delivery, concurrent tool execution, and programmatic inspection of agent behavior. These capabilities span from token-level streaming (astream) to routing without execution (routing_only), from native function calling to context-preserving tool execution.
Feature Summary¶
| Feature | Version | Purpose |
|---|---|---|
| E2E Streaming | v0.11.0 | Token-by-token output with native tool call support |
| Parallel Tool Execution | v0.11.0 | Run multiple tools concurrently in a single iteration |
| Native Function Calling | v0.10.0 | Provider-native tool APIs, no regex parsing |
| Routing Mode | v0.10.0 | Select a tool without executing it (classification, intent routing) |
| Context Propagation | v0.10.0 | Preserve tracing and auth when running sync tools in executors |
| AgentResult | v0.9.0 | Structured return with message, tool metadata, iterations |
| Custom System Prompt | v0.9.0 | Inject domain instructions via AgentConfig |
| Agent.reset() | v0.9.0 | Clear state for clean reuse across requests |
Import Paths¶
from selectools import Agent, AgentConfig, Message, Role
from selectools.types import StreamChunk, AgentResult
Quick Start¶
Streaming with astream()¶
import asyncio
from selectools import Agent, AgentConfig, Message, Role, OpenAIProvider
from selectools.types import StreamChunk, AgentResult
agent = Agent(
tools=[search_tool],
provider=OpenAIProvider(),
config=AgentConfig(max_iterations=3),
)
async def main():
async for item in agent.astream([Message(role=Role.USER, content="Search for Python tutorials")]):
if isinstance(item, StreamChunk):
print(item.content, end="", flush=True)
elif isinstance(item, AgentResult):
print(f"\n\nDone in {item.iterations} iterations")
if item.tool_calls:
print(f"Tools used: {[tc.tool_name for tc in item.tool_calls]}")
asyncio.run(main())
E2E Streaming (v0.11.0)¶
Agent.astream()¶
Agent.astream(messages) returns an AsyncGenerator yielding Union[StreamChunk, AgentResult]:
- StreamChunk — Intermediate content chunks (text and/or tool calls)
- AgentResult — Final result, yielded once when the agent completes
StreamChunk¶
@dataclass
class StreamChunk:
content: str = "" # Text delta
role: Role = Role.ASSISTANT
tool_calls: Optional[List[ToolCall]] = None # Optional; present when chunk contains tool invocations
content: The text portion of this chunktool_calls: Optional list ofToolCallobjects when the LLM emits tool invocations during streaming
AgentResult as Final Item¶
The last item yielded by astream() is always an AgentResult. It carries:
message— Final assistant responsetool_name— Last tool called (orNone)tool_args— Args for last tooliterations— Number of loop iterationstool_calls— AllToolCallobjects from the run
Provider Protocol¶
Providers implement astream() yielding Union[str, ToolCall]:
- Text deltas — Raw
strchunks (token-by-token) - Tool calls — Complete
ToolCallobjects when ready (native function calling)
Provider.astream()
│
├──► yield "Hello" (str)
├──► yield " " (str)
├──► yield "world" (str)
├──► yield ToolCall(...) (when tool invocation complete)
└──► yield "!" (str)
Fallback Chain¶
When a provider does not support streaming:
astream() requested
│
├──► Provider has astream()? ──► Use it
│
├──► Provider has acomplete()? ──► Call it, yield full response as single StreamChunk
│
└──► Otherwise ──► Run complete() in ThreadPoolExecutor (sync in async wrapper)
Tool Call Accumulation and Multi-Iteration¶
- Accumulation: Tool calls are accumulated as they stream in from the provider.
- Execution: When all tool calls in a response are ready, they are executed (in parallel if
parallel_tool_execution=True). - Continue: Results are appended to history; streaming continues with the next LLM call.
- Final result: When the LLM produces a final text response with no tool calls,
AgentResultis yielded.
┌─────────────────────────────────────────────────────────────────┐
│ astream() Flow │
├─────────────────────────────────────────────────────────────────┤
│ │
│ Iteration 1: │
│ StreamChunk("Searching...") │
│ StreamChunk(tool_calls=[ToolCall(search, {"query": "..."})]) │
│ [Tools executed] │
│ │
│ Iteration 2 (streaming continues): │
│ StreamChunk("Here are the results:") │
│ StreamChunk(" - Result 1") │
│ ... │
│ AgentResult(message=..., iterations=2, tool_calls=[...]) │
│ │
└─────────────────────────────────────────────────────────────────┘
Parallel Tool Execution (v0.11.0)¶
Overview¶
When the LLM requests multiple tool calls in a single response (common with native function calling), the agent executes them concurrently instead of sequentially.
Configuration¶
Set to False for strictly sequential execution.
Async Execution¶
Uses asyncio.gather() for concurrent tool runs:
Sync Execution¶
Uses ThreadPoolExecutor with one worker per tool:
with ThreadPoolExecutor(max_workers=len(tool_calls)) as pool:
futures = [pool.submit(run_tool, tc) for tc in tool_calls]
results = [f.result() for f in futures]
Guarantees¶
| Guarantee | Description |
|---|---|
| Result ordering | Results appended to history in original request order |
| Error isolation | One tool failure does not block others |
| Hook invocation | on_tool_start, on_tool_end, on_tool_error fire for every tool |
| Single-tool optimization | Only one tool called → sequential path, no executor overhead |
Native Function Calling (v0.10.0)¶
Overview¶
selectools uses provider-native tool APIs instead of regex parsing:
- OpenAI —
functions/tool_usein chat completions - Anthropic —
tool_useblocks - Gemini —
function_callingin responses
Message.tool_calls¶
Responses carry structured ToolCall objects on Message.tool_calls:
response = provider.complete(...)
msg = response[0]
if msg.tool_calls:
for tc in msg.tool_calls:
print(f"Tool: {tc.tool_name}, Args: {tc.parameters}")
No Regex Parsing¶
- Providers return
ToolCallobjects directly. - No text-based patterns such as
TOOL_CALL {...}.
Fallback¶
When a provider returns plain text only (no native tool format), the agent falls back to ToolCallParser regex parsing.
Routing Mode (v0.10.0)¶
Overview¶
AgentConfig(routing_only=True) makes the agent choose a tool but not run it. Useful for classification, intent routing, and tool selection.
Configuration¶
config = AgentConfig(routing_only=True)
agent = Agent(tools=[...], provider=provider, config=config)
Return Value¶
Returns AgentResult with:
tool_name— Selected tooltool_args— Parsed argumentsmessage— Assistant message containing the selectiontrace— Execution trace (LLM call + tool selection steps)
No tool execution; only one LLM call. Observer events on_iteration_start and on_iteration_end both fire for the single iteration, along with on_run_start/on_run_end.
Use Cases¶
| Use Case | Example |
|---|---|
| Classification | Route to sales vs support vs billing |
| Intent detection | Choose between search, calculator, or Q&A |
| Tool preselection | Decide which tools to enable before full execution |
Example¶
from selectools import Agent, AgentConfig, Message, Role, OpenAIProvider
from selectools.types import AgentResult
config = AgentConfig(routing_only=True)
agent = Agent(
tools=[search_tool, calculator_tool, support_tool],
provider=OpenAIProvider(),
config=config,
)
result = agent.run([Message(role=Role.USER, content="I need help with my bill")])
# Inspect routing decision without executing
assert result.tool_name == "support_tool"
assert "billing" in str(result.tool_args).lower() or "bill" in str(result.tool_args).lower()
Context Propagation (v0.10.0)¶
Overview¶
When sync tools run inside a ThreadPoolExecutor (e.g. async agent calling sync tools), contextvars.copy_context() is used so request-scoped state (tracing, auth, etc.) is preserved.
How It Works¶
# In tools/base.py - sync tool execution from async context
context = contextvars.copy_context()
func_with_args = functools.partial(self.function, **call_args)
result = await loop.run_in_executor(executor, context.run, func_with_args)
Preserved State¶
- OpenTelemetry tracing spans
- Auth tokens
- Request IDs
- Other
contextvarsvalues
Async Tools¶
Async tools run in the same event loop as the agent; no executor, so context is already intact.
AgentResult (v0.9.0)¶
Overview¶
agent.run() and agent.arun() return AgentResult instead of Message, enabling programmatic inspection of tool usage and iterations.
Fields¶
| Field | Type | Description |
|---|---|---|
message |
Message |
Final assistant response |
tool_name |
Optional[str] |
Last tool called, or None |
tool_args |
Dict[str, Any] |
Args for last tool call |
iterations |
int |
Number of agent loop iterations |
tool_calls |
List[ToolCall] |
All tool calls in order |
Backward Compatibility¶
result.content→result.message.contentresult.role→result.message.role
Example¶
result = agent.run([Message(role=Role.USER, content="What's the weather in Tokyo?")])
print(result.content) # Final text
print(result.tool_name) # e.g. "get_weather"
print(result.tool_args) # e.g. {"location": "Tokyo"}
print(result.iterations) # e.g. 2
print(len(result.tool_calls)) # Number of tools invoked
Custom System Prompt (v0.9.0)¶
Overview¶
AgentConfig(system_prompt="...") injects domain instructions before tool schemas. They persist across iterations.
Configuration¶
config = AgentConfig(
system_prompt="You are a medical assistant. Only provide information you are confident about."
)
agent = Agent(tools=[...], provider=provider, config=config)
When to Use¶
- Domain constraints (medical, legal, etc.)
- Tone and persona
- Guardrails and safety
- Language or formatting rules
Example¶
config = AgentConfig(
system_prompt="""You are a financial advisor.
- Never guarantee returns.
- Always recommend consulting a licensed professional.
- Use clear, non-technical language."""
)
agent = Agent(tools=[lookup_stock, get_news], provider=provider, config=config)
Agent.reset() (v0.9.0)¶
Overview¶
Agent.reset() clears history, usage stats, analytics, and memory so the same agent instance can be reused across requests.
What It Clears¶
_history— Message historyusage— Token/cost statsanalytics— If enabledmemory— IfConversationMemoryis set, callsmemory.clear()
Pattern¶
agent = Agent(tools=[...], provider=provider, memory=ConversationMemory())
# Create once, reset between requests
for user_request in requests:
agent.reset()
result = agent.run([Message(role=Role.USER, content=user_request)])
Performance Comparison¶
Sequential vs Parallel Tool Execution¶
| Scenario | Sequential | Parallel | Speedup |
|---|---|---|---|
| 3 tools × 0.15s each | ~0.45s | ~0.15s | ~3× |
| 5 tools × 0.2s each | ~1.0s | ~0.2s | ~5× |
| 1 tool | 0.15s | 0.15s | 1× (no overhead) |
Benchmark Example¶
import time
from selectools import Agent, AgentConfig, Message, Role, tool
@tool(description="Simulate slow API")
def slow_api(delay: float) -> str:
time.sleep(delay)
return f"Done after {delay}s"
agent_parallel = Agent(
tools=[slow_api],
provider=provider,
config=AgentConfig(parallel_tool_execution=True, max_iterations=2),
)
agent_sequential = Agent(
tools=[slow_api],
provider=provider,
config=AgentConfig(parallel_tool_execution=False, max_iterations=2),
)
# With a prompt that triggers 3 tool calls:
# parallel: ~0.15s
# sequential: ~0.45s
Practical Examples¶
Routing Mode for Intent Classification¶
config = AgentConfig(routing_only=True)
agent = Agent(
tools=[sales_tool, support_tool, billing_tool],
provider=provider,
config=config,
)
intent = agent.run([Message(role=Role.USER, content=user_message)])
if intent.tool_name == "sales_tool":
route_to_sales_team(intent.tool_args)
elif intent.tool_name == "support_tool":
create_support_ticket(intent.tool_args)
else:
forward_to_billing(intent.tool_args)
AgentResult Inspection for Analytics¶
result = agent.run(messages)
if result.tool_calls:
for tc in result.tool_calls:
log_tool_usage(tc.tool_name, tc.parameters)
if result.iterations > 3:
alert_complex_conversation()
System Prompt for Domain Experts¶
config = AgentConfig(
system_prompt="You are a Python expert. Prefer type hints and modern syntax. Suggest tests when relevant.",
max_iterations=5,
)
agent = Agent(tools=[search_docs, run_code], provider=provider, config=config)
Best Practices¶
1. Use astream() for Responsive UX¶
async for item in agent.astream(messages):
if isinstance(item, StreamChunk):
await websocket.send_json({"type": "chunk", "content": item.content})
elif isinstance(item, AgentResult):
await websocket.send_json({"type": "done", "iterations": item.iterations})
2. Keep parallel_tool_execution Enabled¶
Default is True; disable only when tool ordering or side effects require sequential execution.
3. Prefer routing_only for Classification¶
Use routing mode for cheap classification instead of a full agent run.
4. Reuse Agents with reset()¶
5. Use AgentResult for Observability¶
Use result.tool_calls and result.iterations for logging and monitoring.
Troubleshooting¶
Streaming Yields Nothing Until Complete¶
Cause: Provider lacks astream(); agent falls back to acomplete() and yields a single chunk.
Fix: Use a provider that implements astream() (e.g. OpenAI, Anthropic, Gemini).
Parallel Tools Seem Sequential¶
Cause: parallel_tool_execution=False or only one tool per response.
Fix: Set AgentConfig(parallel_tool_execution=True) and use prompts that trigger multiple tools.
Context Lost in Sync Tools¶
Cause: Older selectools versions or custom executor usage without context propagation.
Fix: Upgrade to v0.10.0+; sync tools from async agent should receive proper context propagation.
routing_only Still Executes Tools¶
Cause: Misconfiguration or different code path.
Fix: Ensure AgentConfig(routing_only=True) is passed to Agent, not just AgentConfig().
Further Reading¶
- Agent Module - Agent lifecycle, hooks, configuration
- Tools Module - Tool definition and validation
- Providers Module - Provider implementations and streaming
- Memory Module - Conversation memory and
reset()
Next Steps: Enable streaming with agent.astream() and optimize tool-heavy workflows with parallel_tool_execution=True.