From f4701a08ccfa35c41da181b25a7f639222a9301f Mon Sep 17 00:00:00 2001 From: Dimitrios Apostolou Date: Sat, 29 Mar 2025 01:16:19 +0100 Subject: [PATCH] parallel pg_restore: move offset-building phase to before forking A pg_dump custom format archive without offsets in the table of contents, is usually generated when pg_dump writes to stdout instead of a file. When doing parallel pg_restore (-j) from such a file, every worker process was scanning the full archive sequentially, in order to build the offset table and find the parts assigned to restore. This led to the worker processes competing for I/O. This patch moves this offset-table building phase to the parent process, before forking the worker processes. The upside is that we now have only one extra scan of the file. And this scan happens without other competing I/O, so it completes faster. The downside is that there is a delay before spawning the children and starting assigning jobs to them. --- src/bin/pg_dump/pg_backup_custom.c | 92 ++++++++++++++++++++++++++++-- 1 file changed, 86 insertions(+), 6 deletions(-) diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c index 2226520dffca..4e105f9e80e5 100644 --- a/src/bin/pg_dump/pg_backup_custom.c +++ b/src/bin/pg_dump/pg_backup_custom.c @@ -823,6 +823,39 @@ _ReopenArchive(ArchiveHandle *AH) pg_fatal("could not set seek position in archive file: %m"); } +/* + * Read through (skip) one full data entry from the archive. + * Return the size, and save the dumpId of the scanned entry. + */ +static size_t +_ReadOneFullEntry(ArchiveHandle *AH, pgoff_t curPos, DumpId *scanned_id) +{ + int blkType; + + _readBlockHeader(AH, &blkType, scanned_id); + + if (blkType == EOF) + return 0; + + switch (blkType) + { + case BLK_DATA: + _skipData(AH); + break; + + case BLK_BLOBS: + _skipLOs(AH); + break; + + default: /* Always have a default */ + pg_fatal("unrecognized data block type (%d) while searching archive", + blkType); + break; + } + + return ftello(AH->FH) - curPos; +} + /* * Prepare for parallel restore. * @@ -837,9 +870,16 @@ static void _PrepParallelRestore(ArchiveHandle *AH) { lclContext *ctx = (lclContext *) AH->formatData; + TocEntry *te; TocEntry *prev_te = NULL; lclTocEntry *prev_tctx = NULL; - TocEntry *te; + const int hasSeek = ctx->hasSeek; + pgoff_t curPos = ctx->lastFilePos; + bool first_data_entry = true; + + if (hasSeek && ( + fseeko(AH->FH, curPos, SEEK_SET) != 0)) + pg_fatal("error during file seek: %m"); /* * Knowing that the data items were dumped out in TOC order, we can @@ -850,12 +890,51 @@ _PrepParallelRestore(ArchiveHandle *AH) { lclTocEntry *tctx = (lclTocEntry *) te->formatData; + if (tctx->dataState == K_OFFSET_POS_SET) + { + if (first_data_entry && tctx->dataPos != curPos) + pg_fatal("data for first archive entry %d is recorded to" + " start at %lld but it really starts at %lld", + te->dumpId, (long long) tctx->dataPos, + (long long) curPos); + } + + /* + * If the archive's TOC is missing the positions of each entry, then + * most likely the archive was written to a pipe. If we are not + * reading from a pipe now, we can do one extra scan to find the + * missing positions. They are needed in parallel restore in order to + * assign TOC entries as jobs to the workers. + */ + else if (tctx->dataState == K_OFFSET_POS_NOT_SET && hasSeek) + { + DumpId dump_id; + size_t entry_size; + + if (first_data_entry) + { + pg_log_info("archive does not contain offsets;" + " doing one full scan to calculate them..."); + } + /* Fill in the missing info */ + tctx->dataPos = curPos; + tctx->dataState = K_OFFSET_POS_SET; + /* Advance */ + entry_size = _ReadOneFullEntry(AH, curPos, &dump_id); + if (dump_id != te->dumpId) + pg_fatal("found unexpected dump ID %d -- expected %d", + dump_id, te->dumpId); + curPos += entry_size; + } + /* - * Ignore entries without a known data offset; if we were unable to - * seek to rewrite the TOC when creating the archive, this'll be all - * of them, and we'll end up with no size estimates. + * When we can't seek, ignore data entries without a known data + * offset. This shouldn't happen in parallel restore though, the + * archive should always be seekable. Also skip K_OFFSET_NO_DATA + * entries since they are not present in the data section of the + * archive. */ - if (tctx->dataState != K_OFFSET_POS_SET) + else continue; /* Compute previous data item's length */ @@ -867,10 +946,11 @@ _PrepParallelRestore(ArchiveHandle *AH) prev_te = te; prev_tctx = tctx; + first_data_entry = false; } /* If OK to seek, we can determine the length of the last item */ - if (prev_te && ctx->hasSeek) + if (prev_te && hasSeek) { pgoff_t endpos;