Skip to content

Fix handling for empty arguments in commands that throw server-side error #941

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 38 additions & 9 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@
SYM_EMPTY = b('')


_empty = object()


def is_empty(cmd):
return len(cmd) >= 2 and cmd[1] == _empty


COMMAND_EMPTY_RETURN_VALUE = {
'MGET': []
}


def list_or_args(keys, args):
# returns a single list combining keys and args
try:
Expand Down Expand Up @@ -742,6 +754,8 @@ def execute_command(self, *args, **options):
"Execute a command and return a parsed response"
pool = self.connection_pool
command_name = args[0]
if is_empty(args):
return COMMAND_EMPTY_RETURN_VALUE[command_name]
connection = pool.get_connection(command_name, **options)
try:
connection.send_command(*args)
Expand Down Expand Up @@ -1116,6 +1130,8 @@ def mget(self, keys, *args):
Returns a list of values ordered identically to ``keys``
"""
args = list_or_args(keys, args)
if not args:
args = [_empty]
return self.execute_command('MGET', *args)

def mset(self, *args, **kwargs):
Expand Down Expand Up @@ -3169,6 +3185,8 @@ def immediate_execute_command(self, *args, **options):
MULTI is called.
"""
command_name = args[0]
if is_empty(args):
return COMMAND_EMPTY_RETURN_VALUE[command_name]
conn = self.connection
# if this is the first call, we need a connection
if not conn:
Expand Down Expand Up @@ -3209,7 +3227,8 @@ def pipeline_execute_command(self, *args, **options):
return self

def _execute_transaction(self, connection, commands, raise_on_error):
cmds = chain([(('MULTI', ), {})], commands, [(('EXEC', ), {})])
non_empty_cmds = [cmd for cmd in commands if not is_empty(cmd[0])]
cmds = chain([(('MULTI', ), {})], non_empty_cmds, [(('EXEC', ), {})])
all_cmds = connection.pack_commands([args for args, _ in cmds])
connection.send_packed_command(all_cmds)
errors = []
Expand All @@ -3223,8 +3242,8 @@ def _execute_transaction(self, connection, commands, raise_on_error):
except ResponseError:
errors.append((0, sys.exc_info()[1]))

# and all the other commands
for i, command in enumerate(commands):
# and all the other non_empty_cmds
for i, command in enumerate(non_empty_cmds):
try:
self.parse_response(connection, '_')
except ResponseError:
Expand All @@ -3249,18 +3268,23 @@ def _execute_transaction(self, connection, commands, raise_on_error):
for i, e in errors:
response.insert(i, e)

if len(response) != len(commands):
if len(response) != len(non_empty_cmds):
self.connection.disconnect()
raise ResponseError("Wrong number of response items from "
"pipeline execution")

# find any errors in the response and raise if necessary
if raise_on_error:
self.raise_first_error(commands, response)
self.raise_first_error(non_empty_cmds, response)

# We have to run response callbacks manually
data = []
for r, cmd in izip(response, commands):
response_iter = iter(response)
for cmd in commands:
if not is_empty(cmd[0]):
r = next(response_iter)
else:
r = COMMAND_EMPTY_RETURN_VALUE[cmd[0][0]]
if not isinstance(r, Exception):
args, options = cmd
command_name = args[0]
Expand All @@ -3271,14 +3295,19 @@ def _execute_transaction(self, connection, commands, raise_on_error):

def _execute_pipeline(self, connection, commands, raise_on_error):
# build up all commands into a single request to increase network perf
all_cmds = connection.pack_commands([args for args, _ in commands])
non_empty_cmds = [cmd for cmd in commands if not is_empty(cmd[0])]
all_cmds = connection.pack_commands(
[args for args, _ in non_empty_cmds])
connection.send_packed_command(all_cmds)

response = []
for args, options in commands:
try:
response.append(
self.parse_response(connection, args[0], **options))
if not is_empty(args):
response.append(
self.parse_response(connection, args[0], **options))
else:
response.append(COMMAND_EMPTY_RETURN_VALUE[args[0]])
except ResponseError:
response.append(sys.exc_info()[1])

Expand Down
1 change: 1 addition & 0 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ def test_keys(self, r):
assert set(r.keys(pattern='test*')) == keys

def test_mget(self, r):
assert r.mget([]) == []
assert r.mget(['a', 'b']) == [None, None]
r['a'] = '1'
r['b'] = '2'
Expand Down
6 changes: 4 additions & 2 deletions tests/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ def test_pipeline(self, r):
with r.pipeline() as pipe:
pipe.set('a', 'a1').get('a').zadd('z', z1=1).zadd('z', z2=4)
pipe.zincrby('z', 'z1').zrange('z', 0, 5, withscores=True)
pipe.mget([])
assert pipe.execute() == \
[
True,
Expand All @@ -18,6 +19,7 @@ def test_pipeline(self, r):
True,
2.0,
[(b('z1'), 2.0), (b('z2'), 4)],
[],
]

def test_pipeline_length(self, r):
Expand All @@ -38,8 +40,8 @@ def test_pipeline_length(self, r):

def test_pipeline_no_transaction(self, r):
with r.pipeline(transaction=False) as pipe:
pipe.set('a', 'a1').set('b', 'b1').set('c', 'c1')
assert pipe.execute() == [True, True, True]
pipe.set('a', 'a1').set('b', 'b1').set('c', 'c1').mget([])
assert pipe.execute() == [True, True, True, []]
assert r['a'] == b('a1')
assert r['b'] == b('b1')
assert r['c'] == b('c1')
Expand Down