1717
1818import certifi
1919import websockets
20+ import websockets .exceptions
21+ from packaging import version
2022from shinny_structlog import ShinnyLoggerAdapter
2123
2224from tqsdk .datetime import _cst_now
4244"""
4345
4446
47+ websocket_version_ge_14 = version .parse (websockets .__version__ ) >= version .parse ("14.0" )
48+
49+
4550class ReconnectTimer (object ):
4651
4752 def __init__ (self ):
@@ -54,42 +59,17 @@ def set_count(self, count):
5459 self .timer = time .time () + random .uniform (seconds , seconds * 2 )
5560
5661
57- class TqStreamReader (asyncio .StreamReader ):
58-
59- def __init__ (self , * args , ** kwargs ):
60- super (TqStreamReader , self ).__init__ (* args , ** kwargs )
61- self ._start_read_message = None
62- self ._read_size = 0
63-
64- async def readexactly (self , n ):
65- data = await super (TqStreamReader , self ).readexactly (n )
66- if not self ._start_read_message :
67- self ._start_read_message = time .time ()
68- self ._read_size += n
69- return data
70-
71-
72- class TqWebSocketClientProtocol (websockets .WebSocketClientProtocol ):
73-
74- def __init__ (self , * args , ** kwargs ):
75- super (TqWebSocketClientProtocol , self ).__init__ (* args , ** kwargs )
76- self .reader = TqStreamReader (limit = self .read_limit // 2 , loop = self .loop )
77-
78- async def handshake (self , * args , ** kwargs ) -> None :
79- try :
80- await super (TqWebSocketClientProtocol , self ).handshake (* args , ** kwargs )
81- except websockets .exceptions .InvalidStatusCode as e :
82- for h_key , h_value in self .response_headers .items ():
83- if h_key == 'x-shinny-auth-check' and h_value == 'Backtest Permission Denied' :
84- raise TqBacktestPermissionError (
85- "免费账户每日可以回测3次,今日暂无回测权限,需要购买后才能使用。升级网址:https://www.shinnytech.com/tqsdk-buy/" ) from None
86- raise
87-
88- async def read_message (self ):
89- message = await super ().read_message ()
90- self .reader ._start_read_message = None
91- self .reader ._read_size = 0
92- return message
62+ if websocket_version_ge_14 :
63+ # https://websockets.readthedocs.io/en/stable/reference/exceptions.html#module-websockets.exceptions
64+ websocket_expect_exc = (
65+ websockets .exceptions .ConnectionClosedError , websockets .exceptions .InvalidHandshake , websockets .exceptions .InvalidURI ,
66+ websockets .exceptions .InvalidState , websockets .exceptions .ProtocolError
67+ )
68+ else :
69+ websocket_expect_exc = (
70+ websockets .exceptions .ConnectionClosed , websockets .exceptions .InvalidStatusCode , websockets .exceptions .InvalidURI ,
71+ websockets .exceptions .InvalidState , websockets .exceptions .ProtocolError
72+ )
9373
9474
9575class TqConnect (object ):
@@ -117,8 +97,14 @@ async def _run(self, api, url, send_chan, recv_chan):
11797 self ._subscribed_per_seconds = 100 # 每秒 subscribe_quote 请求次数限制
11898 self ._subscribed_queue = Queue (self ._subscribed_per_seconds )
11999
120- self ._keywords ["extra_headers" ] = self ._api ._base_headers
121- self ._keywords ["create_protocol" ] = TqWebSocketClientProtocol
100+ # websockets 14.0版本升级后用法有变化
101+ if websocket_version_ge_14 :
102+ # https://websockets.readthedocs.io/en/stable/howto/upgrade.html#arguments-of-connect
103+ self ._keywords ["additional_headers" ] = self ._api ._base_headers
104+ self ._keywords ["user_agent_header" ] = None # self._api._base_headers 里面已经包含了 "User-Agent"
105+ self ._keywords ["process_exception" ] = lambda exc : exc
106+ else :
107+ self ._keywords ["extra_headers" ] = self ._api ._base_headers
122108 url_info = urlparse (url )
123109 cm = NullContext ()
124110 if url_info .scheme == "wss" :
@@ -198,16 +184,10 @@ async def _run(self, api, url, send_chan, recv_chan):
198184 self ._logger .debug ("websocket received data" , pack = msg )
199185 await recv_chan .send (pack )
200186 finally :
201- self ._logger .debug ("websocket connection info" , current_time = time .time (),
202- start_read_message = client .reader ._start_read_message ,
203- read_size = client .reader ._read_size )
204187 await self ._api ._cancel_task (send_task )
205188 # 希望做到的效果是遇到网络问题可以断线重连, 但是可能抛出的例外太多了(TimeoutError,socket.gaierror等), 又没有文档或工具可以理出 try 代码中所有可能遇到的例外
206189 # 而这里的 except 又需要处理所有子函数及子函数的子函数等等可能抛出的例外, 因此这里只能遇到问题之后再补, 并且无法避免 false positive 和 false negative
207- except (websockets .exceptions .ConnectionClosed , websockets .exceptions .InvalidStatusCode , websockets .exceptions .InvalidURI ,
208- websockets .exceptions .InvalidState , websockets .exceptions .ProtocolError , asyncio .TimeoutError ,
209- OSError , EOFError ,
210- TqBacktestPermissionError ) as e :
190+ except websocket_expect_exc + (asyncio .TimeoutError , OSError , EOFError , TqBacktestPermissionError ) as e :
211191 in_ops_time = _cst_now ().hour == 19 and 0 <= _cst_now ().minute <= 30
212192 # 发送网络连接断开的通知,code = 2019112911
213193 notify_id = _generate_uuid ()
0 commit comments