Skip to content
Open
5 changes: 4 additions & 1 deletion .iex.exs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
import Ecto.Query
if Code.loaded?(Ecto.Query) do
import Ecto.Query
end

alias Lightning.Repo
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ and this project adheres to

### Added

- Support a rate-limiter on the /i endpoint
[#3185](https://github.com/OpenFn/lightning/issues/3185)
- Added a custom metric to track projects that could benefit from additional
worker pods.
[#3189](https://github.com/OpenFn/lightning/issues/3189)
Expand Down
4 changes: 2 additions & 2 deletions bin/ci_tests
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
set -euo pipefail

set +e
mix coveralls.json --export-coverage coverage -o test/reports
mix coveralls.json --exclude dist_integration --export-coverage coverage -o test/reports
EXIT_CODE=$?
set -e

Expand All @@ -39,7 +39,7 @@ fi
# Only retry if exit code is exactly 2
if [ $EXIT_CODE -eq 2 ]; then
set +e
mix coveralls.json --import-cover coverage --failed -o test/reports
mix coveralls.json --exclude dist_integration --import-cover coverage --failed -o test/reports
EXIT_CODE=$? # Overwrite with the second run's exit code
set -e
# Rename the failed test report if it exists
Expand Down
173 changes: 173 additions & 0 deletions bin/local_cluster
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
#!/usr/bin/env bash

COOKIE="rate-limiter-secret"

# Function to show usage
show_usage() {
echo "Usage:"
echo " $0 [--proxy] --count <number> Start local cluster (default: 2 instances)"
echo " $0 connect <node_number> Connect to a specific node (1-4)"
echo ""
echo "Options:"
echo " --proxy Start a Caddy reverse proxy on port 4000 (nodes will start from 4001)"
echo " --count <num> Number of nodes to start (1-4, default: 2)"
exit 1
}

# Handle connect subcommand
if [ "$1" = "connect" ]; then
if [ -z "$2" ] || ! [[ "$2" =~ ^[1-4]$ ]]; then
echo "Error: Please specify a valid node number (1-4)"
show_usage
fi

NODE_NUM=$2
echo "Connecting to node${NODE_NUM}@127.0.0.1..."
exec iex --name "remote_shell${NODE_NUM}@127.0.0.1" --cookie "${COOKIE}" --remsh "node${NODE_NUM}@127.0.0.1"
# The exec command replaces the current process, so we don't need an explicit exit
# If we reach this point, it means the exec failed, so we'll exit with its status code
exit $?
fi

# Parse arguments
USE_PROXY=false
INSTANCES=2

while [[ $# -gt 0 ]]; do
case $1 in
--proxy)
USE_PROXY=true
shift
;;
--count)
if [ -z "$2" ] || ! [[ "$2" =~ ^[0-9]+$ ]]; then
echo "Error: --count requires a numeric argument"
show_usage
fi
INSTANCES=$2
shift 2
;;
*)
echo "Unknown argument: $1"
show_usage
;;
esac
done

# Validate number of instances
if ! [[ "$INSTANCES" =~ ^[0-9]+$ ]]; then
echo "Error: Number of instances must be a positive integer"
show_usage
fi

if [ "$INSTANCES" -lt 1 ] || [ "$INSTANCES" -gt 4 ]; then
echo "Error: Number of instances must be between 1 and 4"
show_usage
fi

# Check for Caddy if proxy is requested
if [ "$USE_PROXY" = true ]; then
if ! command -v caddy &>/dev/null; then
echo "Error: Caddy is required for proxy mode but it's not installed"
echo "Please install Caddy first:"
echo " Mac: brew install caddy"
echo " Linux: sudo apt install caddy"
echo " Or visit: https://caddyserver.com/docs/install"
exit 1
fi
fi

# Array to store background PIDs
declare -a PIDS

# Colors for different processes
declare -a COLORS=(
"\033[0;36m" # Cyan
"\033[0;32m" # Green
"\033[0;35m" # Purple
"\033[0;33m" # Yellow
"\033[0;37m" # Gray (for proxy)
)
RESET="\033[0m"

# Cleanup function to kill all child processes
cleanup() {
echo "Shutting down all processes..."
for pid in "${PIDS[@]}"; do
kill "$pid" 2>/dev/null
done
exit 0
}

# Set up trap for cleanup
trap cleanup INT TERM

# Function to run a command with colored output
run_with_color() {
local color=$1
local prefix=$2
shift 2
# Run the command and color its output
"$@" 2>&1 | while read -r line; do
echo -e "${color}${prefix} | ${line}${RESET}"
done
}

# Create Caddy configuration if proxy is enabled
if [ "$USE_PROXY" = true ]; then
BASE_PORT=4001
CADDY_CONFIG=$(mktemp)
echo "Creating Caddy configuration..."
cat >"$CADDY_CONFIG" <<EOF
# Global options
{
admin off
auto_https off
http_port 4000
}

# Reverse proxy configuration
localhost:4000 {
reverse_proxy {
to $(for i in $(seq 1 "$INSTANCES"); do echo "localhost:$((BASE_PORT + i - 1))"; done | paste -sd " " -)
lb_policy round_robin
}
}
EOF

# Only log Caddy config if LOG_LEVEL is debug
if [ "${LOG_LEVEL:-}" = "debug" ]; then
echo "Caddy config:"
cat "$CADDY_CONFIG"
fi

# Start Caddy
run_with_color "${COLORS[4]}" "proxy" caddy run --adapter caddyfile --config "$CADDY_CONFIG" &
PIDS+=($!)

# Cleanup Caddy config on exit
trap 'rm -f "$CADDY_CONFIG"' EXIT

echo "Started reverse proxy on port 4000"
else
BASE_PORT=4000
fi

# Start the requested number of instances
for i in $(seq 1 "$INSTANCES"); do
export RTM_PORT=$((2222 + i - 1)) PORT=$((BASE_PORT + i - 1))
run_with_color "${COLORS[$i - 1]}" "node$i" elixir --cookie "${COOKIE}" --name "[email protected]" -S mix phx.server &
PIDS+=($!)
done

if [ "$USE_PROXY" = true ]; then
echo "Started $INSTANCES node(s) on ports $((BASE_PORT))-$((BASE_PORT + INSTANCES - 1)) with load balancer on port 4000"
echo "RTM ports: 2222-$((2222 + INSTANCES - 1))"
else
echo "Started $INSTANCES node(s) on ports $((BASE_PORT))-$((BASE_PORT + INSTANCES - 1))"
echo "RTM ports: 2222-$((2222 + INSTANCES - 1))"
fi
echo "To connect to a specific node, use: $0 connect <node_number>"

# Wait for all background processes
wait
5 changes: 5 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ config :lightning, LightningWeb.Endpoint,
pubsub_server: Lightning.PubSub,
live_view: [signing_salt: "EfrmuOUr"]

config :lightning, Lightning.DistributedRateLimiter,
start: false,
capacity: 10,
refill_per_second: 2

config :lightning, Lightning.Extensions,
rate_limiter: Lightning.Extensions.RateLimiter,
usage_limiter: Lightning.Extensions.UsageLimiter,
Expand Down
2 changes: 2 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ config :lightning, Lightning.Vault,

config :lightning, Lightning.Runtime.RuntimeManager, start: true

config :lightning, Lightning.DistributedRateLimiter, start: true

config :lightning, :workers,
private_key: """
-----BEGIN PRIVATE KEY-----
Expand Down
2 changes: 2 additions & 0 deletions config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ config :lightning, LightningWeb.Endpoint,
"/8zedVJLxvmGGFoRExE3e870g7CGZZQ1Vq11A5MbQGPKOpK57MahVsPW6Wkkv61n",
server: true

config :lightning, Lightning.DistributedRateLimiter, start: true

config :lightning, Lightning.Runtime.RuntimeManager,
ws_url: "ws://localhost:4002/worker"

Expand Down
22 changes: 22 additions & 0 deletions lib/lightning/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@ defmodule Lightning.Application do

require Logger

@rate_limiter_opts Application.compile_env!(
:lightning,
Lightning.DistributedRateLimiter
)

@impl true
def start(_type, _args) do
# Initialize ETS table for adapter lookup
Expand Down Expand Up @@ -107,6 +112,12 @@ defmodule Lightning.Application do
[
Lightning.PromEx,
{Cluster.Supervisor, [topologies, [name: Lightning.ClusterSupervisor]]},
{Horde.Registry,
name: Lightning.HordeRegistry, keys: :unique, members: :auto},
{Horde.DynamicSupervisor,
name: Lightning.DistributedSupervisor,
strategy: :one_for_one,
members: :auto},
{Lightning.Vault, Application.get_env(:lightning, Lightning.Vault, [])},
# Start the Ecto repository
Lightning.Repo,
Expand Down Expand Up @@ -174,6 +185,17 @@ defmodule Lightning.Application do
:ok
end

def start_phase(:init_rate_limiter, :normal, _args) do
if @rate_limiter_opts[:start] do
Horde.DynamicSupervisor.start_child(
Lightning.DistributedSupervisor,
{Lightning.DistributedRateLimiter, @rate_limiter_opts}
)
end

:ok
end

def oban_opts do
opts = Application.get_env(:lightning, Oban)

Expand Down
Loading