Task Definition (@agent.task)
The @agent.task decorator transforms function signatures into agent-driven implementations. You define the interface and behavior specification - the agent provides the implementation.
Tasks are defined on Agent instances and can use State for persistent memory across executions.
Basic Usage
from agex import Agent
agent = Agent()
@agent.task
def solve_equation(equation: str) -> float: # type: ignore[return-value]
"""Solve a mathematical equation and return the result."""
pass
The decorated function is completely replaced - the agent handles all implementation.
Decorator Patterns
Naked Decorator
@agent.task
def analyze_data(data: list[float]) -> dict: # type: ignore[return-value]
"""Analyze numerical data and return statistics summary."""
pass
Parameterized Decorator
@agent.task("Calculate using advanced statistical methods")
def analyze_data(data: list[float]) -> dict: # type: ignore[return-value]
"""Public API: Analyze numerical data and return statistics."""
pass
setup Parameter
The setup parameter runs preparatory code in the agent's sandbox before the agent's main execution loop begins. This is useful for providing the agent with immediate context, which can save an LLM turn.
Common use cases are to have an agent automatically inspect the head of a pandas DataFrame or view an image.
from PIL.Image import Image
@agent.task(setup="view_image(inputs.image)")
def process_image(prompt: str, image: Image) -> Image: # type: ignore[return-value]
"""Process an image based on a prompt."""
pass
# When called, `view_image(image)` will be executed in the sandbox
# before the agent starts thinking about how to handle the prompt.
# This avoids a turn where the agent just decides to view the image.
process_image("Crop this to the subject.", image=my_image)
Executing Tasks
An @agent.task-decorated function can be executed in two ways, depending on your needs for interactivity and observability.
1. Standard Execution
This is the most common way to run a task. You call the function, it blocks until the agent is finished, and then it returns the final result.
Async Execution
Tasks decorated on agents work seamlessly with Python's async/await. Define the task function as async def and await the result:
@agent.task
async def solve_equation(equation: str) -> float: # type: ignore[return-value]
"""Solve a mathematical equation and return the result."""
pass
# Use with await in async context
result = await solve_equation("2*x + 5 = 15")
print(f"Result: {result}")
All execution modes (on_event, on_token) work with async tasks.
[!NOTE] Async registered functions (via
@agent.fn) are only available in async tasks. If an agent tries to call an async function from a sync task, it will see an error and can adapt. Use async tasks when your registered functions include async code.
2. Real-time Handlers with on_event
You may get both event-level visibility and a blocking result via the on_event handler. This provides a "fire-and-forget" way to get a real-time stream of all events without needing to consume a generator.
The handler is a callable that receives the raw event object each time an event is created.
from agex import pprint_events
# For simple, colorful terminal logging, you can use the built-in pprint_events helper.
result = solve_equation("x**2 = 16", on_event=pprint_events)
Function Signature
The decorator automatically adds session, on_event, and on_token parameters to your function signature as keyword-only arguments.
@agent.task
def my_function(x: int, y: str) -> bool: # type: ignore[return-value]
"""Function description."""
pass
# Becomes callable as:
# my_function(x=10, y="hello")
# my_function(x=10, y="hello", session="user_123")
# my_function(x=10, y="hello", session="user_123", on_event=my_handler)
Session Parameter
- Optional:
session: str = "default" - Purpose: Isolate state between different users or conversations
- Requires: Agent configured with
state=connect_state(...)
from agex import Agent, connect_state
agent = Agent(
state=connect_state(type="versioned", storage="memory"),
)
@agent.task
def chat(message: str) -> str:
"""Chat with the user."""
pass
# Different sessions have isolated memory
chat("Hello", session="user_alice") # Alice's conversation
chat("Hello", session="user_bob") # Bob's separate conversation
# Same session shares memory across calls
chat("Remember X=42", session="alice")
chat("What is X?", session="alice") # Remembers X=42
See State for more details on state management.
Concurrency Control
When using Versioned state with concurrent tasks (e.g., multiple workers, background jobs), tasks may conflict when trying to merge their changes. The on_conflict parameter controls how these conflicts are handled:
# Foreground task - retry on conflict (default)
@agent.task(on_conflict='retry')
def interactive_task(query: str) -> str: # type: ignore[return-value]
"""Process user query with automatic retry."""
pass
# Background task - abandon work on conflict
@agent.task(on_conflict='abandon')
def background_indexing() -> None: # type: ignore[return-value]
"""Rebuild search index in background."""
pass
# Custom retry limit
@agent.task(on_conflict='retry', max_conflict_retries=5)
def critical_task() -> dict: # type: ignore[return-value]
"""Important task that needs more retry attempts."""
pass
Conflict Strategies:
| Strategy | Behavior | Use Case |
|---|---|---|
retry (default) |
Resets state and reruns task (up to max_conflict_retries times) |
Interactive/foreground tasks |
abandon |
Silently returns None on conflict |
Background/non-critical tasks |
How it works:
1. Task executes and calls snapshot() internally after each turn
2. On completion, task attempts to merge() its branch to HEAD
3. If another task modified HEAD concurrently, merge fails
4. With retry: state is reset and task reruns from scratch
5. With abandon: task returns None and work is discarded
[!CAUTION] Side Effects and Conflicts: Conflict handling only resets the
Versionedstate - external side effects cannot be undone. If your task calls functions that modify external systems (databases, APIs, file systems), those changes persist even when: - Retry: Task reruns from scratch (side effects from first attempt remain) - Abandon: Work is discarded (side effects already happened)This can lead to duplicate operations, orphaned data, or inconsistent state. For tasks with side effects: - Prefer functional patterns: Have agents return results that the caller uses to update external systems (after successful merge). This avoids side effects entirely. - Design functions to be idempotent (safe to retry/re-execute) - Use transactional patterns with explicit rollback - Consider deferring side effects until after successful merge - Log all external operations for manual reconciliation
See State - Concurrent Task Handling for implementation details.
Task Cancellation
Long-running tasks can be cancelled using the cancel() method on the task wrapper. This gracefully stops execution at the next iteration boundary.
import threading
@agent.task
def long_running_task() -> str:
"""A task that may need to be cancelled."""
pass
# Start task in background
def run():
try:
result = long_running_task()
except TaskCancelled as e:
print(f"Cancelled after {e.iterations_completed} iterations")
thread = threading.Thread(target=run)
thread.start()
# Cancel from main thread
long_running_task.cancel()
# Or cancel a specific session
long_running_task.cancel(session="user_123")
How it works:
cancel()writes a sentinel to the underlying state store- The task loop checks for this sentinel at the start of each iteration
- When detected, a
TaskCancelledexception is raised - A
CancelledEventis recorded in the event log
Requirements:
- State: The agent must be configured with persistent state (
Versionedwithdiskstorage recommended) - Shared state: For cross-thread/process cancellation, both the canceller and the running task must access the same state store
from agex import Agent, connect_state, TaskCancelled
agent = Agent(
state=connect_state(type="versioned", storage="disk", path="/tmp/agent-state"),
)
[!NOTE] Cancellation is checked between LLM iterations, not mid-execution. If the agent is in the middle of a long function call or waiting for an LLM response, cancellation will take effect after that operation completes.
on_event Parameter
- Optional:
on_event: Callable[[BaseEvent], None] | None = None - Purpose: Provide a callback function to receive events in real time.
- Propagation: The handler is automatically passed to any sub-agent tasks, providing a single, unified event stream for an entire end-to-end operation.
See the Events API Guide for more on event consumption patterns.
on_token Parameter
- Optional:
on_token: Callable[[TokenChunk], None] | None = None - Purpose: Receive LLM output tokens in real time (reasoning vs. code) while still awaiting the final task result.
- Common uses: Live notebooks, terminal dashboards, or UI components that benefit from progressive updates.
- Token structure: Each callback gets a
TokenChunkwithtype,content, and adoneflag signalling the end of a section.
from agex.agent import pprint_tokens
# Stream thinking/code tokens with built-in formatting
result = my_task("solve this", on_token=pprint_tokens)
See Token-Level Streaming for deeper coverage.
Dual-Decorator Pattern
For multi-agent workflows, combine @agent.fn and @agent.task decorators:
# Create specialist agents
data_processor = Agent(name="data_processor")
orchestrator = Agent(name="orchestrator")
# Dual-decorated function: orchestrator can call data_processor's task
@orchestrator.fn(docstring="Clean and process raw data")
@data_processor.task("Remove outliers and normalize values")
def process_data(raw_data: list[float]) -> list[float]: # type: ignore[return-value]
pass
Decorator Order Rules
# ✅ Correct order: @agent.fn OUTER, @agent.task INNER
@orchestrator.fn()
@specialist.task("Task description")
def dual_function():
pass
# ❌ Wrong order: @agent.task before @agent.fn
@specialist.task("Task description")
@orchestrator.fn()
def wrong_order():
pass # Raises ValueError
Validation Rules
Empty Function Body
Task functions must have empty bodies - the agent provides the implementation:
# ✅ Valid: Empty implementations
@agent.task
def valid_function():
"""Task description."""
pass
@agent.task
def also_valid():
"""Another task."""
# Comments are allowed
pass
# ❌ Invalid: Contains implementation
@agent.task
def invalid_function():
"""This will raise an error."""
return "actual code" # Not allowed!
Why empty bodies? The decorator completely replaces your function. The agent receives your function signature and instructions, then generates code to fulfill the contract. Your implementation would be ignored anyway.
Type Checker Compatibility
Type checkers (mypy, pylance) will complain about empty functions that promise to return values:
# Type checker error: Function doesn't return anything but promises a float
@agent.task
def calculate_pi() -> float:
"""Calculate pi to high precision."""
pass # mypy: error - Missing return statement
Solution: Use # type: ignore[return-value] to silence this specific warning:
@agent.task
def calculate_pi() -> float: # type: ignore[return-value]
"""Calculate pi to high precision."""
pass
@agent.task
def process_data(data: list[int]) -> dict: # type: ignore[return-value]
"""Process data and return analysis."""
pass
@agent.task
def update_database(records: list[dict]) -> bool: # type: ignore[return-value]
"""Update database with new records."""
pass
This tells the type checker: "I know this function doesn't return what it promises, but the agent will handle it at runtime."
Required Documentation
# ✅ Valid: Has primer
@agent.task("Calculate the result")
def with_primer():
pass
# ✅ Valid: Has docstring
@agent.task
def with_docstring():
"""Calculate the result."""
pass
# ❌ Invalid: No instructions
@agent.task
def no_instructions():
pass # Raises ValueError - no primer or docstring
Single Task Decorator
agent1 = Agent(name="agent1")
agent2 = Agent(name="agent2")
# Raises ValueError
@agent1.task
@agent2.task
def my_task():
"Do a thing"
pass
Type Validation
Arguments are validated against type annotations:
@agent.task
def process_numbers(data: list[int], threshold: float = 0.5) -> dict: # type: ignore[return-value]
"""Process numerical data above threshold."""
pass
# Validation occurs at call time
result = process_numbers([1, 2, 3], 0.8) # ✅ Valid
result = process_numbers("invalid", 0.8) # ❌ Raises validation error
Complete Example
from agex import Agent, connect_state
# Create agents with shared state configuration
state_config = connect_state(type="versioned", storage="memory")
researcher = Agent(name="researcher", state=state_config)
analyst = Agent(name="analyst", state=state_config)
coordinator = Agent(name="coordinator", state=state_config)
# Register specialist capabilities with coordinator
@coordinator.fn(docstring="Research a topic online")
@researcher.task("Search and summarize information about the given topic")
def research_topic(topic: str, depth: str = "basic") -> dict: # type: ignore[return-value]
"""Research information about a topic."""
pass
@coordinator.fn(docstring="Analyze research data")
@analyst.task("Extract key insights and trends from research data")
def analyze_research(research_data: dict, focus_areas: list[str]) -> dict: # type: ignore[return-value]
"""Analyze research findings."""
pass
# Main coordination task
@coordinator.task("Research and analyze a topic comprehensively")
def full_research_pipeline(topic: str, focus_areas: list[str]) -> dict: # type: ignore[return-value]
"""Complete research and analysis pipeline."""
pass
# Execute - state is managed by the agents
result = full_research_pipeline(
topic="renewable energy trends",
focus_areas=["cost", "adoption", "technology"],
)
print(result) # Comprehensive analysis from both agents
Next Steps
- Agent: Agent class and configuration
- Registration: Expose capabilities to agents
- State: Memory, persistence, and sessions
- Host: Remote execution