Commit 9f520f7

Debounce producer dispatch for throughput (oban-bg#136)
Producers may become overloaded under high insert/execute load when processes complete quickly. The cause is a flood of `DOWN` messages from completed jobs, each of which triggers an immediate dispatch. Under high load the "demand" (concurrency limit - running jobs) hovers around 1, so every dispatch retrieves only one job. This is wasteful, unnecessarily hard on the database, and floods the producer's message queue.

This commit introduces a dispatch cooldown for each producer. Rather than fetching jobs every time dispatch is called, the producer waits a short period (5ms by default) to allow demand to accumulate.

According to benchmarks this has a large impact on overall throughput: without a cooldown a producer maxes out at roughly 1,000 jobs/sec, while with a 5ms debounce producers can handle roughly 5,000 jobs/sec and keep an empty message queue.
1 parent 9e37769 commit 9f520f7

7 files changed (+116 -36 lines)

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
@@ -13,6 +13,19 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0.
   than "Etc/UTC". Using a custom timezone requires a timezone database such as
   tzdata.
 
+- [Oban] Add `dispatch_cooldown` option to configure the minimum time between
+  a producer fetching more jobs to execute.
+
+### Changed
+
+- [Oban.Queue.Producer] Introduce "dispatch cooldown" as a way to debounce
+  repeatedly fetching new jobs. Repeated fetching floods the producer's message
+  queue and forces the producer to repeatedly fetch one job at a time, which is
+  not especially efficient. Debounced fetching is much more efficient for the
+  producer and the database, increasing maximum jobs/sec throughput so that it
+  scales linearly with a queue's concurrency settings (up to what the database
+  can handle).
+
 ## [0.12.1] — 2019-12-13
 
 ### Fixed

README.md

Lines changed: 1 addition & 1 deletion
@@ -216,7 +216,7 @@ and job dispatching altogether when testing:
 
 ```elixir
 # config/test.exs
-config :my_app, Oban, queues: false, prune: :disabled
+config :my_app, Oban, crontab: false, queues: false, prune: :disabled
 ```
 
 Without dispatch and pruning disabled Ecto will raise constant ownership errors

lib/oban.ex

Lines changed: 18 additions & 3 deletions
@@ -71,7 +71,7 @@ defmodule Oban do
   defmodule MyApp.Business do
     use Oban.Worker, queue: "events", max_attempts: 10
 
-    @impl Worker
+    @impl Oban.Worker
     def perform(%{"id" => id}, _job) do
       model = MyApp.Repo.get(MyApp.Business.Man, id)
 
@@ -513,7 +513,10 @@ defmodule Oban do
   alias Oban.Queue.Supervisor, as: QueueSupervisor
 
   @type option ::
-          {:name, module()}
+          {:circuit_backoff, timeout()}
+          | {:crontab, [Config.cronjob()]}
+          | {:dispatch_cooldown, pos_integer()}
+          | {:name, module()}
           | {:node, binary()}
           | {:poll_interval, pos_integer()}
           | {:prefix, binary()}
@@ -522,7 +525,10 @@ defmodule Oban do
           | {:prune_limit, pos_integer()}
           | {:queues, [{atom(), pos_integer()}]}
           | {:repo, module()}
+          | {:rescue_after, pos_integer()}
+          | {:rescue_interval, pos_integer()}
           | {:shutdown_grace_period, timeout()}
+          | {:timezone, Calendar.time_zone()}
           | {:verbose, false | Logger.level()}
 
   @type queue_name :: atom() | binary()
@@ -577,11 +583,20 @@ defmodule Oban do
   ### Twiddly Options
 
   Additional options used to tune system behaviour. These are primarily useful for testing or
-  troubleshooting and shouldn't be changed usually.
+  troubleshooting and don't usually need modification.
 
   * `:circuit_backoff` — the number of milliseconds until queries are attempted after a database
     error. All processes communicating with the database are equipped with circuit breakers and
     will use this for the backoff. Defaults to `30_000ms`.
+  * `:dispatch_cooldown` — the minimum number of milliseconds a producer will wait before fetching
+    and running more jobs. A slight cooldown period prevents a producer from flooding with
+    messages and thrashing the database. The cooldown period _directly impacts_ a producer's
+    throughput: jobs per second for a single queue is calculated by `(1000 / cooldown) * limit`.
+    For example, with a `5ms` cooldown and a queue limit of `25` a single queue can run 2,500
+    jobs/sec.
+
+    The default is `5ms` and the minimum is `1ms`, which is likely faster than the database can
+    return new jobs to run.
   * `:poll_interval` - the number of milliseconds between polling for new jobs in a queue. This
     is directly tied to the resolution of _scheduled_ jobs. For example, with a `poll_interval` of
     `5_000ms`, scheduled jobs are checked every 5 seconds. The default is `1_000ms`.
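
The throughput formula in the `:dispatch_cooldown` documentation above, `(1000 / cooldown) * limit`, can be checked with a few lines of Elixir. This is only an illustrative sketch: `CooldownMath` and `max_jobs_per_sec/2` are hypothetical names and not part of Oban, and the result is a theoretical ceiling that ignores database latency.

```elixir
defmodule CooldownMath do
  @moduledoc "Hypothetical helper for this illustration; not part of Oban."

  # Theoretical ceiling on jobs/sec for one queue: (1000 / cooldown) * limit.
  # Both arguments must be positive integers, mirroring the option's validation.
  def max_jobs_per_sec(cooldown_ms, limit)
      when is_integer(cooldown_ms) and cooldown_ms > 0 and is_integer(limit) and limit > 0 do
    1000 / cooldown_ms * limit
  end
end

# 5ms cooldown with a queue limit of 25
IO.inspect(CooldownMath.max_jobs_per_sec(5, 25))

# 10ms cooldown with a queue limit of 25
IO.inspect(CooldownMath.max_jobs_per_sec(10, 25))
```

Applied as written, the formula gives 5,000 jobs/sec for a `5ms` cooldown and a limit of `25`. The 2,500 jobs/sec figure quoted in the documentation text corresponds to a `10ms` cooldown under the same formula.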

lib/oban/config.ex

Lines changed: 8 additions & 0 deletions
@@ -12,6 +12,7 @@ defmodule Oban.Config do
   @type t :: %__MODULE__{
           circuit_backoff: timeout(),
           crontab: [cronjob()],
+          dispatch_cooldown: pos_integer(),
           name: atom(),
           node: binary(),
           poll_interval: pos_integer(),
@@ -33,6 +34,7 @@ defmodule Oban.Config do
   @enforce_keys [:node, :repo]
   defstruct circuit_backoff: :timer.seconds(30),
             crontab: [],
+            dispatch_cooldown: 5,
             name: Oban,
             node: nil,
             poll_interval: :timer.seconds(1),
@@ -103,6 +105,12 @@ defmodule Oban.Config do
     end
   end
 
+  defp validate_opt!({:dispatch_cooldown, period}) do
+    unless is_integer(period) and period > 0 do
+      raise ArgumentError, "expected :dispatch_cooldown to be a positive integer"
+    end
+  end
+
   defp validate_opt!({:name, name}) do
     unless is_atom(name) do
       raise ArgumentError, "expected :name to be a module or atom"
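
As a rough illustration of how the new option might be set, the fragment below follows the `config :my_app, Oban, ...` form used in the README hunk. The application name `:my_app`, the repo `MyApp.Repo`, and the `default` queue are placeholders, and `import Config` assumes Elixir 1.9 or later (older projects would use `use Mix.Config` instead).

```elixir
# config/config.exs
import Config

# :my_app, MyApp.Repo and the :default queue are placeholder names.
# dispatch_cooldown must be a positive integer number of milliseconds;
# per the validation above, values such as 0, -1, "5" or 1.0 raise ArgumentError.
config :my_app, Oban,
  repo: MyApp.Repo,
  dispatch_cooldown: 10,
  queues: [default: 25]
```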

lib/oban/queue/producer.ex

Lines changed: 66 additions & 31 deletions
@@ -22,6 +22,8 @@ defmodule Oban.Queue.Producer do
     @enforce_keys [:conf, :foreman, :limit, :nonce, :queue]
     defstruct [
       :conf,
+      :cooldown_ref,
+      :dispatched_at,
       :foreman,
       :limit,
       :name,
@@ -102,29 +104,14 @@ defmodule Oban.Queue.Producer do
   end
 
   @impl GenServer
-  def handle_info(:poll, %State{conf: conf} = state) do
-    conf.repo.checkout(fn ->
-      state
-      |> deschedule()
-      |> pulse()
-      |> send_poll_after()
-      |> dispatch()
-    end)
-  end
-
-  def handle_info(:rescue, %State{conf: conf} = state) do
-    conf.repo.checkout(fn ->
-      state
-      |> rescue_orphans()
-      |> send_rescue_after()
-      |> dispatch()
-    end)
-  end
-
   def handle_info({:DOWN, ref, :process, _pid, _reason}, %State{running: running} = state) do
     dispatch(%{state | running: Map.delete(running, ref)})
   end
 
+  def handle_info(:dispatch, %State{} = state) do
+    dispatch(state)
+  end
+
   def handle_info({:notification, insert(), payload}, %State{queue: queue} = state) do
     case payload do
       %{"queue" => ^queue} -> dispatch(state)
@@ -162,6 +149,25 @@ defmodule Oban.Queue.Producer do
     dispatch(state)
   end
 
+  def handle_info(:poll, %State{conf: conf} = state) do
+    conf.repo.checkout(fn ->
+      state
+      |> deschedule()
+      |> pulse()
+      |> send_poll_after()
+      |> dispatch()
+    end)
+  end
+
+  def handle_info(:rescue, %State{conf: conf} = state) do
+    conf.repo.checkout(fn ->
+      state
+      |> rescue_orphans()
+      |> send_rescue_after()
+      |> dispatch()
+    end)
+  end
+
   def handle_info(:reset_circuit, state) do
     {:noreply, open_circuit(state)}
   end
@@ -230,12 +236,14 @@ defmodule Oban.Queue.Producer do
     state
   end
 
-  defp pulse(%State{conf: conf} = state) do
+  defp pulse(%State{conf: conf, running: running} = state) do
+    running_ids = for {_ref, {%_{id: id}, _pid}} <- running, do: id
+
     args =
       state
       |> Map.take([:limit, :nonce, :paused, :queue, :started_at])
       |> Map.put(:node, conf.node)
-      |> Map.put(:running, running_job_ids(state))
+      |> Map.put(:running, running_ids)
 
     Query.insert_beat(conf, args)
 
@@ -256,29 +264,56 @@ defmodule Oban.Queue.Producer do
     {:noreply, state}
   end
 
-  defp dispatch(%State{conf: conf, foreman: foreman} = state) do
-    %State{limit: limit, nonce: nonce, queue: queue, running: running} = state
+  defp dispatch(%State{} = state) do
+    cond do
+      dispatch_now?(state) ->
+        %State{conf: conf, limit: limit, nonce: nonce, queue: queue, running: running} = state
 
-    started_jobs =
-      for job <- fetch_jobs(conf, queue, nonce, limit - map_size(running)), into: %{} do
-        {:ok, pid} = DynamicSupervisor.start_child(foreman, Executor.child_spec(job, conf))
+        running =
+          conf
+          |> fetch_jobs(queue, nonce, limit - map_size(running))
+          |> start_jobs(state)
+          |> Map.merge(running)
 
-        {Process.monitor(pid), {job, pid}}
-      end
+        {:noreply, %{state | cooldown_ref: nil, dispatched_at: system_now(), running: running}}
+
+      cooldown_available?(state) ->
+        %State{conf: conf, dispatched_at: dispatched_at} = state
+
+        dispatch_after = system_now() - dispatched_at + conf.dispatch_cooldown
+        cooldown_ref = Process.send_after(self(), :dispatch, dispatch_after)
 
-    {:noreply, %{state | running: Map.merge(running, started_jobs)}}
+        {:noreply, %{state | cooldown_ref: cooldown_ref}}
+
+      true ->
+        {:noreply, state}
+    end
   rescue
     exception in trip_errors() -> {:noreply, trip_circuit(exception, state)}
   end
 
+  defp dispatch_now?(%State{dispatched_at: nil}), do: true
+
+  defp dispatch_now?(%State{conf: conf, dispatched_at: dispatched_at}) do
+    system_now() > dispatched_at + conf.dispatch_cooldown
+  end
+
+  defp cooldown_available?(%State{cooldown_ref: ref}), do: is_nil(ref)
+
   defp fetch_jobs(conf, queue, nonce, count) do
     case Query.fetch_available_jobs(conf, queue, nonce, count) do
       {0, nil} -> []
       {_count, jobs} -> jobs
     end
   end
 
-  defp running_job_ids(%State{running: running}) do
-    for {_ref, {%_{id: id}, _pid}} <- running, do: id
+  defp start_jobs(jobs, %State{conf: conf, foreman: foreman}) do
+    for job <- jobs, into: %{} do
+      {:ok, pid} = DynamicSupervisor.start_child(foreman, Executor.child_spec(job, conf))
+
+      {Process.monitor(pid), {job, pid}}
+    end
   end
+
+  defp system_now, do: System.monotonic_time(:millisecond)
 end
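
The three-way decision in `dispatch/1` above (dispatch now, schedule one delayed `:dispatch`, or do nothing) can be tried in isolation with a small GenServer. This is a minimal sketch of the debounce idea, not Oban's producer: `DebounceSketch`, `trigger/1`, and `fetch_count/1` are hypothetical names, and a counter stands in for the database fetch. Unlike the diff, which computes the delay as `system_now() - dispatched_at + conf.dispatch_cooldown`, this sketch schedules the wake-up for the time remaining in the cooldown window and uses `>=` for the elapsed check. Both are choices of the sketch.

```elixir
defmodule DebounceSketch do
  @moduledoc "Hypothetical stand-in for a producer; counts fetches instead of querying."
  use GenServer

  def start_link(cooldown_ms), do: GenServer.start_link(__MODULE__, cooldown_ms)

  # Request a dispatch; many callers may do this in quick succession.
  def trigger(pid), do: send(pid, :trigger)

  def fetch_count(pid), do: GenServer.call(pid, :fetch_count)

  @impl GenServer
  def init(cooldown_ms) do
    {:ok, %{cooldown: cooldown_ms, cooldown_ref: nil, dispatched_at: nil, fetches: 0}}
  end

  @impl GenServer
  def handle_call(:fetch_count, _from, state), do: {:reply, state.fetches, state}

  @impl GenServer
  def handle_info(msg, state) when msg in [:trigger, :dispatch] do
    {:noreply, dispatch(state)}
  end

  defp dispatch(state) do
    now = System.monotonic_time(:millisecond)

    cond do
      # First dispatch, or the cooldown has elapsed: "fetch" immediately.
      is_nil(state.dispatched_at) or now >= state.dispatched_at + state.cooldown ->
        %{state | cooldown_ref: nil, dispatched_at: now, fetches: state.fetches + 1}

      # Inside the cooldown with no timer pending: schedule a single wake-up
      # for the remaining part of the window.
      is_nil(state.cooldown_ref) ->
        remaining = state.dispatched_at + state.cooldown - now
        ref = Process.send_after(self(), :dispatch, remaining)
        %{state | cooldown_ref: ref}

      # A wake-up is already scheduled: drop the request.
      true ->
        state
    end
  end
end

# Usage: a burst of 100 triggers inside one 200ms cooldown window.
{:ok, pid} = DebounceSketch.start_link(200)
for _ <- 1..100, do: DebounceSketch.trigger(pid)
Process.sleep(500)
IO.inspect(DebounceSketch.fetch_count(pid))
```

On an otherwise idle machine the burst should collapse into two fetches: one for the first trigger and one when the scheduled `:dispatch` arrives after the cooldown. The remaining 98 triggers are absorbed, which is the effect the commit message describes for floods of `DOWN` messages.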

lib/oban/worker.ex

Lines changed: 1 addition & 1 deletion
@@ -13,7 +13,7 @@ defmodule Oban.Worker do
       defmodule MyApp.Workers.Business do
         use Oban.Worker, queue: "events", max_attempts: 10, unique: [period: 30]
 
-        @impl Worker
+        @impl Oban.Worker
         def perform(_args, %Oban.Job{attempt: attempt}) when attempt > 3 do
           IO.inspect(attempt)
         end

test/oban/config_test.exs

Lines changed: 9 additions & 0 deletions
@@ -23,6 +23,15 @@ defmodule Oban.ConfigTest do
       assert_valid(circuit_backoff: 10)
     end
 
+    test ":dispatch_cooldown is validated as a positive integer" do
+      assert_invalid(dispatch_cooldown: -1)
+      assert_invalid(dispatch_cooldown: 0)
+      assert_invalid(dispatch_cooldown: "5")
+      assert_invalid(dispatch_cooldown: 1.0)
+
+      assert_valid(dispatch_cooldown: 500)
+    end
+
     test ":crontab is validated as a list of cron job expressions" do
       assert_invalid(crontab: ["* * * * *"])
       assert_invalid(crontab: [["* * * * *", Fake]])
