Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/api-syncagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func run(ctx context.Context, log *zap.SugaredLogger, opts *Options) error {
return fmt.Errorf("failed to add apiexport controller: %w", err)
}

if err := syncmanager.Add(ctx, mgr, kcpCluster, kcpRestConfig, log, apiExport, opts.PublishedResourceSelector, opts.Namespace); err != nil {
if err := syncmanager.Add(ctx, mgr, kcpCluster, kcpRestConfig, log, apiExport, opts.PublishedResourceSelector, opts.Namespace, opts.AgentName); err != nil {
return fmt.Errorf("failed to add syncmanager controller: %w", err)
}

Expand Down
8 changes: 6 additions & 2 deletions docs/getting-started.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,12 @@ itself and a reference to the kubeconfig secret we just created.
# Required: the name of the APIExport in kcp that this Sync Agent is supposed to serve.
apiExportName: test.example.com

# Required: This Agent's public name, purely for informational purposes.
# If not set, defaults to the Helm release name.
# Required: This Agent's public name, used to signal ownership over locally synced objects.
# This value must be a valid Kubernetes label value, see
# https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
# for more information.
# Changing this value after the fact will make the agent ignore previously created objects,
# so beware and relabel if necessary.
agentName: unique-test

# Required: Name of the Kubernetes Secret that contains a "kubeconfig" key,
Expand Down
11 changes: 9 additions & 2 deletions internal/controller/sync/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/kontext"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
Expand Down Expand Up @@ -69,6 +70,7 @@ func Create(
discoveryClient *discovery.Client,
apiExportName string,
stateNamespace string,
agentName string,
log *zap.SugaredLogger,
numWorkers int,
) (controller.Controller, error) {
Expand All @@ -92,7 +94,7 @@ func Create(

// create the syncer that holds the meat&potatoes of the synchronization logic
mutator := mutation.NewMutator(nil) // pubRes.Spec.Mutation
syncer, err := sync.NewResourceSyncer(log, localManager.GetClient(), virtualWorkspaceCluster.GetClient(), pubRes, localCRD, apiExportName, mutator, stateNamespace)
syncer, err := sync.NewResourceSyncer(log, localManager.GetClient(), virtualWorkspaceCluster.GetClient(), pubRes, localCRD, apiExportName, mutator, stateNamespace, agentName)
if err != nil {
return nil, fmt.Errorf("failed to create syncer: %w", err)
}
Expand Down Expand Up @@ -135,7 +137,12 @@ func Create(
return []reconcile.Request{*req}
})

if err := c.Watch(source.Kind(localManager.GetCache(), localDummy, enqueueRemoteObjForLocalObj)); err != nil {
// only watch local objects that we own
nameFilter := predicate.NewTypedPredicateFuncs(func(u *unstructured.Unstructured) bool {
return sync.OwnedBy(u, agentName)
})

if err := c.Watch(source.Kind(localManager.GetCache(), localDummy, enqueueRemoteObjForLocalObj, nameFilter)); err != nil {
return nil, err
}

Expand Down
4 changes: 4 additions & 0 deletions internal/controller/syncmanager/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ type Reconciler struct {
discoveryClient *discovery.Client
prFilter labels.Selector
stateNamespace string
agentName string

apiExport *kcpdevv1alpha1.APIExport

Expand All @@ -95,6 +96,7 @@ func Add(
apiExport *kcpdevv1alpha1.APIExport,
prFilter labels.Selector,
stateNamespace string,
agentName string,
) error {
reconciler := &Reconciler{
ctx: ctx,
Expand All @@ -108,6 +110,7 @@ func Add(
discoveryClient: discovery.NewClient(localManager.GetClient()),
prFilter: prFilter,
stateNamespace: stateNamespace,
agentName: agentName,
}

_, err := builder.ControllerManagedBy(localManager).
Expand Down Expand Up @@ -280,6 +283,7 @@ func (r *Reconciler) ensureSyncControllers(ctx context.Context, log *zap.Sugared
r.discoveryClient,
r.apiExport.Name,
r.stateNamespace,
r.agentName,
r.log,
numSyncWorkers,
)
Expand Down
14 changes: 14 additions & 0 deletions internal/sync/object_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ import (
type objectCreatorFunc func(source *unstructured.Unstructured) *unstructured.Unstructured

type objectSyncer struct {
// When set, the syncer will create a label on the destination object that contains
// this value; used to allow multiple agents syncing *the same* API from one
// service cluster onto multiple different kcp's.
agentName string
// creates a new destination object; does not need to perform cleanup like
// removing unwanted metadata, that's done by the syncer automatically
destCreator objectCreatorFunc
Expand Down Expand Up @@ -281,6 +285,9 @@ func (s *objectSyncer) ensureDestinationObject(log *zap.SugaredLogger, source, d
sourceObjKey := newObjectKey(source.object, source.clusterName, source.workspacePath)
ensureLabels(destObj, sourceObjKey.Labels())

// remember what agent synced this object
s.labelWithAgent(destObj)

// put optional additional annotations on the new object
ensureAnnotations(destObj, sourceObjKey.Annotations())

Expand Down Expand Up @@ -326,6 +333,7 @@ func (s *objectSyncer) adoptExistingDestinationObject(log *zap.SugaredLogger, de
// the destination object from another source object, which would then lead to the two source objects
// "fighting" about the one destination object.
ensureLabels(existingDestObj, sourceKey.Labels())
s.labelWithAgent(existingDestObj)
ensureAnnotations(existingDestObj, sourceKey.Annotations())

if err := dest.client.Update(dest.ctx, existingDestObj); err != nil {
Expand Down Expand Up @@ -425,3 +433,9 @@ func (s *objectSyncer) createMergePatch(base, revision *unstructured.Unstructure
func (s *objectSyncer) isIrrelevantTopLevelField(fieldName string) bool {
return fieldName == "kind" || fieldName == "apiVersion" || fieldName == "metadata" || slices.Contains(s.subresources, fieldName)
}

func (s *objectSyncer) labelWithAgent(obj *unstructured.Unstructured) {
if s.agentName != "" {
ensureLabels(obj, map[string]string{agentNameLabel: s.agentName})
}
}
6 changes: 6 additions & 0 deletions internal/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ type ResourceSyncer struct {

mutator mutation.Mutator

agentName string

// newObjectStateStore is used for testing purposes
newObjectStateStore newObjectStateStoreFunc
}
Expand All @@ -59,6 +61,7 @@ func NewResourceSyncer(
remoteAPIGroup string,
mutator mutation.Mutator,
stateNamespace string,
agentName string,
) (*ResourceSyncer, error) {
// create a dummy that represents the type used on the local service cluster
localGVK := projection.PublishedResourceSourceGVK(pubRes)
Expand Down Expand Up @@ -100,6 +103,7 @@ func NewResourceSyncer(
subresources: subresources,
destDummy: localDummy,
mutator: mutator,
agentName: agentName,
newObjectStateStore: newKubernetesStateStoreCreator(stateNamespace),
}, nil
}
Expand Down Expand Up @@ -145,6 +149,8 @@ func (s *ResourceSyncer) Process(ctx Context, remoteObj *unstructured.Unstructur
stateStore := s.newObjectStateStore(sourceSide, destSide)

syncer := objectSyncer{
// The primary object should be labelled with the agent name.
agentName: s.agentName,
subresources: s.subresources,
// use the projection and renaming rules configured in the PublishedResource
destCreator: s.createLocalObjectCreator(ctx),
Expand Down
2 changes: 2 additions & 0 deletions internal/sync/syncer_related.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ func (s *ResourceSyncer) processRelatedResource(log *zap.SugaredLogger, stateSto
}

syncer := objectSyncer{
// Related objects within kcp are not labelled with the agent name because it's unnecessary.
// agentName: "",
// use the same state store as we used for the main resource, to keep everything contained
// in one place, on the service cluster side
stateStore: stateStore,
Expand Down
22 changes: 22 additions & 0 deletions internal/sync/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "testcluster-my-test-thing",
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand All @@ -208,6 +209,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "testcluster-my-test-thing",
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -255,6 +257,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "testcluster-my-test-thing",
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -312,6 +315,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "testcluster-my-test-thing",
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -348,6 +352,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "testcluster-my-test-thing",
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand All @@ -374,6 +379,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "testcluster-my-test-thing",
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -410,6 +416,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "testcluster-my-test-thing",
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand All @@ -436,6 +443,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "testcluster-my-test-thing",
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -484,6 +492,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
"existing-annotation": "annotation-value",
},
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -525,6 +534,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
"new-annotation": "hei-verden",
},
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -564,6 +574,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "testcluster-my-test-thing",
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -591,6 +602,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "testcluster-my-test-thing",
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -636,6 +648,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
"prevent-instant-deletion-in-tests",
},
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -667,6 +680,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
},
DeletionTimestamp: &nonEmptyTime,
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -748,6 +762,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
},
DeletionTimestamp: &nonEmptyTime,
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -778,6 +793,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
},
DeletionTimestamp: &nonEmptyTime,
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -809,6 +825,7 @@ func TestSyncerProcessingSingleResourceWithoutStatus(t *testing.T) {
testcase.remoteAPIGroup,
nil,
stateNamespace,
"textor-the-doctor",
)
if err != nil {
t.Fatalf("Failed to create syncer: %v", err)
Expand Down Expand Up @@ -970,6 +987,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "testcluster-my-test-thing",
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -1002,6 +1020,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "testcluster-my-test-thing",
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -1041,6 +1060,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "testcluster-my-test-thing",
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -1073,6 +1093,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
ObjectMeta: metav1.ObjectMeta{
Name: "testcluster-my-test-thing",
Labels: map[string]string{
agentNameLabel: "textor-the-doctor",
remoteObjectClusterLabel: "testcluster",
remoteObjectNamespaceLabel: "",
remoteObjectNameLabel: "my-test-thing",
Expand Down Expand Up @@ -1106,6 +1127,7 @@ func TestSyncerProcessingSingleResourceWithStatus(t *testing.T) {
testcase.remoteAPIGroup,
nil,
stateNamespace,
"textor-the-doctor",
)
if err != nil {
t.Fatalf("Failed to create syncer: %v", err)
Expand Down
10 changes: 10 additions & 0 deletions internal/sync/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ limitations under the License.

package sync

import ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"

const (
// deletionFinalizer is the finalizer put on remote objects to prevent
// them from being deleted before the local objects can be cleaned up.
Expand All @@ -31,6 +33,10 @@ const (

remoteObjectWorkspacePathAnnotation = "syncagent.kcp.io/remote-object-workspace-path"

// agentNameLabel contains the Sync Agent's name and is used to allow multiple Sync Agents
// on the same service cluster, syncing *the same* API to different kcp's.
agentNameLabel = "syncagent.kcp.io/agent-name"

// objectStateLabelName is put on object state Secrets to allow for easier mass deletions
// if ever necessary.
objectStateLabelName = "syncagent.kcp.io/object-state"
Expand All @@ -45,3 +51,7 @@ const (
// metadata of the related object.
relatedObjectAnnotationPrefix = "related-resources.syncagent.kcp.io/"
)

func OwnedBy(obj ctrlruntimeclient.Object, agentName string) bool {
return obj.GetLabels()[agentNameLabel] == agentName
}