Initial import
This commit is contained in:
parent
19697ea192
commit
d18082596c
6
.idea/vcs.xml
Normal file
6
.idea/vcs.xml
Normal file
@ -0,0 +1,6 @@
|
||||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project version="4">
|
||||
<component name="VcsDirectoryMappings">
|
||||
<mapping directory="$PROJECT_DIR$" vcs="Git" />
|
||||
</component>
|
||||
</project>
|
87
MAXCube.py
Normal file
87
MAXCube.py
Normal file
@ -0,0 +1,87 @@
|
||||
from telnetlib import Telnet
|
||||
import serial
|
||||
|
||||
DEBUG = True
|
||||
|
||||
|
||||
class MAXCube:
|
||||
def __init__(self):
|
||||
self.client = None
|
||||
|
||||
def disconnect(self):
|
||||
if self.client is not None:
|
||||
self.client.close()
|
||||
self.client = None
|
||||
|
||||
def request(self, req):
|
||||
if self.client is not None:
|
||||
self.client.write(req.encode())
|
||||
self.client.write(b"\n")
|
||||
if DEBUG:
|
||||
print(">>> {}".format(req))
|
||||
else:
|
||||
print("Request while not connected!")
|
||||
|
||||
def set_moritz_mode(self, moritz_mode_enable):
|
||||
if moritz_mode_enable:
|
||||
self.request("Zr")
|
||||
#return self.response()
|
||||
else:
|
||||
self.request("Zx")
|
||||
|
||||
def version_string(self):
|
||||
self.request("V")
|
||||
return self.response()
|
||||
|
||||
def is_connected(self):
|
||||
return self.client is not None
|
||||
|
||||
|
||||
class CUN(MAXCube):
|
||||
def __init__(self):
|
||||
self.client: Telnet = None
|
||||
|
||||
def connect(self, host, ip):
|
||||
if self.client is None:
|
||||
self.client = Telnet(host, ip)
|
||||
self.set_moritz_mode(True)
|
||||
|
||||
def request(self, req):
|
||||
if self.client is not None:
|
||||
self.client.write(req.encode())
|
||||
self.client.write(b"\n")
|
||||
if DEBUG:
|
||||
print(">>> {}".format(req))
|
||||
else:
|
||||
print("Request while not connected!")
|
||||
|
||||
def response(self):
|
||||
if self.client is not None:
|
||||
response = self.client.read_some().decode()
|
||||
if DEBUG:
|
||||
print("<<< {}".format(response))
|
||||
return response
|
||||
else:
|
||||
print("Waiting for response while not connected!")
|
||||
return None
|
||||
|
||||
|
||||
class CUL(MAXCube):
|
||||
def __init__(self, addr: str):
|
||||
self.client: serial.Serial = None
|
||||
self.addr = addr
|
||||
|
||||
def connect(self, port):
|
||||
if self.client is None:
|
||||
self.client = serial.Serial(port, 38400, timeout=1)
|
||||
self.set_moritz_mode(True)
|
||||
|
||||
def response(self):
|
||||
if self.client is not None:
|
||||
response = self.client.read(100).decode()
|
||||
if DEBUG:
|
||||
print("<<< {}".format(response))
|
||||
return response
|
||||
else:
|
||||
print("Waiting for response while not connected!")
|
||||
return None
|
129
MAXPacket.py
Normal file
129
MAXPacket.py
Normal file
@ -0,0 +1,129 @@
|
||||
import math
|
||||
|
||||
|
||||
class MAXPacketFactory:
|
||||
def create_packet(rec: str):
|
||||
pkt_type = int(rec[7:9], 16)
|
||||
if pkt_type == 0:
|
||||
return MAXPairPingPacket(rec)
|
||||
elif pkt_type == 1:
|
||||
return MAXPairPongPacket(rec)
|
||||
elif pkt_type == 2:
|
||||
return MAXAckPacket(rec)
|
||||
elif pkt_type == 0x22:
|
||||
return MAXSetGroupIdPacket(rec)
|
||||
elif pkt_type == 0xF1:
|
||||
return MAXWakeUpPacket(rec)
|
||||
elif pkt_type == 0xF0:
|
||||
return MAXResetPacket(rec)
|
||||
else:
|
||||
print("Unknown message type: {}".format(pkt_type))
|
||||
result = MAXPacket()
|
||||
result.from_received(rec)
|
||||
return result
|
||||
|
||||
|
||||
class MAXPacket:
|
||||
def __init__(self):
|
||||
self.length = 0
|
||||
self.counter = 0
|
||||
self.type = 0
|
||||
self.flag = 0
|
||||
self.sender_address = 0
|
||||
self.dest_address = 0
|
||||
self.group_id = 0
|
||||
|
||||
def set_values(self, message_counter: str, message_type: str, message_flag: str, sender_address: str,
|
||||
dest_address: str, group_id: str):
|
||||
# self.length = length
|
||||
self.counter = message_counter
|
||||
self.type = message_type
|
||||
self.flag = message_flag
|
||||
self.sender_address = sender_address
|
||||
self.dest_address = dest_address
|
||||
self.group_id = group_id
|
||||
|
||||
def from_received(self, rec: str):
|
||||
self.set_values(rec[3:5], rec[5:7], rec[7:9], rec[9:15], rec[15:21], rec[21:23])
|
||||
self.length = rec[1:3]
|
||||
|
||||
def gen_header(self):
|
||||
header_str = "{}{}{}{}{}{}".format(self.counter, self.flag, self.type, self.sender_address,
|
||||
self.dest_address, self.group_id)
|
||||
return header_str
|
||||
|
||||
def serialize(self):
|
||||
header = self.gen_header()
|
||||
length = int(math.ceil((len(header) + len(self.payload)) / 2))
|
||||
self.length = length
|
||||
return "Zs{0:02X}{1}{2}".format(length, header, self.payload)
|
||||
|
||||
def to_string(self):
|
||||
return "MAXPacket: len={}, counter={}, flag={}, type={}, sender_addr={}, dest_addr={}, group_id={}".format(
|
||||
self.length, self.counter, self.flag, self.type, self.sender_address, self.dest_address, self.group_id
|
||||
)
|
||||
|
||||
|
||||
class MAXPairPingPacket(MAXPacket):
|
||||
def __init__(self, rec: str):
|
||||
self.from_received(rec)
|
||||
firmware_val = int(rec[23:25], 16)
|
||||
self.firmware_major = firmware_val // 16
|
||||
self.firmware_minor = firmware_val % 16
|
||||
self.device_type = rec[25:27]
|
||||
self.test_result = rec[27:29]
|
||||
self.serial = rec[29:-2]
|
||||
|
||||
def to_string(self):
|
||||
result = "{}\nMAXPairPingPacket: firmware_major={} firmware_minor={} device_type={} " \
|
||||
"test_result={} serial={}".format(super().to_string(), self.firmware_major, self.firmware_minor,
|
||||
self.device_type, self.test_result, self.serial)
|
||||
return result
|
||||
|
||||
|
||||
class MAXPairPongPacket(MAXPacket):
|
||||
def __init__(self, message_counter: str, message_flag: str, sender_address: str, dest_address: str,
|
||||
group_id: str):
|
||||
self.set_values(message_counter, "01", message_flag, sender_address, dest_address, group_id)
|
||||
self.payload = "00"
|
||||
|
||||
def to_string(self):
|
||||
result = "{}\nMAXPairPongPacket: payload={}".format(super().to_string(), self.payload)
|
||||
return result
|
||||
|
||||
|
||||
class MAXSetGroupIdPacket(MAXPacket):
|
||||
def __init__(self, message_counter: str, message_flag: str, sender_address: str, dest_address: str,
|
||||
group_id: str):
|
||||
self.set_values(message_counter, "01", message_flag, sender_address, dest_address, group_id)
|
||||
self.payload = "00"
|
||||
|
||||
def to_string(self):
|
||||
result = "{}\nMAXSetGroupIdPacket: payload={}".format(super().to_string(), self.payload)
|
||||
return result
|
||||
|
||||
|
||||
class MAXAckPacket(MAXPacket):
|
||||
def __init__(self, rec):
|
||||
super().__init__()
|
||||
self.from_received(rec)
|
||||
self.ack = rec[23:25]
|
||||
self.unknown_field = rec[25:27]
|
||||
|
||||
def to_string(self):
|
||||
result = "{}\nMAXAckPacket: ack={} unknown_field={}".format(super().to_string(),
|
||||
self.ack, self.unknown_field)
|
||||
return result
|
||||
|
||||
|
||||
class MAXWakeUpPacket(MAXPacket):
|
||||
def __init__(self, rec):
|
||||
super().__init__()
|
||||
self.from_received(rec)
|
||||
|
||||
|
||||
class MAXResetPacket(MAXPacket):
|
||||
def __init__(self, rec):
|
||||
super().__init__()
|
||||
self.from_received(rec)
|
||||
|
78
MAXPacketHandler.py
Normal file
78
MAXPacketHandler.py
Normal file
@ -0,0 +1,78 @@
|
||||
from MAXCube import MAXCube
|
||||
from MAXPacket import MAXPacket, MAXPacketFactory, MAXPairPingPacket, MAXPairPongPacket, MAXAckPacket, \
|
||||
MAXResetPacket, MAXWakeUpPacket, MAXSetGroupIdPacket
|
||||
|
||||
from time import sleep
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class HandshakeState(Enum):
|
||||
PING_RECEIVED = 1
|
||||
PONG_SENT = 2
|
||||
PONG_ACK = 3
|
||||
GROUP_ID_SENT = 4
|
||||
GROUP_ID_ACK = 5
|
||||
|
||||
|
||||
class Handshake:
|
||||
def __init__(self, partner_addr, dev_type):
|
||||
self.partner_addr = partner_addr
|
||||
self.dev_type = dev_type
|
||||
self.state: HandshakeState = HandshakeState.PING_RECEIVED
|
||||
|
||||
|
||||
class MAXPacketHandler:
|
||||
def __init__(self, cube: MAXCube):
|
||||
self.cube = cube
|
||||
self.quit_flag = False
|
||||
self.handshakes = []
|
||||
|
||||
def handle_msg(self, pkt: MAXPacket):
|
||||
print(pkt.to_string())
|
||||
if pkt.dest_address == self.cube.addr or pkt.dest_address == "000000":
|
||||
if isinstance(pkt, MAXPairPingPacket):
|
||||
handshake = Handshake(pkt.sender_address, pkt.device_type)
|
||||
self.handshakes.append(handshake)
|
||||
pong = MAXPairPongPacket(message_counter="00", message_flag="00", sender_address=self.cube.addr,
|
||||
dest_address=pkt.sender_address, group_id="00")
|
||||
pong_str = pong.serialize()
|
||||
print(pong.to_string())
|
||||
self.cube.request(pong_str)
|
||||
handshake.state = HandshakeState.PONG_SENT
|
||||
elif isinstance(pkt, MAXAckPacket):
|
||||
cur_handshake: Handshake = None
|
||||
for handshake in self.handshakes:
|
||||
if handshake.partner_addr == pkt.sender_address:
|
||||
cur_handshake = handshake
|
||||
break
|
||||
if cur_handshake is not None:
|
||||
if cur_handshake.state == HandshakeState.PONG_SENT:
|
||||
cur_handshake.state = HandshakeState.PONG_ACK
|
||||
print(pkt.to_string())
|
||||
if cur_handshake.dev_type == "05":
|
||||
# Handshake is finished
|
||||
print("Paired device with addr={} and type={}".format(cur_handshake.partner_addr,
|
||||
cur_handshake.dev_type))
|
||||
self.handshakes.remove(cur_handshake)
|
||||
else:
|
||||
set_group_id = MAXSetGroupIdPacket(message_counter="00", message_flag="00", sender_address=self.cube.addr,
|
||||
dest_address=pkt.sender_address, group_id="00")
|
||||
set_group_id_str = set_group_id.serialize()
|
||||
print(set_group_id_str)
|
||||
self.cube.request(set_group_id_str)
|
||||
cur_handshake.state = HandshakeState.GROUP_ID_SENT
|
||||
if cur_handshake.state == HandshakeState.GROUP_ID_SENT:
|
||||
cur_handshake.state = HandshakeState.GROUP_ID_ACK
|
||||
|
||||
else:
|
||||
print("Paket is not ours!")
|
||||
|
||||
def receive_loop(self):
|
||||
while not self.quit_flag and self.cube.is_connected():
|
||||
resp = self.cube.response()
|
||||
if resp is not None and resp[0:1] == "Z":
|
||||
pkt = MAXPacketFactory.create_packet(resp)
|
||||
self.handle_msg(pkt)
|
||||
|
||||
|
||||
sleep(0.1)
|
225
cun.py
225
cun.py
@ -1,225 +0,0 @@
|
||||
from telnetlib import Telnet
|
||||
import serial
|
||||
|
||||
DEBUG = True
|
||||
|
||||
|
||||
class MAXPacketFactory:
|
||||
def create_packet(rec: str):
|
||||
pkt_type = int(rec[7:9], 16)
|
||||
if pkt_type == 0:
|
||||
return MAXPairPingPacket.from_received(rec)
|
||||
elif pkt_type == 1:
|
||||
return MAXPairPongPacket.from_received(rec)
|
||||
elif pkt_type == 2:
|
||||
return MAXAckPacket.from_received(rec)
|
||||
elif pkt_type == 0xF1:
|
||||
return MAXWakeUpPacket.from_received(rec)
|
||||
elif pkt_type == 0xF0:
|
||||
return MAXResetPacket.from_received(rec)
|
||||
else:
|
||||
print("Unknown message type: {}".format(pkt_type))
|
||||
return MAXPacket.from_received(rec)
|
||||
|
||||
|
||||
class MAXPacket:
|
||||
def __init__(self, length, message_counter: str, message_type: str, message_flag: str, sender_address: str, dest_address: str,
|
||||
group_id:str):
|
||||
self.length = length
|
||||
self.counter = message_counter
|
||||
self.type = message_type
|
||||
self.flag = message_flag
|
||||
self.sender_address = sender_address
|
||||
self.dest_address = dest_address
|
||||
self.group_id = group_id
|
||||
|
||||
def from_received(rec: str):
|
||||
return MAXPacket(rec[1:3], rec[3:5], rec[5:7], rec[7:9], rec[9:15], rec[15:21], rec[21:23])
|
||||
|
||||
def gen_header(self):
|
||||
header_str = "{}{}{}{}{}{}".format(self.counter, self.flag, self.type, self.sender_address,
|
||||
self.dest_address, self.group_id)
|
||||
return header_str
|
||||
|
||||
def to_string(self):
|
||||
return "MAXPacket: len={}, counter={}, flag={}, type={}, sender_addr={}, dest_addr={}, group_id={}".format(
|
||||
self.length, self.counter, self.flag, self.type, self.sender_address, self.dest_address, self.group_id
|
||||
)
|
||||
|
||||
|
||||
class MAXPairPingPacket(MAXPacket):
|
||||
def from_received(rec: str):
|
||||
result = MAXPacket.from_received(rec)
|
||||
firmware_val = int(rec[23:25], 16)
|
||||
result.firmware_major = firmware_val // 16
|
||||
result.firmware_minor = firmware_val % 16
|
||||
result.device_type = rec[25:27]
|
||||
result.test_result = rec[27:29]
|
||||
result.serial = rec[29:-2]
|
||||
return result
|
||||
|
||||
def __init__(self, length, message_counter: str, message_type: str, message_flag: str, sender_address: str, dest_address: str,
|
||||
group_id:str, firmware_major, firmware_minor, device_type, test_result, serial):
|
||||
MAXPacket.__init__(length, message_counter, message_type, message_flag, sender_address, dest_address, group_id)
|
||||
self.firmware_major = firmware_major
|
||||
self.firmware_minor = firmware_minor
|
||||
self.device_type = device_type
|
||||
self.test_result = test_result
|
||||
self.serial = serial
|
||||
|
||||
def to_string(self):
|
||||
result = "{}\nMAXPairPingPacket: firmware_major={} firmware_minor={} device_type={} " \
|
||||
"test_result={} serial={}".format(super().to_string(), self.firmware_major, self.firmware_minor,
|
||||
self.device_type, self.test_result, self.serial)
|
||||
return result
|
||||
|
||||
|
||||
class MAXPairPongPacket(MAXPacket):
|
||||
# def __init__(self, rec):
|
||||
# super().__init__(rec)
|
||||
|
||||
def __init__(self, message_counter: str, message_flag: str, sender_address: str, dest_address: str,
|
||||
group_id:str):
|
||||
MAXPacket.__init__(message_counter, "01", message_flag, sender_address, dest_address, group_id)
|
||||
self.payload = "00"
|
||||
|
||||
def serialize(self):
|
||||
header = self.gen_header()
|
||||
length = (len(header) + len(self.payload))/2
|
||||
return "Zs{}{}{}".format(length, header, self.payload)
|
||||
|
||||
|
||||
class MAXAckPacket(MAXPacket):
|
||||
def __init__(self, rec):
|
||||
super().__init__(rec)
|
||||
|
||||
|
||||
class MAXWakeUpPacket(MAXPacket):
|
||||
def __init__(self, rec):
|
||||
super().__init__(rec)
|
||||
|
||||
|
||||
class MAXResetPacket(MAXPacket):
|
||||
def __init__(self, rec):
|
||||
super().__init__(rec)
|
||||
|
||||
|
||||
class CUL:
|
||||
def __init__(self):
|
||||
self.client: serial.Serial = None
|
||||
|
||||
def connect(self, port):
|
||||
if self.client is None:
|
||||
self.client = serial.Serial(port, 38400, timeout=1)
|
||||
|
||||
def disconnect(self):
|
||||
if self.client is not None:
|
||||
self.client.close()
|
||||
self.client = None
|
||||
|
||||
def request(self, req):
|
||||
if self.client is not None:
|
||||
self.client.write(req.encode())
|
||||
self.client.write(b"\n")
|
||||
if DEBUG:
|
||||
print(">>> {}".format(req))
|
||||
else:
|
||||
print("Request while not connected!")
|
||||
|
||||
def response(self):
|
||||
if self.client is not None:
|
||||
response = self.client.read(100).decode()
|
||||
if DEBUG:
|
||||
print("<<< {}".format(response))
|
||||
return response
|
||||
else:
|
||||
print("Waiting for response while not connected!")
|
||||
return None
|
||||
|
||||
def version_string(self):
|
||||
self.request("V")
|
||||
return self.response()
|
||||
|
||||
def is_connected(self):
|
||||
return self.client is not None
|
||||
|
||||
def set_moritz_mode(self, moritz_mode_enable):
|
||||
if moritz_mode_enable:
|
||||
self.request("Zr")
|
||||
#return self.response()
|
||||
else:
|
||||
self.request("Zx")
|
||||
|
||||
|
||||
class CUN:
|
||||
def __init__(self):
|
||||
self.client: Telnet = None
|
||||
|
||||
def connect(self, host, ip):
|
||||
if self.client is None:
|
||||
self.client = Telnet(host, ip)
|
||||
|
||||
def disconnect(self):
|
||||
if self.client is not None:
|
||||
self.client.close()
|
||||
self.client = None
|
||||
|
||||
def version_string(self):
|
||||
self.request("V")
|
||||
return self.response()
|
||||
|
||||
def request(self, req):
|
||||
if self.client is not None:
|
||||
self.client.write(req.encode())
|
||||
self.client.write(b"\n")
|
||||
if DEBUG:
|
||||
print(">>> {}".format(req))
|
||||
else:
|
||||
print("Request while not connected!")
|
||||
|
||||
def response(self):
|
||||
if self.client is not None:
|
||||
response = self.client.read_some().decode()
|
||||
if DEBUG:
|
||||
print("<<< {}".format(response))
|
||||
return response
|
||||
else:
|
||||
print("Waiting for response while not connected!")
|
||||
return None
|
||||
|
||||
def set_moritz_mode(self, moritz_mode_enable):
|
||||
if moritz_mode_enable:
|
||||
self.request("Zr")
|
||||
#return self.response()
|
||||
else:
|
||||
self.request("Zx")
|
||||
|
||||
def configure(self, ip, netmask, gateway, ):
|
||||
# ToDo: NYI
|
||||
pass
|
||||
|
||||
def h_request(self):
|
||||
self.request("H?")
|
||||
return self.response()
|
||||
|
||||
def send_moritz(self, packet: MAXPacket):
|
||||
# llnnccttssssssddddddpp...
|
||||
# ll - length
|
||||
# nn - msg counter
|
||||
# cc - control byte
|
||||
# tt - msg type
|
||||
# ss - sender address(3 byte)
|
||||
# dd - destination address(3 byte - 000000 for broadcast)
|
||||
# pp - payload...
|
||||
len_str = packet.length
|
||||
msg_counter_str = packet.ms
|
||||
control_byte_str = "00"
|
||||
message_type_str = "02"
|
||||
sender_addr_str = "000000"
|
||||
dest_addr_str = "000000"
|
||||
payload = "l:"
|
||||
self.request("Zs{}{}{}{}{}{}{}".format(packet.length, packet.message_counter, control_byte_str,
|
||||
message_type_str, sender_addr_str,
|
||||
dest_addr_str, payload))
|
||||
#self.request("l:")
|
||||
return self.response()
|
43
main.py
43
main.py
@ -1,35 +1,12 @@
|
||||
from cun import CUL, MAXPacketFactory, MAXPacket, MAXPairPingPacket, MAXPairPongPacket
|
||||
import time
|
||||
from MAXCube import CUL
|
||||
from MAXPacketHandler import MAXPacketHandler
|
||||
|
||||
DEBUG = True
|
||||
|
||||
if __name__ == '__main__':
|
||||
#cun = CUN()
|
||||
#cun.connect("192.168.0.244", 2323)
|
||||
#cun.request(b"V\n")
|
||||
#print(cun.response())
|
||||
#version = cun.version_string()
|
||||
#print(version)
|
||||
#cun.set_moritz_mode(True)
|
||||
#print(cun.send_moritz())
|
||||
#cun.request("Zsl:")
|
||||
#print(cun.response())
|
||||
#cun.disconnect()
|
||||
|
||||
cul = CUL()
|
||||
cul.connect("COM8")
|
||||
print(cul.version_string())
|
||||
cul.set_moritz_mode(True)
|
||||
while cul.is_connected():
|
||||
resp = cul.response()
|
||||
if resp is not None and resp[0:1] == "Z":
|
||||
pkt = MAXPacketFactory.create_packet(resp)
|
||||
print(pkt.to_string())
|
||||
if isinstance(pkt, MAXPairPingPacket):
|
||||
print("Sending Pong!")
|
||||
pong = MAXPairPongPacket(message_counter="00", message_flag="00", sender_address="FDF7CA",
|
||||
dest_address=pkt.sender_address, group_id="00")
|
||||
pong_str = pong.serialize()
|
||||
cul.request(pong_str)
|
||||
|
||||
time.sleep(1)
|
||||
cul.disconnect()
|
||||
|
||||
cube = CUL("123456")
|
||||
cube.connect("COM11")
|
||||
print(cube.version_string())
|
||||
handler = MAXPacketHandler(cube)
|
||||
handler.receive_loop()
|
||||
cube.disconnect()
|
||||
|
Loading…
Reference in New Issue
Block a user