Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions src/agentex/lib/core/tracing/processors/sgp_tracing_processor.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import os
import asyncio
import weakref
from typing import cast, override
Expand All @@ -22,6 +23,28 @@
logger = make_logger(__name__)


_SKIP_SPAN_START_ENV = "AGENTEX_TRACING_SKIP_SPAN_START"


def _skip_span_start_enabled() -> bool:
    """Report whether span-start writes are suppressed (end-only ingest).

    A span is otherwise written twice: once when it starts (without an
    ``end_time``) and again when it ends. The end write overwrites the start
    row shortly afterwards, so keeping the start write doubles span-ingest
    write volume and, on the SGP backend, costs a non-HOT UPDATE
    (tsvector/GIN recompute + index churn) plus a dead tuple per span. With
    the start write skipped, the end write is a single INSERT.

    The flag is ON unless ``AGENTEX_TRACING_SKIP_SPAN_START`` is set to one of
    ``0``/``false``/``no``/``off`` (case-insensitive, surrounding whitespace
    ignored). Turn it off when in-flight spans must be visible before they
    complete, or when spans that never end (process crash) must still be
    persisted.

    Returns:
        ``True`` when the start write should be skipped, ``False`` when it
        should be emitted.
    """
    # The environment is consulted on every call rather than cached, so a
    # change to the variable takes effect on the next call.
    setting = os.environ.get(_SKIP_SPAN_START_ENV, "1")
    normalized = setting.strip().lower()
    opted_out = normalized in ("0", "false", "no", "off")
    return not opted_out


def _get_span_type(span: Span) -> str:
"""Read span_type from span.data['__span_type__'], defaulting to STANDALONE."""
if isinstance(span.data, dict):
Expand Down Expand Up @@ -75,9 +98,18 @@ def __init__(self, config: SGPTracingProcessorConfig):
disabled=disabled,
)
self.env_vars = EnvironmentVariables.refresh()
logger.info(
"SGP tracing span-start upsert %s (%s)",
"disabled — end-only ingest" if _skip_span_start_enabled() else "enabled",
_SKIP_SPAN_START_ENV,
)

@override
def on_span_start(self, span: Span) -> None:
    """Flush a start-of-span record, unless end-only ingest is active.

    Args:
        span: The span that has just started.
    """
    # When _skip_span_start_enabled() is true (the default) nothing is built
    # or flushed here, so the span is persisted once, on end.
    if not _skip_span_start_enabled():
        _build_sgp_span(span, self.env_vars).flush(blocking=False)

Expand Down Expand Up @@ -107,6 +139,11 @@ def __init__(self, config: SGPTracingProcessorConfig):
asyncio.AbstractEventLoop, AsyncSGPClient
] = weakref.WeakKeyDictionary()
self.env_vars = EnvironmentVariables.refresh()
logger.info(
"SGP tracing span-start upsert %s (%s)",
"disabled — end-only ingest" if _skip_span_start_enabled() else "enabled",
_SKIP_SPAN_START_ENV,
)

def _build_client(self) -> AsyncSGPClient:
import httpx
Expand Down Expand Up @@ -150,6 +187,10 @@ async def on_span_end(self, span: Span) -> None:

@override
async def on_spans_start(self, spans: list[Span]) -> None:
# End-only ingest: by default the start write is skipped (see
# _skip_span_start_enabled) so each span is persisted once, on end.
if _skip_span_start_enabled():
return
if not spans:
return

Expand Down
22 changes: 18 additions & 4 deletions src/agentex/lib/core/tracing/span_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,18 @@

logger = make_logger(__name__)

_DEFAULT_BATCH_SIZE = 50
_DEFAULT_LINGER_MS = 100
# Max spans coalesced into one ``upsert_batch`` HTTP call (one
# ``INSERT ... ON CONFLICT`` statement server-side). Larger batches amortize
# the per-request round trip and the per-statement parse/plan + index
# maintenance overhead, which dominates at high span volume. Kept well under
# the EGP backend's 1000-row cap; tune per-deploy via
# ``AGENTEX_SPAN_QUEUE_BATCH_SIZE``.
_DEFAULT_BATCH_SIZE = 200
# Max time the drain lingers after the first span to let a batch fill. Spans
# typically arrive a few ms apart, so a longer linger fills the larger batch
# above rather than shipping near-size-1 batches; bounded so worst-case ingest
# latency (and the in-flight loss window) stays sub-second.
_DEFAULT_LINGER_MS = 250
# 0 == unbounded (preserves prior behavior). A bound makes backpressure
# visible (dropped spans are counted) and caps worst-case memory.
_DEFAULT_MAX_SIZE = 0
Expand Down Expand Up @@ -114,7 +124,7 @@ class AsyncSpanQueue:

def __init__(
self,
batch_size: int = _DEFAULT_BATCH_SIZE,
batch_size: int | None = None,
linger_ms: int | None = None,
max_size: int | None = None,
max_retries: int | None = None,
Expand All @@ -126,7 +136,11 @@ def __init__(
self._queue: asyncio.Queue[_SpanQueueItem] = asyncio.Queue(maxsize=resolved_max_size)
self._drain_task: asyncio.Task[None] | None = None
self._stopping = False
self._batch_size = batch_size
self._batch_size = (
_read_int_env("AGENTEX_SPAN_QUEUE_BATCH_SIZE", _DEFAULT_BATCH_SIZE, minimum=1)
if batch_size is None
else max(1, batch_size)
)
self._linger_ms = _read_linger_ms_env() if linger_ms is None else max(0, linger_ms)
self._max_retries = (
_read_int_env("AGENTEX_SPAN_QUEUE_MAX_RETRIES", _DEFAULT_MAX_RETRIES, minimum=1)
Expand Down
106 changes: 100 additions & 6 deletions tests/lib/core/tracing/processors/test_sgp_tracing_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from datetime import UTC, datetime
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from agentex.types.span import Span
from agentex.lib.types.tracing import SGPTracingProcessorConfig

Expand Down Expand Up @@ -65,8 +67,9 @@ def test_processor_holds_no_per_span_state(self):
processor, _ = self._make_processor()
assert not hasattr(processor, "_spans")

def test_span_lifecycle_produces_two_flushes(self):
"""Each span produces one flush on start and one on end."""
def test_span_lifecycle_produces_two_flushes(self, monkeypatch):
"""With start writes enabled, each span produces one flush on start and one on end."""
monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0")
processor, _ = self._make_processor()

with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()) as mock_cs:
Expand Down Expand Up @@ -105,6 +108,38 @@ def capture_create_span(**kwargs):
assert captured_spans[0].start_time is not None
assert captured_spans[0].end_time is not None

def test_span_start_skipped_by_default(self, monkeypatch):
    """With the env var unset, on_span_start builds nothing; on_span_end writes once."""
    monkeypatch.delenv("AGENTEX_TRACING_SKIP_SPAN_START", raising=False)
    processor, _ = self._make_processor()

    def fresh_sgp_span(**_kwargs):
        return _make_mock_sgp_span()

    with patch(f"{MODULE}.create_span", side_effect=fresh_sgp_span) as create_span_mock:
        live_span = _make_span()

        # Start phase: end-only ingest means create_span must not be reached.
        processor.on_span_start(live_span)
        assert create_span_mock.call_count == 0

        # End phase: the single persisted write.
        live_span.end_time = datetime.now(UTC)
        processor.on_span_end(live_span)

    assert create_span_mock.call_count == 1

def test_span_start_emitted_when_skip_disabled(self, monkeypatch):
    """Setting the env var to "0" makes on_span_start build and flush one span."""
    monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0")
    processor, _ = self._make_processor()

    built_spans: list[MagicMock] = []

    def record_created_span(**kwargs):
        # Keep a handle on every mock handed out so its flush can be checked.
        built_spans.append(_make_mock_sgp_span())
        return built_spans[-1]

    with patch(f"{MODULE}.create_span", side_effect=record_created_span):
        processor.on_span_start(_make_span())

    assert len(built_spans) == 1
    assert built_spans[0].flush.called


# ---------------------------------------------------------------------------
# Async processor tests
Expand Down Expand Up @@ -141,8 +176,9 @@ def test_processor_holds_no_per_span_state(self):
processor, _, _ = self._make_processor()
assert not hasattr(processor, "_spans")

async def test_span_lifecycle_produces_two_upserts(self):
"""Each span produces one upsert_batch call on start and one on end."""
async def test_span_lifecycle_produces_two_upserts(self, monkeypatch):
"""With start writes enabled, each span produces one upsert on start and one on end."""
monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0")
processor, _, mock_client = self._make_processor()

with patch(f"{MODULE}.create_span", side_effect=lambda **kw: _make_mock_sgp_span()):
Expand All @@ -153,6 +189,31 @@ async def test_span_lifecycle_produces_two_upserts(self):

assert mock_client.spans.upsert_batch.call_count == 2

async def test_spans_start_skipped_by_default(self, monkeypatch):
    """With the env var unset, on_spans_start sends no upsert; on_spans_end sends one."""
    monkeypatch.delenv("AGENTEX_TRACING_SKIP_SPAN_START", raising=False)
    processor, _, mock_client = self._make_processor()

    def fresh_sgp_span(**_kwargs):
        return _make_mock_sgp_span()

    with patch(f"{MODULE}.create_span", side_effect=fresh_sgp_span):
        batch = [_make_span() for _ in range(3)]

        # Start phase: skipped, so no HTTP upsert may have been issued yet.
        await processor.on_spans_start(batch)
        assert mock_client.spans.upsert_batch.call_count == 0

        # End phase: stamp every span as finished, then ship the batch.
        for finished in batch:
            finished.end_time = datetime.now(UTC)
        await processor.on_spans_end(batch)

    assert mock_client.spans.upsert_batch.call_count == 1

async def test_spans_start_emitted_when_skip_disabled(self, monkeypatch):
    """Setting the env var to "0" makes on_spans_start issue exactly one upsert_batch."""
    monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0")
    processor, _, mock_client = self._make_processor()

    def fresh_sgp_span(**_kwargs):
        return _make_mock_sgp_span()

    single_span_batch = [_make_span()]
    with patch(f"{MODULE}.create_span", side_effect=fresh_sgp_span):
        await processor.on_spans_start(single_span_batch)

    assert mock_client.spans.upsert_batch.call_count == 1

async def test_span_end_without_prior_start_still_upserts(self):
"""Cross-pod Temporal case: END activity lands on a pod that never saw START.

Expand All @@ -171,8 +232,9 @@ async def test_span_end_without_prior_start_still_upserts(self):
items = mock_client.spans.upsert_batch.call_args.kwargs["items"]
assert len(items) == 1

async def test_sgp_span_input_and_output_propagated_on_end(self):
async def test_sgp_span_input_and_output_propagated_on_end(self, monkeypatch):
"""on_span_end should send the span's current input and output via upsert_batch."""
monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0")
processor, _, mock_client = self._make_processor()

captured: list[MagicMock] = []
Expand Down Expand Up @@ -207,8 +269,9 @@ def capture_create_span(**kwargs):
assert end_call_kwargs["input"]["messages"][-1]["role"] == "assistant"
assert end_call_kwargs["output"] == {"response": "hi"}

async def test_on_spans_start_sends_single_upsert_for_batch(self):
async def test_on_spans_start_sends_single_upsert_for_batch(self, monkeypatch):
"""Given N spans at once, on_spans_start should make ONE upsert_batch HTTP call."""
monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0")
processor, _, mock_client = self._make_processor()

n = 10
Expand All @@ -224,6 +287,7 @@ async def test_on_spans_start_sends_single_upsert_for_batch(self):

async def test_on_spans_start_records_export_success_metrics(self, monkeypatch):
monkeypatch.setenv("AGENTEX_TRACING_METRICS", "1")
monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", "0")
import agentex.lib.core.observability.tracing_metrics_recording as recording

recording._metrics_enabled = None
Expand Down Expand Up @@ -400,3 +464,33 @@ async def test_on_spans_end_sends_single_upsert_for_batch(self):
)
items = mock_client.spans.upsert_batch.call_args.kwargs["items"]
assert len(items) == n


# ---------------------------------------------------------------------------
# AGENTEX_TRACING_SKIP_SPAN_START env parsing
# ---------------------------------------------------------------------------


class TestSkipSpanStartEnv:
    """Parsing of ``AGENTEX_TRACING_SKIP_SPAN_START`` by ``_skip_span_start_enabled``."""

    @staticmethod
    def _fn():
        """Import the function under test lazily and return it."""
        from agentex.lib.core.tracing.processors import sgp_tracing_processor

        return sgp_tracing_processor._skip_span_start_enabled

    def test_default_is_skip_enabled(self, monkeypatch):
        """Unset → skip span-start (end-only ingest is the default)."""
        monkeypatch.delenv("AGENTEX_TRACING_SKIP_SPAN_START", raising=False)
        skip_enabled = self._fn()
        assert skip_enabled() is True

    @pytest.mark.parametrize("val", ["0", "false", "no", "off", "FALSE", "Off", " no "])
    def test_falsy_values_restore_span_start(self, monkeypatch, val):
        """Recognised "off" spellings (any case, padded) re-enable the start write."""
        monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", val)
        skip_enabled = self._fn()
        assert skip_enabled() is False

    @pytest.mark.parametrize("val", ["1", "true", "yes", "on", "anything"])
    def test_other_values_keep_skip_enabled(self, monkeypatch, val):
        """Any value outside the "off" set leaves the start write skipped."""
        monkeypatch.setenv("AGENTEX_TRACING_SKIP_SPAN_START", val)
        skip_enabled = self._fn()
        assert skip_enabled() is True
34 changes: 33 additions & 1 deletion tests/lib/core/tracing/test_span_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@
from unittest.mock import AsyncMock, MagicMock, patch

from agentex.types.span import Span
from agentex.lib.core.tracing.span_queue import SpanEventType, AsyncSpanQueue
from agentex.lib.core.tracing.span_queue import (
_DEFAULT_BATCH_SIZE,
SpanEventType,
AsyncSpanQueue,
)


def _make_span(span_id: str | None = None) -> Span:
Expand Down Expand Up @@ -859,3 +863,31 @@ async def test_enqueue_overhead_with_metrics_disabled(self, monkeypatch):

assert elapsed < 0.05, f"disabled metrics enqueue too slow: {elapsed:.3f}s"
mock_get.assert_not_called()


class TestAsyncSpanQueueBatchSizeConfig:
    """Precedence for batch_size: explicit arg, then AGENTEX_SPAN_QUEUE_BATCH_SIZE, then default."""

    # Name of the environment variable exercised by every test in this class.
    _ENV_NAME = "AGENTEX_SPAN_QUEUE_BATCH_SIZE"

    async def test_default_batch_size(self, monkeypatch):
        """No arg and no env var → module default."""
        monkeypatch.delenv(self._ENV_NAME, raising=False)
        queue = AsyncSpanQueue()
        assert queue._batch_size == _DEFAULT_BATCH_SIZE

    async def test_explicit_arg_overrides_default(self, monkeypatch):
        """An explicit batch_size replaces the default."""
        monkeypatch.delenv(self._ENV_NAME, raising=False)
        queue = AsyncSpanQueue(batch_size=10)
        assert queue._batch_size == 10

    async def test_explicit_arg_clamped_to_min_one(self, monkeypatch):
        """An explicit batch_size of 0 is raised to 1."""
        monkeypatch.delenv(self._ENV_NAME, raising=False)
        queue = AsyncSpanQueue(batch_size=0)
        assert queue._batch_size == 1

    async def test_env_used_when_arg_is_none(self, monkeypatch):
        """With no explicit arg, the env var supplies the value."""
        monkeypatch.setenv(self._ENV_NAME, "500")
        queue = AsyncSpanQueue()
        assert queue._batch_size == 500

    async def test_explicit_arg_beats_env(self, monkeypatch):
        """An explicit arg wins even when the env var is set."""
        monkeypatch.setenv(self._ENV_NAME, "500")
        queue = AsyncSpanQueue(batch_size=7)
        assert queue._batch_size == 7

    async def test_invalid_env_falls_back_to_default(self, monkeypatch):
        """A non-integer env value is ignored in favour of the default."""
        monkeypatch.setenv(self._ENV_NAME, "not-an-int")
        queue = AsyncSpanQueue()
        assert queue._batch_size == _DEFAULT_BATCH_SIZE
Loading