diff --git a/controllers/catalogsource_controller.go b/controllers/catalogsource_controller.go deleted file mode 100644 index 874e6169a..000000000 --- a/controllers/catalogsource_controller.go +++ /dev/null @@ -1,203 +0,0 @@ -package controllers - -import ( - "context" - "fmt" - "reflect" - "sync" - "time" - - "github.com/operator-framework/api/pkg/operators/v1alpha1" - "github.com/operator-framework/deppy/pkg/deppy" - "github.com/operator-framework/deppy/pkg/deppy/input" - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/record" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/log" - - "github.com/operator-framework/operator-controller/internal/resolution/entity_sources/catalogsource" -) - -const ( - defaultCatalogSourceSyncInterval = 5 * time.Minute - defaultRegistryGRPCConnectionTimeout = 10 * time.Second - - eventTypeNormal = "Normal" - eventTypeWarning = "Warning" - - eventReasonCacheUpdated = "BundleCacheUpdated" - eventReasonCacheUpdateFailed = "BundleCacheUpdateFailed" -) - -type CatalogSourceReconcilerOption func(reconciler *CatalogSourceReconciler) - -func WithRegistryClient(registry catalogsource.RegistryClient) CatalogSourceReconcilerOption { - return func(reconciler *CatalogSourceReconciler) { - reconciler.registry = registry - } -} - -func WithUnmanagedCatalogSourceSyncInterval(interval time.Duration) CatalogSourceReconcilerOption { - return func(reconciler *CatalogSourceReconciler) { - reconciler.unmanagedCatalogSourceSyncInterval = interval - } -} - -// applyDefaults applies default values to empty CatalogSourceReconciler fields _after_ options have been applied -func applyDefaults() CatalogSourceReconcilerOption { - return func(reconciler *CatalogSourceReconciler) { - if reconciler.registry == nil { - reconciler.registry = catalogsource.NewRegistryGRPCClient(defaultRegistryGRPCConnectionTimeout) - } - if reconciler.unmanagedCatalogSourceSyncInterval == 0 { - reconciler.unmanagedCatalogSourceSyncInterval = defaultCatalogSourceSyncInterval - } - } -} - -type CatalogSourceReconciler struct { - sync.RWMutex - client.Client - scheme *runtime.Scheme - registry catalogsource.RegistryClient - recorder record.EventRecorder - unmanagedCatalogSourceSyncInterval time.Duration - cache map[string]map[deppy.Identifier]*input.Entity -} - -func NewCatalogSourceReconciler(client client.Client, scheme *runtime.Scheme, recorder record.EventRecorder, options ...CatalogSourceReconcilerOption) *CatalogSourceReconciler { - reconciler := &CatalogSourceReconciler{ - RWMutex: sync.RWMutex{}, - Client: client, - scheme: scheme, - recorder: recorder, - unmanagedCatalogSourceSyncInterval: 0, - cache: map[string]map[deppy.Identifier]*input.Entity{}, - } - // apply options - options = append(options, applyDefaults()) - for _, option := range options { - option(reconciler) - } - - return reconciler -} - -// +kubebuilder:rbac:groups=operators.coreos.com,resources=catalogsources,verbs=get;list;watch -// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch - -func (r *CatalogSourceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { - l := log.FromContext(ctx).WithName("catalogsource-controller") - l.V(1).Info("starting") - defer l.V(1).Info("ending") - - var catalogSource = &v1alpha1.CatalogSource{} - if err := r.Client.Get(ctx, req.NamespacedName, catalogSource); err != nil { - if errors.IsNotFound(err) { - r.dropSource(req.String()) - } - return ctrl.Result{}, client.IgnoreNotFound(err) - } - - entities, err := r.registry.ListEntities(ctx, catalogSource) - // TODO: invalidate stale cache for failed updates - if err != nil { - r.recorder.Event(catalogSource, eventTypeWarning, eventReasonCacheUpdateFailed, fmt.Sprintf("Failed to update bundle cache from %s/%s: %v", catalogSource.GetNamespace(), catalogSource.GetName(), err)) - return ctrl.Result{Requeue: !isManagedCatalogSource(*catalogSource)}, err - } - if updated := r.updateCache(req.String(), entities); updated { - r.recorder.Event(catalogSource, eventTypeNormal, eventReasonCacheUpdated, fmt.Sprintf("Successfully updated bundle cache from %s/%s", catalogSource.GetNamespace(), catalogSource.GetName())) - } - - if isManagedCatalogSource(*catalogSource) { - return ctrl.Result{}, nil - } - return ctrl.Result{RequeueAfter: r.unmanagedCatalogSourceSyncInterval}, nil -} - -// SetupWithManager sets up the controller with the Manager. -func (r *CatalogSourceReconciler) SetupWithManager(mgr ctrl.Manager) error { - return ctrl.NewControllerManagedBy(mgr). - For(&v1alpha1.CatalogSource{}). - Complete(r) -} - -// TODO: find better way to identify catalogSources unmanaged by olm -func isManagedCatalogSource(catalogSource v1alpha1.CatalogSource) bool { - return len(catalogSource.Spec.Address) == 0 -} - -func (r *CatalogSourceReconciler) updateCache(sourceID string, entities []*input.Entity) bool { - newSourceCache := make(map[deppy.Identifier]*input.Entity) - for _, entity := range entities { - newSourceCache[entity.Identifier()] = entity - } - if _, ok := r.cache[sourceID]; ok && reflect.DeepEqual(r.cache[sourceID], newSourceCache) { - return false - } - r.RWMutex.Lock() - defer r.RWMutex.Unlock() - r.cache[sourceID] = newSourceCache - // return whether cache had updates - return true -} - -func (r *CatalogSourceReconciler) dropSource(sourceID string) { - r.RWMutex.Lock() - defer r.RWMutex.Unlock() - delete(r.cache, sourceID) -} - -func (r *CatalogSourceReconciler) Get(ctx context.Context, id deppy.Identifier) *input.Entity { - r.RWMutex.RLock() - defer r.RWMutex.RUnlock() - // don't count on deppy ID to reflect its catalogsource - for _, source := range r.cache { - if entity, ok := source[id]; ok { - return entity - } - } - return nil -} - -func (r *CatalogSourceReconciler) Filter(ctx context.Context, filter input.Predicate) (input.EntityList, error) { - resultSet := input.EntityList{} - if err := r.Iterate(ctx, func(entity *input.Entity) error { - if filter(entity) { - resultSet = append(resultSet, *entity) - } - return nil - }); err != nil { - return nil, err - } - return resultSet, nil -} - -func (r *CatalogSourceReconciler) GroupBy(ctx context.Context, fn input.GroupByFunction) (input.EntityListMap, error) { - resultSet := input.EntityListMap{} - if err := r.Iterate(ctx, func(entity *input.Entity) error { - keys := fn(entity) - for _, key := range keys { - resultSet[key] = append(resultSet[key], *entity) - } - return nil - }); err != nil { - return nil, err - } - return resultSet, nil -} - -func (r *CatalogSourceReconciler) Iterate(ctx context.Context, fn input.IteratorFunction) error { - r.RWMutex.RLock() - defer r.RWMutex.RUnlock() - for _, source := range r.cache { - for _, entity := range source { - if err := fn(entity); err != nil { - return err - } - } - } - return nil -} diff --git a/controllers/entity_cache_builder.go b/controllers/entity_cache_builder.go new file mode 100644 index 000000000..979e0edd5 --- /dev/null +++ b/controllers/entity_cache_builder.go @@ -0,0 +1,125 @@ +package controllers + +import ( + "context" + "fmt" + "time" + + "github.com/operator-framework/api/pkg/operators/v1alpha1" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/record" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + + "github.com/operator-framework/operator-controller/internal/entitycache" + "github.com/operator-framework/operator-controller/internal/resolution" + "github.com/operator-framework/operator-controller/internal/resolution/entity_sources/catalogsource" +) + +const ( + defaultSyncInterval = 5 * time.Minute + + eventTypeNormal = "Normal" + eventTypeWarning = "Warning" + + eventReasonCacheUpdated = "CacheUpdated" + eventReasonCacheUpdateFailed = "CacheUpdateFailed" +) + +type EntityCacheBuilderOption func(reconciler *EntityCacheBuilder) + +func WithEntitySourceConnector(entitysourceConnector resolution.EntitySourceConnector) EntityCacheBuilderOption { + return func(reconciler *EntityCacheBuilder) { + reconciler.entitySourceConnector = entitysourceConnector + } +} + +func WithSyncInterval(interval time.Duration) EntityCacheBuilderOption { + return func(reconciler *EntityCacheBuilder) { + reconciler.syncInterval = interval + } +} + +// applyDefaults applies default values to empty EntityCacheBuilder fields _after_ options have been applied +func applyDefaults() EntityCacheBuilderOption { + return func(reconciler *EntityCacheBuilder) { + if reconciler.entitySourceConnector == nil { + reconciler.entitySourceConnector = catalogsource.NewGRPCClientConnector(0) + } + if reconciler.syncInterval == 0 { + reconciler.syncInterval = defaultSyncInterval + } + } +} + +type EntityCacheBuilder struct { + client.Client + scheme *runtime.Scheme + entitySourceConnector resolution.EntitySourceConnector + recorder record.EventRecorder + syncInterval time.Duration + Cache *entitycache.EntityCache +} + +func NewEntityCacheBuilder(client client.Client, scheme *runtime.Scheme, recorder record.EventRecorder, options ...EntityCacheBuilderOption) *EntityCacheBuilder { + reconciler := &EntityCacheBuilder{ + Client: client, + scheme: scheme, + recorder: recorder, + syncInterval: 0, + } + // apply options + options = append(options, applyDefaults()) + for _, option := range options { + option(reconciler) + } + + return reconciler +} + +// +kubebuilder:rbac:groups=operators.coreos.com,resources=catalogsources,verbs=get;list;watch +// +kubebuilder:rbac:groups="",resources=events,verbs=create;patch + +func (r *EntityCacheBuilder) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + l := log.FromContext(ctx).WithName("entity-cache-builder") + l.V(1).Info("starting") + defer l.V(1).Info("ending") + + var catalogSource = &v1alpha1.CatalogSource{} + if err := r.Client.Get(ctx, req.NamespacedName, catalogSource); err != nil { + if errors.IsNotFound(err) { + r.Cache.DropSource(req.String()) + } + return ctrl.Result{}, client.IgnoreNotFound(err) + } + + entities, err := r.entitySourceConnector.ListEntities(ctx, catalogSource) + // TODO: invalidate stale cache for failed updates + if err != nil { + r.recorder.Event(catalogSource, eventTypeWarning, eventReasonCacheUpdateFailed, fmt.Sprintf("Failed to update bundle cache from %s/%s: %v", catalogSource.GetNamespace(), catalogSource.GetName(), err)) + // return ctrl.Result{Requeue: !isManagedCatalogSource(*catalogSource)}, err + return ctrl.Result{Requeue: true}, err + } + if updated := r.Cache.UpdateCache(req.String(), entities); updated { + r.recorder.Event(catalogSource, eventTypeNormal, eventReasonCacheUpdated, fmt.Sprintf("Successfully updated bundle cache from %s/%s", catalogSource.GetNamespace(), catalogSource.GetName())) + } + + // if isManagedCatalogSource(*catalogSource) { + // return ctrl.Result{}, nil + // } + return ctrl.Result{RequeueAfter: r.syncInterval}, nil +} + +// SetupWithManager sets up the controller with the Manager. +func (r *EntityCacheBuilder) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&v1alpha1.CatalogSource{}). + Complete(r) +} + +// TODO: find better way to identify catalogSources unmanaged by olm +// func isManagedCatalogSource(catalogSource v1alpha1.CatalogSource) bool { +// return len(catalogSource.Spec.Address) == 0 +// } diff --git a/controllers/catalogsource_controller_test.go b/controllers/entity_cache_builder_test.go similarity index 85% rename from controllers/catalogsource_controller_test.go rename to controllers/entity_cache_builder_test.go index 38b02f227..0f7d47e50 100644 --- a/controllers/catalogsource_controller_test.go +++ b/controllers/entity_cache_builder_test.go @@ -17,14 +17,14 @@ import ( "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" - catsrccontroller "github.com/operator-framework/operator-controller/controllers" - "github.com/operator-framework/operator-controller/internal/resolution/entity_sources/catalogsource" + "github.com/operator-framework/operator-controller/controllers" + "github.com/operator-framework/operator-controller/internal/resolution" ) -var _ catalogsource.RegistryClient = &fakeRegistryClient{} +var _ resolution.EntitySourceConnector = &fakeEntitySourceConnector{} const ( - unmanagedCatalogSourceSyncInterface = 1 * time.Second + syncInterval = 1 * time.Second ) type catalogContents struct { @@ -32,29 +32,29 @@ type catalogContents struct { err error } -type fakeRegistryClient struct { +type fakeEntitySourceConnector struct { catalogSource map[string]catalogContents } -func newFakeRegistryClient() *fakeRegistryClient { - return &fakeRegistryClient{ +func newFakeRegistryClient() *fakeEntitySourceConnector { + return &fakeEntitySourceConnector{ catalogSource: map[string]catalogContents{}, } } -func (r *fakeRegistryClient) setEntitiesForSource(catalogSourceID string, entities ...*input.Entity) { +func (r *fakeEntitySourceConnector) setEntitiesForSource(catalogSourceID string, entities ...*input.Entity) { r.catalogSource[catalogSourceID] = catalogContents{ Entities: entities, } } -func (r *fakeRegistryClient) setErrorForSource(catalogSourceID string, err error) { +func (r *fakeEntitySourceConnector) setErrorForSource(catalogSourceID string, err error) { r.catalogSource[catalogSourceID] = catalogContents{ err: err, } } -func (r *fakeRegistryClient) ListEntities(ctx context.Context, catsrc *v1alpha1.CatalogSource) ([]*input.Entity, error) { +func (r *fakeEntitySourceConnector) ListEntities(ctx context.Context, catsrc *v1alpha1.CatalogSource) ([]*input.Entity, error) { catalogSourceKey := types.NamespacedName{Namespace: catsrc.Namespace, Name: catsrc.Name}.String() if src, ok := r.catalogSource[catalogSourceKey]; ok { return src.Entities, src.err @@ -62,23 +62,23 @@ func (r *fakeRegistryClient) ListEntities(ctx context.Context, catsrc *v1alpha1. return []*input.Entity{}, nil } -var _ = Describe("CatalogSource Controller Test", func() { +var _ = Describe("EntitySource Reconciler Test", func() { var ( ctx context.Context - reconciler *catsrccontroller.CatalogSourceReconciler + reconciler *controllers.EntityCacheBuilder fakeRecorder *record.FakeRecorder - fakeRegistry *fakeRegistryClient + fakeRegistry *fakeEntitySourceConnector ) BeforeEach(func() { ctx = context.Background() fakeRecorder = record.NewFakeRecorder(5) fakeRegistry = newFakeRegistryClient() - reconciler = catsrccontroller.NewCatalogSourceReconciler( + reconciler = controllers.NewEntityCacheBuilder( cl, sch, fakeRecorder, - catsrccontroller.WithRegistryClient(fakeRegistry), - catsrccontroller.WithUnmanagedCatalogSourceSyncInterval(unmanagedCatalogSourceSyncInterface), + controllers.WithEntitySourceConnector(fakeRegistry), + controllers.WithSyncInterval(syncInterval), ) }) Describe("cache managements", func() { @@ -122,7 +122,7 @@ var _ = Describe("CatalogSource Controller Test", func() { By("checking the cache is empty") var entities []*input.Entity - err = reconciler.Iterate(ctx, func(entity *input.Entity) error { + err = reconciler.Cache.Iterate(ctx, func(entity *input.Entity) error { entities = append(entities, entity) return nil }) @@ -142,7 +142,7 @@ var _ = Describe("CatalogSource Controller Test", func() { By("checking the cache is populated") entities = nil - err = reconciler.Iterate(ctx, func(entity *input.Entity) error { + err = reconciler.Cache.Iterate(ctx, func(entity *input.Entity) error { entities = append(entities, entity) return nil }) @@ -163,7 +163,7 @@ var _ = Describe("CatalogSource Controller Test", func() { By("checking the cache is empty again") entities = nil - err = reconciler.Iterate(ctx, func(entity *input.Entity) error { + err = reconciler.Cache.Iterate(ctx, func(entity *input.Entity) error { entities = append(entities, entity) return nil }) @@ -211,12 +211,12 @@ var _ = Describe("CatalogSource Controller Test", func() { It("manages the cache", func() { By("running reconcile") res, err := reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: catSrcKey}) - Expect(res).To(Equal(ctrl.Result{RequeueAfter: unmanagedCatalogSourceSyncInterface})) + Expect(res).To(Equal(ctrl.Result{RequeueAfter: syncInterval})) Expect(err).ToNot(HaveOccurred()) By("checking the cache is empty") var entities []*input.Entity - err = reconciler.Iterate(ctx, func(entity *input.Entity) error { + err = reconciler.Cache.Iterate(ctx, func(entity *input.Entity) error { entities = append(entities, entity) return nil }) @@ -231,12 +231,12 @@ var _ = Describe("CatalogSource Controller Test", func() { By("re-running reconcile") res, err = reconciler.Reconcile(ctx, ctrl.Request{NamespacedName: catSrcKey}) - Expect(res).To(Equal(ctrl.Result{RequeueAfter: unmanagedCatalogSourceSyncInterface})) + Expect(res).To(Equal(ctrl.Result{RequeueAfter: syncInterval})) Expect(err).ToNot(HaveOccurred()) By("checking the cache is populated") entities = nil - err = reconciler.Iterate(ctx, func(entity *input.Entity) error { + err = reconciler.Cache.Iterate(ctx, func(entity *input.Entity) error { entities = append(entities, entity) return nil }) @@ -257,7 +257,7 @@ var _ = Describe("CatalogSource Controller Test", func() { By("checking the cache is empty again") entities = nil - err = reconciler.Iterate(ctx, func(entity *input.Entity) error { + err = reconciler.Cache.Iterate(ctx, func(entity *input.Entity) error { entities = append(entities, entity) return nil }) @@ -318,17 +318,17 @@ var _ = Describe("CatalogSource Controller Test", func() { Describe("Get", func() { It("should fetch an entity by ID", func() { - Expect(reconciler.Get(ctx, deppy.Identifier(fmt.Sprintf("%s/%s/pkg1/chan1/0.1.0", catalogSourceName, namespace)))).To( + Expect(reconciler.Cache.Get(ctx, deppy.Identifier(fmt.Sprintf("%s/%s/pkg1/chan1/0.1.0", catalogSourceName, namespace)))).To( Equal(input.NewEntity(deppy.Identifier(fmt.Sprintf("%s/%s/pkg1/chan1/0.1.0", catalogSourceName, namespace)), map[string]string{})), ) }) It("should not fetch anything for nonexistent entity ID", func() { - Expect(reconciler.Get(ctx, "non-existent")).To(BeNil()) + Expect(reconciler.Cache.Get(ctx, "non-existent")).To(BeNil()) }) }) Describe("Filter", func() { It("should return entities that meet filter predicates", func() { - actual, err := reconciler.Filter(ctx, func(e *input.Entity) bool { + actual, err := reconciler.Cache.Filter(ctx, func(e *input.Entity) bool { _, ok := e.Properties["k"] return ok }) @@ -338,7 +338,7 @@ var _ = Describe("CatalogSource Controller Test", func() { }) Describe("GroupBy", func() { It("should group entities by the keys provided by the groupBy function", func() { - actual, err := reconciler.GroupBy(ctx, func(e *input.Entity) []string { + actual, err := reconciler.Cache.GroupBy(ctx, func(e *input.Entity) []string { var keys []string for k := range e.Properties { keys = append(keys, k) @@ -352,7 +352,7 @@ var _ = Describe("CatalogSource Controller Test", func() { Describe("Iterate", func() { It("should go through all entities", func() { var ids []string - Expect(reconciler.Iterate(ctx, func(e *input.Entity) error { + Expect(reconciler.Cache.Iterate(ctx, func(e *input.Entity) error { ids = append(ids, e.Identifier().String()) return nil })).To(BeNil()) diff --git a/internal/entitycache/cache.go b/internal/entitycache/cache.go new file mode 100644 index 000000000..154da3543 --- /dev/null +++ b/internal/entitycache/cache.go @@ -0,0 +1,95 @@ +package entitycache + +import ( + "context" + "reflect" + "sync" + + "github.com/operator-framework/deppy/pkg/deppy" + "github.com/operator-framework/deppy/pkg/deppy/input" +) + +type EntityCache struct { + mutex sync.RWMutex + cache map[string]map[deppy.Identifier]*input.Entity +} + +func NewEntityCache() *EntityCache { + return &EntityCache{ + mutex: sync.RWMutex{}, + cache: map[string]map[deppy.Identifier]*input.Entity{}, + } +} + +func (c *EntityCache) UpdateCache(sourceID string, entities []*input.Entity) bool { + newSourceCache := make(map[deppy.Identifier]*input.Entity) + for _, entity := range entities { + newSourceCache[entity.Identifier()] = entity + } + if _, ok := c.cache[sourceID]; ok && reflect.DeepEqual(c.cache[sourceID], newSourceCache) { + return false + } + c.mutex.Lock() + defer c.mutex.Unlock() + c.cache[sourceID] = newSourceCache + // return whether cache had updates + return true +} + +func (c *EntityCache) DropSource(sourceID string) { + c.mutex.Lock() + defer c.mutex.Unlock() + delete(c.cache, sourceID) +} + +func (c *EntityCache) Get(ctx context.Context, id deppy.Identifier) *input.Entity { + c.mutex.RLock() + defer c.mutex.RUnlock() + // don't count on deppy ID to reflect its catalogsource + for _, source := range c.cache { + if entity, ok := source[id]; ok { + return entity + } + } + return nil +} + +func (c *EntityCache) Filter(ctx context.Context, filter input.Predicate) (input.EntityList, error) { + resultSet := input.EntityList{} + if err := c.Iterate(ctx, func(entity *input.Entity) error { + if filter(entity) { + resultSet = append(resultSet, *entity) + } + return nil + }); err != nil { + return nil, err + } + return resultSet, nil +} + +func (c *EntityCache) GroupBy(ctx context.Context, fn input.GroupByFunction) (input.EntityListMap, error) { + resultSet := input.EntityListMap{} + if err := c.Iterate(ctx, func(entity *input.Entity) error { + keys := fn(entity) + for _, key := range keys { + resultSet[key] = append(resultSet[key], *entity) + } + return nil + }); err != nil { + return nil, err + } + return resultSet, nil +} + +func (c *EntityCache) Iterate(ctx context.Context, fn input.IteratorFunction) error { + c.mutex.RLock() + defer c.mutex.RUnlock() + for _, source := range c.cache { + for _, entity := range source { + if err := fn(entity); err != nil { + return err + } + } + } + return nil +} diff --git a/internal/resolution/entity_source_connector.go b/internal/resolution/entity_source_connector.go new file mode 100644 index 000000000..3e31bec83 --- /dev/null +++ b/internal/resolution/entity_source_connector.go @@ -0,0 +1,12 @@ +package resolution + +import ( + "context" + + "github.com/operator-framework/api/pkg/operators/v1alpha1" + "github.com/operator-framework/deppy/pkg/deppy/input" +) + +type EntitySourceConnector interface { + ListEntities(ctx context.Context, catsrc *v1alpha1.CatalogSource) ([]*input.Entity, error) +} diff --git a/internal/resolution/entity_sources/catalogsource/registry_grpc_client.go b/internal/resolution/entity_sources/catalogsource/registry_grpc_client.go index 1ddd62eb8..b0079a904 100644 --- a/internal/resolution/entity_sources/catalogsource/registry_grpc_client.go +++ b/internal/resolution/entity_sources/catalogsource/registry_grpc_client.go @@ -12,6 +12,8 @@ import ( "github.com/operator-framework/api/pkg/operators/v1alpha1" "github.com/operator-framework/deppy/pkg/deppy/input" + "github.com/operator-framework/operator-controller/internal/resolution" + catalogsourceapi "github.com/operator-framework/operator-registry/pkg/api" "golang.org/x/net/http/httpproxy" "golang.org/x/net/proxy" @@ -20,22 +22,20 @@ import ( "google.golang.org/grpc/credentials/insecure" ) -type RegistryClient interface { - ListEntities(ctx context.Context, catsrc *v1alpha1.CatalogSource) ([]*input.Entity, error) -} +const DefaultGRPCTimeout = 2 * time.Minute -type registryGRPCClient struct { +type gRPCClientConnector struct { timeout time.Duration } -func NewRegistryGRPCClient(grpcTimeout time.Duration) RegistryClient { +func NewGRPCClientConnector(grpcTimeout time.Duration) resolution.EntitySourceConnector { if grpcTimeout == 0 { grpcTimeout = DefaultGRPCTimeout } - return ®istryGRPCClient{timeout: grpcTimeout} + return gRPCClientConnector{timeout: grpcTimeout} } -func (r *registryGRPCClient) ListEntities(ctx context.Context, catalogSource *v1alpha1.CatalogSource) ([]*input.Entity, error) { +func (r gRPCClientConnector) ListEntities(ctx context.Context, catalogSource *v1alpha1.CatalogSource) ([]*input.Entity, error) { // TODO: create GRPC connections separately conn, err := ConnectGRPCWithTimeout(ctx, catalogSource.Address(), r.timeout) if conn != nil { @@ -85,8 +85,6 @@ func (r *registryGRPCClient) ListEntities(ctx context.Context, catalogSource *v1 return entities, nil } -const DefaultGRPCTimeout = 2 * time.Minute - func ConnectGRPCWithTimeout(ctx context.Context, address string, timeout time.Duration) (conn *grpc.ClientConn, err error) { // based on https://github.com/operator-framework/operator-lifecycle-manager/blob/afc0848d102ecdc01a0b0f3b55d389bb66acf168/pkg/controller/registry/grpc/source.go#L149 conn, err = grpcConnection(address) diff --git a/internal/resolution/entity_sources/catalogsource/registry_grpc_client_test.go b/internal/resolution/entity_sources/catalogsource/registry_grpc_client_test.go index 10d0513fb..5567536cb 100644 --- a/internal/resolution/entity_sources/catalogsource/registry_grpc_client_test.go +++ b/internal/resolution/entity_sources/catalogsource/registry_grpc_client_test.go @@ -146,7 +146,7 @@ var _ = Describe("Registry GRPC Client", func() { }) It("lists entities from a grpc registry server", func() { - entities, err := catalogsource.NewRegistryGRPCClient(1*time.Minute).ListEntities(context.TODO(), &v1alpha1.CatalogSource{ + entities, err := catalogsource.NewGRPCClientConnector(1*time.Minute).ListEntities(context.TODO(), &v1alpha1.CatalogSource{ Spec: v1alpha1.CatalogSourceSpec{ Address: ":50052", }, diff --git a/main.go b/main.go index 0c26d4b35..f9fcf4a72 100644 --- a/main.go +++ b/main.go @@ -92,20 +92,20 @@ func main() { os.Exit(1) } - catsrcReconciler := controllers.NewCatalogSourceReconciler( + entityCacheBuilder := controllers.NewEntityCacheBuilder( mgr.GetClient(), mgr.GetScheme(), - mgr.GetEventRecorderFor("catalogsource-controller"), + mgr.GetEventRecorderFor("entitysource-reconciler"), ) - if err := catsrcReconciler.SetupWithManager(mgr); err != nil { - setupLog.Error(err, "unable to create catalog source controller", "controller", "CatalogSource") + if err := entityCacheBuilder.SetupWithManager(mgr); err != nil { + setupLog.Error(err, "unable to create entitysource reconciler", "reconciler", "EntitySource") os.Exit(1) } if err = (&controllers.OperatorReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), - Resolver: resolution.NewOperatorResolver(mgr.GetClient(), catsrcReconciler), + Resolver: resolution.NewOperatorResolver(mgr.GetClient(), entityCacheBuilder.Cache), }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Operator") os.Exit(1)