@@ -1762,6 +1762,12 @@ def __init__(self, connection_pool, shard_hint=None,
1762
1762
self .shard_hint = shard_hint
1763
1763
self .ignore_subscribe_messages = ignore_subscribe_messages
1764
1764
self .connection = None
1765
+ # we need to know the encoding options for this connection in order
1766
+ # to lookup channel and pattern names for callback handlers.
1767
+ connection_kwargs = self .connection_pool .connection_kwargs
1768
+ self .encoding = connection_kwargs ['encoding' ]
1769
+ self .encoding_errors = connection_kwargs ['encoding_errors' ]
1770
+ self .decode_responses = connection_kwargs ['decode_responses' ]
1765
1771
self .reset ()
1766
1772
1767
1773
def __del__ (self ):
@@ -1786,10 +1792,35 @@ def close(self):
1786
1792
self .reset ()
1787
1793
1788
1794
def on_connect (self , connection ):
1795
+ "Re-subscribe to any channels and patterns previously subscribed to"
1796
+ # NOTE: for python3, we can't pass bytestrings as keyword arguments
1797
+ # so we need to decode channel/pattern names back to unicode strings
1798
+ # before passing them to [p]subscribe.
1789
1799
if self .channels :
1790
- self .subscribe (** self .channels )
1800
+ channels = {}
1801
+ for k , v in iteritems (self .channels ):
1802
+ if not self .decode_responses :
1803
+ k = k .decode (self .encoding , self .encoding_errors )
1804
+ channels [k ] = v
1805
+ self .subscribe (** channels )
1791
1806
if self .patterns :
1792
- self .psubscribe (** self .patterns )
1807
+ patterns = {}
1808
+ for k , v in iteritems (self .patterns ):
1809
+ if not self .decode_responses :
1810
+ k = k .decode (self .encoding , self .encoding_errors )
1811
+ patterns [k ] = v
1812
+ self .psubscribe (** patterns )
1813
+
1814
+ def encode (self , value ):
1815
+ """
1816
+ Encode the value so that it's identical to what we'll
1817
+ read off the connection
1818
+ """
1819
+ if self .decode_responses and isinstance (value , bytes ):
1820
+ value = value .decode (self .encoding , self .encoding_errors )
1821
+ elif not self .decode_responses and isinstance (value , unicode ):
1822
+ value = value .encode (self .encoding , self .encoding_errors )
1823
+ return value
1793
1824
1794
1825
@property
1795
1826
def subscribed (self ):
@@ -1808,10 +1839,8 @@ def execute_command(self, *args, **kwargs):
1808
1839
'pubsub' ,
1809
1840
self .shard_hint
1810
1841
)
1811
- # initially connect here so we don't run our callback the first
1812
- # time. If we did, it would dupe the subscriptions, once from the
1813
- # callback and a second time from the actual command invocation
1814
- self .connection .connect ()
1842
+ # register a callback that re-subscribes to any channels we
1843
+ # were listening to when we were disconnected
1815
1844
self .connection .register_connect_callback (self .on_connect )
1816
1845
connection = self .connection
1817
1846
self ._execute (connection , connection .send_command , * args )
@@ -1847,10 +1876,15 @@ def psubscribe(self, *args, **kwargs):
1847
1876
if args :
1848
1877
args = list_or_args (args [0 ], args [1 :])
1849
1878
new_patterns = {}
1850
- new_patterns .update (dict .fromkeys (args ))
1851
- new_patterns .update (kwargs )
1879
+ new_patterns .update (dict .fromkeys (imap (self .encode , args )))
1880
+ for pattern , handler in iteritems (kwargs ):
1881
+ new_patterns [self .encode (pattern )] = handler
1882
+ ret_val = self .execute_command ('PSUBSCRIBE' , * iterkeys (new_patterns ))
1883
+ # update the patterns dict AFTER we send the command. we don't want to
1884
+ # subscribe twice to these patterns, once for the command and again
1885
+ # for the reconnection.
1852
1886
self .patterns .update (new_patterns )
1853
- return self . execute_command ( 'PSUBSCRIBE' , * iterkeys ( new_patterns ))
1887
+ return ret_val
1854
1888
1855
1889
def punsubscribe (self , * args ):
1856
1890
"""
@@ -1872,10 +1906,15 @@ def subscribe(self, *args, **kwargs):
1872
1906
if args :
1873
1907
args = list_or_args (args [0 ], args [1 :])
1874
1908
new_channels = {}
1875
- new_channels .update (dict .fromkeys (args ))
1876
- new_channels .update (kwargs )
1909
+ new_channels .update (dict .fromkeys (imap (self .encode , args )))
1910
+ for channel , handler in iteritems (kwargs ):
1911
+ new_channels [self .encode (channel )] = handler
1912
+ ret_val = self .execute_command ('SUBSCRIBE' , * iterkeys (new_channels ))
1913
+ # update the channels dict AFTER we send the command. we don't want to
1914
+ # subscribe twice to these channels, once for the command and again
1915
+ # for the reconnection.
1877
1916
self .channels .update (new_channels )
1878
- return self . execute_command ( 'SUBSCRIBE' , * iterkeys ( new_channels ))
1917
+ return ret_val
1879
1918
1880
1919
def unsubscribe (self , * args ):
1881
1920
"""
@@ -1910,18 +1949,30 @@ def handle_message(self, response, ignore_subscribe_messages=False):
1910
1949
if message_type == 'pmessage' :
1911
1950
message = {
1912
1951
'type' : message_type ,
1913
- 'pattern' : nativestr ( response [1 ]) ,
1914
- 'channel' : nativestr ( response [2 ]) ,
1952
+ 'pattern' : response [1 ],
1953
+ 'channel' : response [2 ],
1915
1954
'data' : response [3 ]
1916
1955
}
1917
1956
else :
1918
1957
message = {
1919
1958
'type' : message_type ,
1920
1959
'pattern' : None ,
1921
- 'channel' : nativestr ( response [1 ]) ,
1960
+ 'channel' : response [1 ],
1922
1961
'data' : response [2 ]
1923
1962
}
1924
1963
1964
+ # if this is an unsubscribe message, remove it from memory
1965
+ if message_type in self .UNSUBSCRIBE_MESSAGE_TYPES :
1966
+ subscribed_dict = None
1967
+ if message_type == 'punsubscribe' :
1968
+ subscribed_dict = self .patterns
1969
+ else :
1970
+ subscribed_dict = self .channels
1971
+ try :
1972
+ del subscribed_dict [message ['channel' ]]
1973
+ except KeyError :
1974
+ pass
1975
+
1925
1976
if message_type in self .PUBLISH_MESSAGE_TYPES :
1926
1977
# if there's a message handler, invoke it
1927
1978
handler = None
@@ -1938,18 +1989,6 @@ def handle_message(self, response, ignore_subscribe_messages=False):
1938
1989
if ignore_subscribe_messages or self .ignore_subscribe_messages :
1939
1990
return None
1940
1991
1941
- # if this is an unsubscribe message, remove it from memory
1942
- if message_type in self .UNSUBSCRIBE_MESSAGE_TYPES :
1943
- subscribed_dict = None
1944
- if message_type == 'punsubscribe' :
1945
- subscribed_dict = self .patterns
1946
- else :
1947
- subscribed_dict = self .channels
1948
- try :
1949
- del subscribed_dict [message ['channel' ]]
1950
- except KeyError :
1951
- pass
1952
-
1953
1992
return message
1954
1993
1955
1994
def run_in_thread (self , sleep_time = 0 ):
0 commit comments