From 39ac2d191ff461d9951c1aef9e89f5cea390b245 Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Tue, 28 Jun 2016 18:51:19 +0200 Subject: [PATCH 01/12] Introduce client-side sampling with sample_rate --- ddtrace/sampler.py | 16 +++++++++ ddtrace/span.py | 4 +++ ddtrace/tracer.py | 86 ++++++++++++++++++++++++-------------------- tests/test_tracer.py | 32 +++++++++++++++++ 4 files changed, 100 insertions(+), 38 deletions(-) create mode 100644 ddtrace/sampler.py diff --git a/ddtrace/sampler.py b/ddtrace/sampler.py new file mode 100644 index 00000000000..cb49799d0df --- /dev/null +++ b/ddtrace/sampler.py @@ -0,0 +1,16 @@ +from .span import MAX_TRACE_ID + +class Sampler(object): + """Sampler manages the client-side trace sampling + + Keep (100 * sample_rate)% of the traces. + Any sampled trace should be entirely ignored by the instrumentation and won't be written. + It samples randomly, its main purpose is to reduce the instrumentation footprint. + """ + + def __init__(self, sample_rate): + self.sample_rate = sample_rate + self.sampling_id_threshold = sample_rate * MAX_TRACE_ID + + def should_sample(self, span): + return span.trace_id >= self.sampling_id_threshold diff --git a/ddtrace/span.py b/ddtrace/span.py index 0da43e25341..fabfe061175 100644 --- a/ddtrace/span.py +++ b/ddtrace/span.py @@ -53,6 +53,9 @@ def __init__(self, self.span_id = span_id or _new_id() self.parent_id = parent_id + # sampling + self.sampled = False + self._tracer = tracer self._parent = None @@ -185,6 +188,7 @@ def __repr__(self): self.name, ) +MAX_TRACE_ID = 2 ** 63 def _new_id(): """Generate a random trace_id""" return random.getrandbits(63) diff --git a/ddtrace/tracer.py b/ddtrace/tracer.py index b1ca15224fd..a85ef46cdc4 100644 --- a/ddtrace/tracer.py +++ b/ddtrace/tracer.py @@ -1,8 +1,8 @@ - import logging import threading from .buffer import ThreadLocalSpanBuffer +from .sampler import Sampler from .span import Span from .writer import AgentWriter @@ -12,16 +12,18 @@ class Tracer(object): - def __init__(self, enabled=True, writer=None, span_buffer=None): + def __init__(self, enabled=True, writer=None, span_buffer=None, sample_rate=1): """ Create a new tracer object. - enabled: if False, no spans will be submitted to the writer. - + enabled: if False, no spans will be submitted to the writer writer: an instance of Writer span_buffer: a span buffer instance. used to store inflight traces. by - default, will use thread local storage. + default, will use thread local storage + sample_rate: Pre-sampling rate. """ + self.enabled = enabled + self._writer = writer or AgentWriter() self._span_buffer = span_buffer or ThreadLocalSpanBuffer() @@ -29,7 +31,13 @@ def __init__(self, enabled=True, writer=None, span_buffer=None): self._spans_lock = threading.Lock() self._spans = [] - self.enabled = enabled + if sample_rate <= 0: + log.error("sample_rate is negative or null, disable the Tracer") + sample_rate = 0 + self.enabled = False + elif sample_rate > 1: + sample_rate = 1 + self.sampler = Sampler(sample_rate) # A hook for local debugging. shouldn't be needed or used # in production. @@ -49,26 +57,32 @@ def trace(self, name, service=None, resource=None, span_type=None): >>> parent.finish() >>> parent2 = tracer.trace("parent2") # has no parent span """ - # if we have a current span link the parent + child nodes. + span = None parent = self._span_buffer.get() - trace_id, parent_id = None, None - if parent: - trace_id, parent_id = parent.trace_id, parent.span_id - - # Create the trace. - span = Span(self, - name, - service=service, - resource=resource, - trace_id=trace_id, - parent_id=parent_id, - span_type=span_type, - ) - - # if there's a parent, link them and inherit the service. + log.error(parent) + if parent: + # if we have a current span link the parent + child nodes. + span = Span( + self, + name, + trace_id=parent.trace_id, + parent_id=parent.span_id, + service=(service or parent.service), + resource=resource, + span_type=span_type, + ) span._parent = parent - span.service = span.service or parent.service + span.sampled = parent.sampled + else: + span = Span( + self, + name, + service=service, + resource=resource, + span_type=span_type, + ) + span.sampled = self.sampler.should_sample(span) # Note the current trace. self._span_buffer.set(span) @@ -84,18 +98,17 @@ def record(self, span): if not self.enabled: return - if self._writer: - spans = None - with self._spans_lock: - self._spans.append(span) - parent = span._parent - self._span_buffer.set(parent) - if not parent: - spans = self._spans - self._spans = [] + spans = [] + with self._spans_lock: + self._spans.append(span) + parent = span._parent + self._span_buffer.set(parent) + if not parent: + spans = self._spans + self._spans = [] - if spans: - self.write(spans) + if self._writer and not span.sampled: + self.write(spans) def write(self, spans): """ Submit the given spans to the agent. """ @@ -103,9 +116,6 @@ def write(self, spans): if self.debug_logging: log.debug("submitting %s spans", len(spans)) for span in spans: - log.debug("\n%s" % span.pprint()) + log.debug("\n%s", span.pprint()) self._writer.write(spans) - - - diff --git a/tests/test_tracer.py b/tests/test_tracer.py index b43b95fdee3..2a781b80e47 100644 --- a/tests/test_tracer.py +++ b/tests/test_tracer.py @@ -3,6 +3,8 @@ """ import time +import random + from nose.tools import eq_ from ddtrace.tracer import Tracer @@ -88,6 +90,36 @@ def test_tracer_disabled(): s.set_tag("a", "b") assert not writer.pop() +def test_sampling(): + writer = DummyWriter() + tracer = Tracer(writer=writer, sample_rate=0.5) + + # Set the seed so that the choice of sampled traces is deterministic, then write tests accordingly + random.seed(4012) + + # First trace, not sampled + with tracer.trace("foo") as s: + assert not s.sampled + assert writer.pop() + + # Second trace, sampled + with tracer.trace("figh") as s: + assert s.sampled + s2 = tracer.trace("what") + assert s2.sampled + s2.finish() + with tracer.trace("ever") as s3: + assert s3.sampled + s4 = tracer.trace("!") + assert s4.sampled + s4.finish() + spans = writer.pop() + assert not spans, spans + + # Third trace, not sampled + with tracer.trace("ters") as s: + assert not s.sampled + assert writer.pop() class DummyWriter(object): From 69b567306b1ffbf96cfd5bb0eeaf411e4f6dfe05 Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Tue, 28 Jun 2016 19:41:03 +0200 Subject: [PATCH 02/12] Skip ES instrumentation when sampled --- ddtrace/contrib/elasticsearch/transport.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ddtrace/contrib/elasticsearch/transport.py b/ddtrace/contrib/elasticsearch/transport.py index cd7b7c3dd13..c212c2fdbad 100644 --- a/ddtrace/contrib/elasticsearch/transport.py +++ b/ddtrace/contrib/elasticsearch/transport.py @@ -23,6 +23,10 @@ def perform_request(self, method, url, params=None, body=None): This is ConnectionClass-agnostic. """ with self._datadog_tracer.trace("elasticsearch.query") as s: + # Don't instrument if the trace is sampled + if s.sampled: + return super(TracedTransport, self).perform_request(method, url, params=params, body=body) + s.service = self._datadog_service s.span_type = SPAN_TYPE s.set_tag(metadata.METHOD, method) From d4f900b418648670b65dd34351e70f1828c351ae Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Tue, 28 Jun 2016 19:41:18 +0200 Subject: [PATCH 03/12] Skip Flask instrumentation when sampled --- ddtrace/contrib/flask/middleware.py | 31 +++++++++++++++-------------- 1 file changed, 16 insertions(+), 15 deletions(-) diff --git a/ddtrace/contrib/flask/middleware.py b/ddtrace/contrib/flask/middleware.py index c4f0d41d575..6c41d8a299a 100644 --- a/ddtrace/contrib/flask/middleware.py +++ b/ddtrace/contrib/flask/middleware.py @@ -62,21 +62,22 @@ def _finish_span(self, response=None, exception=None): """ Close and finsh the active span if it exists. """ span = getattr(g, 'flask_datadog_span', None) if span: - error = 0 - code = response.status_code if response else None - - # if we didn't get a response, but we did get an exception, set - # codes accordingly. - if not response and exception: - error = 1 - code = 500 - span.set_tag(errors.ERROR_TYPE, type(exception)) - span.set_tag(errors.ERROR_MSG, exception) - - span.resource = str(request.endpoint or "").lower() - span.set_tag(http.URL, str(request.base_url or "")) - span.set_tag(http.STATUS_CODE, code) - span.error = error + if not span.sampled: + error = 0 + code = response.status_code if response else None + + # if we didn't get a response, but we did get an exception, set + # codes accordingly. + if not response and exception: + error = 1 + code = 500 + span.set_tag(errors.ERROR_TYPE, type(exception)) + span.set_tag(errors.ERROR_MSG, exception) + + span.resource = str(request.endpoint or "").lower() + span.set_tag(http.URL, str(request.base_url or "")) + span.set_tag(http.STATUS_CODE, code) + span.error = error span.finish() # Clear our span just in case. g.flask_datadog_span = None From b597b685dcaa55a10cb2d893dc5205fa2ee25d84 Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Tue, 28 Jun 2016 19:41:28 +0200 Subject: [PATCH 04/12] Skip Psycopg instrumentation when sampled --- ddtrace/contrib/psycopg/connection.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ddtrace/contrib/psycopg/connection.py b/ddtrace/contrib/psycopg/connection.py index 18f35c6b38b..f165ca38b19 100644 --- a/ddtrace/contrib/psycopg/connection.py +++ b/ddtrace/contrib/psycopg/connection.py @@ -45,6 +45,9 @@ def execute(self, query, vars=None): return cursor.execute(self, query, vars) with self._datadog_tracer.trace("postgres.query") as s: + if s.sampled: + return super(TracedCursor, self).execute(query, vars) + s.resource = query s.service = self._datadog_service s.span_type = sqlx.TYPE From 113097ea22a78f0c61ab1ff7a30e1451ae12dc13 Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Wed, 29 Jun 2016 11:46:19 +0200 Subject: [PATCH 05/12] Skip Sqlite instrumentation when sampled --- ddtrace/contrib/sqlite3/connection.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ddtrace/contrib/sqlite3/connection.py b/ddtrace/contrib/sqlite3/connection.py index 11626ca31d0..67c3e407b3e 100644 --- a/ddtrace/contrib/sqlite3/connection.py +++ b/ddtrace/contrib/sqlite3/connection.py @@ -30,6 +30,10 @@ def execute(self, sql, *args, **kwargs): return Cursor.execute(self, sql, *args, **kwargs) with self._datadog_tracer.trace("sqlite3.query", span_type=sqlx.TYPE) as s: + # Don't instrument if the trace is sampled + if s.sampled: + return Cursor.execute(self, sql, *args, **kwargs) + s.set_tag(sqlx.QUERY, sql) s.service = self._datadog_service s.resource = sql # will be normalized From 585d9fe2d50145160eed10560e6795d0d878a839 Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Wed, 29 Jun 2016 11:57:22 +0200 Subject: [PATCH 06/12] Skip Pylons instrumentation when sampled --- ddtrace/contrib/pylons/middleware.py | 27 +++++++-------------------- 1 file changed, 7 insertions(+), 20 deletions(-) diff --git a/ddtrace/contrib/pylons/middleware.py b/ddtrace/contrib/pylons/middleware.py index 8ba9ec51a13..32aee1d5ca9 100644 --- a/ddtrace/contrib/pylons/middleware.py +++ b/ddtrace/contrib/pylons/middleware.py @@ -13,32 +13,22 @@ def __init__(self, app, tracer, service="pylons"): self._tracer = tracer def __call__(self, environ, start_response): - span = None - try: - span = self._tracer.trace("pylons.request", service=self._service, span_type=http.TYPE) - log.debug("Initialize new trace %d", span.trace_id) + with self._tracer.trace("pylons.request", service=self._service, span_type=http.TYPE) as span: + + if span.sampled: + return self.app(environ, start_response) def _start_response(status, *args, **kwargs): """ a patched response callback which will pluck some metadata. """ - span.span_type = http.TYPE http_code = int(status.split()[0]) span.set_tag(http.STATUS_CODE, http_code) if http_code >= 500: span.error = 1 return start_response(status, *args, **kwargs) - except Exception: - log.exception("error starting span") - - try: - return self.app(environ, _start_response) - except Exception: - if span: - span.set_traceback() - raise - finally: - if not span: - return + try: + return self.app(environ, _start_response) + finally: controller = environ.get('pylons.routes_dict', {}).get('controller') action = environ.get('pylons.routes_dict', {}).get('action') span.resource = "%s.%s" % (controller, action) @@ -50,6 +40,3 @@ def _start_response(status, *args, **kwargs): "pylons.route.controller": controller, "pylons.route.action": action, }) - span.finish() - except Exception: - log.exception("Error finishing trace") From 4ec676cbf2db46d1003e8c5980b07b013f7a1b20 Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Wed, 29 Jun 2016 15:04:19 +0200 Subject: [PATCH 07/12] Define 'set_sample_rate' in Trace --- ddtrace/tracer.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/ddtrace/tracer.py b/ddtrace/tracer.py index a85ef46cdc4..87eba45db06 100644 --- a/ddtrace/tracer.py +++ b/ddtrace/tracer.py @@ -31,6 +31,13 @@ def __init__(self, enabled=True, writer=None, span_buffer=None, sample_rate=1): self._spans_lock = threading.Lock() self._spans = [] + self.set_sample_rate(sample_rate) + + # A hook for local debugging. shouldn't be needed or used + # in production. + self.debug_logging = False + + def set_sample_rate(self, sample_rate): if sample_rate <= 0: log.error("sample_rate is negative or null, disable the Tracer") sample_rate = 0 @@ -39,9 +46,6 @@ def __init__(self, enabled=True, writer=None, span_buffer=None, sample_rate=1): sample_rate = 1 self.sampler = Sampler(sample_rate) - # A hook for local debugging. shouldn't be needed or used - # in production. - self.debug_logging = False def trace(self, name, service=None, resource=None, span_type=None): """ @@ -59,7 +63,6 @@ def trace(self, name, service=None, resource=None, span_type=None): """ span = None parent = self._span_buffer.get() - log.error(parent) if parent: # if we have a current span link the parent + child nodes. From 83715d0a6efed3958ba424c3762014973b1a65e4 Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Wed, 29 Jun 2016 17:46:50 +0200 Subject: [PATCH 08/12] Simplify 'reporter.py' file --- ddtrace/reporter.py | 184 +------------------------------------------ ddtrace/transport.py | 177 +++++++++++++++++++++++++++++++++++++++++ ddtrace/writer.py | 19 +---- 3 files changed, 183 insertions(+), 197 deletions(-) create mode 100644 ddtrace/transport.py diff --git a/ddtrace/reporter.py b/ddtrace/reporter.py index 004d6545759..716c14660cb 100644 --- a/ddtrace/reporter.py +++ b/ddtrace/reporter.py @@ -1,21 +1,12 @@ """ Report spans to the Agent API. - -The asnyc HTTPReporter is taken from raven.transport.threaded. """ - -import atexit -from .compat import httplib import logging -import threading -from time import sleep, time -import os +from time import time # project -from .compat import Queue, json - - -DEFAULT_TIMEOUT = 10 +from .compat import json +from .transport import ThreadedHTTPTransport log = logging.getLogger(__name__) @@ -24,17 +15,11 @@ class AgentReporter(object): SERVICES_FLUSH_INTERVAL = 60 - def __init__(self, disabled=False, config=None): - self.disabled = disabled - self.config = config + def __init__(self): self.transport = ThreadedHTTPTransport() self.last_services_flush = 0 def report(self, spans, services): - if self.disabled: - log.debug("Trace reporter disabled, skip flushing") - return - if spans: self.send_spans(spans) if services: @@ -54,164 +39,3 @@ def send_services(self, services): data = json.dumps(services) headers = {} self.transport.send("PUT", "/services", data, headers) - - -class ThreadedHTTPTransport(object): - - # Async worker, to be defined at first run - _worker = None - - def send(self, method, endpoint, data, headers): - return self.async_send( - method, endpoint, data, headers, - self.success_callback, self.failure_callback - ) - - def async_send(self, method, endpoint, data, headers, success_cb, failure_cb): - self.get_worker().queue( - self.send_sync, method, endpoint, data, headers, success_cb, failure_cb) - - def send_sync(self, method, endpoint, data, headers, success_cb, failure_cb): - try: - conn = httplib.HTTPConnection('localhost', 7777) - conn.request(method, endpoint, data, headers) - except Exception as e: - failure_cb(e) - else: - success_cb() - - def get_worker(self): - if self._worker is None or not self._worker.is_alive(): - self._worker = AsyncWorker() - return self._worker - - def failure_callback(self, error): - log.error("Failed to report a trace, %s", error) - - def success_callback(self): - pass - - -class AsyncWorker(object): - _terminator = object() - - def __init__(self, shutdown_timeout=DEFAULT_TIMEOUT): - self._queue = Queue(-1) - self._lock = threading.Lock() - self._thread = None - self.options = { - 'shutdown_timeout': shutdown_timeout, - } - self.start() - - def is_alive(self): - return self._thread.is_alive() - - def main_thread_terminated(self): - self._lock.acquire() - try: - if not self._thread: - # thread not started or already stopped - nothing to do - return - - # wake the processing thread up - self._queue.put_nowait(self._terminator) - - timeout = self.options['shutdown_timeout'] - - # wait briefly, initially - initial_timeout = 0.1 - if timeout < initial_timeout: - initial_timeout = timeout - - if not self._timed_queue_join(initial_timeout): - # if that didn't work, wait a bit longer - # NB that size is an approximation, because other threads may - # add or remove items - size = self._queue.qsize() - - print("Sentry is attempting to send %i pending error messages" - % size) - print("Waiting up to %s seconds" % timeout) - - if os.name == 'nt': - print("Press Ctrl-Break to quit") - else: - print("Press Ctrl-C to quit") - - self._timed_queue_join(timeout - initial_timeout) - - self._thread = None - - finally: - self._lock.release() - - def _timed_queue_join(self, timeout): - """ - implementation of Queue.join which takes a 'timeout' argument - - returns true on success, false on timeout - """ - deadline = time() + timeout - queue = self._queue - - queue.all_tasks_done.acquire() - try: - while queue.unfinished_tasks: - delay = deadline - time() - if delay <= 0: - # timed out - return False - - queue.all_tasks_done.wait(timeout=delay) - - return True - - finally: - queue.all_tasks_done.release() - - def start(self): - """ - Starts the task thread. - """ - self._lock.acquire() - try: - if not self._thread: - self._thread = threading.Thread(target=self._target) - self._thread.setDaemon(True) - self._thread.start() - finally: - self._lock.release() - atexit.register(self.main_thread_terminated) - - def stop(self, timeout=None): - """ - Stops the task thread. Synchronous! - """ - self._lock.acquire() - try: - if self._thread: - self._queue.put_nowait(self._terminator) - self._thread.join(timeout=timeout) - self._thread = None - finally: - self._lock.release() - - def queue(self, callback, *args, **kwargs): - self._queue.put_nowait((callback, args, kwargs)) - - def _target(self): - while True: - record = self._queue.get() - try: - if record is self._terminator: - break - callback, args, kwargs = record - try: - callback(*args, **kwargs) - except Exception: - log.error('Failed processing job', exc_info=True) - finally: - self._queue.task_done() - - sleep(0) diff --git a/ddtrace/transport.py b/ddtrace/transport.py new file mode 100644 index 00000000000..5131ab942dc --- /dev/null +++ b/ddtrace/transport.py @@ -0,0 +1,177 @@ +""" +The asnyc HTTPReporter is taken from raven.transport.threaded. +""" + +import atexit +import logging +import threading +from time import sleep, time +import os + +# project +from .compat import httplib, Queue + +log = logging.getLogger(__name__) + + +DEFAULT_TIMEOUT = 10 + +class ThreadedHTTPTransport(object): + + # Async worker, to be defined at first run + _worker = None + + def send(self, method, endpoint, data, headers): + return self.async_send( + method, endpoint, data, headers, + self.success_callback, self.failure_callback + ) + + def async_send(self, method, endpoint, data, headers, success_cb, failure_cb): + self.get_worker().queue( + self.send_sync, method, endpoint, data, headers, success_cb, failure_cb) + + def send_sync(self, method, endpoint, data, headers, success_cb, failure_cb): + try: + conn = httplib.HTTPConnection('localhost', 7777) + conn.request(method, endpoint, data, headers) + except Exception as e: + failure_cb(e) + else: + success_cb() + + def get_worker(self): + if self._worker is None or not self._worker.is_alive(): + self._worker = AsyncWorker() + return self._worker + + def failure_callback(self, error): + log.error("Failed to report a trace, %s", error) + + def success_callback(self): + pass + + +class AsyncWorker(object): + _terminator = object() + + def __init__(self, shutdown_timeout=DEFAULT_TIMEOUT): + self._queue = Queue(-1) + self._lock = threading.Lock() + self._thread = None + self.options = { + 'shutdown_timeout': shutdown_timeout, + } + self.start() + + def is_alive(self): + return self._thread.is_alive() + + def main_thread_terminated(self): + self._lock.acquire() + try: + if not self._thread: + # thread not started or already stopped - nothing to do + return + + # wake the processing thread up + self._queue.put_nowait(self._terminator) + + timeout = self.options['shutdown_timeout'] + + # wait briefly, initially + initial_timeout = 0.1 + if timeout < initial_timeout: + initial_timeout = timeout + + if not self._timed_queue_join(initial_timeout): + # if that didn't work, wait a bit longer + # NB that size is an approximation, because other threads may + # add or remove items + size = self._queue.qsize() + + print("Sentry is attempting to send %i pending error messages" + % size) + print("Waiting up to %s seconds" % timeout) + + if os.name == 'nt': + print("Press Ctrl-Break to quit") + else: + print("Press Ctrl-C to quit") + + self._timed_queue_join(timeout - initial_timeout) + + self._thread = None + + finally: + self._lock.release() + + def _timed_queue_join(self, timeout): + """ + implementation of Queue.join which takes a 'timeout' argument + + returns true on success, false on timeout + """ + deadline = time() + timeout + queue = self._queue + + queue.all_tasks_done.acquire() + try: + while queue.unfinished_tasks: + delay = deadline - time() + if delay <= 0: + # timed out + return False + + queue.all_tasks_done.wait(timeout=delay) + + return True + + finally: + queue.all_tasks_done.release() + + def start(self): + """ + Starts the task thread. + """ + self._lock.acquire() + try: + if not self._thread: + self._thread = threading.Thread(target=self._target) + self._thread.setDaemon(True) + self._thread.start() + finally: + self._lock.release() + atexit.register(self.main_thread_terminated) + + def stop(self, timeout=None): + """ + Stops the task thread. Synchronous! + """ + self._lock.acquire() + try: + if self._thread: + self._queue.put_nowait(self._terminator) + self._thread.join(timeout=timeout) + self._thread = None + finally: + self._lock.release() + + def queue(self, callback, *args, **kwargs): + self._queue.put_nowait((callback, args, kwargs)) + + def _target(self): + while True: + record = self._queue.get() + try: + if record is self._terminator: + break + callback, args, kwargs = record + try: + callback(*args, **kwargs) + except Exception: + log.error('Failed processing job', exc_info=True) + finally: + self._queue.task_done() + + sleep(0) diff --git a/ddtrace/writer.py b/ddtrace/writer.py index 8a80f8dfefb..4b4822878be 100644 --- a/ddtrace/writer.py +++ b/ddtrace/writer.py @@ -1,25 +1,10 @@ - from .reporter import AgentReporter -class Writer(object): - - def write(self, spans): - raise NotImplementedError() - - -class NullWriter(Writer): - - def write(self, spans): - pass - - -class AgentWriter(Writer): +class AgentWriter(object): def __init__(self): self._reporter = AgentReporter() - self.enabled = True # flip this to disable on the fly def write(self, spans): - if self.enabled: - self._reporter.report(spans, []) + self._reporter.report(spans, []) From 2c865f1976284eb84a1ce521c6c80cbfdc016e9b Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Wed, 29 Jun 2016 17:47:22 +0200 Subject: [PATCH 09/12] Add weight attribute to all spans --- ddtrace/span.py | 2 ++ ddtrace/tracer.py | 4 ++++ 2 files changed, 6 insertions(+) diff --git a/ddtrace/span.py b/ddtrace/span.py index fabfe061175..c704f4b9f77 100644 --- a/ddtrace/span.py +++ b/ddtrace/span.py @@ -55,6 +55,7 @@ def __init__(self, # sampling self.sampled = False + self.weight = 1 self._tracer = tracer self._parent = None @@ -77,6 +78,7 @@ def to_dict(self): 'resource' : self.resource, 'name' : self.name, 'error': self.error, + 'weight': self.weight, } if self.start: diff --git a/ddtrace/tracer.py b/ddtrace/tracer.py index 87eba45db06..6426a9bc9bc 100644 --- a/ddtrace/tracer.py +++ b/ddtrace/tracer.py @@ -45,6 +45,8 @@ def set_sample_rate(self, sample_rate): elif sample_rate > 1: sample_rate = 1 self.sampler = Sampler(sample_rate) + # `weight` is an attribute applied to all spans to help scaling related statistics + self.weight = 1 / (sample_rate or 1) def trace(self, name, service=None, resource=None, span_type=None): @@ -87,6 +89,8 @@ def trace(self, name, service=None, resource=None, span_type=None): ) span.sampled = self.sampler.should_sample(span) + span.weight = self.weight + # Note the current trace. self._span_buffer.set(span) From 8af16fd27ff2e5943610820201b932880c6f11be Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Wed, 29 Jun 2016 17:48:34 +0200 Subject: [PATCH 10/12] Add a test on span.weight value --- tests/test_tracer.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_tracer.py b/tests/test_tracer.py index 2a781b80e47..7cdeeca8fa0 100644 --- a/tests/test_tracer.py +++ b/tests/test_tracer.py @@ -100,6 +100,7 @@ def test_sampling(): # First trace, not sampled with tracer.trace("foo") as s: assert not s.sampled + assert s.weight == 2 assert writer.pop() # Second trace, sampled From ea55443e64c2d9d638dc2a8a4a34836ab39a1aa5 Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Thu, 30 Jun 2016 11:58:02 +0200 Subject: [PATCH 11/12] Rename Sampler to RateSampler, improve its interface --- ddtrace/sampler.py | 21 +++++++++++++++++---- ddtrace/tracer.py | 20 +++----------------- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/ddtrace/sampler.py b/ddtrace/sampler.py index cb49799d0df..61bca36d2a3 100644 --- a/ddtrace/sampler.py +++ b/ddtrace/sampler.py @@ -1,7 +1,12 @@ +import logging + from .span import MAX_TRACE_ID -class Sampler(object): - """Sampler manages the client-side trace sampling +log = logging.getLogger(__name__) + + +class RateSampler(object): + """RateSampler manages the client-side trace sampling based on a rate Keep (100 * sample_rate)% of the traces. Any sampled trace should be entirely ignored by the instrumentation and won't be written. @@ -9,8 +14,16 @@ class Sampler(object): """ def __init__(self, sample_rate): + if sample_rate <= 0: + log.error("sample_rate is negative or null, disable the Sampler") + sample_rate = 1 + elif sample_rate > 1: + sample_rate = 1 + self.sample_rate = sample_rate self.sampling_id_threshold = sample_rate * MAX_TRACE_ID - def should_sample(self, span): - return span.trace_id >= self.sampling_id_threshold + def sample(self, span): + span.sampled = span.trace_id >= self.sampling_id_threshold + # `weight` is an attribute applied to all spans to help scaling related statistics + span.weight = 1 / (self.sample_rate or 1) diff --git a/ddtrace/tracer.py b/ddtrace/tracer.py index 6426a9bc9bc..ac628f95004 100644 --- a/ddtrace/tracer.py +++ b/ddtrace/tracer.py @@ -2,7 +2,7 @@ import threading from .buffer import ThreadLocalSpanBuffer -from .sampler import Sampler +from .sampler import RateSampler from .span import Span from .writer import AgentWriter @@ -31,24 +31,12 @@ def __init__(self, enabled=True, writer=None, span_buffer=None, sample_rate=1): self._spans_lock = threading.Lock() self._spans = [] - self.set_sample_rate(sample_rate) + self.sampler = RateSampler(sample_rate) # A hook for local debugging. shouldn't be needed or used # in production. self.debug_logging = False - def set_sample_rate(self, sample_rate): - if sample_rate <= 0: - log.error("sample_rate is negative or null, disable the Tracer") - sample_rate = 0 - self.enabled = False - elif sample_rate > 1: - sample_rate = 1 - self.sampler = Sampler(sample_rate) - # `weight` is an attribute applied to all spans to help scaling related statistics - self.weight = 1 / (sample_rate or 1) - - def trace(self, name, service=None, resource=None, span_type=None): """ Return a span that will trace an operation called `name`. @@ -87,9 +75,7 @@ def trace(self, name, service=None, resource=None, span_type=None): resource=resource, span_type=span_type, ) - span.sampled = self.sampler.should_sample(span) - - span.weight = self.weight + self.sampler.sample(span) # Note the current trace. self._span_buffer.set(span) From fc0983dd619aac17a9dd6757f354d9839d4d8a13 Mon Sep 17 00:00:00 2001 From: Benjamin Fernandes Date: Thu, 30 Jun 2016 12:56:32 +0200 Subject: [PATCH 12/12] Redefine `span.sampled` as `span` being kept --- ddtrace/contrib/elasticsearch/transport.py | 4 ++-- ddtrace/contrib/flask/middleware.py | 4 ++-- ddtrace/contrib/psycopg/connection.py | 2 +- ddtrace/contrib/pylons/middleware.py | 2 +- ddtrace/contrib/sqlite3/connection.py | 4 ++-- ddtrace/sampler.py | 4 ++-- ddtrace/span.py | 2 +- ddtrace/tracer.py | 2 +- tests/test_tracer.py | 16 ++++++++-------- 9 files changed, 20 insertions(+), 20 deletions(-) diff --git a/ddtrace/contrib/elasticsearch/transport.py b/ddtrace/contrib/elasticsearch/transport.py index c212c2fdbad..45a1973d75d 100644 --- a/ddtrace/contrib/elasticsearch/transport.py +++ b/ddtrace/contrib/elasticsearch/transport.py @@ -23,8 +23,8 @@ def perform_request(self, method, url, params=None, body=None): This is ConnectionClass-agnostic. """ with self._datadog_tracer.trace("elasticsearch.query") as s: - # Don't instrument if the trace is sampled - if s.sampled: + # Don't instrument if the trace is not sampled + if not s.sampled: return super(TracedTransport, self).perform_request(method, url, params=params, body=body) s.service = self._datadog_service diff --git a/ddtrace/contrib/flask/middleware.py b/ddtrace/contrib/flask/middleware.py index 6c41d8a299a..7a439fd39c0 100644 --- a/ddtrace/contrib/flask/middleware.py +++ b/ddtrace/contrib/flask/middleware.py @@ -59,10 +59,10 @@ def _start_span(self): self.app.logger.exception("error tracing request") def _finish_span(self, response=None, exception=None): - """ Close and finsh the active span if it exists. """ + """ Close and finish the active span if it exists. """ span = getattr(g, 'flask_datadog_span', None) if span: - if not span.sampled: + if span.sampled: error = 0 code = response.status_code if response else None diff --git a/ddtrace/contrib/psycopg/connection.py b/ddtrace/contrib/psycopg/connection.py index f165ca38b19..c108fe59b6c 100644 --- a/ddtrace/contrib/psycopg/connection.py +++ b/ddtrace/contrib/psycopg/connection.py @@ -45,7 +45,7 @@ def execute(self, query, vars=None): return cursor.execute(self, query, vars) with self._datadog_tracer.trace("postgres.query") as s: - if s.sampled: + if not s.sampled: return super(TracedCursor, self).execute(query, vars) s.resource = query diff --git a/ddtrace/contrib/pylons/middleware.py b/ddtrace/contrib/pylons/middleware.py index 32aee1d5ca9..cf5efba60a8 100644 --- a/ddtrace/contrib/pylons/middleware.py +++ b/ddtrace/contrib/pylons/middleware.py @@ -15,7 +15,7 @@ def __init__(self, app, tracer, service="pylons"): def __call__(self, environ, start_response): with self._tracer.trace("pylons.request", service=self._service, span_type=http.TYPE) as span: - if span.sampled: + if not span.sampled: return self.app(environ, start_response) def _start_response(status, *args, **kwargs): diff --git a/ddtrace/contrib/sqlite3/connection.py b/ddtrace/contrib/sqlite3/connection.py index 67c3e407b3e..8628da45029 100644 --- a/ddtrace/contrib/sqlite3/connection.py +++ b/ddtrace/contrib/sqlite3/connection.py @@ -30,8 +30,8 @@ def execute(self, sql, *args, **kwargs): return Cursor.execute(self, sql, *args, **kwargs) with self._datadog_tracer.trace("sqlite3.query", span_type=sqlx.TYPE) as s: - # Don't instrument if the trace is sampled - if s.sampled: + # Don't instrument if the trace is not sampled + if not s.sampled: return Cursor.execute(self, sql, *args, **kwargs) s.set_tag(sqlx.QUERY, sql) diff --git a/ddtrace/sampler.py b/ddtrace/sampler.py index 61bca36d2a3..d9fa0dd71ce 100644 --- a/ddtrace/sampler.py +++ b/ddtrace/sampler.py @@ -9,7 +9,7 @@ class RateSampler(object): """RateSampler manages the client-side trace sampling based on a rate Keep (100 * sample_rate)% of the traces. - Any sampled trace should be entirely ignored by the instrumentation and won't be written. + Any `sampled = False` trace won't be written, and can be ignored by the instrumentation. It samples randomly, its main purpose is to reduce the instrumentation footprint. """ @@ -24,6 +24,6 @@ def __init__(self, sample_rate): self.sampling_id_threshold = sample_rate * MAX_TRACE_ID def sample(self, span): - span.sampled = span.trace_id >= self.sampling_id_threshold + span.sampled = span.trace_id <= self.sampling_id_threshold # `weight` is an attribute applied to all spans to help scaling related statistics span.weight = 1 / (self.sample_rate or 1) diff --git a/ddtrace/span.py b/ddtrace/span.py index c704f4b9f77..2470defa714 100644 --- a/ddtrace/span.py +++ b/ddtrace/span.py @@ -54,7 +54,7 @@ def __init__(self, self.parent_id = parent_id # sampling - self.sampled = False + self.sampled = True self.weight = 1 self._tracer = tracer diff --git a/ddtrace/tracer.py b/ddtrace/tracer.py index ac628f95004..adda9045de3 100644 --- a/ddtrace/tracer.py +++ b/ddtrace/tracer.py @@ -100,7 +100,7 @@ def record(self, span): spans = self._spans self._spans = [] - if self._writer and not span.sampled: + if self._writer and span.sampled: self.write(spans) def write(self, spans): diff --git a/tests/test_tracer.py b/tests/test_tracer.py index 7cdeeca8fa0..382698fbd23 100644 --- a/tests/test_tracer.py +++ b/tests/test_tracer.py @@ -97,29 +97,29 @@ def test_sampling(): # Set the seed so that the choice of sampled traces is deterministic, then write tests accordingly random.seed(4012) - # First trace, not sampled + # First trace, sampled with tracer.trace("foo") as s: - assert not s.sampled + assert s.sampled assert s.weight == 2 assert writer.pop() - # Second trace, sampled + # Second trace, not sampled with tracer.trace("figh") as s: - assert s.sampled + assert not s.sampled s2 = tracer.trace("what") - assert s2.sampled + assert not s2.sampled s2.finish() with tracer.trace("ever") as s3: - assert s3.sampled + assert not s3.sampled s4 = tracer.trace("!") - assert s4.sampled + assert not s4.sampled s4.finish() spans = writer.pop() assert not spans, spans # Third trace, not sampled with tracer.trace("ters") as s: - assert not s.sampled + assert s.sampled assert writer.pop()