1
1
"""Cross-interpreter Queues High Level Module."""
2
2
3
+ import pickle
3
4
import queue
4
5
import time
5
6
import weakref
@@ -31,38 +32,47 @@ class QueueFull(_queues.QueueFull, queue.Full):
31
32
"""
32
33
33
34
34
- def create (maxsize = 0 ):
35
+ _SHARED_ONLY = 0
36
+ _PICKLED = 1
37
+
38
+ def create (maxsize = 0 , * , syncobj = False ):
35
39
"""Return a new cross-interpreter queue.
36
40
37
41
The queue may be used to pass data safely between interpreters.
42
+
43
+ "syncobj" sets the default for Queue.put()
44
+ and Queue.put_nowait().
38
45
"""
39
- qid = _queues .create (maxsize )
40
- return Queue (qid )
46
+ fmt = _SHARED_ONLY if syncobj else _PICKLED
47
+ qid = _queues .create (maxsize , fmt )
48
+ return Queue (qid , _fmt = fmt )
41
49
42
50
43
51
def list_all ():
44
52
"""Return a list of all open queues."""
45
- return [Queue (qid )
46
- for qid in _queues .list_all ()]
47
-
53
+ return [Queue (qid , _fmt = fmt )
54
+ for qid , fmt in _queues .list_all ()]
48
55
49
56
50
57
_known_queues = weakref .WeakValueDictionary ()
51
58
52
59
class Queue :
53
60
"""A cross-interpreter queue."""
54
61
55
- def __new__ (cls , id , / ):
62
+ def __new__ (cls , id , / , * , _fmt = None ):
56
63
# There is only one instance for any given ID.
57
64
if isinstance (id , int ):
58
65
id = int (id )
59
66
else :
60
67
raise TypeError (f'id must be an int, got { id !r} ' )
68
+ if _fmt is None :
69
+ _fmt = _queues .get_default_fmt (id )
61
70
try :
62
71
self = _known_queues [id ]
63
72
except KeyError :
64
73
self = super ().__new__ (cls )
65
74
self ._id = id
75
+ self ._fmt = _fmt
66
76
_known_queues [id ] = self
67
77
_queues .bind (id )
68
78
return self
@@ -105,20 +115,50 @@ def qsize(self):
105
115
return _queues .get_count (self ._id )
106
116
107
117
def put (self , obj , timeout = None , * ,
118
+ syncobj = None ,
108
119
_delay = 10 / 1000 , # 10 milliseconds
109
120
):
110
121
"""Add the object to the queue.
111
122
112
123
This blocks while the queue is full.
124
+
125
+ If "syncobj" is None (the default) then it uses the
126
+ queue's default, set with create_queue()..
127
+
128
+ If "syncobj" is false then all objects are supported,
129
+ at the expense of worse performance.
130
+
131
+ If "syncobj" is true then the object must be "shareable".
132
+ Examples of "shareable" objects include the builtin singletons,
133
+ str, and memoryview. One benefit is that such objects are
134
+ passed through the queue efficiently.
135
+
136
+ The key difference, though, is conceptual: the corresponding
137
+ object returned from Queue.get() will be strictly equivalent
138
+ to the given obj. In other words, the two objects will be
139
+ effectively indistinguishable from each other, even if the
140
+ object is mutable. The received object may actually be the
141
+ same object, or a copy (immutable values only), or a proxy.
142
+ Regardless, the received object should be treated as though
143
+ the original has been shared directly, whether or not it
144
+ actually is. That's a slightly different and stronger promise
145
+ than just (initial) equality, which is all "syncobj=False"
146
+ can promise.
113
147
"""
148
+ if syncobj is None :
149
+ fmt = self ._fmt
150
+ else :
151
+ fmt = _SHARED_ONLY if syncobj else _PICKLED
114
152
if timeout is not None :
115
153
timeout = int (timeout )
116
154
if timeout < 0 :
117
155
raise ValueError (f'timeout value must be non-negative' )
118
156
end = time .time () + timeout
157
+ if fmt is _PICKLED :
158
+ obj = pickle .dumps (obj )
119
159
while True :
120
160
try :
121
- _queues .put (self ._id , obj )
161
+ _queues .put (self ._id , obj , fmt )
122
162
except _queues .QueueFull as exc :
123
163
if timeout is not None and time .time () >= end :
124
164
exc .__class__ = QueueFull
@@ -127,9 +167,15 @@ def put(self, obj, timeout=None, *,
127
167
else :
128
168
break
129
169
130
- def put_nowait (self , obj ):
170
+ def put_nowait (self , obj , * , syncobj = None ):
171
+ if syncobj is None :
172
+ fmt = self ._fmt
173
+ else :
174
+ fmt = _SHARED_ONLY if syncobj else _PICKLED
175
+ if fmt is _PICKLED :
176
+ obj = pickle .dumps (obj )
131
177
try :
132
- return _queues .put (self ._id , obj )
178
+ _queues .put (self ._id , obj , fmt )
133
179
except _queues .QueueFull as exc :
134
180
exc .__class__ = QueueFull
135
181
raise # re-raise
@@ -148,12 +194,18 @@ def get(self, timeout=None, *,
148
194
end = time .time () + timeout
149
195
while True :
150
196
try :
151
- return _queues .get (self ._id )
197
+ obj , fmt = _queues .get (self ._id )
152
198
except _queues .QueueEmpty as exc :
153
199
if timeout is not None and time .time () >= end :
154
200
exc .__class__ = QueueEmpty
155
201
raise # re-raise
156
202
time .sleep (_delay )
203
+ else :
204
+ break
205
+ if fmt == _PICKLED :
206
+ obj = pickle .loads (obj )
207
+ else :
208
+ assert fmt == _SHARED_ONLY
157
209
return obj
158
210
159
211
def get_nowait (self ):
0 commit comments