diff --git a/.vscode/settings.json b/.vscode/settings.json index c7cadb4d6c..c167a13dc2 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,4 @@ { - "python.pythonPath": ".venv/bin/python" + "python.pythonPath": ".venv/bin/python", + "python.formatting.provider": "black" } \ No newline at end of file diff --git a/scripts/init_serverless_sdk.py b/scripts/init_serverless_sdk.py index 878ff6029e..7a414ff406 100644 --- a/scripts/init_serverless_sdk.py +++ b/scripts/init_serverless_sdk.py @@ -51,16 +51,23 @@ def extract_and_load_lambda_function_module(self, module_path): # Supported python versions are 2.7, 3.6, 3.7, 3.8 if py_version >= (3, 5): import importlib.util - spec = importlib.util.spec_from_file_location(module_name, module_file_path) + + spec = importlib.util.spec_from_file_location( + module_name, module_file_path + ) self.lambda_function_module = importlib.util.module_from_spec(spec) spec.loader.exec_module(self.lambda_function_module) elif py_version[0] < 3: import imp - self.lambda_function_module = imp.load_source(module_name, module_file_path) + + self.lambda_function_module = imp.load_source( + module_name, module_file_path + ) else: raise ValueError("Python version %s is not supported." 
% py_version) else: import importlib + self.lambda_function_module = importlib.import_module(module_path) def get_lambda_handler(self): diff --git a/sentry_sdk/_types.py b/sentry_sdk/_types.py index a69896a248..7ce7e9e4f6 100644 --- a/sentry_sdk/_types.py +++ b/sentry_sdk/_types.py @@ -37,7 +37,14 @@ NotImplementedType = Any EventDataCategory = Literal[ - "default", "error", "crash", "transaction", "security", "attachment", "session" + "default", + "error", + "crash", + "transaction", + "security", + "attachment", + "session", + "internal", ] SessionStatus = Literal["ok", "exited", "crashed", "abnormal"] EndpointType = Literal["store", "envelope"] diff --git a/sentry_sdk/client.py b/sentry_sdk/client.py index 7687baa76f..05ea4dec99 100644 --- a/sentry_sdk/client.py +++ b/sentry_sdk/client.py @@ -243,6 +243,9 @@ def _should_capture( self.options["sample_rate"] < 1.0 and random.random() >= self.options["sample_rate"] ): + # record a lost event if we did not sample this. + if self.transport: + self.transport.record_lost_event("sample_rate", data_category="error") return False if self._is_ignored_error(event, hint): diff --git a/sentry_sdk/consts.py b/sentry_sdk/consts.py index a9822e8223..5370fec7b2 100644 --- a/sentry_sdk/consts.py +++ b/sentry_sdk/consts.py @@ -75,6 +75,7 @@ def __init__( traces_sampler=None, # type: Optional[TracesSampler] auto_enabling_integrations=True, # type: bool auto_session_tracking=True, # type: bool + send_client_reports=True, # type: bool _experiments={}, # type: Experiments # noqa: B006 ): # type: (...) 
-> None diff --git a/sentry_sdk/envelope.py b/sentry_sdk/envelope.py index 5645eb8a12..ebb2842000 100644 --- a/sentry_sdk/envelope.py +++ b/sentry_sdk/envelope.py @@ -2,7 +2,7 @@ import json import mimetypes -from sentry_sdk._compat import text_type +from sentry_sdk._compat import text_type, PY2 from sentry_sdk._types import MYPY from sentry_sdk.session import Session from sentry_sdk.utils import json_dumps, capture_internal_exceptions @@ -18,6 +18,14 @@ from sentry_sdk._types import Event, EventDataCategory +def parse_json(data): + # type: (Union[bytes, text_type]) -> Any + # on some python 3 versions this needs to be bytes + if not PY2 and isinstance(data, bytes): + data = data.decode("utf-8", "replace") + return json.loads(data) + + class Envelope(object): def __init__( self, @@ -114,7 +122,7 @@ def deserialize_from( cls, f # type: Any ): # type: (...) -> Envelope - headers = json.loads(f.readline()) + headers = parse_json(f.readline()) items = [] while 1: item = Item.deserialize_from(f) @@ -236,6 +244,8 @@ def data_category(self): return "transaction" elif ty == "event": return "error" + elif ty == "client_report": + return "internal" else: return "default" @@ -284,11 +294,11 @@ def deserialize_from( line = f.readline().rstrip() if not line: return None - headers = json.loads(line) + headers = parse_json(line) length = headers["length"] payload = f.read(length) if headers.get("type") in ("event", "transaction"): - rv = cls(headers=headers, payload=PayloadRef(json=json.loads(payload))) + rv = cls(headers=headers, payload=PayloadRef(json=parse_json(payload))) else: rv = cls(headers=headers, payload=payload) f.readline() diff --git a/sentry_sdk/tracing.py b/sentry_sdk/tracing.py index 4ce25f27c2..749ab63b5b 100644 --- a/sentry_sdk/tracing.py +++ b/sentry_sdk/tracing.py @@ -507,13 +507,22 @@ def finish(self, hub=None): # This transaction is already finished, ignore. 
return None + hub = hub or self.hub or sentry_sdk.Hub.current + client = hub.client + # This is a de facto proxy for checking if sampled = False if self._span_recorder is None: logger.debug("Discarding transaction because sampled = False") - return None - hub = hub or self.hub or sentry_sdk.Hub.current - client = hub.client + # This is not entirely accurate because discards here are not + # exclusively based on sample rate but also traces sampler, but + # we handle this the same here. + if client and client.transport: + client.transport.record_lost_event( + "sample_rate", data_category="transaction" + ) + + return None if client is None: # We have no client and therefore nowhere to send this transaction. diff --git a/sentry_sdk/transport.py b/sentry_sdk/transport.py index a254b4f6ee..bcaebf37b7 100644 --- a/sentry_sdk/transport.py +++ b/sentry_sdk/transport.py @@ -4,12 +4,14 @@ import urllib3 # type: ignore import certifi import gzip +import time from datetime import datetime, timedelta +from collections import defaultdict from sentry_sdk.utils import Dsn, logger, capture_internal_exceptions, json_dumps from sentry_sdk.worker import BackgroundWorker -from sentry_sdk.envelope import Envelope +from sentry_sdk.envelope import Envelope, Item, PayloadRef from sentry_sdk._types import MYPY @@ -22,6 +24,7 @@ from typing import Tuple from typing import Type from typing import Union + from typing import DefaultDict from urllib3.poolmanager import PoolManager # type: ignore from urllib3.poolmanager import ProxyManager @@ -92,6 +95,18 @@ def kill(self): """Forcefully kills the transport.""" pass + def record_lost_event( + self, + reason, # type: str + data_category=None, # type: Optional[str] + item=None, # type: Optional[Item] + ): + # type: (...) -> None + """This increments a counter for event loss by reason and + data category. 
+ """ + return None + def __del__(self): # type: () -> None try: @@ -126,11 +141,15 @@ def __init__( Transport.__init__(self, options) assert self.parsed_dsn is not None - self.options = options + self.options = options # type: Dict[str, Any] self._worker = BackgroundWorker(queue_size=options["transport_queue_size"]) self._auth = self.parsed_dsn.to_auth("sentry.python/%s" % VERSION) self._disabled_until = {} # type: Dict[DataCategory, datetime] self._retry = urllib3.util.Retry() + self._discarded_events = defaultdict( + int + ) # type: DefaultDict[Tuple[str, str], int] + self._last_client_report_sent = time.time() self._pool = self._make_pool( self.parsed_dsn, @@ -143,6 +162,28 @@ def __init__( self.hub_cls = Hub + def record_lost_event( + self, + reason, # type: str + data_category=None, # type: Optional[str] + item=None, # type: Optional[Item] + ): + # type: (...) -> None + if not self.options["send_client_reports"]: + return + + quantity = 1 + if item is not None: + data_category = item.data_category + if data_category == "attachment": + # quantity of 0 is actually 1 as we do not want to count + # empty attachments as actually empty. + quantity = len(item.get_bytes()) or 1 + elif data_category is None: + raise TypeError("data category not provided") + + self._discarded_events[data_category, reason] += quantity + def _update_rate_limits(self, response): # type: (urllib3.HTTPResponse) -> None @@ -167,8 +208,18 @@ def _send_request( body, # type: bytes headers, # type: Dict[str, str] endpoint_type="store", # type: EndpointType + envelope=None, # type: Optional[Envelope] ): # type: (...) 
-> None + + def record_loss(reason): + # type: (str) -> None + if envelope is None: + self.record_lost_event(reason, data_category="error") + else: + for item in envelope.items: + self.record_lost_event(reason, item=item) + headers.update( { "User-Agent": str(self._auth.client), @@ -184,6 +235,7 @@ def _send_request( ) except Exception: self.on_dropped_event("network") + record_loss("network_error") raise try: @@ -191,7 +243,9 @@ def _send_request( if response.status == 429: # if we hit a 429. Something was rate limited but we already - # acted on this in `self._update_rate_limits`. + # acted on this in `self._update_rate_limits`. Note that we + # do not want to record event loss here as we will have recorded + # an outcome in relay already. self.on_dropped_event("status_429") pass @@ -202,12 +256,50 @@ def _send_request( response.data, ) self.on_dropped_event("status_{}".format(response.status)) + record_loss("network_error") finally: response.close() def on_dropped_event(self, reason): # type: (str) -> None - pass + return None + + def _fetch_pending_client_report(self, force=False, interval=60): + # type: (bool, int) -> Optional[Item] + if not self.options["send_client_reports"]: + return None + + if not (force or self._last_client_report_sent < time.time() - interval): + return None + + discarded_events = self._discarded_events + self._discarded_events = defaultdict(int) + self._last_client_report_sent = time.time() + + if not discarded_events: + return None + + return Item( + PayloadRef( + json={ + "timestamp": time.time(), + "discarded_events": [ + {"reason": reason, "category": category, "quantity": quantity} + for ( + (category, reason), + quantity, + ) in discarded_events.items() + ], + } + ), + type="client_report", + ) + + def _flush_client_reports(self, force=False): + # type: (bool) -> None + client_report = self._fetch_pending_client_report(force=force, interval=60) + if client_report is not None: + 
self.capture_envelope(Envelope(items=[client_report])) def _check_disabled(self, category): # type: (str) -> bool @@ -225,6 +317,7 @@ def _send_event( if self._check_disabled("error"): self.on_dropped_event("self_rate_limits") + self.record_lost_event("ratelimit_backoff", data_category="error") return None body = io.BytesIO() @@ -254,12 +347,28 @@ def _send_envelope( # type: (...) -> None # remove all items from the envelope which are over quota - envelope.items[:] = [ - x for x in envelope.items if not self._check_disabled(x.data_category) - ] + new_items = [] + for item in envelope.items: + if self._check_disabled(item.data_category): + if item.data_category in ("transaction", "error", "default"): + self.on_dropped_event("self_rate_limits") + self.record_lost_event("ratelimit_backoff", item=item) + else: + new_items.append(item) + + envelope.items[:] = new_items if not envelope.items: return None + # since we're already in the business of sending out an envelope here + # check if we have one pending for the stats session envelopes so we + # can attach it to this envelope scheduled for sending. This will + # currently typically attach the client report to the most recent + # session update.
+ client_report_item = self._fetch_pending_client_report(interval=30) + if client_report_item is not None: + envelope.items.append(client_report_item) + body = io.BytesIO() with gzip.GzipFile(fileobj=body, mode="w") as f: envelope.serialize_into(f) @@ -271,6 +380,7 @@ def _send_envelope( self.parsed_dsn.project_id, self.parsed_dsn.host, ) + self._send_request( body.getvalue(), headers={ @@ -278,6 +388,7 @@ def _send_envelope( "Content-Encoding": "gzip", }, endpoint_type="envelope", + envelope=envelope, ) return None @@ -337,9 +448,11 @@ def send_event_wrapper(): with hub: with capture_internal_exceptions(): self._send_event(event) + self._flush_client_reports() if not self._worker.submit(send_event_wrapper): self.on_dropped_event("full_queue") + self.record_lost_event("queue_overflow", data_category="error") def capture_envelope( self, envelope # type: Envelope @@ -352,9 +465,12 @@ def send_envelope_wrapper(): with hub: with capture_internal_exceptions(): self._send_envelope(envelope) + self._flush_client_reports() if not self._worker.submit(send_envelope_wrapper): self.on_dropped_event("full_queue") + for item in envelope.items: + self.record_lost_event("queue_overflow", item=item) def flush( self, @@ -363,7 +479,9 @@ def flush( ): # type: (...) 
-> None logger.debug("Flushing HTTP transport") + if timeout > 0: + self._worker.submit(lambda: self._flush_client_reports(force=True)) self._worker.flush(timeout, callback) def kill(self): diff --git a/tests/test_transport.py b/tests/test_transport.py index 96145eb951..0ce155e6e6 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -1,21 +1,77 @@ # coding: utf-8 import logging import pickle +import gzip +import io from datetime import datetime, timedelta import pytest +from collections import namedtuple +from werkzeug.wrappers import Request, Response -from sentry_sdk import Hub, Client, add_breadcrumb, capture_message +from pytest_localserver.http import WSGIServer + +from sentry_sdk import Hub, Client, add_breadcrumb, capture_message, Scope from sentry_sdk.transport import _parse_rate_limits +from sentry_sdk.envelope import Envelope, parse_json from sentry_sdk.integrations.logging import LoggingIntegration +CapturedData = namedtuple("CapturedData", ["path", "event", "envelope"]) + + +class CapturingServer(WSGIServer): + def __init__(self, host="127.0.0.1", port=0, ssl_context=None): + WSGIServer.__init__(self, host, port, self, ssl_context=ssl_context) + self.code = 204 + self.headers = {} + self.captured = [] + + def respond_with(self, code=200, headers=None): + self.code = code + if headers: + self.headers = headers + + def clear_captured(self): + del self.captured[:] + + def __call__(self, environ, start_response): + """ + This is the WSGI application. 
+ """ + request = Request(environ) + event = envelope = None + if request.mimetype == "application/json": + event = parse_json(gzip.GzipFile(fileobj=io.BytesIO(request.data)).read()) + else: + envelope = Envelope.deserialize_from( + gzip.GzipFile(fileobj=io.BytesIO(request.data)) + ) + + self.captured.append( + CapturedData(path=request.path, event=event, envelope=envelope) + ) + + response = Response(status=self.code) + response.headers.extend(self.headers) + return response(environ, start_response) + + @pytest.fixture -def make_client(request, httpserver): +def capturing_server(request): + server = CapturingServer() + server.start() + request.addfinalizer(server.stop) + return server + + +@pytest.fixture +def make_client(request, capturing_server): def inner(**kwargs): return Client( - "http://foobar@{}/132".format(httpserver.url[len("http://") :]), **kwargs + "http://foobar@{}/132".format(capturing_server.url[len("http://") :]), + **kwargs ) return inner @@ -26,7 +82,7 @@ def inner(**kwargs): @pytest.mark.parametrize("client_flush_method", ["close", "flush"]) @pytest.mark.parametrize("use_pickle", (True, False)) def test_transport_works( - httpserver, + capturing_server, request, capsys, caplog, @@ -36,7 +92,6 @@ def test_transport_works( use_pickle, maybe_monkeypatched_threading, ): - httpserver.serve_content("ok", 200) caplog.set_level(logging.DEBUG) client = make_client(debug=debug) @@ -53,14 +108,12 @@ def test_transport_works( out, err = capsys.readouterr() assert not err and not out - assert httpserver.requests + assert capturing_server.captured assert any("Sending event" in record.msg for record in caplog.records) == debug -def test_transport_infinite_loop(httpserver, request, make_client): - httpserver.serve_content("ok", 200) - +def test_transport_infinite_loop(capturing_server, request, make_client): client = make_client( debug=True, # Make sure we cannot create events from our own logging @@ -71,7 +124,7 @@ def test_transport_infinite_loop(httpserver, 
request, make_client): capture_message("hi") client.flush() - assert len(httpserver.requests) == 1 + assert len(capturing_server.captured) == 1 NOW = datetime(2014, 6, 2) @@ -109,16 +162,16 @@ def test_parse_rate_limits(input, expected): assert dict(_parse_rate_limits(input, now=NOW)) == expected -def test_simple_rate_limits(httpserver, capsys, caplog, make_client): +def test_simple_rate_limits(capturing_server, capsys, caplog, make_client): client = make_client() - httpserver.serve_content("no", 429, headers={"Retry-After": "4"}) + capturing_server.respond_with(code=429, headers={"Retry-After": "4"}) client.capture_event({"type": "transaction"}) client.flush() - assert len(httpserver.requests) == 1 - assert httpserver.requests[0].url.endswith("/api/132/envelope/") - del httpserver.requests[:] + assert len(capturing_server.captured) == 1 + assert capturing_server.captured[0].path == "/api/132/envelope/" + capturing_server.clear_captured() assert set(client.transport._disabled_until) == set([None]) @@ -126,24 +179,35 @@ def test_simple_rate_limits(httpserver, capsys, caplog, make_client): client.capture_event({"type": "event"}) client.flush() - assert not httpserver.requests + assert not capturing_server.captured @pytest.mark.parametrize("response_code", [200, 429]) -def test_data_category_limits(httpserver, capsys, caplog, response_code, make_client): - client = make_client() - httpserver.serve_content( - "hm", - response_code, +def test_data_category_limits( + capturing_server, capsys, caplog, response_code, make_client, monkeypatch +): + client = make_client(send_client_reports=False) + + captured_outcomes = [] + + def record_lost_event(reason, data_category=None, item=None): + if data_category is None: + data_category = item.data_category + return captured_outcomes.append((reason, data_category)) + + monkeypatch.setattr(client.transport, "record_lost_event", record_lost_event) + + capturing_server.respond_with( + code=response_code, 
headers={"X-Sentry-Rate-Limits": "4711:transaction:organization"}, ) client.capture_event({"type": "transaction"}) client.flush() - assert len(httpserver.requests) == 1 - assert httpserver.requests[0].url.endswith("/api/132/envelope/") - del httpserver.requests[:] + assert len(capturing_server.captured) == 1 + assert capturing_server.captured[0].path == "/api/132/envelope/" + capturing_server.clear_captured() assert set(client.transport._disabled_until) == set(["transaction"]) @@ -151,31 +215,119 @@ def test_data_category_limits(httpserver, capsys, caplog, response_code, make_cl client.capture_event({"type": "transaction"}) client.flush() - assert not httpserver.requests + assert not capturing_server.captured client.capture_event({"type": "event"}) client.flush() - assert len(httpserver.requests) == 1 + assert len(capturing_server.captured) == 1 + assert capturing_server.captured[0].path == "/api/132/store/" + + assert captured_outcomes == [ + ("ratelimit_backoff", "transaction"), + ("ratelimit_backoff", "transaction"), + ] + + +@pytest.mark.parametrize("response_code", [200, 429]) +def test_data_category_limits_reporting( + capturing_server, capsys, caplog, response_code, make_client, monkeypatch +): + client = make_client(send_client_reports=True) + + capturing_server.respond_with( + code=response_code, + headers={ + "X-Sentry-Rate-Limits": "4711:transaction:organization, 4711:attachment:organization" + }, + ) + + outcomes_enabled = False + real_fetch = client.transport._fetch_pending_client_report + + def intercepting_fetch(*args, **kwargs): + if outcomes_enabled: + return real_fetch(*args, **kwargs) + + monkeypatch.setattr( + client.transport, "_fetch_pending_client_report", intercepting_fetch + ) + # get rid of threading making things hard to track + monkeypatch.setattr(client.transport._worker, "submit", lambda x: x() or True) + + client.capture_event({"type": "transaction"}) + client.flush() + + assert len(capturing_server.captured) == 1 + assert 
capturing_server.captured[0].path == "/api/132/envelope/" + capturing_server.clear_captured() + + assert set(client.transport._disabled_until) == set(["attachment", "transaction"]) + + client.capture_event({"type": "transaction"}) + client.capture_event({"type": "transaction"}) + capturing_server.clear_captured() + + # flush out the events but don't flush the client reports + client.flush() + client.transport._last_client_report_sent = 0 + outcomes_enabled = True + + scope = Scope() + scope.add_attachment(bytes=b"Hello World", filename="hello.txt") + client.capture_event({"type": "error"}, scope=scope) + client.flush() + + # this goes out with an extra envelope because it's flushed after the last item + # that is normally in the queue. This is quite funny in a way because it means + # that the envelope that caused its own over quota report (an error with an + # attachment) will include its outcome since it's pending. + assert len(capturing_server.captured) == 1 + envelope = capturing_server.captured[0].envelope + assert envelope.items[0].type == "event" + assert envelope.items[1].type == "client_report" + report = parse_json(envelope.items[1].get_bytes()) + assert sorted(report["discarded_events"], key=lambda x: x["quantity"]) == [ + {"category": "transaction", "reason": "ratelimit_backoff", "quantity": 2}, + {"category": "attachment", "reason": "ratelimit_backoff", "quantity": 11}, + ] + capturing_server.clear_captured() + + # here we sent a normal event + client.capture_event({"type": "transaction"}) + client.capture_event({"type": "error", "release": "foo"}) + client.flush() + + assert len(capturing_server.captured) == 2 + + event = capturing_server.captured[0].event + assert event["type"] == "error" + assert event["release"] == "foo" + + envelope = capturing_server.captured[1].envelope + assert envelope.items[0].type == "client_report" + report = parse_json(envelope.items[0].get_bytes()) + assert report["discarded_events"] == [ + {"category": "transaction",
"reason": "ratelimit_backoff", "quantity": 1}, + ] @pytest.mark.parametrize("response_code", [200, 429]) def test_complex_limits_without_data_category( - httpserver, capsys, caplog, response_code, make_client + capturing_server, capsys, caplog, response_code, make_client ): client = make_client() - httpserver.serve_content( - "hm", - response_code, + capturing_server.respond_with( + code=response_code, headers={"X-Sentry-Rate-Limits": "4711::organization"}, ) client.capture_event({"type": "transaction"}) client.flush() - assert len(httpserver.requests) == 1 - assert httpserver.requests[0].url.endswith("/api/132/envelope/") - del httpserver.requests[:] + assert len(capturing_server.captured) == 1 + assert capturing_server.captured[0].path == "/api/132/envelope/" + capturing_server.clear_captured() assert set(client.transport._disabled_until) == set([None]) @@ -184,4 +336,4 @@ def test_complex_limits_without_data_category( client.capture_event({"type": "event"}) client.flush() - assert len(httpserver.requests) == 0 + assert len(capturing_server.captured) == 0