Commit a5cda26b authored by Pawel Boening's avatar Pawel Boening

merge master

parents 40796625 3933f2e2
Pipeline #6585 passed with stage
in 5 minutes and 25 seconds
......@@ -13,7 +13,7 @@ style:
- wget https://sre18.pages.rechenknecht.net/misc/pylintrc -O .pylintrc
- pip3 install -r requirements.txt
- pip3 install pylint
- pylint run.py tests
- pylint run.py tests util
groupA:
stage: test
......@@ -21,7 +21,7 @@ groupA:
- docker:dind
script:
- dockerfiles/A/build.sh
- python3 run.py testing.marschke.me 9001 smtp-server-group-a 6666
- python3 run.py testing.marschke.me 9001 sre18groupa/smtp-server-group-a 6666
allow_failure: true
groupB:
......
......@@ -2,8 +2,7 @@
set -e
curl https://sre18.pages.rechenknecht.net/smtp-server/pack/Dockerfile > Dockerfile
docker build --no-cache --tag smtp-server-group-a .
docker pull sre18groupa/smtp-server-group-a
#mkdir -p /tmp/localMailbox
#docker run --rm -v /tmp/localMailbox:/usr/src/app/mailbox -e HOST_NAME='marschke.me' -p 8001:6666 smtp-server-group-a
#!/usr/bin/env bash
docker run -it --rm -e HOST_NAME='marschke.me' -p 8001:6666 smtp-server-group-a
docker run -it --rm -e HOST_NAME='marschke.me' -p 8001:6666 sre18groupa/smtp-server-group-a
......@@ -2,7 +2,8 @@ import argparse
import sys
import unittest
from tests.TestConnection import TestConnection, TestServer
from tests import TestFuzzing, TestMail, TestSession
from util import TestServer
def main():
......@@ -23,9 +24,10 @@ def main():
test_server = None
try:
test_server = TestServer(args.domain, args.port, args.docker_template, args.docker_port, args.ssl, args.wait_up)
for method in dir(TestConnection):
if method[:5] == 'test_':
suite.addTest(TestConnection(test_server, method))
for clazz in [TestFuzzing, TestMail, TestSession]:
for method in dir(clazz):
if method[:5] == 'test_':
suite.addTest(clazz(test_server, method))
test_results = unittest.TextTestRunner(verbosity=2).run(suite)
finally:
if test_server:
......
import socket
import random
from util import BaseTest
class TestFuzzing(BaseTest):
def test_chaos(self):
possible_commands = [
'HELO',
'EHLO',
'MAIL',
'RCPT',
'DATA',
'\r\n.',
'RSET',
'NOOP',
'VRFY',
''
]
random.seed(42)
for _ in range(0, 1337):
command = random.choice(possible_commands)+' '
command += ''.join(chr(random.randint(1, 127)) for x in range(random.randint(0, 127)))
command += '\r\n'
self.client.send(command.encode('ASCII'))
while True:
try:
self.client.recv(1)
except socket.timeout:
break
self.assertResponse(500, 'FAIL\r\n')
def test_fuzzing(self):
random.seed(42)
self.assertResponse(500, 'FAIL\r\n')
def test_sentry_issue_49(self):
self.assertResponse(500, b'\xff\xf4\xff\xfd\x06' + "\r\n".encode('ASCII'))
from util import BaseTest
class TestSession(BaseTest):
def test_quit(self):
self.assertResponse(221, 'QUIT\r\n')
def test_ehlo(self):
self.assertResponse(250, 'EHLO marschke.me\r\n')
self.assertResponse(500, 'FAIL\r\n')
def test_post_invalid_parameter_stability_0(self):
self.assertResponse(250, b'EHLO localhost\r\n')
self.assertResponse(500, b'MAIL wrong input blabla\r\n')
self.assertResponse(250, b'MAIL FROM:<correct.mail@test.de>\r\n')
def test_post_invalid_parameter_stability_1(self):
self.assertResponse(250, b'EHLO localhost\r\n')
self.assertResponse(250, b'MAIL FROM:<correct.mail@test.de>\r\n')
self.assertResponse(500, b'RCPT wrong input blabla\r\n')
self.assertResponse(250, b'RCPT TO:<correct@localhost>\r\n')
def test_help(self):
self.assertResponse(250, b'HELP\r\n')
def test_helo(self):
self.assertResponse(250, b'HELO localDomain\r\n')
def test_helo_dot(self):
self.assertResponse(250, b'HELO localDomain.de\r\n')
def test_helo_no_domain(self):
self.assertResponse(501, b'HELO\r\n')
def test_ehlo_dot(self):
self.assertResponse(250, b'EHLO localDomain.de\r\n')
def test_ehlo_no_domain(self):
self.assertResponse(501, b'EHLO\r\n')
def test_mail_no_ehlo(self):
self.assertResponse(503, b'MAIL FROM:<correct.mail@test.de>\r\n')
def test_mail_double_send(self):
self.assertResponse(250, b'EHLO localDomain\r\n')
self.assertResponse(250, b'MAIL FROM:<correct.mail@test.de>\r\n')
self.assertResponse(503, b'MAIL FROM:<correct2.mail@test.de>\r\n')
def test_invalid_state_transition_03(self):
self.assertResponse(250, b'EHLO localhost\r\n')
self.assertResponse(250, b'MAIL FROM:<leonard@marschke.me>\r\n')
self.assertResponse(503, b'DATA\r\n')
self.assertResponse(250, b'RSET\r\n')
def test_invalid_state_transition_04(self):
self.assertResponse(250, b'RSET\r\n')
def test_reset_operation(self):
self.assertResponse(250, b'EHLO localhost\r\n')
self.assertResponse(250, b'MAIL FROM:<leonard@marschke.me>\r\n')
self.assertResponse(250, b'RCPT TO:<firstAddress@localhost>\r\n')
self.assertResponse(250, b'RSET\r\n')
self.assertResponse(503, b'DATA\r\n')
def test_reset_operation_parameter(self):
self.assertResponse(250, b'EHLO localhost\r\n')
self.assertResponse(250, b'MAIL FROM:<leonard@marschke.me>\r\n')
self.assertResponse(250, b'RCPT TO:<firstAddress@localhost>\r\n')
self.assertResponse(250, b'RSET MAIL FROM:<leonard@marschke.me>\r\n')
self.assertResponse(503, b'DATA\r\n')
def test_noop_operation(self):
self.assertResponse(250, b'NOOP\r\n')
def test_noop_parameter_operation(self):
self.assertResponse(250, b'NOOP MAIL FROM:<leonard@marschke.me>\r\n')
self.assertResponse(503, b'DATA\r\n')
def test_vrfy(self):
self.assertResponse(252, b'VRFY leonard@marschke.me\r\n')
def test_multiple_token_wrong(self):
self.assertResponse(500, b'VERFY BLA BLUB')
def test_token_wrong(self):
self.assertResponse(500, b'VRFZ\r\n')
self.assertResponse(250, b'NOOP\r\n')
def test_quit_operation(self):
pass
def test_parse_helo_correct(self):
self.assertResponse(250, b'HELO marschke.me\r\n')
def test_invalid_state_transition_01(self):
self.assertResponse(503, b'RCPT TO:<firstAddress@localhost>\r\n')
def test_invalid_state_transition_02(self):
self.assertResponse(503, b'DATA\r\n')
from .TestFuzzing import TestFuzzing
from .TestMail import TestMail
from .TestSession import TestSession
import socket
import unittest
class BaseTest(unittest.TestCase):
def __init__(self, test_server, method):
super(BaseTest, self).__init__(method)
self.test_server = test_server
self.client = None
def setUp(self):
self.client = self.test_server.create_client()
self.assertResponse(220)
def tearDown(self):
self.client.close()
def assertResponse(self, expectedCode, message=None): # pylint: disable=invalid-name
def format_message(message):
if not message or isinstance(message, bytes):
return message
if message[-2:] == '\r\n':
return message[:-2]
return message
if message:
if isinstance(message, str):
self.client.send(message.encode('ASCII'))
else:
self.client.send(message)
response = ''
while not response.endswith('\r\n'):
try:
byte = self.client.recv(1)
except socket.timeout:
raise self.failureException("Timeout with message <{}>".format(format_message(message)))
if byte == b'':
raise self.failureException("No code raised, but expected {}, message <{}>".format(
expectedCode, format_message(message)))
response += byte.decode('ASCII')
self.assertEqual(int(response[:3]), expectedCode, msg="Message <{}>".format(format_message(message)))
if response[3] == '-':
self.assertResponse(expectedCode)
import os
import socket
import ssl
import subprocess
import time
class TestServer:
CONNECTION_TIMEOUT = 5
def __init__(self, hostname, port, container_template, docker_port, ssl_on=False, wait_up=0.5):
self.hostname = hostname
self.domain = 'localhost'
self.port = port
self.container_template = container_template
self.docker_port = docker_port
self.docker_id = None
self.ssl_on = ssl_on
# pylint: disable=unsubscriptable-object
self.docker_id = subprocess.check_output(['docker',
'run',
'-d',
'--rm',
'-p',
'{}:{}'.format(self.port, self.docker_port),
self.container_template])[:-1]
time.sleep(wait_up)
def cleanup(self):
subprocess.check_output(['docker', 'kill', self.docker_id])
def create_client(self):
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
if os.environ.get('CI', False):
server_ip = 'docker'
else:
server_ip = '127.0.0.1'
if self.ssl_on:
wrapped_client = ssl.wrap_socket(client)
wrapped_client.connect((server_ip, self.port))
wrapped_client.settimeout(self.CONNECTION_TIMEOUT)
return wrapped_client
client.connect((server_ip, self.port))
client.settimeout(self.CONNECTION_TIMEOUT)
return client
from .TestServer import TestServer
from .BaseTest import BaseTest
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment