Skip to content

Commit 1e28a39

Browse files
michail-nikolaevCommitfest Bot
authored andcommitted
Reset snapshots periodically in non-unique non-parallel concurrent index builds
Long-living snapshots used by CREATE INDEX CONCURRENTLY and REINDEX CONCURRENTLY can hold back the global xmin horizon. Commit d9d0762 attempted to allow VACUUM to ignore such snapshots to mitigate this problem. However, this was reverted in commit e28bb88 because it could cause indexes to miss heap tuples that were HOT-updated and HOT-pruned during the index creation, leading to index corruption. This patch introduces an alternative by periodically resetting the snapshot used during the first phase. By resetting the snapshot every N pages during the heap scan, it allows the xmin horizon to advance. Currently, this technique is applied to: - only during the first scan of the heap: The second scan during index validation still uses a single snapshot to ensure index correctness - non-parallel index builds: Parallel index builds are not yet supported and will be addressed in a following commits - non-unique indexes: Unique index builds still require a consistent snapshot to enforce uniqueness constraints, will be addressed in a following commits A new scan option SO_RESET_SNAPSHOT is introduced. When set, it causes the snapshot to be reset "between" every SO_RESET_SNAPSHOT_EACH_N_PAGE pages during the scan. The heap scan code is adjusted to support this option, and the index build code is modified to use it for applicable concurrent index builds that are not on system catalogs and not using parallel workers.
1 parent 9b41773 commit 1e28a39

File tree

20 files changed

+428
-35
lines changed

20 files changed

+428
-35
lines changed

contrib/amcheck/verify_nbtree.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,8 @@ bt_check_every_level(Relation rel, Relation heaprel, bool heapkeyspace,
558558
0, /* number of keys */
559559
NULL, /* scan key */
560560
true, /* buffer access strategy OK */
561-
true); /* syncscan OK? */
561+
true, /* syncscan OK? */
562+
false);
562563

563564
/*
564565
* Scan will behave as the first scan of a CREATE INDEX CONCURRENTLY

contrib/pgstattuple/pgstattuple.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,7 +335,7 @@ pgstat_heap(Relation rel, FunctionCallInfo fcinfo)
335335
errmsg("only heap AM is supported")));
336336

337337
/* Disable syncscan because we assume we scan from block zero upwards */
338-
scan = table_beginscan_strat(rel, SnapshotAny, 0, NULL, true, false);
338+
scan = table_beginscan_strat(rel, SnapshotAny, 0, NULL, true, false, false);
339339
hscan = (HeapScanDesc) scan;
340340

341341
InitDirtySnapshot(SnapshotDirty);

src/backend/access/brin/brin.c

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1216,11 +1216,12 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
12161216
state->bs_sortstate =
12171217
tuplesort_begin_index_brin(maintenance_work_mem, coordinate,
12181218
TUPLESORT_NONE);
1219-
1219+
InvalidateCatalogSnapshot();
12201220
/* scan the relation and merge per-worker results */
12211221
reltuples = _brin_parallel_merge(state);
12221222

12231223
_brin_end_parallel(state->bs_leader, state);
1224+
Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
12241225
}
12251226
else /* no parallel index build */
12261227
{
@@ -1233,6 +1234,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
12331234
reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
12341235
brinbuildCallback, state, NULL);
12351236

1237+
InvalidateCatalogSnapshot();
12361238
/*
12371239
* process the final batch
12381240
*
@@ -1252,6 +1254,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
12521254
brin_fill_empty_ranges(state,
12531255
state->bs_currRangeStart,
12541256
state->bs_maxRangeStart);
1257+
Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
12551258
}
12561259

12571260
/* release resources */
@@ -2374,6 +2377,7 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
23742377
WalUsage *walusage;
23752378
BufferUsage *bufferusage;
23762379
bool leaderparticipates = true;
2380+
bool need_pop_active_snapshot = true;
23772381
int querylen;
23782382

23792383
#ifdef DISABLE_LEADER_PARTICIPATION
@@ -2399,9 +2403,16 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
23992403
* live according to that.
24002404
*/
24012405
if (!isconcurrent)
2406+
{
2407+
Assert(ActiveSnapshotSet());
24022408
snapshot = SnapshotAny;
2409+
need_pop_active_snapshot = false;
2410+
}
24032411
else
2412+
{
24042413
snapshot = RegisterSnapshot(GetTransactionSnapshot());
2414+
PushActiveSnapshot(GetTransactionSnapshot());
2415+
}
24052416

24062417
/*
24072418
* Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
@@ -2444,6 +2455,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
24442455
/* If no DSM segment was available, back out (do serial build) */
24452456
if (pcxt->seg == NULL)
24462457
{
2458+
if (need_pop_active_snapshot)
2459+
PopActiveSnapshot();
24472460
if (IsMVCCSnapshot(snapshot))
24482461
UnregisterSnapshot(snapshot);
24492462
DestroyParallelContext(pcxt);
@@ -2523,6 +2536,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
25232536
/* If no workers were successfully launched, back out (do serial build) */
25242537
if (pcxt->nworkers_launched == 0)
25252538
{
2539+
if (need_pop_active_snapshot)
2540+
PopActiveSnapshot();
25262541
_brin_end_parallel(brinleader, NULL);
25272542
return;
25282543
}
@@ -2539,6 +2554,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
25392554
* sure that the failure-to-start case will not hang forever.
25402555
*/
25412556
WaitForParallelWorkersToAttach(pcxt);
2557+
if (need_pop_active_snapshot)
2558+
PopActiveSnapshot();
25422559
}
25432560

25442561
/*

src/backend/access/gin/gininsert.c

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "pgstat.h"
2929
#include "storage/bufmgr.h"
3030
#include "storage/predicate.h"
31+
#include "storage/proc.h"
3132
#include "tcop/tcopprot.h"
3233
#include "utils/datum.h"
3334
#include "utils/memutils.h"
@@ -650,6 +651,8 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
650651
buildstate.accum.ginstate = &buildstate.ginstate;
651652
ginInitBA(&buildstate.accum);
652653

654+
Assert(!indexInfo->ii_Concurrent || indexInfo->ii_ParallelWorkers || !TransactionIdIsValid(MyProc->xid));
655+
653656
/* Report table scan phase started */
654657
pgstat_progress_update_param(PROGRESS_CREATEIDX_SUBPHASE,
655658
PROGRESS_GIN_PHASE_INDEXBUILD_TABLESCAN);
@@ -712,11 +715,13 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
712715
tuplesort_begin_index_gin(heap, index,
713716
maintenance_work_mem, coordinate,
714717
TUPLESORT_NONE);
718+
InvalidateCatalogSnapshot();
715719

716720
/* scan the relation in parallel and merge per-worker results */
717721
reltuples = _gin_parallel_merge(state);
718722

719723
_gin_end_parallel(state->bs_leader, state);
724+
Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
720725
}
721726
else /* no parallel index build */
722727
{
@@ -726,6 +731,7 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
726731
*/
727732
reltuples = table_index_build_scan(heap, index, indexInfo, false, true,
728733
ginBuildCallback, &buildstate, NULL);
734+
InvalidateCatalogSnapshot();
729735

730736
/* dump remaining entries to the index */
731737
oldCtx = MemoryContextSwitchTo(buildstate.tmpCtx);
@@ -739,6 +745,7 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
739745
list, nlist, &buildstate.buildStats);
740746
}
741747
MemoryContextSwitchTo(oldCtx);
748+
Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
742749
}
743750

744751
MemoryContextDelete(buildstate.funcCtx);
@@ -911,6 +918,7 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
911918
WalUsage *walusage;
912919
BufferUsage *bufferusage;
913920
bool leaderparticipates = true;
921+
bool need_pop_active_snapshot = true;
914922
int querylen;
915923

916924
#ifdef DISABLE_LEADER_PARTICIPATION
@@ -935,9 +943,16 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
935943
* live according to that.
936944
*/
937945
if (!isconcurrent)
946+
{
947+
Assert(ActiveSnapshotSet());
938948
snapshot = SnapshotAny;
949+
need_pop_active_snapshot = false;
950+
}
939951
else
952+
{
940953
snapshot = RegisterSnapshot(GetTransactionSnapshot());
954+
PushActiveSnapshot(GetTransactionSnapshot());
955+
}
941956

942957
/*
943958
* Estimate size for our own PARALLEL_KEY_GIN_SHARED workspace.
@@ -980,6 +995,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
980995
/* If no DSM segment was available, back out (do serial build) */
981996
if (pcxt->seg == NULL)
982997
{
998+
if (need_pop_active_snapshot)
999+
PopActiveSnapshot();
9831000
if (IsMVCCSnapshot(snapshot))
9841001
UnregisterSnapshot(snapshot);
9851002
DestroyParallelContext(pcxt);
@@ -1054,6 +1071,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
10541071
/* If no workers were successfully launched, back out (do serial build) */
10551072
if (pcxt->nworkers_launched == 0)
10561073
{
1074+
if (need_pop_active_snapshot)
1075+
PopActiveSnapshot();
10571076
_gin_end_parallel(ginleader, NULL);
10581077
return;
10591078
}
@@ -1070,6 +1089,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
10701089
* sure that the failure-to-start case will not hang forever.
10711090
*/
10721091
WaitForParallelWorkersToAttach(pcxt);
1092+
if (need_pop_active_snapshot)
1093+
PopActiveSnapshot();
10731094
}
10741095

10751096
/*

src/backend/access/gist/gistbuild.c

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
#include "optimizer/optimizer.h"
4444
#include "storage/bufmgr.h"
4545
#include "storage/bulk_write.h"
46+
#include "storage/proc.h"
4647

4748
#include "utils/memutils.h"
4849
#include "utils/rel.h"
@@ -259,6 +260,7 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
259260
buildstate.indtuples = 0;
260261
buildstate.indtuplesSize = 0;
261262

263+
Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xid));
262264
if (buildstate.buildMode == GIST_SORTED_BUILD)
263265
{
264266
/*
@@ -350,6 +352,7 @@ gistbuild(Relation heap, Relation index, IndexInfo *indexInfo)
350352

351353
result->heap_tuples = reltuples;
352354
result->index_tuples = (double) buildstate.indtuples;
355+
Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xid));
353356

354357
return result;
355358
}

src/backend/access/hash/hash.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ hashbuild(Relation heap, Relation index, IndexInfo *indexInfo)
197197

198198
result->heap_tuples = reltuples;
199199
result->index_tuples = buildstate.indtuples;
200+
Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xid));
200201

201202
return result;
202203
}

src/backend/access/heap/heapam.c

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
#include "utils/inval.h"
5454
#include "utils/spccache.h"
5555
#include "utils/syscache.h"
56+
#include "utils/injection_point.h"
5657

5758

5859
static HeapTuple heap_prepare_insert(Relation relation, HeapTuple tup,
@@ -633,6 +634,36 @@ heap_prepare_pagescan(TableScanDesc sscan)
633634
LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
634635
}
635636

637+
/*
638+
* Reset the active snapshot during a scan.
639+
* This ensures the xmin horizon can advance while maintaining safe tuple visibility.
640+
* Note: No other snapshot should be active during this operation.
641+
*/
642+
static inline void
643+
heap_reset_scan_snapshot(TableScanDesc sscan)
644+
{
645+
/* Make sure no other snapshot was set as active. */
646+
Assert(GetActiveSnapshot() == sscan->rs_snapshot);
647+
/* And make sure active snapshot is not registered. */
648+
Assert(GetActiveSnapshot()->regd_count == 0);
649+
PopActiveSnapshot();
650+
651+
sscan->rs_snapshot = InvalidSnapshot; /* just ot be tidy */
652+
Assert(!HaveRegisteredOrActiveSnapshot());
653+
InvalidateCatalogSnapshot();
654+
655+
/* Goal of snapshot reset is to allow horizon to advance. */
656+
Assert(!TransactionIdIsValid(MyProc->xmin));
657+
#if USE_INJECTION_POINTS
658+
/* In some cases it is still not possible due xid assign. */
659+
if (!TransactionIdIsValid(MyProc->xid))
660+
INJECTION_POINT("heap_reset_scan_snapshot_effective", NULL);
661+
#endif
662+
663+
PushActiveSnapshot(GetLatestSnapshot());
664+
sscan->rs_snapshot = GetActiveSnapshot();
665+
}
666+
636667
/*
637668
* heap_fetch_next_buffer - read and pin the next block from MAIN_FORKNUM.
638669
*
@@ -674,7 +705,12 @@ heap_fetch_next_buffer(HeapScanDesc scan, ScanDirection dir)
674705

675706
scan->rs_cbuf = read_stream_next_buffer(scan->rs_read_stream, NULL);
676707
if (BufferIsValid(scan->rs_cbuf))
708+
{
677709
scan->rs_cblock = BufferGetBlockNumber(scan->rs_cbuf);
710+
if ((scan->rs_base.rs_flags & SO_RESET_SNAPSHOT) &&
711+
(scan->rs_cblock % SO_RESET_SNAPSHOT_EACH_N_PAGE == 0))
712+
heap_reset_scan_snapshot((TableScanDesc) scan);
713+
}
678714
}
679715

680716
/*
@@ -1336,6 +1372,15 @@ heap_endscan(TableScanDesc sscan)
13361372
if (scan->rs_parallelworkerdata != NULL)
13371373
pfree(scan->rs_parallelworkerdata);
13381374

1375+
if (scan->rs_base.rs_flags & SO_RESET_SNAPSHOT)
1376+
{
1377+
Assert(!(scan->rs_base.rs_flags & SO_TEMP_SNAPSHOT));
1378+
/* Make sure no other snapshot was set as active. */
1379+
Assert(GetActiveSnapshot() == sscan->rs_snapshot);
1380+
/* And make sure snapshot is not registered. */
1381+
Assert(GetActiveSnapshot()->regd_count == 0);
1382+
}
1383+
13391384
if (scan->rs_base.rs_flags & SO_TEMP_SNAPSHOT)
13401385
UnregisterSnapshot(scan->rs_base.rs_snapshot);
13411386

0 commit comments

Comments
 (0)