Commit 3f19297

Cary Huang authored and Commitfest Bot committed

v10 parallel tid range scan

1 parent cf8be02 · commit 3f19297

File tree

14 files changed: +410 -21 lines changed

src/backend/access/heap/heapam.c

Lines changed: 10 additions & 0 deletions

@@ -490,6 +490,16 @@ heap_setscanlimits(TableScanDesc sscan, BlockNumber startBlk, BlockNumber numBlk
 
     scan->rs_startblock = startBlk;
     scan->rs_numblocks = numBlks;
+
+    /* set the limits in the ParallelBlockTableScanDesc, when present as leader */
+    if (scan->rs_base.rs_parallel != NULL && !IsParallelWorker())
+    {
+        ParallelBlockTableScanDesc bpscan;
+
+        bpscan = (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel;
+        bpscan->phs_startblock = startBlk;
+        bpscan->phs_numblock = numBlks;
+    }
 }
 
 /*
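For orientation: heap_setscanlimits() is reached through the table AM's scan_set_tidrange callback, which for heap is heap_set_tidrange(); that function clamps the TID bounds to a block range before delegating here. A condensed sketch of the call chain, assuming the heap AM (simplified, not the verbatim source):

/*
 * Sketch, assuming the heap AM (simplified, not verbatim source):
 *
 *   scan_set_tidrange()  ==  heap_set_tidrange(sscan, mintid, maxtid)
 *     -> derive startBlk/numBlks from the min/max TID block numbers
 *     -> heap_setscanlimits(sscan, startBlk, numBlks)
 *
 * In the leader, IsParallelWorker() is false, so the limits are also
 * published to the shared ParallelBlockTableScanDesc above; a worker
 * running the same path sets only its local rs_startblock/rs_numblocks
 * and leaves the shared phs_startblock/phs_numblock alone.
 */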

src/backend/access/table/tableam.c

Lines changed: 45 additions & 1 deletion

@@ -188,6 +188,42 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan)
                                              pscan, flags);
 }
 
+TableScanDesc
+table_beginscan_parallel_tidrange(Relation relation, ParallelTableScanDesc pscan,
+                                  ItemPointerData * mintid, ItemPointerData * maxtid)
+{
+    Snapshot    snapshot;
+    uint32      flags = SO_TYPE_TIDRANGESCAN | SO_ALLOW_PAGEMODE;
+    TableScanDesc sscan;
+
+    Assert(RelFileLocatorEquals(relation->rd_locator, pscan->phs_locator));
+
+    /* disable syncscan in parallel tid range scan. */
+    pscan->phs_syncscan = false;
+
+    if (!pscan->phs_snapshot_any)
+    {
+        /* Snapshot was serialized -- restore it */
+        snapshot = RestoreSnapshot((char *) pscan + pscan->phs_snapshot_off);
+        RegisterSnapshot(snapshot);
+        flags |= SO_TEMP_SNAPSHOT;
+    }
+    else
+    {
+        /* SnapshotAny passed by caller (not serialized) */
+        snapshot = SnapshotAny;
+    }
+
+    sscan = relation->rd_tableam->scan_begin(relation, snapshot, 0, NULL,
+                                             pscan, flags);
+
+    /* Set the TID range if needed */
+    if (mintid && maxtid)
+        relation->rd_tableam->scan_set_tidrange(sscan, mintid, maxtid);
+
+    return sscan;
+}
+
 
 /* ----------------------------------------------------------------------------
  * Index scan related functions.
@@ -398,6 +434,7 @@ table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
         bpscan->phs_nblocks > NBuffers / 4;
     SpinLockInit(&bpscan->phs_mutex);
     bpscan->phs_startblock = InvalidBlockNumber;
+    bpscan->phs_numblock = InvalidBlockNumber;
     pg_atomic_init_u64(&bpscan->phs_nallocated, 0);
 
     return sizeof(ParallelBlockTableScanDescData);
@@ -577,8 +614,15 @@ table_block_parallelscan_nextpage(Relation rel,
         pbscanwork->phsw_chunk_remaining = pbscanwork->phsw_chunk_size - 1;
     }
 
+    /*
+     * Check if we've allocated every block in the relation, or if we've
+     * reached the limit imposed by pbscan->phs_numblock (if set).
+     */
     if (nallocated >= pbscan->phs_nblocks)
-        page = InvalidBlockNumber;  /* all blocks have been allocated */
+        page = InvalidBlockNumber;  /* all blocks have been allocated */
+    else if (pbscan->phs_numblock != InvalidBlockNumber &&
+             nallocated >= pbscan->phs_numblock)
+        page = InvalidBlockNumber;  /* upper scan limit reached */
     else
         page = (nallocated + pbscan->phs_startblock) % pbscan->phs_nblocks;
 
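To make the new phs_numblock check concrete, here is a hypothetical allocation trace (numbers invented; a chunk size of 1 is assumed for simplicity):

/*
 * Hypothetical trace of table_block_parallelscan_nextpage() with
 * phs_startblock = 5, phs_nblocks = 100, phs_numblock = 10:
 *
 *   nallocated  0..9  ->  page = (nallocated + 5) % 100  =  blocks 5..14
 *   nallocated >= 10  ->  page = InvalidBlockNumber       (limit reached)
 *
 * Workers are handed only the first phs_numblock blocks starting at
 * phs_startblock, which confines the parallel scan to the TID range.
 */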

src/backend/executor/execParallel.c

Lines changed: 21 additions & 0 deletions

@@ -41,6 +41,7 @@
 #include "executor/nodeSort.h"
 #include "executor/nodeSubplan.h"
 #include "executor/tqueue.h"
+#include "executor/nodeTidrangescan.h"
 #include "jit/jit.h"
 #include "nodes/nodeFuncs.h"
 #include "pgstat.h"
@@ -266,6 +267,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
             ExecForeignScanEstimate((ForeignScanState *) planstate,
                                     e->pcxt);
             break;
+        case T_TidRangeScanState:
+            if (planstate->plan->parallel_aware)
+                ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
+                                         e->pcxt);
+            break;
         case T_AppendState:
             if (planstate->plan->parallel_aware)
                 ExecAppendEstimate((AppendState *) planstate,
@@ -493,6 +499,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
             ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
                                          d->pcxt);
             break;
+        case T_TidRangeScanState:
+            if (planstate->plan->parallel_aware)
+                ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
+                                              d->pcxt);
+            break;
         case T_AppendState:
             if (planstate->plan->parallel_aware)
                 ExecAppendInitializeDSM((AppendState *) planstate,
@@ -994,6 +1005,11 @@ ExecParallelReInitializeDSM(PlanState *planstate,
             ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
                                            pcxt);
             break;
+        case T_TidRangeScanState:
+            if (planstate->plan->parallel_aware)
+                ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
+                                                pcxt);
+            break;
         case T_AppendState:
             if (planstate->plan->parallel_aware)
                 ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
@@ -1362,6 +1378,11 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
             ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
                                             pwcxt);
             break;
+        case T_TidRangeScanState:
+            if (planstate->plan->parallel_aware)
+                ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
+                                                 pwcxt);
+            break;
         case T_AppendState:
             if (planstate->plan->parallel_aware)
                 ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
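These four cases follow the executor's standard per-node parallel DSM protocol, keyed by plan_node_id. A minimal sketch of the shared-memory handshake the new hooks perform (condensed from the nodeTidrangescan.c additions below; error handling omitted):

/* leader, sizing the DSM segment */
shm_toc_estimate_chunk(&pcxt->estimator, node->trss_pscanlen);
shm_toc_estimate_keys(&pcxt->estimator, 1);

/* leader, carving out and advertising the shared scan descriptor */
pscan = shm_toc_allocate(pcxt->toc, node->trss_pscanlen);
shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);

/* worker, attaching to the same descriptor by plan node id */
pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);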

src/backend/executor/nodeTidrangescan.c

Lines changed: 111 additions & 3 deletions

@@ -250,9 +250,13 @@ TidRangeNext(TidRangeScanState *node)
     }
     else
     {
-        /* rescan with the updated TID range */
-        table_rescan_tidrange(scandesc, &node->trss_mintid,
-                              &node->trss_maxtid);
+        /* rescan with the updated TID range only in non-parallel mode */
+        if (scandesc->rs_parallel == NULL)
+        {
+            /* rescan with the updated TID range */
+            table_rescan_tidrange(scandesc, &node->trss_mintid,
+                                  &node->trss_maxtid);
+        }
     }
 
     node->trss_inScan = true;
@@ -415,3 +419,107 @@ ExecInitTidRangeScan(TidRangeScan *node, EState *estate, int eflags)
      */
     return tidrangestate;
 }
+
+/* ----------------------------------------------------------------
+ *                      Parallel Scan Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *      ExecTidRangeScanEstimate
+ *
+ *      Compute the amount of space we'll need in the parallel
+ *      query DSM, and inform pcxt->estimator about our needs.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanEstimate(TidRangeScanState *node, ParallelContext *pcxt)
+{
+    EState     *estate = node->ss.ps.state;
+
+    node->trss_pscanlen =
+        table_parallelscan_estimate(node->ss.ss_currentRelation,
+                                    estate->es_snapshot);
+    shm_toc_estimate_chunk(&pcxt->estimator, node->trss_pscanlen);
+    shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *      ExecTidRangeScanInitializeDSM
+ *
+ *      Set up a parallel TID scan descriptor.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt)
+{
+    EState     *estate = node->ss.ps.state;
+    ParallelTableScanDesc pscan;
+
+    pscan = shm_toc_allocate(pcxt->toc, node->trss_pscanlen);
+    table_parallelscan_initialize(node->ss.ss_currentRelation,
+                                  pscan,
+                                  estate->es_snapshot);
+    shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
+
+    /*
+     * Initialize parallel scan descriptor with given TID range if it can be
+     * evaluated successfully.
+     */
+    if (TidRangeEval(node))
+        node->ss.ss_currentScanDesc =
+            table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan,
+                                              &node->trss_mintid, &node->trss_maxtid);
+    else
+        node->ss.ss_currentScanDesc =
+            table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan,
+                                              NULL, NULL);
+}
+
+/* ----------------------------------------------------------------
+ *      ExecTidRangeScanReInitializeDSM
+ *
+ *      Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanReInitializeDSM(TidRangeScanState *node,
+                                ParallelContext *pcxt)
+{
+    ParallelTableScanDesc pscan;
+
+    pscan = node->ss.ss_currentScanDesc->rs_parallel;
+    table_parallelscan_reinitialize(node->ss.ss_currentRelation, pscan);
+
+    /* Set the new TID range if it can be evaluated successfully */
+    if (TidRangeEval(node))
+        node->ss.ss_currentRelation->rd_tableam->scan_set_tidrange(
+            node->ss.ss_currentScanDesc, &node->trss_mintid,
+            &node->trss_maxtid);
+    else
+        node->ss.ss_currentRelation->rd_tableam->scan_set_tidrange(
+            node->ss.ss_currentScanDesc, NULL, NULL);
+}
+
+/* ----------------------------------------------------------------
+ *      ExecTidRangeScanInitializeWorker
+ *
+ *      Copy relevant information from TOC into planstate.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanInitializeWorker(TidRangeScanState *node,
+                                 ParallelWorkerContext *pwcxt)
+{
+    ParallelTableScanDesc pscan;
+
+    pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
+
+    if (TidRangeEval(node))
+        node->ss.ss_currentScanDesc =
+            table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan,
+                                              &node->trss_mintid, &node->trss_maxtid);
+    else
+        node->ss.ss_currentScanDesc =
+            table_beginscan_parallel_tidrange(node->ss.ss_currentRelation, pscan,
+                                              NULL, NULL);
+}
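On rescan, only the leader runs the ReInitializeDSM hook. A condensed view of that flow, assuming the block table AM, where table_parallelscan_reinitialize() resets the shared phs_nallocated counter:

/*
 * Rescan flow (condensed; assumes the block table AM):
 *
 *   ExecTidRangeScanReInitializeDSM()
 *     -> table_parallelscan_reinitialize()   reset shared allocator state
 *     -> scan_set_tidrange(...)              re-evaluate the bounds and, as
 *                                            leader, republish phs_startblock
 *                                            and phs_numblock
 *
 * This also appears to be why TidRangeNext() skips table_rescan_tidrange()
 * in parallel mode: the shared descriptor already carries the range.
 */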

src/backend/optimizer/path/costsize.c

Lines changed: 24 additions & 12 deletions

@@ -1340,8 +1340,9 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
 {
     Selectivity selectivity;
     double      pages;
-    Cost        startup_cost = 0;
-    Cost        run_cost = 0;
+    Cost        startup_cost;
+    Cost        cpu_run_cost;
+    Cost        disk_run_cost;
     QualCost    qpqual_cost;
     Cost        cpu_per_tuple;
     QualCost    tid_qual_cost;
@@ -1370,11 +1371,7 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
 
     /*
      * The first page in a range requires a random seek, but each subsequent
-     * page is just a normal sequential page read. NOTE: it's desirable for
-     * TID Range Scans to cost more than the equivalent Sequential Scans,
-     * because Seq Scans have some performance advantages such as scan
-     * synchronization and parallelizability, and we'd prefer one of them to
-     * be picked unless a TID Range Scan really is better.
+     * page is just a normal sequential page read.
      */
     ntuples = selectivity * baserel->tuples;
     nseqpages = pages - 1.0;
@@ -1391,7 +1388,7 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
                               &spc_seq_page_cost);
 
     /* disk costs; 1 random page and the remainder as seq pages */
-    run_cost += spc_random_page_cost + spc_seq_page_cost * nseqpages;
+    disk_run_cost = spc_random_page_cost + spc_seq_page_cost * nseqpages;
 
     /* Add scanning CPU costs */
     get_restriction_qual_cost(root, baserel, param_info, &qpqual_cost);
@@ -1403,20 +1400,35 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
      * can't be removed, this is a mistake and we're going to underestimate
      * the CPU cost a bit.)
      */
-    startup_cost += qpqual_cost.startup + tid_qual_cost.per_tuple;
+    startup_cost = qpqual_cost.startup + tid_qual_cost.per_tuple;
     cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple -
         tid_qual_cost.per_tuple;
-    run_cost += cpu_per_tuple * ntuples;
+    cpu_run_cost = cpu_per_tuple * ntuples;
 
     /* tlist eval costs are paid per output row, not per tuple scanned */
     startup_cost += path->pathtarget->cost.startup;
-    run_cost += path->pathtarget->cost.per_tuple * path->rows;
+    cpu_run_cost += path->pathtarget->cost.per_tuple * path->rows;
+
+    /* Adjust costing for parallelism, if used. */
+    if (path->parallel_workers > 0)
+    {
+        double      parallel_divisor = get_parallel_divisor(path);
+
+        /* The CPU cost is divided among all the workers. */
+        cpu_run_cost /= parallel_divisor;
+
+        /*
+         * In the case of a parallel plan, the row count needs to represent
+         * the number of tuples processed per worker.
+         */
+        path->rows = clamp_row_est(path->rows / parallel_divisor);
+    }
 
     /* we should not generate this path type when enable_tidscan=false */
     Assert(enable_tidscan);
     path->disabled_nodes = 0;
     path->startup_cost = startup_cost;
-    path->total_cost = startup_cost + run_cost;
+    path->total_cost = startup_cost + cpu_run_cost + disk_run_cost;
 }
 
 /*
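The split of run_cost into cpu_run_cost and disk_run_cost matters because only the CPU portion is divided among the cooperating processes; the I/O is still paid once. A worked example with invented numbers (spc_random_page_cost = 4.0, spc_seq_page_cost = 1.0, 101 pages, cpu_run_cost = 30.0, two workers with a participating leader, per get_parallel_divisor()):

/*
 * Worked example (invented numbers):
 *
 *   disk_run_cost    = 4.0 + 1.0 * 100      = 104.0  (never divided)
 *   parallel_divisor = 2 + (1.0 - 0.3 * 2)  = 2.4
 *   cpu_run_cost     = 30.0 / 2.4           = 12.5   (30.0 if run serially)
 *   total_cost       = startup_cost + 12.5 + 104.0
 */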

src/backend/optimizer/path/tidpath.c

Lines changed: 19 additions & 1 deletion

@@ -47,6 +47,7 @@
 #include "optimizer/pathnode.h"
 #include "optimizer/paths.h"
 #include "optimizer/restrictinfo.h"
+#include "optimizer/cost.h"
 
 
 /*
@@ -553,7 +554,24 @@ create_tidscan_paths(PlannerInfo *root, RelOptInfo *rel)
 
         add_path(rel, (Path *) create_tidrangescan_path(root, rel,
                                                         tidrangequals,
-                                                        required_outer));
+                                                        required_outer,
+                                                        0));
+
+        /* If appropriate, consider parallel tid range scan. */
+        if (rel->consider_parallel && required_outer == NULL)
+        {
+            int         parallel_workers;
+
+            parallel_workers = compute_parallel_worker(rel, rel->pages, -1,
+                                                       max_parallel_workers_per_gather);
+
+            if (parallel_workers > 0)
+                add_partial_path(rel, (Path *) create_tidrangescan_path(root,
+                                                                        rel,
+                                                                        tidrangequals,
+                                                                        required_outer,
+                                                                        parallel_workers));
+        }
     }
 
 /*
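For reference, the worker count here comes from the same heuristic used for other parallel base-relation scans; a note on the arguments (sketch of the existing API, not new code):

/*
 * compute_parallel_worker(rel, heap_pages, index_pages, max_workers):
 * index_pages = -1 means there is no index side to consider, so the
 * estimate is driven by rel->pages against min_parallel_table_scan_size,
 * capped by max_parallel_workers_per_gather.  A result of 0 means the
 * relation is too small to be worth a parallel scan, and no partial
 * path is added.
 */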

src/backend/optimizer/util/pathnode.c

Lines changed: 4 additions & 3 deletions

@@ -1262,7 +1262,8 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
  */
 TidRangePath *
 create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel,
-                         List *tidrangequals, Relids required_outer)
+                         List *tidrangequals, Relids required_outer,
+                         int parallel_workers)
 {
     TidRangePath *pathnode = makeNode(TidRangePath);
 
@@ -1271,9 +1272,9 @@ create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel,
     pathnode->path.pathtarget = rel->reltarget;
     pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
                                                           required_outer);
-    pathnode->path.parallel_aware = false;
+    pathnode->path.parallel_aware = (parallel_workers > 0);
     pathnode->path.parallel_safe = rel->consider_parallel;
-    pathnode->path.parallel_workers = 0;
+    pathnode->path.parallel_workers = parallel_workers;
     pathnode->path.pathkeys = NIL;  /* always unordered */
 
     pathnode->tidrangequals = tidrangequals;
