Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 116 additions & 6 deletions tests/test_docker_splunk.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ def handle_request_retry(self, method, url, kwargs):
continue
raise e

def check_splunkd(self, username, password, name=None):
def check_splunkd(self, username, password, name=None, scheme="https"):
'''
NOTE: This helper method can only be used for `compose up` scenarios where self.project_name is defined
'''
Expand All @@ -223,7 +223,7 @@ def check_splunkd(self, username, password, name=None):
if "maintainer" not in container["Labels"] or container["Labels"]["maintainer"] != "[email protected]":
continue
splunkd_port = self.client.port(container["Id"], 8089)[0]["HostPort"]
url = "https://localhost:{}/services/server/info".format(splunkd_port)
url = "{}://localhost:{}/services/server/info".format(scheme, splunkd_port)
kwargs = {"auth": (username, password), "verify": False}
status, content = self.handle_request_retry("GET", url, kwargs)
assert status == 200
Expand Down Expand Up @@ -2008,7 +2008,7 @@ def test_adhoc_1so_splunkd_custom_ssl(self):
]
for cmd in cmds:
execute_cmd = subprocess.check_output(["/bin/sh", "-c", cmd])
# Update s2s ssl settings
# Update server ssl settings
output = re.sub(r'''^ ssl:.*?password: null''', r''' ssl:
ca: /tmp/defaults/ca.pem
cert: /tmp/defaults/cert.pem
Expand Down Expand Up @@ -2038,7 +2038,6 @@ def test_adhoc_1so_splunkd_custom_ssl(self):
# Check if the created file exists
exec_command = self.client.exec_create(cid, "cat /opt/splunk/etc/system/local/server.conf", user="splunk")
std_out = self.client.exec_start(exec_command)
assert "enableSplunkdSSL = 1" in std_out
assert "sslRootCAPath = /tmp/defaults/ca.pem" in std_out
assert "serverCert = /tmp/defaults/cert.pem" in std_out
# Check splunkd using the custom certs
Expand Down Expand Up @@ -2089,7 +2088,7 @@ def test_adhoc_1uf_splunkd_custom_ssl(self):
]
for cmd in cmds:
execute_cmd = subprocess.check_output(["/bin/sh", "-c", cmd])
# Update s2s ssl settings
# Update server ssl settings
output = re.sub(r'''^ ssl:.*?password: null''', r''' ssl:
ca: /tmp/defaults/ca.pem
cert: /tmp/defaults/cert.pem
Expand Down Expand Up @@ -2119,7 +2118,6 @@ def test_adhoc_1uf_splunkd_custom_ssl(self):
# Check if the created file exists
exec_command = self.client.exec_create(cid, "cat /opt/splunkforwarder/etc/system/local/server.conf", user="splunk")
std_out = self.client.exec_start(exec_command)
assert "enableSplunkdSSL = 1" in std_out
assert "sslRootCAPath = /tmp/defaults/ca.pem" in std_out
assert "serverCert = /tmp/defaults/cert.pem" in std_out
# Check splunkd using the custom certs
Expand All @@ -2146,6 +2144,118 @@ def test_adhoc_1uf_splunkd_custom_ssl(self):
]
self.cleanup_files(files)

def test_adhoc_1so_splunkd_no_ssl(self):
# Generate default.yml
cid = self.client.create_container(self.SPLUNK_IMAGE_NAME, tty=True, command="create-defaults")
self.client.start(cid.get("Id"))
output = self.get_container_logs(cid.get("Id"))
self.client.remove_container(cid.get("Id"), v=True, force=True)
# Get the password
password = re.search(r"^ password: (.*?)\n", output, flags=re.MULTILINE|re.DOTALL).group(1).strip()
assert password and password != "null"
# Update server ssl settings
output = re.sub(r'''^ ssl:.*?password: null''', r''' ssl:
ca: null
cert: null
enable: false
password: null''', output, flags=re.MULTILINE|re.DOTALL)
# Write the default.yml to a file
with open(os.path.join(FIXTURES_DIR, "default.yml"), "w") as f:
f.write(output)
# Create the container and mount the default.yml
cid = None
try:
splunk_container_name = generate_random_string()
cid = self.client.create_container(self.SPLUNK_IMAGE_NAME, tty=True, ports=[8000, 8089],
volumes=["/tmp/defaults/"], name=splunk_container_name,
environment={"DEBUG": "true",
"SPLUNK_START_ARGS": "--accept-license",
"SPLUNK_CERT_PREFIX": "http",
"SPLUNK_PASSWORD": password},
host_config=self.client.create_host_config(binds=[FIXTURES_DIR + ":/tmp/defaults/"],
port_bindings={8089: ("0.0.0.0",), 8000: ("0.0.0.0",)})
)
cid = cid.get("Id")
self.client.start(cid)
# Poll for the container to be ready
assert self.wait_for_containers(1, name=splunk_container_name)
# Check splunkd
assert self.check_splunkd("admin", password, scheme="http")
# Check if the created file exists
exec_command = self.client.exec_create(cid, "cat /opt/splunk/etc/system/local/server.conf", user="splunk")
std_out = self.client.exec_start(exec_command)
assert "enableSplunkdSSL = false" in std_out
# Check splunkd using the custom certs
mgmt_port = self.client.port(cid, 8089)[0]["HostPort"]
url = "http://localhost:{}/services/server/info".format(mgmt_port)
kwargs = {"auth": ("admin", password)}
status, content = self.handle_request_retry("GET", url, kwargs)
assert status == 200
except Exception as e:
self.logger.error(e)
raise e
finally:
if cid:
self.client.remove_container(cid, v=True, force=True)
files = [os.path.join(FIXTURES_DIR, "default.yml")]
self.cleanup_files(files)

def test_adhoc_1uf_splunkd_no_ssl(self):
# Generate default.yml
cid = self.client.create_container(self.UF_IMAGE_NAME, tty=True, command="create-defaults")
self.client.start(cid.get("Id"))
output = self.get_container_logs(cid.get("Id"))
self.client.remove_container(cid.get("Id"), v=True, force=True)
# Get the password
password = re.search(r"^ password: (.*?)\n", output, flags=re.MULTILINE|re.DOTALL).group(1).strip()
assert password and password != "null"
# Update server ssl settings
output = re.sub(r'''^ ssl:.*?password: null''', r''' ssl:
ca: null
cert: null
enable: false
password: null''', output, flags=re.MULTILINE|re.DOTALL)
# Write the default.yml to a file
with open(os.path.join(FIXTURES_DIR, "default.yml"), "w") as f:
f.write(output)
# Create the container and mount the default.yml
cid = None
try:
splunk_container_name = generate_random_string()
cid = self.client.create_container(self.UF_IMAGE_NAME, tty=True, ports=[8000, 8089],
volumes=["/tmp/defaults/"], name=splunk_container_name,
environment={"DEBUG": "true",
"SPLUNK_START_ARGS": "--accept-license",
"SPLUNK_CERT_PREFIX": "http",
"SPLUNK_PASSWORD": password},
host_config=self.client.create_host_config(binds=[FIXTURES_DIR + ":/tmp/defaults/"],
port_bindings={8089: ("0.0.0.0",), 8000: ("0.0.0.0",)})
)
cid = cid.get("Id")
self.client.start(cid)
# Poll for the container to be ready
assert self.wait_for_containers(1, name=splunk_container_name)
# Check splunkd
assert self.check_splunkd("admin", password, scheme="http")
# Check if the created file exists
exec_command = self.client.exec_create(cid, "cat /opt/splunkforwarder/etc/system/local/server.conf", user="splunk")
std_out = self.client.exec_start(exec_command)
assert "enableSplunkdSSL = false" in std_out
# Check splunkd using the custom certs
mgmt_port = self.client.port(cid, 8089)[0]["HostPort"]
url = "http://localhost:{}/services/server/info".format(mgmt_port)
kwargs = {"auth": ("admin", password)}
status, content = self.handle_request_retry("GET", url, kwargs)
assert status == 200
except Exception as e:
self.logger.error(e)
raise e
finally:
if cid:
self.client.remove_container(cid, v=True, force=True)
files = [os.path.join(FIXTURES_DIR, "default.yml")]
self.cleanup_files(files)

def test_adhoc_1so_web_ssl(self):
# Generate a password
password = generate_random_string()
Expand Down