Documentation Index
Fetch the complete documentation index at: https://docs.gp.scale.com/llms.txt
Use this file to discover all available pages before exploring further.
Overview
Agentic agents use an agentic ACP architecture that:
- Supports both streaming and non-streaming responses
- Requires explicit task creation before sending events
- Ideal for complex workflows and long-running operations
Key API Method: send_event()
For agentic (ACP) agents, you always use the send_event() method to communicate with the agent. This is different from sync agents, which use send_message() for direct communication.
# Agentic agents use send_event()
# 1. Create a task first (REQUIRED for agentic agents)
task = await client.agents.create_task(agent_id=agent.id, params={})
# 2. Send events to the task
await client.agents.send_event(
agent_id=agent.id,
params={
"task_id": task.id,
"content": {"type": "text", "author": "user", "content": "Hello!"}
}
)
Key Differences from Sync Agents:
- Task creation required - Must create a task before sending events (sync agents don’t need this)
- Event-based communication - Send events to tasks, not direct messages to agent names
- Asynchronous processing - Events are processed asynchronously, poll or stream for responses
The event-based model enables asynchronous processing, state management across multiple turns, and complex workflow orchestration.
ACP Types: Base vs Temporal
Agentic agents can be implemented using two different backend types:
- Base ACP - Simple event-driven architecture, suitable for learning and simple use cases
- Temporal - Robust workflow engine for production, handles race conditions automatically
Important: From the client’s perspective, both base ACP and Temporal agents use the exact same API (both use send_event()). The choice between base and Temporal is purely a server-side implementation detail that is transparent to client code. You can switch an agent from base to Temporal (or vice versa) without changing any client code.
Setup
Prerequisites
Environment Variables
export AGENTEX_API_KEY="your-api-key"
export AGENTEX_BASE_URL="agentex-base-url"
export SGP_ACCOUNT_ID="your-account-id"
Initialize the Client
import asyncio
from agentex import AsyncAgentex
from agentex.types.text_content import TextContent
# Initialize the async Agentex client
client = AsyncAgentex(
base_url=os.environ.get("AGENTEX_BASE_URL"),
default_headers={
"x-api-key": os.enviorn.get("AGENTEX_API_KEY"),
"x-selected-account-id": os.environ.get("SGP_ACCOUNT_ID")
}
)
Basic Usage
from agentex.types import Agent
# List all agents
agents = await client.agents.list()
for agent in agents:
print(f"Agent: {agent.name} - {agent.id} - {agent.acp_type}")
# Retrieve a specific agent by name
AGENT_NAME = "<your-agent-name>" # Replace with your agent name
agent = await client.agents.retrieve_by_name(agent_name=AGENT_NAME)
print(f"Agent ID: {agent.id}")
print(f"Agent Type: {agent.acp_type}")
print(f"Description: {agent.description}")
2. Create a Task
Agentic agents require a task to be created before sending events:
import uuid
# Create a new task
create_task_response = await client.agents.create_task(
agent_id=agent.id,
params={
"name": f"{uuid.uuid4()}-task",
"params": {}
}
)
task = create_task_response.result
print(f"Task ID: {task.id}")
print(f"Task Status: {task.status}")
3. Send an Event
# Create event content
event_content = TextContent(
type="text",
author="user",
content="Hello! Can you explain what quantum computing is?"
)
# Send the event to the task
event_response = await client.agents.send_event(
agent_id=agent.id,
params={
"task_id": task.id,
"content": event_content
}
)
event = event_response.result
print(f"Event ID: {event.id}")
print(f"Event created at: {event.created_at}")
4. Receive Messages (Polling)
Since agentic agents process events asynchronously, you need to poll for messages:
from datetime import datetime
has_received_response = False
messages = []
timeout = 30
start_time = datetime.now()
# Poll for messages
while not has_received_response and (datetime.now() - start_time).seconds < timeout:
messages = await client.messages.list(task_id=task.id)
for message in messages:
content = message.content
if content.author == "agent":
has_received_response = True
break
await asyncio.sleep(1)
# Print received messages
print(f"\nReceived {len(messages)} messages:\n")
for message in messages:
if message.content and message.content.type == "text":
print(f"[{message.content.author}]: {message.content.content}")
print("-" * 80)
Streaming Messages
Helper Function for Streaming
Use the built-in helper to subscribe to task messages when testing in a local environmet:
from agentex.lib.utils.dev_tools import subscribe_to_async_task_messages
async def send_and_stream_event(task_id: str, user_message: str):
"""Send an event and stream the agent's response."""
# Send the event
event_content = TextContent(
type="text",
author="user",
content=user_message
)
await client.agents.send_event(
agent_id=agent.id,
params={
"task_id": task_id,
"content": event_content
}
)
print(f"User: {user_message}\n")
# Subscribe to task messages
messages = await subscribe_to_async_task_messages(
client=client,
task_id=task_id,
timeout=30,
max_retries=10,
)
return messages
# Usage
await send_and_stream_event(
task.id,
"Can you tell me about machine learning?"
)
Custom Streaming Implementation
For more control over streaming or for production environments, implement custom streaming logic:
import json
async def custom_send_stream_event(task_id: str, user_message: str):
"""Send an event and stream the agent's response with custom logic."""
# Send the event
event_content = TextContent(
type="text",
author="user",
content=user_message
)
await client.agents.send_event(
agent_id=agent.id,
params={
"task_id": task_id,
"content": event_content
}
)
print(f"User: {user_message}\n")
print("Agent: ", end="", flush=True)
# Stream events
async with client.tasks.with_streaming_response.stream_events(
task_id=task_id,
timeout=30
) as stream:
try:
response_message = ""
async for task_message_update in stream.iter_lines():
if task_message_update.startswith("data: "):
# Remove 'data: ' prefix
task_message_update_json = task_message_update.strip()[6:]
task_message_update_data = json.loads(task_message_update_json)
if task_message_update_data.get("type") == "delta":
delta = task_message_update_data.get("delta", {})
if delta.get("type") == "text":
text_delta = delta.get("text_delta", "")
response_message += text_delta
print(text_delta, end="", flush=True)
elif task_message_update_data.get("type") == "done":
print("\n")
break
except Exception as e:
print(f"\nError streaming messages: {e}")
# Usage
await custom_send_stream_event(
task.id,
"Can you tell me about tennis?"
)
Multi-Turn Conversations
Agentic agents automatically maintain conversation history:
# Create a new task for the conversation
conversation_task_id = str(uuid.uuid4())
create_task_response = await client.agents.create_task(
agent_id=agent.id,
id=conversation_task_id,
params={}
)
conversation_task = create_task_response.result
# Define a multi-turn conversation
messages = [
"Can you tell me about machine learning?",
"What are some practical applications?",
"How do I get started learning it?",
]
# Send each message and stream responses
for message in messages:
print(f"Task ID: {conversation_task.id}")
await custom_send_stream_event(conversation_task.id, message)
print("=" * 80 + "\n")
View Conversation History
Retrieve all messages from a task:
# Get all messages from the task
all_messages = await client.messages.list(task_id=conversation_task.id)
print("\n" + "=" * 80)
print("COMPLETE CONVERSATION HISTORY")
print("=" * 80 + "\n")
for idx, message in enumerate(all_messages, 1):
if message.content and message.content.type == "text":
author = message.content.author.upper()
print(f"{idx}. [{author}]")
print(f"{message.content.content}\n")
print("-" * 80)
Managing Tasks
Check Task Status
# Retrieve task status
task_info = await client.tasks.retrieve(task_id=task.id)
print(f"Task ID: {task_info.id}")
print(f"Status: {task_info.status}")
print(f"Status Reason: {task_info.status_reason}")
print(f"Created At: {task_info.created_at}")
print(f"Updated At: {task_info.updated_at}")
List All Tasks for an Agent
# List tasks
tasks = await client.tasks.list(agent_id=agent.id)
for task in tasks:
print(f"Task {task.id}: {task.status}")
State Management
Agentic agents can maintain state across events:
# List states for an agent
states = await client.states.list(agent_id=agent.id)
for state in states:
print(f"State ID: {state.id}")
print(f"Task ID: {state.task_id}")
print(f"State Data: {state.state}")