 
 import abc
 import atexit
+import concurrent.futures
 import json
 import logging
 import random
 from collections import OrderedDict
 from contextlib import contextmanager
 from types import TracebackType
-from typing import Iterator, MutableSequence, Optional, Sequence, Tuple, Type
+from typing import (
+    Any,
+    Callable,
+    Iterator,
+    MutableSequence,
+    Optional,
+    Sequence,
+    Tuple,
+    Type,
+    Union,
+)
 
 from opentelemetry import context as context_api
 from opentelemetry import trace as trace_api
@@ -90,9 +101,12 @@ def force_flush(self, timeout_millis: int = 30000) -> bool:
         """
 
 
-class MultiSpanProcessor(SpanProcessor):
-    """Implementation of :class:`SpanProcessor` that forwards all received
-    events to a list of `SpanProcessor`.
+class SynchronousMultiSpanProcessor(SpanProcessor):
+    """Implementation of :class:`SpanProcessor` that forwards all received
+    events to a list of span processors sequentially.
+
+    The underlying span processors are called in sequential order as they were
+    added.
     """
 
     def __init__(self):
@@ -115,9 +129,113 @@ def on_end(self, span: "Span") -> None:
             sp.on_end(span)
 
     def shutdown(self) -> None:
+        """Sequentially shuts down all underlying span processors."""
         for sp in self._span_processors:
            sp.shutdown()
 
+    def force_flush(self, timeout_millis: int = 30000) -> bool:
+        """Sequentially calls force_flush on all underlying
+        :class:`SpanProcessor` instances.
+
+        Args:
+            timeout_millis: The maximum amount of time over all span
+                processors to wait for spans to be exported. In case the
+                first n span processors exceed the timeout, the remaining
+                span processors will be skipped.
+
+        Returns:
+            True if all span processors flushed their spans within the
+            given timeout, False otherwise.
+        """
+        deadline_ns = time_ns() + timeout_millis * 1000000
+        for sp in self._span_processors:
+            current_time_ns = time_ns()
+            if current_time_ns >= deadline_ns:
+                return False
+
+            if not sp.force_flush((deadline_ns - current_time_ns) // 1000000):
+                return False
+
+        return True
+
+
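The sequential flush above shares a single deadline across all registered processors: each force_flush call only receives whatever time its predecessors left over, and processors are skipped entirely once the deadline has passed. A minimal sketch of that budgeting, assuming the classes above are importable from opentelemetry.sdk.trace; the SlowProcessor class is hypothetical and exists only for illustration:

    import time

    from opentelemetry.sdk.trace import (
        SpanProcessor,
        SynchronousMultiSpanProcessor,
    )


    class SlowProcessor(SpanProcessor):
        """Hypothetical processor that always needs about one second to
        flush, regardless of the budget it is given."""

        def force_flush(self, timeout_millis: int = 30000) -> bool:
            time.sleep(1)
            return True


    multi = SynchronousMultiSpanProcessor()
    multi.add_span_processor(SlowProcessor())
    multi.add_span_processor(SlowProcessor())

    # The first processor consumes roughly 1000 ms of the 1500 ms budget, so
    # the second one is invoked with only the ~500 ms that remain; had the
    # first one used up the whole budget, the second would be skipped and
    # force_flush would return False.
    multi.force_flush(timeout_millis=1500)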
+class ConcurrentMultiSpanProcessor(SpanProcessor):
+    """Implementation of :class:`SpanProcessor` that forwards all received
+    events to a list of span processors in parallel.
+
+    Calls to the underlying span processors are forwarded in parallel by
+    submitting them to a thread pool executor and waiting until each span
+    processor has finished its work.
+
+    Args:
+        num_threads: The number of threads managed by the thread pool executor
+            and thus defining how many span processors can work in parallel.
+    """
+
+    def __init__(self, num_threads: int = 2):
+        # use a tuple to avoid race conditions when adding a new span and
+        # iterating through it on "on_start" and "on_end".
+        self._span_processors = ()  # type: Tuple[SpanProcessor, ...]
+        self._lock = threading.Lock()
+        self._executor = concurrent.futures.ThreadPoolExecutor(
+            max_workers=num_threads
+        )
+
+    def add_span_processor(self, span_processor: SpanProcessor) -> None:
+        """Adds a SpanProcessor to the list handled by this instance."""
+        with self._lock:
+            self._span_processors = self._span_processors + (span_processor,)
+
+    def _submit_and_await(
+        self, func: Callable[[SpanProcessor], Callable[..., None]], *args: Any
+    ):
+        futures = []
+        for sp in self._span_processors:
+            future = self._executor.submit(func(sp), *args)
+            futures.append(future)
+        for future in futures:
+            future.result()
+
+    def on_start(self, span: "Span") -> None:
+        self._submit_and_await(lambda sp: sp.on_start, span)
+
+    def on_end(self, span: "Span") -> None:
+        self._submit_and_await(lambda sp: sp.on_end, span)
+
+    def shutdown(self) -> None:
+        """Shuts down all underlying span processors in parallel."""
+        self._submit_and_await(lambda sp: sp.shutdown)
+
+    def force_flush(self, timeout_millis: int = 30000) -> bool:
+        """Calls force_flush on all underlying span processors in parallel.
+
+        Args:
+            timeout_millis: The maximum amount of time to wait for spans to be
+                exported.
+
+        Returns:
+            True if all span processors flushed their spans within the given
+            timeout, False otherwise.
+        """
+        futures = []
+        for sp in self._span_processors:  # type: SpanProcessor
+            future = self._executor.submit(sp.force_flush, timeout_millis)
+            futures.append(future)
+
+        timeout_sec = timeout_millis / 1e3
+        done_futures, not_done_futures = concurrent.futures.wait(
+            futures, timeout_sec
+        )
+        if not_done_futures:
+            return False
+
+        for future in done_futures:
+            if not future.result():
+                return False
+
+        return True
+
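Because the concurrent variant hands every processor the full timeout and waits on all futures at once, flushing does not degrade as more processors are added. A short sketch under the same assumptions as above, reusing the hypothetical SlowProcessor:

    from opentelemetry.sdk.trace import ConcurrentMultiSpanProcessor

    parallel = ConcurrentMultiSpanProcessor(num_threads=2)
    parallel.add_span_processor(SlowProcessor())
    parallel.add_span_processor(SlowProcessor())

    # Both force_flush calls run on the executor's worker threads and each is
    # passed the full 1500 ms, so the call returns after roughly one second
    # instead of two.
    parallel.force_flush(timeout_millis=1500)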
 
 class EventBase(abc.ABC):
     def __init__(self, name: str, timestamp: Optional[int] = None) -> None:
@@ -742,8 +860,13 @@ def __init__(
         sampler: sampling.Sampler = trace_api.sampling.ALWAYS_ON,
         resource: Resource = Resource.create_empty(),
         shutdown_on_exit: bool = True,
+        active_span_processor: Union[
+            SynchronousMultiSpanProcessor, ConcurrentMultiSpanProcessor
+        ] = None,
     ):
-        self._active_span_processor = MultiSpanProcessor()
+        self._active_span_processor = (
+            active_span_processor or SynchronousMultiSpanProcessor()
+        )
         self.resource = resource
         self.sampler = sampler
         self._atexit_handler = None
@@ -771,8 +894,8 @@ def add_span_processor(self, span_processor: SpanProcessor) -> None:
         The span processors are invoked in the same order they are registered.
         """
 
-        # no lock here because MultiSpanProcessor.add_span_processor is
-        # thread safe
+        # no lock here because add_span_processor is thread safe for both
+        # SynchronousMultiSpanProcessor and ConcurrentMultiSpanProcessor.
         self._active_span_processor.add_span_processor(span_processor)
 
     def shutdown(self):
@@ -781,3 +904,23 @@ def shutdown(self):
         if self._atexit_handler is not None:
             atexit.unregister(self._atexit_handler)
             self._atexit_handler = None
+
+    def force_flush(self, timeout_millis: int = 30000) -> bool:
+        """Requests the active span processor to process all spans that have
+        not yet been processed.
+
+        By default, force_flush is called sequentially on all added span
+        processors. This means that span processors further back in the list
+        have less time to flush their spans.
+        To have span processors flush their spans in parallel, it is possible
+        to initialize the tracer provider with an instance of
+        `ConcurrentMultiSpanProcessor` at the cost of using multiple threads.
+
+        Args:
+            timeout_millis: The maximum amount of time to wait for spans to be
+                processed.
+
+        Returns:
+            False if the timeout is exceeded, True otherwise.
+        """
+        return self._active_span_processor.force_flush(timeout_millis)
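A wiring sketch for the provider-level flush added above. The TracerProvider class name and import path are assumptions inferred from the surrounding module, and SlowProcessor is the made-up processor from the earlier sketch:

    from opentelemetry.sdk.trace import (
        ConcurrentMultiSpanProcessor,
        TracerProvider,
    )

    # Passing active_span_processor swaps the default sequential fan-out for
    # the parallel one; span processors are still registered the same way.
    provider = TracerProvider(
        active_span_processor=ConcurrentMultiSpanProcessor(num_threads=4)
    )
    provider.add_span_processor(SlowProcessor())
    provider.add_span_processor(SlowProcessor())

    # Flush all pending spans before the process exits; each underlying
    # processor gets the full 5000 ms budget.
    provider.force_flush(timeout_millis=5000)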