From d1d7a17466f67a8ab304da6884fdd386b01add85 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20M=C3=A4nnchen?= Date: Tue, 4 Aug 2020 17:05:20 +0200 Subject: [PATCH 1/2] Supervision Configuration for Cluster Managers --- lib/quantum.ex | 126 ++++++++++++++++++----- lib/quantum/clock_broadcaster.ex | 19 +++- lib/quantum/execution_broadcaster.ex | 20 +++- lib/quantum/executor_supervisor.ex | 15 ++- lib/quantum/job_broadcaster.ex | 20 +++- lib/quantum/node_selector_broadcaster.ex | 20 +++- lib/quantum/normalizer.ex | 7 +- lib/quantum/supervisor.ex | 114 ++++++++------------ lib/quantum/task_registry.ex | 19 +++- 9 files changed, 251 insertions(+), 109 deletions(-) diff --git a/lib/quantum.ex b/lib/quantum.ex index 5b6eca8..a6dd626 100644 --- a/lib/quantum.ex +++ b/lib/quantum.ex @@ -19,17 +19,54 @@ defmodule Quantum do ## Configuration: - * `:timeout` - Sometimes, you may come across GenServer - timeout errors esp. when you have too many jobs or high - load. The default GenServer.call timeout is 5000. + * `:clock_broadcaster_name` - GenServer name of clock broadcaster \\ + *(unstable, may break without major release until declared stable)* + + * `:execution_broadcaster_name` - GenServer name of execution broadcaster \\ + *(unstable, may break without major release until declared stable)* + + * `:executor_supervisor_name` - GenServer name of execution supervisor \\ + *(unstable, may break without major release until declared stable)* + + * `:debug_logging` - Turn on debug logging * `:jobs` - list of cron jobs to execute - * `:schedule` - Default schedule of new Job + * `:job_broadcaster_name` - GenServer name of job broadcaster \\ + *(unstable, may break without major release until declared stable)* + + * `:name` - GenServer name of scheduler \\ + *(unstable, may break without major release until declared stable)* + + * `:node_selector_broadcaster_name` - GenServer name of node selector broadcaster \\ + *(unstable, may break without major release until declared stable)* + + * `:overlap` - Default overlap of new Job + + * `:otp_app` - Application where scheduler runs * `:run_strategy` - Default Run Strategy of new Job - * `:overlap` - Default overlap of new Job, + * `:schedule` - Default schedule of new Job + + * `:storage` - Storage to use for persistence + + * `:storage_name` - GenServer name of storage \\ + *(unstable, may break without major release until declared stable)* + + * `:supervisor_module` - Module to supervise scheduler \\ + Can be overwritten to supervise processes differently (for example for clustering) \\ + *(unstable, may break without major release until declared stable)* + + * `:task_registry_name` - GenServer name of task registry \\ + *(unstable, may break without major release until declared stable)* + + * `:task_supervisor_name` - GenServer name of task supervisor \\ + *(unstable, may break without major release until declared stable)* + + * `:timeout` - Sometimes, you may come across GenServer timeout errors + esp. when you have too many jobs or high load. The default `GenServer.call/3` + timeout is `5_000`. * `:timezone` - Default timezone of new Job @@ -45,7 +82,6 @@ defmodule Quantum do @type t :: module @defaults [ - cron: [], timeout: 5_000, schedule: nil, overlap: true, @@ -56,8 +92,6 @@ defmodule Quantum do storage: Noop ] - @optional_callbacks init: 1 - # Returns the configuration stored in the `:otp_app` environment. @doc false @callback config(Keyword.t()) :: Keyword.t() @@ -66,7 +100,7 @@ defmodule Quantum do Starts supervision and return `{:ok, pid}` or just `:ok` if nothing needs to be done. - Returns `{:error, {:already_started, pid}}` if the repo is already + Returns `{:error, {:already_started, pid}}` if the scheduler is already started or `{:error, term}` in case anything else goes wrong. ## Options @@ -136,16 +170,58 @@ defmodule Quantum do @doc false # Retrieves only scheduler related configuration. - def scheduler_config(scheduler, otp_app, custom) do + def scheduler_config(opts, scheduler, otp_app) do @defaults |> Keyword.merge(Application.get_env(otp_app, scheduler, [])) - |> Keyword.merge(custom) - |> Keyword.merge(otp_app: otp_app, scheduler: scheduler) + |> Keyword.merge(opts) + |> Keyword.put_new(:otp_app, otp_app) + |> Keyword.put_new(:scheduler, scheduler) + |> Keyword.put_new(:name, scheduler) |> update_in([:schedule], &Normalizer.normalize_schedule/1) + |> Keyword.put_new(:task_supervisor_name, Module.concat(scheduler, TaskSupervisor)) + |> Keyword.put_new(:storage_name, Module.concat(scheduler, Storage)) + |> Keyword.put_new(:task_registry_name, Module.concat(scheduler, TaskRegistry)) + |> Keyword.put_new(:clock_broadcaster_name, Module.concat(scheduler, ClockBroadcaster)) + |> Keyword.put_new(:job_broadcaster_name, Module.concat(scheduler, JobBroadcaster)) + |> Keyword.put_new( + :execution_broadcaster_name, + Module.concat(scheduler, ExecutionBroadcaster) + ) + |> Keyword.put_new( + :node_selector_broadcaster_name, + Module.concat(scheduler, NodeSelectorBroadcaster) + ) + |> Keyword.put_new(:executor_supervisor_name, Module.concat(scheduler, ExecutorSupervisor)) + |> (fn config -> + Keyword.update(config, :jobs, [], fn jobs -> + jobs + |> Enum.map(&Normalizer.normalize(scheduler.new_job(config), &1)) + |> remove_jobs_with_duplicate_names(scheduler) + end) + end).() + |> Keyword.put_new(:supervisor_module, Quantum.Supervisor) + |> Keyword.put_new(:name, Quantum.Supervisor) + end + + defp remove_jobs_with_duplicate_names(job_list, scheduler) do + job_list + |> Enum.reduce(%{}, fn %Job{name: name} = job, acc -> + if Enum.member?(Map.keys(acc), name) do + Logger.warn( + "Job with name '#{name}' of scheduler '#{scheduler}' not started due to duplicate job name" + ) + + acc + else + Map.put_new(acc, name, job) + end + end) + |> Map.values() end defmacro __using__(opts) do - quote bind_quoted: [behaviour: __MODULE__, opts: opts, moduledoc: @moduledoc] do + quote bind_quoted: [behaviour: __MODULE__, opts: opts, moduledoc: @moduledoc], + location: :keep do @otp_app Keyword.fetch!(opts, :otp_app) @moduledoc moduledoc |> String.replace(~r/MyApp\.Scheduler/, Enum.join(Module.split(__MODULE__), ".")) @@ -155,27 +231,25 @@ defmodule Quantum do @doc false @impl behaviour - def config(custom \\ []) do - Quantum.scheduler_config(__MODULE__, @otp_app, custom) + def config(opts \\ []) do + Quantum.scheduler_config(opts, __MODULE__, @otp_app) end defp __job_broadcaster__ do - __job_broadcaster__( - config() |> Keyword.fetch!(:scheduler) |> Module.concat(JobBroadcaster), - config() - ) - end - - defp __job_broadcaster__(job_broadcaster, configuration) do - GenServer.whereis(job_broadcaster) + config() |> Keyword.fetch!(:job_broadcaster_name) end defp __timeout__, do: Keyword.fetch!(config(), :timeout) @impl behaviour def start_link(opts \\ []) do - opts = Keyword.put_new(opts, :name, __MODULE__) - Quantum.Supervisor.start_link(__MODULE__, @otp_app, opts) + opts = config(opts) + Keyword.fetch!(opts, :supervisor_module).start_link(__MODULE__, opts) + end + + @impl behaviour + def init(opts) do + opts end @impl behaviour @@ -249,7 +323,7 @@ defmodule Quantum do %{unquote_splicing(spec)} end - defoverridable child_spec: 1 + defoverridable child_spec: 1, config: 0, config: 1, init: 1 end end end diff --git a/lib/quantum/clock_broadcaster.ex b/lib/quantum/clock_broadcaster.ex index f477d3e..0d2631f 100644 --- a/lib/quantum/clock_broadcaster.ex +++ b/lib/quantum/clock_broadcaster.ex @@ -11,11 +11,22 @@ defmodule Quantum.ClockBroadcaster do @spec start_link(opts :: StartOpts.t()) :: GenServer.on_start() def start_link(%StartOpts{name: name} = opts) do - GenStage.start_link( - __MODULE__, + __MODULE__ + |> GenStage.start_link( struct!(InitOpts, Map.take(opts, [:start_time, :storage, :scheduler, :debug_logging])), name: name ) + |> case do + {:ok, pid} -> + {:ok, pid} + + {:error, {:already_started, pid}} -> + Process.monitor(pid) + {:ok, pid} + + {:error, _reason} = error -> + error + end end @impl GenStage @@ -59,6 +70,10 @@ defmodule Quantum.ClockBroadcaster do handle_tick(state) end + def handle_info(_message, state) do + {:noreply, [], state} + end + defp handle_tick(%State{remaining_demand: 0} = state) do {:noreply, [], state} end diff --git a/lib/quantum/execution_broadcaster.ex b/lib/quantum/execution_broadcaster.ex index 57670fb..03e76af 100644 --- a/lib/quantum/execution_broadcaster.ex +++ b/lib/quantum/execution_broadcaster.ex @@ -36,8 +36,8 @@ defmodule Quantum.ExecutionBroadcaster do # Start Stage @spec start_link(StartOpts.t()) :: GenServer.on_start() def start_link(%StartOpts{name: name} = opts) do - GenStage.start_link( - __MODULE__, + __MODULE__ + |> GenStage.start_link( struct!( InitOpts, Map.take(opts, [ @@ -50,6 +50,17 @@ defmodule Quantum.ExecutionBroadcaster do ), name: name ) + |> case do + {:ok, pid} -> + {:ok, pid} + + {:error, {:already_started, pid}} -> + Process.monitor(pid) + {:ok, pid} + + {:error, _reason} = error -> + error + end end @impl GenStage @@ -148,6 +159,11 @@ defmodule Quantum.ExecutionBroadcaster do |> execute_events_to_fire(time) end + @impl GenStage + def handle_info(_message, state) do + {:noreply, [], state} + end + defp initialize_jobs(%State{uninitialized_jobs: uninitialized_jobs} = state, time) do uninitialized_jobs |> Enum.reject(&match?(%Job{schedule: %CronExpression{reboot: true}}, &1)) diff --git a/lib/quantum/executor_supervisor.ex b/lib/quantum/executor_supervisor.ex index 247b7fd..5644186 100644 --- a/lib/quantum/executor_supervisor.ex +++ b/lib/quantum/executor_supervisor.ex @@ -11,8 +11,8 @@ defmodule Quantum.ExecutorSupervisor do @spec start_link(StartOpts.t()) :: GenServer.on_start() def start_link(%StartOpts{name: name} = opts) do - ConsumerSupervisor.start_link( - __MODULE__, + __MODULE__ + |> ConsumerSupervisor.start_link( struct!( InitOpts, Map.take(opts, [ @@ -24,6 +24,17 @@ defmodule Quantum.ExecutorSupervisor do ), name: name ) + |> case do + {:ok, pid} -> + {:ok, pid} + + {:error, {:already_started, pid}} -> + Process.monitor(pid) + {:ok, pid} + + {:error, _reason} = error -> + error + end end @impl ConsumerSupervisor diff --git a/lib/quantum/job_broadcaster.ex b/lib/quantum/job_broadcaster.ex index 5635f49..99f4d98 100644 --- a/lib/quantum/job_broadcaster.ex +++ b/lib/quantum/job_broadcaster.ex @@ -15,11 +15,22 @@ defmodule Quantum.JobBroadcaster do # Start Job Broadcaster @spec start_link(StartOpts.t()) :: GenServer.on_start() def start_link(%StartOpts{name: name} = opts) do - GenStage.start_link( - __MODULE__, + __MODULE__ + |> GenStage.start_link( struct!(InitOpts, Map.take(opts, [:jobs, :storage, :scheduler, :debug_logging])), name: name ) + |> case do + {:ok, pid} -> + {:ok, pid} + + {:error, {:already_started, pid}} -> + Process.monitor(pid) + {:ok, pid} + + {:error, _reason} = error -> + error + end end @impl GenStage @@ -202,4 +213,9 @@ defmodule Quantum.JobBroadcaster do def handle_call({:find_job, name}, _, %State{jobs: jobs} = state), do: {:reply, Map.get(jobs, name), [], state} + + @impl GenStage + def handle_info(_message, state) do + {:noreply, [], state} + end end diff --git a/lib/quantum/node_selector_broadcaster.ex b/lib/quantum/node_selector_broadcaster.ex index c9fe125..ea0e545 100644 --- a/lib/quantum/node_selector_broadcaster.ex +++ b/lib/quantum/node_selector_broadcaster.ex @@ -18,8 +18,8 @@ defmodule Quantum.NodeSelectorBroadcaster do # Start Stage @spec start_link(StartOpts.t()) :: GenServer.on_start() def start_link(%StartOpts{name: name} = opts) do - GenStage.start_link( - __MODULE__, + __MODULE__ + |> GenStage.start_link( struct!( InitOpts, Map.take(opts, [ @@ -29,6 +29,17 @@ defmodule Quantum.NodeSelectorBroadcaster do ), name: name ) + |> case do + {:ok, pid} -> + {:ok, pid} + + {:error, {:already_started, pid}} -> + Process.monitor(pid) + {:ok, pid} + + {:error, _reason} = error -> + error + end end @impl GenStage @@ -54,6 +65,11 @@ defmodule Quantum.NodeSelectorBroadcaster do end), state} end + @impl GenStage + def handle_info(_message, state) do + {:noreply, [], state} + end + defp select_nodes(%Job{run_strategy: run_strategy} = job, task_supervisor) do run_strategy |> NodeList.nodes(job) diff --git a/lib/quantum/normalizer.ex b/lib/quantum/normalizer.ex index 2799961..ceb8814 100644 --- a/lib/quantum/normalizer.ex +++ b/lib/quantum/normalizer.ex @@ -25,7 +25,8 @@ defmodule Quantum.Normalizer do # # * `base` - Empty `Quantum.Job` # * `job` - The Job To Normalize - @spec normalize(Job.t(), config_full_notation | config_short_notation) :: Job.t() | no_return + @spec normalize(Job.t(), config_full_notation | config_short_notation | Job.t()) :: + Job.t() | no_return def normalize(base, job) def normalize(%Job{} = base, job) when is_list(job) do @@ -50,6 +51,10 @@ defmodule Quantum.Normalizer do normalize_options(base, %{schedule: schedule, task: task}) end + def normalize(%Job{} = _base, %Job{} = job) do + job + end + @spec normalize_options(Job.t(), map) :: Job.t() defp normalize_options(job, options) do Enum.reduce(options, job, &normalize_job_option/2) diff --git a/lib/quantum/supervisor.ex b/lib/quantum/supervisor.ex index 071a821..a548c70 100644 --- a/lib/quantum/supervisor.ex +++ b/lib/quantum/supervisor.ex @@ -5,39 +5,50 @@ defmodule Quantum.Supervisor do require Logger - alias Quantum.{Job, Normalizer} - # Starts the quantum supervisor. - @spec start_link(GenServer.server(), atom, Keyword.t()) :: GenServer.on_start() - def start_link(quantum, otp_app, opts) do + @spec start_link(GenServer.server(), Keyword.t()) :: GenServer.on_start() + def start_link(quantum, opts) do name = Keyword.take(opts, [:name]) - Supervisor.start_link(__MODULE__, {quantum, otp_app, opts}, name) + Supervisor.start_link(__MODULE__, {quantum, opts}, name) end @impl Supervisor - def init({scheduler, otp_app, opts}) do - opts = runtime_config(scheduler, otp_app, opts) - opts = quantum_init(scheduler, opts) - %{storage: storage, scheduler: ^scheduler} = opts = Map.new(opts) + def init({scheduler, opts}) do + %{ + storage: storage, + scheduler: ^scheduler, + task_supervisor_name: task_supervisor_name, + storage_name: storage_name, + task_registry_name: task_registry_name, + clock_broadcaster_name: clock_broadcaster_name, + job_broadcaster_name: job_broadcaster_name, + execution_broadcaster_name: execution_broadcaster_name, + node_selector_broadcaster_name: node_selector_broadcaster_name, + executor_supervisor_name: executor_supervisor_name + } = + opts = + opts + |> scheduler.config + |> quantum_init(scheduler) + |> Map.new() + + task_supervisor_opts = [name: task_supervisor_name] storage_opts = opts |> Map.get(:storage_opts, []) |> Keyword.put(:scheduler, scheduler) + |> Keyword.put(:name, storage_name) - task_registry_opts = %Quantum.TaskRegistry.StartOpts{ - name: Module.concat(scheduler, TaskRegistry) - } + task_registry_opts = %Quantum.TaskRegistry.StartOpts{name: task_registry_name} clock_broadcaster_opts = struct!( Quantum.ClockBroadcaster.StartOpts, opts |> Map.take([:debug_logging, :storage, :scheduler]) - |> Map.merge(%{ - name: Module.concat(scheduler, ClockBroadcaster), - start_time: NaiveDateTime.utc_now() - }) + |> Map.put(:name, clock_broadcaster_name) + |> Map.put(:start_time, NaiveDateTime.utc_now()) ) job_broadcaster_opts = @@ -45,9 +56,7 @@ defmodule Quantum.Supervisor do Quantum.JobBroadcaster.StartOpts, opts |> Map.take([:jobs, :storage, :scheduler, :debug_logging]) - |> Map.merge(%{ - name: Module.concat(scheduler, JobBroadcaster) - }) + |> Map.put(:name, job_broadcaster_name) ) execution_broadcaster_opts = @@ -59,17 +68,15 @@ defmodule Quantum.Supervisor do :scheduler, :debug_logging ]) - |> Map.merge(%{ - job_broadcaster_reference: Module.concat(scheduler, JobBroadcaster), - clock_broadcaster_reference: Module.concat(scheduler, ClockBroadcaster), - name: Module.concat(scheduler, ExecutionBroadcaster) - }) + |> Map.put(:job_broadcaster_reference, job_broadcaster_name) + |> Map.put(:clock_broadcaster_reference, clock_broadcaster_name) + |> Map.put(:name, execution_broadcaster_name) ) node_selector_broadcaster_opts = %Quantum.NodeSelectorBroadcaster.StartOpts{ - execution_broadcaster_reference: Module.concat(scheduler, ExecutionBroadcaster), - task_supervisor_reference: Module.concat(scheduler, TaskSupervisor), - name: Module.concat(scheduler, NodeSelectorBroadcaster) + execution_broadcaster_reference: execution_broadcaster_name, + task_supervisor_reference: task_supervisor_name, + name: node_selector_broadcaster_name } executor_supervisor_opts = @@ -77,18 +84,16 @@ defmodule Quantum.Supervisor do Quantum.ExecutorSupervisor.StartOpts, opts |> Map.take([:debug_logging]) - |> Map.merge(%{ - node_selector_broadcaster_reference: Module.concat(scheduler, NodeSelectorBroadcaster), - task_supervisor_reference: Module.concat(scheduler, TaskSupervisor), - task_registry_reference: Module.concat(scheduler, TaskRegistry), - name: Module.concat(scheduler, ExecutorSupervisor) - }) + |> Map.put(:node_selector_broadcaster_reference, node_selector_broadcaster_name) + |> Map.put(:task_supervisor_reference, task_supervisor_name) + |> Map.put(:task_registry_reference, task_registry_name) + |> Map.put(:name, executor_supervisor_name) ) Supervisor.init( [ - {Task.Supervisor, [name: Module.concat(scheduler, TaskSupervisor)]}, - {storage, storage_opts ++ [name: Module.concat(scheduler, Storage)]}, + {Task.Supervisor, task_supervisor_opts}, + {storage, storage_opts}, {Quantum.ClockBroadcaster, clock_broadcaster_opts}, {Quantum.TaskRegistry, task_registry_opts}, {Quantum.JobBroadcaster, job_broadcaster_opts}, @@ -101,41 +106,8 @@ defmodule Quantum.Supervisor do end # Run Optional Callback in Quantum Scheduler Implementation - @spec quantum_init(atom, Keyword.t()) :: Keyword.t() - defp quantum_init(quantum, config) do - if Code.ensure_loaded?(quantum) and function_exported?(quantum, :init, 1) do - quantum.init(config) - else - config - end - end - - defp runtime_config(quantum, otp_app, custom) do - config = Quantum.scheduler_config(quantum, otp_app, custom) - - # Load Jobs from Config - jobs = - config - |> Keyword.get(:jobs, []) - |> Enum.map(&Normalizer.normalize(quantum.new_job(config), &1)) - |> remove_jobs_with_duplicate_names(quantum) - - Keyword.put(config, :jobs, jobs) - end - - defp remove_jobs_with_duplicate_names(job_list, quantum) do - job_list - |> Enum.reduce(%{}, fn %Job{name: name} = job, acc -> - if Enum.member?(Map.keys(acc), name) do - Logger.warn( - "Job with name '#{name}' of quantum '#{quantum}' not started due to duplicate job name" - ) - - acc - else - Map.put_new(acc, name, job) - end - end) - |> Map.values() + @spec quantum_init(Keyword.t(), atom) :: Keyword.t() + defp quantum_init(config, scheduler) do + scheduler.init(config) end end diff --git a/lib/quantum/task_registry.ex b/lib/quantum/task_registry.ex index e7af817..3fd7310 100644 --- a/lib/quantum/task_registry.ex +++ b/lib/quantum/task_registry.ex @@ -12,7 +12,19 @@ defmodule Quantum.TaskRegistry do # Start the registry @spec start_link(StartOpts.t()) :: GenServer.on_start() def start_link(%StartOpts{name: name}) do - GenServer.start_link(__MODULE__, %InitOpts{}, name: name) + __MODULE__ + |> GenServer.start_link(%InitOpts{}, name: name) + |> case do + {:ok, pid} -> + {:ok, pid} + + {:error, {:already_started, pid}} -> + Process.monitor(pid) + {:ok, pid} + + {:error, _reason} = error -> + error + end end # Mark a task as Running @@ -120,4 +132,9 @@ defmodule Quantum.TaskRegistry do {:noreply, %{state | running_tasks: running_tasks}} end + + @impl GenServer + def handle_info(_message, state) do + {:noreply, [], state} + end end From d2cace6c32f3d4e43399355d32da4f1f0bb994be Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonatan=20M=C3=A4nnchen?= Date: Tue, 18 Aug 2020 12:04:24 +0200 Subject: [PATCH 2/2] Release v3.1.0 --- CHANGELOG.md | 11 +++++++++-- mix.exs | 2 +- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6704bfc..cedd54f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,13 @@ This project adheres to [Semantic Versioning](http://semver.org/). Diff for [unreleased] +## 3.1.0 - 2020-08-18 + +### Added +- Additional Supervisor Configuration for Clustering (#450) + +Diff for [3.1.0] + ## 3.0.2 - 2020-08-18 ### Fixed @@ -593,8 +600,8 @@ Diff for [1.0.0] ### Added - Initial commit - -[unreleased]: https://github.com/quantum-elixir/quantum-core/compare/v3.0.2...HEAD +[unreleased]: https://github.com/quantum-elixir/quantum-core/compare/v3.1.0...HEAD +[3.0.2]: https://github.com/quantum-elixir/quantum-core/compare/v3.0.2...v3.1.0 [3.0.2]: https://github.com/quantum-elixir/quantum-core/compare/v3.0.1...v3.0.2 [3.0.1]: https://github.com/quantum-elixir/quantum-core/compare/v3.0.0...v3.0.1 [3.0.0]: https://github.com/quantum-elixir/quantum-core/compare/v3.0.0-rc.3...v3.0.0 diff --git a/mix.exs b/mix.exs index 0be78f8..5942654 100644 --- a/mix.exs +++ b/mix.exs @@ -3,7 +3,7 @@ defmodule Quantum.Mixfile do use Mix.Project - @version "3.0.2" + @version "3.1.0" def project do [