diff --git a/cmd/connection_test/main.go b/cmd/connection_test/main.go
new file mode 100644
index 00000000..83b227eb
--- /dev/null
+++ b/cmd/connection_test/main.go
@@ -0,0 +1,94 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"math/rand"
+	"time"
+
+	"github.com/jackc/pgx/v5"
+)
+
+// retryDelay is the pause inserted after a failed write before the next attempt.
+const retryDelay = 2 * time.Second
+
+// main connects to the cluster given by -uri and issues -total-writes
+// (default 1000) single-row inserts into the bench table, printing one
+// Success/Failed line per attempt.
+func main() {
+	uri := flag.String("uri", "", "PG cluster connection string")
+	totalWrites := flag.Int("total-writes", 0, "Total writes")
+
+	flag.Parse()
+
+	ctx := context.Background()
+	conn, err := openConnection(ctx, *uri)
+	if err != nil {
+		panic(err)
+	}
+	defer conn.Close(ctx)
+
+	if *totalWrites == 0 {
+		*totalWrites = 1000
+	}
+
+	// if err := admin.CreateDatabaseWithOwner(ctx, conn, "benchmark", "postgres"); err != nil {
+	// 	panic(err)
+	// }
+
+	// sql := fmt.Sprintf("CREATE TABLE IF NOT EXISTS bench ( id serial primary key, val varchar(100));")
+	// _, err = conn.Exec(ctx, sql)
+	// if err != nil {
+	// 	panic(conn)
+	// }
+
+	seed := generateSeed()
+
+	for i := 0; i < *totalWrites; i++ {
+		val := fmt.Sprintf("%s-%d", seed, i)
+		sql := fmt.Sprintf("INSERT INTO bench (val) VALUES ('%s');", val)
+		if _, err := conn.Exec(ctx, sql); err != nil {
+			fmt.Printf("(%d of %d) - Failed: %s\n", i, *totalWrites-1, err)
+			// time.Sleep takes a time.Duration, so a bare 2 means two
+			// nanoseconds; spell the unit out to get a real pause.
+			time.Sleep(retryDelay)
+			continue
+		}
+
+		fmt.Printf("(%d of %d) - Success\n", i, *totalWrites-1)
+	}
+}
+
+// generateSeed returns five random ASCII letters used to tag this run's rows.
+func generateSeed() string {
+	const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
+
+	b := make([]byte, 5)
+	for i := range b {
+		b[i] = letterBytes[rand.Intn(len(letterBytes))]
+	}
+	return string(b)
+}
+
+// openConnection parses uri and connects to it. The overall attempt is bounded
+// by a 10s context derived from parentCtx; pgx's ConnectTimeout is set to 5s.
+func openConnection(parentCtx context.Context, uri string) (*pgx.Conn, error) {
+	ctx, cancel := context.WithTimeout(parentCtx, 10*time.Second)
+	defer cancel()
+
+	// NOTE(review): the URI may embed a password; consider redacting before printing.
+	fmt.Println(uri)
+	conf, err := pgx.ParseConfig(uri)
+	if err != nil {
+		return nil, err
+	}
+
+	conf.ConnectTimeout = 5 * time.Second
+	conn, err := pgx.ConnectConfig(ctx, conf)
+	if err != nil {
+		return nil, err
+	}
+
+	return conn, nil
+}
diff --git 
a/cmd/event_handler/main.go b/cmd/event_handler/main.go index eab76a80..83bcbbca 100644 --- a/cmd/event_handler/main.go +++ b/cmd/event_handler/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "errors" "flag" "fmt" "log" @@ -10,6 +11,7 @@ import ( "time" "github.com/fly-apps/postgres-flex/internal/flypg" + "github.com/jackc/pgx/v5" ) const eventLogFile = "/data/event.log" @@ -26,6 +28,8 @@ func main() { details := flag.String("details", "", "details") flag.Parse() + ctx := context.Background() + logFile, err := os.OpenFile(eventLogFile, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644) if err != nil { fmt.Printf("failed to open event log: %s", err) @@ -43,7 +47,7 @@ func main() { success := false for retry < maxRetries { - if err := reconfigurePGBouncer(*nodeID); err != nil { + if err := reconfigurePGBouncer(ctx, *nodeID); err != nil { log.Printf("%s - failed to reconfigure pgbouncer: %s. (attempt: %d)\n", *event, err, retry) retry++ time.Sleep(1 * time.Second) @@ -71,9 +75,8 @@ func main() { retry := 0 maxRetries := 5 success := false - for retry < maxRetries { - if err := reconfigurePGBouncer(newMemberID); err != nil { + if err := reconfigurePGBouncer(ctx, newMemberID); err != nil { log.Printf("%s - failed to reconfigure pgbouncer: %s. (attempt: %d)\n", *event, err, retry) retry++ time.Sleep(1 * time.Second) @@ -91,28 +94,115 @@ func main() { os.Exit(1) } + case "child_node_disconnect", "child_node_reconnect", "child_node_new_connect": + node, err := flypg.NewNode() + if err != nil { + log.Printf("failed to initialize node: %s", err) + os.Exit(1) + } + + conn, err := node.RepMgr.NewLocalConnection(ctx) + if err != nil { + log.Printf("failed to open local connection: %s", err) + os.Exit(1) + } + defer conn.Close(ctx) + + member, err := node.RepMgr.Member(ctx, conn) + if err != nil { + log.Printf("failed to resolve member: %s", err) + os.Exit(1) + } + + if member.Role != flypg.PrimaryRoleName { + // We should never get here. 
+ log.Println("skipping since we are not the primary") + os.Exit(0) + } + + if err := evaluateClusterState(ctx, conn, node); err != nil { + log.Printf("failed to evaluate cluster state: %s", err) + os.Exit(0) + } + + os.Exit(0) default: // noop } } -func reconfigurePGBouncer(id int) error { +func reconfigurePGBouncer(ctx context.Context, id int) error { node, err := flypg.NewNode() if err != nil { return fmt.Errorf("failed to reference node: %s", err) } - conn, err := node.RepMgr.NewLocalConnection(context.TODO()) + conn, err := node.RepMgr.NewLocalConnection(ctx) if err != nil { return fmt.Errorf("failed to establish connection with local pg: %s", err) } + defer conn.Close(ctx) - member, err := node.RepMgr.MemberByID(context.TODO(), conn, id) + member, err := node.RepMgr.MemberByID(ctx, conn, id) if err != nil { return err } - if err := node.PGBouncer.ConfigurePrimary(context.TODO(), member.Hostname, true); err != nil { + if err := node.PGBouncer.ConfigurePrimary(ctx, member.Hostname, true); err != nil { + return fmt.Errorf("failed to reconfigure pgbouncer primary %s", err) + } + + return nil +} + +func evaluateClusterState(ctx context.Context, conn *pgx.Conn, node *flypg.Node) error { + standbys, err := node.RepMgr.StandbyMembers(ctx, conn) + if err != nil { + if !errors.Is(err, pgx.ErrNoRows) { + return fmt.Errorf("failed to query standbys") + } + } + + sample, err := flypg.TakeDNASample(ctx, node, standbys) + if err != nil { + return fmt.Errorf("failed to evaluate cluster data: %s", err) + } + + log.Println(flypg.DNASampleString(sample)) + + primary, err := flypg.ZombieDiagnosis(sample) + if errors.Is(err, flypg.ErrZombieDiagnosisUndecided) || errors.Is(err, flypg.ErrZombieDiscovered) { + if err := flypg.Quarantine(ctx, conn, node, primary); err != nil { + return fmt.Errorf("failed to quarantine failed primary: %s", err) + } + + return fmt.Errorf("primary has been quarantined: %s", err) + } else if err != nil { + return fmt.Errorf("failed to run zombie 
diagnosis: %s", err) + } + + // If the zombie lock exists clear it + if flypg.ZombieLockExists() { + log.Println("Clearing zombie lock and enabling read/write") + if err := flypg.RemoveZombieLock(); err != nil { + return fmt.Errorf("failed to remove zombie lock: %s", err) + } + + maxRetries := 5 + retry := 0 + + for retry < maxRetries { + if err := flypg.UnsetReadOnly(ctx, node, conn); err != nil { + log.Printf("attempt %d - failed to unset readonly: %s", retry, err) + retry++ + continue + } + log.Println("successfully enabled read/write") + break + } + } + + if err := node.PGBouncer.ConfigurePrimary(ctx, primary, true); err != nil { return fmt.Errorf("failed to reconfigure pgbouncer primary %s", err) } diff --git a/internal/flycheck/pg.go b/internal/flycheck/pg.go index 6ca7a2a9..f078b652 100644 --- a/internal/flycheck/pg.go +++ b/internal/flycheck/pg.go @@ -67,6 +67,10 @@ func diskCapacityCheck(ctx context.Context, localConn *pgx.Conn, node *flypg.Nod // Turn primary read-only if usedPercentage > diskCapacityPercentageThreshold { + + if err := flypg.WriteReadOnlyLock(); err != nil { + return "", fmt.Errorf("failed to set readonly lock: %s", err) + } if err := flypg.SetReadOnly(ctx, node, localConn); err != nil { return "", fmt.Errorf("failed to turn primary readonly: %s", err) } @@ -74,10 +78,17 @@ func diskCapacityCheck(ctx context.Context, localConn *pgx.Conn, node *flypg.Nod return "", fmt.Errorf("%0.1f%% - readonly mode enabled, extend your volume to re-enable writes", usedPercentage) } - // Don't attempt to turn read/write if zombie lock exists. - if !flypg.ZombieLockExists() { - if err := flypg.UnsetReadOnly(ctx, node, localConn); err != nil { - return "", fmt.Errorf("failed to turn primary read/write: %s", err) + if flypg.ReadOnlyLockExists() { + // We are required to stay read-only if the zombie.lock is present. 
+ if !flypg.ZombieLockExists() { + if err := flypg.UnsetReadOnly(ctx, node, localConn); err != nil { + return "", fmt.Errorf("failed to turn primary read/write: %s", err) + } + } + + // Remove the read-only lock. + if err := flypg.RemoveReadOnlyLock(); err != nil { + return "", fmt.Errorf("failed to remove readonly lock: %s", err) } } diff --git a/internal/flypg/node.go b/internal/flypg/node.go index ff0da75c..56794cf2 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -145,7 +145,7 @@ func (n *Node) Init(ctx context.Context) error { if ZombieLockExists() { fmt.Println("Zombie lock detected!") - primaryStr, err := readZombieLock() + primaryStr, err := ReadZombieLock() if err != nil { return fmt.Errorf("failed to read zombie lock: %s", primaryStr) } @@ -172,7 +172,7 @@ func (n *Node) Init(ctx context.Context) error { // Confirm that our rejoin target still identifies itself as the primary. if primary.Hostname != ip.String() { // Clear the zombie.lock file so we can attempt to re-resolve the correct primary. - if err := removeZombieLock(); err != nil { + if err := RemoveZombieLock(); err != nil { return fmt.Errorf("failed to remove zombie lock: %s", err) } @@ -191,7 +191,7 @@ func (n *Node) Init(ctx context.Context) error { // TODO - Wait for target cluster to register self as a standby. - if err := removeZombieLock(); err != nil { + if err := RemoveZombieLock(); err != nil { return fmt.Errorf("failed to remove zombie lock: %s", err) } @@ -348,46 +348,28 @@ func (n *Node) PostInit(ctx context.Context) error { return fmt.Errorf("failed to resolve cluster metrics: %s", err) } - printDNASample(sample) + fmt.Println(DNASampleString(sample)) // Evaluate whether we are a zombie or not. 
primary, err := ZombieDiagnosis(sample) if errors.Is(err, ErrZombieDiagnosisUndecided) { fmt.Println("Unable to confirm that we are the true primary!") - fmt.Println("Writing zombie.lock file.") - if err := writeZombieLock(""); err != nil { - return fmt.Errorf("failed to set zombie lock: %s", err) + if err := Quarantine(ctx, conn, n, primary); err != nil { + return fmt.Errorf("failed to quarantine failed primary: %s", err) } - fmt.Println("Turning all user-created databases readonly.") - if err := SetReadOnly(ctx, n, conn); err != nil { - return fmt.Errorf("failed to set read-only: %s", err) - } - - // TODO - Add link to docs - fmt.Println("Please refer to following documentation for more information: .") - } else if errors.Is(err, ErrZombieDiscovered) { - fmt.Println("Zombie primary discovered!") fmt.Printf("The majority of registered members agree that '%s' is the real primary.\n", primary) - fmt.Printf("Reconfiguring PGBouncer to point to '%s'\n", primary) - if err := n.PGBouncer.ConfigurePrimary(ctx, primary, true); err != nil { - return fmt.Errorf("failed to reconfigure pgbouncer: %s", err) - } - - fmt.Println("Writing zombie.lock file") - if err := writeZombieLock(primary); err != nil { - return fmt.Errorf("failed to set zombie lock: %s", err) - } - - fmt.Println("Turning user-created databases read-only") - if err := SetReadOnly(ctx, n, conn); err != nil { - return fmt.Errorf("failed to set read-only: %s", err) + if err := Quarantine(ctx, conn, n, primary); err != nil { + return fmt.Errorf("failed to quarantine failed primary: %s", err) } + // Issue panic to force a process restart so we can attempt to rejoin + // the the cluster we've diverged from. 
panic(err) + } else if err != nil { return fmt.Errorf("failed to run zombie diagnosis: %s", err) } diff --git a/internal/flypg/pgbouncer.go b/internal/flypg/pgbouncer.go index 4f2ec2cb..449f551f 100644 --- a/internal/flypg/pgbouncer.go +++ b/internal/flypg/pgbouncer.go @@ -62,7 +62,7 @@ func (p *PGBouncer) ConfigurePrimary(ctx context.Context, primary string, reload if reload { err = p.reloadConfig(ctx) if err != nil { - fmt.Printf("failed to reconfigure pgbouncer primary %s\n", err) + return fmt.Errorf("failed to reconfigure pgbouncer primary: %s", err) } } return nil diff --git a/internal/flypg/readonly.go b/internal/flypg/readonly.go index 7567e658..c762773b 100644 --- a/internal/flypg/readonly.go +++ b/internal/flypg/readonly.go @@ -17,13 +17,9 @@ const ( ) func SetReadOnly(ctx context.Context, n *Node, conn *pgx.Conn) error { - if err := writeReadOnlyLock(); err != nil { - return fmt.Errorf("failed to set readonly lock: %s", err) - } - databases, err := admin.ListDatabases(ctx, conn) if err != nil { - return err + return fmt.Errorf("failed to list database: %s", err) } for _, db := range databases { @@ -51,7 +47,7 @@ func SetReadOnly(ctx context.Context, n *Node, conn *pgx.Conn) error { return fmt.Errorf("failed to verify readonly was unset: %s", err) } - if status == readOnlyDisabled { + if status != readOnlyEnabled { return fmt.Errorf("failed to turn database '%s' readonly", db.Name) } } @@ -60,14 +56,9 @@ func SetReadOnly(ctx context.Context, n *Node, conn *pgx.Conn) error { } func UnsetReadOnly(ctx context.Context, n *Node, conn *pgx.Conn) error { - // Skip if there's no readonly lock present - if !ReadOnlyLockExists() { - return nil - } - databases, err := admin.ListDatabases(ctx, conn) if err != nil { - return err + return fmt.Errorf("failed to list databases: %s", err) } for _, db := range databases { @@ -101,10 +92,6 @@ func UnsetReadOnly(ctx context.Context, n *Node, conn *pgx.Conn) error { } } - if err := removeReadOnlyLock(); err != nil { - return 
fmt.Errorf("failed to remove readonly lock: %s", err) - } - return nil } @@ -117,7 +104,7 @@ func ReadOnlyLockExists() bool { return true } -func writeReadOnlyLock() error { +func WriteReadOnlyLock() error { if ReadOnlyLockExists() { return nil } @@ -129,7 +116,7 @@ func writeReadOnlyLock() error { return nil } -func removeReadOnlyLock() error { +func RemoveReadOnlyLock() error { if !ReadOnlyLockExists() { return nil } diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index 0520783d..bb1b2c23 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -123,7 +123,7 @@ func (r *RepMgr) setDefaults() { "promote_command": fmt.Sprintf("'repmgr standby promote -f %s --log-to-file'", r.ConfigPath), "follow_command": fmt.Sprintf("'repmgr standby follow -f %s --log-to-file --upstream-node-id=%%n'", r.ConfigPath), "event_notification_command": fmt.Sprintf("'/usr/local/bin/event_handler -node-id %%n -event %%e -success %%s -details \"%%d\" -new-node-id \\'%%p\\''"), - "event_notifications": "'repmgrd_failover_promote,standby_promote,standby_follow'", + "event_notifications": "'repmgrd_failover_promote,standby_promote,standby_follow,child_node_disconnect,child_node_reconnect,child_node_new_connect'", "location": r.Region, "primary_visibility_consensus": true, "failover_validation_command": fmt.Sprintf("'/usr/local/bin/failover_validation -visible-nodes %%v -total-nodes %%t'"), diff --git a/internal/flypg/restore.go b/internal/flypg/restore.go index 0872aa7a..78134afd 100644 --- a/internal/flypg/restore.go +++ b/internal/flypg/restore.go @@ -204,13 +204,13 @@ func openConn(ctx context.Context, n *Node) (*pgx.Conn, error) { } func clearLocks() error { - if err := removeReadOnlyLock(); err != nil { + if err := RemoveReadOnlyLock(); err != nil { if !os.IsNotExist(err) { return fmt.Errorf("failed to remove readonly lock pre-restore: %s", err) } } - if err := removeZombieLock(); err != nil { + if err := RemoveZombieLock(); err != nil { if 
!os.IsNotExist(err) {
 			return fmt.Errorf("failed to remove zombie lock pre-restore: %s", err)
 		}
diff --git a/internal/flypg/zombie.go b/internal/flypg/zombie.go
index fbc5a3c6..fb9bc796 100644
--- a/internal/flypg/zombie.go
+++ b/internal/flypg/zombie.go
@@ -5,6 +5,8 @@ import (
 	"errors"
 	"fmt"
 	"os"
+
+	"github.com/jackc/pgx/v5"
 )
 
 var (
@@ -35,7 +37,7 @@ func writeZombieLock(hostname string) error {
 	return nil
 }
 
-func removeZombieLock() error {
+func RemoveZombieLock() error {
 	if err := os.Remove("/data/zombie.lock"); err != nil {
 		return err
 	}
@@ -43,7 +45,7 @@ func removeZombieLock() error {
 	return nil
 }
 
-func readZombieLock() (string, error) {
+func ReadZombieLock() (string, error) {
 	body, err := os.ReadFile("/data/zombie.lock")
 	if err != nil {
 		return "", err
@@ -75,7 +77,7 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp
 		// Check for connectivity
 		mConn, err := node.RepMgr.NewRemoteConnection(ctx, standby.Hostname)
 		if err != nil {
-			fmt.Printf("failed to connect to %s", standby.Hostname)
+			fmt.Printf("failed to connect to %s\n", standby.Hostname)
 			sample.totalInactive++
 			continue
 		}
@@ -142,8 +144,33 @@ func ZombieDiagnosis(s *DNASample) (string, error) {
 	return "", ErrZombieDiagnosisUndecided
 }
 
-func printDNASample(s *DNASample) {
-	fmt.Printf("Registered members: %d, Active member(s): %d, Inactive member(s): %d, Conflicts detected: %d\n",
+// Quarantine fences off this node after a zombie diagnosis flagged it.
+// When primary is non-empty, PGBouncer is first repointed at it. The zombie
+// lock is then written with primary recorded in it (empty when the diagnosis
+// was undecided) so the resolved primary is not lost, and finally
+// SetReadOnly is run over conn.
+func Quarantine(ctx context.Context, conn *pgx.Conn, n *Node, primary string) error {
+	if primary != "" {
+		if err := n.PGBouncer.ConfigurePrimary(ctx, primary, true); err != nil {
+			return fmt.Errorf("failed to reconfigure pgbouncer: %s", err)
+		}
+	}
+
+	fmt.Println("Writing zombie.lock file.")
+	if err := writeZombieLock(primary); err != nil {
+		return fmt.Errorf("failed to set zombie lock: %s", err)
+	}
+
+	fmt.Println("Turning all user-created databases readonly.")
+	if err := SetReadOnly(ctx, n, conn); err != nil {
+		return fmt.Errorf("failed to set read-only: %s", err)
+	}
+
+	return nil
+}
+
+func DNASampleString(s *DNASample) string { + return fmt.Sprintf("Registered members: %d, Active member(s): %d, Inactive member(s): %d, Conflicts detected: %d", s.totalMembers, s.totalActive, s.totalInactive,