statemachine

package
v0.0.0-...-3c171a9

Warning: This package is not in the latest version of its module.
Published: Aug 15, 2023 License: Apache-2.0 Imports: 7 Imported by: 6

README

Metafora Finite State Machine

The statemachine package provides a featureful state machine for use by Metafora task handlers.

Features

  • Static state machine; no custom states or messages (transitions)
  • Per task state machine; task may intercept commands
  • Flexible state store (see StateStore interface)
  • Flexible command sending/receiving (see Commander, CommandListener, or the etcd implementation).
  • Flexible error handling with builtin retry logic (see errors.go).
  • States: Runnable, Paused, Sleeping, Fault, Completed, Failed, Killed
  • Commands/Messages: Run, Pause, Sleep, Release, Error, Kill, Complete, Checkpoint
  • Tasks in a terminal state are unscheduled and will take no cluster resources.

Control Flow

  1. Coordinator receives a claimable task from a Watch
  2. Consumer calls Balancer.CanClaim(task)
  3. If claimable, Consumer calls Coordinator.Claim(task) to claim it.
  4. If claim was successful, Consumer starts the task handler which is created by statemachine.New(...).
  5. State machine loads initial state via StateStore.Load(task).
  6. If the task is Runnable, the state machine hands control to the StatefulHandler implementation provided by the user.
  7. Run until task returns a Message either due to completion, an error, or a received command.

There are quite a few moving parts that are hooked together:

  • The Consumer needs a Coordinator, Balancer, and HandlerFunc like normal, but you should use statemachine.New(...) to create the Handler returned by your HandlerFunc.
  • The state machine requires a StateStore and CommandListener. The m_etcd package includes an etcd implementation of CommandListener (as well as Commander for sending commands), but no default StateStore is provided.
  • Your task handling code must be implemented in a function (or method) that fulfills the StatefulHandler signature. When your handler receives a command it should return it (or override it with a new Message) to the state machine to handle state transitions.

States

State      Description
Runnable   Task is runnable and control is passed to the task handler.
Paused     Task is paused until a command is received.
Sleeping   Task is paused until a specified time (or a command is received).
Fault      An error occurred and a custom error handler is invoked.
Completed  Terminal. Task returned the Complete message because it finished successfully.
Failed     Terminal. The error handler executed during the Fault state determined the task has failed permanently.
Killed     Terminal. Task received a Kill message.

Terminal states are final. The task is removed from the broker and will never be scheduled to run again.

Messages

AKA Events or Commands

Messages cause transitions between states.

Message     Description
Run         Causes a Paused or Sleeping task to transition to Runnable and begin executing.
Pause       Causes a Runnable or Sleeping task to transition to Paused.
Sleep       Requires an Until time.Time to be set. Causes non-terminal states to pause until the time is reached.
Error       Requires an Err error to be set. Usually returned by tasks to transition to Fault state.
Release     See below
Checkpoint  See below
Kill        Causes a non-terminal state to transition to Killed.
Complete    Should only be returned by tasks. Causes a Runnable state to transition to Completed.

Release

Release is a special message that does not transition between states. Instead the task handler exits and the Coordinator's claim on the task is released.

Metafora's Handler.Stop() method sends the Release command to a running task to request it exit. It's most often used when cleanly restarting Metafora nodes.

Checkpoint

Checkpoint is a special message that, like Release, does not transition between states. It signals tasks to persist any internal state and, optionally, to exit so that the state machine can store the task's state.

Since a Checkpoint is a noop in the state machine, a task may decide to intercept the message and not return.
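
As an illustration of intercepting Checkpoint, the sketch below swallows the message after saving and returns every other command. It declares its own stand-in Message types so it compiles alone; runTask and its save callback are hypothetical names, not part of the package.

```go
package main

import "fmt"

// Local stand-ins for the package's Message types so the sketch compiles alone.
type MessageCode string

const (
	Checkpoint MessageCode = "checkpoint"
	Kill       MessageCode = "kill"
	Release    MessageCode = "release"
)

type Message struct {
	Code MessageCode
}

// runTask is a hypothetical handler body. It swallows Checkpoint commands
// after calling save, and returns every other command to the state machine.
func runTask(commands <-chan *Message, save func()) *Message {
	for cmd := range commands {
		if cmd.Code == Checkpoint {
			save() // persist internal state, then keep running
			continue
		}
		return cmd
	}
	// Assumption for this sketch only: a closed channel is treated as a
	// request to exit.
	return &Message{Code: Release}
}

func main() {
	commands := make(chan *Message, 2)
	commands <- &Message{Code: Checkpoint}
	commands <- &Message{Code: Kill}

	saves := 0
	result := runTask(commands, func() { saves++ })
	fmt.Println(result.Code, saves)
}
```

Here the Checkpoint triggers one save and the handler keeps running until the Kill arrives.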

Documentation

Overview

Statemachine is a featureful state machine implementation for Metafora handlers to use. It is implemented as a Handler wrapper which provides a channel of incoming commands to wrapped handlers. Internal handlers are expected to shut down cleanly and exit upon receiving a command from the state machine. The state machine will handle the state transition and restart the internal handler if necessary.

Users must provide a StateStore implementation for persisting task state and a CommandListener implementation for receiving commands. See the m_etcd or embedded packages for example CommandListener implementations.

See the README in this package for details.

Constants

const (
	DefaultErrLifetime = -4 * time.Hour
	DefaultErrMax      = 8
)

Variables

var (
	MissingUntilError  = errors.New("sleeping state missing deadline")
	MissingErrorsError = errors.New("fault state has no errors")
	ReleasableError    = errors.New("network error, release and retry")
)
var ExceededErrorRate = errors.New("exceeded error rate")

ExceededErrorRate is returned by error handlers in an Error Message when retry logic has been exhausted for a handler and it should transition to Failed.

var (
	// Rules is the state transition table.
	Rules = [...]Transition{

		{Event: Checkpoint, From: Runnable, To: Runnable},
		{Event: Release, From: Runnable, To: Runnable},
		{Event: Sleep, From: Runnable, To: Sleeping},
		{Event: Complete, From: Runnable, To: Completed},
		{Event: Kill, From: Runnable, To: Killed},
		{Event: Error, From: Runnable, To: Fault},
		{Event: Pause, From: Runnable, To: Paused},
		{Event: Run, From: Runnable, To: Runnable},

		{Event: Checkpoint, From: Sleeping, To: Sleeping},
		{Event: Release, From: Sleeping, To: Sleeping},
		{Event: Sleep, From: Sleeping, To: Sleeping},
		{Event: Run, From: Sleeping, To: Runnable},
		{Event: Kill, From: Sleeping, To: Killed},
		{Event: Pause, From: Sleeping, To: Paused},
		{Event: Error, From: Sleeping, To: Fault},

		{Event: Sleep, From: Fault, To: Sleeping},
		{Event: Error, From: Fault, To: Failed},

		{Event: Checkpoint, From: Paused, To: Paused},
		{Event: Release, From: Paused, To: Paused},
		{Event: Run, From: Paused, To: Runnable},
		{Event: Sleep, From: Paused, To: Sleeping},
		{Event: Kill, From: Paused, To: Killed},
		{Event: Pause, From: Paused, To: Paused},
	}
)
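
A table like this can be consulted with a linear scan. The sketch below redeclares the types and a subset of the rules locally so that it compiles on its own; the next helper is hypothetical and is not claimed to be how the package performs the lookup.

```go
package main

import "fmt"

// Local stand-ins mirroring the package's types so the sketch is self-contained.
type (
	MessageCode string
	StateCode   string
)

type Transition struct {
	Event MessageCode
	From  StateCode
	To    StateCode
}

const (
	Run   MessageCode = "run"
	Sleep MessageCode = "sleep"
	Error MessageCode = "error"
	Kill  MessageCode = "kill"

	Runnable StateCode = "runnable"
	Sleeping StateCode = "sleeping"
	Fault    StateCode = "fault"
	Failed   StateCode = "failed"
	Killed   StateCode = "killed"
)

// rules is a subset of the Rules table above.
var rules = []Transition{
	{Event: Sleep, From: Runnable, To: Sleeping},
	{Event: Kill, From: Runnable, To: Killed},
	{Event: Error, From: Runnable, To: Fault},
	{Event: Run, From: Sleeping, To: Runnable},
	{Event: Sleep, From: Fault, To: Sleeping},
	{Event: Error, From: Fault, To: Failed},
}

// next is a hypothetical helper: it returns the destination state for an
// event received in the given state, or false if no rule matches.
func next(from StateCode, ev MessageCode) (StateCode, bool) {
	for _, r := range rules {
		if r.From == from && r.Event == ev {
			return r.To, true
		}
	}
	return "", false
}

func main() {
	to, ok := next(Runnable, Error)
	fmt.Println(to, ok)

	// Terminal states have no outgoing rules, so the lookup fails.
	_, ok = next(Killed, Run)
	fmt.Println(ok)
}
```

Note that the table contains no rule whose From is a terminal state, which is what makes Completed, Failed, and Killed final.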

Functions

func DefaultErrHandler

func DefaultErrHandler(_ metafora.Task, errs []Err) (*Message, []Err)

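DefaultErrHandler fails the task if 8 errors (DefaultErrMax) have occurred within 4 hours (DefaultErrLifetime): it returns an Error message, which moves the task from Fault to Failed. Otherwise it returns a Sleep message so the task sleeps for 10 minutes before trying again.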
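
The rate check can be pictured as counting the errors newer than a cutoff. The following is a minimal sketch of that idea, not the package's code: Err is a reduced stand-in, and tooManyErrors and sampleErrs are hypothetical helpers. It also shows why DefaultErrLifetime is negative: adding it to the current time yields the cutoff.

```go
package main

import (
	"fmt"
	"time"
)

const (
	DefaultErrLifetime = -4 * time.Hour
	DefaultErrMax      = 8
)

// Err is a local stand-in with only the field this sketch needs.
type Err struct {
	Time time.Time
}

// tooManyErrors is a hypothetical helper: it reports whether at least
// DefaultErrMax errors occurred after now+DefaultErrLifetime.
func tooManyErrors(errs []Err, now time.Time) bool {
	cutoff := now.Add(DefaultErrLifetime) // four hours before now
	recent := 0
	for _, e := range errs {
		if e.Time.After(cutoff) {
			recent++
		}
	}
	return recent >= DefaultErrMax
}

// sampleErrs is a hypothetical helper that builds n errors, each stamped
// hoursAgo hours before the returned reference time.
func sampleErrs(n, hoursAgo int) ([]Err, time.Time) {
	now := time.Now()
	errs := make([]Err, n)
	for i := range errs {
		errs[i] = Err{Time: now.Add(-time.Duration(hoursAgo) * time.Hour)}
	}
	return errs, now
}

func main() {
	errs, now := sampleErrs(8, 1)
	fmt.Println(tooManyErrors(errs, now)) // eight errors, all one hour old

	errs, now = sampleErrs(8, 5)
	fmt.Println(tooManyErrors(errs, now)) // eight errors, all five hours old
}
```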

func New

New returns a handler that creates a state machine and exposes state transitions to the given handler by calling its Transition method. It should be created in the HandlerFunc you use with metafora's Consumer.

If ErrHandler is nil DefaultErrHandler will be used.

Types

type CommandListener

type CommandListener interface {
	Receive() <-chan *Message
	Stop()
}

type Commander

type Commander interface {
	Send(taskID string, m *Message) error
}
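
For tests or single-process use, both interfaces could be satisfied by a channel-backed implementation. The sketch below is one such arrangement under stated assumptions: memBus, memListener, Listen, and the two error values are hypothetical names, and Message is reduced to a stand-in. It is not the m_etcd or embedded implementation.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// Reduced stand-ins for the package's Message types.
type MessageCode string

const Pause MessageCode = "pause"

type Message struct{ Code MessageCode }

// The two interfaces as declared above.
type CommandListener interface {
	Receive() <-chan *Message
	Stop()
}

type Commander interface {
	Send(taskID string, m *Message) error
}

var (
	errNoListener = errors.New("no listener for task")
	errPending    = errors.New("a command is already pending")
)

// memBus is a hypothetical in-memory Commander that also hands out listeners.
type memBus struct {
	mu        sync.Mutex
	listeners map[string]chan *Message
}

func newMemBus() *memBus {
	return &memBus{listeners: make(map[string]chan *Message)}
}

// Send delivers m to the task's listener without blocking.
func (b *memBus) Send(taskID string, m *Message) error {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch, ok := b.listeners[taskID]
	if !ok {
		return errNoListener
	}
	select {
	case ch <- m:
		return nil
	default:
		return errPending
	}
}

// Listen registers a listener for taskID, replacing any previous one.
func (b *memBus) Listen(taskID string) CommandListener {
	ch := make(chan *Message, 1)
	b.mu.Lock()
	b.listeners[taskID] = ch
	b.mu.Unlock()
	return &memListener{bus: b, taskID: taskID, ch: ch}
}

type memListener struct {
	bus    *memBus
	taskID string
	ch     chan *Message
}

func (l *memListener) Receive() <-chan *Message { return l.ch }

// Stop unregisters the listener so later Sends fail with errNoListener.
func (l *memListener) Stop() {
	l.bus.mu.Lock()
	delete(l.bus.listeners, l.taskID)
	l.bus.mu.Unlock()
}

var _ Commander = (*memBus)(nil) // compile-time interface check

func main() {
	bus := newMemBus()
	l := bus.Listen("task-1")
	fmt.Println(bus.Send("task-1", &Message{Code: Pause}))
	fmt.Println((<-l.Receive()).Code)
	l.Stop()
	fmt.Println(bus.Send("task-1", &Message{Code: Pause}))
}
```

The one-slot buffer keeps Send from blocking on a busy task; a real implementation would need to decide what to do with commands that arrive while one is pending.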

type Err

type Err struct {
	Time time.Time `json:"timestamp"`
	Err  string    `json:"error"`
	// contains filtered or unexported fields
}

Err represents an error that occurred while a stateful handler was running.

NewErr was added to allow callers to construct an instance from an underlying error. The underlying error is now preserved so that Err can be converted back using errors.As. This is useful for custom error handlers that wish to inspect underlying error types and decide accordingly.

func NewErr

func NewErr(e error, t time.Time) Err

NewErr constructs an Err from an underlying error e.

func (Err) Error

func (e Err) Error() string

Error implements the error interface.

func (Err) Unwrap

func (e Err) Unwrap() error

Unwrap returns baseErr, the underlying error the Err was constructed from.
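
To illustrate how Unwrap lets errors.As reach the underlying error, the sketch below uses a local stand-in for Err. Only the baseErr field name comes from the documentation above; the rest of the stand-in, the quotaError type, and the isQuota helper are assumptions made for the example.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// Err is a local stand-in shaped like the package's Err. The unexported
// baseErr field is assumed to hold the original error.
type Err struct {
	Time    time.Time
	Err     string
	baseErr error
}

func NewErr(e error, t time.Time) Err {
	return Err{Time: t, Err: e.Error(), baseErr: e}
}

func (e Err) Error() string { return e.Err }
func (e Err) Unwrap() error { return e.baseErr }

// quotaError is a hypothetical application error type.
type quotaError struct{ Limit int }

func (q *quotaError) Error() string {
	return fmt.Sprintf("quota of %d exceeded", q.Limit)
}

// isQuota reports whether any recorded error wraps a *quotaError.
func isQuota(errs []Err) bool {
	for _, e := range errs {
		var q *quotaError
		if errors.As(e, &q) {
			return true
		}
	}
	return false
}

func main() {
	errs := []Err{
		NewErr(errors.New("connection reset"), time.Now()),
		NewErr(&quotaError{Limit: 3}, time.Now()),
	}
	fmt.Println(isQuota(errs))
	fmt.Println(isQuota(errs[:1]))
}
```

A custom ErrHandler could use a check like this to fail a task immediately on errors that retrying cannot fix.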

type ErrHandler

type ErrHandler func(task metafora.Task, errs []Err) (*Message, []Err)

ErrHandler functions should return Run, Sleep, or Fail messages depending on the rate of errors.

The ErrHandler, the StateStore, or both should trim the error slice to keep it from growing without bound.

type Message

type Message struct {
	Code MessageCode `json:"message"`

	// Until is when the statemachine should transition from sleeping to runnable
	Until *time.Time `json:"until,omitempty"`

	// Err is the error that caused this Error message
	Err error `json:"error,omitempty"`
}

Messages are events that cause state transitions. Until and Err are used by the Sleep and Error messages respectively.

func CheckpointMessage

func CheckpointMessage() *Message

func CompleteMessage

func CompleteMessage() *Message

func ErrorMessage

func ErrorMessage(err error) *Message

ErrorMessage is a simple helper for creating error messages from an error.

func KillMessage

func KillMessage() *Message

func PauseMessage

func PauseMessage() *Message

func ReleaseMessage

func ReleaseMessage() *Message

func RunMessage

func RunMessage() *Message

func SleepMessage

func SleepMessage(t time.Time) *Message

SleepMessage is a simple helper for creating sleep messages from a time.

func (*Message) String

func (m *Message) String() string

func (*Message) Valid

func (m *Message) Valid() bool

Valid returns true if the Message is valid. Invalid messages sent as commands are discarded by the state machine.
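
Going by the Messages table in the README (Sleep requires Until, Error requires Err), a validity check might look like the sketch below. This is an assumption about the rules, not the package's actual Valid method; isValid, sleepMinutes, and errorMsg are hypothetical helpers and the types are local stand-ins.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type MessageCode string

const (
	Run        MessageCode = "run"
	Sleep      MessageCode = "sleep"
	Pause      MessageCode = "pause"
	Kill       MessageCode = "kill"
	Error      MessageCode = "error"
	Complete   MessageCode = "complete"
	Checkpoint MessageCode = "checkpoint"
	Release    MessageCode = "release"
)

// Message is a stand-in with the same fields as the package's Message.
type Message struct {
	Code  MessageCode
	Until *time.Time
	Err   error
}

// isValid is a hypothetical check: Sleep needs a deadline, Error needs an
// error, the other known codes need nothing extra, unknown codes are invalid.
func isValid(m *Message) bool {
	if m == nil {
		return false
	}
	switch m.Code {
	case Sleep:
		return m.Until != nil
	case Error:
		return m.Err != nil
	case Run, Pause, Kill, Complete, Checkpoint, Release:
		return true
	}
	return false
}

// sleepMinutes and errorMsg are hypothetical constructors for the example.
func sleepMinutes(n int) *Message {
	t := time.Now().Add(time.Duration(n) * time.Minute)
	return &Message{Code: Sleep, Until: &t}
}

func errorMsg(text string) *Message {
	return &Message{Code: Error, Err: errors.New(text)}
}

func main() {
	fmt.Println(isValid(sleepMinutes(10)))        // Sleep with a deadline
	fmt.Println(isValid(&Message{Code: Sleep}))   // Sleep without a deadline
	fmt.Println(isValid(errorMsg("disk full")))   // Error with an error
	fmt.Println(isValid(&Message{Code: "bogus"})) // unknown code
}
```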

type MessageCode

type MessageCode string

MessageCode is the symbolic name of a state transition.

const (
	Run        MessageCode = "run"
	Sleep      MessageCode = "sleep"
	Pause      MessageCode = "pause"
	Kill       MessageCode = "kill"
	Error      MessageCode = "error"
	Complete   MessageCode = "complete"
	Checkpoint MessageCode = "checkpoint"

	// Special event which triggers state machine to exit without transitioning
	// between states.
	Release MessageCode = "release"
)

func (MessageCode) String

func (m MessageCode) String() string

type State

type State struct {
	Code   StateCode  `json:"state"`
	Until  *time.Time `json:"until,omitempty"`
	Errors []Err      `json:"errors,omitempty"`
}

State represents the current state of a stateful handler. See StateCode for details. Until and Errors are extra state used by the Sleeping and Fault states respectively.
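
The struct tags determine how a State looks once a StateStore serializes it as JSON. The sketch below copies the exported fields and tags into local stand-in types to show the effect of omitempty; encode and decode are hypothetical helpers.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

type StateCode string

const (
	Runnable StateCode = "runnable"
	Fault    StateCode = "fault"
)

// Err and State reuse the exported fields and JSON tags shown above.
type Err struct {
	Time time.Time `json:"timestamp"`
	Err  string    `json:"error"`
}

type State struct {
	Code   StateCode  `json:"state"`
	Until  *time.Time `json:"until,omitempty"`
	Errors []Err      `json:"errors,omitempty"`
}

// encode is a hypothetical helper that renders a State as a JSON string.
func encode(s State) (string, error) {
	b, err := json.Marshal(s)
	return string(b), err
}

// decode is a hypothetical helper that parses a JSON document into a State.
func decode(doc string) (State, error) {
	var s State
	err := json.Unmarshal([]byte(doc), &s)
	return s, err
}

func main() {
	// Until and Errors are omitted when unset.
	out, _ := encode(State{Code: Runnable})
	fmt.Println(out) // {"state":"runnable"}

	s, err := decode(`{"state":"fault","errors":[{"timestamp":"2023-08-15T00:00:00Z","error":"boom"}]}`)
	fmt.Println(s.Code, len(s.Errors), err)
}
```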

func (*State) String

func (s *State) String() string

func (*State) Valid

func (s *State) Valid() error

type StateCode

type StateCode string

StateCode is the actual state key. The State struct adds additional metadata related to certain StateCodes.

const (
	Runnable  StateCode = "runnable"  // Scheduled
	Sleeping  StateCode = "sleeping"  // Scheduled, not running until time has elapsed
	Completed StateCode = "completed" // Terminal, not scheduled
	Killed    StateCode = "killed"    // Terminal, not scheduled
	Failed    StateCode = "failed"    // Terminal, not scheduled
	Fault     StateCode = "fault"     // Scheduled, in error handling / retry logic
	Paused    StateCode = "paused"    // Scheduled, not running
)

func (StateCode) String

func (s StateCode) String() string

func (StateCode) Terminal

func (s StateCode) Terminal() bool

Terminal states will never run and cannot transition to a non-terminal state.
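
Based on the comments on the constants above, the check amounts to membership in the set Completed, Killed, Failed. The sketch below reimplements it on a local stand-in type; the schedulable helper is hypothetical.

```go
package main

import "fmt"

// StateCode is a local stand-in with the same values as the package's type.
type StateCode string

const (
	Runnable  StateCode = "runnable"
	Sleeping  StateCode = "sleeping"
	Completed StateCode = "completed"
	Killed    StateCode = "killed"
	Failed    StateCode = "failed"
	Fault     StateCode = "fault"
	Paused    StateCode = "paused"
)

// Terminal mirrors the documented behavior: only Completed, Killed, and
// Failed are terminal.
func (s StateCode) Terminal() bool {
	switch s {
	case Completed, Killed, Failed:
		return true
	}
	return false
}

// schedulable is a hypothetical helper that keeps only non-terminal states.
func schedulable(states []StateCode) []StateCode {
	var out []StateCode
	for _, s := range states {
		if !s.Terminal() {
			out = append(out, s)
		}
	}
	return out
}

func main() {
	all := []StateCode{Runnable, Sleeping, Completed, Killed, Failed, Fault, Paused}
	fmt.Println(schedulable(all))
}
```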

type StateStore

type StateStore interface {
	// Load the persisted or initial state for a task. Errors will cause tasks to
	// be marked as done.
	//
	// The one exception is the special error StateNotFound which will cause the
	// state machine to start from the initial (Runnable) state.
	Load(metafora.Task) (*State, error)

	// Store the current task state. Errors will prevent current state from being
	// persisted and prevent state transitions.
	Store(metafora.Task, *State) error
}

StateStore is an interface implementations must provide for persisting task state. Since the task ID is provided on each method call a single global StateStore can be used and implementations should be safe for concurrent access.
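
Since no default StateStore is provided, a minimal in-memory one is sketched below to show the shape of an implementation that is safe for concurrent access. It assumes metafora.Task exposes an ID method and declares local stand-ins for Task, State, and the StateNotFound error mentioned in the Load comment; memStore and taskID are hypothetical names. State held only in memory is lost on restart, so this would suit tests rather than production.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

type StateCode string

const Paused StateCode = "paused"

// State is reduced to the one field this sketch needs.
type State struct{ Code StateCode }

// Task stands in for metafora.Task; only an ID method is assumed here.
type Task interface{ ID() string }

// taskID is a hypothetical Task implementation for the example.
type taskID string

func (t taskID) ID() string { return string(t) }

// StateNotFound stands in for the special error named in the Load comment.
var StateNotFound = errors.New("state not found")

// memStore is a hypothetical StateStore keyed by task ID.
type memStore struct {
	mu     sync.RWMutex
	states map[string]State
}

func newMemStore() *memStore {
	return &memStore{states: make(map[string]State)}
}

// Load returns a copy of the stored state, or StateNotFound.
func (s *memStore) Load(t Task) (*State, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	st, ok := s.states[t.ID()]
	if !ok {
		return nil, StateNotFound
	}
	return &st, nil
}

// Store saves a copy of the given state under the task's ID.
func (s *memStore) Store(t Task, st *State) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.states[t.ID()] = *st
	return nil
}

func main() {
	store := newMemStore()
	_, err := store.Load(taskID("task-1"))
	fmt.Println(err == StateNotFound)

	_ = store.Store(taskID("task-1"), &State{Code: Paused})
	st, _ := store.Load(taskID("task-1"))
	fmt.Println(st.Code)
}
```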

type StatefulHandler

type StatefulHandler func(task metafora.Task, commands <-chan *Message) *Message

StatefulHandler is the function signature that the state machine is able to run. Instead of metafora.Handler's Stop method, StatefulHandlers receive Messages via the commands chan and return their exit status via a Message.

Normally StatefulHandlers simply return a Message as soon as it's received on the commands chan. However, it's also acceptable for a handler to return a different Message. For example if it encounters an error during shutdown, it may choose to return that error as an Error Message as opposed to the original command.
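
The pattern described above can be sketched as follows. The types are local stand-ins, Task assumes only an ID method, and newHandler with its step and cleanup callbacks is a hypothetical construction for illustration, not an API of the package.

```go
package main

import (
	"errors"
	"fmt"
)

type MessageCode string

const (
	Pause    MessageCode = "pause"
	Error    MessageCode = "error"
	Complete MessageCode = "complete"
)

type Message struct {
	Code MessageCode
	Err  error
}

// Task stands in for metafora.Task; only an ID method is assumed here.
type Task interface{ ID() string }

type StatefulHandler func(task Task, commands <-chan *Message) *Message

var errCleanup = errors.New("cleanup failed")

// newHandler is hypothetical. step performs one unit of work and reports
// whether the task is finished; cleanup runs before exiting on a command.
func newHandler(step func() bool, cleanup func() error) StatefulHandler {
	return func(_ Task, commands <-chan *Message) *Message {
		for {
			select {
			case cmd := <-commands:
				if err := cleanup(); err != nil {
					// Override the command: report the shutdown failure.
					return &Message{Code: Error, Err: err}
				}
				return cmd // the usual case: hand the command back
			default:
				if step() {
					return &Message{Code: Complete}
				}
			}
		}
	}
}

func main() {
	commands := make(chan *Message, 1)
	commands <- &Message{Code: Pause}

	never := func() bool { return false } // work that never finishes
	h := newHandler(never, func() error { return errCleanup })
	result := h(nil, commands)
	fmt.Println(result.Code, result.Err)
}
```

In this run the Pause command is replaced by an Error message because cleanup failed, so the state machine would move the task to Fault instead of Paused.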

type Transition

type Transition struct {
	Event MessageCode
	From  StateCode
	To    StateCode
}

Transition represents a state machine transition from one state to another given an event message.

func (Transition) String

func (t Transition) String() string
