
Commit 8933d6f

michail-nikolaev authored and Commitfest Bot committed
Support snapshot resets in parallel concurrent index builds
Extend periodic snapshot reset support to parallel builds, which was previously limited to non-parallel operations. This allows the xmin horizon to advance during parallel concurrent index builds as well.

The main obstacle to applying this technique to parallel builds was the requirement to wait until worker processes have restored their initial snapshot from the leader. To address this, the following changes are applied:

- add infrastructure to track snapshot restoration in parallel workers
- extend parallel scan initialization to support periodic snapshot resets
- wait for parallel workers to restore their initial snapshots before proceeding with the scan
- relax the limitation so that a parallel worker may call GetLatestSnapshot
1 parent 1e28a39 commit 8933d6f

File tree

14 files changed: +225 −89 lines changed
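
For readers skimming the diff, here is a condensed, editorial sketch of the leader-side flow for a concurrent parallel build, pieced together from the _brin_begin_parallel changes below (the gin path is analogous). Error handling, size estimation and shared-memory setup are elided, and the meaning of the new boolean arguments is inferred from the commit message and the in-diff comments rather than from any stated API documentation:

	/* Concurrent build: take a regular MVCC snapshot and push it as active. */
	if (isconcurrent)
	{
		Assert(!ActiveSnapshotSet());
		PushActiveSnapshot(GetTransactionSnapshot());
	}

	/*
	 * Initialize the parallel scan.  Concurrent builds pass InvalidSnapshot
	 * plus a flag (last argument) that, per the commit message, enables
	 * periodic snapshot resets during the scan.
	 */
	table_parallelscan_initialize(heap,
								  ParallelTableScanFromBrinShared(brinshared),
								  isconcurrent ? InvalidSnapshot : SnapshotAny,
								  isconcurrent);

	/*
	 * Because snapshots will be reset periodically, wait until every worker
	 * has restored its initial snapshot before proceeding; the new second
	 * argument of WaitForParallelWorkersToAttach appears to request that.
	 */
	if (isconcurrent)
		WaitForParallelWorkersToAttach(pcxt, true);

	/* Leader joins the heap scan as an ordinary participant. */
	if (leaderparticipates)
		_brin_leader_participate_as_worker(buildstate, heap, index);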

src/backend/access/brin/brin.c

Lines changed: 29 additions & 21 deletions
@@ -143,7 +143,6 @@ typedef struct BrinLeader
 	 */
 	BrinShared *brinshared;
 	Sharedsort *sharedsort;
-	Snapshot	snapshot;
 	WalUsage   *walusage;
 	BufferUsage *bufferusage;
 } BrinLeader;
@@ -231,7 +230,7 @@ static void brin_fill_empty_ranges(BrinBuildState *state,
 static void _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 								 bool isconcurrent, int request);
 static void _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state);
-static Size _brin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static Size _brin_parallel_estimate_shared(Relation heap);
 static double _brin_parallel_heapscan(BrinBuildState *state);
 static double _brin_parallel_merge(BrinBuildState *state);
 static void _brin_leader_participate_as_worker(BrinBuildState *buildstate,
@@ -1221,7 +1220,6 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 		reltuples = _brin_parallel_merge(state);
 
 		_brin_end_parallel(state->bs_leader, state);
-		Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
 	}
 	else						/* no parallel index build */
 	{
@@ -1254,7 +1252,6 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 		brin_fill_empty_ranges(state,
 							   state->bs_currRangeStart,
 							   state->bs_maxRangeStart);
-		Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
 	}
 
 	/* release resources */
@@ -1269,6 +1266,7 @@ brinbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 
 	result->heap_tuples = reltuples;
 	result->index_tuples = idxtuples;
+	Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xid));
 
 	return result;
 }
@@ -2368,7 +2366,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 {
 	ParallelContext *pcxt;
 	int			scantuplesortstates;
-	Snapshot	snapshot;
 	Size		estbrinshared;
 	Size		estsort;
 	BrinShared *brinshared;
@@ -2399,25 +2396,25 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	 * Prepare for scan of the base relation. In a normal index build, we use
 	 * SnapshotAny because we must retrieve all tuples and do our own time
 	 * qual checks (because we have to index RECENTLY_DEAD tuples). In a
-	 * concurrent build, we take a regular MVCC snapshot and index whatever's
-	 * live according to that.
+	 * concurrent build, we take a regular MVCC snapshot and push it as active.
+	 * Later we index whatever's live according to that snapshot while that
+	 * snapshot is reset periodically.
 	 */
 	if (!isconcurrent)
 	{
 		Assert(ActiveSnapshotSet());
-		snapshot = SnapshotAny;
 		need_pop_active_snapshot = false;
 	}
 	else
 	{
-		snapshot = RegisterSnapshot(GetTransactionSnapshot());
+		Assert(!ActiveSnapshotSet());
 		PushActiveSnapshot(GetTransactionSnapshot());
 	}
 
 	/*
 	 * Estimate size for our own PARALLEL_KEY_BRIN_SHARED workspace.
 	 */
-	estbrinshared = _brin_parallel_estimate_shared(heap, snapshot);
+	estbrinshared = _brin_parallel_estimate_shared(heap);
 	shm_toc_estimate_chunk(&pcxt->estimator, estbrinshared);
 	estsort = tuplesort_estimate_shared(scantuplesortstates);
 	shm_toc_estimate_chunk(&pcxt->estimator, estsort);
@@ -2457,8 +2454,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	{
 		if (need_pop_active_snapshot)
 			PopActiveSnapshot();
-		if (IsMVCCSnapshot(snapshot))
-			UnregisterSnapshot(snapshot);
 		DestroyParallelContext(pcxt);
 		ExitParallelMode();
 		return;
@@ -2483,7 +2478,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 
 	table_parallelscan_initialize(heap,
 								  ParallelTableScanFromBrinShared(brinshared),
-								  snapshot);
+								  isconcurrent ? InvalidSnapshot : SnapshotAny,
+								  isconcurrent);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
@@ -2529,7 +2525,6 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	brinleader->nparticipanttuplesorts++;
 	brinleader->brinshared = brinshared;
 	brinleader->sharedsort = sharedsort;
-	brinleader->snapshot = snapshot;
 	brinleader->walusage = walusage;
 	brinleader->bufferusage = bufferusage;
 
@@ -2545,6 +2540,13 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	/* Save leader state now that it's clear build will be parallel */
 	buildstate->bs_leader = brinleader;
 
+	/*
+	 * In case of concurrent build snapshots are going to be reset periodically.
+	 * We need to wait until all workers imported initial snapshot.
+	 */
+	if (isconcurrent)
+		WaitForParallelWorkersToAttach(pcxt, true);
+
 	/* Join heap scan ourselves */
 	if (leaderparticipates)
 		_brin_leader_participate_as_worker(buildstate, heap, index);
@@ -2553,7 +2555,8 @@ _brin_begin_parallel(BrinBuildState *buildstate, Relation heap, Relation index,
 	 * Caller needs to wait for all launched workers when we return. Make
 	 * sure that the failure-to-start case will not hang forever.
 	 */
-	WaitForParallelWorkersToAttach(pcxt);
+	if (!isconcurrent)
+		WaitForParallelWorkersToAttach(pcxt, false);
 	if (need_pop_active_snapshot)
 		PopActiveSnapshot();
 }
@@ -2576,9 +2579,6 @@ _brin_end_parallel(BrinLeader *brinleader, BrinBuildState *state)
 	for (i = 0; i < brinleader->pcxt->nworkers_launched; i++)
 		InstrAccumParallelQuery(&brinleader->bufferusage[i], &brinleader->walusage[i]);
 
-	/* Free last reference to MVCC snapshot, if one was used */
-	if (IsMVCCSnapshot(brinleader->snapshot))
-		UnregisterSnapshot(brinleader->snapshot);
 	DestroyParallelContext(brinleader->pcxt);
 	ExitParallelMode();
 }
@@ -2778,14 +2778,14 @@ _brin_parallel_merge(BrinBuildState *state)
 
 /*
  * Returns size of shared memory required to store state for a parallel
- * brin index build based on the snapshot its parallel scan will use.
+ * brin index build.
 */
 static Size
-_brin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+_brin_parallel_estimate_shared(Relation heap)
 {
 	/* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
 	return add_size(BUFFERALIGN(sizeof(BrinShared)),
-					table_parallelscan_estimate(heap, snapshot));
+					table_parallelscan_estimate(heap, InvalidSnapshot));
 }
 
 /*
@@ -2807,6 +2807,7 @@ _brin_leader_participate_as_worker(BrinBuildState *buildstate, Relation heap, Re
 	/* Perform work common to all participants */
 	_brin_parallel_scan_and_build(buildstate, brinleader->brinshared,
 								  brinleader->sharedsort, heap, index, sortmem, true);
+	Assert(!brinleader->brinshared->isconcurrent || !TransactionIdIsValid(MyProc->xid));
 }
 
 /*
@@ -2947,6 +2948,13 @@ _brin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 
 	_brin_parallel_scan_and_build(buildstate, brinshared, sharedsort,
 								  heapRel, indexRel, sortmem, false);
+	if (brinshared->isconcurrent)
+	{
+		PopActiveSnapshot();
+		InvalidateCatalogSnapshot();
+		Assert(!TransactionIdIsValid(MyProc->xid));
+		PushActiveSnapshot(GetTransactionSnapshot());
+	}
 
 	/* Report WAL/buffer usage during parallel execution */
 	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);

src/backend/access/gin/gininsert.c

Lines changed: 29 additions & 21 deletions
@@ -132,7 +132,6 @@ typedef struct GinLeader
 	 */
 	GinBuildShared *ginshared;
 	Sharedsort *sharedsort;
-	Snapshot	snapshot;
 	WalUsage   *walusage;
 	BufferUsage *bufferusage;
 } GinLeader;
@@ -180,7 +179,7 @@ typedef struct
 static void _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 								bool isconcurrent, int request);
 static void _gin_end_parallel(GinLeader *ginleader, GinBuildState *state);
-static Size _gin_parallel_estimate_shared(Relation heap, Snapshot snapshot);
+static Size _gin_parallel_estimate_shared(Relation heap);
 static double _gin_parallel_heapscan(GinBuildState *state);
 static double _gin_parallel_merge(GinBuildState *state);
 static void _gin_leader_participate_as_worker(GinBuildState *buildstate,
@@ -721,7 +720,6 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 		reltuples = _gin_parallel_merge(state);
 
 		_gin_end_parallel(state->bs_leader, state);
-		Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
 	}
 	else						/* no parallel index build */
 	{
@@ -745,7 +743,6 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
							   list, nlist, &buildstate.buildStats);
 		}
 		MemoryContextSwitchTo(oldCtx);
-		Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
 	}
 
 	MemoryContextDelete(buildstate.funcCtx);
@@ -775,6 +772,7 @@ ginbuild(Relation heap, Relation index, IndexInfo *indexInfo)
 
 	result->heap_tuples = reltuples;
 	result->index_tuples = buildstate.indtuples;
+	Assert(!indexInfo->ii_Concurrent || !TransactionIdIsValid(MyProc->xmin));
 
 	return result;
 }
@@ -909,7 +907,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 {
 	ParallelContext *pcxt;
 	int			scantuplesortstates;
-	Snapshot	snapshot;
 	Size		estginshared;
 	Size		estsort;
 	GinBuildShared *ginshared;
@@ -939,25 +936,25 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 	 * Prepare for scan of the base relation. In a normal index build, we use
 	 * SnapshotAny because we must retrieve all tuples and do our own time
 	 * qual checks (because we have to index RECENTLY_DEAD tuples). In a
-	 * concurrent build, we take a regular MVCC snapshot and index whatever's
-	 * live according to that.
+	 * concurrent build, we take a regular MVCC snapshot and push it as active.
+	 * Later we index whatever's live according to that snapshot while that
+	 * snapshot is reset periodically.
 	 */
 	if (!isconcurrent)
 	{
 		Assert(ActiveSnapshotSet());
-		snapshot = SnapshotAny;
 		need_pop_active_snapshot = false;
 	}
 	else
 	{
-		snapshot = RegisterSnapshot(GetTransactionSnapshot());
+		Assert(!ActiveSnapshotSet());
 		PushActiveSnapshot(GetTransactionSnapshot());
 	}
 
 	/*
 	 * Estimate size for our own PARALLEL_KEY_GIN_SHARED workspace.
 	 */
-	estginshared = _gin_parallel_estimate_shared(heap, snapshot);
+	estginshared = _gin_parallel_estimate_shared(heap);
 	shm_toc_estimate_chunk(&pcxt->estimator, estginshared);
 	estsort = tuplesort_estimate_shared(scantuplesortstates);
 	shm_toc_estimate_chunk(&pcxt->estimator, estsort);
@@ -997,8 +994,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 	{
 		if (need_pop_active_snapshot)
 			PopActiveSnapshot();
-		if (IsMVCCSnapshot(snapshot))
-			UnregisterSnapshot(snapshot);
 		DestroyParallelContext(pcxt);
 		ExitParallelMode();
 		return;
@@ -1022,7 +1017,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 
 	table_parallelscan_initialize(heap,
 								  ParallelTableScanFromGinBuildShared(ginshared),
-								  snapshot);
+								  isconcurrent ? InvalidSnapshot : SnapshotAny,
+								  isconcurrent);
 
 	/*
 	 * Store shared tuplesort-private state, for which we reserved space.
@@ -1064,7 +1060,6 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 	ginleader->nparticipanttuplesorts++;
 	ginleader->ginshared = ginshared;
 	ginleader->sharedsort = sharedsort;
-	ginleader->snapshot = snapshot;
 	ginleader->walusage = walusage;
 	ginleader->bufferusage = bufferusage;
 
@@ -1080,6 +1075,13 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 	/* Save leader state now that it's clear build will be parallel */
 	buildstate->bs_leader = ginleader;
 
+	/*
+	 * In case of concurrent build snapshots are going to be reset periodically.
+	 * We need to wait until all workers imported initial snapshot.
+	 */
+	if (isconcurrent)
+		WaitForParallelWorkersToAttach(pcxt, true);
+
 	/* Join heap scan ourselves */
 	if (leaderparticipates)
 		_gin_leader_participate_as_worker(buildstate, heap, index);
@@ -1088,7 +1090,8 @@ _gin_begin_parallel(GinBuildState *buildstate, Relation heap, Relation index,
 	 * Caller needs to wait for all launched workers when we return. Make
 	 * sure that the failure-to-start case will not hang forever.
 	 */
-	WaitForParallelWorkersToAttach(pcxt);
+	if (!isconcurrent)
+		WaitForParallelWorkersToAttach(pcxt, false);
 	if (need_pop_active_snapshot)
 		PopActiveSnapshot();
 }
@@ -1111,9 +1114,6 @@ _gin_end_parallel(GinLeader *ginleader, GinBuildState *state)
 	for (i = 0; i < ginleader->pcxt->nworkers_launched; i++)
 		InstrAccumParallelQuery(&ginleader->bufferusage[i], &ginleader->walusage[i]);
 
-	/* Free last reference to MVCC snapshot, if one was used */
-	if (IsMVCCSnapshot(ginleader->snapshot))
-		UnregisterSnapshot(ginleader->snapshot);
 	DestroyParallelContext(ginleader->pcxt);
 	ExitParallelMode();
 }
@@ -1794,14 +1794,14 @@ _gin_parallel_merge(GinBuildState *state)
 
 /*
  * Returns size of shared memory required to store state for a parallel
- * gin index build based on the snapshot its parallel scan will use.
+ * gin index build.
 */
 static Size
-_gin_parallel_estimate_shared(Relation heap, Snapshot snapshot)
+_gin_parallel_estimate_shared(Relation heap)
 {
 	/* c.f. shm_toc_allocate as to why BUFFERALIGN is used */
 	return add_size(BUFFERALIGN(sizeof(GinBuildShared)),
-					table_parallelscan_estimate(heap, snapshot));
+					table_parallelscan_estimate(heap, InvalidSnapshot));
 }
 
 /*
@@ -1824,6 +1824,7 @@ _gin_leader_participate_as_worker(GinBuildState *buildstate, Relation heap, Rela
 	_gin_parallel_scan_and_build(buildstate, ginleader->ginshared,
 								 ginleader->sharedsort, heap, index,
 								 sortmem, true);
+	Assert(!ginleader->ginshared->isconcurrent || !TransactionIdIsValid(MyProc->xid));
 }
 
 /*
@@ -2183,6 +2184,13 @@ _gin_parallel_build_main(dsm_segment *seg, shm_toc *toc)
 
 	_gin_parallel_scan_and_build(&buildstate, ginshared, sharedsort,
 								 heapRel, indexRel, sortmem, false);
+	if (ginshared->isconcurrent)
+	{
+		PopActiveSnapshot();
+		InvalidateCatalogSnapshot();
+		Assert(!TransactionIdIsValid(MyProc->xid));
+		PushActiveSnapshot(GetTransactionSnapshot());
+	}
 
 	/* Report WAL/buffer usage during parallel execution */
 	bufferusage = shm_toc_lookup(toc, PARALLEL_KEY_BUFFER_USAGE, false);

src/backend/access/heap/heapam_handler.c

Lines changed: 7 additions & 5 deletions
@@ -1235,14 +1235,13 @@ heapam_index_build_range_scan(Relation heapRelation,
 	 * SnapshotAny because we must retrieve all tuples and do our own time
 	 * qual checks (because we have to index RECENTLY_DEAD tuples). In a
 	 * concurrent build, or during bootstrap, we take a regular MVCC snapshot
-	 * and index whatever's live according to that.
+	 * and index whatever's live according to that while that snapshot is reset
+	 * every so often (in case of non-unique index).
 	 */
 	OldestXmin = InvalidTransactionId;
 
 	/*
 	 * For unique index we need consistent snapshot for the whole scan.
-	 * In case of parallel scan some additional infrastructure required
-	 * to perform scan with SO_RESET_SNAPSHOT which is not yet ready.
 	 */
 	reset_snapshots = indexInfo->ii_Concurrent &&
 					  !indexInfo->ii_Unique &&
@@ -1304,8 +1303,11 @@ heapam_index_build_range_scan(Relation heapRelation,
 		Assert(!IsBootstrapProcessingMode());
 		Assert(allow_sync);
 		snapshot = scan->rs_snapshot;
-		PushActiveSnapshot(snapshot);
-		need_pop_active_snapshot = true;
+		if (!reset_snapshots)
+		{
+			PushActiveSnapshot(snapshot);
+			need_pop_active_snapshot = true;
+		}
 	}
 
 	hscan = (HeapScanDesc) scan;
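
The worker-side counterpart, identical in the brin and gin paths above, reads as a unit: once its share of the scan is done, a worker in a concurrent build drops the scan-time snapshot, presumably so it no longer holds back the xmin horizon, and then re-establishes a fresh one. This is a condensed excerpt of _brin_parallel_build_main / _gin_parallel_build_main from this commit; the comments are editorial interpretation, not part of the patch:

	if (brinshared->isconcurrent)
	{
		/* Release the worker's reference to the scan-time snapshot. */
		PopActiveSnapshot();
		InvalidateCatalogSnapshot();
		/* A concurrent build is expected not to have assigned an xid here. */
		Assert(!TransactionIdIsValid(MyProc->xid));
		/* Leave a current snapshot active for the rest of the worker's run. */
		PushActiveSnapshot(GetTransactionSnapshot());
	}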
