diff --git a/aikido_zen/__init__.py b/aikido_zen/__init__.py index c65ba3509..d0150969d 100644 --- a/aikido_zen/__init__.py +++ b/aikido_zen/__init__.py @@ -7,6 +7,7 @@ from aikido_zen.background_process.test_uds_file_access import test_uds_file_access # Re-export functions : +from aikido_zen.lambda_helper import protect_lambda from aikido_zen.context.users import set_user from aikido_zen.helpers.check_gevent import check_gevent from aikido_zen.helpers.python_version_not_supported import python_version_not_supported diff --git a/aikido_zen/aws_lambda/__init__.py b/aikido_zen/aws_lambda/__init__.py deleted file mode 100644 index 35ee53f98..000000000 --- a/aikido_zen/aws_lambda/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -"""Lambda init.py file""" - - -def protect(handler): - """Aikido protect function for the lambda""" - return handler diff --git a/aikido_zen/background_process/__init__.py b/aikido_zen/background_process/__init__.py index f011f5ca5..a3f0d38d4 100644 --- a/aikido_zen/background_process/__init__.py +++ b/aikido_zen/background_process/__init__.py @@ -37,6 +37,8 @@ def start_background_process(): if platform.system() == "Windows": # Python does not support Windows UDS just yet, so we have to rely on INET address = ("127.0.0.1", 49156) + if os.getenv("AIKIDO_INET_ONLY") == "1": + address = ("127.0.0.1", 49156) comms = AikidoIPCCommunications(address, secret_key_bytes) diff --git a/aikido_zen/background_process/aikido_background_process.py b/aikido_zen/background_process/aikido_background_process.py index 881e03b9b..a2a0eee23 100644 --- a/aikido_zen/background_process/aikido_background_process.py +++ b/aikido_zen/background_process/aikido_background_process.py @@ -53,6 +53,7 @@ def __init__(self, address, key): conn = listener.accept() while True: try: + logger.debug("Waiting for data...") data = conn.recv() # because of this no sleep needed in thread logger.debug("Incoming data : %s", data) process_incoming_command( diff --git a/aikido_zen/background_process/commands/__init__.py b/aikido_zen/background_process/commands/__init__.py index 151d25621..62ea69c4e 100644 --- a/aikido_zen/background_process/commands/__init__.py +++ b/aikido_zen/background_process/commands/__init__.py @@ -4,6 +4,7 @@ from .attack import process_attack from .check_firewall_lists import process_check_firewall_lists from .read_property import process_read_property +from .send_heartbeat import process_send_heartbeat from .should_ratelimit import process_should_ratelimit from .ping import process_ping from .sync_data import process_sync_data @@ -12,6 +13,7 @@ # This maps to a tuple : (function, returns_data?) # Commands that don't return data : "ATTACK": (process_attack, False), + "SEND_HEARTBEAT": (process_send_heartbeat, False), # Commands that return data : "SYNC_DATA": (process_sync_data, True), "READ_PROPERTY": (process_read_property, True), diff --git a/aikido_zen/background_process/commands/send_heartbeat.py b/aikido_zen/background_process/commands/send_heartbeat.py new file mode 100644 index 000000000..b2560bcb0 --- /dev/null +++ b/aikido_zen/background_process/commands/send_heartbeat.py @@ -0,0 +1,11 @@ +from aikido_zen.helpers.logging import logger + + +def process_send_heartbeat(connection_manager, data, queue): + """ + SEND_HEARTBEAT: Used by lambdas to flush data. + """ + logger.debug("SEND_HEARTBEAT start [->]") + connection_manager.send_heartbeat() + logger.debug("SEND_HEARTBEAT END [<-]") + return {"status": "Heartbeat sent"} diff --git a/aikido_zen/background_process/comms.py b/aikido_zen/background_process/comms.py index 48b7e402e..a2b907988 100644 --- a/aikido_zen/background_process/comms.py +++ b/aikido_zen/background_process/comms.py @@ -77,6 +77,7 @@ def target(address, key, receive, data, result_obj): result_obj[0] = True # Connection ended gracefully except Exception as e: logger.debug("Exception occurred in thread : %s", e) + logger.debug("Address: %s", address) # Create a shared result object between the thread and this process : result_obj = [False, None] # Needs to be an array so we can make a ref. diff --git a/aikido_zen/lambda_helper/__init__.py b/aikido_zen/lambda_helper/__init__.py new file mode 100644 index 000000000..205931128 --- /dev/null +++ b/aikido_zen/lambda_helper/__init__.py @@ -0,0 +1,47 @@ +from aikido_zen.background_process import get_comms +from aikido_zen.thread.thread_cache import renew as process_cache_renew +from aikido_zen.helpers.logging import logger + +LAMBDA_SEND_HEARTBEAT_TIMEOUT = 500 / 1000 # 500ms + + +def protect_lambda(handler): + """Aikido protect function for the lambda""" + + def wrapper(*args, **kwargs): + + # Before + + rv = handler(*args, **kwargs) + + # After + lambda_post_handler() + + return rv + + return wrapper + + +def lambda_post_handler(): + """ + Lambda post handler, after the lambda is finished, we want to flush the data to core + """ + ipc_client = get_comms() + if not ipc_client: + logger.warning("Lambda: Failed to flush data, no IPC Client available.") + return + + # Flush data from this process + process_cache_renew() + + # Flush data from background process + res = ipc_client.send_data_to_bg_process( + action="SEND_HEARTBEAT", + obj=None, + receive=True, + timeout_in_sec=LAMBDA_SEND_HEARTBEAT_TIMEOUT, + ) + if res["success"]: + logger.info("Lambda: successfully flushed data.") + else: + logger.warning("Lambda: Failed to flush data, error %s.", res["error"])