Skip to content

Commit 0033852

Browse files
committed
log delivery to cwl in provider account
1 parent e700790 commit 0033852

File tree

4 files changed

+266
-7
lines changed

4 files changed

+266
-7
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import logging
2+
import time
3+
from typing import Any, Mapping
4+
5+
# boto3 doesn't have stub files
6+
import boto3 # type: ignore
7+
8+
9+
class ProviderLogHandler(logging.Handler):
10+
def __init__(self, group: str, stream: str, creds: Mapping[str, str]):
11+
logging.Handler.__init__(self)
12+
self.group = group
13+
self.stream = stream.replace(":", "__")
14+
self.client = boto3.client("logs", **creds)
15+
self.sequence_token = ""
16+
17+
@classmethod
18+
def setup(cls, event_data: Mapping[str, Any]) -> None:
19+
log_creds = event_data.get("requestData", {}).get("providerCredentials", {})
20+
log_group = event_data.get("requestData", {}).get("providerLogGroupName", "")
21+
stream_prefix = event_data.get(
22+
"stackId", f'{event_data.get("awsAccountId")}-{event_data.get("region")}'
23+
)
24+
stream_suffix = event_data.get("requestData", {}).get(
25+
"logicalResourceId", event_data.get("action")
26+
)
27+
if log_creds and log_group:
28+
log_handler = cls(
29+
group=log_group,
30+
stream=f"{stream_prefix}/{stream_suffix}",
31+
creds={
32+
"aws_access_key_id": log_creds["accessKeyId"],
33+
"aws_secret_access_key": log_creds["secretAccessKey"],
34+
"aws_session_token": log_creds["sessionToken"],
35+
},
36+
)
37+
logging.getLogger().addHandler(log_handler)
38+
39+
def _create_log_group(self) -> None:
40+
try:
41+
self.client.create_log_group(logGroupName=self.group)
42+
except self.client.exceptions.ResourceAlreadyExistsException:
43+
pass
44+
45+
def _create_log_stream(self) -> None:
46+
try:
47+
self.client.create_log_stream(
48+
logGroupName=self.group, logStreamName=self.stream
49+
)
50+
except self.client.exceptions.ResourceAlreadyExistsException:
51+
pass
52+
53+
def _put_log_event(self, msg: logging.LogRecord) -> None:
54+
kwargs = {
55+
"logGroupName": self.group,
56+
"logStreamName": self.stream,
57+
"logEvents": [
58+
{
59+
"timestamp": int(round(time.time() * 1000)),
60+
"message": self.format(msg),
61+
}
62+
],
63+
}
64+
if self.sequence_token:
65+
kwargs["sequenceToken"] = self.sequence_token
66+
try:
67+
self.sequence_token = self.client.put_log_events(**kwargs)[
68+
"nextSequenceToken"
69+
]
70+
except (
71+
self.client.exceptions.DataAlreadyAcceptedException,
72+
self.client.exceptions.InvalidSequenceTokenException,
73+
) as e:
74+
self.sequence_token = str(e).split(" ")[-1]
75+
self._put_log_event(msg)
76+
77+
def emit(self, record: logging.LogRecord) -> None:
78+
try:
79+
self._put_log_event(record)
80+
except self.client.exceptions.ResourceNotFoundException as e:
81+
if "log group does not exist" in str(e):
82+
self._create_log_group()
83+
self._create_log_stream()
84+
self._put_log_event(record)

src/aws_cloudformation_rpdk_python_lib/resource.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
ResourceHandlerRequest,
1313
T,
1414
)
15+
from .log_delivery import ProviderLogHandler
1516
from .utils import (
1617
Credentials,
1718
HandlerRequest,
@@ -144,6 +145,7 @@ def __call__(
144145
self, event_data: MutableMapping[str, Any], _context: Any
145146
) -> MutableMapping[str, Any]:
146147
try:
148+
ProviderLogHandler.setup(event_data)
147149
parsed = self._parse_request(event_data)
148150
session, request, action, callback_context = parsed
149151
progress_event = self._invoke_handler(

tests/lib/log_delivery_test.py

Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
# pylint: disable=redefined-outer-name,protected-access
2+
import logging
3+
from unittest.mock import DEFAULT, Mock, create_autospec, patch
4+
5+
import pytest
6+
from aws_cloudformation_rpdk_python_lib.log_delivery import ProviderLogHandler
7+
8+
9+
@pytest.fixture
10+
def mock_logger():
11+
return create_autospec(logging.getLogger())
12+
13+
14+
@pytest.fixture
15+
def mock_provider_handler():
16+
patch("aws_cloudformation_rpdk_python_lib.log_delivery.boto3.client", autospec=True)
17+
plh = ProviderLogHandler(
18+
group="test-group",
19+
stream="test-stream",
20+
creds={
21+
"aws_access_key_id": "",
22+
"aws_secret_access_key": "",
23+
"aws_session_token": "",
24+
},
25+
)
26+
for method in ["create_log_group", "create_log_stream", "put_log_events"]:
27+
setattr(plh.client, method, Mock(auto_spec=True))
28+
return plh
29+
30+
31+
def test_setup_with_provider_creds(mock_logger):
32+
payload = {
33+
"requestData": {
34+
"providerCredentials": {
35+
"accessKeyId": "AKI",
36+
"secretAccessKey": "SAK",
37+
"sessionToken": "ST",
38+
},
39+
"providerLogGroupName": "test_group",
40+
}
41+
}
42+
with patch(
43+
"aws_cloudformation_rpdk_python_lib.log_delivery.logging.getLogger",
44+
return_value=mock_logger,
45+
) as patched_logger:
46+
with patch(
47+
"aws_cloudformation_rpdk_python_lib.log_delivery.boto3.client",
48+
autospec=True,
49+
) as mock_client:
50+
ProviderLogHandler.setup(payload)
51+
mock_client.assert_called_once_with(
52+
"logs",
53+
aws_access_key_id="AKI",
54+
aws_secret_access_key="SAK",
55+
aws_session_token="ST",
56+
)
57+
patched_logger.return_value.addHandler.assert_called_once()
58+
59+
60+
def test_setup_without_provider_creds(mock_logger):
61+
with patch(
62+
"aws_cloudformation_rpdk_python_lib.log_delivery.logging.getLogger",
63+
return_value=mock_logger,
64+
) as patched_logger:
65+
with patch(
66+
"aws_cloudformation_rpdk_python_lib.log_delivery.ProviderLogHandler"
67+
".__init__",
68+
autospec=True,
69+
) as mock___init__:
70+
ProviderLogHandler.setup({})
71+
ProviderLogHandler.setup({"requestData": {}})
72+
ProviderLogHandler.setup({"requestData": {"providerLogGroupName": "test"}})
73+
ProviderLogHandler.setup(
74+
{
75+
"requestData": {
76+
"providerCredentials": {
77+
"accessKeyId": "AKI",
78+
"secretAccessKey": "SAK",
79+
"sessionToken": "ST",
80+
}
81+
}
82+
}
83+
)
84+
mock___init__.assert_not_called()
85+
patched_logger.return_value.addHandler.assert_not_called()
86+
87+
88+
@pytest.mark.parametrize("create_method", ["_create_log_group", "_create_log_stream"])
89+
def test__create_success(mock_provider_handler, create_method):
90+
getattr(mock_provider_handler, create_method)()
91+
getattr(mock_provider_handler.client, create_method[1:]).assert_called_once()
92+
93+
94+
@pytest.mark.parametrize("create_method", ["_create_log_group", "_create_log_stream"])
95+
def test__create_already_exists(mock_provider_handler, create_method):
96+
mock_logs_method = getattr(mock_provider_handler.client, create_method[1:])
97+
exc = mock_provider_handler.client.exceptions.ResourceAlreadyExistsException
98+
mock_logs_method.side_effect = exc({}, operation_name="Test")
99+
# should not raise an exception if the log group already exists
100+
getattr(mock_provider_handler, create_method)()
101+
mock_logs_method.assert_called_once()
102+
103+
104+
@pytest.mark.parametrize("sequence_token", [None, "some-seq"])
105+
def test__put_log_event_success(mock_provider_handler, sequence_token):
106+
mock_provider_handler.sequence_token = sequence_token
107+
mock_put = mock_provider_handler.client.put_log_events
108+
mock_put.return_value = {"nextSequenceToken": "some-other-seq"}
109+
mock_provider_handler._put_log_event(
110+
logging.LogRecord("a", 123, "/", 234, "log-msg", [], False)
111+
)
112+
mock_put.assert_called_once()
113+
114+
115+
def test__put_log_event_invalid_token(mock_provider_handler):
116+
exc = mock_provider_handler.client.exceptions
117+
mock_put = mock_provider_handler.client.put_log_events
118+
mock_put.return_value = {"nextSequenceToken": "some-other-seq"}
119+
mock_put.side_effect = [
120+
exc.InvalidSequenceTokenException({}, operation_name="Test"),
121+
exc.DataAlreadyAcceptedException({}, operation_name="Test"),
122+
DEFAULT,
123+
]
124+
mock_provider_handler._put_log_event(
125+
logging.LogRecord("a", 123, "/", 234, "log-msg", [], False)
126+
)
127+
assert mock_put.call_count == 3
128+
129+
130+
def test_emit_existing_cwl_group_stream(mock_provider_handler):
131+
mock_provider_handler._put_log_event = Mock()
132+
mock_provider_handler.emit(
133+
logging.LogRecord("a", 123, "/", 234, "log-msg", [], False)
134+
)
135+
mock_provider_handler._put_log_event.assert_called_once()
136+
137+
138+
def test_emit_no_group_stream(mock_provider_handler):
139+
exc = mock_provider_handler.client.exceptions.ResourceNotFoundException
140+
group_exc = exc(
141+
{"Error": {"Message": "log group does not exist"}},
142+
operation_name="PutLogRecords",
143+
)
144+
mock_provider_handler._put_log_event = Mock()
145+
mock_provider_handler._put_log_event.side_effect = [group_exc, DEFAULT]
146+
mock_provider_handler._create_log_group = Mock()
147+
mock_provider_handler._create_log_stream = Mock()
148+
mock_provider_handler.emit(
149+
logging.LogRecord("a", 123, "/", 234, "log-msg", [], False)
150+
)
151+
assert mock_provider_handler._put_log_event.call_count == 2
152+
mock_provider_handler._create_log_group.assert_called_once()
153+
mock_provider_handler._create_log_stream.assert_called_once()
154+
155+
# create_group should not be called again if the group already exists
156+
stream_exc = exc(
157+
{"Error": {"Message": "log stream does not exist"}},
158+
operation_name="PutLogRecords",
159+
)
160+
mock_provider_handler._put_log_event.side_effect = [stream_exc, DEFAULT]
161+
mock_provider_handler.emit(
162+
logging.LogRecord("a", 123, "/", 234, "log-msg", [], False)
163+
)
164+
assert mock_provider_handler._put_log_event.call_count == 4
165+
mock_provider_handler._create_log_group.assert_called_once()
166+
assert mock_provider_handler._create_log_stream.call_count == 2

tests/lib/resource_test.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,10 @@ def patch_and_raise(resource, str_to_patch, exc_cls, entrypoint):
6767

6868

6969
def test_entrypoint_handler_error(resource):
70-
event = resource.__call__.__wrapped__(
71-
resource, {}, None
72-
) # pylint: disable=no-member
70+
with patch("aws_cloudformation_rpdk_python_lib.resource.ProviderLogHandler.setup"):
71+
event = resource.__call__.__wrapped__( # pylint: disable=no-member
72+
resource, {}, None
73+
)
7374
assert event["operationStatus"] == OperationStatus.FAILED.value
7475
assert event["errorCode"] == HandlerErrorCode.InvalidRequest
7576

@@ -79,9 +80,14 @@ def test_entrypoint_success():
7980
event = ProgressEvent(status=OperationStatus.SUCCESS, message="")
8081
mock_handler = resource.handler(Action.CREATE)(Mock(return_value=event))
8182

82-
event = resource.__call__.__wrapped__( # pylint: disable=no-member
83-
resource, ENTRYPOINT_PAYLOAD, None
84-
)
83+
with patch(
84+
"aws_cloudformation_rpdk_python_lib.resource.ProviderLogHandler.setup"
85+
) as mock_log_delivery:
86+
event = resource.__call__.__wrapped__( # pylint: disable=no-member
87+
resource, ENTRYPOINT_PAYLOAD, None
88+
)
89+
mock_log_delivery.assert_called_once()
90+
8591
assert event == {
8692
"message": "",
8793
"bearerToken": "123456",
@@ -130,7 +136,8 @@ def test__parse_request_valid_request():
130136

131137
@pytest.mark.parametrize("exc_cls", [Exception, BaseException])
132138
def test_entrypoint_uncaught_exception(resource, exc_cls):
133-
event = patch_and_raise(resource, "_parse_request", exc_cls, resource.__call__)
139+
with patch("aws_cloudformation_rpdk_python_lib.resource.ProviderLogHandler.setup"):
140+
event = patch_and_raise(resource, "_parse_request", exc_cls, resource.__call__)
134141
assert event["operationStatus"] == OperationStatus.FAILED
135142
assert event["errorCode"] == HandlerErrorCode.InternalFailure
136143
assert event["message"] == "hahaha"

0 commit comments

Comments
 (0)