@@ -823,6 +823,39 @@ _ReopenArchive(ArchiveHandle *AH)
823823 pg_fatal ("could not set seek position in archive file: %m" );
824824}
825825
826+ /*
827+ * Read through (skip) one full data entry from the archive.
828+ * Return the size, and save the dumpId of the scanned entry.
829+ */
830+ static size_t
831+ _ReadOneFullEntry (ArchiveHandle * AH , pgoff_t curPos , DumpId * scanned_id )
832+ {
833+ int blkType ;
834+
835+ _readBlockHeader (AH , & blkType , scanned_id );
836+
837+ if (blkType == EOF )
838+ return 0 ;
839+
840+ switch (blkType )
841+ {
842+ case BLK_DATA :
843+ _skipData (AH );
844+ break ;
845+
846+ case BLK_BLOBS :
847+ _skipLOs (AH );
848+ break ;
849+
850+ default : /* Always have a default */
851+ pg_fatal ("unrecognized data block type (%d) while searching archive" ,
852+ blkType );
853+ break ;
854+ }
855+
856+ return ftello (AH -> FH ) - curPos ;
857+ }
858+
826859/*
827860 * Prepare for parallel restore.
828861 *
@@ -837,9 +870,16 @@ static void
837870_PrepParallelRestore (ArchiveHandle * AH )
838871{
839872 lclContext * ctx = (lclContext * ) AH -> formatData ;
873+ TocEntry * te ;
840874 TocEntry * prev_te = NULL ;
841875 lclTocEntry * prev_tctx = NULL ;
842- TocEntry * te ;
876+ const int hasSeek = ctx -> hasSeek ;
877+ pgoff_t curPos = ctx -> lastFilePos ;
878+ bool first_data_entry = true;
879+
880+ if (hasSeek && (
881+ fseeko (AH -> FH , curPos , SEEK_SET ) != 0 ))
882+ pg_fatal ("error during file seek: %m" );
843883
844884 /*
845885 * Knowing that the data items were dumped out in TOC order, we can
@@ -850,12 +890,51 @@ _PrepParallelRestore(ArchiveHandle *AH)
850890 {
851891 lclTocEntry * tctx = (lclTocEntry * ) te -> formatData ;
852892
893+ if (tctx -> dataState == K_OFFSET_POS_SET )
894+ {
895+ if (first_data_entry && tctx -> dataPos != curPos )
896+ pg_fatal ("data for first archive entry %d is recorded to"
897+ " start at %lld but it really starts at %lld" ,
898+ te -> dumpId , (long long ) tctx -> dataPos ,
899+ (long long ) curPos );
900+ }
901+
902+ /*
903+ * If the archive's TOC is missing the positions of each entry, then
904+ * most likely the archive was written to a pipe. If we are not
905+ * reading from a pipe now, we can do one extra scan to find the
906+ * missing positions. They are needed in parallel restore in order to
907+ * assign TOC entries as jobs to the workers.
908+ */
909+ else if (tctx -> dataState == K_OFFSET_POS_NOT_SET && hasSeek )
910+ {
911+ DumpId dump_id ;
912+ size_t entry_size ;
913+
914+ if (first_data_entry )
915+ {
916+ pg_log_info ("archive does not contain offsets;"
917+ " doing one full scan to calculate them..." );
918+ }
919+ /* Fill in the missing info */
920+ tctx -> dataPos = curPos ;
921+ tctx -> dataState = K_OFFSET_POS_SET ;
922+ /* Advance */
923+ entry_size = _ReadOneFullEntry (AH , curPos , & dump_id );
924+ if (dump_id != te -> dumpId )
925+ pg_fatal ("found unexpected dump ID %d -- expected %d" ,
926+ dump_id , te -> dumpId );
927+ curPos += entry_size ;
928+ }
929+
853930 /*
854- * Ignore entries without a known data offset; if we were unable to
855- * seek to rewrite the TOC when creating the archive, this'll be all
856- * of them, and we'll end up with no size estimates.
931+ * When we can't seek, ignore data entries without a known data
932+ * offset. This shouldn't happen in parallel restore though, the
933+ * archive should always be seekable. Also skip K_OFFSET_NO_DATA
934+ * entries since they are not present in the data section of the
935+ * archive.
857936 */
858- if ( tctx -> dataState != K_OFFSET_POS_SET )
937+ else
859938 continue ;
860939
861940 /* Compute previous data item's length */
@@ -867,10 +946,11 @@ _PrepParallelRestore(ArchiveHandle *AH)
867946
868947 prev_te = te ;
869948 prev_tctx = tctx ;
949+ first_data_entry = false;
870950 }
871951
872952 /* If OK to seek, we can determine the length of the last item */
873- if (prev_te && ctx -> hasSeek )
953+ if (prev_te && hasSeek )
874954 {
875955 pgoff_t endpos ;
876956
0 commit comments