Strands Agents integration

Temporal's integration with Strands Agents is a Plugin that gives your Strands agents Durable Execution via the Temporal platform. The plugin routes model invocations, tool calls, MCP tool calls, and hooks through Temporal Activities, so every step the agent takes is recorded in Workflow history and can survive crashes, restarts, and infrastructure failures.

info

The Temporal Python SDK integration with Strands Agents is currently at an experimental release stage. The API may change in future versions.

Code snippets in this guide are taken from the Strands Agents plugin samples. Refer to the samples for the complete code.

Get started

Install the plugin, then run a minimal Strands agent inside a Temporal Workflow.

Prerequisites

Install the plugin

Install the Temporal Python SDK with Strands Agents support (requires temporalio 1.28.0 or later):

uv add "temporalio[strands-agents]"

or with pip:

pip install "temporalio[strands-agents]"

Run a Strands agent with Durable Execution

The following example runs a Strands agent inside a Temporal Workflow. Model calls execute as Temporal Activities, which means they get automatic retries, timeouts, and durable execution. If the Worker process crashes mid-conversation, Temporal replays the Workflow and resumes from the last completed Activity.
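
To make the replay idea concrete, here is a toy model in plain Python. It is not how Temporal is implemented, and `ToyHistory`, `run_steps`, and the step functions are hypothetical names invented for this sketch. It only illustrates the principle that results recorded for completed steps are reused on a rerun rather than recomputed.

```python
from typing import Callable


class ToyHistory:
    """Toy stand-in for Workflow history: results of completed steps, in order."""

    def __init__(self) -> None:
        self.results: list[str] = []


def run_steps(history: ToyHistory, steps: list[Callable[[], str]]) -> list[str]:
    """Run steps in order, reusing any result already recorded in history."""
    outputs: list[str] = []
    for index, step in enumerate(steps):
        if index < len(history.results):
            # Completed before the crash: reuse the recorded result.
            outputs.append(history.results[index])
        else:
            result = step()  # may raise, which simulates a crash
            history.results.append(result)
            outputs.append(result)
    return outputs


calls: list[str] = []


def model_call() -> str:
    calls.append("model")
    return "model reply"


def crashing_tool() -> str:
    calls.append("tool")
    raise RuntimeError("worker crashed")


def working_tool() -> str:
    calls.append("tool")
    return "tool result"


history = ToyHistory()
try:
    run_steps(history, [model_call, crashing_tool])
except RuntimeError:
    pass  # the first attempt dies after the model call has completed

# "Replay": the model call is not executed again; only the tool step runs.
outputs = run_steps(history, [model_call, working_tool])
```

In this toy run the model step executes once even though the sequence is started twice, which is the property that makes retried agent conversations cheap to resume.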

1. Define the Workflow

Create a Workflow that holds a TemporalAgent and invokes it with a prompt. The start_to_close_timeout sets the maximum time each model call Activity can run:

strands_plugin/hello_world/workflow.py

from datetime import timedelta

from temporalio import workflow
from temporalio.contrib.strands import TemporalAgent


@workflow.defn
class HelloWorldWorkflow:
    def __init__(self) -> None:
        self.agent = TemporalAgent(start_to_close_timeout=timedelta(seconds=60))

    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await self.agent.invoke_async(prompt)
        return str(result)

caution

Inside a Workflow, always call agent.invoke_async(message), not agent(message). The synchronous form spawns a worker thread, which the Workflow sandbox blocks.

2. Start a Worker

Create a Worker that registers the Workflow and the StrandsPlugin. The plugin automatically registers the Activities that handle model calls:

strands_plugin/hello_world/run_worker.py

import asyncio
import os

from temporalio.client import Client
from temporalio.contrib.strands import StrandsPlugin
from temporalio.worker import Worker

from strands_plugin.hello_world.workflow import HelloWorldWorkflow


async def main() -> None:
    plugin = StrandsPlugin()
    client = await Client.connect(
        os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
        plugins=[plugin],
    )

    worker = Worker(
        client,
        task_queue="strands-hello-world",
        workflows=[HelloWorldWorkflow],
    )
    print("Worker started. Ctrl+C to exit.")
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())

3. Run the Workflow

Start the Workflow from a separate client script. This example sends the prompt "Write a haiku about durable execution." and prints the agent's response:

strands_plugin/hello_world/run_workflow.py

import asyncio
import os

from temporalio.client import Client

from strands_plugin.hello_world.workflow import HelloWorldWorkflow


async def main() -> None:
    client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))

    result = await client.execute_workflow(
        HelloWorldWorkflow.run,
        "Write a haiku about durable execution.",
        id="strands-hello-world",
        task_queue="strands-hello-world",
    )

    print(f"Result: {result}")


if __name__ == "__main__":
    asyncio.run(main())

Build the agent

Customize which model provider your agent uses, add tools that run as Activities, subscribe to lifecycle events with hooks, and connect to MCP servers.

Choose and configure models

By default, StrandsPlugin uses Strands' own default model (BedrockModel). To use a different model, pass a models mapping to StrandsPlugin on the Worker. When you provide a custom models mapping, each TemporalAgent must specify which model to use by name.

Each entry in the mapping pairs a name with a factory function that creates a model provider (such as AnthropicModel or BedrockModel). The provider is created on first use and reused for the Worker's lifetime:

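from datetime import timedelta

from strands.models.anthropic import AnthropicModel
from strands.models.bedrock import BedrockModel
from temporalio import workflow
from temporalio.contrib.strands import StrandsPlugin, TemporalAgent
from temporalio.worker import Worker


# Workflow: each agent names the model it wants.
@workflow.defn
class MultiModelWorkflow:
    def __init__(self) -> None:
        self.agent_a = TemporalAgent(
            model="claude",
            start_to_close_timeout=timedelta(seconds=60),
        )
        self.agent_b = TemporalAgent(
            model="bedrock",
            start_to_close_timeout=timedelta(seconds=60),
        )


# Worker: the keys must match the model names used in the Workflow.
Worker(..., plugins=[StrandsPlugin(models={
    "claude": lambda: AnthropicModel(client_args={"api_key": "..."}),
    "bedrock": lambda: BedrockModel(),
})])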

Each TemporalAgent carries its own Activity options (timeouts, retry policy, task queue, streaming topic) and dispatches to a shared model Activity, which resolves the model name against the registered factories at runtime. A model name not present in the models mapping raises ValueError inside the Activity.
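
The lookup behavior described above (a name mapped to a factory, built on first use, reused afterwards, and ValueError for an unknown name) can be sketched in plain Python. This is an illustration of the pattern only, not the plugin's code; `LazyRegistry` and `resolve` are hypothetical names, and `object()` stands in for a model provider.

```python
from typing import Callable, Dict


class LazyRegistry:
    """Map names to factories; build each object on first use, then reuse it."""

    def __init__(self, factories: Dict[str, Callable[[], object]]) -> None:
        self._factories = dict(factories)
        self._instances: Dict[str, object] = {}

    def resolve(self, name: str) -> object:
        if name not in self._factories:
            # Mirrors the documented behavior for an unregistered model name.
            raise ValueError(f"unknown model name: {name!r}")
        if name not in self._instances:
            # First use: call the factory once and cache the result.
            self._instances[name] = self._factories[name]()
        return self._instances[name]


# object() is a placeholder for a real provider such as AnthropicModel.
registry = LazyRegistry({"claude": lambda: object(), "bedrock": lambda: object()})
first = registry.resolve("claude")
second = registry.resolve("claude")
```

Because the factory runs lazily, a Worker that registers several providers only pays the construction cost for the ones its Workflows actually reference.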

Run non-deterministic tools as Activities

Strands tools that perform I/O, access external services, or produce non-deterministic results need to run as Temporal Activities rather than inline in the Workflow. Wrap each tool in an @activity.defn function, register the Activities on the Worker, and pass them to the agent using activity_as_tool.

Define an Activity for the tool:

strands_plugin/tools/workflow.py

from temporalio import activity


@activity.defn
async def fetch_weather(city: str) -> dict:
    """Stub weather lookup; replace with a real HTTP call in production."""
    return {
        "city": city,
        "temperature_f": 72,
        "conditions": "sunny",
    }

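Pass the Activity to the agent in the Workflow using activity_as_tool. The Workflow below also uses letter_counter and environment_activity, which are defined in the sample and not shown here: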

strands_plugin/tools/workflow.py

@workflow.defn
class ToolsWorkflow:
    def __init__(self) -> None:
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            tools=[
                letter_counter,
                activity_as_tool(
                    fetch_weather,
                    start_to_close_timeout=timedelta(seconds=30),
                ),
                activity_as_tool(
                    environment_activity,
                    start_to_close_timeout=timedelta(seconds=30),
                ),
            ],
        )

    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await self.agent.invoke_async(prompt)
        return str(result)

Register the Activity functions on the Worker:

strands_plugin/tools/run_worker.py

import asyncio
import os

from temporalio.client import Client
from temporalio.contrib.strands import StrandsPlugin
from temporalio.worker import Worker

from strands_plugin.tools.workflow import (
ToolsWorkflow,
environment_activity,
fetch_weather,
)


async def main() -> None:
    plugin = StrandsPlugin()
    client = await Client.connect(
        os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
        plugins=[plugin],
    )

    worker = Worker(
        client,
        task_queue="strands-tools",
        workflows=[ToolsWorkflow],
        activities=[fetch_weather, environment_activity],
    )
    print("Worker started. Ctrl+C to exit.")
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())

If you are using built-in strands_tools, wrap them in a thin async function decorated with @activity.defn so they run as Temporal Activities.

React to agent lifecycle events

Strands' hook system lets you subscribe callbacks to events in the agent lifecycle, such as invocation start/end, model call before/after, tool call before/after, and message added. Use hooks to add logging, metrics, or custom logic at each stage.

Pass hooks=[MyHookProvider()] to TemporalAgent. Hook callbacks fire in Workflow context, so deterministic callbacks work without any extra setup.

For callbacks that need I/O (audit logging, metrics, alerting), use activity_as_hook to dispatch the work as a Temporal Activity. The following example shows both patterns in one HookProvider. The _record callback runs in Workflow context (deterministic), while persist_tool_call runs as an Activity (I/O-safe):

strands_plugin/hooks/workflow.py

@activity.defn
async def persist_tool_call(tool_name: str) -> None:
    # In production, write to a database / S3 / your audit pipeline.
    activity.logger.info(f"audit: tool {tool_name} completed")

strands_plugin/hooks/workflow.py

class AuditHook(HookProvider):
    def __init__(self) -> None:
        self.fired: list[str] = []

    def register_hooks(self, registry: HookRegistry, **kwargs: object) -> None:
        registry.add_callback(AfterToolCallEvent, self._record)
        registry.add_callback(
            AfterToolCallEvent,
            activity_as_hook(
                persist_tool_call,
                activity_input=lambda event: event.tool_use["name"],
                start_to_close_timeout=timedelta(seconds=15),
            ),
        )

    def _record(self, event: AfterToolCallEvent) -> None:
        self.fired.append(event.tool_use["name"])

danger

Hook callbacks run in Workflow context, so they must be deterministic. Do not use time.time(), uuid.uuid4(), or I/O inside hook callbacks. Use activity_as_hook for anything that requires I/O.

The activity_input parameter extracts serializable values from the event to pass as the Activity's input. Use a dataclass or Pydantic model for multiple values. This is needed because hook events hold references to Agent, AgentTool instances, and other objects that cannot cross the Activity boundary.
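
As a sketch of the "dataclass for multiple values" advice, the extractor below copies two plain strings out of an event into a small dataclass. `ToolCallRecord`, `to_record`, and the fake event are hypothetical, and the `toolUseId` key is an assumption about the shape of `tool_use`; only the `name` key appears in the sample above.

```python
import json
from dataclasses import asdict, dataclass
from types import SimpleNamespace


@dataclass
class ToolCallRecord:
    """Hypothetical Activity input: plain, serializable fields only."""

    tool_name: str
    tool_use_id: str


def to_record(event) -> ToolCallRecord:
    """Extractor in the style of activity_input: copy values, keep no object references."""
    return ToolCallRecord(
        tool_name=event.tool_use["name"],
        tool_use_id=event.tool_use["toolUseId"],  # assumed key, see lead-in
    )


# Stand-in for a hook event; a real event also references the Agent and tool objects.
fake_event = SimpleNamespace(
    tool_use={"name": "fetch_weather", "toolUseId": "tool-1", "input": {"city": "Oslo"}},
    agent=object(),  # not serializable, so it must not reach the Activity
)

record = to_record(fake_event)
payload = json.dumps(asdict(record))  # succeeds because only strings were copied
```

A function like `to_record` could then be passed as `activity_input=to_record`, with the Activity declaring a single `ToolCallRecord` parameter.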

Connect to MCP servers

If your agent needs access to tools provided by an MCP server, configure the MCP clients on the Worker and reference them by name in the Workflow.

StrandsPlugin(mcp_clients=...) takes a mapping of name to MCPClient factory, mirroring the models pattern. The plugin registers a per-server Activity and connects at Worker startup to enumerate available tools. In the Workflow, TemporalMCPClient(server="name") is a handle that references the server by name and carries per-call Activity options.

Define the Workflow with a TemporalMCPClient:

strands_plugin/mcp/workflow.py

from datetime import timedelta

from temporalio import workflow
from temporalio.contrib.strands import TemporalAgent, TemporalMCPClient


@workflow.defn
class MCPWorkflow:
    def __init__(self) -> None:
        echo = TemporalMCPClient(
            server="echo",
            start_to_close_timeout=timedelta(seconds=30),
        )
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            tools=[echo],
        )

    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await self.agent.invoke_async(prompt)
        return str(result)

Register the MCP client factory on the Worker:

strands_plugin/mcp/run_worker.py

# ...
from mcp import StdioServerParameters, stdio_client
from strands.tools.mcp.mcp_client import MCPClient
from temporalio.client import Client
from temporalio.contrib.strands import StrandsPlugin
from temporalio.worker import Worker
# ...
def _make_echo_client() -> MCPClient:
    return MCPClient(
        lambda: stdio_client(
            StdioServerParameters(
                command=sys.executable,
                args=[str(ECHO_SERVER)],
            )
        )
    )
# ...
async def main() -> None:
    plugin = StrandsPlugin(mcp_clients={"echo": _make_echo_client})
    client = await Client.connect(
        os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
        plugins=[plugin],
    )

    worker = Worker(
        client,
        task_queue="strands-mcp",
        workflows=[MCPWorkflow],
    )
    print("Worker started. Ctrl+C to exit.")
    await worker.run()

Each factory returns a fully configured MCPClient, so you can pass options like tool_filters, prefix, elicitation_callback, or tasks_config to it.

note

The plugin connects to each MCP server once at Worker startup to enumerate tools. The schema is frozen for the Worker's lifetime. Restart Workers to pick up MCP server changes. If a server is unavailable at startup, the Worker fails to start.

Interact with the agent

Control the shape of agent responses, stream output in real time, and pause the agent for human approval.

Add human approval gates

Some agent actions, such as deleting resources or sending messages, may require human approval before proceeding. Strands offers two ways to interrupt an agent and wait for a response. Both work with the plugin.

In each case, agent.invoke_async() returns AgentResult(stop_reason="interrupt", interrupts=[...]) instead of raising. Pair this with a Signal handler that supplies responses, then resume by calling agent.invoke_async(responses).

Interrupt from a hook

A hook on an interruptible event such as BeforeToolCallEvent can pause the agent by calling event.interrupt(name, reason=...). The hook runs in Workflow context, so it must be deterministic.

Define the approval hook:

strands_plugin/human_in_the_loop/workflow.py

class ApprovalHook(HookProvider):
    def register_hooks(self, registry: HookRegistry, **kwargs: object) -> None:
        registry.add_callback(BeforeToolCallEvent, self._gate)

    def _gate(self, event: BeforeToolCallEvent) -> None:
        if event.tool_use["name"] != "delete_file":
            return
        approval = event.interrupt(
            "approval",
            reason=f"approve delete of {event.tool_use['input']['path']}?",
        )
        if approval != "approve":
            event.cancel_tool = "denied"

The Workflow waits for a Signal carrying the approval response, then resumes the agent:

strands_plugin/human_in_the_loop/workflow.py

@workflow.defn
class HumanInTheLoopWorkflow:
    def __init__(self) -> None:
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            tools=[delete_file],
            hooks=[ApprovalHook()],
        )
        self._approval: Optional[str] = None
        self._pending_reason: Optional[str] = None

    @workflow.signal
    def approve(self, response: str) -> None:
        self._approval = response

    @workflow.query
    def pending_approval(self) -> Optional[str]:
        return self._pending_reason

    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await self.agent.invoke_async(prompt)
        while result.stop_reason == "interrupt":
            interrupts = list(result.interrupts or [])
            self._pending_reason = interrupts[0].reason if interrupts else None
            await workflow.wait_condition(lambda: self._approval is not None)
            response = self._approval
            self._approval = None
            self._pending_reason = None
            responses: list[InterruptResponseContent] = [
                {"interruptResponse": {"interruptId": i.id, "response": response}}
                for i in interrupts
            ]
            result = await self.agent.invoke_async(responses)
        return str(result)
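
The response payload built inside the loop above can be isolated into a small pure function, which makes its shape easier to see and to unit test. This is a sketch: `build_responses` and `FakeInterrupt` are hypothetical helpers, and the dictionary layout is copied from the Workflow code above rather than from a separate specification.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeInterrupt:
    """Stand-in exposing the two attributes the Workflow loop reads."""

    id: str
    reason: str


def build_responses(interrupts: list, response: Any) -> list[dict]:
    """Answer every pending interrupt with the same response value."""
    return [
        {"interruptResponse": {"interruptId": i.id, "response": response}}
        for i in interrupts
    ]


pending = [FakeInterrupt(id="a1", reason="approve delete of /tmp/x?")]
responses = build_responses(pending, "approve")
```

The list carries one entry per pending interrupt, so if the agent raised several interrupts in one turn, a single Signal answers all of them with the same value. A Workflow that needs per-interrupt decisions would have to collect one response per interrupt id instead.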

Interrupt from a tool

A @strands.tool function can raise InterruptException(Interrupt(...)) directly. The agent stops with the interrupt, and the Workflow handles the resume the same way as for hooks:

from strands import tool
from strands.interrupt import Interrupt, InterruptException


@tool
def delete_thing(name: str) -> str:
    raise InterruptException(
        Interrupt(id=f"delete:{name}", name="approval", reason=f"delete {name}?")
    )

The same approach works from an activity_as_tool-wrapped Activity. The plugin's failure converter preserves the Interrupt payload across the Activity boundary, so AgentResult.interrupts is populated the same way.

Define the Activity that raises the interrupt:

strands_plugin/activity_interrupt/workflow.py

@activity.defn
async def delete_thing(name: str) -> str:
    if name not in _APPROVED:
        _APPROVED.add(name)
        raise InterruptException(
            Interrupt(
                id=f"delete:{name}",
                name="approval",
                reason=f"approve delete of protected resource '{name}'?",
            )
        )
    return f"deleted {name}"

danger

Activity-tool interrupts rely on the plugin's failure converter, which is installed via the client's data converter. Attach StrandsPlugin to the client (not just the Worker) for Activity-tool interrupts to work. Workers built from that client pick up the plugin automatically:

strands_plugin/activity_interrupt/run_worker.py

import asyncio
import os

from temporalio.client import Client
from temporalio.contrib.strands import StrandsPlugin
from temporalio.worker import Worker

from strands_plugin.activity_interrupt.workflow import (
ActivityInterruptWorkflow,
delete_thing,
)


async def main() -> None:
    plugin = StrandsPlugin()
    # The plugin MUST be on the client so its failure converter is installed.
    client = await Client.connect(
        os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
        plugins=[plugin],
    )

    worker = Worker(
        client,
        task_queue="strands-activity-interrupt",
        workflows=[ActivityInterruptWorkflow],
        activities=[delete_thing],
    )
    print("Worker started. Ctrl+C to exit.")
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())

Return structured data from an agent

To have the agent return a typed object instead of free-form text, pass a structured_output_model to TemporalAgent. The plugin defaults to the pydantic_data_converter, so Pydantic types serialize cleanly across the Activity and Workflow boundary:

strands_plugin/structured_output/workflow.py

from datetime import timedelta

from pydantic import BaseModel, Field
from temporalio import workflow
from temporalio.contrib.strands import TemporalAgent


class PersonInfo(BaseModel):
    name: str = Field(description="Name of the person")
    age: int = Field(description="Age of the person")
    occupation: str = Field(description="Occupation of the person")


@workflow.defn
class StructuredOutputWorkflow:
    def __init__(self) -> None:
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            structured_output_model=PersonInfo,
        )

    @workflow.run
    async def run(self, prompt: str) -> PersonInfo:
        result = await self.agent.invoke_async(prompt)
        assert isinstance(result.structured_output, PersonInfo)
        return result.structured_output

Stream agent output to clients

For long-running agent calls, you may want to forward model output chunks to an external consumer as they arrive rather than waiting for the full response.

Pass streaming_topic="..." to TemporalAgent and host a WorkflowStream on the Workflow. Each StreamEvent is published from inside the model Activity. Subscribers read events through WorkflowStreamClient. Chunks are batched on streaming_batch_interval (default 100 ms).

Define the Workflow with a WorkflowStream and a streaming topic:

strands_plugin/streaming/workflow.py

from datetime import timedelta

from temporalio import workflow
from temporalio.contrib.strands import TemporalAgent
from temporalio.contrib.workflow_streams import WorkflowStream


@workflow.defn
class StreamingWorkflow:
    def __init__(self) -> None:
        self.stream = WorkflowStream()
        self.agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            streaming_topic="events",
        )

    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await self.agent.invoke_async(prompt)
        return str(result)

Subscribe to the stream from a client:

strands_plugin/streaming/run_workflow.py

import asyncio
import os
from datetime import timedelta

from strands.types.streaming import StreamEvent
from temporalio.client import Client
from temporalio.contrib.workflow_streams import WorkflowStreamClient

from strands_plugin.streaming.workflow import StreamingWorkflow


async def main() -> None:
    client = await Client.connect(os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"))
    workflow_id = "strands-streaming"

    handle = await client.start_workflow(
        StreamingWorkflow.run,
        "Count from 1 to 5, one number per sentence.",
        id=workflow_id,
        task_queue="strands-streaming",
    )

    async def consume() -> None:
        stream = WorkflowStreamClient.create(client, workflow_id)
        async for item in stream.subscribe(
            ["events"],
            from_offset=0,
            result_type=StreamEvent,
            poll_cooldown=timedelta(milliseconds=50),
        ):
            event: StreamEvent = item.data
            if "contentBlockDelta" in event:
                delta = event["contentBlockDelta"].get("delta", {})
                if "text" in delta:
                    print(delta["text"], end="", flush=True)
            elif "messageStop" in event:
                print()
                return

    consume_task = asyncio.create_task(consume())
    result = await handle.result()
    await asyncio.wait_for(consume_task, timeout=10.0)
    print(f"Final result: {result}")


if __name__ == "__main__":
    asyncio.run(main())
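
The event handling inside consume() can also be written as a pure function over a list of events, which is convenient for testing a consumer without a running Workflow. This is a sketch: `collect_text` is a hypothetical helper and the sample events are made up for illustration, using only the `contentBlockDelta` and `messageStop` keys that the subscriber above already checks.

```python
from typing import Iterable


def collect_text(events: Iterable[dict]) -> str:
    """Concatenate text deltas until the first messageStop event."""
    parts: list[str] = []
    for event in events:
        if "contentBlockDelta" in event:
            delta = event["contentBlockDelta"].get("delta", {})
            if "text" in delta:
                parts.append(delta["text"])
        elif "messageStop" in event:
            break  # same stopping rule as the subscriber above
    return "".join(parts)


# Made-up events for illustration; real payloads may carry additional fields.
sample_events = [
    {"contentBlockDelta": {"delta": {"text": "One. "}}},
    {"contentBlockDelta": {"delta": {}}},  # a delta without text is skipped
    {"contentBlockDelta": {"delta": {"text": "Two."}}},
    {"messageStop": {}},
    {"contentBlockDelta": {"delta": {"text": "ignored"}}},
]
text = collect_text(sample_events)
```

Stopping at the first messageStop matches the sample, but an agent that calls tools may produce more than one model message per invocation, so a consumer that wants the full transcript may need a different stopping rule.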

Run in production

Configure retry policies, handle long-running chat sessions, and add distributed tracing.

Configure retries

TemporalAgent disables Strands' built-in ModelRetryStrategy so that retries are handled exclusively by Temporal. Configure retries with retry_policy on TemporalAgent for model calls, and on the Activity options accepted by activity_as_tool, activity_as_hook, and TemporalMCPClient for their respective calls:

from temporalio.common import RetryPolicy


TemporalAgent(
    start_to_close_timeout=timedelta(seconds=60),
    retry_policy=RetryPolicy(maximum_attempts=3),
)

Passing retry_strategy=... to TemporalAgent(...) raises ValueError. Remove the argument (or pass retry_strategy=None) and use retry_policy instead.
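
To see what a policy such as RetryPolicy(maximum_attempts=3) means in practice, the sketch below computes the nominal waits between attempts. It assumes Temporal's default retry settings (1 second initial interval, backoff coefficient 2.0, maximum interval of 100 times the initial interval) and that maximum_attempts counts the first attempt. `retry_delays` is a hypothetical helper, not part of the SDK.

```python
from datetime import timedelta


def retry_delays(
    maximum_attempts: int,
    initial_interval: timedelta = timedelta(seconds=1),
    backoff_coefficient: float = 2.0,
    maximum_interval: timedelta = timedelta(seconds=100),
) -> list[timedelta]:
    """Nominal waits between attempts; there is one fewer wait than attempts."""
    delays: list[timedelta] = []
    for retry in range(maximum_attempts - 1):
        # Exponential backoff, capped at maximum_interval.
        delay = initial_interval * (backoff_coefficient ** retry)
        delays.append(min(delay, maximum_interval))
    return delays


delays = retry_delays(maximum_attempts=3)
```

Under these assumptions, three attempts means two waits of 1 and 2 seconds. Each attempt is also bounded by start_to_close_timeout, so the worst-case wall-clock time for one model call is roughly the sum of the waits plus three times the timeout.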

Handle long-running chat sessions

A chat-style Workflow accumulates message history with every turn. Over a long session, the Workflow's event history can grow large enough to hit Temporal's per-Workflow history limit. To avoid this, use Continue-as-New to start a fresh Workflow execution while carrying the agent's message history forward as input.

In this example, each user turn arrives as a Workflow Update, so the caller gets the agent's reply back from the same call. The run method creates the agent, then waits until either the chat ends or Temporal suggests Continue-as-New. In both cases the Workflow first waits for any in-flight Updates to finish. If the chat has not ended, it then starts a fresh execution that carries the agent's accumulated messages forward:

strands_plugin/continue_as_new/workflow.py

import asyncio
from dataclasses import dataclass, field
from datetime import timedelta

from strands.types.content import Messages
from temporalio import workflow
from temporalio.contrib.strands import TemporalAgent


@dataclass
class ChatInput:
    messages: Messages = field(default_factory=list)


@workflow.defn
class ChatWorkflow:
    def __init__(self) -> None:
        self._done = False
        self._lock = asyncio.Lock()
        self._agent: TemporalAgent | None = None

    @workflow.update
    async def turn(self, prompt: str) -> str:
        await workflow.wait_condition(lambda: self._agent is not None)
        async with self._lock:
            assert self._agent is not None
            result = await self._agent.invoke_async(prompt)
            return str(result).strip()

    @workflow.signal
    def end_chat(self) -> None:
        self._done = True

    @workflow.query
    def messages(self) -> Messages:
        return list(self._agent.messages) if self._agent else []

    @workflow.run
    async def run(self, input: ChatInput) -> None:
        self._agent = TemporalAgent(
            start_to_close_timeout=timedelta(seconds=60),
            messages=list(input.messages),
        )

        await workflow.wait_condition(
            lambda: self._done or workflow.info().is_continue_as_new_suggested()
        )

        await workflow.wait_condition(workflow.all_handlers_finished)

        if not self._done:
            workflow.continue_as_new(ChatInput(messages=self._agent.messages))

Add tracing with OpenTelemetry

To get distributed traces across model, tool, and MCP Activities, combine StrandsPlugin with the OpenTelemetry plugin. Register OpenTelemetryPlugin on the client and StrandsPlugin on the Worker. Workers built from that client pick up the OpenTelemetry plugin automatically:

import asyncio

import opentelemetry.trace
from temporalio.client import Client
from temporalio.contrib.opentelemetry import OpenTelemetryPlugin, create_tracer_provider
from temporalio.contrib.strands import StrandsPlugin
from temporalio.worker import Worker


async def main() -> None:
    # The tracer provider must be set before the client connects.
    opentelemetry.trace.set_tracer_provider(create_tracer_provider())

    client = await Client.connect("localhost:7233", plugins=[OpenTelemetryPlugin()])

    worker = Worker(
        client,
        task_queue="strands",
        workflows=[MyWorkflow],  # MyWorkflow is a placeholder for your own Workflow class
        plugins=[StrandsPlugin()],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())

Set the tracer provider before connecting the client.

Snapshots are not supported

TemporalAgent.take_snapshot() and TemporalAgent.load_snapshot() raise NotImplementedError. Temporal's event history already persists Workflow state durably at a finer granularity than Strands snapshots, so snapshots are redundant inside a Workflow.

Samples

The Strands Agents plugin samples demonstrate all supported patterns end-to-end.