from claude_agent_sdk import query, ClaudeAgentOptions, ResultMessage
from claude_agent_sdk.types import StreamEvent
import asyncio
import sys
async def streaming_ui():
options = ClaudeAgentOptions(
include_partial_messages=True,
allowed_tools=["Read", "Bash", "Grep"],
)
# 跟踪我们当前是否在工具调用中
in_tool = False
async for message in query(
prompt="Find all TODO comments in the codebase", options=options
):
if isinstance(message, StreamEvent):
event = message.event
event_type = event.get("type")
if event_type == "content_block_start":
content_block = event.get("content_block", {})
if content_block.get("type") == "tool_use":
# 工具调用开始 - 显示状态指示器
tool_name = content_block.get("name")
print(f"\n[Using {tool_name}...]", end="", flush=True)
in_tool = True
elif event_type == "content_block_delta":
delta = event.get("delta", {})
# 仅在不执行工具时流式传输文本
if delta.get("type") == "text_delta" and not in_tool:
sys.stdout.write(delta.get("text", ""))
sys.stdout.flush()
elif event_type == "content_block_stop":
if in_tool:
# 工具调用完成
print(" done", flush=True)
in_tool = False
elif isinstance(message, ResultMessage):
# 代理完成所有工作
print(f"\n\n--- Complete ---")
asyncio.run(streaming_ui())