diff --git a/pybit/_websocket_stream.py b/pybit/_websocket_stream.py index ff9714d..de73c4c 100644 --- a/pybit/_websocket_stream.py +++ b/pybit/_websocket_stream.py @@ -1,3 +1,5 @@ +import os + import websocket import threading import time @@ -5,12 +7,12 @@ from ._http_manager import generate_signature import logging import copy +import sys from uuid import uuid4 from . import _helpers - logger = logging.getLogger(__name__) - +from websocket._exceptions import WebSocketConnectionClosedException SUBDOMAIN_TESTNET = "stream-testnet" SUBDOMAIN_MAINNET = "stream" @@ -22,26 +24,27 @@ class _WebSocketManager: def __init__( - self, - callback_function, - ws_name, - testnet, - domain="", - demo=False, - rsa_authentication=False, - api_key=None, - api_secret=None, - ping_interval=20, - ping_timeout=10, - retries=10, - restart_on_error=True, - trace_logging=False, - private_auth_expire=1, + self, + callback_function, + ws_name, + testnet, + domain="", + demo=False, + rsa_authentication=False, + api_key=None, + api_secret=None, + ping_interval=20, + ping_timeout=10, + retries=10, + restart_on_error=True, + trace_logging=False, + private_auth_expire=1, ): self.testnet = testnet self.domain = domain self.rsa_authentication = rsa_authentication self.demo = demo + self.terminate = False # Set API keys. self.api_key = api_key self.api_secret = api_secret @@ -50,7 +53,7 @@ def __init__( self.ws_name = ws_name if api_key: self.ws_name += " (Auth)" - + # Delta time for private auth expiration in seconds self.private_auth_expire = private_auth_expire @@ -142,7 +145,7 @@ def resubscribe_to_topics(): infinitely_reconnect = False while ( - infinitely_reconnect or retries > 0 + infinitely_reconnect or retries > 0 ) and not self.is_connected(): logger.info(f"WebSocket {self.ws_name} attempting connection...") self.ws = websocket.WebSocketApp( @@ -172,13 +175,17 @@ def resubscribe_to_topics(): break # If connection was not successful, raise error. - if not infinitely_reconnect and retries <= 0: + try: + if not infinitely_reconnect and retries <= 0: + raise websocket.WebSocketTimeoutException( + f"WebSocket {self.ws_name} ({self.endpoint}) connection " + f"failed. Too many connection attempts. pybit will no " + f"longer try to reconnect." + ) + except websocket.WebSocketTimeoutException as error: + logger.error(error) + self.terminate = True self.exit() - raise websocket.WebSocketTimeoutException( - f"WebSocket {self.ws_name} ({self.endpoint}) connection " - f"failed. Too many connection attempts. pybit will no " - f"longer try to reconnect." - ) logger.info(f"WebSocket {self.ws_name} connected") @@ -255,7 +262,13 @@ def _on_pong(self): self._send_custom_ping() def _send_custom_ping(self): - self.ws.send(self.custom_ping_message) + try: + self.ws.send(self.custom_ping_message) + except WebSocketConnectionClosedException as error: + # Logging error and exiting hanging, non-reposing app (Let it fall). + logger.error(f"WebSocket {self.ws_name} not responding, error: {error}") + self.terminate = True + self.exit() def _send_initial_ping(self): """https://github.com/bybit-exchange/pybit/issues/164""" @@ -289,6 +302,11 @@ def exit(self): while self.ws.sock: continue self.exited = True + if self.terminate: + import signal + p_id = os.getpid() + logger.error("Forcing kill after receiving critical error") + os.kill(1, signal.SIGKILL) class _V5WebSocketManager(_WebSocketManager): @@ -368,8 +386,8 @@ def _process_delta_orderbook(self, message, topic): # Make updates according to delta response. book_sides = {"b": message["data"]["b"], "a": message["data"]["a"]} - self.data[topic]["u"]=message["data"]["u"] - self.data[topic]["seq"]=message["data"]["seq"] + self.data[topic]["u"] = message["data"]["u"] + self.data[topic]["seq"] = message["data"]["seq"] for side, entries in book_sides.items(): for entry in entries: @@ -463,8 +481,8 @@ def _process_normal_message(self, message): def _handle_incoming_message(self, message): def is_auth_message(): if ( - message.get("op") == "auth" - or message.get("type") == "AUTH_RESP" + message.get("op") == "auth" + or message.get("type") == "AUTH_RESP" ): return True else: @@ -472,8 +490,8 @@ def is_auth_message(): def is_subscription_message(): if ( - message.get("op") == "subscribe" - or message.get("type") == "COMMAND_RESP" + message.get("op") == "subscribe" + or message.get("type") == "COMMAND_RESP" ): return True else: