test: moved all setup/teardown into the python realm

This commit is contained in:
Nick 2018-06-18 16:06:16 +02:00
parent ae360073bf
commit 74189d6caf
3 changed files with 29 additions and 59 deletions

View File

@ -1,28 +0,0 @@
#!/usr/bin/env bash
RTU_SLAVE_PID=/tmp/rtu_slave.pid
CURRENT_DIR="$(dirname "$(realpath "$0")")"
. $CURRENT_DIR/subprocess_helper.sh
check_preconditions() {
python -c "import pymodbus" || exit 1
}
# check argument count
## https://stackoverflow.com/questions/4341630/checking-for-the-correct-number-of-arguments
if [ "$#" -ne 1 ]; then
echo "[E] usage: $0 <start|stop>" >&2
exit 1
fi
check_preconditions
case "$1" in
up|start)
CMD="python ${CURRENT_DIR}/rtu_slave.py &"
run_cmd_save_pid "$CMD" $RTU_SLAVE_PID
;;
down|stop)
kill_pid $RTU_SLAVE_PID
;;
esac

View File

@ -3,6 +3,9 @@
import random import random
import unittest import unittest
import sys import sys
import logging
from subprocess import Popen
from os.path import isfile
from pymodbus.client.sync import ModbusTcpClient from pymodbus.client.sync import ModbusTcpClient
from pymodbus.pdu import ExceptionResponse from pymodbus.pdu import ExceptionResponse
@ -11,31 +14,41 @@ from pymodbus.bit_write_message import WriteMultipleCoilsResponse, WriteSingleCo
from pymodbus.register_read_message import ReadInputRegistersResponse, ReadHoldingRegistersResponse from pymodbus.register_read_message import ReadInputRegistersResponse, ReadHoldingRegistersResponse
from pymodbus.register_write_message import WriteMultipleRegistersResponse, WriteSingleRegisterResponse from pymodbus.register_write_message import WriteMultipleRegistersResponse, WriteSingleRegisterResponse
from environment.rtu_slave import ModbusSerialServer MBUSD_PORT = 1025
MBUSD_BINARY = "../output.dir/mbusd"
MBUSD_PORT = 1025
class TestModbusRequests(unittest.TestCase): class TestModbusRequests(unittest.TestCase):
log = logging.getLogger("TestModbusRequests")
@classmethod @classmethod
def setUpClass(cls): def setUpClass(cls):
cls.log = logging.getLogger("TestModbusRequests") cls.log.debug("1. run socat")
cls.socat = Popen(["socat", "-d", "-d", "pty,raw,echo=0,link=/tmp/pts0", "pty,raw,echo=0,link=/tmp/pts1"])
cls.log.debug("2. run rtu_slave")
from environment.rtu_slave import ModbusSerialServer
cls.mbs = ModbusSerialServer() cls.mbs = ModbusSerialServer()
cls.mbs.start() cls.mbs.start()
cls.log.debug("3. run mbusd to be tested with the binary:%s" % MBUSD_BINARY)
cls.mbusd_main = Popen([MBUSD_BINARY, "-d", "-L", "-v9", "-p/tmp/pts0", "-s19200", "-P" + str(MBUSD_PORT)])
cls.log.debug("4. connect the modbus TCP client to mbusd")
cls.client = ModbusTcpClient('127.0.0.1', port=MBUSD_PORT) cls.client = ModbusTcpClient('127.0.0.1', port=MBUSD_PORT)
cls.client.connect() cls.client.connect()
@classmethod @classmethod
def tearDownClass(cls): def tearDownClass(cls):
cls.log.info("test teardown") cls.log.info("test teardown")
cls.log.debug("4. kill tcp_client")
cls.client.close() cls.client.close()
try: cls.log.debug("3. kill mbusd")
cls.mbs.kill() cls.mbusd_main.kill()
except e: cls.log.debug("2. kill rtu_slave")
cls.log.info("Fetched exception during mbs.kill") cls.mbs.kill()
cls.log.debug("1. kill socat")
cls.socat.kill()
def test_coils(self): def test_coils(self):
@ -104,12 +117,15 @@ class TestModbusRequests(unittest.TestCase):
self.assertEqual(result.original_code, 5, result) # fc05 Write Single Coil self.assertEqual(result.original_code, 5, result) # fc05 Write Single Coil
self.assertEqual(result.exception_code, 2, result) # Illegal Data Address self.assertEqual(result.exception_code, 2, result) # Illegal Data Address
if __name__ == '__main__': if __name__ == '__main__':
import logging
stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(level=logging.DEBUG, logging.basicConfig(level=logging.DEBUG,
format=u'[%(asctime)s] %(name)-26s-%(levelname)-5s %(funcName)-20s:%(lineno)-4d \033[35m%(message)s\033[0m', format=u'[%(asctime)s] %(name)-26s-%(levelname)-5s %(funcName)-20s:%(lineno)-4d \033[35m%(message)s\033[0m',
datefmt='%d.%m. %H:%M:%S', datefmt='%d.%m. %H:%M:%S',
handlers=[stdout_handler]) handlers=[stdout_handler])
unittest.main(verbosity=2) if len(sys.argv) != 2 or not isfile(sys.argv[1]):
logging.error("usage: ./run_itests.py <mbusd_binary_path>")
sys.exit(1)
MBUSD_BINARY = sys.argv[1]
unittest.main(verbosity=2, argv=[sys.argv[0]])

View File

@ -8,24 +8,6 @@ if ! [ -x "$1" ]; then
fi fi
export MBUSD_BIN=$1 export MBUSD_BIN=$1
$CWD/run_itests.py "$MBUSD_BIN" || exit 1
function setup() {
echo "[I] do test environment setup"
$CWD/environment/socat_runner.sh start || return 1
# $CWD/environment/rtu_slave_runner.sh start || return 1
$CWD/environment/mbusd_runner.sh start || return 1
}
function teardown() {
echo "[I] do test environment teardown"
$CWD/environment/mbusd_runner.sh stop
# $CWD/environment/rtu_slave_runner.sh stop
$CWD/environment/socat_runner.sh stop
}
trap teardown EXIT
setup || (echo "failed to setup" && exit 1)
$CWD/run_itests.py || exit 1
exit 0 exit 0