Commit 3933f2e2 authored by Alexander Meißner's avatar Alexander Meißner

Merge branch 'restructure' into 'master'

restructure test environment for more tests

See merge request sre18/break-it!9
parents 5c6b36a7 f3dfb380
Pipeline #6582 passed with stage
in 5 minutes and 30 seconds
......@@ -13,7 +13,7 @@ style:
- wget https://sre18.pages.rechenknecht.net/misc/pylintrc -O .pylintrc
- pip3 install -r requirements.txt
- pip3 install pylint
- pylint run.py tests
- pylint run.py tests util
groupA:
stage: test
......
......@@ -2,7 +2,8 @@ import argparse
import sys
import unittest
from tests.TestConnection import TestConnection, TestServer
from tests import TestFuzzing, TestMail, TestSession
from util import TestServer
def main():
......@@ -23,9 +24,10 @@ def main():
test_server = None
try:
test_server = TestServer(args.domain, args.port, args.docker_template, args.docker_port, args.ssl, args.wait_up)
for method in dir(TestConnection):
if method[:5] == 'test_':
suite.addTest(TestConnection(test_server, method))
for clazz in [TestFuzzing, TestMail, TestSession]:
for method in dir(clazz):
if method[:5] == 'test_':
suite.addTest(clazz(test_server, method))
test_results = unittest.TextTestRunner(verbosity=2).run(suite)
finally:
if test_server:
......
import socket
import random
from util import BaseTest
class TestFuzzing(BaseTest):
def test_chaos(self):
possible_commands = [
'HELO',
'EHLO',
'MAIL',
'RCPT',
'DATA',
'\r\n.',
'RSET',
'NOOP',
'VRFY',
''
]
random.seed(42)
for _ in range(0, 1337):
command = random.choice(possible_commands)+' '
command += ''.join(chr(random.randint(1, 127)) for x in range(random.randint(0, 127)))
command += '\r\n'
self.client.send(command.encode('ASCII'))
while True:
try:
self.client.recv(1)
except socket.timeout:
break
self.assertResponse(500, 'FAIL\r\n')
def test_fuzzing(self):
random.seed(42)
self.assertResponse(500, 'FAIL\r\n')
def test_sentry_issue_49(self):
self.assertResponse(500, b'\xff\xf4\xff\xfd\x06' + "\r\n".encode('ASCII'))
import os
import socket
import ssl
import unittest
import subprocess
import time
import random
from util import BaseTest
class TestServer:
CONNECTION_TIMEOUT = 5
def __init__(self, hostname, port, container_template, docker_port, ssl_on=False, wait_up=0.5):
self.hostname = hostname
self.domain = 'localhost'
self.port = port
self.container_template = container_template
self.docker_port = docker_port
self.docker_id = None
self.ssl_on = ssl_on
# pylint: disable=unsubscriptable-object
self.docker_id = subprocess.check_output(['docker',
'run',
'-d',
'--rm',
'-p',
'{}:{}'.format(self.port, self.docker_port),
self.container_template])[:-1]
time.sleep(wait_up)
def cleanup(self):
subprocess.check_output(['docker', 'kill', self.docker_id])
def create_client(self):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if os.environ.get('CI', False):
server_ip = 'docker'
else:
server_ip = '127.0.0.1'
if self.ssl_on:
wrapped_client = ssl.wrap_socket(client)
wrapped_client.connect((server_ip, self.port))
wrapped_client.settimeout(self.CONNECTION_TIMEOUT)
return wrapped_client
client.connect((server_ip, self.port))
client.settimeout(self.CONNECTION_TIMEOUT)
return client
class TestConnection(unittest.TestCase):
def __init__(self, test_server, method):
super(TestConnection, self).__init__(method)
self.test_server = test_server
self.client = None
def setUp(self):
self.client = self.test_server.create_client()
self.assertResponse(220)
def tearDown(self):
self.client.close()
def assertResponse(self, expectedCode, message=None): # pylint: disable=invalid-name
def format_message(message):
if not message or isinstance(message, bytes):
return message
if message[-2:] == '\r\n':
return message[:-2]
return message
if message:
if isinstance(message, str):
self.client.send(message.encode('ASCII'))
else:
self.client.send(message)
response = ''
while not response.endswith('\r\n'):
try:
byte = self.client.recv(1)
except socket.timeout:
raise self.failureException("Timeout with message <{}>".format(format_message(message)))
if byte == b'':
raise self.failureException("No code raised, but expected {}, message <{}>".format(
expectedCode, format_message(message)))
response += byte.decode('ASCII')
self.assertEqual(int(response[:3]), expectedCode, msg="Message <{}>".format(format_message(message)))
if response[3] == '-':
self.assertResponse(expectedCode)
def test_quit(self):
self.assertResponse(221, 'QUIT\r\n')
class TestMail(BaseTest):
def test_mail(self):
self.assertResponse(250, 'HELO marschke.me\r\n')
self.assertResponse(250, 'MAIL FROM:<leonard@marschke.me>\r\n')
......@@ -117,7 +24,7 @@ class TestConnection(unittest.TestCase):
def test_mail_lower(self):
self.assertResponse(250, 'helo marschke.me\r\n')
self.assertResponse(250, 'mail FROM:<leonard@marschke.me>\r\n')
self.assertResponse(250, 'mail from:<leonard@marschke.me>\r\n')
self.assertResponse(250, 'rcpt to:<firstAddress@{}>\r\n'.format(self.test_server.domain))
self.assertResponse(250, 'rcpt to:<secondAddress@{}>\r\n'.format(self.test_server.domain))
self.assertResponse(354, 'data\r\n' + \
......@@ -154,43 +61,3 @@ class TestConnection(unittest.TestCase):
'BR\r\n')
self.assertResponse(250, '.\r\n')
self.assertResponse(221, 'quIT\r\n')
def test_ehlo(self):
self.assertResponse(250, 'EHLO marschke.me\r\n')
self.assertResponse(500, 'FAIL\r\n')
def test_chaos(self):
possible_commands = [
'HELO',
'EHLO',
'MAIL',
'RCPT',
'DATA',
'\r\n.',
'RSET',
'NOOP',
'VRFY',
''
]
random.seed(42)
for _ in range(0, 1337):
command = random.choice(possible_commands)+' '
command += ''.join(chr(random.randint(1, 127)) for x in range(random.randint(0, 127)))
command += '\r\n'
self.client.send(command.encode('ASCII'))
while True:
try:
self.client.recv(1)
except socket.timeout:
break
self.assertResponse(500, 'FAIL\r\n')
def test_fuzzing(self):
random.seed(42)
self.assertResponse(500, 'FAIL\r\n')
def test_sentry_issue_49(self):
self.assertResponse(500, b'\xff\xf4\xff\xfd\x06' + "\r\n".encode('ASCII'))
from util import BaseTest
class TestSession(BaseTest):
def test_quit(self):
self.assertResponse(221, 'QUIT\r\n')
def test_ehlo(self):
self.assertResponse(250, 'EHLO marschke.me\r\n')
self.assertResponse(500, 'FAIL\r\n')
from .TestFuzzing import TestFuzzing
from .TestMail import TestMail
from .TestSession import TestSession
import socket
import unittest
class BaseTest(unittest.TestCase):
def __init__(self, test_server, method):
super(BaseTest, self).__init__(method)
self.test_server = test_server
self.client = None
def setUp(self):
self.client = self.test_server.create_client()
self.assertResponse(220)
def tearDown(self):
self.client.close()
def assertResponse(self, expectedCode, message=None): # pylint: disable=invalid-name
def format_message(message):
if not message or isinstance(message, bytes):
return message
if message[-2:] == '\r\n':
return message[:-2]
return message
if message:
if isinstance(message, str):
self.client.send(message.encode('ASCII'))
else:
self.client.send(message)
response = ''
while not response.endswith('\r\n'):
try:
byte = self.client.recv(1)
except socket.timeout:
raise self.failureException("Timeout with message <{}>".format(format_message(message)))
if byte == b'':
raise self.failureException("No code raised, but expected {}, message <{}>".format(
expectedCode, format_message(message)))
response += byte.decode('ASCII')
self.assertEqual(int(response[:3]), expectedCode, msg="Message <{}>".format(format_message(message)))
if response[3] == '-':
self.assertResponse(expectedCode)
import os
import socket
import ssl
import subprocess
import time
class TestServer:
CONNECTION_TIMEOUT = 5
def __init__(self, hostname, port, container_template, docker_port, ssl_on=False, wait_up=0.5):
self.hostname = hostname
self.domain = 'localhost'
self.port = port
self.container_template = container_template
self.docker_port = docker_port
self.docker_id = None
self.ssl_on = ssl_on
# pylint: disable=unsubscriptable-object
self.docker_id = subprocess.check_output(['docker',
'run',
'-d',
'--rm',
'-p',
'{}:{}'.format(self.port, self.docker_port),
self.container_template])[:-1]
time.sleep(wait_up)
def cleanup(self):
subprocess.check_output(['docker', 'kill', self.docker_id])
def create_client(self):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if os.environ.get('CI', False):
server_ip = 'docker'
else:
server_ip = '127.0.0.1'
if self.ssl_on:
wrapped_client = ssl.wrap_socket(client)
wrapped_client.connect((server_ip, self.port))
wrapped_client.settimeout(self.CONNECTION_TIMEOUT)
return wrapped_client
client.connect((server_ip, self.port))
client.settimeout(self.CONNECTION_TIMEOUT)
return client
from .TestServer import TestServer
from .BaseTest import BaseTest
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment