forked from andrius/neohubapi
Add error checking/exception to connection code.
Add a timeout for async read/write to the stream.
This commit is contained in:
parent
90d5969e53
commit
e769da03f7
|
@ -6,6 +6,7 @@ import asyncio
|
||||||
import datetime
|
import datetime
|
||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import socket
|
||||||
from async_property import async_cached_property
|
from async_property import async_cached_property
|
||||||
from types import SimpleNamespace
|
from types import SimpleNamespace
|
||||||
|
|
||||||
|
@ -22,27 +23,49 @@ class NeoHubUsageError(Error):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class NeoHubConnectionError(Error):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
class NeoHub:
|
class NeoHub:
|
||||||
def __init__(self, host='Neo-Hub', port=4242):
|
def __init__(self, host='Neo-Hub', port=4242, request_timeout=5):
|
||||||
self._logger = logging.getLogger('neohub')
|
self._logger = logging.getLogger('neohub')
|
||||||
self._host = host
|
self._host = host
|
||||||
self._port = port
|
self._port = port
|
||||||
|
self._request_timeout = request_timeout
|
||||||
|
|
||||||
async def _send(self, message, expected_reply=None):
|
async def _send_message(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, message: str):
|
||||||
reader, writer = await asyncio.open_connection(self._host, self._port)
|
|
||||||
encoded_message = bytearray(json.dumps(message) + "\0\r", "utf-8")
|
encoded_message = bytearray(json.dumps(message) + "\0\r", "utf-8")
|
||||||
self._logger.debug(f"Sending message: {encoded_message}")
|
self._logger.debug(f"Sending message: {encoded_message}")
|
||||||
writer.write(encoded_message)
|
writer.write(encoded_message)
|
||||||
await writer.drain()
|
await writer.drain()
|
||||||
|
|
||||||
data = await reader.readuntil(b'\0')
|
data = await reader.readuntil(b'\0')
|
||||||
data = data.strip(b'\0')
|
|
||||||
json_string = data.decode('utf-8')
|
|
||||||
self._logger.debug(f"Received message: {json_string}")
|
|
||||||
|
|
||||||
writer.close()
|
writer.close()
|
||||||
await writer.wait_closed()
|
await writer.wait_closed()
|
||||||
|
|
||||||
|
data = data.strip(b'\0')
|
||||||
|
return data
|
||||||
|
|
||||||
|
async def _send(self, message, expected_reply=None):
|
||||||
|
try:
|
||||||
|
reader, writer = await asyncio.open_connection(self._host, self._port)
|
||||||
|
except (socket.gaierror, ConnectionRefusedError) as e:
|
||||||
|
err = f'Could not connect to NeoHub at {self._host}: {e}'
|
||||||
|
self._logger.error(err)
|
||||||
|
raise NeoHubConnectionError from e
|
||||||
|
|
||||||
|
try:
|
||||||
|
data = await asyncio.wait_for(
|
||||||
|
self._send_message(reader, writer, message), timeout=self._request_timeout)
|
||||||
|
except asyncio.TimeoutError as e:
|
||||||
|
self._logger.error(f'Timeout talking to NeoHub: {e}')
|
||||||
|
return False
|
||||||
|
|
||||||
|
json_string = data.decode('utf-8')
|
||||||
|
self._logger.debug(f"Received message: {json_string}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
reply = json.loads(json_string, object_hook=lambda d: SimpleNamespace(**d))
|
reply = json.loads(json_string, object_hook=lambda d: SimpleNamespace(**d))
|
||||||
except json.decoder.JSONDecodeError as e:
|
except json.decoder.JSONDecodeError as e:
|
||||||
|
|
Loading…
Reference in New Issue