Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dynatrace_extension/__about__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
# SPDX-License-Identifier: MIT


__version__ = "1.3.1"
__version__ = "1.4.0"
49 changes: 40 additions & 9 deletions dynatrace_extension/sdk/extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ def __init__(self, name: str = "") -> None:
"timediff": datetime.now() + TIME_DIFF_INTERVAL,
"heartbeat": datetime.now() + HEARTBEAT_INTERVAL,
"metrics": datetime.now() + METRIC_SENDING_INTERVAL,
"events": datetime.now() + METRIC_SENDING_INTERVAL,
"sfm_metrics": datetime.now() + SFM_METRIC_SENDING_INTERVAL,
}

Expand All @@ -227,6 +228,10 @@ def __init__(self, name: str = "") -> None:
self._metrics_lock = RLock()
self._metrics: List[str] = []

# Extension logs
self._logs_lock = RLock()
self._logs: List[dict] = []

# Self monitoring metrics
self._sfm_metrics_lock = Lock()
self._callbackSfmReport: Dict[str, WrappedCallback] = {}
Expand Down Expand Up @@ -505,6 +510,7 @@ def report_event(
properties: Optional[dict] = None,
timestamp: Optional[datetime] = None,
severity: Union[Severity, str] = Severity.INFO,
send_immediately: bool = False,
) -> None:
"""Report an event using log ingest.

Expand All @@ -514,6 +520,7 @@ def report_event(
properties: A dictionary of extra event properties
timestamp: The timestamp of the event, defaults to the current time
severity: The severity of the event, defaults to Severity.INFO
send_immediately: Option to directly schedule log to be sent without batching
"""
if timestamp is None:
timestamp = datetime.now(tz=timezone.utc)
Expand All @@ -530,7 +537,7 @@ def report_event(
**self._metadata,
**properties,
}
self._send_events(event)
self._send_events(event, send_immediately=send_immediately)

def report_dt_event(
self,
Expand Down Expand Up @@ -635,33 +642,36 @@ def report_dt_event_dict(self, event: dict):
raise ValueError(msg)
self._send_dt_event(event)

def report_log_event(self, log_event: dict):
def report_log_event(self, log_event: dict, send_immediately: bool = False):
"""Report a custom log event using log ingest.

Note:
See reference: https://www.dynatrace.com/support/help/shortlink/log-monitoring-log-data-ingestion

Args:
log_event: The log event dictionary.
send_immediately: Option to directly schedule log to be sent without batching
"""
self._send_events(log_event)
self._send_events(log_event, send_immediately=send_immediately)

def report_log_events(self, log_events: List[dict]):
def report_log_events(self, log_events: List[dict], send_immediately: bool = False):
"""Report a list of custom log events using log ingest.

Args:
log_events: The list of log events
send_immediately: Option to directly schedule log to be sent without batching
"""
self._send_events(log_events)
self._send_events(log_events, send_immediately=send_immediately)

def report_log_lines(self, log_lines: List[Union[str, bytes]]):
def report_log_lines(self, log_lines: List[Union[str, bytes]], send_immediately: bool = False):
"""Report a list of log lines using log ingest

Args:
log_lines: The list of log lines
send_immediately: Option to directly schedule log to be sent without batching
"""
events = [{"content": line} for line in log_lines]
self._send_events(events)
self._send_events(events, send_immediately=send_immediately)

@property
def enabled_feature_sets(self) -> dict[str, list[str]]:
Expand Down Expand Up @@ -819,6 +829,7 @@ def _start_extension_loop(self):
for callback in self._scheduled_callbacks_before_run:
self._schedule_callback(callback)
self._metrics_iteration()
self._events_iteration()
self._sfm_metrics_iteration()
self._timediff_iteration()
self._scheduler.run()
Expand All @@ -838,6 +849,11 @@ def _metrics_iteration(self):
next_timestamp = self._get_and_set_next_internal_callback_timestamp("metrics", METRIC_SENDING_INTERVAL)
self._scheduler.enterabs(next_timestamp, 1, self._metrics_iteration)

def _events_iteration(self):
self._internal_executor.submit(self._send_buffered_events)
next_timestamp = self._get_and_set_next_internal_callback_timestamp("events", METRIC_SENDING_INTERVAL)
self._scheduler.enterabs(next_timestamp, 1, self._events_iteration)

def _sfm_metrics_iteration(self):
self._internal_executor.submit(self._send_sfm_metrics)
next_timestamp = self._get_and_set_next_internal_callback_timestamp("sfm_metrics", SFM_METRIC_SENDING_INTERVAL)
Expand Down Expand Up @@ -1044,8 +1060,23 @@ def _send_events_internal(self, events: Union[dict, List[dict]]):
with self._internal_callbacks_results_lock:
self._internal_callbacks_results[self._send_events.__name__] = Status(StatusValue.GENERIC_ERROR, str(e))

def _send_events(self, events: Union[dict, List[dict]]):
self._internal_executor.submit(self._send_events_internal, events)
def _send_events(self, events: Union[dict, List[dict]], send_immediately: bool = False):
if send_immediately:
self._internal_executor.submit(self._send_events_internal, events)
return
with self._logs_lock:
if isinstance(events, dict):
self._logs.append(events)
elif isinstance(events, list):
self._logs.extend(events)
else:
self.logger.error(f'Invalid log format: {events}')

def _send_buffered_events(self):
with self._logs_lock:
if len(self._logs) > 0:
self._send_events_internal(self._logs)
self._logs = []

def _send_dt_event(self, event: dict[str, str | int | dict[str, str]]):
self._client.send_dt_event(event)
Expand Down
45 changes: 45 additions & 0 deletions tests/sdk/test_extension.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,50 @@ def test_metrics_flushed(self):
with extension._metrics_lock:
self.assertEqual(len(extension._metrics), 0)

def test_add_event(self):
extension = Extension()
extension.logger = MagicMock()
extension._running_in_sim = True
extension.report_event("my_event1", "my_description")
extension.report_event("my_event1", "my_description")
self.assertEqual(len(extension._logs), 2)
self.assertEqual(extension._logs[0].get("content", "not_good"), f"my_event1\nmy_description")

def test_send_event_immediately(self):
extension = Extension()
extension.logger = MagicMock()
extension._running_in_sim = True
extension.report_event("my_event1", "my_description", send_immediately=True)
extension.report_event("my_event1", "my_description", send_immediately=True)
self.assertEqual(len(extension._logs), 0)

def test_add_log_events(self):
extension = Extension()
extension.logger = MagicMock()
extension._running_in_sim = True
extension.report_log_events([{"my_event": "hello there!"}, {"my_event": "hello there!"}])
self.assertEqual(len(extension._logs), 2)
self.assertEqual(extension._logs[0].get("my_event", "not_good"), "hello there!")

def test_send_log_events_immediately(self):
extension = Extension()
extension.logger = MagicMock()
extension._running_in_sim = True
extension.report_log_events([{"my_event": "hello there!"}, {"my_event": "hello there!"}], send_immediately=True)
self.assertEqual(len(extension._logs), 0)

def test_flush_events(self):
extension = Extension()
extension.logger = MagicMock()
extension._running_in_sim = True
extension.report_event("my_event1", "my_description")
extension.report_event("my_event1", "my_description")
self.assertEqual(len(extension._logs), 2)
extension._events_iteration()
time.sleep(0.01)
with extension._logs_lock:
self.assertEqual(len(extension._logs), 0)

def test_callback(self):
extension = Extension()
extension.logger = MagicMock()
Expand Down Expand Up @@ -453,6 +497,7 @@ def test_sfm_ok(self):
extension._running_in_sim = True
extension._is_fastcheck = False
extension._client = MagicMock()

# extension._run_callback = MagicMock()

def callback():
Expand Down