From d18082596c116586b3f541c10a625efe6ac7c61b Mon Sep 17 00:00:00 2001 From: Chris Date: Thu, 26 Aug 2021 18:49:18 +0200 Subject: [PATCH] Initial import --- .idea/vcs.xml | 6 ++ MAXCube.py | 87 +++++++++++++++++ MAXPacket.py | 129 +++++++++++++++++++++++++ MAXPacketHandler.py | 78 +++++++++++++++ cun.py | 225 -------------------------------------------- main.py | 43 ++------- 6 files changed, 310 insertions(+), 258 deletions(-) create mode 100644 .idea/vcs.xml create mode 100644 MAXCube.py create mode 100644 MAXPacket.py create mode 100644 MAXPacketHandler.py delete mode 100644 cun.py diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/MAXCube.py b/MAXCube.py new file mode 100644 index 0000000..70ceb02 --- /dev/null +++ b/MAXCube.py @@ -0,0 +1,87 @@ +from telnetlib import Telnet +import serial + +DEBUG = True + + +class MAXCube: + def __init__(self): + self.client = None + + def disconnect(self): + if self.client is not None: + self.client.close() + self.client = None + + def request(self, req): + if self.client is not None: + self.client.write(req.encode()) + self.client.write(b"\n") + if DEBUG: + print(">>> {}".format(req)) + else: + print("Request while not connected!") + + def set_moritz_mode(self, moritz_mode_enable): + if moritz_mode_enable: + self.request("Zr") + #return self.response() + else: + self.request("Zx") + + def version_string(self): + self.request("V") + return self.response() + + def is_connected(self): + return self.client is not None + + +class CUN(MAXCube): + def __init__(self): + self.client: Telnet = None + + def connect(self, host, ip): + if self.client is None: + self.client = Telnet(host, ip) + self.set_moritz_mode(True) + + def request(self, req): + if self.client is not None: + self.client.write(req.encode()) + self.client.write(b"\n") + if DEBUG: + print(">>> {}".format(req)) + else: + print("Request while not connected!") + + def response(self): + if self.client is not None: + response = self.client.read_some().decode() + if DEBUG: + print("<<< {}".format(response)) + return response + else: + print("Waiting for response while not connected!") + return None + + +class CUL(MAXCube): + def __init__(self, addr: str): + self.client: serial.Serial = None + self.addr = addr + + def connect(self, port): + if self.client is None: + self.client = serial.Serial(port, 38400, timeout=1) + self.set_moritz_mode(True) + + def response(self): + if self.client is not None: + response = self.client.read(100).decode() + if DEBUG: + print("<<< {}".format(response)) + return response + else: + print("Waiting for response while not connected!") + return None \ No newline at end of file diff --git a/MAXPacket.py b/MAXPacket.py new file mode 100644 index 0000000..67a03d4 --- /dev/null +++ b/MAXPacket.py @@ -0,0 +1,129 @@ +import math + + +class MAXPacketFactory: + def create_packet(rec: str): + pkt_type = int(rec[7:9], 16) + if pkt_type == 0: + return MAXPairPingPacket(rec) + elif pkt_type == 1: + return MAXPairPongPacket(rec) + elif pkt_type == 2: + return MAXAckPacket(rec) + elif pkt_type == 0x22: + return MAXSetGroupIdPacket(rec) + elif pkt_type == 0xF1: + return MAXWakeUpPacket(rec) + elif pkt_type == 0xF0: + return MAXResetPacket(rec) + else: + print("Unknown message type: {}".format(pkt_type)) + result = MAXPacket() + result.from_received(rec) + return result + + +class MAXPacket: + def __init__(self): + self.length = 0 + self.counter = 0 + self.type = 0 + self.flag = 0 + self.sender_address = 0 + self.dest_address = 0 + self.group_id = 0 + + def set_values(self, message_counter: str, message_type: str, message_flag: str, sender_address: str, + dest_address: str, group_id: str): + # self.length = length + self.counter = message_counter + self.type = message_type + self.flag = message_flag + self.sender_address = sender_address + self.dest_address = dest_address + self.group_id = group_id + + def from_received(self, rec: str): + self.set_values(rec[3:5], rec[5:7], rec[7:9], rec[9:15], rec[15:21], rec[21:23]) + self.length = rec[1:3] + + def gen_header(self): + header_str = "{}{}{}{}{}{}".format(self.counter, self.flag, self.type, self.sender_address, + self.dest_address, self.group_id) + return header_str + + def serialize(self): + header = self.gen_header() + length = int(math.ceil((len(header) + len(self.payload)) / 2)) + self.length = length + return "Zs{0:02X}{1}{2}".format(length, header, self.payload) + + def to_string(self): + return "MAXPacket: len={}, counter={}, flag={}, type={}, sender_addr={}, dest_addr={}, group_id={}".format( + self.length, self.counter, self.flag, self.type, self.sender_address, self.dest_address, self.group_id + ) + + +class MAXPairPingPacket(MAXPacket): + def __init__(self, rec: str): + self.from_received(rec) + firmware_val = int(rec[23:25], 16) + self.firmware_major = firmware_val // 16 + self.firmware_minor = firmware_val % 16 + self.device_type = rec[25:27] + self.test_result = rec[27:29] + self.serial = rec[29:-2] + + def to_string(self): + result = "{}\nMAXPairPingPacket: firmware_major={} firmware_minor={} device_type={} " \ + "test_result={} serial={}".format(super().to_string(), self.firmware_major, self.firmware_minor, + self.device_type, self.test_result, self.serial) + return result + + +class MAXPairPongPacket(MAXPacket): + def __init__(self, message_counter: str, message_flag: str, sender_address: str, dest_address: str, + group_id: str): + self.set_values(message_counter, "01", message_flag, sender_address, dest_address, group_id) + self.payload = "00" + + def to_string(self): + result = "{}\nMAXPairPongPacket: payload={}".format(super().to_string(), self.payload) + return result + + +class MAXSetGroupIdPacket(MAXPacket): + def __init__(self, message_counter: str, message_flag: str, sender_address: str, dest_address: str, + group_id: str): + self.set_values(message_counter, "01", message_flag, sender_address, dest_address, group_id) + self.payload = "00" + + def to_string(self): + result = "{}\nMAXSetGroupIdPacket: payload={}".format(super().to_string(), self.payload) + return result + + +class MAXAckPacket(MAXPacket): + def __init__(self, rec): + super().__init__() + self.from_received(rec) + self.ack = rec[23:25] + self.unknown_field = rec[25:27] + + def to_string(self): + result = "{}\nMAXAckPacket: ack={} unknown_field={}".format(super().to_string(), + self.ack, self.unknown_field) + return result + + +class MAXWakeUpPacket(MAXPacket): + def __init__(self, rec): + super().__init__() + self.from_received(rec) + + +class MAXResetPacket(MAXPacket): + def __init__(self, rec): + super().__init__() + self.from_received(rec) + diff --git a/MAXPacketHandler.py b/MAXPacketHandler.py new file mode 100644 index 0000000..883546c --- /dev/null +++ b/MAXPacketHandler.py @@ -0,0 +1,78 @@ +from MAXCube import MAXCube +from MAXPacket import MAXPacket, MAXPacketFactory, MAXPairPingPacket, MAXPairPongPacket, MAXAckPacket, \ + MAXResetPacket, MAXWakeUpPacket, MAXSetGroupIdPacket + +from time import sleep +from enum import Enum + + +class HandshakeState(Enum): + PING_RECEIVED = 1 + PONG_SENT = 2 + PONG_ACK = 3 + GROUP_ID_SENT = 4 + GROUP_ID_ACK = 5 + + +class Handshake: + def __init__(self, partner_addr, dev_type): + self.partner_addr = partner_addr + self.dev_type = dev_type + self.state: HandshakeState = HandshakeState.PING_RECEIVED + + +class MAXPacketHandler: + def __init__(self, cube: MAXCube): + self.cube = cube + self.quit_flag = False + self.handshakes = [] + + def handle_msg(self, pkt: MAXPacket): + print(pkt.to_string()) + if pkt.dest_address == self.cube.addr or pkt.dest_address == "000000": + if isinstance(pkt, MAXPairPingPacket): + handshake = Handshake(pkt.sender_address, pkt.device_type) + self.handshakes.append(handshake) + pong = MAXPairPongPacket(message_counter="00", message_flag="00", sender_address=self.cube.addr, + dest_address=pkt.sender_address, group_id="00") + pong_str = pong.serialize() + print(pong.to_string()) + self.cube.request(pong_str) + handshake.state = HandshakeState.PONG_SENT + elif isinstance(pkt, MAXAckPacket): + cur_handshake: Handshake = None + for handshake in self.handshakes: + if handshake.partner_addr == pkt.sender_address: + cur_handshake = handshake + break + if cur_handshake is not None: + if cur_handshake.state == HandshakeState.PONG_SENT: + cur_handshake.state = HandshakeState.PONG_ACK + print(pkt.to_string()) + if cur_handshake.dev_type == "05": + # Handshake is finished + print("Paired device with addr={} and type={}".format(cur_handshake.partner_addr, + cur_handshake.dev_type)) + self.handshakes.remove(cur_handshake) + else: + set_group_id = MAXSetGroupIdPacket(message_counter="00", message_flag="00", sender_address=self.cube.addr, + dest_address=pkt.sender_address, group_id="00") + set_group_id_str = set_group_id.serialize() + print(set_group_id_str) + self.cube.request(set_group_id_str) + cur_handshake.state = HandshakeState.GROUP_ID_SENT + if cur_handshake.state == HandshakeState.GROUP_ID_SENT: + cur_handshake.state = HandshakeState.GROUP_ID_ACK + + else: + print("Paket is not ours!") + + def receive_loop(self): + while not self.quit_flag and self.cube.is_connected(): + resp = self.cube.response() + if resp is not None and resp[0:1] == "Z": + pkt = MAXPacketFactory.create_packet(resp) + self.handle_msg(pkt) + + + sleep(0.1) \ No newline at end of file diff --git a/cun.py b/cun.py deleted file mode 100644 index f03f495..0000000 --- a/cun.py +++ /dev/null @@ -1,225 +0,0 @@ -from telnetlib import Telnet -import serial - -DEBUG = True - - -class MAXPacketFactory: - def create_packet(rec: str): - pkt_type = int(rec[7:9], 16) - if pkt_type == 0: - return MAXPairPingPacket.from_received(rec) - elif pkt_type == 1: - return MAXPairPongPacket.from_received(rec) - elif pkt_type == 2: - return MAXAckPacket.from_received(rec) - elif pkt_type == 0xF1: - return MAXWakeUpPacket.from_received(rec) - elif pkt_type == 0xF0: - return MAXResetPacket.from_received(rec) - else: - print("Unknown message type: {}".format(pkt_type)) - return MAXPacket.from_received(rec) - - -class MAXPacket: - def __init__(self, length, message_counter: str, message_type: str, message_flag: str, sender_address: str, dest_address: str, - group_id:str): - self.length = length - self.counter = message_counter - self.type = message_type - self.flag = message_flag - self.sender_address = sender_address - self.dest_address = dest_address - self.group_id = group_id - - def from_received(rec: str): - return MAXPacket(rec[1:3], rec[3:5], rec[5:7], rec[7:9], rec[9:15], rec[15:21], rec[21:23]) - - def gen_header(self): - header_str = "{}{}{}{}{}{}".format(self.counter, self.flag, self.type, self.sender_address, - self.dest_address, self.group_id) - return header_str - - def to_string(self): - return "MAXPacket: len={}, counter={}, flag={}, type={}, sender_addr={}, dest_addr={}, group_id={}".format( - self.length, self.counter, self.flag, self.type, self.sender_address, self.dest_address, self.group_id - ) - - -class MAXPairPingPacket(MAXPacket): - def from_received(rec: str): - result = MAXPacket.from_received(rec) - firmware_val = int(rec[23:25], 16) - result.firmware_major = firmware_val // 16 - result.firmware_minor = firmware_val % 16 - result.device_type = rec[25:27] - result.test_result = rec[27:29] - result.serial = rec[29:-2] - return result - - def __init__(self, length, message_counter: str, message_type: str, message_flag: str, sender_address: str, dest_address: str, - group_id:str, firmware_major, firmware_minor, device_type, test_result, serial): - MAXPacket.__init__(length, message_counter, message_type, message_flag, sender_address, dest_address, group_id) - self.firmware_major = firmware_major - self.firmware_minor = firmware_minor - self.device_type = device_type - self.test_result = test_result - self.serial = serial - - def to_string(self): - result = "{}\nMAXPairPingPacket: firmware_major={} firmware_minor={} device_type={} " \ - "test_result={} serial={}".format(super().to_string(), self.firmware_major, self.firmware_minor, - self.device_type, self.test_result, self.serial) - return result - - -class MAXPairPongPacket(MAXPacket): - # def __init__(self, rec): - # super().__init__(rec) - - def __init__(self, message_counter: str, message_flag: str, sender_address: str, dest_address: str, - group_id:str): - MAXPacket.__init__(message_counter, "01", message_flag, sender_address, dest_address, group_id) - self.payload = "00" - - def serialize(self): - header = self.gen_header() - length = (len(header) + len(self.payload))/2 - return "Zs{}{}{}".format(length, header, self.payload) - - -class MAXAckPacket(MAXPacket): - def __init__(self, rec): - super().__init__(rec) - - -class MAXWakeUpPacket(MAXPacket): - def __init__(self, rec): - super().__init__(rec) - - -class MAXResetPacket(MAXPacket): - def __init__(self, rec): - super().__init__(rec) - - -class CUL: - def __init__(self): - self.client: serial.Serial = None - - def connect(self, port): - if self.client is None: - self.client = serial.Serial(port, 38400, timeout=1) - - def disconnect(self): - if self.client is not None: - self.client.close() - self.client = None - - def request(self, req): - if self.client is not None: - self.client.write(req.encode()) - self.client.write(b"\n") - if DEBUG: - print(">>> {}".format(req)) - else: - print("Request while not connected!") - - def response(self): - if self.client is not None: - response = self.client.read(100).decode() - if DEBUG: - print("<<< {}".format(response)) - return response - else: - print("Waiting for response while not connected!") - return None - - def version_string(self): - self.request("V") - return self.response() - - def is_connected(self): - return self.client is not None - - def set_moritz_mode(self, moritz_mode_enable): - if moritz_mode_enable: - self.request("Zr") - #return self.response() - else: - self.request("Zx") - - -class CUN: - def __init__(self): - self.client: Telnet = None - - def connect(self, host, ip): - if self.client is None: - self.client = Telnet(host, ip) - - def disconnect(self): - if self.client is not None: - self.client.close() - self.client = None - - def version_string(self): - self.request("V") - return self.response() - - def request(self, req): - if self.client is not None: - self.client.write(req.encode()) - self.client.write(b"\n") - if DEBUG: - print(">>> {}".format(req)) - else: - print("Request while not connected!") - - def response(self): - if self.client is not None: - response = self.client.read_some().decode() - if DEBUG: - print("<<< {}".format(response)) - return response - else: - print("Waiting for response while not connected!") - return None - - def set_moritz_mode(self, moritz_mode_enable): - if moritz_mode_enable: - self.request("Zr") - #return self.response() - else: - self.request("Zx") - - def configure(self, ip, netmask, gateway, ): - # ToDo: NYI - pass - - def h_request(self): - self.request("H?") - return self.response() - - def send_moritz(self, packet: MAXPacket): - # llnnccttssssssddddddpp... - # ll - length - # nn - msg counter - # cc - control byte - # tt - msg type - # ss - sender address(3 byte) - # dd - destination address(3 byte - 000000 for broadcast) - # pp - payload... - len_str = packet.length - msg_counter_str = packet.ms - control_byte_str = "00" - message_type_str = "02" - sender_addr_str = "000000" - dest_addr_str = "000000" - payload = "l:" - self.request("Zs{}{}{}{}{}{}{}".format(packet.length, packet.message_counter, control_byte_str, - message_type_str, sender_addr_str, - dest_addr_str, payload)) - #self.request("l:") - return self.response() diff --git a/main.py b/main.py index 8e8b284..d42c204 100644 --- a/main.py +++ b/main.py @@ -1,35 +1,12 @@ -from cun import CUL, MAXPacketFactory, MAXPacket, MAXPairPingPacket, MAXPairPongPacket -import time +from MAXCube import CUL +from MAXPacketHandler import MAXPacketHandler -if __name__ == '__main__': - #cun = CUN() - #cun.connect("192.168.0.244", 2323) - #cun.request(b"V\n") - #print(cun.response()) - #version = cun.version_string() - #print(version) - #cun.set_moritz_mode(True) - #print(cun.send_moritz()) - #cun.request("Zsl:") - #print(cun.response()) - #cun.disconnect() - - cul = CUL() - cul.connect("COM8") - print(cul.version_string()) - cul.set_moritz_mode(True) - while cul.is_connected(): - resp = cul.response() - if resp is not None and resp[0:1] == "Z": - pkt = MAXPacketFactory.create_packet(resp) - print(pkt.to_string()) - if isinstance(pkt, MAXPairPingPacket): - print("Sending Pong!") - pong = MAXPairPongPacket(message_counter="00", message_flag="00", sender_address="FDF7CA", - dest_address=pkt.sender_address, group_id="00") - pong_str = pong.serialize() - cul.request(pong_str) - - time.sleep(1) - cul.disconnect() +DEBUG = True +if __name__ == '__main__': + cube = CUL("123456") + cube.connect("COM11") + print(cube.version_string()) + handler = MAXPacketHandler(cube) + handler.receive_loop() + cube.disconnect()