Skip to content

Commit 2810533

Browse files
committed
allow shard hints to be passed to pipeline and pubsub objects. a smart connection pool could use these hints to determine the correct shard to run on.
removed all previously deprecated parameters and commands
1 parent d1cd365 commit 2810533

File tree

1 file changed

+53
-90
lines changed

1 file changed

+53
-90
lines changed

redis/client.py

Lines changed: 53 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -13,26 +13,17 @@
1313
WatchError,
1414
)
1515

16-
17-
def list_or_args(command, keys, args):
16+
def list_or_args(keys, args):
1817
# returns a single list combining keys and args
19-
# if keys is not a list or args has items, issue a
20-
# deprecation warning
21-
oldapi = bool(args)
2218
try:
2319
i = iter(keys)
2420
# a string can be iterated, but indicates
2521
# keys wasn't passed as a list
2622
if isinstance(keys, basestring):
27-
oldapi = True
23+
keys = [keys]
2824
except TypeError:
29-
oldapi = True
3025
keys = [keys]
31-
if oldapi:
32-
warnings.warn(DeprecationWarning(
33-
"Passing *args to Redis.%s has been deprecated. "
34-
"Pass an iterable to ``keys`` instead" % command
35-
))
26+
if args:
3627
keys.extend(args)
3728
return keys
3829

@@ -184,15 +175,15 @@ def __init__(self, host='localhost', port=6379,
184175
encoding_errors=errors
185176
)
186177

187-
def pipeline(self, transaction=True):
178+
def pipeline(self, transaction=True, shard_hint=None):
188179
"""
189180
Return a new pipeline object that can queue multiple commands for
190181
later execution. ``transaction`` indicates whether all commands
191-
should be executed atomically. Apart from multiple atomic operations,
192-
pipelines are useful for batch loading of data as they reduce the
193-
number of back and forth network operations between client and server.
182+
should be executed atomically. Apart from making a group of operations
183+
atomic, pipelines are useful for reducing the back-and-forth overhead
184+
between the client and server.
194185
"""
195-
return Pipeline(self.connection_pool, transaction)
186+
return Pipeline(self.connection_pool, transaction, shard_hint)
196187

197188
def lock(self, name, timeout=None, sleep=0.1):
198189
"""
@@ -208,11 +199,17 @@ def lock(self, name, timeout=None, sleep=0.1):
208199
"""
209200
return Lock(self, name, timeout=timeout, sleep=sleep)
210201

211-
def pubsub(self):
212-
return PubSub(self.connection_pool)
202+
def pubsub(self, shard_hint=None):
203+
"""
204+
Return a Publish/Subscribe object. With this object, you can
205+
subscribe to channels and listen for messages that get published to
206+
them.
207+
"""
208+
return PubSub(self.connection_pool, shard_hint)
213209

214210
#### COMMAND EXECUTION AND PROTOCOL PARSING ####
215211
def execute_command(self, *args, **options):
212+
"Execute a command and return a parsed response"
216213
command_name = args[0]
217214
connection = self.connection_pool.get_connection(command_name)
218215
try:
@@ -261,14 +258,6 @@ def delete(self, *names):
261258
return self.execute_command('DEL', *names)
262259
__delitem__ = delete
263260

264-
def flush(self, all_dbs=False):
265-
warnings.warn(DeprecationWarning(
266-
"'flush' has been deprecated. "
267-
"Use Redis.flushdb() or Redis.flushall() instead"))
268-
if all_dbs:
269-
return self.flushall()
270-
return self.flushdb()
271-
272261
def flushall(self):
273262
"Delete all keys in all databases on the current host"
274263
return self.execute_command('FLUSHALL')
@@ -300,7 +289,13 @@ def save(self):
300289
return self.execute_command('SAVE')
301290

302291
def select(self, db):
303-
"Select a differnet Redis database"
292+
"""
293+
Select a differnet Redis database.
294+
295+
WARNING: this could have severe consequences for pooled connections.
296+
It's highly advised to use a separate connection pool and client
297+
instance to work with multiple databases. Use this at your own risk.
298+
"""
304299
return self.execute_command('SELECT', db)
305300

306301
def shutdown(self):
@@ -370,8 +365,7 @@ def __getitem__(self, name):
370365
value = self.get(name)
371366
if value:
372367
return value
373-
else:
374-
raise KeyError(name)
368+
raise KeyError(name)
375369

376370
def getbit(self, name, offset):
377371
"Returns a boolean indicating the value of ``offset`` in ``name``"
@@ -398,10 +392,8 @@ def keys(self, pattern='*'):
398392
def mget(self, keys, *args):
399393
"""
400394
Returns a list of values ordered identically to ``keys``
401-
402-
* Passing *args to this method has been deprecated *
403395
"""
404-
keys = list_or_args('mget', keys, args)
396+
keys = list_or_args(keys, args)
405397
return self.execute_command('MGET', *keys)
406398

407399
def mset(self, mapping):
@@ -433,29 +425,17 @@ def randomkey(self):
433425
"Returns the name of a random key"
434426
return self.execute_command('RANDOMKEY')
435427

436-
def rename(self, src, dst, **kwargs):
428+
def rename(self, src, dst):
437429
"""
438430
Rename key ``src`` to ``dst``
439-
440-
* The following flags have been deprecated *
441-
If ``preserve`` is True, rename the key only if the destination name
442-
doesn't already exist
443-
"""
444-
if kwargs:
445-
if 'preserve' in kwargs:
446-
warnings.warn(DeprecationWarning(
447-
"preserve option to 'rename' is deprecated, "
448-
"use Redis.renamenx instead"))
449-
if kwargs['preserve']:
450-
return self.renamenx(src, dst)
431+
"""
451432
return self.execute_command('RENAME', src, dst)
452433

453434
def renamenx(self, src, dst):
454435
"Rename key ``src`` to ``dst`` if ``dst`` doesn't already exist"
455436
return self.execute_command('RENAMENX', src, dst)
456437

457-
458-
def set(self, name, value, **kwargs):
438+
def set(self, name, value):
459439
"""
460440
Set the value at key ``name`` to ``value``
461441
@@ -465,19 +445,6 @@ def set(self, name, value, **kwargs):
465445
If ``getset`` is True, set the value only if key doesn't already exist
466446
and return the resulting value of key
467447
"""
468-
if kwargs:
469-
if 'getset' in kwargs:
470-
warnings.warn(DeprecationWarning(
471-
"getset option to 'set' is deprecated, "
472-
"use Redis.getset() instead"))
473-
if kwargs['getset']:
474-
return self.getset(name, value)
475-
if 'preserve' in kwargs:
476-
warnings.warn(DeprecationWarning(
477-
"preserve option to 'set' is deprecated, "
478-
"use Redis.setnx() instead"))
479-
if kwargs['preserve']:
480-
return self.setnx(name, value)
481448
return self.execute_command('SET', name, value)
482449
__setitem__ = set
483450

@@ -778,28 +745,28 @@ def scard(self, name):
778745

779746
def sdiff(self, keys, *args):
780747
"Return the difference of sets specified by ``keys``"
781-
keys = list_or_args('sdiff', keys, args)
748+
keys = list_or_args(keys, args)
782749
return self.execute_command('SDIFF', *keys)
783750

784751
def sdiffstore(self, dest, keys, *args):
785752
"""
786753
Store the difference of sets specified by ``keys`` into a new
787754
set named ``dest``. Returns the number of keys in the new set.
788755
"""
789-
keys = list_or_args('sdiffstore', keys, args)
756+
keys = list_or_args(keys, args)
790757
return self.execute_command('SDIFFSTORE', dest, *keys)
791758

792759
def sinter(self, keys, *args):
793760
"Return the intersection of sets specified by ``keys``"
794-
keys = list_or_args('sinter', keys, args)
761+
keys = list_or_args(keys, args)
795762
return self.execute_command('SINTER', *keys)
796763

797764
def sinterstore(self, dest, keys, *args):
798765
"""
799766
Store the intersection of sets specified by ``keys`` into a new
800767
set named ``dest``. Returns the number of keys in the new set.
801768
"""
802-
keys = list_or_args('sinterstore', keys, args)
769+
keys = list_or_args(keys, args)
803770
return self.execute_command('SINTERSTORE', dest, *keys)
804771

805772
def sismember(self, name, value):
@@ -828,15 +795,15 @@ def srem(self, name, value):
828795

829796
def sunion(self, keys, *args):
830797
"Return the union of sets specifiued by ``keys``"
831-
keys = list_or_args('sunion', keys, args)
798+
keys = list_or_args(keys, args)
832799
return self.execute_command('SUNION', *keys)
833800

834801
def sunionstore(self, dest, keys, *args):
835802
"""
836803
Store the union of sets specified by ``keys`` into a new
837804
set named ``dest``. Returns the number of keys in the new set.
838805
"""
839-
keys = list_or_args('sunionstore', keys, args)
806+
keys = list_or_args(keys, args)
840807
return self.execute_command('SUNIONSTORE', dest, *keys)
841808

842809

@@ -993,12 +960,6 @@ def zscore(self, name, value):
993960
"Return the score of element ``value`` in sorted set ``name``"
994961
return self.execute_command('ZSCORE', name, value)
995962

996-
def zunion(self, dest, keys, aggregate=None):
997-
warnings.warn(DeprecationWarning(
998-
"Redis.zunion has been deprecated, use Redis.zunionstore instead"
999-
))
1000-
return self.zunionstore(dest, keys, aggregate)
1001-
1002963
def zunionstore(self, dest, keys, aggregate=None):
1003964
"""
1004965
Union multiple sorted sets specified by ``keys`` into
@@ -1094,8 +1055,9 @@ def publish(self, channel, message):
10941055

10951056

10961057
class PubSub(object):
1097-
def __init__(self, connection_pool):
1058+
def __init__(self, connection_pool, shard_hint=None):
10981059
self.connection_pool = connection_pool
1060+
self.shard_hint = shard_hint
10991061
self.connection = None
11001062
self.channels = set()
11011063
self.patterns = set()
@@ -1107,7 +1069,10 @@ def __init__(self, connection_pool):
11071069
def execute_command(self, *args, **kwargs):
11081070
"Execute a publish/subscribe command"
11091071
if self.connection is None:
1110-
self.connection = self.connection_pool.get_connection('pubsub')
1072+
self.connection = self.connection_pool.get_connection(
1073+
'pubsub',
1074+
self.shard_hint
1075+
)
11111076
connection = self.connection
11121077
try:
11131078
connection.send_command(*args)
@@ -1225,9 +1190,10 @@ class Pipeline(Redis):
12251190
ResponseError exceptions, such as those raised when issuing a command
12261191
on a key of a different datatype.
12271192
"""
1228-
def __init__(self, connection_pool, transaction):
1193+
def __init__(self, connection_pool, transaction, shard_hint):
12291194
self.connection_pool = connection_pool
12301195
self.transaction = transaction
1196+
self.shard_hint = shard_hint
12311197
self.reset()
12321198

12331199
def reset(self):
@@ -1251,20 +1217,20 @@ def execute_command(self, *args, **options):
12511217
return self
12521218

12531219
def _execute_transaction(self, commands):
1254-
connection = self.connection_pool.get_connection('MULTI')
1220+
conn = self.connection_pool.get_connection('MULTI', self.shard_hint)
12551221
try:
1256-
all_cmds = ''.join(starmap(connection.pack_command,
1222+
all_cmds = ''.join(starmap(conn.pack_command,
12571223
[args for args, options in commands]))
1258-
connection.send_packed_command(all_cmds)
1224+
conn.send_packed_command(all_cmds)
12591225
# we don't care about the multi/exec any longer
12601226
commands = commands[1:-1]
12611227
# parse off the response for MULTI and all commands prior to EXEC.
12621228
# the only data we care about is the response the EXEC
12631229
# which is the last command
12641230
for i in range(len(commands)+1):
1265-
_ = self.parse_response(connection, '_')
1231+
_ = self.parse_response(conn, '_')
12661232
# parse the EXEC.
1267-
response = self.parse_response(connection, '_')
1233+
response = self.parse_response(conn, '_')
12681234

12691235
if response is None:
12701236
raise WatchError("Watched variable changed.")
@@ -1283,19 +1249,19 @@ def _execute_transaction(self, commands):
12831249
data.append(r)
12841250
return data
12851251
finally:
1286-
self.connection_pool.release(connection)
1252+
self.connection_pool.release(conn)
12871253

12881254
def _execute_pipeline(self, commands):
12891255
# build up all commands into a single request to increase network perf
1290-
connection = self.connection_pool.get_connection('MULTI')
1256+
conn = self.connection_pool.get_connection('MULTI', self.shard_hint)
12911257
try:
1292-
all_cmds = ''.join(starmap(connection.pack_command,
1258+
all_cmds = ''.join(starmap(conn.pack_command,
12931259
[args for args, options in commands]))
1294-
connection.send_packed_command(all_cmds)
1295-
return [self.parse_response(connection, args[0], **options)
1260+
conn.send_packed_command(all_cmds)
1261+
return [self.parse_response(conn, args[0], **options)
12961262
for args, options in commands]
12971263
finally:
1298-
self.connection_pool.release(connection)
1264+
self.connection_pool.release(conn)
12991265

13001266
def execute(self):
13011267
"Execute all the commands in the current pipeline"
@@ -1312,9 +1278,6 @@ def execute(self):
13121278
connection.disconnect()
13131279
return execute(stack)
13141280

1315-
def select(self, *args, **kwargs):
1316-
raise RedisError("Cannot select a different database from a pipeline")
1317-
13181281
class LockError(RedisError):
13191282
"Errors thrown from the Lock"
13201283
pass

0 commit comments

Comments
 (0)