OpenHome/MAXPacket.py
2021-09-06 01:30:04 +02:00

172 lines
5.8 KiB
Python

import math
from enum import Enum
class MAXPacketFactory:
    """Create the packet object that matches the type field of a received frame."""

    @staticmethod
    def create_packet(rec: str):
        """Return a packet instance for the received frame string ``rec``.

        The message type is read as a hexadecimal number from ``rec[7:9]``
        and selects the packet class.  Unknown types are reported on stdout
        and returned as a plain ``MAXPacket`` filled via ``from_received``.

        Raises:
            ValueError: if ``rec[7:9]`` is not a valid hexadecimal number
                (for example when ``rec`` is too short).
        """
        pkt_type = int(rec[7:9], 16)

        # These classes parse the raw frame themselves.
        if pkt_type == 0x00:
            return MAXPairPingPacket(rec)
        if pkt_type == 0x02:
            return MAXAckPacket(rec)
        if pkt_type == 0xF1:
            return MAXWakeUpPacket(rec)
        if pkt_type == 0xF0:
            return MAXResetPacket(rec)
        if pkt_type == 0x50:
            return MAXPushButtonPacket(rec)

        # These classes take individual header fields instead of the raw
        # frame; passing ``rec`` alone raised TypeError.  The fields are
        # sliced in frame order: counter, flag, (type), sender, destination,
        # group id -- the flag is the field directly before the type.
        if pkt_type in (0x01, 0x22):
            header_fields = (rec[3:5], rec[5:7], rec[9:15], rec[15:21], rec[21:23])
            if pkt_type == 0x01:
                packet = MAXPairPongPacket(*header_fields)
            else:
                packet = MAXSetGroupIdPacket(*header_fields)
            # Keep the received length field so to_string() has a value.
            # NOTE(review): the payload is whatever the constructor
            # hard-codes, not the payload carried by ``rec``.
            packet.length = rec[1:3]
            return packet

        print("Unknown message type: {}".format(pkt_type))
        result = MAXPacket()
        result.from_received(rec)
        return result
class MAXPacket:
    """Generic packet: header fields kept as hex strings plus an optional payload.

    Frame layout used by ``from_received`` (string indices):
    ``[1:3]`` length, ``[3:5]`` counter, ``[5:7]`` flag, ``[7:9]`` type,
    ``[9:15]`` sender address, ``[15:21]`` destination address,
    ``[21:23]`` group id.  ``gen_header`` emits the fields in the same order.
    """

    # Class-level default so serialize() works for packets that never set a
    # payload, including subclasses that do not run this class's __init__.
    payload = ""

    def __init__(self):
        self.length = 0
        self.counter = 0
        self.type = 0
        self.flag = 0
        self.sender_address = 0
        self.dest_address = 0
        self.group_id = 0

    def set_values(self, message_counter: str, message_type: str, message_flag: str, sender_address: str,
                   dest_address: str, group_id: str):
        """Store the given header fields on the packet; ``length`` is not touched."""
        self.counter = message_counter
        self.type = message_type
        self.flag = message_flag
        self.sender_address = sender_address
        self.dest_address = dest_address
        self.group_id = group_id

    def from_received(self, rec: str):
        """Fill the header fields from a received frame string.

        The flag precedes the type in the frame (``rec[5:7]`` is the flag,
        ``rec[7:9]`` the type).  This matches the field order written by
        ``gen_header`` and the position the packet factory reads the type
        from; previously the two slices were passed in swapped order.
        """
        self.set_values(
            message_counter=rec[3:5],
            message_type=rec[7:9],
            message_flag=rec[5:7],
            sender_address=rec[9:15],
            dest_address=rec[15:21],
            group_id=rec[21:23],
        )
        self.length = rec[1:3]

    def gen_header(self):
        """Return the header string: counter, flag, type, sender, destination, group id."""
        return "{}{}{}{}{}{}".format(self.counter, self.flag, self.type, self.sender_address,
                                     self.dest_address, self.group_id)

    def serialize(self):
        """Return the frame string ``"Zs" + length + header + payload``.

        The length is the number of character pairs in header plus payload
        (rounded up), written as two upper-case hex digits.  It is also
        stored on ``self.length`` as an int.
        """
        header = self.gen_header()
        length = int(math.ceil((len(header) + len(self.payload)) / 2))
        self.length = length
        return "Zs{0:02X}{1}{2}".format(length, header, self.payload)

    def to_string(self):
        """Return a one-line, human-readable summary of the header fields."""
        return "MAXPacket: len={}, counter={}, flag={}, type={}, sender_addr={}, dest_addr={}, group_id={}".format(
            self.length, self.counter, self.flag, self.type, self.sender_address, self.dest_address, self.group_id
        )
class MAXPairPingPacket(MAXPacket):
    """Pair-ping packet parsed from a received frame string."""

    def __init__(self, rec: str):
        """Parse header and pair-ping fields out of ``rec``.

        After the header, ``rec[23:25]`` is read as a hex byte whose high
        and low nibbles become ``firmware_major`` / ``firmware_minor``;
        ``rec[25:27]`` is the device type, ``rec[27:29]`` the test result
        and ``rec[29:-2]`` the serial.
        """
        # Run the base initialiser like the other received-packet classes do,
        # so every base attribute exists before parsing.
        super().__init__()
        self.from_received(rec)
        self.firmware_major, self.firmware_minor = divmod(int(rec[23:25], 16), 16)
        self.device_type = rec[25:27]
        self.test_result = rec[27:29]
        # The last two characters of the frame are not part of the serial.
        self.serial = rec[29:-2]

    def to_string(self):
        """Return the base summary followed by the pair-ping specific fields."""
        template = "{}\nMAXPairPingPacket: firmware_major={} firmware_minor={} device_type={} " \
                   "test_result={} serial={}"
        return template.format(super().to_string(), self.firmware_major, self.firmware_minor,
                               self.device_type, self.test_result, self.serial)
class MAXPairPongPacket(MAXPacket):
    """Pair-pong packet built from explicit header fields (message type "01")."""

    def __init__(self, message_counter: str, message_flag: str, sender_address: str, dest_address: str,
                 group_id: str):
        """Store the header fields with type "01" and a fixed payload of "00"."""
        # Initialise the base defaults first: set_values() does not set
        # ``length``, so without this to_string() raised AttributeError
        # unless serialize() had been called before.
        super().__init__()
        self.set_values(message_counter, "01", message_flag, sender_address, dest_address, group_id)
        self.payload = "00"

    def to_string(self):
        """Return the base summary followed by the payload."""
        return "{}\nMAXPairPongPacket: payload={}".format(super().to_string(), self.payload)
class MAXSetGroupIdPacket(MAXPacket):
    """Set-group-id packet built from explicit header fields (message type "22")."""

    def __init__(self, message_counter: str, message_flag: str, sender_address: str, dest_address: str,
                 group_id: str):
        """Store the header fields with type "22" and a fixed payload of "00"."""
        # Initialise the base defaults first: set_values() does not set
        # ``length``, so without this to_string() raised AttributeError
        # unless serialize() had been called before.
        super().__init__()
        # The packet factory in this file maps type 0x22 to this class; the
        # previous "01" was the pair-pong type.
        self.set_values(message_counter, "22", message_flag, sender_address, dest_address, group_id)
        # NOTE(review): payload is fixed to "00"; confirm whether it should
        # carry the group id instead.
        self.payload = "00"

    def to_string(self):
        """Return the base summary followed by the payload."""
        return "{}\nMAXSetGroupIdPacket: payload={}".format(super().to_string(), self.payload)
class MAXAckPacket(MAXPacket):
    """Acknowledge packet parsed from a received frame string."""

    def __init__(self, rec):
        """Parse the header, then take ``rec[23:25]`` as ack and ``rec[25:27]`` as an unknown field."""
        super().__init__()
        self.from_received(rec)
        # The two fields directly after the header.
        self.ack, self.unknown_field = rec[23:25], rec[25:27]

    def to_string(self):
        """Return the base summary followed by the ack-specific fields."""
        base_text = super().to_string()
        return "{}\nMAXAckPacket: ack={} unknown_field={}".format(base_text, self.ack, self.unknown_field)
class MAXCubeAckPacket(MAXPacket):
    """Acknowledge packet built from explicit header fields (message type "02")."""

    def __init__(self, message_counter: str, message_flag: str, sender_address: str, dest_address: str,
                 group_id: str, acknowledged):
        """Store the header fields with type "02"; payload depends on ``acknowledged``.

        The payload is "01" when ``acknowledged`` is truthy, otherwise "81".
        """
        # Initialise the base defaults first: set_values() does not set
        # ``length``, so without this to_string() raised AttributeError
        # unless serialize() had been called before.
        super().__init__()
        # The packet factory in this file treats type 2 as the ack packet;
        # the previous "01" was the pair-pong type.
        self.set_values(message_counter, "02", message_flag, sender_address, dest_address, group_id)
        self.payload = "01" if acknowledged else "81"

    def to_string(self):
        """Return the base summary followed by the payload."""
        return "{}\nMAXCubeAckPacket: payload={}".format(super().to_string(), self.payload)
class PushButtonState(Enum):
    """States a push-button packet can be decoded to."""

    AUTO = 0
    ECO = 1
    # Used when the received state character is neither of the known values.
    UNKNOWN = 2
class MAXPushButtonPacket(MAXPacket):
    """Push-button packet parsed from a received frame string."""

    def __init__(self, rec: str):
        """Parse the header, the button state and the retransmit marker from ``rec``.

        The character at ``rec[26:27]`` selects the state ("0" -> ECO,
        "1" -> AUTO, anything else -> UNKNOWN).  ``retransmit`` is 1 when
        ``rec[23:24]`` is "5", otherwise 0.
        """
        # Run the base initialiser like the other received-packet classes do,
        # so every base attribute exists before parsing.
        super().__init__()
        self.from_received(rec)

        known_states = {
            "0": PushButtonState.ECO,
            "1": PushButtonState.AUTO,
        }
        self.button_state = known_states.get(rec[26:27], PushButtonState.UNKNOWN)
        self.retransmit = 1 if rec[23:24] == "5" else 0

    def to_string(self):
        """Return the base summary followed by button state and retransmit marker."""
        return "{}\nMAXPushButtonPacket: button_state={} retransmit={}".format(
            super().to_string(), self.button_state, self.retransmit)
class MAXWakeUpPacket(MAXPacket):
    """Wake-up packet; only the common header is parsed from the frame."""

    def __init__(self, rec):
        """Initialise the base defaults, then fill the header from ``rec``."""
        super().__init__()
        self.from_received(rec)
class MAXResetPacket(MAXPacket):
    """Reset packet; only the common header is parsed from the frame."""

    def __init__(self, rec):
        """Initialise the base defaults, then fill the header from ``rec``."""
        super().__init__()
        self.from_received(rec)