Skip to content
Merged
4 changes: 4 additions & 0 deletions ddtrace/contrib/elasticsearch/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ def perform_request(self, method, url, params=None, body=None):
This is ConnectionClass-agnostic.
"""
with self._datadog_tracer.trace("elasticsearch.query") as s:
# Don't instrument if the trace is not sampled
if not s.sampled:
return super(TracedTransport, self).perform_request(method, url, params=params, body=body)

s.service = self._datadog_service
s.span_type = SPAN_TYPE
s.set_tag(metadata.METHOD, method)
Expand Down
33 changes: 17 additions & 16 deletions ddtrace/contrib/flask/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,24 +59,25 @@ def _start_span(self):
self.app.logger.exception("error tracing request")

def _finish_span(self, response=None, exception=None):
""" Close and finsh the active span if it exists. """
""" Close and finish the active span if it exists. """
span = getattr(g, 'flask_datadog_span', None)
if span:
error = 0
code = response.status_code if response else None

# if we didn't get a response, but we did get an exception, set
# codes accordingly.
if not response and exception:
error = 1
code = 500
span.set_tag(errors.ERROR_TYPE, type(exception))
span.set_tag(errors.ERROR_MSG, exception)

span.resource = str(request.endpoint or "").lower()
span.set_tag(http.URL, str(request.base_url or ""))
span.set_tag(http.STATUS_CODE, code)
span.error = error
if span.sampled:
error = 0
code = response.status_code if response else None

# if we didn't get a response, but we did get an exception, set
# codes accordingly.
if not response and exception:
error = 1
code = 500
span.set_tag(errors.ERROR_TYPE, type(exception))
span.set_tag(errors.ERROR_MSG, exception)

span.resource = str(request.endpoint or "").lower()
span.set_tag(http.URL, str(request.base_url or ""))
span.set_tag(http.STATUS_CODE, code)
span.error = error
span.finish()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't this be in the if block as well ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The span already got created. Even if it is not sampled (and we didn't put the various attributes), we still have to finish it (to keep a valid tree).

# Clear our span just in case.
g.flask_datadog_span = None
Expand Down
3 changes: 3 additions & 0 deletions ddtrace/contrib/psycopg/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ def execute(self, query, vars=None):
return cursor.execute(self, query, vars)

with self._datadog_tracer.trace("postgres.query") as s:
if not s.sampled:
return super(TracedCursor, self).execute(query, vars)

s.resource = query
s.service = self._datadog_service
s.span_type = sqlx.TYPE
Expand Down
27 changes: 7 additions & 20 deletions ddtrace/contrib/pylons/middleware.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,22 @@ def __init__(self, app, tracer, service="pylons"):
self._tracer = tracer

def __call__(self, environ, start_response):
span = None
try:
span = self._tracer.trace("pylons.request", service=self._service, span_type=http.TYPE)
log.debug("Initialize new trace %d", span.trace_id)
with self._tracer.trace("pylons.request", service=self._service, span_type=http.TYPE) as span:

if not span.sampled:
return self.app(environ, start_response)

def _start_response(status, *args, **kwargs):
""" a patched response callback which will pluck some metadata. """
span.span_type = http.TYPE
http_code = int(status.split()[0])
span.set_tag(http.STATUS_CODE, http_code)
if http_code >= 500:
span.error = 1
return start_response(status, *args, **kwargs)
except Exception:
log.exception("error starting span")

try:
return self.app(environ, _start_response)
except Exception:
if span:
span.set_traceback()
raise
finally:
if not span:
return

try:
return self.app(environ, _start_response)
finally:
controller = environ.get('pylons.routes_dict', {}).get('controller')
action = environ.get('pylons.routes_dict', {}).get('action')
span.resource = "%s.%s" % (controller, action)
Expand All @@ -50,6 +40,3 @@ def _start_response(status, *args, **kwargs):
"pylons.route.controller": controller,
"pylons.route.action": action,
})
span.finish()
except Exception:
log.exception("Error finishing trace")
4 changes: 4 additions & 0 deletions ddtrace/contrib/sqlite3/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ def execute(self, sql, *args, **kwargs):
return Cursor.execute(self, sql, *args, **kwargs)

with self._datadog_tracer.trace("sqlite3.query", span_type=sqlx.TYPE) as s:
# Don't instrument if the trace is not sampled
if not s.sampled:
return Cursor.execute(self, sql, *args, **kwargs)

s.set_tag(sqlx.QUERY, sql)
s.service = self._datadog_service
s.resource = sql # will be normalized
Expand Down
184 changes: 4 additions & 180 deletions ddtrace/reporter.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
"""
Report spans to the Agent API.

The asnyc HTTPReporter is taken from raven.transport.threaded.
"""

import atexit
from .compat import httplib
import logging
import threading
from time import sleep, time
import os
from time import time

# project
from .compat import Queue, json


DEFAULT_TIMEOUT = 10
from .compat import json
from .transport import ThreadedHTTPTransport


log = logging.getLogger(__name__)
Expand All @@ -24,17 +15,11 @@
class AgentReporter(object):
SERVICES_FLUSH_INTERVAL = 60

def __init__(self, disabled=False, config=None):
self.disabled = disabled
self.config = config
def __init__(self):
self.transport = ThreadedHTTPTransport()
self.last_services_flush = 0

def report(self, spans, services):
if self.disabled:
log.debug("Trace reporter disabled, skip flushing")
return

if spans:
self.send_spans(spans)
if services:
Expand All @@ -54,164 +39,3 @@ def send_services(self, services):
data = json.dumps(services)
headers = {}
self.transport.send("PUT", "/services", data, headers)


class ThreadedHTTPTransport(object):

# Async worker, to be defined at first run
_worker = None

def send(self, method, endpoint, data, headers):
return self.async_send(
method, endpoint, data, headers,
self.success_callback, self.failure_callback
)

def async_send(self, method, endpoint, data, headers, success_cb, failure_cb):
self.get_worker().queue(
self.send_sync, method, endpoint, data, headers, success_cb, failure_cb)

def send_sync(self, method, endpoint, data, headers, success_cb, failure_cb):
try:
conn = httplib.HTTPConnection('localhost', 7777)
conn.request(method, endpoint, data, headers)
except Exception as e:
failure_cb(e)
else:
success_cb()

def get_worker(self):
if self._worker is None or not self._worker.is_alive():
self._worker = AsyncWorker()
return self._worker

def failure_callback(self, error):
log.error("Failed to report a trace, %s", error)

def success_callback(self):
pass


class AsyncWorker(object):
_terminator = object()

def __init__(self, shutdown_timeout=DEFAULT_TIMEOUT):
self._queue = Queue(-1)
self._lock = threading.Lock()
self._thread = None
self.options = {
'shutdown_timeout': shutdown_timeout,
}
self.start()

def is_alive(self):
return self._thread.is_alive()

def main_thread_terminated(self):
self._lock.acquire()
try:
if not self._thread:
# thread not started or already stopped - nothing to do
return

# wake the processing thread up
self._queue.put_nowait(self._terminator)

timeout = self.options['shutdown_timeout']

# wait briefly, initially
initial_timeout = 0.1
if timeout < initial_timeout:
initial_timeout = timeout

if not self._timed_queue_join(initial_timeout):
# if that didn't work, wait a bit longer
# NB that size is an approximation, because other threads may
# add or remove items
size = self._queue.qsize()

print("Sentry is attempting to send %i pending error messages"
% size)
print("Waiting up to %s seconds" % timeout)

if os.name == 'nt':
print("Press Ctrl-Break to quit")
else:
print("Press Ctrl-C to quit")

self._timed_queue_join(timeout - initial_timeout)

self._thread = None

finally:
self._lock.release()

def _timed_queue_join(self, timeout):
"""
implementation of Queue.join which takes a 'timeout' argument

returns true on success, false on timeout
"""
deadline = time() + timeout
queue = self._queue

queue.all_tasks_done.acquire()
try:
while queue.unfinished_tasks:
delay = deadline - time()
if delay <= 0:
# timed out
return False

queue.all_tasks_done.wait(timeout=delay)

return True

finally:
queue.all_tasks_done.release()

def start(self):
"""
Starts the task thread.
"""
self._lock.acquire()
try:
if not self._thread:
self._thread = threading.Thread(target=self._target)
self._thread.setDaemon(True)
self._thread.start()
finally:
self._lock.release()
atexit.register(self.main_thread_terminated)

def stop(self, timeout=None):
"""
Stops the task thread. Synchronous!
"""
self._lock.acquire()
try:
if self._thread:
self._queue.put_nowait(self._terminator)
self._thread.join(timeout=timeout)
self._thread = None
finally:
self._lock.release()

def queue(self, callback, *args, **kwargs):
self._queue.put_nowait((callback, args, kwargs))

def _target(self):
while True:
record = self._queue.get()
try:
if record is self._terminator:
break
callback, args, kwargs = record
try:
callback(*args, **kwargs)
except Exception:
log.error('Failed processing job', exc_info=True)
finally:
self._queue.task_done()

sleep(0)
29 changes: 29 additions & 0 deletions ddtrace/sampler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import logging

from .span import MAX_TRACE_ID

log = logging.getLogger(__name__)


class RateSampler(object):
"""RateSampler manages the client-side trace sampling based on a rate

Keep (100 * sample_rate)% of the traces.
Any `sampled = False` trace won't be written, and can be ignored by the instrumentation.
It samples randomly, its main purpose is to reduce the instrumentation footprint.
"""

def __init__(self, sample_rate):
if sample_rate <= 0:
log.error("sample_rate is negative or null, disable the Sampler")
sample_rate = 1
elif sample_rate > 1:
sample_rate = 1

self.sample_rate = sample_rate
self.sampling_id_threshold = sample_rate * MAX_TRACE_ID

def sample(self, span):
span.sampled = span.trace_id <= self.sampling_id_threshold
# `weight` is an attribute applied to all spans to help scaling related statistics
span.weight = 1 / (self.sample_rate or 1)
6 changes: 6 additions & 0 deletions ddtrace/span.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ def __init__(self,
self.span_id = span_id or _new_id()
self.parent_id = parent_id

# sampling
self.sampled = True
self.weight = 1

self._tracer = tracer
self._parent = None

Expand All @@ -74,6 +78,7 @@ def to_dict(self):
'resource' : self.resource,
'name' : self.name,
'error': self.error,
'weight': self.weight,
}

if self.start:
Expand Down Expand Up @@ -185,6 +190,7 @@ def __repr__(self):
self.name,
)

MAX_TRACE_ID = 2 ** 63
def _new_id():
"""Generate a random trace_id"""
return random.getrandbits(63)
Expand Down
Loading