neohubapi/neohubapi/neohub.py

680 lines
22 KiB
Python

# SPDX-FileCopyrightText: 2020-2021 Andrius Štikonas <andrius@stikonas.eu>
# SPDX-FileCopyrightText: 2021 Dave O'Connor <daveoc@google.com>
# SPDX-License-Identifier: LGPL-3.0-or-later
import asyncio
import datetime
import json
import logging
import socket
import ssl
from async_property import async_cached_property
from types import SimpleNamespace
import websockets
from neohubapi.enums import HCMode
from neohubapi.enums import ScheduleFormat
from neohubapi.enums import schedule_format_int_to_enum
from neohubapi.neostat import NeoStat
class Error(Exception):
pass
class NeoHubUsageError(Error):
pass
class NeoHubConnectionError(Error):
pass
class NeoHub:
def __init__(self, host='Neo-Hub', port=4242, request_timeout=60, request_attempts=1, token=None):
self._logger = logging.getLogger('neohub')
self._host = host
self._port = int(port)
self._request_timeout = request_timeout
self._request_attempts = request_attempts
self._token = token
# Sanity checks.
if port not in (4242, 4243):
raise NeoHubConnectionError(
f'Invalid port number ({port}): use 4242 or 4243 instead')
if port == 4243 and token is None:
raise NeoHubConnectionError(
'You must provide a token for a connection on port 4243, or use a legacy connection on port 4242')
if port == 4242 and token is not None:
raise NeoHubConnectionError(
'You have provided a token, so you must use port=4243')
self._websocket = None
async def _send_message(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, message: str):
encoded_message = bytearray(json.dumps(message) + "\0\r", "utf-8")
self._logger.debug(f"Sending message: {encoded_message}")
writer.write(encoded_message)
await writer.drain()
data = await reader.readuntil(b'\0')
writer.close()
await writer.wait_closed()
data = data.strip(b'\0')
return data
async def _send(self, message, expected_reply=None):
last_exception = None
writer = None
for attempt in range(1, self._request_attempts + 1):
try:
if self._token is not None:
# Websocket connection on port 4243, introduced in v3 of the API on 12/12/21
if self._websocket is None or self._websocket.closed:
# The hub uses a locally generated certificate, so
# we must diable hostname checking
context = ssl.SSLContext(check_hostname=False)
uri = f"wss://{self._host}:{self._port}"
self._websocket = await websockets.connect(
uri, ssl=context, open_timeout=self._request_timeout)
self._logger.debug("Websocket connected")
# The message format includes json nested within json nested within json!
# All that appears to matter is the toker and the command
encoded_message = \
r"""{"message_type":"hm_get_command_queue","message":"{\"token\":\"""" + \
self._token + r"""\",\"COMMANDS\":[{\"COMMAND\":\"""" + \
str(message) + r"""\",\"COMMANDID\":1}]}"}"""
self._logger.debug(f"Sending message: {encoded_message}")
await self._websocket.send(encoded_message)
# There appears to be no obvious way to detect an invalid token.
# At present, the hub just seems to forcibly close the connection
# with a 1002 error
result = await asyncio.wait_for(
self._websocket.recv(), timeout=self._request_timeout)
json_string = json.loads(result)['response']
else:
# Legacy connection, on port 4242
self._logger.debug("Using legacy connection")
reader, writer = await asyncio.open_connection(self._host, self._port)
data = await asyncio.wait_for(
self._send_message(reader, writer, message), timeout=self._request_timeout)
json_string = data.decode('utf-8')
self._logger.debug(f"Received message: {json_string}")
reply = json.loads(json_string, object_hook=lambda d: SimpleNamespace(**d))
if expected_reply is None:
return reply
if reply.__dict__ == expected_reply:
return True
self._logger.error(f"[{attempt}] Unexpected reply: {reply} for message: {message}")
except (socket.gaierror, ConnectionRefusedError, websockets.InvalidHandshake) as e:
last_exception = NeoHubConnectionError(e)
self._logger.error(f"[{attempt}] Could not connect to NeoHub at {self._host}: {e}")
except asyncio.TimeoutError as e:
last_exception = e
self._logger.error(f"[{attempt}] Timed out while sending a message to {self._host}")
if self._websocket is not None:
self._websocket.close()
if writer is not None:
writer.close()
except websockets.exceptions.ConnectionClosedError as e:
last_exception = NeoHubConnectionError(e)
self._logger.error(f"[{attempt}] Connection forcibly closed - maybe a bad token?: {e}")
except json.decoder.JSONDecodeError as e:
last_exception = e
self._logger.error(f"[{attempt}] Could not decode JSON: {e}")
# Wait for 1/2 of the timeout value before retrying.
if self._request_attempts > 1 and attempt < self._request_attempts:
await asyncio.sleep(self._request_timeout / 2)
if expected_reply is None and last_exception is not None:
raise last_exception
return False
def _devices_to_device_ids(self, devices: [NeoStat]):
"""
Returns the list of device ids
"""
try:
return [x.device_id for x in devices]
except (TypeError, AttributeError):
raise NeoHubUsageError('devices must be a list of NeoStat objects')
def _devices_to_names(self, devices: [NeoStat]):
"""
Returns the list of device names
"""
try:
return [x.name for x in devices]
except (TypeError, AttributeError):
raise NeoHubUsageError('devices must be a list of NeoStat objects')
async def firmware(self):
"""
NeoHub firmware version
"""
message = {"FIRMWARE": 0}
result = await self._send(message)
firmware_version = int(getattr(result, 'firmware version'))
return firmware_version
async def get_system(self):
"""
Get system wide variables
"""
message = {"GET_SYSTEM": 0}
data = await self._send(message)
data.FORMAT = schedule_format_int_to_enum(data.FORMAT)
data.ALT_TIMER_FORMAT = schedule_format_int_to_enum(data.ALT_TIMER_FORMAT)
return data
@async_cached_property
async def target_temperature_step(self):
"""
Returns Neohub's target temperature step
"""
firmware_version = await self.firmware()
if firmware_version >= 2135:
return 0.5
else:
return 1
async def reset(self):
"""
Reboot neohub
Returns True if Restart is initiated
"""
message = {"RESET": 0}
reply = {"Restarting": 1}
firmware_version = await self.firmware()
result = ""
if firmware_version >= 2027:
result = await self._send(message, reply)
return result
else:
return False
async def set_channel(self, channel: int):
"""
Set ZigBee channel.
Only channels 11, 14, 15, 19, 20, 24, 25 are allowed.
"""
try:
message = {"SET_CHANNEL": int(channel)}
except ValueError:
raise NeoHubUsageError('channel must be a number')
reply = {"result": "Trying to change channel"}
result = await self._send(message, reply)
return result
async def set_temp_format(self, temp_format: str):
"""
Set temperature format to C or F
"""
message = {"SET_TEMP_FORMAT": temp_format}
reply = {"result": f"Temperature format set to {temp_format}"}
result = await self._send(message, reply)
return result
async def set_hc_mode(self, hc_mode: HCMode, devices: [NeoStat]):
"""
Set hc_mode to AUTO or...
"""
names = self._devices_to_names(devices)
message = {"SET_HC_MODE": [hc_mode.value, names]}
reply = {"result": "HC_MODE was set"}
result = await self._send(message, reply)
return result
async def set_format(self, sched_format: ScheduleFormat):
"""
Sets schedule format
Format is specified using ScheduleFormat enum:
"""
if not isinstance(sched_format, ScheduleFormat):
raise NeoHubUsageError('sched_format must be a ScheduleFormat')
message = {"SET_FORMAT": sched_format.value}
reply = {"result": "Format was set"}
result = await self._send(message, reply)
return result
async def set_away(self, state: bool):
"""
Enables away mode for all devices.
Puts thermostats into frost mode and timeclocks are set to off.
Instead of this function it is recommended to use frost on/off commands
List of affected devices can be restricted using GLOBAL_DEV_LIST command
"""
message = {"AWAY_ON" if state else "AWAY_OFF": 0}
reply = {"result": "away on" if state else "away off"}
result = await self._send(message, reply)
return result
async def set_holiday(self, start: datetime.datetime, end: datetime.datetime):
"""
Sets holiday mode.
start: beginning of holiday
end: end of holiday
"""
for datetime_arg in (start, end):
if not isinstance(datetime_arg, datetime.datetime):
raise NeoHubUsageError('start and end must be datetime.datetime objects')
message = {"HOLIDAY": [start.strftime("%H%M%S%d%m%Y"), end.strftime("%H%M%S%d%m%Y")]}
result = await self._send(message)
return result
async def get_holiday(self):
"""
Get list of holidays
start end end times are converted to datetimes
"""
message = {"GET_HOLIDAY": 0}
result = await self._send(message)
result.start = datetime.datetime.strptime(
result.start.strip(), "%a %b %d %H:%M:%S %Y") if result.start else None
result.end = datetime.datetime.strptime(
result.end.strip(), "%a %b %d %H:%M:%S %Y") if result.end else None
return result
async def cancel_holiday(self):
"""
Cancels holidays and returns to normal schedule
"""
message = {"CANCEL_HOLIDAY": 0}
reply = {"result": "holiday cancelled"}
result = await self._send(message, reply)
return result
async def get_devices(self):
"""
Returns list of devices
{"result": ["device1"]}
"""
message = {"GET_DEVICES": 0}
result = await self._send(message)
return result
async def get_device_list(self, zone: str):
"""
Returns list of devices associated with zone
"""
message = {"GET_DEVICE_LIST": zone}
result = await self._send(message)
if 'error' in result:
return False
else:
return result[zone]
async def devices_sn(self):
"""
Returns serial numbers of attached devices
{'name': [id, 'serial', 1], ...}
"""
message = {"DEVICES_SN": 0}
result = await self._send(message)
return result
async def set_ntp(self, state: bool):
"""
Enables NTP client on Neohub
"""
message = {"NTP_ON" if state else "NTP_OFF": 0}
reply = {"result": "ntp client started" if state else "ntp client stopped"}
result = await self._send(message, reply)
return result
async def set_date(self, date: datetime.datetime = datetime.datetime.today()):
"""
Sets current date
By default, set to current date. Can be optionally passed datetime.datetime object
"""
message = {"SET_DATE": [date.year, date.month, date.day]}
reply = {"result": "Date is set"}
result = await self._send(message, reply)
return result
async def set_time(self, time: datetime.datetime = datetime.datetime.now()):
"""
Sets current time
By default, set to current time. Can be optionally passed datetime.datetime object
"""
message = {"SET_TIME": [time.hour, time.minute]}
reply = {"result": "time set"}
result = await self._send(message, reply)
return result
async def set_datetime(self, date_time: datetime.datetime = datetime.datetime.now()):
"""
Convenience method to set both date and time
"""
result = await self.set_date(date_time)
if result:
result = await self.set_time(date_time)
return result
async def manual_dst(self, state: bool):
"""
Manually enables/disables daylight saving time
"""
message = {"MANUAL_DST": int(state)}
reply = {"result": "Updated time"}
result = await self._send(message, reply)
return result
async def set_dst(self, state: bool, region: str = None):
"""
Enables/disables automatic DST handling.
By default it uses UK dates for turning DST on/off.
Available options for region are UK, EU, NZ.
"""
message = {"DST_ON" if state else "DST_OFF": 0 if region is None else region}
reply = {"result": "dst on" if state else "dst off"}
valid_timezones = ["UK", "EU", "NZ"]
if region not in valid_timezones:
raise NeoHubUsageError(f'region must be in {valid_timezones}')
result = await self._send(message, reply)
return result
async def identify(self):
"""
Flashes red LED light
"""
message = {"IDENTIFY": 0}
reply = {"result": "flashing led"}
result = await self._send(message, reply)
return result
async def get_live_data(self):
"""
Returns live data from hub and all devices
"""
message = {"GET_LIVE_DATA": 0}
live_data = await self._send(message)
return live_data
async def permit_join(self, name, timeout_s: int = 120):
"""
Permit new thermostat to join network
name: new zone will be added with this name
timeout: duration of discovery mode in seconds
To actually join network you need to select 01
from the thermostat's setup menu.
"""
message = {"PERMIT_JOIN": [timeout_s, name]}
reply = {"result": "network allows joining"}
result = await self._send(message, reply)
return result
async def set_lock(self, pin: int, devices: [NeoStat]):
"""
PIN locks thermostats
PIN is a four digit number
"""
try:
if pin < 0 or pin > 9999:
return False
except TypeError:
raise NeoHubUsageError('pin must be a number')
pins = []
for x in range(4):
pins.append(pin % 10)
pin = pin // 10
pins.reverse()
names = self._devices_to_names(devices)
message = {"LOCK": [pins, names]}
reply = {"result": "locked"}
result = await self._send(message, reply)
return result
async def unlock(self, devices: [NeoStat]):
"""
Unlocks PIN locked thermostats
"""
names = self._devices_to_names(devices)
message = {"UNLOCK": names}
reply = {"result": "unlocked"}
result = await self._send(message, reply)
return result
async def set_frost(self, state: bool, devices: [NeoStat]):
"""
Enables or disables Frost mode
"""
names = self._devices_to_names(devices)
message = {"FROST_ON" if state else "FROST_OFF": names}
reply = {"result": "frost on" if state else "frost off"}
result = await self._send(message, reply)
return result
async def set_cool_temp(self, temperature: int, devices: [NeoStat]):
"""
Sets the thermostat's cooling temperature i.e. the temperature that will
trigger the thermostat if exceeded. Note that this is only supported on
the HC (Heating/Cooling) thermostats.
The temperature will be reset once next comfort level is reached
"""
names = self._devices_to_names(devices)
message = {"SET_COOL_TEMP": [temperature, names]}
reply = {"result": "temperature was set"}
result = await self._send(message, reply)
return result
async def set_target_temperature(self, temperature: int, devices: [NeoStat]):
"""
Sets the thermostat's temperature
The temperature will be reset once next comfort level is reached
"""
names = self._devices_to_names(devices)
message = {"SET_TEMP": [temperature, names]}
reply = {"result": "temperature was set"}
result = await self._send(message, reply)
return result
async def set_diff(self, switching_differential: int, devices: [NeoStat]):
"""
Sets the thermostat's switching differential
-1: Undocumented option. Seems to set differential to 204.
0: 0.5 degrees
1: 1 degree
2: 2 degrees
3: 3 degrees
"""
names = self._devices_to_names(devices)
message = {"SET_DIFF": [switching_differential, names]}
reply = {"result": "switching differential was set"}
result = await self._send(message, reply)
return result
async def rate_of_change(self, devices: [NeoStat]):
"""
Returns time in minutes required to change temperature by 1 degree
"""
names = self._devices_to_names(devices)
message = {"VIEW_ROC": names}
result = await self._send(message)
return result.__dict__
async def set_timer(self, state: bool, devices: [NeoStat]):
"""
Turns the output of timeclock on or off
This function works only with NeoPlugs and does not work on
NeoStats that are in timeclock mode.
"""
names = self._devices_to_names(devices)
message = {"TIMER_ON" if state else "TIMER_OFF": names}
reply = {"result": "timers on" if state else "timers off"}
result = await self._send(message, reply)
return result
async def set_manual(self, state: bool, devices: [NeoStat]):
"""
Controls the timeclock built into the neoplug, can be enabled and disabled to allow for manual operation.
This function works only with NeoPlugs and does not work on NeoStats that are in timeclock mode.
"""
names = self._devices_to_names(devices)
message = {"MANUAL_ON" if state else "MANUAL_OFF": names}
reply = {"result": "manual on" if state else "manual off"}
result = await self._send(message, reply)
return result
async def set_hold(self, temperature: int, hours: int, minutes: int, devices: [NeoStat]):
"""
Tells a thermostat to maintain the current temperature for a fixed time.
"""
names = self._devices_to_names(devices)
ids = self._devices_to_device_ids(devices)
message = {"HOLD": [{"temp": temperature, "hours": hours, "minutes": minutes, "id": f"{ids}"}, names]}
reply = {"result": "temperature on hold"}
result = await self._send(message, reply)
return result
async def set_timer_hold(self, state: bool, minutes: int, devices: [NeoStat]):
"""
Turns the output of timeclock on or off for certain duration
This function works with NeoStats in timeclock mode
"""
names = self._devices_to_names(devices)
message = {"TIMER_HOLD_ON" if state else "TIMER_HOLD_OFF": [minutes, names]}
reply = {"result": "timer hold on" if state else "timer hold off"}
result = await self._send(message, reply)
return result
async def get_engineers(self):
"""
Get engineers data
"""
message = {"GET_ENGINEERS": 0}
data = await self._send(message)
return data
async def get_devices_data(self):
"""
Returns live data from hub and all devices
"""
# Get live data from Neohub
live_hub_data = await self.get_live_data()
# Get the engineers data from the Neohub for things like Device Type, Sensor Mode etc.
eng_hub_data = await self.get_engineers()
# Remove the devices node from the hub_data.
devices = live_hub_data.devices
delattr(live_hub_data, "devices")
neo_devices = []
# Iterate through all devices in list returned by the get_live_data and enrich this object with the attributes
# from engineers data.
for device in devices:
# Find matching device ID in Engineers data
for engineers_data_key, engineers_data_value in eng_hub_data.__dict__.items():
# If we found a matching device ID combine the engineering data into the live data
if device.DEVICE_ID == engineers_data_value.DEVICE_ID:
for x in engineers_data_value.__dict__.items():
# FLOOR_LIMIT is a duplicate key, do not overwrite this.
if x[0] == "FLOOR_LIMIT":
setattr(device, "ENG_FLOOR_LIMIT", x[1])
else:
setattr(device, x[0], x[1])
for neo_device in devices:
neo_devices.append(NeoStat(self, neo_device))
devices = {
'neo_devices': neo_devices
}
return devices