Skip to content

Commit 174f36a

Browse files
committed
Merge pull request rails#6492 from pmahoney/fair-connection-pool2
Fair connection pool2 Conflicts: activerecord/test/cases/associations/eager_test.rb
2 parents 2d1ca38 + 02b2335 commit 174f36a

File tree

5 files changed

+286
-38
lines changed

5 files changed

+286
-38
lines changed

activerecord/lib/active_record/connection_adapters/abstract/connection_pool.rb

Lines changed: 174 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
require 'monitor'
33
require 'set'
44
require 'active_support/core_ext/module/deprecation'
5-
require 'timeout'
65

76
module ActiveRecord
87
# Raised when a connection could not be obtained within the connection
@@ -70,6 +69,131 @@ module ConnectionAdapters
7069
# after which the Reaper will consider a connection reapable. (default
7170
# 5 seconds).
7271
class ConnectionPool
72+
# Threadsafe, fair, FIFO queue. Meant to be used by ConnectionPool
73+
# with which it shares a Monitor. But could be a generic Queue.
74+
#
75+
# The Queue in stdlib's 'thread' could replace this class except
76+
# stdlib's doesn't support waiting with a timeout.
77+
class Queue
78+
def initialize(lock = Monitor.new)
79+
@lock = lock
80+
@cond = @lock.new_cond
81+
@num_waiting = 0
82+
@queue = []
83+
end
84+
85+
# Test if any threads are currently waiting on the queue.
86+
def any_waiting?
87+
synchronize do
88+
@num_waiting > 0
89+
end
90+
end
91+
92+
# Return the number of threads currently waiting on this
93+
# queue.
94+
def num_waiting
95+
synchronize do
96+
@num_waiting
97+
end
98+
end
99+
100+
# Add +element+ to the queue. Never blocks.
101+
def add(element)
102+
synchronize do
103+
@queue.push element
104+
@cond.signal
105+
end
106+
end
107+
108+
# If +element+ is in the queue, remove and return it, or nil.
109+
def delete(element)
110+
synchronize do
111+
@queue.delete(element)
112+
end
113+
end
114+
115+
# Remove all elements from the queue.
116+
def clear
117+
synchronize do
118+
@queue.clear
119+
end
120+
end
121+
122+
# Remove the head of the queue.
123+
#
124+
# If +timeout+ is not given, remove and return the head the
125+
# queue if the number of available elements is strictly
126+
# greater than the number of threads currently waiting (that
127+
# is, don't jump ahead in line). Otherwise, return nil.
128+
#
129+
# If +timeout+ is given, block if it there is no element
130+
# available, waiting up to +timeout+ seconds for an element to
131+
# become available.
132+
#
133+
# Raises:
134+
# - ConnectionTimeoutError if +timeout+ is given and no element
135+
# becomes available after +timeout+ seconds,
136+
def poll(timeout = nil)
137+
synchronize do
138+
if timeout
139+
no_wait_poll || wait_poll(timeout)
140+
else
141+
no_wait_poll
142+
end
143+
end
144+
end
145+
146+
private
147+
148+
def synchronize(&block)
149+
@lock.synchronize(&block)
150+
end
151+
152+
# Test if the queue currently contains any elements.
153+
def any?
154+
!@queue.empty?
155+
end
156+
157+
# A thread can remove an element from the queue without
158+
# waiting if an only if the number of currently available
159+
# connections is strictly greater than the number of waiting
160+
# threads.
161+
def can_remove_no_wait?
162+
@queue.size > @num_waiting
163+
end
164+
165+
# Removes and returns the head of the queue if possible, or nil.
166+
def remove
167+
@queue.shift
168+
end
169+
170+
# Remove and return the head the queue if the number of
171+
# available elements is strictly greater than the number of
172+
# threads currently waiting. Otherwise, return nil.
173+
def no_wait_poll
174+
remove if can_remove_no_wait?
175+
end
176+
177+
# Waits on the queue up to +timeout+ seconds, then removes and
178+
# returns the head of the queue.
179+
def wait_poll(timeout)
180+
@num_waiting += 1
181+
182+
t0 = Time.now
183+
elapsed = 0
184+
loop do
185+
@cond.wait(timeout - elapsed)
186+
187+
return remove if any?
188+
189+
elapsed = Time.now - t0
190+
raise ConnectionTimeoutError if elapsed >= timeout
191+
end
192+
ensure
193+
@num_waiting -= 1
194+
end
195+
end
196+
73197
# Every +frequency+ seconds, the reaper will call +reap+ on +pool+.
74198
# A reaper instantiated with a nil frequency will never reap the
75199
# connection pool.
@@ -100,21 +224,6 @@ def run
100224
attr_accessor :automatic_reconnect, :checkout_timeout, :dead_connection_timeout
101225
attr_reader :spec, :connections, :size, :reaper
102226

103-
class Latch # :nodoc:
104-
def initialize
105-
@mutex = Mutex.new
106-
@cond = ConditionVariable.new
107-
end
108-
109-
def release
110-
@mutex.synchronize { @cond.broadcast }
111-
end
112-
113-
def await
114-
@mutex.synchronize { @cond.wait @mutex }
115-
end
116-
end
117-
118227
# Creates a new ConnectionPool object. +spec+ is a ConnectionSpecification
119228
# object which describes database connection information (e.g. adapter,
120229
# host name, username, password, etc), as well as the maximum size for
@@ -137,9 +246,18 @@ def initialize(spec)
137246
# default max pool size to 5
138247
@size = (spec.config[:pool] && spec.config[:pool].to_i) || 5
139248

140-
@latch = Latch.new
141249
@connections = []
142250
@automatic_reconnect = true
251+
252+
@available = Queue.new self
253+
end
254+
255+
# Hack for tests to be able to add connections. Do not call outside of tests
256+
def insert_connection_for_test!(c) #:nodoc:
257+
synchronize do
258+
@connections << c
259+
@available.add c
260+
end
143261
end
144262

145263
# Retrieve the connection associated with the current thread, or call
@@ -197,6 +315,7 @@ def disconnect!
197315
conn.disconnect!
198316
end
199317
@connections = []
318+
@available.clear
200319
end
201320
end
202321

@@ -211,6 +330,10 @@ def clear_reloadable_connections!
211330
@connections.delete_if do |conn|
212331
conn.requires_reloading?
213332
end
333+
@available.clear
334+
@connections.each do |conn|
335+
@available.add conn
336+
end
214337
end
215338
end
216339

@@ -234,23 +357,10 @@ def clear_stale_cached_connections! # :nodoc:
234357
# Raises:
235358
# - PoolFullError: no connection can be obtained from the pool.
236359
def checkout
237-
loop do
238-
# Checkout an available connection
239-
synchronize do
240-
# Try to find a connection that hasn't been leased, and lease it
241-
conn = connections.find { |c| c.lease }
242-
243-
# If all connections were leased, and we have room to expand,
244-
# create a new connection and lease it.
245-
if !conn && connections.size < size
246-
conn = checkout_new_connection
247-
conn.lease
248-
end
249-
250-
return checkout_and_verify(conn) if conn
251-
end
252-
253-
Timeout.timeout(@checkout_timeout, PoolFullError) { @latch.await }
360+
synchronize do
361+
conn = acquire_connection
362+
conn.lease
363+
checkout_and_verify(conn)
254364
end
255365
end
256366

@@ -266,21 +376,24 @@ def checkin(conn)
266376
end
267377

268378
release conn
379+
380+
@available.add conn
269381
end
270-
@latch.release
271382
end
272383

273384
# Remove a connection from the connection pool. The connection will
274385
# remain open and active but will no longer be managed by this pool.
275386
def remove(conn)
276387
synchronize do
277388
@connections.delete conn
389+
@available.delete conn
278390

279391
# FIXME: we might want to store the key on the connection so that removing
280392
# from the reserved hash will be a little easier.
281393
release conn
394+
395+
@available.add checkout_new_connection if @available.any_waiting?
282396
end
283-
@latch.release
284397
end
285398

286399
# Removes dead connections from the pool. A dead connection can occur
@@ -293,11 +406,35 @@ def reap
293406
remove conn if conn.in_use? && stale > conn.last_use && !conn.active?
294407
end
295408
end
296-
@latch.release
297409
end
298410

299411
private
300412

413+
# Acquire a connection by one of 1) immediately removing one
414+
# from the queue of available connections, 2) creating a new
415+
# connection if the pool is not at capacity, 3) waiting on the
416+
# queue for a connection to become available.
417+
#
418+
# Raises:
419+
# - PoolFullError if a connection could not be acquired (FIXME:
420+
# why not ConnectionTimeoutError?
421+
def acquire_connection
422+
if conn = @available.poll
423+
conn
424+
elsif @connections.size < @size
425+
checkout_new_connection
426+
else
427+
t0 = Time.now
428+
begin
429+
@available.poll(@checkout_timeout)
430+
rescue ConnectionTimeoutError
431+
msg = 'could not obtain a database connection within %0.3f seconds (waited %0.3f seconds)' %
432+
[@checkout_timeout, Time.now - t0]
433+
raise PoolFullError, msg
434+
end
435+
end
436+
end
437+
301438
def release(conn)
302439
thread_id = if @reserved_connections[current_connection_id] == conn
303440
current_connection_id

activerecord/test/cases/associations/eager_test.rb

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -962,6 +962,10 @@ def test_eager_loading_with_order_on_joined_table_preloads
962962
end
963963

964964
def test_eager_loading_with_conditions_on_joined_table_preloads
965+
# cache metadata in advance to avoid extra sql statements executed while testing
966+
Tagging.first
967+
Tag.first
968+
965969
posts = assert_queries(2) do
966970
Post.scoped(:select => 'distinct posts.*', :includes => :author, :joins => [:comments], :where => "comments.body like 'Thank you%'", :order => 'posts.id').all
967971
end

activerecord/test/cases/associations/has_many_associations_test.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1353,6 +1353,9 @@ def test_custom_primary_key_on_new_record_should_fetch_with_query
13531353
author = Author.new(:name => "David")
13541354
assert !author.essays.loaded?
13551355

1356+
# cache metadata in advance to avoid extra sql statements executed while testing
1357+
Essay.first
1358+
13561359
assert_queries 1 do
13571360
assert_equal 1, author.essays.size
13581361
end

activerecord/test/cases/connection_adapters/abstract_adapter_test.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ def test_expire_mutates_in_use
3636

3737
def test_close
3838
pool = ConnectionPool.new(ConnectionSpecification.new({}, nil))
39-
pool.connections << adapter
39+
pool.insert_connection_for_test! adapter
4040
adapter.pool = pool
4141

4242
# Make sure the pool marks the connection in use

0 commit comments

Comments
 (0)