Skip to content

Commit c10af85

Browse files
authored
CLOUDP-347497: Single cluster Replica Set Controller Refactoring (#486)
# CLOUDP-347497 - Single cluster Replica Set Controller Refactoring ## Why this refactoring The single-cluster RS controller was mixing two concerns: - **Kubernetes stuff** (StatefulSets, pods, volumes) - **Ops Manager/MongoDB stuff** (MongoDB processes, replication config) This worked fine for single-cluster, but it's a problem when you think about multi-cluster: - Multi-cluster has **multiple StatefulSets** (one per cluster) but only **one logical ReplicaSet** in Ops Manager - The OM automation config doesn't care about how many K8s clusters you have or how the pods are deployed So we need to separate these layers properly. ## Main changes ### 1. Broke down the huge Reconcile() method Before: ~300 lines of inline logic in Reconcile() Now: ```go Reconcile() ├── reconcileMemberResources() // Handles all K8s resource creation │ ├── reconcileHostnameOverrideConfigMap() │ ├── ensureRoles() │ └── reconcileStatefulSet() // StatefulSet-specific logic isolated here │ └── buildStatefulSetOptions() // Builds STS configuration └── updateOmDeploymentRs() // Handles Ops Manager automation config updates ``` This makes it way easier to understand what's happening and matches the multi-cluster controller structure. ### 2. Removed StatefulSet dependency from OM operations Created new helper functions that work directly with MongoDB resources instead of StatefulSets: - `CreateMongodProcessesFromMongoDB()` - was using StatefulSet before - `BuildFromMongoDBWithReplicas()` - same - `WaitForRsAgentsToRegisterByResource()` - same These mirror the existing `...FromStatefulSet` functions but take MongoDB resources instead. **Why it matters:** The OM layer now only cares about the MongoDB resource definition, not how it's deployed in K8s. This makes the code work the same way for both single-cluster and multi-cluster. ### 3. Added publishAutomationConfigFirstRS checks Dedicated function for RS instead of using the shared one. Does not rely on a statefulset object. 
## Important for review The ideal way to review this PR is to compare the new structure to the `mongodbmultireplicaset_controller.go`. The aim of the refactoring is to get the single cluster controller closer to it. Look at: - `reconcileMemberResources()` in both controllers - similar structure now - `updateOmDeploymentRs()` - no more StatefulSet dependency - New helper functions in `om/process` and `om/replicaset` - mirror existing patterns ## Bug found along the way The logic to handle **scale up + disable TLS at the same time** doesn't actually work properly and should be blocked by validation (see [draft PR #490](#490) for more details). ## Tests added Added tests for the new helper functions: - `TestCreateMongodProcessesFromMongoDB` - basic scenarios, scaling, custom domains, TLS, additional config - `TestBuildFromMongoDBWithReplicas` - integration test checking ReplicaSet structure and member options propagation - `TestPublishAutomationConfigFirstRS` - automation config publish logic with various TLS/auth scenarios
1 parent ea816dd commit c10af85

File tree

10 files changed

+911
-144
lines changed

10 files changed

+911
-144
lines changed

api/v1/mdb/mongodb_types.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1038,7 +1038,7 @@ func (a *Authentication) IsOIDCEnabled() bool {
10381038
return stringutil.Contains(a.GetModes(), util.OIDC)
10391039
}
10401040

1041-
// GetModes returns the modes of the Authentication instance of an empty
1041+
// GetModes returns the modes of the Authentication instance, or an empty
10421042
// list if it is nil
10431043
func (a *Authentication) GetModes() []string {
10441044
if a == nil {

controllers/om/process/om_process.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,18 @@ func CreateMongodProcessesWithLimit(mongoDBImage string, forceEnterprise bool, s
2222
return processes
2323
}
2424

25+
// CreateMongodProcessesFromMongoDB creates mongod processes directly from MongoDB resource without StatefulSet
26+
func CreateMongodProcessesFromMongoDB(mongoDBImage string, forceEnterprise bool, mdb *mdbv1.MongoDB, limit int, fcv string, tlsCertPath string) []om.Process {
27+
hostnames, names := dns.GetDNSNames(mdb.Name, mdb.ServiceName(), mdb.Namespace, mdb.Spec.GetClusterDomain(), limit, mdb.Spec.DbCommonSpec.GetExternalDomain())
28+
processes := make([]om.Process, len(hostnames))
29+
30+
for idx, hostname := range hostnames {
31+
processes[idx] = om.NewMongodProcess(names[idx], hostname, mongoDBImage, forceEnterprise, mdb.Spec.GetAdditionalMongodConfig(), &mdb.Spec, tlsCertPath, mdb.Annotations, fcv)
32+
}
33+
34+
return processes
35+
}
36+
2537
// CreateMongodProcessesWithLimitMulti creates the process array for automationConfig based on MultiCluster CR spec
2638
func CreateMongodProcessesWithLimitMulti(mongoDBImage string, forceEnterprise bool, mrs mdbmultiv1.MongoDBMultiCluster, certFileName string) ([]om.Process, error) {
2739
hostnames := make([]string, 0)
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
package process
2+
3+
import (
4+
"fmt"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
9+
mdbv1 "github.com/mongodb/mongodb-kubernetes/api/v1/mdb"
10+
"github.com/mongodb/mongodb-kubernetes/pkg/util/maputil"
11+
)
12+
13+
// Shared fixtures for the tests in this file.
const (
	defaultMongoDBImage = "mongodb/mongodb-enterprise-server:7.0.0" // image passed to the process builders
	defaultFCV          = "7.0"                                     // feature compatibility version used everywhere
	defaultNamespace    = "test-namespace"
)
18+
19+
func TestCreateMongodProcessesFromMongoDB(t *testing.T) {
20+
t.Run("Happy path - creates processes with correct integration", func(t *testing.T) {
21+
mdb := baseReplicaSet("test-rs", 3)
22+
processes := CreateMongodProcessesFromMongoDB(
23+
defaultMongoDBImage,
24+
false,
25+
mdb,
26+
3,
27+
defaultFCV,
28+
"",
29+
)
30+
31+
assert.Len(t, processes, 3, "Should create 3 processes")
32+
33+
// Verify basic integration - processes are created with correct names and FCV
34+
for i, process := range processes {
35+
expectedName := fmt.Sprintf("test-rs-%d", i)
36+
assert.Equal(t, expectedName, process.Name(), "Process name should be generated correctly")
37+
assert.Equal(t, defaultFCV, process.FeatureCompatibilityVersion(), "FCV should be set correctly")
38+
assert.NotEmpty(t, process.HostName(), "Hostname should be generated")
39+
}
40+
})
41+
42+
t.Run("Limit parameter controls process count", func(t *testing.T) {
43+
mdb := baseReplicaSet("scale-rs", 5)
44+
45+
// Test limit less than members (scale up in progress)
46+
processesScaleUp := CreateMongodProcessesFromMongoDB(
47+
defaultMongoDBImage,
48+
false,
49+
mdb,
50+
3, // limit
51+
defaultFCV,
52+
"",
53+
)
54+
assert.Len(t, processesScaleUp, 3, "Limit should control process count during scale up")
55+
56+
// Test limit greater than members (scale down in progress)
57+
processesScaleDown := CreateMongodProcessesFromMongoDB(
58+
defaultMongoDBImage,
59+
false,
60+
mdb,
61+
7, // limit
62+
defaultFCV,
63+
"",
64+
)
65+
assert.Len(t, processesScaleDown, 7, "Limit should control process count during scale down")
66+
67+
// Test limit zero
68+
processesZero := CreateMongodProcessesFromMongoDB(
69+
defaultMongoDBImage,
70+
false,
71+
mdb,
72+
0, // limit
73+
defaultFCV,
74+
"",
75+
)
76+
assert.Empty(t, processesZero, "Zero limit should create empty process slice")
77+
})
78+
79+
t.Run("TLS cert path flows through to processes", func(t *testing.T) {
80+
mdb := baseReplicaSet("tls-rs", 2)
81+
mdb.Spec.Security = &mdbv1.Security{
82+
TLSConfig: &mdbv1.TLSConfig{Enabled: true},
83+
}
84+
85+
tlsCertPath := "/custom/path/to/cert.pem"
86+
processes := CreateMongodProcessesFromMongoDB(
87+
defaultMongoDBImage,
88+
false,
89+
mdb,
90+
2,
91+
defaultFCV,
92+
tlsCertPath,
93+
)
94+
95+
assert.Len(t, processes, 2)
96+
97+
// Verify TLS configuration is properly integrated
98+
for i, process := range processes {
99+
tlsConfig := process.TLSConfig()
100+
assert.NotNil(t, tlsConfig, "TLS config should be set when cert path provided")
101+
assert.Equal(t, tlsCertPath, tlsConfig["certificateKeyFile"], "TLS cert path should match at index %d", i)
102+
}
103+
})
104+
}
105+
106+
func TestCreateMongodProcessesFromMongoDB_AdditionalConfig(t *testing.T) {
107+
config := mdbv1.NewAdditionalMongodConfig("storage.engine", "inMemory").
108+
AddOption("replication.oplogSizeMB", 2048)
109+
110+
mdb := mdbv1.NewReplicaSetBuilder().
111+
SetName("config-rs").
112+
SetNamespace(defaultNamespace).
113+
SetMembers(2).
114+
SetVersion("7.0.0").
115+
SetFCVersion(defaultFCV).
116+
SetAdditionalConfig(config).
117+
Build()
118+
119+
processes := CreateMongodProcessesFromMongoDB(
120+
defaultMongoDBImage,
121+
false,
122+
mdb,
123+
2,
124+
defaultFCV,
125+
"",
126+
)
127+
128+
assert.Len(t, processes, 2)
129+
130+
for i, process := range processes {
131+
assert.Equal(t, "inMemory", maputil.ReadMapValueAsInterface(process.Args(), "storage", "engine"),
132+
"Storage engine mismatch at index %d", i)
133+
assert.Equal(t, 2048, maputil.ReadMapValueAsInterface(process.Args(), "replication", "oplogSizeMB"),
134+
"OplogSizeMB mismatch at index %d", i)
135+
}
136+
}
137+
138+
func baseReplicaSet(name string, members int) *mdbv1.MongoDB {
139+
return mdbv1.NewReplicaSetBuilder().
140+
SetName(name).
141+
SetNamespace(defaultNamespace).
142+
SetMembers(members).
143+
SetVersion("7.0.0").
144+
SetFCVersion(defaultFCV).
145+
Build()
146+
}

controllers/om/replicaset/om_replicaset.go

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@ func BuildFromStatefulSetWithReplicas(mongoDBImage string, forceEnterprise bool,
3030
return rsWithProcesses
3131
}
3232

33+
// BuildFromMongoDBWithReplicas returns a replica set that can be set in the Automation Config
34+
// based on the given MongoDB resource directly without requiring a StatefulSet.
35+
func BuildFromMongoDBWithReplicas(mongoDBImage string, forceEnterprise bool, mdb *mdbv1.MongoDB, replicas int, fcv string, tlsCertPath string) om.ReplicaSetWithProcesses {
36+
members := process.CreateMongodProcessesFromMongoDB(mongoDBImage, forceEnterprise, mdb, replicas, fcv, tlsCertPath)
37+
replicaSet := om.NewReplicaSet(mdb.Name, mdb.Spec.GetMongoDBVersion())
38+
rsWithProcesses := om.NewReplicaSetWithProcesses(replicaSet, members, mdb.Spec.GetMemberOptions())
39+
rsWithProcesses.SetHorizons(mdb.Spec.GetHorizonConfig())
40+
return rsWithProcesses
41+
}
42+
3343
// PrepareScaleDownFromMap performs additional steps necessary to make sure removed members are not primary (so no
3444
// election happens and replica set is available) (see
3545
// https://jira.mongodb.org/browse/HELP-3818?focusedCommentId=1548348 for more details)
@@ -65,30 +75,13 @@ func PrepareScaleDownFromMap(omClient om.Connection, rsMembers map[string][]stri
6575
log.Debugw("Marked replica set members as non-voting", "replica set with members", rsMembers)
6676
}
6777

68-
// TODO practice shows that automation agents can get stuck on setting db to "disabled" also it seems that this process
69-
// works correctly without explicit disabling - feel free to remove this code after some time when it is clear
70-
// that everything works correctly without disabling
71-
72-
// Stage 2. Set disabled to true
73-
//err = omClient.ReadUpdateDeployment(
74-
// func(d om.Deployment) error {
75-
// d.DisableProcesses(allProcesses)
76-
// return nil
77-
// },
78-
//)
79-
//
80-
//if err != nil {
81-
// return errors.New(fmt.Sprintf("Unable to set disabled to true, hosts: %v, err: %w", allProcesses, err))
82-
//}
83-
//log.Debugw("Disabled processes", "processes", allProcesses)
84-
8578
log.Infow("Performed some preliminary steps to support scale down", "hosts", processes)
8679

8780
return nil
8881
}
8982

90-
func PrepareScaleDownFromStatefulSet(omClient om.Connection, statefulSet appsv1.StatefulSet, rs *mdbv1.MongoDB, log *zap.SugaredLogger) error {
91-
_, podNames := dns.GetDnsForStatefulSetReplicasSpecified(statefulSet, rs.Spec.GetClusterDomain(), rs.Status.Members, nil)
83+
func PrepareScaleDownFromMongoDB(omClient om.Connection, rs *mdbv1.MongoDB, log *zap.SugaredLogger) error {
84+
_, podNames := dns.GetDNSNames(rs.Name, rs.ServiceName(), rs.Namespace, rs.Spec.GetClusterDomain(), rs.Status.Members, rs.Spec.DbCommonSpec.GetExternalDomain())
9285
podNames = podNames[scale.ReplicasThisReconciliation(rs):rs.Status.Members]
9386

9487
if len(podNames) != 1 {
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package replicaset
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/assert"
7+
"k8s.io/utils/ptr"
8+
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
11+
mdbv1 "github.com/mongodb/mongodb-kubernetes/api/v1/mdb"
12+
"github.com/mongodb/mongodb-kubernetes/controllers/om"
13+
"github.com/mongodb/mongodb-kubernetes/mongodb-community-operator/pkg/automationconfig"
14+
)
15+
16+
// This test focuses on the integration/glue logic, not re-testing components.
17+
func TestBuildFromMongoDBWithReplicas(t *testing.T) {
18+
memberOptions := []automationconfig.MemberOptions{
19+
{Votes: ptr.To(1), Priority: ptr.To("1.0")},
20+
{Votes: ptr.To(1), Priority: ptr.To("0.5")},
21+
{Votes: ptr.To(0), Priority: ptr.To("0")}, // Non-voting member
22+
}
23+
24+
mdb := &mdbv1.MongoDB{
25+
ObjectMeta: metav1.ObjectMeta{
26+
Name: "test-rs",
27+
Namespace: "test-namespace",
28+
},
29+
Spec: mdbv1.MongoDbSpec{
30+
DbCommonSpec: mdbv1.DbCommonSpec{
31+
Version: "7.0.5",
32+
Security: &mdbv1.Security{
33+
TLSConfig: &mdbv1.TLSConfig{},
34+
Authentication: &mdbv1.Authentication{},
35+
},
36+
Connectivity: &mdbv1.MongoDBConnectivity{
37+
ReplicaSetHorizons: []mdbv1.MongoDBHorizonConfig{},
38+
},
39+
},
40+
Members: 5, // Spec (target) is 5 members
41+
MemberConfig: memberOptions,
42+
},
43+
}
44+
45+
// 3 replicas is less than spec.Members, scale up scenario
46+
replicas := 3
47+
rsWithProcesses := BuildFromMongoDBWithReplicas(
48+
"mongodb/mongodb-enterprise-server:7.0.5",
49+
false,
50+
mdb,
51+
replicas,
52+
"7.0",
53+
"",
54+
)
55+
56+
// Assert: ReplicaSet structure
57+
assert.Equal(t, "test-rs", rsWithProcesses.Rs.Name(), "ReplicaSet ID should match MongoDB name")
58+
assert.Equal(t, "1", rsWithProcesses.Rs["protocolVersion"], "Protocol version should be set to 1 for this MongoDB version")
59+
60+
// Assert: Member count is controlled by replicas parameter, NOT mdb.Spec.Members
61+
members := rsWithProcesses.Rs["members"].([]om.ReplicaSetMember)
62+
assert.Len(t, members, replicas, "Member count should match replicas parameter (3), not mdb.Spec.Members (5)")
63+
assert.Equal(t, 3, len(members), "Should have exactly 3 members")
64+
65+
// Assert: Processes are created correctly
66+
assert.Len(t, rsWithProcesses.Processes, replicas, "Process count should match replicas parameter")
67+
expectedProcessNames := []string{"test-rs-0", "test-rs-1", "test-rs-2"}
68+
expectedHostnames := []string{
69+
"test-rs-0.test-rs-svc.test-namespace.svc.cluster.local",
70+
"test-rs-1.test-rs-svc.test-namespace.svc.cluster.local",
71+
"test-rs-2.test-rs-svc.test-namespace.svc.cluster.local",
72+
}
73+
74+
for i := 0; i < replicas; i++ {
75+
assert.Equal(t, expectedProcessNames[i], rsWithProcesses.Processes[i].Name(),
76+
"Process name mismatch at index %d", i)
77+
assert.Equal(t, expectedHostnames[i], rsWithProcesses.Processes[i].HostName(),
78+
"Process hostname mismatch at index %d", i)
79+
}
80+
81+
// Assert: Member options are propagated
82+
assert.Equal(t, 1, members[0].Votes(), "Member 0 should have 1 vote")
83+
assert.Equal(t, float32(1.0), members[0].Priority(), "Member 0 should have priority 1.0")
84+
assert.Equal(t, 1, members[1].Votes(), "Member 1 should have 1 vote")
85+
assert.Equal(t, float32(0.5), members[1].Priority(), "Member 1 should have priority 0.5")
86+
assert.Equal(t, 0, members[2].Votes(), "Member 2 should have 0 votes (non-voting)")
87+
assert.Equal(t, float32(0), members[2].Priority(), "Member 2 should have priority 0")
88+
89+
// Assert: Member host field contains process name (not full hostname)
90+
// Note: ReplicaSetMember["host"] is the process name, not the full hostname
91+
for i := 0; i < replicas; i++ {
92+
assert.Equal(t, expectedProcessNames[i], members[i].Name(),
93+
"Member host should match process name at index %d", i)
94+
}
95+
}

controllers/operator/agents/agents.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,19 @@ func WaitForRsAgentsToRegister(set appsv1.StatefulSet, members int, clusterName
9696
return nil
9797
}
9898

99+
// WaitForRsAgentsToRegisterByResource waits for RS agents to register using MongoDB resource directly without StatefulSet
100+
func WaitForRsAgentsToRegisterByResource(rs *mdbv1.MongoDB, members int, omConnection om.Connection, log *zap.SugaredLogger) error {
101+
hostnames, _ := dns.GetDNSNames(rs.Name, rs.ServiceName(), rs.Namespace, rs.Spec.GetClusterDomain(), members, rs.Spec.DbCommonSpec.GetExternalDomain())
102+
103+
log = log.With("mongodb", rs.Name)
104+
105+
ok, msg := waitUntilRegistered(omConnection, log, retryParams{retrials: 5, waitSeconds: 3}, hostnames...)
106+
if !ok {
107+
return getAgentRegisterError(msg)
108+
}
109+
return nil
110+
}
111+
99112
// WaitForRsAgentsToRegisterSpecifiedHostnames waits for the specified agents to register with Ops Manager.
100113
func WaitForRsAgentsToRegisterSpecifiedHostnames(omConnection om.Connection, hostnames []string, log *zap.SugaredLogger) error {
101114
ok, msg := waitUntilRegistered(omConnection, log, retryParams{retrials: 10, waitSeconds: 9}, hostnames...)

controllers/operator/mongodbmultireplicaset_controller.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -166,11 +166,6 @@ func (r *ReconcileMongoDbMultiReplicaSet) Reconcile(ctx context.Context, request
166166

167167
r.SetupCommonWatchers(&mrs, nil, nil, mrs.Name)
168168

169-
publishAutomationConfigFirst, err := r.publishAutomationConfigFirstMultiCluster(ctx, &mrs, log)
170-
if err != nil {
171-
return r.updateStatus(ctx, &mrs, workflow.Failed(err), log)
172-
}
173-
174169
// If tls is enabled we need to configure the "processes" array in opsManager/Cloud Manager with the
175170
// correct tlsCertPath, with the new tls design, this path has the certHash in it(so that cert can be rotated
176171
// without pod restart).
@@ -210,6 +205,11 @@ func (r *ReconcileMongoDbMultiReplicaSet) Reconcile(ctx context.Context, request
210205
}
211206
}
212207

208+
publishAutomationConfigFirst, err := r.publishAutomationConfigFirstMultiCluster(ctx, &mrs, log)
209+
if err != nil {
210+
return r.updateStatus(ctx, &mrs, workflow.Failed(err), log)
211+
}
212+
213213
status := workflow.RunInGivenOrder(publishAutomationConfigFirst,
214214
func() workflow.Status {
215215
if err := r.updateOmDeploymentRs(ctx, conn, mrs, agentCertPath, tlsCertPath, internalClusterCertPath, false, log); err != nil {

0 commit comments

Comments
 (0)