This commit is contained in:
Mehmet Can 2023-11-21 18:45:04 +00:00
parent 2305ac05bd
commit 91c95149a3
4 changed files with 174 additions and 99 deletions

View File

@ -34,7 +34,7 @@ class NeoHub:
def __init__(self, host='Neo-Hub', port=4242, request_timeout=60, request_attempts=1, token=None): def __init__(self, host='Neo-Hub', port=4242, request_timeout=60, request_attempts=1, token=None):
self._logger = logging.getLogger('neohub') self._logger = logging.getLogger('neohub')
self._host = host self._host = host
self._port = port self._port = int(port)
self._request_timeout = request_timeout self._request_timeout = request_timeout
self._request_attempts = request_attempts self._request_attempts = request_attempts
self._token = token self._token = token
@ -67,7 +67,7 @@ class NeoHub:
async def _send(self, message, expected_reply=None): async def _send(self, message, expected_reply=None):
last_exception = None last_exception = None
writer = None writer = None
for attempt in range(1, self._request_attempts+1): for attempt in range(1, self._request_attempts + 1):
try: try:
if self._token is not None: if self._token is not None:
# Websocket connection on port 4243, introduced in v3 of the API on 12/12/21 # Websocket connection on port 4243, introduced in v3 of the API on 12/12/21
@ -98,7 +98,7 @@ class NeoHub:
self._logger.debug("Using legacy connection") self._logger.debug("Using legacy connection")
reader, writer = await asyncio.open_connection(self._host, self._port) reader, writer = await asyncio.open_connection(self._host, self._port)
data = await asyncio.wait_for( data = await asyncio.wait_for(
self._send_message(reader, writer, message), timeout=self._request_timeout) self._send_message(reader, writer, message), timeout=self._request_timeout)
json_string = data.decode('utf-8') json_string = data.decode('utf-8')
self._logger.debug(f"Received message: {json_string}") self._logger.debug(f"Received message: {json_string}")
@ -133,6 +133,15 @@ class NeoHub:
raise last_exception raise last_exception
return False return False
def _devices_to_device_ids(self, devices: [NeoStat]):
"""
Returns the list of device ids
"""
try:
return [x.device_id for x in devices]
except (TypeError, AttributeError):
raise NeoHubUsageError('devices must be a list of NeoStat objects')
def _devices_to_names(self, devices: [NeoStat]): def _devices_to_names(self, devices: [NeoStat]):
""" """
Returns the list of device names Returns the list of device names
@ -290,9 +299,9 @@ class NeoHub:
result = await self._send(message) result = await self._send(message)
result.start = datetime.datetime.strptime( result.start = datetime.datetime.strptime(
result.start.strip(), "%a %b %d %H:%M:%S %Y") if result.start else None result.start.strip(), "%a %b %d %H:%M:%S %Y") if result.start else None
result.end = datetime.datetime.strptime( result.end = datetime.datetime.strptime(
result.end.strip(), "%a %b %d %H:%M:%S %Y") if result.end else None result.end.strip(), "%a %b %d %H:%M:%S %Y") if result.end else None
return result return result
async def cancel_holiday(self): async def cancel_holiday(self):
@ -435,44 +444,9 @@ class NeoHub:
""" """
message = {"GET_LIVE_DATA": 0} message = {"GET_LIVE_DATA": 0}
hub_data = await self._send(message) live_data = await self._send(message)
# We need the engineers data to get things like Device Type, Sensor Mode etc. return live_data
get_eng_data_msg = {"GET_ENGINEERS": 0}
eng_hub_data = await self._send(get_eng_data_msg)
devices = hub_data.devices
delattr(hub_data, "devices")
# Combine the live data and engineers data to produce a full dataset to work from.
# Start by working through each device in the devices
for device in devices:
# Find matching device ID in Engineers data
for key, value in eng_hub_data.__dict__.items():
if device.DEVICE_ID == value.DEVICE_ID:
# Now add the engineers data dictionary entries to the existing device dictionary.
for x in value.__dict__.items():
setattr(device, x[0], x[1])
thermostat_list = list(filter(lambda device: hasattr(device, 'THERMOSTAT') and device.THERMOSTAT, devices))
timeclock_list = list(filter(lambda device: hasattr(device, 'TIMECLOCK') and device.TIMECLOCK, devices))
thermostats = []
timeclocks = []
for thermostat in thermostat_list:
thermostats.append(NeoStat(self, thermostat))
for timeclock in timeclock_list:
timeclocks.append(NeoStat(self, timeclock))
devices = {}
devices['thermostats'] = thermostats
devices['timeclocks'] = timeclocks
return hub_data, devices
async def permit_join(self, name, timeout_s: int = 120): async def permit_join(self, name, timeout_s: int = 120):
""" """
@ -615,6 +589,32 @@ class NeoHub:
result = await self._send(message, reply) result = await self._send(message, reply)
return result return result
async def set_manual(self, state: bool, devices: [NeoStat]):
"""
Controls the timeclock built into the neoplug, can be enabled and disabled to allow for manual operation.
This function works only with NeoPlugs and does not work on NeoStats that are in timeclock mode.
"""
names = self._devices_to_names(devices)
message = {"MANUAL_ON" if state else "MANUAL_OFF": names}
reply = {"result": "manual on" if state else "manual off"}
result = await self._send(message, reply)
return result
async def set_hold(self, temperature: int, hours: int, minutes: int, devices: [NeoStat]):
"""
Tells a thermostat to maintain the current temperature for a fixed time.
"""
names = self._devices_to_names(devices)
ids = self._devices_to_device_ids(devices)
message = {"HOLD": [{"temp": temperature, "hours": hours, "minutes": minutes, "id": f"{ids}"}, names]}
reply = {"result": "temperature on hold"}
result = await self._send(message, reply)
return result
async def set_timer_hold(self, state: bool, minutes: int, devices: [NeoStat]): async def set_timer_hold(self, state: bool, minutes: int, devices: [NeoStat]):
""" """
Turns the output of timeclock on or off for certain duration Turns the output of timeclock on or off for certain duration
@ -628,3 +628,52 @@ class NeoHub:
result = await self._send(message, reply) result = await self._send(message, reply)
return result return result
async def get_engineers(self):
"""
Get engineers data
"""
message = {"GET_ENGINEERS": 0}
data = await self._send(message)
return data
async def get_devices_data(self):
"""
Returns live data from hub and all devices
"""
# Get live data from Neohub
live_hub_data = await self.get_live_data()
# Get the engineers data from the Neohub for things like Device Type, Sensor Mode etc.
eng_hub_data = await self.get_engineers()
# Remove the devices node from the hub_data.
devices = live_hub_data.devices
delattr(live_hub_data, "devices")
neo_devices = []
# Iterate through all devices in list returned by the get_live_data and enrich this object with the attributes
# from engineers data.
for device in devices:
# Find matching device ID in Engineers data
for engineers_data_key, engineers_data_value in eng_hub_data.__dict__.items():
# If we found a matching device ID combine the engineering data into the live data
if device.DEVICE_ID == engineers_data_value.DEVICE_ID:
for x in engineers_data_value.__dict__.items():
# FLOOR_LIMIT is a duplicate key, do not overwrite this.
if x[0] == "FLOOR_LIMIT":
setattr(device, "ENG_FLOOR_LIMIT", x[1])
else:
setattr(device, x[0], x[1])
for neo_device in devices:
neo_devices.append(NeoStat(self, neo_device))
devices = {
'neo_devices': neo_devices
}
return devices

View File

@ -23,51 +23,56 @@ class NeoStat(SimpleNamespace):
self._hub = hub self._hub = hub
self._simple_attrs = ( self._simple_attrs = (
'active_level', 'active_level',
'active_profile', 'active_profile',
'available_modes', 'available_modes',
'away', 'away',
'cool_on', 'cool_on',
'cool_temp', 'cool_temp',
'current_floor_temperature', 'current_floor_temperature',
'date', 'date',
'device_id', 'device_id',
'fan_control', 'device_type',
'fan_speed', 'fan_control',
'floor_limit', 'fan_speed',
'hc_mode', 'floor_limit',
'heat_mode', 'hc_mode',
'heat_on', 'heat_mode',
'hold_cool', 'heat_on',
'fan_control', 'fan_control',
'fan_speed', 'fan_speed',
'hc_mode', 'hc_mode',
'heat_mode', 'heat_mode',
'heat_on', 'heat_on',
'hold_cool', 'hold_cool',
'hold_off', 'hold_hours',
'hold_on', 'hold_mins',
'hold_temp', 'hold_off',
'hold_time', # This is updated below. 'hold_on',
'holiday', 'hold_state',
'lock', 'hold_temp',
'low_battery', 'hold_time', # This is updated below.
'manual_off', 'holiday',
'modelock', 'lock',
'modulation_level', 'low_battery',
'offline', 'manual_off',
'pin_number', 'modelock',
'preheat_active', 'modulation_level',
'prg_temp', 'offline',
'prg_timer', 'pin_number',
'standby', 'preheat_active',
'switch_delay_left', # This is updated below. 'prg_temp',
'temporary_set_flag', 'prg_timer',
'time', # This is updated below. 'sensor_mode',
'timer_on', 'standby',
'window_open', 'stat_version',
'write_count' 'switch_delay_left', # This is updated below.
) 'temporary_set_flag',
'time', # This is updated below.
'timer_on',
'window_open',
'write_count'
)
for a in self._simple_attrs: for a in self._simple_attrs:
data_attr = a.upper() data_attr = a.upper()
@ -92,11 +97,31 @@ class NeoStat(SimpleNamespace):
_switch_delay_left = datetime.strptime(self.switch_delay_left, "%H:%M") _switch_delay_left = datetime.strptime(self.switch_delay_left, "%H:%M")
self.switch_delay_left = timedelta( self.switch_delay_left = timedelta(
hours=_switch_delay_left.hour, hours=_switch_delay_left.hour,
minutes=_switch_delay_left.minute) minutes=_switch_delay_left.minute)
_time = datetime.strptime(self.time, "%H:%M") _time = datetime.strptime(self.time, "%H:%M")
self.time = timedelta(hours=_time.hour, minutes=_time.minute) self.time = timedelta(hours=_time.hour, minutes=_time.minute)
# 35 Degrees appears to be the hard limit in app, 5 degrees appears to be the hard lower limit.
self.max_temperature_limit = 35
self.min_temperature_limit = 5
# Ensure that TIMECLOCKS are correctly reported
if hasattr(self._data_, 'TIMECLOCK'):
self.time_clock_mode = True
else:
self.time_clock_mode = False
# Adding an attribute to deal with battery powered or not.
if self.device_type in [2, 5, 13, 14]:
self.battery_powered = True
else:
self.battery_powered = False
if self.device_type == 1:
# We're dealing with a NeoStat V1 there's a known bug in the API, so we must patch the hc_mode
self.hc_mode = "HEATING"
def __str__(self): def __str__(self):
""" """
String representation. String representation.
@ -105,14 +130,14 @@ class NeoStat(SimpleNamespace):
for elem in dir(self): for elem in dir(self):
if not callable(getattr(self, elem)) and not elem.startswith('_'): if not callable(getattr(self, elem)) and not elem.startswith('_'):
data_elem.append(elem) data_elem.append(elem)
out = 'HeatMiser NeoStat (%s):\n' % (self.name) out = f'HeatMiser NeoStat {self.name}:\n'
for elem in data_elem: for elem in data_elem:
out += ' - %s: %s\n' % (elem, getattr(self, elem)) out += f' - {elem}: {getattr(self, elem)}\n'
return out return out
async def identify(self): async def identify(self):
""" """
Flashes red LED light Flashes Devices LED light
""" """
message = {"IDENTIFY_DEV": self.name} message = {"IDENTIFY_DEV": self.name}

View File

@ -35,9 +35,8 @@ class NeoHubCLIArgumentError(Error):
class NeoHubCLI(object): class NeoHubCLI(object):
"""A runner for neohub_cli operations.""" """A runner for neohub_cli operations."""
def __init__(self, command, args, hub_ip=None, hub_token=None): def __init__(self, command, args, hub_ip=None, hub_port=4242, token=None):
hub_port = 4242 if hub_token is None else 4243 self._hub = NeoHub(host=hub_ip, port=hub_port, token=token)
self._hub = NeoHub(host=hub_ip, port=hub_port, token=hub_token)
self._command = command self._command = command
self._args = args self._args = args
# live data cached from the neohub. We assume this data will remain current # live data cached from the neohub. We assume this data will remain current
@ -280,7 +279,9 @@ class NeoHubCLI(object):
async def main(): async def main():
argp = argparse.ArgumentParser(description='CLI to neohub devices') argp = argparse.ArgumentParser(description='CLI to neohub devices')
argp.add_argument('--hub_ip', help='IP address of NeoHub', default=None) argp.add_argument('--hub_ip', help='IP address of NeoHub', default=None)
argp.add_argument('--hub_token', help='API token', default=None) argp.add_argument(
'--hub_port', help='Port number of NeoHub to talk to', default=4242)
argp.add_argument('--token', help='Token', default=None)
argp.add_argument('--format', help='Output format', default='list') argp.add_argument('--format', help='Output format', default='list')
argp.add_argument('command', help='Command to issue') argp.add_argument('command', help='Command to issue')
argp.add_argument('arg', help='Arguments to command', nargs='*') argp.add_argument('arg', help='Arguments to command', nargs='*')
@ -291,8 +292,8 @@ async def main():
args.command, args.command,
args.arg, args.arg,
hub_ip=args.hub_ip, hub_ip=args.hub_ip,
hub_token=args.hub_token, hub_port=int(args.hub_port),
) token=args.token)
m = await nhc.callable() m = await nhc.callable()
if m: if m:
result = await m() result = await m()

View File

@ -11,7 +11,7 @@ with open("README.md", "r", encoding="utf-8") as fh:
setuptools.setup( setuptools.setup(
name="neohubapi", name="neohubapi",
version="1.1", version="2.0",
description="Async library to communicate with Heatmiser NeoHub 2 API.", description="Async library to communicate with Heatmiser NeoHub 2 API.",
url="https://gitlab.com/neohubapi/neohubapi/", url="https://gitlab.com/neohubapi/neohubapi/",
author="Andrius Štikonas", author="Andrius Štikonas",