You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

257 lines
9.1KB

  1. import math
  2. from enum import Enum
  3. class MAXPacketFactory:
  4. def create_packet(rec: str):
  5. pkt_type = int(rec[7:9], 16)
  6. if pkt_type == 0:
  7. return MAXPairPingPacket(rec)
  8. elif pkt_type == 1:
  9. return MAXPairPongPacket(rec)
  10. elif pkt_type == 2:
  11. return MAXAckPacket(rec)
  12. elif pkt_type == 0x22:
  13. return MAXSetGroupIdPacket(rec)
  14. elif pkt_type == 0xF1:
  15. return MAXWakeUpPacket(rec)
  16. elif pkt_type == 0xF0:
  17. return MAXResetPacket(rec)
  18. elif pkt_type == 0x50:
  19. return MAXPushButtonPacket(rec)
  20. elif pkt_type == 0x60:
  21. return MAXThermostatStatePacket(rec)
  22. else:
  23. print("Unknown message type: {}".format(pkt_type))
  24. result = MAXPacket()
  25. result.from_received(rec)
  26. return result
  27. class MAXPacket:
  28. def __init__(self):
  29. self.length = 0
  30. self.counter = 0
  31. self.type = 0
  32. self.flag = 0
  33. self.sender_address = 0
  34. self.dest_address = 0
  35. self.group_id = 0
  36. def set_values(self, message_counter: str, message_type: str, message_flag: str, sender_address: str,
  37. dest_address: str, group_id: str):
  38. # self.length = length
  39. self.counter = message_counter
  40. self.type = message_type
  41. self.flag = message_flag
  42. self.sender_address = sender_address
  43. self.dest_address = dest_address
  44. self.group_id = group_id
  45. def from_received(self, rec: str):
  46. self.set_values(rec[3:5], rec[5:7], rec[7:9], rec[9:15], rec[15:21], rec[21:23])
  47. self.length = rec[1:3]
  48. def gen_header(self):
  49. header_str = "{}{}{}{}{}{}".format(self.counter, self.flag, self.type, self.sender_address,
  50. self.dest_address, self.group_id)
  51. return header_str
  52. def serialize(self):
  53. header = self.gen_header()
  54. length = int(math.ceil((len(header) + len(self.payload)) / 2))
  55. self.length = length
  56. return "Zs{0:02X}{1}{2}".format(length, header, self.payload)
  57. def to_string(self):
  58. return "MAXPacket: len={}, counter={}, flag={}, type={}, sender_addr={}, dest_addr={}, group_id={}".format(
  59. self.length, self.counter, self.flag, self.type, self.sender_address, self.dest_address, self.group_id
  60. )
  61. class MAXPairPingPacket(MAXPacket):
  62. def __init__(self, rec: str):
  63. self.from_received(rec)
  64. firmware_val = int(rec[23:25], 16)
  65. self.firmware_major = firmware_val // 16
  66. self.firmware_minor = firmware_val % 16
  67. self.device_type = rec[25:27]
  68. self.test_result = rec[27:29]
  69. self.serial = rec[29:-2]
  70. def to_string(self):
  71. result = "{}\nMAXPairPingPacket: firmware_major={} firmware_minor={} device_type={} " \
  72. "test_result={} serial={}".format(super().to_string(), self.firmware_major, self.firmware_minor,
  73. self.device_type, self.test_result, self.serial)
  74. return result
  75. class MAXPairPongPacket(MAXPacket):
  76. def __init__(self, message_counter: str, message_flag: str, sender_address: str, dest_address: str,
  77. group_id: str):
  78. self.set_values(message_counter, "01", message_flag, sender_address, dest_address, group_id)
  79. self.payload = "00"
  80. def to_string(self):
  81. result = "{}\nMAXPairPongPacket: payload={}".format(super().to_string(), self.payload)
  82. return result
  83. class MAXSetGroupIdPacket(MAXPacket):
  84. def __init__(self, message_counter: str, message_flag: str, sender_address: str, dest_address: str,
  85. group_id: str):
  86. self.set_values(message_counter, "22", message_flag, sender_address, dest_address, group_id)
  87. self.payload = "01"
  88. def to_string(self):
  89. result = "{}\nMAXSetGroupIdPacket: payload={}".format(super().to_string(), self.payload)
  90. return result
  91. class MAXRadiatorControlMode(Enum):
  92. AUTO = 0,
  93. MANUAL = 1,
  94. TEMPORARY = 2,
  95. BOOST = 3
  96. class MAXSetTempPacket(MAXPacket):
  97. def __init__(self, message_counter: str, message_flag: str, sender_address: str, dest_address: str,
  98. group_id: str, temp: float, mode: MAXRadiatorControlMode):
  99. self.set_values(message_counter, "40", message_flag, sender_address, dest_address, group_id)
  100. if temp > 30.5:
  101. temp = 30.5
  102. elif temp < 4.5:
  103. temp = 4.5
  104. payload_int = round(temp*2).to_bytes(1, 'big')[0]
  105. payload_int = payload_int | ((mode.value[0] & 3) << 6)
  106. self.payload = format(payload_int, 'x')
  107. def to_string(self):
  108. result = "{}\nMAXSetTempPacket: payload={}".format(super().to_string(), self.payload)
  109. return result
  110. class MAXAddLinkPartnerPacket(MAXPacket):
  111. def __init__(self, message_counter: str, message_flag: str, sender_address: str, dest_address: str,
  112. group_id: str, partner_addr: str, partner_type: int):
  113. self.set_values(message_counter, "40", message_flag, sender_address, dest_address, group_id)
  114. type_str = format(partner_type, 'x').zfill(2)
  115. self.payload = "{}{}".format(partner_addr, type_str)
  116. def to_string(self):
  117. result = "{}\nMAXAddLinkPartnerPacket: payload={}".format(super().to_string(), self.payload)
  118. return result
  119. class MAXAckPacket(MAXPacket):
  120. def __init__(self, rec):
  121. super().__init__()
  122. self.from_received(rec)
  123. self.ack = rec[23:25]
  124. self.unknown_field = rec[25:27]
  125. def to_string(self):
  126. result = "{}\nMAXAckPacket: ack={} unknown_field={}".format(super().to_string(),
  127. self.ack, self.unknown_field)
  128. return result
  129. class MAXThermostatStatePacket(MAXPacket):
  130. def __init__(self, rec):
  131. super().__init__()
  132. self.from_received(rec)
  133. payload_1 = int(rec[23:25], 16)
  134. payload_2 = int(rec[25:27], 16)
  135. payload_3 = int(rec[27:29], 16)
  136. payload_4 = int(rec[29:31], 16)
  137. payload_5 = int(rec[31:33], 16)
  138. ctrl_mode_bits = payload_1 & 3
  139. if ctrl_mode_bits == 0:
  140. self.ctrl_mode = MAXRadiatorControlMode.AUTO
  141. elif ctrl_mode_bits == 1:
  142. self.ctrl_mode = MAXRadiatorControlMode.MANUAL
  143. elif ctrl_mode_bits == 2:
  144. self.ctrl_mode = MAXRadiatorControlMode.TEMPORARY
  145. elif ctrl_mode_bits == 3:
  146. self.ctrl_mode = MAXRadiatorControlMode.BOOST
  147. else:
  148. print("Invalid control mode: {}".format(ctrl_mode_bits))
  149. self.dst_active = (payload_1 & 0b100) >> 2
  150. self.lan_gateway = (payload_1 & 0b1000) >> 3
  151. self.locked = (payload_1 & 0b10000) >> 4
  152. self.rf_error = (payload_1 & 0b100000) >> 5
  153. self.bat_low = (payload_1 & 0b1000000) >> 6
  154. self.valve_pos = payload_2
  155. self.target_temp = (payload_3 & 0x7f) / 2
  156. self.current_temp = payload_4 & 1
  157. self.current_temp = self.current_temp << 8
  158. self.current_temp = self.current_temp | (payload_5 & 0xff)
  159. self.current_temp = self.current_temp / 10.0
  160. def to_string(self):
  161. result = "{}\nMAXThermostatStatePacket: dst_active={} lan_gateway={} locked={} rf_error={} bat_low={} " \
  162. "valve_pos={} target_temp={} " \
  163. "current_temp={}".format(super().to_string(), self.dst_active, self.lan_gateway,
  164. self.locked, self.rf_error, self.bat_low, self.valve_pos,
  165. self.target_temp, self.current_temp)
  166. return result
  167. class MAXCubeAckPacket(MAXPacket):
  168. def __init__(self, message_counter: str, message_flag: str, sender_address: str, dest_address: str,
  169. group_id: str, acknowledged):
  170. self.set_values(message_counter, "02", message_flag, sender_address, dest_address, group_id)
  171. if acknowledged:
  172. self.payload = "01"
  173. else:
  174. self.payload = "81"
  175. def to_string(self):
  176. result = "{}\nMAXCubeAckPacket: payload={}".format(super().to_string(), self.payload)
  177. return result
  178. class PushButtonState(Enum):
  179. AUTO = 0
  180. ECO = 1
  181. UNKNOWN = 2
  182. class MAXPushButtonPacket(MAXPacket):
  183. def __init__(self, rec: str):
  184. self.from_received(rec)
  185. if rec[26:27] == "0":
  186. self.button_state = PushButtonState.ECO
  187. elif rec[26:27] == "1":
  188. self.button_state = PushButtonState.AUTO
  189. else:
  190. self.button_state = PushButtonState.UNKNOWN
  191. if rec[23:24] == "5":
  192. self.retransmit = 1
  193. else:
  194. self.retransmit = 0
  195. def to_string(self):
  196. result = "{}\nMAXPushButtonPacket: button_state={} retransmit={}".format(super().to_string(),
  197. self.button_state, self.retransmit)
  198. return result
  199. class MAXWakeUpPacket(MAXPacket):
  200. def __init__(self, rec):
  201. super().__init__()
  202. self.from_received(rec)
  203. class MAXResetPacket(MAXPacket):
  204. def __init__(self, rec):
  205. super().__init__()
  206. self.from_received(rec)