Chris
c5a7a8699c
Implement read write of dev file, add example Change debug text Implement devices class with push button and thermostat Implement pairing with thermostats
136 wiersze
7.0 KiB
Python
136 wiersze
7.0 KiB
Python
from typing import List
|
|
from MAXCube import MAXCube
|
|
from MAXPacket import MAXPacket, MAXPacketFactory, MAXPairPingPacket, MAXPairPongPacket, MAXAckPacket, \
|
|
MAXSetGroupIdPacket, MAXPushButtonPacket, MAXCubeAckPacket, MAXSetTempPacket, \
|
|
MAXRadiatorControlMode, MAXAddLinkPartnerPacket
|
|
from MAXDevice import MAXDevice, MAXPushButton, MAXThermostat
|
|
from enum import Enum
|
|
|
|
|
|
class HandshakeState(Enum):
|
|
PING_RECEIVED = 1
|
|
PONG_SENT = 2
|
|
PONG_ACK = 3
|
|
GROUP_ID_SENT = 4
|
|
GROUP_ID_ACK = 5
|
|
CONFIG_TEMP_SENT = 6
|
|
CONFIG_TEMP_ACK = 7
|
|
ADD_LINK_PARTNER_SENT = 8
|
|
ADD_LINK_PARTNER_ACK = 9
|
|
|
|
|
|
class Handshake:
|
|
def __init__(self, partner_addr, dev_type):
|
|
self.partner_addr = partner_addr
|
|
self.dev_type = dev_type
|
|
self.state: HandshakeState = HandshakeState.PING_RECEIVED
|
|
|
|
|
|
class MAXPacketHandler:
|
|
def __init__(self, cube, devices: MAXCube):
|
|
self.cube = cube
|
|
self.quit_flag = False
|
|
self.handshakes = []
|
|
self.devices: List[MAXDevice] = devices
|
|
|
|
def handle_msg(self, pkt: MAXPacket):
|
|
print(pkt.to_string())
|
|
if pkt.dest_address == self.cube.addr or pkt.dest_address == "000000":
|
|
if isinstance(pkt, MAXPairPingPacket):
|
|
handshake = Handshake(pkt.sender_address, pkt.device_type)
|
|
self.handshakes.append(handshake)
|
|
pong = MAXPairPongPacket(message_counter="00", message_flag="00", sender_address=self.cube.addr,
|
|
dest_address=pkt.sender_address, group_id="00")
|
|
pong_str = pong.serialize()
|
|
print(pong.to_string())
|
|
self.cube.request(pong_str)
|
|
handshake.state = HandshakeState.PONG_SENT
|
|
elif isinstance(pkt, MAXAckPacket):
|
|
cur_handshake: Handshake = None
|
|
for handshake in self.handshakes:
|
|
if handshake.partner_addr == pkt.sender_address:
|
|
cur_handshake = handshake
|
|
break
|
|
if cur_handshake is not None:
|
|
if cur_handshake.state == HandshakeState.PONG_SENT:
|
|
cur_handshake.state = HandshakeState.PONG_ACK
|
|
#print(pkt.to_string())
|
|
if cur_handshake.dev_type == "05":
|
|
# Handshake is finished
|
|
print("Paired device with addr={} and type={}".format(cur_handshake.partner_addr,
|
|
cur_handshake.dev_type))
|
|
found = False
|
|
for dev in self.devices:
|
|
if dev.address == cur_handshake.partner_addr:
|
|
found = True
|
|
break
|
|
if not found:
|
|
new_dev = MAXPushButton("PushButton_{}".format(cur_handshake.partner_addr),
|
|
cur_handshake.partner_addr)
|
|
self.devices.append(new_dev)
|
|
self.handshakes.remove(cur_handshake)
|
|
|
|
else:
|
|
set_group_id = MAXSetGroupIdPacket(message_counter="00", message_flag="00", sender_address=self.cube.addr,
|
|
dest_address=pkt.sender_address, group_id="00")
|
|
set_group_id_str = set_group_id.serialize()
|
|
print(set_group_id.to_string())
|
|
self.cube.request(set_group_id_str)
|
|
cur_handshake.state = HandshakeState.GROUP_ID_SENT
|
|
return
|
|
if cur_handshake.state == HandshakeState.GROUP_ID_SENT:
|
|
cur_handshake.state = HandshakeState.GROUP_ID_ACK
|
|
|
|
set_temp = MAXSetTempPacket(message_counter="00", message_flag="00", sender_address=self.cube.addr,
|
|
dest_address=pkt.sender_address, group_id="00", temp=21.5,
|
|
mode=MAXRadiatorControlMode.MANUAL)
|
|
set_temp_str = set_temp.serialize()
|
|
print(set_temp.to_string())
|
|
self.cube.request(set_temp_str)
|
|
|
|
cur_handshake.state = HandshakeState.CONFIG_TEMP_SENT
|
|
return
|
|
if cur_handshake.state == HandshakeState.CONFIG_TEMP_SENT:
|
|
cur_handshake.state = HandshakeState.CONFIG_TEMP_ACK
|
|
|
|
add_link = MAXAddLinkPartnerPacket(message_counter="00", message_flag="00", sender_address=self.cube.addr,
|
|
dest_address=pkt.sender_address, group_id="00", partner_addr=self.cube.addr, partner_type=0)
|
|
add_link_str = add_link.serialize()
|
|
print(add_link.to_string())
|
|
self.cube.request(add_link_str)
|
|
cur_handshake.state = HandshakeState.ADD_LINK_PARTNER_SENT
|
|
return
|
|
if cur_handshake.state == HandshakeState.ADD_LINK_PARTNER_SENT:
|
|
cur_handshake.state = HandshakeState.ADD_LINK_PARTNER_ACK
|
|
found = False
|
|
for dev in self.devices:
|
|
if dev.address == cur_handshake.partner_addr:
|
|
found = True
|
|
break
|
|
if not found:
|
|
new_dev = MAXThermostat("Thermostat_{}".format(cur_handshake.partner_addr),
|
|
cur_handshake.partner_addr)
|
|
self.devices.append(new_dev)
|
|
print("Paired device with addr={} and type={}".format(cur_handshake.partner_addr,
|
|
cur_handshake.dev_type))
|
|
self.handshakes.remove(cur_handshake)
|
|
return
|
|
elif isinstance(pkt, MAXPushButtonPacket):
|
|
dev = None
|
|
for known_dev in self.devices:
|
|
if known_dev.address == pkt.sender_address:
|
|
dev = known_dev
|
|
break
|
|
if dev is not None:
|
|
dev.state = pkt.button_state
|
|
print("Button {} state={}".format(dev.address, dev.state))
|
|
if not pkt.retransmit:
|
|
act_pkt = MAXCubeAckPacket(pkt.counter, "00", self.cube.addr,
|
|
pkt.sender_address, pkt.group_id, True)
|
|
act_str = act_pkt.serialize()
|
|
#print(act_str)
|
|
#self.cube.request(act_str)
|
|
#ToDo: Does not work properly
|
|
else:
|
|
print("Paket is not ours!")
|