Skip to content

Commit bd411f9

Browse files
committed
Pipeline objects are now executed atomically via the MULTI and EXEC commands
1 parent 708c458 commit bd411f9

File tree

2 files changed

+87
-13
lines changed

2 files changed

+87
-13
lines changed

redis/client.py

+57-13
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ def execute_command(self, command_name, command, **options):
225225
self.connection.disconnect()
226226
return self._execute_command(command_name, command, **options)
227227

228-
def _parse_response(self, command_name):
228+
def _parse_response(self, command_name, catch_errors):
229229
conn = self.connection
230230
response = conn.read().strip()
231231
if not response:
@@ -261,13 +261,27 @@ def _parse_response(self, command_name):
261261
length = int(response)
262262
if length == -1:
263263
return None
264-
return [self._parse_response(command_name) for i in range(length)]
264+
if not catch_errors:
265+
return [self._parse_response(command_name, catch_errors)
266+
for i in range(length)]
267+
else:
268+
# for pipelines, we need to read everything, including response errors.
269+
# otherwise we'd completely mess up the receive buffer
270+
data = []
271+
for i in range(length):
272+
try:
273+
data.append(
274+
self._parse_response(command_name, catch_errors)
275+
)
276+
except Exception, e:
277+
data.append(e)
278+
return data
265279

266280
raise InvalidResponse("Unknown response type for: %s" % command_name)
267281

268-
def parse_response(self, command_name, **options):
282+
def parse_response(self, command_name, catch_errors=False, **options):
269283
"Parses a response from the Redis server"
270-
response = self._parse_response(command_name)
284+
response = self._parse_response(command_name, catch_errors)
271285
if command_name in self.RESPONSE_CALLBACKS:
272286
return self.RESPONSE_CALLBACKS[command_name](response, **options)
273287
return response
@@ -886,16 +900,27 @@ class Pipeline(Redis):
886900
in one transmission. This is convenient for batch processing, such as
887901
saving all the values in a list to Redis.
888902
889-
Note that pipelining does *not* guarantee all the commands will be executed
890-
together atomically, nor does it guarantee any transactional consistency.
891-
If the third command in the batch fails, the first two will still have been
892-
executed and "committed"
903+
All commands executed within a pipeline are wrapped with MULTI and EXEC
904+
calls. This guarantees all commands executed in the pipeline will be
905+
executed atomically.
906+
907+
Any command raising an exception does *not* halt the execution of
908+
subsequent commands in the pipeline. Instead, the exception is caught
909+
and its instance is placed into the response list returned by execute().
910+
Code iterating over the response list should be able to deal with an
911+
instance of an exception as a potential value. In general, these will be
912+
ResponseError exceptions, such as those raised when issuing a command
913+
on a key of a different datatype.
893914
"""
894915
def __init__(self, connection, charset, errors):
895916
self.connection = connection
896-
self.command_stack = []
897917
self.encoding = charset
898918
self.errors = errors
919+
self.reset()
920+
921+
def reset(self):
922+
self.command_stack = []
923+
self.format_inline('MULTI')
899924

900925
def execute_command(self, command_name, command, **options):
901926
"""
@@ -921,15 +946,34 @@ def execute_command(self, command_name, command, **options):
921946
return self
922947

923948
def _execute(self, commands):
924-
for _, command, options in commands:
949+
for name, command, options in commands:
925950
self.connection.send(command, self)
926-
return [self.parse_response(name, **options)
927-
for name, _, options in commands]
951+
# we only care about the last item in the response, which should be
952+
# the EXEC command
953+
for i in range(len(commands)-1):
954+
_ = self.parse_response('_')
955+
# tell the response parse to catch errors and return them as
956+
# part of the response
957+
response = self.parse_response('_', catch_errors=True)
958+
# don't return the results of the MULTI or EXEC command
959+
commands = [(c[0], c[2]) for c in commands[1:-1]]
960+
if len(response) != len(commands):
961+
raise ResponseError("Wrong number of response items from "
962+
"pipline execution")
963+
# Run any callbacks for the commands run in the pipeline
964+
data = []
965+
for r, cmd in zip(response, commands):
966+
if not isinstance(r, Exception):
967+
if cmd[0] in self.RESPONSE_CALLBACKS:
968+
r = self.RESPONSE_CALLBACKS[cmd[0]](r, **cmd[1])
969+
data.append(r)
970+
return data
928971

929972
def execute(self):
930973
"Execute all the commands in the current pipeline"
974+
self.format_inline('EXEC')
931975
stack = self.command_stack
932-
self.command_stack = []
976+
self.reset()
933977
try:
934978
return self._execute(stack)
935979
except ConnectionError:

tests/pipeline.py

+30
Original file line numberDiff line numberDiff line change
@@ -23,3 +23,33 @@ def test_pipeline(self):
2323
[('z1', 2.0), ('z2', 4)]
2424
]
2525
)
26+
27+
def test_invalid_command_in_pipeline(self):
28+
# all commands but the invalid one should be excuted correctly
29+
self.client['c'] = 'a'
30+
pipe = self.client.pipeline()
31+
pipe.set('a', 1).set('b', 2).lpush('c', 3).set('d', 4)
32+
result = pipe.execute()
33+
34+
self.assertEquals(result[0], True)
35+
self.assertEquals(self.client['a'], '1')
36+
self.assertEquals(result[1], True)
37+
self.assertEquals(self.client['b'], '2')
38+
# we can't lpush to a key that's a string value, so this should
39+
# be a ResponseError exception
40+
self.assert_(isinstance(result[2], redis.ResponseError))
41+
self.assertEquals(self.client['c'], 'a')
42+
self.assertEquals(result[3], True)
43+
self.assertEquals(self.client['d'], '4')
44+
45+
# make sure the pipe was restored to a working state
46+
self.assertEquals(pipe.set('z', 'zzz').execute(), [True])
47+
self.assertEquals(self.client['z'], 'zzz')
48+
49+
def test_pipe_cannot_select(self):
50+
pipe = self.client.pipeline()
51+
self.assertRaises(redis.RedisError,
52+
pipe.select, 'localhost', 6379, db=9)
53+
54+
55+

0 commit comments

Comments
 (0)