Source code for binance.ws.threaded_stream

import asyncio
import logging
import threading
from typing import Optional, Dict, Any

from binance.async_client import AsyncClient
from binance.helpers import get_loop


class ThreadedApiManager(threading.Thread):
    """Thread that drives an asyncio event loop hosting an ``AsyncClient``.

    The thread's :meth:`run` executes :meth:`socket_listener` on the event
    loop until :meth:`stop` has been called and every registered socket
    listener has finished. Socket listeners are coroutines created with
    :meth:`start_listener`; each one is tracked in ``_socket_running`` under
    its ``path`` key.
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        api_secret: Optional[str] = None,
        requests_params: Optional[Dict[str, Any]] = None,
        tld: str = "com",
        testnet: bool = False,
        session_params: Optional[Dict[str, Any]] = None,
        https_proxy: Optional[str] = None,
        _loop: Optional[asyncio.AbstractEventLoop] = None,
        verbose: bool = False,
    ):
        """Initialise the ThreadedApiManager

        :param api_key: Binance API key
        :param api_secret: Binance API secret
        :param requests_params: optional - Dictionary of requests params
        :param tld: optional - Top level domain, default is 'com'
        :param testnet: optional - Use testnet endpoint
        :param session_params: optional - Session params for aiohttp
        :param https_proxy: optional - Proxy URL
        :param _loop: optional - Event loop
        :param verbose: Enable verbose logging for WebSocket connections
        :type verbose: bool

        """
        super().__init__()
        self._loop: asyncio.AbstractEventLoop = (
            get_loop() if _loop is None else _loop
        )
        self._client: Optional[AsyncClient] = None
        self._running: bool = True
        # Maps socket path -> "keep listening" flag; a key exists only while
        # its listener coroutine is alive.
        self._socket_running: Dict[str, bool] = {}
        # Strong references to in-flight async callback tasks. The event loop
        # only keeps weak references, so unreferenced tasks may be collected
        # before they finish.
        self._callback_tasks: set = set()
        # Set when stop() is invoked on the loop thread and the client close
        # had to be scheduled instead of awaited synchronously.
        self._close_task: Optional[asyncio.Future] = None
        self._log = logging.getLogger(__name__)
        self.verbose = verbose
        self._client_params = {
            "api_key": api_key,
            "api_secret": api_secret,
            "requests_params": requests_params,
            "tld": tld,
            "testnet": testnet,
            "session_params": session_params,
            "https_proxy": https_proxy,
            "verbose": verbose,
        }

        if verbose:
            logging.getLogger('binance.ws').setLevel(logging.DEBUG)

    async def _before_socket_listener_start(self):
        """Hook awaited after the client is created; no-op by default."""
        ...

    async def socket_listener(self):
        """Create the client, then idle until the manager is fully stopped.

        Returns once ``stop()`` has been called, every socket listener has
        removed itself from ``_socket_running``, and any client close that
        ``stop()`` scheduled on this loop has completed.
        """
        try:
            self._client = await AsyncClient.create(
                loop=self._loop, **self._client_params
            )
            await self._before_socket_listener_start()
        except Exception as e:
            self._log.error(f"Failed to create client: {e}")
            self.stop()
        while self._running:
            await asyncio.sleep(0.2)
        while self._socket_running:
            await asyncio.sleep(0.2)
        # If stop() ran on this loop's thread it could only schedule the
        # client close; finish it here so the loop does not exit with the
        # connection still open.
        close_task = self._close_task
        if close_task is not None:
            try:
                await close_task
            except Exception as e:
                self._log.error(f"Error stopping client: {e}")
        self._log.info("Socket listener stopped")

    async def start_listener(self, socket, path: str, callback):
        """Receive messages from ``socket`` and dispatch them to ``callback``.

        Runs while ``_socket_running[path]`` is truthy. Receive errors are
        logged and forwarded to the callback as an ``{"e": "error", ...}``
        message. Coroutine-function callbacks are scheduled as tasks; plain
        callables are invoked inline on the event loop.

        :param socket: async context manager yielding an object with an
            awaitable ``recv()``
        :param path: key identifying this listener in ``_socket_running``
        :param callback: sync or async callable taking the message
        """
        is_async_callback = asyncio.iscoroutinefunction(callback)
        try:
            async with socket as s:
                while self._socket_running[path]:
                    try:
                        msg = await asyncio.wait_for(s.recv(), 3)
                    except asyncio.TimeoutError:
                        # Wake up periodically to re-check the stop flag.
                        continue
                    except Exception as e:
                        self._log.error(f"Error receiving message: {e}")
                        msg = {
                            "e": "error",
                            "type": e.__class__.__name__,
                            "m": f"{e}",
                        }
                    if not msg:
                        continue
                    # Handle both async and sync callbacks
                    if is_async_callback:
                        task = asyncio.create_task(callback(msg))
                        self._callback_tasks.add(task)
                        task.add_done_callback(self._callback_tasks.discard)
                    else:
                        callback(msg)
        finally:
            # Always deregister, even when the callback or socket raised;
            # otherwise socket_listener() would wait on this key forever.
            self._socket_running.pop(path, None)

    def run(self):
        """Thread entry point: run ``socket_listener`` to completion."""
        self._loop.run_until_complete(self.socket_listener())

    def stop_socket(self, socket_name):
        """Ask the listener registered under ``socket_name`` to exit."""
        if socket_name in self._socket_running:
            self._socket_running[socket_name] = False

    async def stop_client(self):
        """Close the client connection, if a client was created."""
        if not self._client:
            return
        await self._client.close_connection()

    def _called_from_own_loop(self) -> bool:
        """Return True when the caller is running inside ``self._loop``."""
        try:
            return asyncio.get_running_loop() is self._loop
        except RuntimeError:
            # No loop is running in the calling thread.
            return False

    def stop(self):
        """Stop the manager: close the client and flag all sockets to exit.

        Safe to call more than once. When called from another thread the
        client close is awaited for up to 5 seconds. When called from the
        event loop's own thread (e.g. inside a callback) the close is
        scheduled instead, because blocking there would stall the loop that
        has to run it.
        """
        self._log.debug("Stopping ThreadedApiManager")

        if not self._running:
            return

        self._running = False

        if self._client and self._loop and not self._loop.is_closed():
            try:
                if self._called_from_own_loop():
                    self._close_task = self._loop.create_task(
                        self.stop_client()
                    )
                else:
                    future = asyncio.run_coroutine_threadsafe(
                        self.stop_client(), self._loop
                    )
                    future.result(timeout=5)  # Add timeout to prevent hanging
            except Exception as e:
                # Log the error but don't raise it
                self._log.error(f"Error stopping client: {e}")

        # Iterate over a snapshot: listeners remove their own keys on the
        # loop thread, which would otherwise break iteration here.
        for socket_name in list(self._socket_running):
            if socket_name in self._socket_running:
                self._socket_running[socket_name] = False