"""Parsing and construction of MAX! radio packets exchanged as hex strings.

A received record is a text string whose fields sit at fixed character
offsets (as used by the slicing below)::

    rec[0]      marker character
    rec[1:3]    length
    rec[3:5]    message counter
    rec[5:7]    message flag
    rec[7:9]    message type
    rec[9:15]   sender address
    rec[15:21]  destination address
    rec[21:23]  group id
    rec[23:]    type-specific payload

Outgoing packets are rendered by :meth:`MAXPacket.serialize` as
``"Zs" + length + header + payload``.
"""
import math
from enum import Enum


class MAXPacketFactory:
    """Builds the matching packet object for a received record."""

    @staticmethod
    def create_packet(rec: str):
        """Return a packet instance for the received record ``rec``.

        The message type is read from ``rec[7:9]`` (hexadecimal).  Unknown
        types are reported on stdout and returned as a plain
        :class:`MAXPacket` with only the header parsed.

        Raises:
            ValueError: if ``rec[7:9]`` is not a valid hexadecimal number
                (this includes records that are too short).
        """
        pkt_type = int(rec[7:9], 16)

        # Classes whose constructor takes the raw record.
        # The tables are built here because the classes are defined further
        # down in the module.
        parsed_from_record = {
            0x00: MAXPairPingPacket,
            0x02: MAXAckPacket,
            0x50: MAXPushButtonPacket,
            0x60: MAXThermostatStatePacket,
            0xF0: MAXResetPacket,
            0xF1: MAXWakeUpPacket,
        }
        # Classes whose constructor takes the individual header fields; they
        # cannot be instantiated from ``rec`` directly.
        built_from_header = {
            0x01: MAXPairPongPacket,
            0x22: MAXSetGroupIdPacket,
        }

        packet_cls = parsed_from_record.get(pkt_type)
        if packet_cls is not None:
            return packet_cls(rec)

        packet_cls = built_from_header.get(pkt_type)
        if packet_cls is not None:
            # NOTE: the payload stays at the constructor's fixed value; it is
            # not read back from ``rec``.
            packet = packet_cls(rec[3:5], rec[5:7], rec[9:15], rec[15:21], rec[21:23])
            packet.length = rec[1:3]
            return packet

        print("Unknown message type: {}".format(pkt_type))
        result = MAXPacket()
        result.from_received(rec)
        return result


class MAXPacket:
    """Common header fields shared by every packet type."""

    def __init__(self):
        self.length = 0
        self.counter = 0
        self.type = 0
        self.flag = 0
        self.sender_address = 0
        self.dest_address = 0
        self.group_id = 0
        # Subclasses that can be sent overwrite this.  Received packets do
        # not retain their payload here, so serializing one emits the header
        # only.
        self.payload = ""

    def set_values(self, message_counter: str, message_type: str, message_flag: str, sender_address: str,
                   dest_address: str, group_id: str):
        """Store the header fields (hex strings) on the packet."""
        self.counter = message_counter
        self.type = message_type
        self.flag = message_flag
        self.sender_address = sender_address
        self.dest_address = dest_address
        self.group_id = group_id

    def from_received(self, rec: str):
        """Fill the header fields from the received record ``rec``.

        The flag precedes the type in the record: ``gen_header`` emits them
        in that order and the factory dispatches on ``rec[7:9]`` as the type.
        """
        self.set_values(
            message_counter=rec[3:5],
            message_type=rec[7:9],
            message_flag=rec[5:7],
            sender_address=rec[9:15],
            dest_address=rec[15:21],
            group_id=rec[21:23],
        )
        self.length = rec[1:3]

    def gen_header(self):
        """Return the header as counter, flag, type, sender, dest, group."""
        return "{}{}{}{}{}{}".format(self.counter, self.flag, self.type, self.sender_address,
                                     self.dest_address, self.group_id)

    def serialize(self):
        """Return the packet as ``"Zs" + length + header + payload``.

        The length is half the number of characters of header plus payload,
        rounded up, and is also stored in ``self.length`` as an int.
        """
        header = self.gen_header()
        length = int(math.ceil((len(header) + len(self.payload)) / 2))
        self.length = length
        return "Zs{0:02X}{1}{2}".format(length, header, self.payload)

    def to_string(self):
        """Return a one-line human-readable dump of the header fields."""
        return "MAXPacket: len={}, counter={}, flag={}, type={}, sender_addr={}, dest_addr={}, group_id={}".format(
            self.length, self.counter, self.flag, self.type, self.sender_address, self.dest_address, self.group_id
        )

    def __str__(self):
        return self.to_string()


class MAXPairPingPacket(MAXPacket):
    """Received packet of type 0x00."""

    def __init__(self, rec: str):
        super().__init__()
        self.from_received(rec)
        # One byte carries both firmware numbers: high nibble / low nibble.
        firmware_val = int(rec[23:25], 16)
        self.firmware_major, self.firmware_minor = divmod(firmware_val, 16)
        self.device_type = rec[25:27]
        self.test_result = rec[27:29]
        # Everything after the test result except the final two characters.
        self.serial = rec[29:-2]

    def to_string(self):
        return ("{}\nMAXPairPingPacket: firmware_major={} firmware_minor={} device_type={} "
                "test_result={} serial={}").format(super().to_string(), self.firmware_major,
                                                   self.firmware_minor, self.device_type,
                                                   self.test_result, self.serial)


class MAXPairPongPacket(MAXPacket):
    """Packet of type 0x01 with the fixed payload ``"00"``."""

    def __init__(self, message_counter: str, message_flag: str, sender_address: str,
                 dest_address: str, group_id: str):
        super().__init__()
        self.set_values(message_counter, "01", message_flag, sender_address, dest_address, group_id)
        self.payload = "00"

    def to_string(self):
        return "{}\nMAXPairPongPacket: payload={}".format(super().to_string(), self.payload)


class MAXSetGroupIdPacket(MAXPacket):
    """Packet of type 0x22 with the fixed payload ``"01"``."""

    def __init__(self, message_counter: str, message_flag: str, sender_address: str,
                 dest_address: str, group_id: str):
        super().__init__()
        self.set_values(message_counter, "22", message_flag, sender_address, dest_address, group_id)
        self.payload = "01"

    def to_string(self):
        return "{}\nMAXSetGroupIdPacket: payload={}".format(super().to_string(), self.payload)


class MAXRadiatorControlMode(Enum):
    """Control mode encoded in two bits of the set-temp / state payloads."""

    # Plain ints: a trailing comma after a value would turn it into a tuple.
    AUTO = 0
    MANUAL = 1
    TEMPORARY = 2
    BOOST = 3


class MAXSetTempPacket(MAXPacket):
    """Packet of type 0x40 carrying a target temperature and control mode."""

    def __init__(self, message_counter: str, message_flag: str, sender_address: str,
                 dest_address: str, group_id: str, temp: float, mode: MAXRadiatorControlMode):
        """Build the packet; ``temp`` is clamped to the range 4.5 .. 30.5."""
        super().__init__()
        self.set_values(message_counter, "40", message_flag, sender_address, dest_address, group_id)
        temp = min(max(temp, 4.5), 30.5)
        # Low bits: temperature in half-degree steps; top two bits: mode.
        payload_int = round(temp * 2) | ((mode.value & 3) << 6)
        # Always two hex digits so the serialized packet keeps whole bytes.
        self.payload = format(payload_int, "02x")

    def to_string(self):
        return "{}\nMAXSetTempPacket: payload={}".format(super().to_string(), self.payload)


class MAXAddLinkPartnerPacket(MAXPacket):
    """Packet whose payload is the partner address followed by its type."""

    def __init__(self, message_counter: str, message_flag: str, sender_address: str,
                 dest_address: str, group_id: str, partner_addr: str, partner_type: int):
        super().__init__()
        # 0x20 is AddLinkPartner in the MAX! message table; 0x40 (used here
        # previously) is the set-temperature type of MAXSetTempPacket.
        self.set_values(message_counter, "20", message_flag, sender_address, dest_address, group_id)
        self.payload = "{}{}".format(partner_addr, format(partner_type, "02x"))

    def to_string(self):
        return "{}\nMAXAddLinkPartnerPacket: payload={}".format(super().to_string(), self.payload)


class MAXAckPacket(MAXPacket):
    """Received packet of type 0x02."""

    def __init__(self, rec):
        super().__init__()
        self.from_received(rec)
        self.ack = rec[23:25]
        self.unknown_field = rec[25:27]

    def to_string(self):
        return "{}\nMAXAckPacket: ack={} unknown_field={}".format(super().to_string(), self.ack,
                                                                  self.unknown_field)


class MAXThermostatStatePacket(MAXPacket):
    """Received packet of type 0x60 with five payload bytes of state."""

    def __init__(self, rec):
        super().__init__()
        self.from_received(rec)
        # Five payload bytes, two hex characters each, starting at offset 23.
        flags, valve, target, temp_hi, temp_lo = (
            int(rec[pos:pos + 2], 16) for pos in range(23, 33, 2)
        )

        # Two bits can only be 0..3, which are exactly the enum's values.
        self.ctrl_mode = MAXRadiatorControlMode(flags & 3)
        self.dst_active = (flags >> 2) & 1
        self.lan_gateway = (flags >> 3) & 1
        self.locked = (flags >> 4) & 1
        self.rf_error = (flags >> 5) & 1
        self.bat_low = (flags >> 6) & 1
        self.valve_pos = valve
        self.target_temp = (target & 0x7f) / 2
        # 9-bit value in tenths: lowest bit of byte 4 plus all of byte 5.
        self.current_temp = (((temp_hi & 1) << 8) | (temp_lo & 0xff)) / 10.0

    def to_string(self):
        return ("{}\nMAXThermostatStatePacket: dst_active={} lan_gateway={} locked={} rf_error={} bat_low={} "
                "valve_pos={} target_temp={} "
                "current_temp={}").format(super().to_string(), self.dst_active, self.lan_gateway,
                                          self.locked, self.rf_error, self.bat_low, self.valve_pos,
                                          self.target_temp, self.current_temp)


class MAXCubeAckPacket(MAXPacket):
    """Packet of type 0x02 with payload ``"01"`` (ack) or ``"81"`` (no ack)."""

    def __init__(self, message_counter: str, message_flag: str, sender_address: str,
                 dest_address: str, group_id: str, acknowledged):
        super().__init__()
        self.set_values(message_counter, "02", message_flag, sender_address, dest_address, group_id)
        self.payload = "01" if acknowledged else "81"

    def to_string(self):
        return "{}\nMAXCubeAckPacket: payload={}".format(super().to_string(), self.payload)


class PushButtonState(Enum):
    """Button state reported by a push-button packet."""

    AUTO = 0
    ECO = 1
    UNKNOWN = 2


class MAXPushButtonPacket(MAXPacket):
    """Received packet of type 0x50."""

    def __init__(self, rec: str):
        super().__init__()
        self.from_received(rec)
        # The character at offset 26 selects the state: "0" is ECO, "1" is AUTO.
        state_by_char = {"0": PushButtonState.ECO, "1": PushButtonState.AUTO}
        self.button_state = state_by_char.get(rec[26:27], PushButtonState.UNKNOWN)
        self.retransmit = 1 if rec[23:24] == "5" else 0

    def to_string(self):
        return "{}\nMAXPushButtonPacket: button_state={} retransmit={}".format(
            super().to_string(), self.button_state, self.retransmit)


class MAXWakeUpPacket(MAXPacket):
    """Received packet of type 0xF1; only the header is parsed."""

    def __init__(self, rec):
        super().__init__()
        self.from_received(rec)


class MAXResetPacket(MAXPacket):
    """Received packet of type 0xF0; only the header is parsed."""

    def __init__(self, rec):
        super().__init__()
        self.from_received(rec)