|
- from typing import List
- from MAXDevice import MAXDevice, MAXThermostat, MAXPushButton
- from MAXCube import CUL
- from MAXPacketHandler import MAXPacketHandler
- from MAXPacket import MAXPacketFactory
- from time import sleep
- import sys
- import signal
-
- DEBUG = True
-
- devices: List[MAXDevice] = []
- quit_flag = False
- cube: CUL = None
- handler: MAXPacketHandler = None
-
-
- def read_devices(file_name):
- file = open(file_name, "r")
- lines = file.readlines()
- file.close()
-
- for line in lines:
- if not line.startswith("#") and line.strip() != "\n":
- new_dev = MAXDevice.from_string(line)
- if new_dev is not None:
- devices.append(new_dev)
-
-
- def write_devices(file_name):
- file = open(file_name, "w")
- for device in devices:
- dev_str = device.serialize().replace("\n", "")
- file.write(dev_str)
- file.write("\n")
- file.close()
-
-
- def receive_loop():
- global quit_flag
- while not quit_flag and cube.is_connected():
- try:
- resp = cube.response()
- if resp is not None and resp[0:1] == "Z":
- pkt = MAXPacketFactory.create_packet(resp)
- handler.handle_msg(pkt)
- sleep(0.1)
- except KeyboardInterrupt:
- quit_flag = True
- return
-
-
- if __name__ == '__main__':
- read_devices("devices.conf")
- cube = CUL("123456")
- cube.connect("COM10")
- print(cube.version_string())
- handler = MAXPacketHandler(cube, devices)
- signal.signal(signal.SIGINT, signal.default_int_handler)
- receive_loop()
- cube.disconnect()
- write_devices("devices.conf")
|