@@ -55,15 +55,23 @@ def create(*, unbounditems=UNBOUND):
55
55
"""
56
56
unbound = _serialize_unbound (unbounditems )
57
57
unboundop , = unbound
58
- cid = _channels .create (unboundop )
59
- recv , send = RecvChannel (cid ), SendChannel (cid , _unbound = unbound )
58
+ cid = _channels .create (unboundop , - 1 )
59
+ recv , send = RecvChannel (cid ), SendChannel (cid )
60
+ send ._set_unbound (unboundop , unbounditems )
60
61
return recv , send
61
62
62
63
63
64
def list_all ():
64
65
"""Return a list of (recv, send) for all open channels."""
65
- return [(RecvChannel (cid ), SendChannel (cid , _unbound = unbound ))
66
- for cid , unbound in _channels .list_all ()]
66
+ channels = []
67
+ for cid , unboundop , _ in _channels .list_all ():
68
+ chan = _ , send = RecvChannel (cid ), SendChannel (cid )
69
+ if not hasattr (send , '_unboundop' ):
70
+ send ._set_unbound (unboundop )
71
+ else :
72
+ assert send ._unbound [0 ] == op
73
+ channels .append (chan )
74
+ return channels
67
75
68
76
69
77
class _ChannelEnd :
@@ -175,78 +183,95 @@ class SendChannel(_ChannelEnd):
175
183
176
184
_end = 'send'
177
185
178
- def __new__ (cls , cid , * , _unbound = None ):
179
- if _unbound is None :
180
- try :
181
- op = _channels .get_channel_defaults (cid )
182
- _unbound = (op ,)
183
- except ChannelNotFoundError :
184
- _unbound = _serialize_unbound (UNBOUND )
185
- self = super ().__new__ (cls , cid )
186
- self ._unbound = _unbound
187
- return self
186
+ # def __new__(cls, cid, *, _unbound=None):
187
+ # if _unbound is None:
188
+ # try:
189
+ # op = _channels.get_channel_defaults(cid)
190
+ # _unbound = (op,)
191
+ # except ChannelNotFoundError:
192
+ # _unbound = _serialize_unbound(UNBOUND)
193
+ # self = super().__new__(cls, cid)
194
+ # self._unbound = _unbound
195
+ # return self
196
+
197
+ def _set_unbound (self , op , items = None ):
198
+ assert not hasattr (self , '_unbound' )
199
+ if items is None :
200
+ items = _resolve_unbound (op )
201
+ unbound = (op , items )
202
+ self ._unbound = unbound
203
+ return unbound
204
+
205
+ @property
206
+ def unbounditems (self ):
207
+ try :
208
+ _ , items = self ._unbound
209
+ except AttributeError :
210
+ op , _ = _channels .get_queue_defaults (self ._id )
211
+ _ , items = self ._set_unbound (op )
212
+ return items
188
213
189
214
@property
190
215
def is_closed (self ):
191
216
info = self ._info
192
217
return info .closed or info .closing
193
218
194
219
def send (self , obj , timeout = None , * ,
195
- unbound = None ,
220
+ unbounditems = None ,
196
221
):
197
222
"""Send the object (i.e. its data) to the channel's receiving end.
198
223
199
224
This blocks until the object is received.
200
225
"""
201
- if unbound is None :
202
- unboundop , = self . _unbound
226
+ if unbounditems is None :
227
+ unboundop = - 1
203
228
else :
204
- unboundop , = _serialize_unbound (unbound )
229
+ unboundop , = _serialize_unbound (unbounditems )
205
230
_channels .send (self ._id , obj , unboundop , timeout = timeout , blocking = True )
206
231
207
232
def send_nowait (self , obj , * ,
208
- unbound = None ,
233
+ unbounditems = None ,
209
234
):
210
235
"""Send the object to the channel's receiving end.
211
236
212
237
If the object is immediately received then return True
213
238
(else False). Otherwise this is the same as send().
214
239
"""
215
- if unbound is None :
216
- unboundop , = self . _unbound
240
+ if unbounditems is None :
241
+ unboundop = - 1
217
242
else :
218
- unboundop , = _serialize_unbound (unbound )
243
+ unboundop , = _serialize_unbound (unbounditems )
219
244
# XXX Note that at the moment channel_send() only ever returns
220
245
# None. This should be fixed when channel_send_wait() is added.
221
246
# See bpo-32604 and gh-19829.
222
247
return _channels .send (self ._id , obj , unboundop , blocking = False )
223
248
224
249
def send_buffer (self , obj , timeout = None , * ,
225
- unbound = None ,
250
+ unbounditems = None ,
226
251
):
227
252
"""Send the object's buffer to the channel's receiving end.
228
253
229
254
This blocks until the object is received.
230
255
"""
231
- if unbound is None :
232
- unboundop , = self . _unbound
256
+ if unbounditems is None :
257
+ unboundop = - 1
233
258
else :
234
- unboundop , = _serialize_unbound (unbound )
259
+ unboundop , = _serialize_unbound (unbounditems )
235
260
_channels .send_buffer (self ._id , obj , unboundop ,
236
261
timeout = timeout , blocking = True )
237
262
238
263
def send_buffer_nowait (self , obj , * ,
239
- unbound = None ,
264
+ unbounditems = None ,
240
265
):
241
266
"""Send the object's buffer to the channel's receiving end.
242
267
243
268
If the object is immediately received then return True
244
269
(else False). Otherwise this is the same as send().
245
270
"""
246
- if unbound is None :
247
- unboundop , = self . _unbound
271
+ if unbounditems is None :
272
+ unboundop = - 1
248
273
else :
249
- unboundop , = _serialize_unbound (unbound )
274
+ unboundop , = _serialize_unbound (unbounditems )
250
275
return _channels .send_buffer (self ._id , obj , unboundop , blocking = False )
251
276
252
277
def close (self ):
0 commit comments