diff --git a/src/bin/pg_dump/pg_backup_custom.c b/src/bin/pg_dump/pg_backup_custom.c
index 2226520dffca..4e105f9e80e5 100644
--- a/src/bin/pg_dump/pg_backup_custom.c
+++ b/src/bin/pg_dump/pg_backup_custom.c
@@ -823,6 +823,49 @@ _ReopenArchive(ArchiveHandle *AH)
 		pg_fatal("could not set seek position in archive file: %m");
 }
 
+/*
+ * Read through (skip) one full data entry from the archive.
+ *
+ * The file must be positioned at the entry's block header, and curPos must
+ * be that position.  Returns the number of bytes the entry occupies
+ * (header included), or 0 if EOF was hit instead of a block header.  The
+ * dump ID read from the block header is stored in *scanned_id.
+ */
+static pgoff_t
+_ReadOneFullEntry(ArchiveHandle *AH, pgoff_t curPos, DumpId *scanned_id)
+{
+	int			blkType;
+	pgoff_t		endPos;
+
+	_readBlockHeader(AH, &blkType, scanned_id);
+
+	if (blkType == EOF)
+		return 0;
+
+	switch (blkType)
+	{
+		case BLK_DATA:
+			_skipData(AH);
+			break;
+
+		case BLK_BLOBS:
+			_skipLOs(AH);
+			break;
+
+		default:				/* Always have a default */
+			pg_fatal("unrecognized data block type (%d) while searching archive",
+					 blkType);
+			break;
+	}
+
+	/* ftello() reports failure as -1; don't turn that into a bogus size */
+	endPos = ftello(AH->FH);
+	if (endPos < 0)
+		pg_fatal("could not determine seek position in archive file: %m");
+
+	return endPos - curPos;
+}
+
 /*
  * Prepare for parallel restore.
  *
@@ -837,9 +880,15 @@ static void
 _PrepParallelRestore(ArchiveHandle *AH)
 {
 	lclContext *ctx = (lclContext *) AH->formatData;
 	TocEntry   *prev_te = NULL;
 	lclTocEntry *prev_tctx = NULL;
 	TocEntry   *te;
+	const int	hasSeek = ctx->hasSeek;
+	pgoff_t		curPos = ctx->lastFilePos;
+	bool		first_data_entry = true;
+
+	if (hasSeek && fseeko(AH->FH, curPos, SEEK_SET) != 0)
+		pg_fatal("error during file seek: %m");
 
 	/*
 	 * Knowing that the data items were dumped out in TOC order, we can
@@ -850,12 +899,51 @@ _PrepParallelRestore(ArchiveHandle *AH)
 	{
 		lclTocEntry *tctx = (lclTocEntry *) te->formatData;
 
+		if (tctx->dataState == K_OFFSET_POS_SET)
+		{
+			if (first_data_entry && tctx->dataPos != curPos)
+				pg_fatal("data for first archive entry %d is recorded to"
+						 " start at %lld but it really starts at %lld",
+						 te->dumpId, (long long) tctx->dataPos,
+						 (long long) curPos);
+		}
+
+		/*
+		 * If the archive's TOC is missing the positions of each entry, then
+		 * most likely the archive was written to a pipe.  If we are not
+		 * reading from a pipe now, we can do one extra scan to find the
+		 * missing positions.  They are needed in parallel restore in order to
+		 * assign TOC entries as jobs to the workers.
+		 */
+		else if (tctx->dataState == K_OFFSET_POS_NOT_SET && hasSeek)
+		{
+			DumpId		dump_id;
+			pgoff_t		entry_size;
+
+			if (first_data_entry)
+			{
+				pg_log_info("archive does not contain offsets;"
+							" doing one full scan to calculate them...");
+			}
+			/* Fill in the missing info */
+			tctx->dataPos = curPos;
+			tctx->dataState = K_OFFSET_POS_SET;
+			/* Advance */
+			entry_size = _ReadOneFullEntry(AH, curPos, &dump_id);
+			if (dump_id != te->dumpId)
+				pg_fatal("found unexpected dump ID %d -- expected %d",
+						 dump_id, te->dumpId);
+			curPos += entry_size;
+		}
+
 		/*
-		 * Ignore entries without a known data offset; if we were unable to
-		 * seek to rewrite the TOC when creating the archive, this'll be all
-		 * of them, and we'll end up with no size estimates.
+		 * When we can't seek, ignore data entries without a known data
+		 * offset.  This shouldn't happen in parallel restore, though, since
+		 * the archive should always be seekable.  Also skip K_OFFSET_NO_DATA
+		 * entries since they are not present in the data section of the
+		 * archive.
 		 */
-		if (tctx->dataState != K_OFFSET_POS_SET)
+		else
 			continue;
 
 		/* Compute previous data item's length */
@@ -867,10 +955,11 @@ _PrepParallelRestore(ArchiveHandle *AH)
 
 		prev_te = te;
 		prev_tctx = tctx;
+		first_data_entry = false;
 	}
 
 	/* If OK to seek, we can determine the length of the last item */
-	if (prev_te && ctx->hasSeek)
+	if (prev_te && hasSeek)
 	{
 		pgoff_t		endpos;
 