77############
88############
99+ TLS server automaton tests
10+ ~ server
1011
1112### DISCLAIMER: Those tests are slow ###
1213
@@ -60,15 +61,17 @@ def check_output_for_data(out, err, expected_data):
6061 else:
6162 return (False, None)
6263
64+ def get_file(filename):
65+ return os.getenv("SCAPY_ROOT_DIR")+filename if not os.path.exists(filename) else filename
66+
67+
6368def run_tls_test_server(expected_data, q, curve=None, cookie=False, client_auth=False):
6469 correct = False
6570 print("Server started !")
6671 with captured_output() as (out, err):
6772 # Prepare automaton
68- filename = "/test/tls/pki/srv_cert.pem"
69- mycert = os.getenv("SCAPY_ROOT_DIR")+filename if not os.path.exists(filename) else filename
70- filename = "/test/tls/pki/srv_key.pem"
71- mykey = os.getenv("SCAPY_ROOT_DIR")+filename if not os.path.exists(filename) else filename
73+ mycert = get_file("/test/tls/pki/srv_cert.pem")
74+ mykey = get_file("/test/tls/pki/srv_key.pem")
7275 print(os.environ["SCAPY_ROOT_DIR"])
7376 print(mykey)
7477 print(mycert)
@@ -100,18 +103,27 @@ def test_tls_server(suite="", version="", tls13=False, client_auth=False):
100103 q_.get()
101104 time.sleep(1)
102105 # Run client
103- filename = "/test/tls/pki/ca_cert.pem"
104- filename = os.getenv("SCAPY_ROOT_DIR")+filename if not os.path.exists(filename) else filename
105- CA_f = os.path.abspath(filename)
106+ CA_f = get_file("/test/tls/pki/ca_cert.pem")
107+ mycert = get_file("/test/tls/pki/cli_cert.pem")
108+ mykey = get_file("/test/tls/pki/cli_key.pem")
109+ args = [
110+ "openssl", "s_client",
111+ "-connect", "127.0.0.1:4433", "-debug",
112+ "-ciphersuites" if tls13 else "-cipher", suite,
113+ version,
114+ "-CAfile", CA_f
115+ ]
116+ if client_auth:
117+ args.extend(["-cert", mycert, "-key", mykey])
106118 p = subprocess.Popen(
107- ["openssl", "s_client", "-connect", "127.0.0.1:4433", "-debug", "-ciphersuites" if tls13 else "-cipher", suite, version, "-CAfile", CA_f] ,
119+ args ,
108120 stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
109121 )
110122 msg += b"\nstop_server\n"
111123 out = p.communicate(input=msg)[0]
112124 print(out.decode())
113125 if p.returncode != 0:
114- raise RuntimeError("OpenSSL returned with error code" )
126+ raise RuntimeError("OpenSSL returned with error code %s" % p.returncode )
115127 else:
116128 p = re.compile(br'verify return:(\d+)')
117129 _failed = False
@@ -165,6 +177,7 @@ test_tls_server("TLS_AES_256_GCM_SHA384", "-tls1_3", tls13=True)
165177test_tls_server("TLS_AES_256_GCM_SHA384", "-tls1_3", tls13=True, client_auth=True)
166178
167179+ TLS client automaton tests
180+ ~ client
168181
169182= Load client utils functions
170183
@@ -177,16 +190,18 @@ from scapy.modules.six.moves.queue import Queue
177190
178191send_data = cipher_suite_code = version = None
179192
180- def run_tls_test_client(send_data=None, cipher_suite_code=None, version=None):
193+ def run_tls_test_client(send_data=None, cipher_suite_code=None, version=None, client_auth=False ):
181194 print("Loading client...")
195+ mycert = get_file("/test/tls/pki/cli_cert.pem") if client_auth else None
196+ mykey = get_file("/test/tls/pki/cli_key.pem") if client_auth else None
182197 if version == "0002":
183- t = TLSClientAutomaton(data=[send_data, b"stop_server", b"quit"], version="sslv2", debug=5)
198+ t = TLSClientAutomaton(data=[send_data, b"stop_server", b"quit"], version="sslv2", debug=5, mycert=mycert, mykey=mykey )
184199 elif version == "0304":
185200 ch = TLS13ClientHello(ciphers=int(cipher_suite_code, 16))
186- t = TLSClientAutomaton(client_hello=ch, data=[send_data, b"stop_server", b"quit"], version="tls13", debug=5)
201+ t = TLSClientAutomaton(client_hello=ch, data=[send_data, b"stop_server", b"quit"], version="tls13", debug=5, mycert=mycert, mykey=mykey )
187202 else:
188203 ch = TLSClientHello(version=int(version, 16), ciphers=int(cipher_suite_code, 16))
189- t = TLSClientAutomaton(client_hello=ch, data=[send_data, b"stop_server", b"quit"], debug=5)
204+ t = TLSClientAutomaton(client_hello=ch, data=[send_data, b"stop_server", b"quit"], debug=5, mycert=mycert, mykey=mykey )
190205 print("Running client...")
191206 t.run()
192207
@@ -204,7 +219,7 @@ def test_tls_client(suite, version, curve=None, cookie=False, client_auth=False)
204219 time.sleep(1)
205220 print("Thread synchronised")
206221 # Run client
207- run_tls_test_client(msg, suite, version)
222+ run_tls_test_client(msg, suite, version, client_auth )
208223 # Wait for server
209224 print("Client running, waiting...")
210225 th_.join(5)
0 commit comments