import json
async def custom_send_stream_event(task_id: str, user_message: str):
"""Send an event and stream the agent's response with custom logic."""
# Send the event
event_content = TextContent(
type="text",
author="user",
content=user_message
)
await client.agents.send_event(
agent_id=agent.id,
params={
"task_id": task_id,
"content": event_content
}
)
print(f"User: {user_message}\n")
print("Agent: ", end="", flush=True)
# Stream events
async with client.tasks.with_streaming_response.stream_events(
task_id=task_id,
timeout=30
) as stream:
try:
response_message = ""
async for task_message_update in stream.iter_lines():
if task_message_update.startswith("data: "):
# Remove 'data: ' prefix
task_message_update_json = task_message_update.strip()[6:]
task_message_update_data = json.loads(task_message_update_json)
if task_message_update_data.get("type") == "delta":
delta = task_message_update_data.get("delta", {})
if delta.get("type") == "text":
text_delta = delta.get("text_delta", "")
response_message += text_delta
print(text_delta, end="", flush=True)
elif task_message_update_data.get("type") == "done":
print("\n")
break
except Exception as e:
print(f"\nError streaming messages: {e}")
# Usage
await custom_send_stream_event(
task.id,
"Can you tell me about tennis?"
)