Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions aikido_zen/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from aikido_zen.background_process.test_uds_file_access import test_uds_file_access

# Re-export functions :
from aikido_zen.lambda_helper import protect_lambda
from aikido_zen.context.users import set_user
from aikido_zen.helpers.check_gevent import check_gevent
from aikido_zen.helpers.python_version_not_supported import python_version_not_supported
Expand Down
6 changes: 0 additions & 6 deletions aikido_zen/aws_lambda/__init__.py

This file was deleted.

2 changes: 2 additions & 0 deletions aikido_zen/background_process/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ def start_background_process():
if platform.system() == "Windows":
# Python does not support Windows UDS just yet, so we have to rely on INET
address = ("127.0.0.1", 49156)
if os.getenv("AIKIDO_INET_ONLY") == "1":
address = ("127.0.0.1", 49156)

comms = AikidoIPCCommunications(address, secret_key_bytes)

Expand Down
1 change: 1 addition & 0 deletions aikido_zen/background_process/aikido_background_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ def __init__(self, address, key):
conn = listener.accept()
while True:
try:
logger.debug("Waiting for data...")
data = conn.recv() # because of this no sleep needed in thread
logger.debug("Incoming data : %s", data)
process_incoming_command(
Expand Down
2 changes: 2 additions & 0 deletions aikido_zen/background_process/commands/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .attack import process_attack
from .check_firewall_lists import process_check_firewall_lists
from .read_property import process_read_property
from .send_heartbeat import process_send_heartbeat
from .should_ratelimit import process_should_ratelimit
from .ping import process_ping
from .sync_data import process_sync_data
Expand All @@ -12,6 +13,7 @@
# This maps to a tuple : (function, returns_data?)
# Commands that don't return data :
"ATTACK": (process_attack, False),
"SEND_HEARTBEAT": (process_send_heartbeat, False),
# Commands that return data :
"SYNC_DATA": (process_sync_data, True),
"READ_PROPERTY": (process_read_property, True),
Expand Down
11 changes: 11 additions & 0 deletions aikido_zen/background_process/commands/send_heartbeat.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from aikido_zen.helpers.logging import logger


def process_send_heartbeat(connection_manager, data, queue):
"""
SEND_HEARTBEAT: Used by lambdas to flush data.
"""
logger.debug("SEND_HEARTBEAT start [->]")
connection_manager.send_heartbeat()
logger.debug("SEND_HEARTBEAT END [<-]")
return {"status": "Heartbeat sent"}
1 change: 1 addition & 0 deletions aikido_zen/background_process/comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def target(address, key, receive, data, result_obj):
result_obj[0] = True # Connection ended gracefully
except Exception as e:
logger.debug("Exception occurred in thread : %s", e)
logger.debug("Address: %s", address)

# Create a shared result object between the thread and this process :
result_obj = [False, None] # Needs to be an array so we can make a ref.
Expand Down
47 changes: 47 additions & 0 deletions aikido_zen/lambda_helper/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
from aikido_zen.background_process import get_comms
from aikido_zen.thread.thread_cache import renew as process_cache_renew
from aikido_zen.helpers.logging import logger

LAMBDA_SEND_HEARTBEAT_TIMEOUT = 500 / 1000 # 500ms


def protect_lambda(handler):
"""Aikido protect function for the lambda"""

def wrapper(*args, **kwargs):

# Before

rv = handler(*args, **kwargs)

# After
lambda_post_handler()

return rv

return wrapper


def lambda_post_handler():
"""
Lambda post handler, after the lambda is finished, we want to flush the data to core
"""
ipc_client = get_comms()
if not ipc_client:
logger.warning("Lambda: Failed to flush data, no IPC Client available.")
return

# Flush data from this process
process_cache_renew()

# Flush data from background process
res = ipc_client.send_data_to_bg_process(
action="SEND_HEARTBEAT",
obj=None,
receive=True,
timeout_in_sec=LAMBDA_SEND_HEARTBEAT_TIMEOUT,
)
if res["success"]:
logger.info("Lambda: successfully flushed data.")
else:
logger.warning("Lambda: Failed to flush data, error %s.", res["error"])
Loading