mbusd/tests/run_itests.py

149 lines
6.3 KiB
Python
Raw Normal View History

2018-04-11 15:42:50 +02:00
#!/usr/bin/env python
import random
import unittest
import sys
import logging
from subprocess import Popen, PIPE, STDOUT
from os.path import isfile
2018-06-18 21:15:34 +02:00
from time import sleep
2018-04-11 15:42:50 +02:00
2023-09-29 16:19:51 +02:00
from pymodbus.client import ModbusTcpClient
from pymodbus.pdu import ExceptionResponse
2018-04-11 15:42:50 +02:00
from pymodbus.bit_read_message import ReadDiscreteInputsResponse, ReadCoilsResponse
from pymodbus.bit_write_message import WriteMultipleCoilsResponse, WriteSingleCoilResponse
from pymodbus.register_read_message import ReadInputRegistersResponse, ReadHoldingRegistersResponse
from pymodbus.register_write_message import WriteMultipleRegistersResponse, WriteSingleRegisterResponse
MBUSD_PORT = 1025
MBUSD_BINARY = "../output.dir/mbusd"
2018-04-11 15:42:50 +02:00
class TestModbusRequests(unittest.TestCase):
log = logging.getLogger("TestModbusRequests")
2018-04-11 15:42:50 +02:00
@classmethod
def setUpClass(cls):
cls.log.debug("1. run socat")
cls.socat = Popen(["socat", "-d", "-d", "pty,raw,echo=0,link=/tmp/pts0", "pty,raw,echo=0,link=/tmp/pts1"],
stdout=PIPE, stderr=STDOUT)
cls.log.debug("2. run rtu_slave")
from environment.rtu_slave import ModbusSerialServer
cls.mbs = ModbusSerialServer()
cls.mbs.start()
cls.log.debug("3. run mbusd to be tested with the binary:%s" % MBUSD_BINARY)
cls.mbusd_main = Popen([MBUSD_BINARY, "-d", "-L/tmp/mbusd.log", "-v9", "-p/tmp/pts0", "-s19200", "-P" + str(MBUSD_PORT)],
stdout=PIPE, stderr=STDOUT)
2018-06-18 21:15:34 +02:00
# wait a little bit for mbusd to come up
# alternatively do a poll for the socket
# https://stackoverflow.com/questions/667640/how-to-tell-if-a-connection-is-dead-in-python/667702#667702
2018-06-18 21:15:34 +02:00
sleep(5)
cls.log.debug("4. connect the modbus TCP client to mbusd")
cls.client = ModbusTcpClient('127.0.0.1', port=MBUSD_PORT, broadcast_enable=True)
2018-04-11 15:42:50 +02:00
cls.client.connect()
@classmethod
def tearDownClass(cls):
cls.log.info("test teardown")
cls.log.debug("4. kill tcp_client")
2018-04-11 15:42:50 +02:00
cls.client.close()
cls.log.debug("3. kill mbusd")
cls.mbusd_main.kill()
cls.log.debug("2. kill rtu_slave")
cls.mbs.kill()
cls.log.debug("1. kill socat")
cls.socat.kill()
2018-04-11 15:42:50 +02:00
def test_coils(self):
bits = [random.randrange(2)>0 for i in range(8)]
# 15 Write Multiple Coils
2023-09-29 16:19:51 +02:00
result = self.client.write_coils(0, bits, slave=1)
2018-04-11 15:42:50 +02:00
self.assertIsInstance(result, WriteMultipleCoilsResponse, result)
self.assertEqual(result.address, 0, result)
self.assertEqual(result.count, 8, result)
# 01 Read Coils
2023-09-29 16:19:51 +02:00
result = self.client.read_coils(0, 8, slave=1)
2018-04-11 15:42:50 +02:00
self.assertIsInstance(result, ReadCoilsResponse, result)
self.assertEqual(result.bits, bits, result)
# 05 Write Single Coil
bit1 = not bits[0]
2023-09-29 16:19:51 +02:00
result = self.client.write_coil(0, bit1, slave=1)
2018-04-11 15:42:50 +02:00
self.assertIsInstance(result, WriteSingleCoilResponse, result)
self.assertEqual(result.address, 0, result)
self.assertEqual(result.value, bit1, result)
2023-09-29 16:19:51 +02:00
result = self.client.read_coils(0, 1, slave=1)
2018-04-11 15:42:50 +02:00
self.assertIsInstance(result, ReadCoilsResponse, result)
self.assertEqual(result.bits[0], bit1, result)
def test_discreteInputs(self):
# 02 Read Discrete Inputs
2023-09-29 16:19:51 +02:00
result = self.client.read_discrete_inputs(0, 8, slave=1)
2018-04-11 15:42:50 +02:00
self.assertIsInstance(result, ReadDiscreteInputsResponse, result)
self.assertEqual(result.bits, [True]*8, result)
def test_inputRegisters(self):
# 04 Read Input Registers
2023-09-29 16:19:51 +02:00
result = self.client.read_input_registers(0, 8, slave=1)
2018-04-11 15:42:50 +02:00
self.assertIsInstance(result, ReadInputRegistersResponse, result)
self.assertEqual(result.registers, list(range(8)), result)
def test_holdingRegisters(self):
registers = [random.randrange(8) for i in range(8)]
# 16 Write Multiple Holding Registers
2023-09-29 16:19:51 +02:00
result = self.client.write_registers(0, registers, slave=1)
2018-04-11 15:42:50 +02:00
self.assertIsInstance(result, WriteMultipleRegistersResponse, result)
self.assertEqual(result.address, 0, result)
self.assertEqual(result.count, 8, result)
# 03 Read Multiple Holding Registers
2023-09-29 16:19:51 +02:00
result = self.client.read_holding_registers(0, 8, slave=1)
2018-04-11 15:42:50 +02:00
self.assertIsInstance(result, ReadHoldingRegistersResponse, result)
self.assertEqual(result.registers, registers, result)
# 06 Write Single Holding Register
register1 = (registers[0] + 1) % 65535
2023-09-29 16:19:51 +02:00
result = self.client.write_register(0, register1, slave=1)
2018-04-11 15:42:50 +02:00
self.assertIsInstance(result, WriteSingleRegisterResponse, result)
self.assertEqual(result.address, 0, result)
self.assertEqual(result.value, register1, result)
2023-09-29 16:19:51 +02:00
result = self.client.read_holding_registers(0, 1, slave=1)
2018-04-11 15:42:50 +02:00
self.assertIsInstance(result, ReadHoldingRegistersResponse, result)
self.assertEqual(result.registers[0], register1, result)
def test_exception(self):
2023-09-29 16:19:51 +02:00
result = self.client.write_coil(1000, False, slave=1) # invalid address 1000
2018-04-11 15:42:50 +02:00
self.assertIsInstance(result, ExceptionResponse, result)
self.assertEqual(result.original_code, 5, result) # fc05 Write Single Coil
self.assertEqual(result.exception_code, 2, result) # Illegal Data Address
def test_broadcast(self):
registers = [random.randrange(8) for i in range(8)]
# 16 Write Multiple Holding Registers
2023-09-29 16:19:51 +02:00
result = self.client.write_registers(0, registers, slave=0)
# 03 Read Multiple Holding Registers
2023-09-29 16:19:51 +02:00
result = self.client.read_holding_registers(0, 8, slave=1)
self.assertIsInstance(result, ReadHoldingRegistersResponse, result)
self.assertEqual(result.registers, registers, result)
2018-04-11 15:42:50 +02:00
if __name__ == '__main__':
stdout_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(level=logging.DEBUG,
format=u'[%(asctime)s] %(name)-26s-%(levelname)-5s %(funcName)-20s:%(lineno)-4d \033[35m%(message)s\033[0m',
datefmt='%d.%m. %H:%M:%S',
handlers=[stdout_handler])
if len(sys.argv) != 2 or not isfile(sys.argv[1]):
logging.error("usage: ./run_itests.py <mbusd_binary_path>")
sys.exit(1)
MBUSD_BINARY = sys.argv[1]
unittest.main(verbosity=2, argv=[sys.argv[0]])