diff --git a/src/backend/replication/logical/launcher.c b/src/backend/replication/logical/launcher.c index 95b5cae9a552..1289d9776f6f 100644 --- a/src/backend/replication/logical/launcher.c +++ b/src/backend/replication/logical/launcher.c @@ -97,7 +97,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg); static void logicalrep_worker_onexit(int code, Datum arg); static void logicalrep_worker_detach(void); static void logicalrep_worker_cleanup(LogicalRepWorker *worker); -static int logicalrep_pa_worker_count(Oid subid); +static void logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply); static void logicalrep_launcher_attach_dshmem(void); static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time); static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid); @@ -365,7 +365,6 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, */ LWLockAcquire(LogicalRepWorkerLock, LW_EXCLUSIVE); -retry: /* Find unused worker slot. */ for (i = 0; i < max_logical_replication_workers; i++) { @@ -379,16 +378,21 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, } } - nsyncworkers = logicalrep_sync_worker_count(subid); + logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers); now = GetCurrentTimestamp(); /* - * If we didn't find a free slot, try to do garbage collection. The - * reason we do this is because if some worker failed to start up and its - * parent has crashed while waiting, the in_use state was never cleared. + * If we can't start a new logical replication background worker because + * no free slot is available, or because the number of sync workers or + * parallel apply workers has reached the limit per subscriptoin, try + * running garbage collection. The reason we do this is because if some + * workers failed to start up and their parent has crashed while waiting, + * the in_use state was never cleared. By freeing up these stale worker + * slots, we may be able to start a new worker. */ - if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription) + if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription || + nparallelapplyworkers >= max_parallel_apply_workers_per_subscription) { bool did_cleanup = false; @@ -410,11 +414,21 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, logicalrep_worker_cleanup(w); did_cleanup = true; + + if (worker == NULL) + { + worker = w; + slot = i; + } } } + /* + * Count the current number of sync and parallel apply workers again, + * since garbage collection may have changed it. + */ if (did_cleanup) - goto retry; + logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers); } /* @@ -428,8 +442,6 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype, return false; } - nparallelapplyworkers = logicalrep_pa_worker_count(subid); - /* * Return false if the number of parallel apply workers reached the limit * per subscription. @@ -886,48 +898,42 @@ logicalrep_worker_onexit(int code, Datum arg) int logicalrep_sync_worker_count(Oid subid) { - int i; int res = 0; - Assert(LWLockHeldByMe(LogicalRepWorkerLock)); - - /* Search for attached worker for a given subscription id. */ - for (i = 0; i < max_logical_replication_workers; i++) - { - LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - - if (isTablesyncWorker(w) && w->subid == subid) - res++; - } - + logicalrep_worker_count(subid, &res, NULL); return res; } /* - * Count the number of registered (but not necessarily running) parallel apply - * workers for a subscription. + * Count the number of registered (but not necessarily running) sync workers + * and parallel apply workers for a subscription. */ -static int -logicalrep_pa_worker_count(Oid subid) +static void +logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply) { - int i; - int res = 0; - Assert(LWLockHeldByMe(LogicalRepWorkerLock)); + if (nsync != NULL) + *nsync = 0; + if (nparallelapply != NULL) + *nparallelapply = 0; + /* - * Scan all attached parallel apply workers, only counting those which - * have the given subscription id. + * Scan all attached sync and parallel apply workers, only counting those + * which have the given subscription id. */ - for (i = 0; i < max_logical_replication_workers; i++) + for (int i = 0; i < max_logical_replication_workers; i++) { LogicalRepWorker *w = &LogicalRepCtx->workers[i]; - if (isParallelApplyWorker(w) && w->subid == subid) - res++; + if (w->subid == subid) + { + if (nsync != NULL && isTablesyncWorker(w)) + (*nsync)++; + if (nparallelapply != NULL && isParallelApplyWorker(w)) + (*nparallelapply)++; + } } - - return res; } /*