Skip to content

Commit aedba6c

Browse files
MasaoFujii authored and Commitfest Bot committed
Fix bug that could block the startup of parallel apply workers.
If a logical replication worker fails to start and its parent crashes while waiting, its worker slot can remain marked as "in use". This can prevent new workers from starting, as the launcher may not find a free slot or may incorrectly think the sync or parallel apply worker limits have been reached. To handle this, the launcher already performs garbage collection when no free slot is found or when the sync worker limit is hit, and then retries launching workers. However, it previously did not trigger garbage collection when the parallel apply worker limit was reached. As a result, stale slots could block new parallel apply workers from starting, even though they could have been launched after cleanup. This commit fixes the issue by triggering garbage collection when the parallel apply worker limit is reached as well. If stale slots are cleared and the number of parallel apply workers drops below the limit, a new parallel apply worker can then be started successfully.
1 parent 93b7ab5 commit aedba6c

File tree

1 file changed

+31
-34
lines changed

1 file changed

+31
-34
lines changed

src/backend/replication/logical/launcher.c

Lines changed: 31 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ static void logicalrep_launcher_onexit(int code, Datum arg);
9797
static void logicalrep_worker_onexit(int code, Datum arg);
9898
static void logicalrep_worker_detach(void);
9999
static void logicalrep_worker_cleanup(LogicalRepWorker *worker);
100-
static int logicalrep_pa_worker_count(Oid subid);
100+
static void logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply);
101101
static void logicalrep_launcher_attach_dshmem(void);
102102
static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
103103
static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
@@ -379,16 +379,21 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
379379
}
380380
}
381381

382-
nsyncworkers = logicalrep_sync_worker_count(subid);
382+
logicalrep_worker_count(subid, &nsyncworkers, &nparallelapplyworkers);
383383

384384
now = GetCurrentTimestamp();
385385

386386
/*
387-
* If we didn't find a free slot, try to do garbage collection. The
388-
* reason we do this is because if some worker failed to start up and its
389-
* parent has crashed while waiting, the in_use state was never cleared.
387+
* If we can't start a new logical replication background worker because
388+
* no free slot is available, or because the number of sync workers or
389+
* parallel apply workers has reached the limit per subscription, try
390+
* running garbage collection. The reason we do this is because if some
391+
* workers failed to start up and their parent has crashed while waiting,
392+
* the in_use state was never cleared. By freeing up these stale worker
393+
* slots, we may be able to start a new worker.
390394
*/
391-
if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription)
395+
if (worker == NULL || nsyncworkers >= max_sync_workers_per_subscription ||
396+
nparallelapplyworkers >= max_parallel_apply_workers_per_subscription)
392397
{
393398
bool did_cleanup = false;
394399

@@ -428,8 +433,6 @@ logicalrep_worker_launch(LogicalRepWorkerType wtype,
428433
return false;
429434
}
430435

431-
nparallelapplyworkers = logicalrep_pa_worker_count(subid);
432-
433436
/*
434437
* Return false if the number of parallel apply workers reached the limit
435438
* per subscription.
@@ -886,48 +889,42 @@ logicalrep_worker_onexit(int code, Datum arg)
886889
int
887890
logicalrep_sync_worker_count(Oid subid)
888891
{
889-
int i;
890892
int res = 0;
891893

892-
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
893-
894-
/* Search for attached worker for a given subscription id. */
895-
for (i = 0; i < max_logical_replication_workers; i++)
896-
{
897-
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
898-
899-
if (isTablesyncWorker(w) && w->subid == subid)
900-
res++;
901-
}
902-
894+
logicalrep_worker_count(subid, &res, NULL);
903895
return res;
904896
}
905897

906898
/*
907-
* Count the number of registered (but not necessarily running) parallel apply
908-
* workers for a subscription.
899+
* Count the number of registered (but not necessarily running) sync workers
900+
* and parallel apply workers for a subscription.
909901
*/
910-
static int
911-
logicalrep_pa_worker_count(Oid subid)
902+
static void
903+
logicalrep_worker_count(Oid subid, int *nsync, int *nparallelapply)
912904
{
913-
int i;
914-
int res = 0;
915-
916905
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
917906

907+
if (nsync != NULL)
908+
*nsync = 0;
909+
if (nparallelapply != NULL)
910+
*nparallelapply = 0;
911+
918912
/*
919-
* Scan all attached parallel apply workers, only counting those which
920-
* have the given subscription id.
913+
* Scan all attached sync and parallel apply workers, only counting those
914+
* which have the given subscription id.
921915
*/
922-
for (i = 0; i < max_logical_replication_workers; i++)
916+
for (int i = 0; i < max_logical_replication_workers; i++)
923917
{
924918
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
925919

926-
if (isParallelApplyWorker(w) && w->subid == subid)
927-
res++;
920+
if (w->subid == subid)
921+
{
922+
if (nsync != NULL && isTablesyncWorker(w))
923+
(*nsync)++;
924+
if (nparallelapply != NULL && isParallelApplyWorker(w))
925+
(*nparallelapply)++;
926+
}
928927
}
929-
930-
return res;
931928
}
932929

933930
/*

0 commit comments

Comments
 (0)