|
1 | 1 | import errno
|
2 | 2 | import os
|
3 | 3 | import socket
|
4 |
| -import threading |
5 |
| -from itertools import imap |
| 4 | +from itertools import chain, imap |
6 | 5 | from redis.exceptions import ConnectionError, ResponseError, InvalidResponse
|
7 | 6 |
|
8 | 7 | class PythonParser(object):
|
@@ -235,36 +234,43 @@ def _connect(self):
|
235 | 234 | sock.connect(self.path)
|
236 | 235 | return sock
|
237 | 236 |
|
| 237 | +# TODO: add ability to block waiting on a connection to be released |
238 | 238 | class ConnectionPool(object):
|
239 |
| - """ |
240 |
| - A connection pool that maintains only one connection. Great for |
241 |
| - single-threaded apps with no sharding |
242 |
| - """ |
243 |
| - def __init__(self, connection_class=Connection, **kwargs): |
| 239 | + "Generic connection pool" |
| 240 | + def __init__(self, connection_class=Connection, max_connections=None, |
| 241 | + **connection_kwargs): |
244 | 242 | self.connection_class = connection_class
|
245 |
| - self.kwargs = kwargs |
246 |
| - self._connection = None |
247 |
| - self._in_use = False |
248 |
| - |
249 |
| - def copy(self): |
250 |
| - "Return a new instance of this class with the same parameters" |
251 |
| - return self.__class__(self.connection_class, **self.kwargs) |
| 243 | + self.connection_kwargs = connection_kwargs |
| 244 | + self.max_connections = max_connections or 2**31 |
| 245 | + self._created_connections = 0 |
| 246 | + self._available_connections = [] |
| 247 | + self._in_use_connections = set() |
252 | 248 |
|
253 | 249 | def get_connection(self, command_name, *keys):
|
254 | 250 | "Get a connection from the pool"
|
255 |
| - if self._in_use: |
256 |
| - raise ConnectionError("Connection already in-use") |
257 |
| - if not self._connection: |
258 |
| - self._connection = self.connection_class(**self.kwargs) |
259 |
| - self._in_use = True |
260 |
| - return self._connection |
| 251 | + try: |
| 252 | + connection = self._available_connections.pop() |
| 253 | + except IndexError: |
| 254 | + connection = self.make_connection() |
| 255 | + self._in_use_connections.add(connection) |
| 256 | + return connection |
| 257 | + |
| 258 | + def make_connection(self): |
| 259 | + "Create a new connection" |
| 260 | + if self._created_connections >= self.max_connections: |
| 261 | + raise Exception("Too many connections") |
| 262 | + self._created_connections += 1 |
| 263 | + return self.connection_class(**self.connection_kwargs) |
261 | 264 |
|
262 | 265 | def release(self, connection):
|
263 | 266 | "Releases the connection back to the pool"
|
264 |
| - assert self._connection == connection |
265 |
| - self._in_use = False |
| 267 | + # assert self._connection == connection |
| 268 | + # self._in_use = False |
| 269 | + self._in_use_connections.remove(connection) |
| 270 | + self._available_connections.append(connection) |
266 | 271 |
|
267 | 272 | def disconnect(self):
|
268 | 273 | "Disconnects all connections in the pool"
|
269 |
| - if self._connection: |
270 |
| - self._connection.disconnect() |
| 274 | + all_conns = chain(self._available_connections, self._in_use_connections) |
| 275 | + for connection in all_conns: |
| 276 | + connection.disconnect() |
0 commit comments