Skip to main content

Agentic (ACP) Agent Guide

This guide demonstrates how to interact with agentic (async) AgentEx agents using the AgentEx SDK.

Overview

Agentic agents use an agentic ACP architecture that:
  • Supports both streaming and non-streaming responses
  • Requires explicit task creation before sending events
  • Ideal for complex workflows and long-running operations

Key API Method: send_event()

For agentic (ACP) agents, you always use the send_event() method to communicate with the agent. This is different from sync agents, which use send_message() for direct communication.
# Agentic agents use send_event()
# 1. Create a task first (REQUIRED for agentic agents)
task = await client.agents.create_task(agent_id=agent.id, params={})

# 2. Send events to the task
await client.agents.send_event(
    agent_id=agent.id,
    params={
        "task_id": task.id,
        "content": {"type": "text", "author": "user", "content": "Hello!"}
    }
)
Key Differences from Sync Agents:
  • Task creation required - Must create a task before sending events (sync agents don’t need this)
  • Event-based communication - Send events to tasks, not direct messages to agent names
  • Asynchronous processing - Events are processed asynchronously, poll or stream for responses
The event-based model enables asynchronous processing, state management across multiple turns, and complex workflow orchestration.

ACP Types: Base vs Temporal

Agentic agents can be implemented using two different backend types:
  • Base ACP - Simple event-driven architecture, suitable for learning and simple use cases
  • Temporal - Robust workflow engine for production, handles race conditions automatically
Important: From the client’s perspective, both base ACP and Temporal agents use the exact same API (both use send_event()). The choice between base and Temporal is purely a server-side implementation detail that is transparent to client code. You can switch an agent from base to Temporal (or vice versa) without changing any client code.

Setup

Prerequisites

pip install agentex-sdk

Environment Variables

export AGENTEX_API_KEY="your-api-key"
export AGENTEX_BASE_URL="agentex-base-url"
export SGP_ACCOUNT_ID="your-account-id"

Initialize the Client

import asyncio
from agentex import AsyncAgentex
from agentex.types.text_content import TextContent

# Initialize the async AgentEx client
client = AsyncAgentex(
    base_url=os.environ.get("AGENTEX_BASE_URL"),
    default_headers={
        "x-api-key": os.enviorn.get("AGENTEX_API_KEY"),
        "x-selected-account-id": os.environ.get("SGP_ACCOUNT_ID")
    }
)

Basic Usage

1. Get Agent Information

from agentex.types import Agent

# List all agents
agents = await client.agents.list()
for agent in agents:
    print(f"Agent: {agent.name} - {agent.id} - {agent.acp_type}")

# Retrieve a specific agent by name
AGENT_NAME = "<your-agent-name>"  # Replace with your agent name
agent = await client.agents.retrieve_by_name(agent_name=AGENT_NAME)

print(f"Agent ID: {agent.id}")
print(f"Agent Type: {agent.acp_type}")
print(f"Description: {agent.description}")

2. Create a Task

Agentic agents require a task to be created before sending events:
import uuid

# Create a new task
create_task_response = await client.agents.create_task(
    agent_id=agent.id,
    params={
        "name": f"{uuid.uuid4()}-task",
        "params": {}
    }
)

task = create_task_response.result

print(f"Task ID: {task.id}")
print(f"Task Status: {task.status}")

3. Send an Event

# Create event content
event_content = TextContent(
    type="text",
    author="user",
    content="Hello! Can you explain what quantum computing is?"
)

# Send the event to the task
event_response = await client.agents.send_event(
    agent_id=agent.id,
    params={
        "task_id": task.id,
        "content": event_content
    }
)

event = event_response.result
print(f"Event ID: {event.id}")
print(f"Event created at: {event.created_at}")

4. Receive Messages (Polling)

Since agentic agents process events asynchronously, you need to poll for messages:
from datetime import datetime

has_received_response = False
messages = []
timeout = 30
start_time = datetime.now()

# Poll for messages
while not has_received_response and (datetime.now() - start_time).seconds < timeout:
    messages = await client.messages.list(task_id=task.id)

    for message in messages:
        content = message.content
        if content.author == "agent":
            has_received_response = True
            break

    await asyncio.sleep(1)

# Print received messages
print(f"\nReceived {len(messages)} messages:\n")
for message in messages:
    if message.content and message.content.type == "text":
        print(f"[{message.content.author}]: {message.content.content}")
        print("-" * 80)

Streaming Messages

Helper Function for Streaming

Use the built-in helper to subscribe to task messages when testing in a local environmet:
from agentex.lib.utils.dev_tools import subscribe_to_async_task_messages

async def send_and_stream_event(task_id: str, user_message: str):
    """Send an event and stream the agent's response."""

    # Send the event
    event_content = TextContent(
        type="text",
        author="user",
        content=user_message
    )

    await client.agents.send_event(
        agent_id=agent.id,
        params={
            "task_id": task_id,
            "content": event_content
        }
    )

    print(f"User: {user_message}\n")

    # Subscribe to task messages
    messages = await subscribe_to_async_task_messages(
        client=client,
        task_id=task_id,
        timeout=30,
        max_retries=10,
    )

    return messages

# Usage
await send_and_stream_event(
    task.id,
    "Can you tell me about machine learning?"
)

Custom Streaming Implementation

For more control over streaming or for production environments, implement custom streaming logic:
import json

async def custom_send_stream_event(task_id: str, user_message: str):
    """Send an event and stream the agent's response with custom logic."""

    # Send the event
    event_content = TextContent(
        type="text",
        author="user",
        content=user_message
    )

    await client.agents.send_event(
        agent_id=agent.id,
        params={
            "task_id": task_id,
            "content": event_content
        }
    )

    print(f"User: {user_message}\n")
    print("Agent: ", end="", flush=True)

    # Stream events
    async with client.tasks.with_streaming_response.stream_events(
        task_id=task_id,
        timeout=30
    ) as stream:
        try:
            response_message = ""

            async for task_message_update in stream.iter_lines():
                if task_message_update.startswith("data: "):
                    # Remove 'data: ' prefix
                    task_message_update_json = task_message_update.strip()[6:]
                    task_message_update_data = json.loads(task_message_update_json)

                    if task_message_update_data.get("type") == "delta":
                        delta = task_message_update_data.get("delta", {})
                        if delta.get("type") == "text":
                            text_delta = delta.get("text_delta", "")
                            response_message += text_delta
                            print(text_delta, end="", flush=True)

                    elif task_message_update_data.get("type") == "done":
                        print("\n")
                        break

        except Exception as e:
            print(f"\nError streaming messages: {e}")

# Usage
await custom_send_stream_event(
    task.id,
    "Can you tell me about tennis?"
)

Multi-Turn Conversations

Agentic agents automatically maintain conversation history:
# Create a new task for the conversation
conversation_task_id = str(uuid.uuid4())
create_task_response = await client.agents.create_task(
    agent_id=agent.id,
    id=conversation_task_id,
    params={}
)

conversation_task = create_task_response.result

# Define a multi-turn conversation
messages = [
    "Can you tell me about machine learning?",
    "What are some practical applications?",
    "How do I get started learning it?",
]

# Send each message and stream responses
for message in messages:
    print(f"Task ID: {conversation_task.id}")
    await custom_send_stream_event(conversation_task.id, message)
    print("=" * 80 + "\n")

View Conversation History

Retrieve all messages from a task:
# Get all messages from the task
all_messages = await client.messages.list(task_id=conversation_task.id)

print("\n" + "=" * 80)
print("COMPLETE CONVERSATION HISTORY")
print("=" * 80 + "\n")

for idx, message in enumerate(all_messages, 1):
    if message.content and message.content.type == "text":
        author = message.content.author.upper()
        print(f"{idx}. [{author}]")
        print(f"{message.content.content}\n")
        print("-" * 80)

Managing Tasks

Check Task Status

# Retrieve task status
task_info = await client.tasks.retrieve(task_id=task.id)

print(f"Task ID: {task_info.id}")
print(f"Status: {task_info.status}")
print(f"Status Reason: {task_info.status_reason}")
print(f"Created At: {task_info.created_at}")
print(f"Updated At: {task_info.updated_at}")

List All Tasks for an Agent

# List tasks
tasks = await client.tasks.list(agent_id=agent.id)

for task in tasks:
    print(f"Task {task.id}: {task.status}")

State Management

Agentic agents can maintain state across events:
# List states for an agent
states = await client.states.list(agent_id=agent.id)

for state in states:
    print(f"State ID: {state.id}")
    print(f"Task ID: {state.task_id}")
    print(f"State Data: {state.state}")

I