From 92df4f9722f488ec84a9886af6c6afd49afc8db9 Mon Sep 17 00:00:00 2001 From: Lawrence Akka Date: Wed, 30 Nov 2022 11:19:56 +0000 Subject: [PATCH] Allow use of new websocket connection method --- neohubapi/neohub.py | 62 ++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 55 insertions(+), 7 deletions(-) diff --git a/neohubapi/neohub.py b/neohubapi/neohub.py index ded2539..477c49f 100644 --- a/neohubapi/neohub.py +++ b/neohubapi/neohub.py @@ -7,8 +7,10 @@ import datetime import json import logging import socket +import ssl from async_property import async_cached_property from types import SimpleNamespace +import websockets from neohubapi.enums import HCMode from neohubapi.enums import ScheduleFormat @@ -29,12 +31,24 @@ class NeoHubConnectionError(Error): class NeoHub: - def __init__(self, host='Neo-Hub', port=4242, request_timeout=60, request_attempts=1): + def __init__(self, host='Neo-Hub', port=4242, request_timeout=60, request_attempts=1, token=None): self._logger = logging.getLogger('neohub') self._host = host self._port = port self._request_timeout = request_timeout self._request_attempts = request_attempts + self._token = token + # Sanity checks. + if port not in (4242, 4243): + raise NeoHubConnectionError( + f'Invalid port number ({port}): use 4242 or 4243 instead') + if port == 4243 and token is None: + raise NeoHubConnectionError( + 'You must provide a token for a connection on port 4243, or use a legacy connection on port 4242') + if port == 4242 and token is not None: + raise NeoHubConnectionError( + 'You have provied a token, so you must use port=4243') + self._websocket = None async def _send_message(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, message: str): encoded_message = bytearray(json.dumps(message) + "\0\r", "utf-8") @@ -52,12 +66,41 @@ class NeoHub: async def _send(self, message, expected_reply=None): last_exception = None + writer = None for attempt in range(1, self._request_attempts+1): try: - reader, writer = await asyncio.open_connection(self._host, self._port) - data = await asyncio.wait_for( - self._send_message(reader, writer, message), timeout=self._request_timeout) - json_string = data.decode('utf-8') + if self._token is not None: + # Websocket connection on port 4243, introduced in v3 of the API on 12/12/21 + if self._websocket is None or self._websocket.closed: + # The hub uses a locally generated certificate, so + # we must diable hostname checking + context = ssl.SSLContext(check_hostname=False) + uri = f"wss://{self._host}:{self._port}" + self._websocket = await websockets.connect( + uri, ssl=context, open_timeout=self._request_timeout) + self._logger.debug("Websocket connected") + # The message format includes json nested within json nested within json! + # All that appears to matter is the toker and the command + encoded_message = \ + r"""{"message_type":"hm_get_command_queue","message":"{\"token\":\"""" + \ + self._token + r"""\",\"COMMANDS\":[{\"COMMAND\":\"""" + \ + str(message) + r"""\",\"COMMANDID\":1}]}"}""" + self._logger.debug(f"Sending message: {encoded_message}") + await self._websocket.send(encoded_message) + # There appears to be no obvious way to detect an invalid token. + # At present, the hub just seems to forcibly close the connection + # with a 1002 error + result = await asyncio.wait_for( + self._websocket.recv(), timeout=self._request_timeout) + json_string = json.loads(result)['response'] + else: + # Legacy connection, on port 4242 + self._logger.debug("Using legacy connection") + reader, writer = await asyncio.open_connection(self._host, self._port) + data = await asyncio.wait_for( + self._send_message(reader, writer, message), timeout=self._request_timeout) + json_string = data.decode('utf-8') + self._logger.debug(f"Received message: {json_string}") reply = json.loads(json_string, object_hook=lambda d: SimpleNamespace(**d)) @@ -66,14 +109,19 @@ class NeoHub: if reply.__dict__ == expected_reply: return True self._logger.error(f"[{attempt}] Unexpected reply: {reply} for message: {message}") - except (socket.gaierror, ConnectionRefusedError) as e: + except (socket.gaierror, ConnectionRefusedError, websockets.InvalidHandshake) as e: last_exception = NeoHubConnectionError(e) self._logger.error(f"[{attempt}] Could not connect to NeoHub at {self._host}: {e}") except asyncio.TimeoutError as e: last_exception = e self._logger.error(f"[{attempt}] Timed out while sending a message to {self._host}") + if self._websocket is not None: + self._websocket.close() if writer is not None: writer.close() + except websockets.exceptions.ConnectionClosedError as e: + last_exception = NeoHubConnectionError(e) + self._logger.error(f"[{attempt}] Connection forcibly closed - maybe a bad token?: {e}") except json.decoder.JSONDecodeError as e: last_exception = e self._logger.error(f"[{attempt}] Could not decode JSON: {e}") @@ -82,7 +130,7 @@ class NeoHub: await asyncio.sleep(self._request_timeout / 2) if expected_reply is None and last_exception is not None: - raise(last_exception) + raise last_exception return False def _devices_to_names(self, devices: [NeoStat]):