1818
1919#include  "access/xlog_internal.h" 
2020#include  "access/xlogdefs.h" 
21+ #include  "common/file_perm.h" 
2122#include  "common/logging.h" 
22- #include  "fe_utils/simple_list.h" 
2323#include  "pg_waldump.h" 
2424
2525/* 
@@ -42,10 +42,11 @@ typedef struct astreamer_waldump
4242
4343	/* These fields change with archive member. */ 
4444	bool 		skipThisSeg ;
45+ 	bool 		writeThisSeg ;
46+ 	FILE 	   * segFp ;
4547	XLogSegNo 	nextSegNo ;		/* Next expected segment to stream */ 
4648} astreamer_waldump ;
4749
48- static  int 	astreamer_archive_read (XLogDumpPrivate  * privateInfo );
4950static  void  astreamer_waldump_content (astreamer  * streamer ,
5051									  astreamer_member  * member ,
5152									  const  char  * data , int  len ,
@@ -56,7 +57,12 @@ static void astreamer_waldump_free(astreamer *streamer);
5657static  bool  member_is_relevant_wal (astreamer_waldump  * mystreamer ,
5758								   astreamer_member  * member ,
5859								   TimeLineID  startTimeLineID ,
60+ 								   char  * * curFname ,
5961								   XLogSegNo  * curSegNo );
62+ static  FILE  * member_prepare_tmp_write (XLogSegNo  curSegNo ,
63+ 									  const  char  * fname );
64+ static  XLogSegNo  member_next_segno (XLogSegNo  curSegNo ,
65+ 								   TimeLineID  timeline );
6066
6167static  const  astreamer_ops  astreamer_waldump_ops  =  {
6268	.content  =  astreamer_waldump_content ,
@@ -164,7 +170,7 @@ astreamer_wal_read(char *readBuff, XLogRecPtr targetPagePtr, Size count,
164170/* 
165171 * Reads the archive and passes it to the archive streamer for decompression. 
166172 */ 
167- static   int 
173+ int 
168174astreamer_archive_read (XLogDumpPrivate  * privateInfo )
169175{
170176	int 			rc ;
@@ -208,17 +214,8 @@ astreamer_waldump_new(XLogRecPtr startptr, XLogRecPtr endPtr,
208214	if  (XLogRecPtrIsInvalid (startptr ))
209215		streamer -> startSegNo  =  0 ;
210216	else 
211- 	{
212217		XLByteToSeg (startptr , streamer -> startSegNo , WalSegSz );
213218
214- 		/* 
215- 		 * Initialize the record pointer to the beginning of the first 
216- 		 * segment; this pointer will track the WAL record reading status. 
217- 		 */ 
218- 		XLogSegNoOffsetToRecPtr (streamer -> startSegNo , 0 , WalSegSz ,
219- 								privateInfo -> archive_streamer_read_ptr );
220- 	}
221- 
222219	if  (XLogRecPtrIsInvalid (endPtr ))
223220		streamer -> endSegNo  =  UINT64_MAX ;
224221	else 
@@ -247,14 +244,16 @@ astreamer_waldump_content(astreamer *streamer, astreamer_member *member,
247244	{
248245		case  ASTREAMER_MEMBER_HEADER :
249246			{
250- 				XLogSegNo 	 segNo ;
247+ 				char 	    * fname ;
251248
252249				pg_log_debug ("pg_waldump: reading \"%s\"" , member -> pathname );
253250
254251				mystreamer -> skipThisSeg  =  false;
252+ 				mystreamer -> writeThisSeg  =  false;
255253
256254				if  (!member_is_relevant_wal (mystreamer , member ,
257- 											privateInfo -> timeline , & segNo ))
255+ 											privateInfo -> timeline ,
256+ 											& fname , & privateInfo -> curSegNo ))
258257				{
259258					mystreamer -> skipThisSeg  =  true;
260259					break ;
@@ -267,33 +266,67 @@ astreamer_waldump_content(astreamer *streamer, astreamer_member *member,
267266				if  (READ_ANY_WAL (mystreamer ))
268267					break ;
269268
270- 				/* WAL segments must be archived in order */ 
271- 				if  (mystreamer -> nextSegNo  !=  segNo )
269+ 				/* 
270+ 				 * When WAL segments are not archived sequentially, it becomes 
271+ 				 * necessary to write out (or preserve) segments that might be 
272+ 				 * required at a later point. 
273+ 				 */ 
274+ 				if  (mystreamer -> nextSegNo  !=  privateInfo -> curSegNo )
272275				{
273- 					pg_log_error ( "WAL files are not archived in sequential order" ) ;
274- 					pg_log_error_detail ( "Expecting segment number "   UINT64_FORMAT   " but found "   UINT64_FORMAT   "." , 
275- 										 mystreamer -> nextSegNo ,  segNo );
276- 					exit ( 1 ) ;
276+ 					mystreamer -> writeThisSeg   =  true ;
277+ 					mystreamer -> segFp   = 
278+ 						member_prepare_tmp_write ( privateInfo -> curSegNo ,  fname );
279+ 					break ;
277280				}
278281
279282				/* 
280- 				 * We track the reading of WAL segment records using a pointer 
281- 				 * that's continuously incremented by the length of the 
282- 				 * received data. This pointer is crucial for serving WAL page 
283- 				 * requests from the WAL decoding routine, so it must be 
284- 				 * accurate. 
283+ 				 * If the buffer contains data, the next WAL record must 
284+ 				 * logically follow it. Otherwise, this file isn't the one we 
285+ 				 * need, and we must export it. 
285286				 */ 
286- #ifdef  USE_ASSERT_CHECKING 
287- 				if  (mystreamer -> nextSegNo  !=  0 )
287+ 				else  if  (privateInfo -> archive_streamer_buf -> len  !=  0 )
288288				{
289289					XLogRecPtr 	recPtr ;
290290
291- 					XLogSegNoOffsetToRecPtr (segNo , 0 , WalSegSz , recPtr );
292- 					Assert (privateInfo -> archive_streamer_read_ptr  ==  recPtr );
291+ 					XLogSegNoOffsetToRecPtr (privateInfo -> curSegNo , 0 , WalSegSz ,
292+ 											recPtr );
293+ 
294+ 					if  (privateInfo -> archive_streamer_read_ptr  !=  recPtr )
295+ 					{
296+ 						mystreamer -> writeThisSeg  =  true;
297+ 						mystreamer -> segFp  = 
298+ 							member_prepare_tmp_write (privateInfo -> curSegNo , fname );
299+ 
300+ 						/* Update the next expected segment number after this */ 
301+ 						mystreamer -> nextSegNo  = 
302+ 							member_next_segno (privateInfo -> curSegNo  +  1 ,
303+ 											  privateInfo -> timeline );
304+ 						break ;
305+ 					}
293306				}
294- #endif 
307+ 
308+ 				Assert (!mystreamer -> skipThisSeg );
309+ 				Assert (!mystreamer -> writeThisSeg );
310+ 
311+ 				/* 
312+ 				 * We are now streaming segment content. 
313+ 				 * 
314+ 				 * We need to track the reading of WAL segment records using a 
315+ 				 * pointer that's typically incremented by the length of the 
316+ 				 * data read. However, we sometimes export the WAL file to 
317+ 				 * temporary storage, allowing the decoding routine to read 
318+ 				 * directly from there. This makes continuous pointer 
319+ 				 * incrementing challenging, as file reads can occur from any 
320+ 				 * offset, leading to potential errors. Therefore, we now 
321+ 				 * reset the pointer when reading from a file for streaming. 
322+ 				 */ 
323+ 				XLogSegNoOffsetToRecPtr (privateInfo -> curSegNo , 0 , WalSegSz ,
324+ 										privateInfo -> archive_streamer_read_ptr );
325+ 
295326				/* Update the next expected segment number */ 
296- 				mystreamer -> nextSegNo  +=  1 ;
327+ 				mystreamer -> nextSegNo  = 
328+ 					member_next_segno (privateInfo -> curSegNo ,
329+ 									  privateInfo -> timeline );
297330			}
298331			break ;
299332
@@ -302,12 +335,45 @@ astreamer_waldump_content(astreamer *streamer, astreamer_member *member,
302335			if  (mystreamer -> skipThisSeg )
303336				break ;
304337
338+ 			/* Or, write contents to file */ 
339+ 			if  (mystreamer -> writeThisSeg )
340+ 			{
341+ 				Assert (mystreamer -> segFp  !=  NULL );
342+ 
343+ 				errno  =  0 ;
344+ 				if  (len  >  0  &&  fwrite (data , len , 1 , mystreamer -> segFp ) !=  1 )
345+ 				{
346+ 					char 	   * fname ;
347+ 					int 			pathlen  =  strlen (member -> pathname );
348+ 
349+ 					Assert (pathlen  >= XLOG_FNAME_LEN );
350+ 
351+ 					fname  =  member -> pathname  +  (pathlen  -  XLOG_FNAME_LEN );
352+ 
353+ 					/* 
354+ 					 * If write didn't set errno, assume problem is no disk 
355+ 					 * space 
356+ 					 */ 
357+ 					if  (errno  ==  0 )
358+ 						errno  =  ENOSPC ;
359+ 					pg_fatal ("could not write to file \"%s\": %m" ,
360+ 							 get_tmp_wal_file_path (fname ));
361+ 				}
362+ 				break ;
363+ 			}
364+ 
305365			/* Or, copy contents to buffer */ 
306366			privateInfo -> archive_streamer_read_ptr  +=  len ;
307367			astreamer_buffer_bytes (streamer , & data , & len , len );
308368			break ;
309369
310370		case  ASTREAMER_MEMBER_TRAILER :
371+ 			if  (mystreamer -> segFp  !=  NULL )
372+ 			{
373+ 				fclose (mystreamer -> segFp );
374+ 				mystreamer -> segFp  =  NULL ;
375+ 			}
376+ 			privateInfo -> curSegNo  =  0 ;
311377			break ;
312378
313379		case  ASTREAMER_ARCHIVE_TRAILER :
@@ -334,8 +400,14 @@ astreamer_waldump_finalize(astreamer *streamer)
334400static  void 
335401astreamer_waldump_free (astreamer  * streamer )
336402{
403+ 	astreamer_waldump  * mystreamer ;
404+ 
337405	Assert (streamer -> bbs_next  ==  NULL );
338406
407+ 	mystreamer  =  (astreamer_waldump  * ) streamer ;
408+ 	if  (mystreamer -> segFp  !=  NULL )
409+ 		fclose (mystreamer -> segFp );
410+ 
339411	pfree (streamer -> bbs_buffer .data );
340412	pfree (streamer );
341413}
@@ -347,7 +419,8 @@ astreamer_waldump_free(astreamer *streamer)
347419 */ 
348420static  bool 
349421member_is_relevant_wal (astreamer_waldump  * mystreamer , astreamer_member  * member ,
350- 					   TimeLineID  startTimeLineID , XLogSegNo  * curSegNo )
422+ 					   TimeLineID  startTimeLineID , char  * * curFname ,
423+ 					   XLogSegNo  * curSegNo )
351424{
352425	int 			pathlen ;
353426	XLogSegNo 	segNo ;
@@ -382,7 +455,84 @@ member_is_relevant_wal(astreamer_waldump *mystreamer, astreamer_member *member,
382455			return  false;
383456	}
384457
458+ 	* curFname  =  fname ;
385459	* curSegNo  =  segNo ;
386460
387461	return  true;
388462}
463+ 
464+ /* 
465+  * Create an empty placeholder file and return its handle.  The file is also 
466+  * added to an exported list for future management, e.g.  access, deletion, and 
467+  * existence checks. 
468+  */ 
469+ static  FILE  * 
470+ member_prepare_tmp_write (XLogSegNo  curSegNo , const  char  * fname )
471+ {
472+ 	FILE 	   * file ;
473+ 	char 	   * fpath  =  get_tmp_wal_file_path (fname );
474+ 
475+ 	/* Create an empty placeholder */ 
476+ 	file  =  fopen (fpath , PG_BINARY_W );
477+ 	if  (file  ==  NULL )
478+ 		pg_fatal ("could not create file \"%s\": %m" , fpath );
479+ 
480+ #ifndef  WIN32 
481+ 	if  (chmod (fpath , pg_file_create_mode ))
482+ 		pg_fatal ("could not set permissions on file \"%s\": %m" ,
483+ 				 fpath );
484+ #endif 
485+ 
486+ 	pg_log_info ("temporarily exporting file \"%s\"" , fpath );
487+ 
488+ 	/* Record this segment's export */ 
489+ 	simple_string_list_append (& TmpWalSegList , fname );
490+ 	pfree (fpath );
491+ 
492+ 	return  file ;
493+ }
494+ 
495+ /* 
496+  * Get next WAL segment that needs to be retrieved from the archive. 
497+  * 
498+  * The function checks for the presence of a previously read and extracted WAL 
499+  * segment in the temporary storage. If a temporary file is found for that 
500+  * segment, it indicates the segment has already been successfully retrieved 
501+  * from the archive. In this case, the function increments the segment number 
502+  * and repeats the check. This process continues until a segment that has not 
503+  * yet been retrieved is found, at which point the function returns its number. 
504+  */ 
505+ static  XLogSegNo 
506+ member_next_segno (XLogSegNo  curSegNo , TimeLineID  timeline )
507+ {
508+ 	XLogSegNo 	nextSegNo  =  curSegNo  +  1 ;
509+ 	bool 		exists ;
510+ 
511+ 	/* 
512+ 	 * If we find a file that was previously written to the temporary space, 
513+ 	 * it indicates that the corresponding WAL segment request has already 
514+ 	 * been fulfilled. In that case, we increment the nextSegNo counter and 
515+ 	 * check again whether that segment number again. if found above steps 
516+ 	 * will be return if not then we return that segment number which would be 
517+ 	 * needed from the archive. 
518+ 	 */ 
519+ 	do 
520+ 	{
521+ 		char 		fname [MAXFNAMELEN ];
522+ 
523+ 		XLogFileName (fname , timeline , nextSegNo , WalSegSz );
524+ 
525+ 		/* 
526+ 		 * If the WAL segment has already been exported, increment the counter 
527+ 		 * and check for the next segment. 
528+ 		 */ 
529+ 		exists  =  false;
530+ 		if  (simple_string_list_member (& TmpWalSegList , fname ))
531+ 		{
532+ 			nextSegNo  +=  1 ;
533+ 			exists  =  true;
534+ 		}
535+ 	} while  (exists );
536+ 
537+ 	return  nextSegNo ;
538+ }
0 commit comments