If you’re using agentex agents on Temporal, you don’t need this page — agentex provides its own SGP tracing integration through its tracing processors. This guide is for users writing their own Temporal workflows or activities that emit SGP spans directly.
Temporal workflow code must be deterministic: it is replayed from event history whenever a worker rebuilds workflow state, so it cannot perform network IO, generate random UUIDs, or read system time directly. This makes integrating any tracing SDK with Temporal a question of where the tracing calls live.
Activities, by contrast, are regular Python functions running on a worker. They are free to do network IO, so the SGP Tracing SDK works inside an activity exactly like in any other process.
This guide covers the two patterns that come up in practice.
Pattern 1: Trace at the activity boundary (recommended)
Use this when “one activity = one span” gives you the granularity you need. It is the simplest pattern and matches how most Temporal applications are observed.
- Activities call tracing.create_span(...) directly using the SDK’s context manager.
- The caller of the workflow creates the parent span and passes trace_id and parent_span_id as workflow input.
- The workflow forwards those IDs to each activity. Each activity span is rooted under the caller’s parent.
from datetime import timedelta

import scale_gp_beta.lib.tracing as tracing
from scale_gp_beta import SGPClient
from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.worker import Worker


@activity.defn
async def fetch_data_activity(url: str, trace_id: str, parent_span_id: str) -> dict:
    with tracing.create_span(
        name="fetch_data",
        trace_id=trace_id,
        parent_id=parent_span_id,
        input={"url": url},
    ) as span:
        # ... actual work ...
        result = {"status": "ok", "url": url}
        span.output = result
        return result


@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self, urls: list[str], trace_id: str, parent_span_id: str) -> list[dict]:
        results = []
        for url in urls:
            result = await workflow.execute_activity(
                fetch_data_activity,
                args=(url, trace_id, parent_span_id),
                start_to_close_timeout=timedelta(seconds=30),
            )
            results.append(result)
        return results


async def run_worker():
    tracing.init(SGPClient(api_key="YOUR_API_KEY", account_id="YOUR_ACCOUNT_ID"))
    client = await Client.connect("localhost:7233")
    worker = Worker(
        client,
        task_queue="my-queue",
        workflows=[MyWorkflow],
        activities=[fetch_data_activity],
    )
    await worker.run()
The caller starts the workflow inside an existing span, passing the IDs through workflow input:
import scale_gp_beta.lib.tracing as tracing
from scale_gp_beta import SGPClient
from temporalio.client import Client

# Initialize the SDK once at the caller process entry point. The caller and
# the worker run in different processes, each with its own queue manager, so
# tracing.init() is required in both.
tracing.init(SGPClient(api_key="YOUR_API_KEY", account_id="YOUR_ACCOUNT_ID"))


async def kick_off():
    client = await Client.connect("localhost:7233")
    with tracing.create_trace("my_pipeline") as trace:
        with tracing.create_span("orchestrate_pipeline") as parent:
            await client.execute_workflow(
                MyWorkflow.run,
                args=[["https://a.example", "https://b.example"], trace.trace_id, parent.span_id],
                id="my-workflow-1",
                task_queue="my-queue",
            )
Use workflow.uuid4() (Temporal’s deterministic UUID helper), not uuid.uuid4(), when generating IDs inside workflow code. Standard library uuid.uuid4() is non-deterministic and will break workflow replay.
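To make the replay concern concrete, here is a minimal standard-library sketch of the general idea behind a deterministic UUID helper: the ID is derived from a seeded pseudo-random generator rather than OS entropy, so the same seed reproduces the same sequence. The names seeded_uuid4 and mint_ids are hypothetical, and this is an illustration of the principle, not Temporal’s implementation.

```python
import random
import uuid


def seeded_uuid4(rng: random.Random) -> uuid.UUID:
    """Build a version-4 UUID from a seeded PRNG instead of OS entropy."""
    return uuid.UUID(int=rng.getrandbits(128), version=4)


def mint_ids(seed: int, count: int) -> list[str]:
    """Mint `count` hex IDs; the same seed always yields the same sequence."""
    rng = random.Random(seed)
    return [seeded_uuid4(rng).hex for _ in range(count)]


# Two "executions" with the same seed stand in for an original run and a replay.
first_run = mint_ids(seed=42, count=3)
replay = mint_ids(seed=42, count=3)
ids_match_on_replay = first_run == replay

# uuid.uuid4() draws fresh entropy on every call, so a replay would see new IDs.
fresh_ids_differ = uuid.uuid4().hex != uuid.uuid4().hex
```

In real workflow code, keep calling workflow.uuid4(); the sketch only shows why an entropy-based ID cannot survive replay.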
The SDK’s background queue worker runs as a daemon thread inside the activity worker process, not inside the workflow, so the default tracing.init() configuration is fine. The queue flushes on size or cadence triggers and on worker shutdown.
Pattern 2: Buffer in workflow, flush via activity (advanced)
Use this when you want span boundaries that mirror workflow steps rather than activity invocations, or when activity calls are too coarse-grained to capture the hierarchy you want.
The shape:
- The workflow buffers lightweight span dicts (the SDK’s Item TypedDict) in workflow state. No network IO, no SDK calls.
- A dedicated flush_spans activity receives the buffered list and writes them via client.spans.upsert_batch(items=...).
- The flush activity runs:
  - when the buffer hits a size threshold,
  - on a periodic timer,
  - before the workflow completes or calls continue_as_new.
from datetime import timedelta

from scale_gp_beta import AsyncSGPClient
from scale_gp_beta.types.span_upsert_batch_params import Item as SpanRequest
from temporalio import activity, workflow
from temporalio.common import RetryPolicy

# Module-level client so we reuse the connection pool across activity invocations.
_client = AsyncSGPClient(api_key="YOUR_API_KEY", account_id="YOUR_ACCOUNT_ID")

# 4xx-style errors that should not retry: bad input, auth failure, validation.
# 5xx, rate limits, and transport errors are retried by Temporal per the policy below.
_NON_RETRYABLE = [
    "BadRequestError",
    "AuthenticationError",
    "PermissionDeniedError",
    "NotFoundError",
    "UnprocessableEntityError",
]


@activity.defn
async def flush_spans_activity(spans: list[SpanRequest]) -> None:
    # No tracing.init() needed here: this activity bypasses the SDK queue and
    # writes directly via upsert_batch. tracing.init() is only required if you
    # call tracing.create_span(...) inside an activity (Pattern 1).
    await _client.spans.upsert_batch(items=spans)


@workflow.defn
class BufferedTracingWorkflow:
    def __init__(self) -> None:
        self._buffer: list[SpanRequest] = []

    def _enqueue(self, span: SpanRequest) -> None:
        self._buffer.append(span)

    async def _maybe_flush(self, force: bool = False) -> None:
        if not self._buffer:
            return
        if not force and len(self._buffer) < 200:
            return
        spans_to_send = self._buffer
        self._buffer = []
        await workflow.execute_activity(
            flush_spans_activity,
            args=(spans_to_send,),
            start_to_close_timeout=timedelta(seconds=30),
            schedule_to_close_timeout=timedelta(minutes=10),
            retry_policy=RetryPolicy(
                initial_interval=timedelta(seconds=1),
                maximum_interval=timedelta(seconds=30),
                maximum_attempts=5,
                non_retryable_error_types=_NON_RETRYABLE,
            ),
        )

    @workflow.run
    async def run(self, items: list[str]) -> None:
        trace_id = workflow.uuid4().hex
        root_span_id = workflow.uuid4().hex
        root: SpanRequest = {
            "id": root_span_id,
            "trace_id": trace_id,
            "name": "workflow_root",
            "start_timestamp": workflow.now().isoformat(),
        }
        self._enqueue(root)
        for item in items:
            child: SpanRequest = {
                "id": workflow.uuid4().hex,
                "trace_id": trace_id,
                "parent_id": root_span_id,
                "name": f"process_{item}",
                "start_timestamp": workflow.now().isoformat(),
            }
            # Real work goes through an activity:
            # await workflow.execute_activity(do_work, item, start_to_close_timeout=...)
            child["end_timestamp"] = workflow.now().isoformat()
            self._enqueue(child)
            await self._maybe_flush()
        # Set root's end_timestamp and re-enqueue so the upsert lands. The
        # upsert_batch endpoint is keyed on `id`, so a second send with
        # end_timestamp set just updates the existing row. This handles the
        # case where root was already flushed mid-loop (buffer hit 200) and
        # the in-memory mutation alone would be lost.
        root["end_timestamp"] = workflow.now().isoformat()
        self._enqueue(root)
        await self._maybe_flush(force=True)
Item is a TypedDict, so values are plain dicts at runtime. That means no asdict(...) conversion is needed before the flush activity, and Temporal’s default data converter serializes them cleanly as JSON.
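The three flush triggers described for this pattern (size threshold, periodic timer, workflow completion or continue_as_new) can be expressed as one pure decision function. The sketch below is standard-library only; FlushPolicy, should_flush, and the 30-second cadence are hypothetical names and values chosen for illustration. In a workflow, the elapsed time would presumably be derived from workflow.now() rather than the system clock.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FlushPolicy:
    max_items: int = 200            # size threshold used by the workflow in this guide
    max_age_seconds: float = 30.0   # hypothetical cadence for the periodic trigger


def should_flush(
    buffered: int,
    seconds_since_flush: float,
    finishing: bool,
    policy: FlushPolicy = FlushPolicy(),
) -> bool:
    """Return True when any of the three flush triggers fires."""
    if buffered == 0:
        return False  # nothing to send, regardless of trigger
    if finishing:
        return True   # workflow is completing or about to continue_as_new
    if buffered >= policy.max_items:
        return True   # size trigger
    return seconds_since_flush >= policy.max_age_seconds  # cadence trigger


decisions = {
    "empty": should_flush(0, 999.0, finishing=True),
    "size": should_flush(200, 0.0, finishing=False),
    "timer": should_flush(5, 31.0, finishing=False),
    "final": should_flush(1, 0.0, finishing=True),
    "wait": should_flush(5, 2.0, finishing=False),
}
```

Keeping the decision free of IO and clock reads means it is safe to call from workflow code and easy to unit test on its own.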
A non-retryable failure (any error class listed in non_retryable_error_types) drops the buffered batch. The data is gone; there is no second chance for those spans. Choose the non-retryable list deliberately: include only errors where retrying cannot succeed (bad input, auth) and let everything else retry.
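As a rough aid for reasoning about the retry policy used by the flush call, the sketch below classifies an error by its class name and computes the waits between attempts. It assumes Temporal’s default backoff coefficient of 2.0 (the policy in this guide does not override it); is_retryable and backoff_schedule are hypothetical helper names, and nothing here calls Temporal.

```python
NON_RETRYABLE = frozenset({
    "BadRequestError",
    "AuthenticationError",
    "PermissionDeniedError",
    "NotFoundError",
    "UnprocessableEntityError",
})


def is_retryable(error_type_name: str) -> bool:
    """Errors are matched by type name; anything not listed is retried."""
    return error_type_name not in NON_RETRYABLE


def backoff_schedule(
    initial: float, maximum: float, attempts: int, coefficient: float = 2.0
) -> list[float]:
    """Waits (seconds) between attempts: attempts - 1 intervals, each capped at `maximum`."""
    return [min(initial * coefficient**i, maximum) for i in range(max(attempts - 1, 0))]


# initial_interval=1s, maximum_interval=30s, maximum_attempts=5
waits = backoff_schedule(initial=1.0, maximum=30.0, attempts=5)
print(waits)       # [1.0, 2.0, 4.0, 8.0]
print(sum(waits))  # 15.0
```

Under these assumptions a batch gets about 15 seconds of backoff across five attempts before it is abandoned, which is worth weighing against how long an outage you want buffered spans to survive.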
Why upsert_batch and not create
- Idempotency. Temporal retries activities on transient failures. upsert_batch is keyed on the client-supplied span id, so a retry that partially succeeded will re-converge to the right state. client.spans.create is not safe under retry because a partial success leaves orphan rows.
- Throughput. One HTTP call carries many spans. The batch endpoint accepts up to 1000 items per request.
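The idempotency point can be seen with a small in-memory stand-in for the server. This is a sketch, not the SGP backend: InMemorySpanStore and create_each are hypothetical, and the merge-on-id behavior is an assumption made for the illustration. The same batch is delivered twice, as it would be when Temporal retries an activity whose first attempt actually reached the server.

```python
class InMemorySpanStore:
    """Toy store contrasting id-keyed upserts with append-only creates."""

    def __init__(self) -> None:
        self.upserted: dict[str, dict] = {}
        self.created: list[dict] = []

    def upsert_batch(self, items: list[dict]) -> None:
        # Keyed on the client-supplied id: re-sending a span updates the same row.
        for item in items:
            self.upserted.setdefault(item["id"], {}).update(item)

    def create_each(self, items: list[dict]) -> None:
        # Append-only: every delivery produces new rows.
        self.created.extend(dict(item) for item in items)


batch = [
    {"id": "a1", "trace_id": "t1", "name": "root"},
    {"id": "b2", "trace_id": "t1", "parent_id": "a1", "name": "child"},
]

store = InMemorySpanStore()
for _attempt in range(2):  # the second pass simulates a retried activity
    store.upsert_batch(batch)
    store.create_each(batch)

print(len(store.upserted))  # 2
print(len(store.created))   # 4
```

The same property is what lets the workflow re-send the root span with end_timestamp set after an earlier flush.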
Keep batches at 200 to 500 items per flush in practice. That keeps payloads under 1 MB and HTTP timeouts comfortable, and gives you headroom under the 1000-item server cap.
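One way to stay inside those limits is to split the buffer before handing it to the flush activity. The sketch below is a hypothetical helper (chunk_spans and its constants are not part of the SDK) that caps each batch by item count and by an approximate JSON size; the size estimate sums per-span encodings and ignores the few bytes of list punctuation.

```python
import json

MAX_ITEMS_PER_FLUSH = 500      # upper end of the recommended 200 to 500 range
MAX_PAYLOAD_BYTES = 1_000_000  # approximate 1 MB payload budget


def chunk_spans(
    spans: list[dict],
    max_items: int = MAX_ITEMS_PER_FLUSH,
    max_bytes: int = MAX_PAYLOAD_BYTES,
) -> list[list[dict]]:
    """Split spans into batches bounded by item count and approximate JSON size."""
    batches: list[list[dict]] = []
    current: list[dict] = []
    current_bytes = 0
    for span in spans:
        size = len(json.dumps(span).encode("utf-8"))
        # Close the current batch if adding this span would exceed either limit.
        if current and (len(current) >= max_items or current_bytes + size > max_bytes):
            batches.append(current)
            current, current_bytes = [], 0
        current.append(span)
        current_bytes += size
    if current:
        batches.append(current)
    return batches


spans = [{"id": f"span-{i}", "trace_id": "t1", "name": "step"} for i in range(1201)]
batch_sizes = [len(batch) for batch in chunk_spans(spans)]
print(batch_sizes)  # [500, 500, 201]
```

A single span larger than the byte budget still gets its own batch rather than being dropped; whether the server accepts it is outside what this sketch can check.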
Span integrity discipline
The server rejects the whole batch if any child’s trace_id does not match its parent’s. This is easy to trip if parent IDs are minted in workflow code but child IDs are minted in an activity without the trace context piped through. Always pass trace_id (and parent_span_id where relevant) explicitly when crossing the workflow/activity boundary.
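Because one mismatched child sinks the whole batch, a cheap pre-flight check before flushing can help surface the problem early. The following sketch uses a hypothetical helper, find_trace_mismatches, and can only validate children whose parent is in the same batch; parents flushed earlier are invisible to it, so treat it as a partial guard rather than a reproduction of the server’s validation.

```python
def find_trace_mismatches(spans: list[dict]) -> list[str]:
    """Return ids of spans whose trace_id differs from their in-batch parent's."""
    by_id = {span["id"]: span for span in spans}
    problems: list[str] = []
    for span in spans:
        parent_id = span.get("parent_id")
        parent = by_id.get(parent_id) if parent_id else None
        # Parents outside this batch cannot be checked here and are skipped.
        if parent is not None and parent["trace_id"] != span["trace_id"]:
            problems.append(span["id"])
    return problems


batch = [
    {"id": "root", "trace_id": "t1", "name": "workflow_root"},
    {"id": "ok-child", "trace_id": "t1", "parent_id": "root", "name": "step_a"},
    # Minted without the trace context piped through, so it carries a new trace_id.
    {"id": "bad-child", "trace_id": "t2", "parent_id": "root", "name": "step_b"},
]

mismatched = find_trace_mismatches(batch)
print(mismatched)  # ['bad-child']
```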
Distributed tracing across workflows
If one workflow starts another (child workflow, or an external start_workflow call), propagate trace_id and the originating span’s id via workflow input or memo, and use them in the child workflow as parent_id for its root span. The same propagation rule from the Distributed Tracing page applies; Temporal’s workflow-input mechanism is a clean place to carry it.
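A small value object makes that propagation explicit. The sketch below assumes a hypothetical TraceContext dataclass passed as part of the child workflow’s input and a hypothetical child_root_span helper; in real workflow code the span id and timestamp would come from workflow.uuid4().hex and workflow.now().isoformat(), which are passed in here so the example stays free of Temporal imports.

```python
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class TraceContext:
    """Trace identifiers carried from the starting workflow to the child."""
    trace_id: str
    parent_span_id: str


def child_root_span(ctx: TraceContext, span_id: str, name: str, start_timestamp: str) -> dict:
    """Build the child workflow's root span, parented under the originating span."""
    return {
        "id": span_id,
        "trace_id": ctx.trace_id,          # same trace as the parent workflow
        "parent_id": ctx.parent_span_id,   # originating span becomes the parent
        "name": name,
        "start_timestamp": start_timestamp,
    }


# Parent side: capture the context to include in the child workflow's input.
ctx = TraceContext(trace_id="trace-123", parent_span_id="span-abc")
workflow_input = {"items": ["a", "b"], "trace": asdict(ctx)}

# Child side: rebuild the context from input and root the first span under it.
received = TraceContext(**workflow_input["trace"])
root = child_root_span(received, "span-child", "child_workflow_root", "2024-01-01T00:00:00+00:00")
```

Carrying both identifiers in one object makes it harder to forward trace_id but forget the parent span id, which is the mistake that would leave the child’s spans detached from the originating span.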