680 lines
22 KiB
Python
680 lines
22 KiB
Python
# SPDX-FileCopyrightText: 2020-2021 Andrius Štikonas <andrius@stikonas.eu>
|
|
# SPDX-FileCopyrightText: 2021 Dave O'Connor <daveoc@google.com>
|
|
# SPDX-License-Identifier: LGPL-3.0-or-later
|
|
|
|
import asyncio
|
|
import datetime
|
|
import json
|
|
import logging
|
|
import socket
|
|
import ssl
|
|
from async_property import async_cached_property
|
|
from types import SimpleNamespace
|
|
import websockets
|
|
|
|
from neohubapi.enums import HCMode
|
|
from neohubapi.enums import ScheduleFormat
|
|
from neohubapi.enums import schedule_format_int_to_enum
|
|
from neohubapi.neostat import NeoStat
|
|
|
|
|
|
class Error(Exception):
    """Common base class for the NeoHub-specific exceptions defined in this module."""
|
|
|
|
|
|
class NeoHubUsageError(Error):
    """Raised when a NeoHub method is called with arguments it cannot use."""
|
|
|
|
|
|
class NeoHubConnectionError(Error):
    """Raised for connection failures and for invalid port/token combinations."""
|
|
|
|
|
|
class NeoHub:
|
|
def __init__(self, host='Neo-Hub', port=4242, request_timeout=60, request_attempts=1, token=None):
    """
    Store the connection settings and validate the port/token combination.

    host: host name or address of the hub
    port: 4242 for the legacy connection, 4243 for the websocket connection
    request_timeout: timeout (seconds) applied to each request
    request_attempts: number of times a request is attempted
    token: API token; required on port 4243, not allowed on port 4242

    Raises NeoHubConnectionError if the port is neither 4242 nor 4243, or
    if the port does not match the presence/absence of a token.
    """
    self._logger = logging.getLogger('neohub')
    self._host = host
    self._port = int(port)
    self._request_timeout = request_timeout
    self._request_attempts = request_attempts
    self._token = token

    # Sanity checks. They use the converted value so that a port given as a
    # string (e.g. "4243") is treated exactly like the equivalent integer.
    if self._port not in (4242, 4243):
        raise NeoHubConnectionError(
            f'Invalid port number ({port}): use 4242 or 4243 instead')
    if self._port == 4243 and token is None:
        raise NeoHubConnectionError(
            'You must provide a token for a connection on port 4243, or use a legacy connection on port 4242')
    if self._port == 4242 and token is not None:
        raise NeoHubConnectionError(
            'You have provided a token, so you must use port=4243')

    # The websocket is opened lazily by the first request that needs it.
    self._websocket = None
|
|
|
|
async def _send_message(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, message: str):
    """
    Send one command over an open stream pair and return the raw reply.

    reader/writer: stream pair of an already opened connection
    message: JSON-serialisable command (callers in this class pass dicts)

    The command is JSON-encoded and terminated with "\\0\\r"; the reply is
    read up to the first NUL byte. The writer is always closed, even when
    writing or reading fails.

    Returns the reply as bytes with surrounding NUL bytes stripped.
    """
    encoded_message = bytearray(json.dumps(message) + "\0\r", "utf-8")
    self._logger.debug(f"Sending message: {encoded_message}")
    try:
        writer.write(encoded_message)
        await writer.drain()
        data = await reader.readuntil(b'\0')
    finally:
        # Release the connection on failure as well as on success.
        writer.close()
    await writer.wait_closed()

    return data.strip(b'\0')
|
|
|
|
async def _send(self, message, expected_reply=None):
    """
    Send a command to the hub and decode its JSON reply.

    The websocket connection is used when a token was configured, the
    legacy TCP connection otherwise. The request is attempted up to
    request_attempts times, sleeping half of request_timeout between
    attempts.

    message: command to send
    expected_reply: optional dict the decoded reply has to be equal to

    Without expected_reply the decoded reply is returned (JSON objects are
    turned into SimpleNamespace instances); if the attempts ended with a
    handled exception, the last one is raised.
    With expected_reply, True is returned as soon as a reply matches and
    False if no attempt produced a matching reply.
    """
    last_exception = None
    writer = None
    for attempt in range(1, self._request_attempts + 1):
        try:
            if self._token is not None:
                # Websocket connection on port 4243, introduced in v3 of the API on 12/12/21
                if self._websocket is None or self._websocket.closed:
                    # The hub uses a locally generated certificate, so
                    # we must disable hostname checking and certificate
                    # verification explicitly.
                    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
                    context.check_hostname = False
                    context.verify_mode = ssl.CERT_NONE
                    uri = f"wss://{self._host}:{self._port}"
                    self._websocket = await websockets.connect(
                        uri, ssl=context, open_timeout=self._request_timeout)
                    self._logger.debug("Websocket connected")
                # The message format includes json nested within json nested within json!
                # All that appears to matter is the token and the command
                encoded_message = (
                    r"""{"message_type":"hm_get_command_queue","message":"{\"token\":\""""
                    + self._token
                    + r"""\",\"COMMANDS\":[{\"COMMAND\":\""""
                    + str(message)
                    + r"""\",\"COMMANDID\":1}]}"}"""
                )
                self._logger.debug(f"Sending message: {encoded_message}")
                await self._websocket.send(encoded_message)
                # There appears to be no obvious way to detect an invalid token.
                # At present, the hub just seems to forcibly close the connection
                # with a 1002 error
                result = await asyncio.wait_for(
                    self._websocket.recv(), timeout=self._request_timeout)
                json_string = json.loads(result)['response']
            else:
                # Legacy connection, on port 4242
                self._logger.debug("Using legacy connection")
                reader, writer = await asyncio.open_connection(self._host, self._port)
                data = await asyncio.wait_for(
                    self._send_message(reader, writer, message), timeout=self._request_timeout)
                json_string = data.decode('utf-8')

            self._logger.debug(f"Received message: {json_string}")
            reply = json.loads(json_string, object_hook=lambda d: SimpleNamespace(**d))

            if expected_reply is None:
                return reply
            if reply.__dict__ == expected_reply:
                return True
            self._logger.error(f"[{attempt}] Unexpected reply: {reply} for message: {message}")
        except (socket.gaierror, ConnectionRefusedError, websockets.InvalidHandshake) as e:
            last_exception = NeoHubConnectionError(e)
            self._logger.error(f"[{attempt}] Could not connect to NeoHub at {self._host}: {e}")
        except asyncio.TimeoutError as e:
            last_exception = e
            self._logger.error(f"[{attempt}] Timed out while sending a message to {self._host}")
            if self._websocket is not None:
                # close() is a coroutine: it has to be awaited, otherwise the
                # connection is never actually closed.
                await self._websocket.close()
            if writer is not None:
                writer.close()
        except websockets.exceptions.ConnectionClosedError as e:
            last_exception = NeoHubConnectionError(e)
            self._logger.error(f"[{attempt}] Connection forcibly closed - maybe a bad token?: {e}")
        except json.decoder.JSONDecodeError as e:
            last_exception = e
            self._logger.error(f"[{attempt}] Could not decode JSON: {e}")
        # Wait for 1/2 of the timeout value before retrying.
        if self._request_attempts > 1 and attempt < self._request_attempts:
            await asyncio.sleep(self._request_timeout / 2)

    if expected_reply is None and last_exception is not None:
        raise last_exception
    return False
|
|
|
|
def _devices_to_device_ids(self, devices: [NeoStat]):
    """
    Returns the list of device ids

    Reads the device_id attribute of every entry in devices, in order.
    Raises NeoHubUsageError if devices cannot be iterated or an entry has
    no device_id attribute.
    """
    try:
        device_ids = []
        for device in devices:
            device_ids.append(device.device_id)
    except (TypeError, AttributeError):
        raise NeoHubUsageError('devices must be a list of NeoStat objects')
    return device_ids
|
|
|
|
def _devices_to_names(self, devices: [NeoStat]):
    """
    Returns the list of device names

    Reads the name attribute of every entry in devices, in order.
    Raises NeoHubUsageError if devices cannot be iterated or an entry has
    no name attribute.
    """
    try:
        names = []
        for device in devices:
            names.append(device.name)
    except (TypeError, AttributeError):
        raise NeoHubUsageError('devices must be a list of NeoStat objects')
    return names
|
|
|
|
async def firmware(self):
    """
    NeoHub firmware version

    Sends the FIRMWARE command and returns the reply's
    'firmware version' field converted to int.
    """
    reply = await self._send({"FIRMWARE": 0})
    # The field name contains a space, so plain attribute syntax cannot be used.
    return int(getattr(reply, 'firmware version'))
|
|
|
|
async def get_system(self):
    """
    Get system wide variables

    The FORMAT and ALT_TIMER_FORMAT fields of the reply are replaced by
    the result of schedule_format_int_to_enum before it is returned.
    """
    system = await self._send({"GET_SYSTEM": 0})
    for field in ("FORMAT", "ALT_TIMER_FORMAT"):
        setattr(system, field, schedule_format_int_to_enum(getattr(system, field)))
    return system
|
|
|
|
@async_cached_property
async def target_temperature_step(self):
    """
    Returns Neohub's target temperature step

    0.5 for firmware version 2135 and later, 1 for older firmware.
    """
    version = await self.firmware()
    return 0.5 if version >= 2135 else 1
|
|
|
|
async def reset(self):
    """
    Reboot neohub

    Returns True if Restart is initiated. The command is only sent to
    hubs with firmware version 2027 or later; for older firmware False is
    returned without sending anything.
    """
    version = await self.firmware()
    if version < 2027:
        return False
    return await self._send({"RESET": 0}, {"Restarting": 1})
|
|
|
|
async def set_channel(self, channel: int):
    """
    Set ZigBee channel.

    Only channels 11, 14, 15, 19, 20, 24, 25 are allowed.

    channel: channel number; anything int() accepts is converted

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    Raises NeoHubUsageError if channel cannot be converted to an integer.
    """
    try:
        channel_number = int(channel)
    except (TypeError, ValueError) as err:
        # int() raises TypeError (not ValueError) for None and other
        # non-numeric objects; both mean the argument is unusable.
        raise NeoHubUsageError('channel must be a number') from err

    message = {"SET_CHANNEL": channel_number}
    reply = {"result": "Trying to change channel"}
    return await self._send(message, reply)
|
|
|
|
async def set_temp_format(self, temp_format: str):
    """
    Set temperature format to C or F

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    confirmation = {"result": f"Temperature format set to {temp_format}"}
    return await self._send({"SET_TEMP_FORMAT": temp_format}, confirmation)
|
|
|
|
async def set_hc_mode(self, hc_mode: HCMode, devices: [NeoStat]):
    """
    Set the heating/cooling mode of the given devices.

    hc_mode: HCMode member; its value is sent to the hub
    devices: devices to change, addressed by their names

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    device_names = self._devices_to_names(devices)
    command = {"SET_HC_MODE": [hc_mode.value, device_names]}
    return await self._send(command, {"result": "HC_MODE was set"})
|
|
|
|
async def set_format(self, sched_format: ScheduleFormat):
    """
    Sets schedule format

    Format is specified using ScheduleFormat enum; its value is sent to
    the hub.

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    Raises NeoHubUsageError if sched_format is not a ScheduleFormat.
    """
    if isinstance(sched_format, ScheduleFormat):
        command = {"SET_FORMAT": sched_format.value}
        return await self._send(command, {"result": "Format was set"})
    raise NeoHubUsageError('sched_format must be a ScheduleFormat')
|
|
|
|
async def set_away(self, state: bool):
    """
    Enables away mode for all devices.

    Puts thermostats into frost mode and timeclocks are set to off.
    Instead of this function it is recommended to use frost on/off commands

    List of affected devices can be restricted using GLOBAL_DEV_LIST command

    state: truthy sends AWAY_ON, falsy sends AWAY_OFF

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    if state:
        command, confirmation = "AWAY_ON", "away on"
    else:
        command, confirmation = "AWAY_OFF", "away off"
    return await self._send({command: 0}, {"result": confirmation})
|
|
|
|
async def set_holiday(self, start: datetime.datetime, end: datetime.datetime):
    """
    Sets holiday mode.

    start: beginning of holiday
    end: end of holiday

    Both values are sent formatted as HHMMSSddmmYYYY. Returns the hub's
    decoded reply.
    Raises NeoHubUsageError if start or end is not a datetime.datetime.
    """
    for moment in (start, end):
        if isinstance(moment, datetime.datetime):
            continue
        raise NeoHubUsageError('start and end must be datetime.datetime objects')

    stamp_format = "%H%M%S%d%m%Y"
    command = {"HOLIDAY": [start.strftime(stamp_format), end.strftime(stamp_format)]}
    return await self._send(command)
|
|
|
|
async def get_holiday(self):
    """
    Get list of holidays

    start and end times are converted to datetimes; an empty value
    becomes None.
    """
    holiday = await self._send({"GET_HOLIDAY": 0})
    for field in ("start", "end"):
        raw = getattr(holiday, field)
        if raw:
            parsed = datetime.datetime.strptime(raw.strip(), "%a %b %d %H:%M:%S %Y")
        else:
            parsed = None
        setattr(holiday, field, parsed)
    return holiday
|
|
|
|
async def cancel_holiday(self):
    """
    Cancels holidays and returns to normal schedule

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    return await self._send({"CANCEL_HOLIDAY": 0}, {"result": "holiday cancelled"})
|
|
|
|
async def get_devices(self):
    """
    Returns list of devices

    {"result": ["device1"]}

    The hub's decoded reply is returned unchanged.
    """
    return await self._send({"GET_DEVICES": 0})
|
|
|
|
async def get_device_list(self, zone: str):
    """
    Returns list of devices associated with zone

    zone: name of the zone to query

    Returns False if the reply contains an 'error' field, otherwise the
    value stored in the reply under the zone name.
    Raises KeyError if the reply has neither an 'error' field nor a field
    named after the zone.
    """
    result = await self._send({"GET_DEVICE_LIST": zone})

    # _send decodes JSON objects into SimpleNamespace instances, which
    # support neither `in` nor item access; zone names may also contain
    # spaces, so look the fields up in the attribute dict instead.
    fields = result if isinstance(result, dict) else vars(result)
    if 'error' in fields:
        return False
    return fields[zone]
|
|
|
|
async def devices_sn(self):
    """
    Returns serial numbers of attached devices

    {'name': [id, 'serial', 1], ...}

    The hub's decoded reply is returned unchanged.
    """
    return await self._send({"DEVICES_SN": 0})
|
|
|
|
async def set_ntp(self, state: bool):
    """
    Enables NTP client on Neohub

    state: truthy sends NTP_ON, falsy sends NTP_OFF

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    if state:
        command, confirmation = "NTP_ON", "ntp client started"
    else:
        command, confirmation = "NTP_OFF", "ntp client stopped"
    return await self._send({command: 0}, {"result": confirmation})
|
|
|
|
async def set_date(self, date: datetime.datetime = None):
    """
    Sets current date

    By default, set to current date. Can be optionally passed datetime.datetime object

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    if date is None:
        # Evaluate "today" on every call: a default written in the signature
        # is computed only once, when the function is defined, and would go
        # stale in a long-running process.
        date = datetime.datetime.today()

    message = {"SET_DATE": [date.year, date.month, date.day]}
    reply = {"result": "Date is set"}
    return await self._send(message, reply)
|
|
|
|
async def set_time(self, time: datetime.datetime = None):
    """
    Sets current time

    By default, set to current time. Can be optionally passed datetime.datetime object

    Only the hour and minute are sent. Returns True if the hub replied
    with the expected confirmation, otherwise False.
    """
    if time is None:
        # Evaluate "now" on every call: a default written in the signature
        # is computed only once, when the function is defined.
        time = datetime.datetime.now()

    message = {"SET_TIME": [time.hour, time.minute]}
    reply = {"result": "time set"}
    return await self._send(message, reply)
|
|
|
|
async def set_datetime(self, date_time: datetime.datetime = None):
    """
    Convenience method to set both date and time

    date_time: moment to set; defaults to the current date and time

    The time is only set if setting the date succeeded. Returns the
    result of the last call that was made.
    """
    if date_time is None:
        # Evaluate "now" on every call: a default written in the signature
        # is computed only once, when the function is defined.
        date_time = datetime.datetime.now()

    result = await self.set_date(date_time)
    if result:
        result = await self.set_time(date_time)
    return result
|
|
|
|
async def manual_dst(self, state: bool):
    """
    Manually enables/disables daylight saving time

    state: converted with int() and sent as the MANUAL_DST value

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    command = {"MANUAL_DST": int(state)}
    return await self._send(command, {"result": "Updated time"})
|
|
|
|
async def set_dst(self, state: bool, region: str = None):
    """
    Enables/disables automatic DST handling.

    By default it uses UK dates for turning DST on/off.
    Available options for region are UK, EU, NZ.

    state: truthy sends DST_ON, falsy sends DST_OFF
    region: optional region; when omitted, 0 is sent instead of a region

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    Raises NeoHubUsageError if a region is given that is not UK, EU or NZ.
    """
    valid_timezones = ["UK", "EU", "NZ"]
    # None is the documented default and must pass validation; previously
    # every call without an explicit region was rejected.
    if region is not None and region not in valid_timezones:
        raise NeoHubUsageError(f'region must be in {valid_timezones}')

    message = {"DST_ON" if state else "DST_OFF": 0 if region is None else region}
    reply = {"result": "dst on" if state else "dst off"}
    return await self._send(message, reply)
|
|
|
|
async def identify(self):
    """
    Flashes red LED light

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    return await self._send({"IDENTIFY": 0}, {"result": "flashing led"})
|
|
|
|
async def get_live_data(self):
    """
    Returns live data from hub and all devices

    The hub's decoded reply to GET_LIVE_DATA is returned unchanged.
    """
    return await self._send({"GET_LIVE_DATA": 0})
|
|
|
|
async def permit_join(self, name, timeout_s: int = 120):
    """
    Permit new thermostat to join network

    name: new zone will be added with this name
    timeout_s: duration of discovery mode in seconds

    To actually join network you need to select 01
    from the thermostat's setup menu.

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    command = {"PERMIT_JOIN": [timeout_s, name]}
    return await self._send(command, {"result": "network allows joining"})
|
|
|
|
async def set_lock(self, pin: int, devices: [NeoStat]):
    """
    PIN locks thermostats

    PIN is a four digit number; it is sent as a list of its four decimal
    digits (zero-padded on the left).

    Returns False without contacting the hub if pin is outside 0..9999,
    otherwise True if the hub replied with the expected confirmation and
    False if it did not.
    Raises NeoHubUsageError if pin cannot be compared with a number.
    """
    try:
        if pin < 0 or pin > 9999:
            return False
    except TypeError:
        raise NeoHubUsageError('pin must be a number')

    # Peel off the least significant digit four times, building the list
    # from the right so it ends up most-significant first.
    digits = []
    remaining = pin
    for _ in range(4):
        remaining, digit = divmod(remaining, 10)
        digits.insert(0, digit)

    device_names = self._devices_to_names(devices)
    return await self._send({"LOCK": [digits, device_names]}, {"result": "locked"})
|
|
|
|
async def unlock(self, devices: [NeoStat]):
    """
    Unlocks PIN locked thermostats

    devices: devices to unlock, addressed by their names

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    device_names = self._devices_to_names(devices)
    return await self._send({"UNLOCK": device_names}, {"result": "unlocked"})
|
|
|
|
async def set_frost(self, state: bool, devices: [NeoStat]):
    """
    Enables or disables Frost mode

    state: truthy sends FROST_ON, falsy sends FROST_OFF
    devices: devices to change, addressed by their names

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    device_names = self._devices_to_names(devices)
    if state:
        command, confirmation = "FROST_ON", "frost on"
    else:
        command, confirmation = "FROST_OFF", "frost off"
    return await self._send({command: device_names}, {"result": confirmation})
|
|
|
|
async def set_cool_temp(self, temperature: int, devices: [NeoStat]):
    """
    Sets the thermostat's cooling temperature i.e. the temperature that will
    trigger the thermostat if exceeded. Note that this is only supported on
    the HC (Heating/Cooling) thermostats.

    The temperature will be reset once next comfort level is reached

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    device_names = self._devices_to_names(devices)
    command = {"SET_COOL_TEMP": [temperature, device_names]}
    return await self._send(command, {"result": "temperature was set"})
|
|
|
|
async def set_target_temperature(self, temperature: int, devices: [NeoStat]):
    """
    Sets the thermostat's temperature

    The temperature will be reset once next comfort level is reached

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    device_names = self._devices_to_names(devices)
    command = {"SET_TEMP": [temperature, device_names]}
    return await self._send(command, {"result": "temperature was set"})
|
|
|
|
async def set_diff(self, switching_differential: int, devices: [NeoStat]):
    """
    Sets the thermostat's switching differential

    -1: Undocumented option. Seems to set differential to 204.
    0: 0.5 degrees
    1: 1 degree
    2: 2 degrees
    3: 3 degrees

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    device_names = self._devices_to_names(devices)
    command = {"SET_DIFF": [switching_differential, device_names]}
    return await self._send(command, {"result": "switching differential was set"})
|
|
|
|
async def rate_of_change(self, devices: [NeoStat]):
    """
    Returns time in minutes required to change temperature by 1 degree

    devices: devices to query, addressed by their names

    The reply's fields are returned as a plain dict.
    """
    device_names = self._devices_to_names(devices)
    reply = await self._send({"VIEW_ROC": device_names})
    return vars(reply)
|
|
|
|
async def set_timer(self, state: bool, devices: [NeoStat]):
    """
    Turns the output of timeclock on or off

    This function works only with NeoPlugs and does not work on
    NeoStats that are in timeclock mode.

    state: truthy sends TIMER_ON, falsy sends TIMER_OFF

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    device_names = self._devices_to_names(devices)
    if state:
        command, confirmation = "TIMER_ON", "timers on"
    else:
        command, confirmation = "TIMER_OFF", "timers off"
    return await self._send({command: device_names}, {"result": confirmation})
|
|
|
|
async def set_manual(self, state: bool, devices: [NeoStat]):
    """
    Controls the timeclock built into the neoplug, can be enabled and disabled to allow for manual operation.
    This function works only with NeoPlugs and does not work on NeoStats that are in timeclock mode.

    state: truthy sends MANUAL_ON, falsy sends MANUAL_OFF

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    device_names = self._devices_to_names(devices)
    if state:
        command, confirmation = "MANUAL_ON", "manual on"
    else:
        command, confirmation = "MANUAL_OFF", "manual off"
    return await self._send({command: device_names}, {"result": confirmation})
|
|
|
|
async def set_hold(self, temperature: int, hours: int, minutes: int, devices: [NeoStat]):
    """
    Tells a thermostat to hold the given temperature for a fixed time.

    temperature: temperature to hold
    hours, minutes: duration of the hold
    devices: devices to change, addressed by their names; the string form
             of their device id list is sent as the hold's "id"

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    device_names = self._devices_to_names(devices)
    ids = self._devices_to_device_ids(devices)
    hold = {"temp": temperature, "hours": hours, "minutes": minutes, "id": f"{ids}"}
    return await self._send({"HOLD": [hold, device_names]}, {"result": "temperature on hold"})
|
|
|
|
async def set_timer_hold(self, state: bool, minutes: int, devices: [NeoStat]):
    """
    Turns the output of timeclock on or off for certain duration

    This function works with NeoStats in timeclock mode

    state: truthy sends TIMER_HOLD_ON, falsy sends TIMER_HOLD_OFF
    minutes: duration sent along with the command

    Returns True if the hub replied with the expected confirmation,
    otherwise False.
    """
    device_names = self._devices_to_names(devices)
    if state:
        command, confirmation = "TIMER_HOLD_ON", "timer hold on"
    else:
        command, confirmation = "TIMER_HOLD_OFF", "timer hold off"
    return await self._send({command: [minutes, device_names]}, {"result": confirmation})
|
|
|
|
async def get_engineers(self):
    """
    Get engineers data

    The hub's decoded reply to GET_ENGINEERS is returned unchanged.
    """
    return await self._send({"GET_ENGINEERS": 0})
|
|
|
|
async def get_devices_data(self):
    """
    Returns live data from hub and all devices

    Fetches the live data and the engineers data, copies the engineers
    attributes onto the live device entry with the same DEVICE_ID and
    wraps every device in a NeoStat.

    Returns a dict with the single key 'neo_devices' holding the list of
    NeoStat objects.
    """
    live_hub_data = await self.get_live_data()
    # Engineers data carries things like Device Type, Sensor Mode etc.
    eng_hub_data = await self.get_engineers()

    # Detach the device list from the hub data.
    live_devices = live_hub_data.devices
    del live_hub_data.devices

    # Enrich every live device with the attributes of the engineers entry
    # that has the same device ID.
    for device in live_devices:
        for engineers_entry in vars(eng_hub_data).values():
            if device.DEVICE_ID != engineers_entry.DEVICE_ID:
                continue
            for attr_name, attr_value in vars(engineers_entry).items():
                # FLOOR_LIMIT exists in both data sets; keep the live value
                # and store the engineers one under a separate name.
                target = "ENG_FLOOR_LIMIT" if attr_name == "FLOOR_LIMIT" else attr_name
                setattr(device, target, attr_value)

    return {
        'neo_devices': [NeoStat(self, device) for device in live_devices]
    }
|