Chris
c5a7a8699c
Implement read write of dev file, add example Change debug text Implement devices class with push button and thermostat Implement pairing with thermostats
87 lines
2.3 KiB
Python
87 lines
2.3 KiB
Python
from telnetlib import Telnet
|
|
import serial
|
|
|
|
# When true, the request()/response() methods in this module print every
# command they send ("SENT: ...") and every reply they read ("RECV: ...").
DEBUG = True
|
|
|
|
|
|
class MAXCube:
    """Base class for clients that send text commands over ``self.client``.

    ``self.client`` is the transport handle; ``None`` means "not connected".
    The methods here only need the handle to provide ``write(bytes)`` and
    ``close()``. Reading replies is transport specific, so subclasses are
    expected to override :meth:`response`.
    """

    def __init__(self):
        # No transport until a subclass' connect() creates one.
        self.client = None

    def disconnect(self):
        """Close the transport (if any) and mark this instance disconnected."""
        if self.client is None:
            return
        try:
            self.client.close()
        finally:
            # Drop the handle even when close() raises, so is_connected()
            # never reports a half-closed transport as usable.
            self.client = None

    def request(self, req):
        """Send *req* followed by a newline.

        Args:
            req: Command text; it is encoded with ``str.encode()`` before
                being written.

        Prints a notice instead of sending when not connected.
        """
        if self.client is None:
            print("Request while not connected!")
            return
        # Command and terminator go out in a single write so they cannot be
        # split across two separate transport writes.
        self.client.write(req.encode() + b"\n")
        if DEBUG:
            print("SENT: {}".format(req))

    def response(self):
        """Read one reply from the transport.

        The base class has no way to read, so it only handles the
        not-connected case the same way the subclasses do (print a notice
        and return ``None``).

        Raises:
            NotImplementedError: If connected but the subclass did not
                provide its own implementation.
        """
        if self.client is None:
            print("Waiting for response while not connected!")
            return None
        raise NotImplementedError("subclasses must implement response()")

    def set_moritz_mode(self, moritz_mode_enable):
        """Send ``"Zr"`` when *moritz_mode_enable* is truthy, else ``"Zx"``.

        No reply is read for either command.
        """
        self.request("Zr" if moritz_mode_enable else "Zx")

    def version_string(self):
        """Send ``"V"`` and return whatever :meth:`response` yields."""
        self.request("V")
        return self.response()

    def is_connected(self):
        """Return ``True`` while a transport handle is held."""
        return self.client is not None
|
|
|
|
|
|
class CUN(MAXCube):
    """``MAXCube`` variant whose transport is a ``telnetlib.Telnet`` session.

    ``request()`` is inherited from ``MAXCube``; only connecting and reading
    replies are specific to the Telnet transport.
    """

    def __init__(self):
        super().__init__()
        # Telnet session once connect() has run; None while disconnected.
        self.client: Telnet = None

    def connect(self, host, ip):
        """Open the Telnet session and switch Moritz mode on.

        Does nothing when a session is already open.

        Args:
            host: Host name or address handed to ``Telnet``.
            ip: Handed to ``Telnet`` as its second positional argument,
                which is the TCP port. The parameter name is kept as-is
                for backward compatibility with keyword callers.
        """
        if self.client is not None:
            return
        self.client = Telnet(host, ip)
        self.set_moritz_mode(True)

    def response(self):
        """Return the next chunk of data from the session, decoded to ``str``.

        Uses ``Telnet.read_some()``, so the result is whatever data is
        available next and is not guaranteed to be one complete line.
        Prints a notice and returns ``None`` when not connected.
        """
        if self.client is None:
            print("Waiting for response while not connected!")
            return None
        response = self.client.read_some().decode()
        if DEBUG:
            print("RECV: {}".format(response))
        return response
|
|
|
|
|
|
class CUL(MAXCube):
    """``MAXCube`` variant whose transport is a ``serial.Serial`` port."""

    def __init__(self, addr: str):
        """Create a disconnected instance.

        Args:
            addr: Stored on the instance as ``self.addr``; none of the
                methods in this class read it.
        """
        super().__init__()
        # Serial port once connect() has run; None while disconnected.
        self.client: serial.Serial = None
        self.addr = addr

    def connect(self, port):
        """Open *port* at 38400 baud with a 1 s read timeout and enable Moritz mode.

        Does nothing when a port is already open.
        """
        if self.client is not None:
            return
        self.client = serial.Serial(port, 38400, timeout=1)
        self.set_moritz_mode(True)

    def response(self):
        """Read up to 100 bytes from the port and return them decoded to ``str``.

        The port is opened with ``timeout=1`` in ``connect()``, so the read
        may return fewer than 100 bytes once that timeout expires.
        Prints a notice and returns ``None`` when not connected.
        """
        if self.client is None:
            print("Waiting for response while not connected!")
            return None
        response = self.client.read(100).decode()
        if DEBUG:
            print("RECV: {}".format(response))
        return response