Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 27 additions & 3 deletions temporalio/contrib/opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import opentelemetry.trace
import opentelemetry.trace.propagation.tracecontext
import opentelemetry.util.types
from opentelemetry.context import Context
from opentelemetry.trace import Span, SpanKind, Status, StatusCode, _Links
from opentelemetry.util import types
from typing_extensions import Protocol, TypeAlias, TypedDict

import temporalio.activity
Expand All @@ -34,6 +37,7 @@
import temporalio.exceptions
import temporalio.worker
import temporalio.workflow
from temporalio.exceptions import ApplicationError, ApplicationErrorCategory

# OpenTelemetry dynamically, lazily chooses its context implementation at
# runtime. When first accessed, they use pkg_resources.iter_entry_points + load.
Expand Down Expand Up @@ -167,11 +171,31 @@ def _start_as_current_span(
attributes: opentelemetry.util.types.Attributes,
input: Optional[_InputWithHeaders] = None,
kind: opentelemetry.trace.SpanKind,
context: Optional[Context] = None,
) -> Iterator[None]:
with self.tracer.start_as_current_span(name, attributes=attributes, kind=kind):
with self.tracer.start_as_current_span(
name,
attributes=attributes,
kind=kind,
context=context,
set_status_on_exception=False,
) as span:
if input:
input.headers = self._context_to_headers(input.headers)
yield None
try:
yield None
except Exception as exc:
if (
not isinstance(exc, ApplicationError)
or exc.category != ApplicationErrorCategory.BENIGN
):
span.set_status(
Status(
status_code=StatusCode.ERROR,
description=f"{type(exc).__name__}: {exc}",
)
)
raise

def _completed_workflow_span(
self, params: _CompletedWorkflowSpanParams
Expand Down Expand Up @@ -282,7 +306,7 @@ async def execute_activity(
self, input: temporalio.worker.ExecuteActivityInput
) -> Any:
info = temporalio.activity.info()
with self.root.tracer.start_as_current_span(
with self.root._start_as_current_span(
f"RunActivity:{info.activity_type}",
context=self.root._context_from_headers(input.headers),
attributes={
Expand Down
58 changes: 57 additions & 1 deletion tests/contrib/test_opentelemetry.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,22 @@
import asyncio
import logging
import uuid
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from datetime import timedelta
from typing import Iterable, List, Optional

from opentelemetry.sdk.trace import ReadableSpan, TracerProvider
from opentelemetry.sdk.trace.export import SimpleSpanProcessor
from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter
from opentelemetry.trace import get_tracer
from opentelemetry.trace import StatusCode, get_tracer

from temporalio import activity, workflow
from temporalio.client import Client
from temporalio.common import RetryPolicy
from temporalio.contrib.opentelemetry import TracingInterceptor
from temporalio.contrib.opentelemetry import workflow as otel_workflow
from temporalio.exceptions import ApplicationError, ApplicationErrorCategory
from temporalio.testing import WorkflowEnvironment
from temporalio.worker import UnsandboxedWorkflowRunner, Worker

Expand Down Expand Up @@ -386,6 +388,60 @@ async def test_opentelemetry_always_create_workflow_spans(client: Client):
assert spans[0].name == "RunWorkflow:SimpleWorkflow"


attempted = False


@activity.defn
def benign_activity() -> str:
global attempted
if attempted:
return "done"
attempted = True
raise ApplicationError(
category=ApplicationErrorCategory.BENIGN, message="Benign Error"
)


@workflow.defn
class BenignWorkflow:
@workflow.run
async def run(self) -> str:
return await workflow.execute_activity(
benign_activity, schedule_to_close_timeout=timedelta(seconds=1)
)


async def test_opentelemetry_benign_exception(client: Client):
# Create a tracer that has an in-memory exporter
exporter = InMemorySpanExporter()
provider = TracerProvider()
provider.add_span_processor(SimpleSpanProcessor(exporter))
tracer = get_tracer(__name__, tracer_provider=provider)

# Create new client with tracer interceptor
client_config = client.config()
client_config["interceptors"] = [TracingInterceptor(tracer)]
client = Client(**client_config)

async with Worker(
client,
task_queue=f"task_queue_{uuid.uuid4()}",
workflows=[BenignWorkflow],
activities=[benign_activity],
activity_executor=ThreadPoolExecutor(max_workers=1),
) as worker:
assert "done" == await client.execute_workflow(
BenignWorkflow.run,
id=f"workflow_{uuid.uuid4()}",
task_queue=worker.task_queue,
retry_policy=RetryPolicy(
maximum_attempts=2, initial_interval=timedelta(milliseconds=10)
),
)
spans = exporter.get_finished_spans()
assert all(span.status.status_code == StatusCode.UNSET for span in spans)


# TODO(cretz): Additional tests to write
# * query without interceptor (no headers)
# * workflow without interceptor (no headers) but query with interceptor (headers)
Expand Down
Loading