@@ -65,7 +65,7 @@ static bool guessed = false; /* T if we had to guess at any values */
6565static const char * progname ;
6666static uint32 set_xid_epoch = (uint32 ) - 1 ;
6767static TransactionId set_oldest_xid = 0 ;
68- static TransactionId set_xid = 0 ;
68+ static uint64 set_xid = 0 ;
6969static TransactionId set_oldest_commit_ts_xid = 0 ;
7070static TransactionId set_newest_commit_ts_xid = 0 ;
7171static Oid set_oid = 0 ;
@@ -89,7 +89,41 @@ static void KillExistingArchiveStatus(void);
8989static void KillExistingWALSummaries (void );
9090static void WriteEmptyXLOG (void );
9191static void usage (void );
92+ static void AdvanceNextXid (TransactionId oldval , TransactionId newval );
93+ static void AdvanceNextMultiXid (MultiXactId oldval , MultiXactId newval );
9294
95+ /*
96+ * Note: this structure is copied from commit_ts.c and should be kept in sync.
97+ */
98+ typedef struct CommitTimestampEntry
99+ {
100+ TimestampTz time ;
101+ RepOriginId nodeid ;
102+ } CommitTimestampEntry ;
103+
104+ /*
105+ * Note: these macros are copied from clog.c, commit_ts.c and subtrans.c and
106+ * should be kept in sync.
107+ */
108+ #define CLOG_BITS_PER_XACT 2
109+ #define CLOG_XACTS_PER_BYTE 4
110+ #define CLOG_XACTS_PER_PAGE (BLCKSZ * CLOG_XACTS_PER_BYTE)
111+
112+ #define TransactionIdToPgIndex (xid ) ((xid) % (TransactionId) CLOG_XACTS_PER_PAGE)
113+ #define TransactionIdToByte (xid ) (TransactionIdToPgIndex(xid) / CLOG_XACTS_PER_BYTE)
114+ #define TransactionIdToBIndex (xid ) ((xid) % (TransactionId) CLOG_XACTS_PER_BYTE)
115+
116+ #define SUBTRANS_XACTS_PER_PAGE (BLCKSZ / sizeof(TransactionId))
117+
118+ #define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
119+ sizeof(RepOriginId))
120+
121+ #define COMMIT_TS_XACTS_PER_PAGE \
122+ (BLCKSZ / SizeOfCommitTimestampEntry)
123+
124+ #define MULTIXACT_OFFSETS_PER_PAGE (BLCKSZ / sizeof(MultiXactOffset))
125+
126+ #define SLRU_PAGES_PER_SEGMENT 32
93127
94128int
95129main (int argc , char * argv [])
@@ -441,9 +475,47 @@ main(int argc, char *argv[])
441475 }
442476
443477 if (set_xid != 0 )
478+ {
479+ FullTransactionId current_fxid = ControlFile .checkPointCopy .nextXid ;
480+ FullTransactionId full_datfrozenxid ;
481+ uint32 current_epoch ;
482+
483+ full_datfrozenxid =
484+ FullTransactionIdFromEpochAndXid (EpochFromFullTransactionId (current_fxid ),
485+ ControlFile .checkPointCopy .oldestXid );
486+
487+ if (set_xid > full_datfrozenxid .value &&
488+ (set_xid - full_datfrozenxid .value ) > INT32_MAX )
489+ {
490+ /*
491+ * Cannot advance transaction ID in this case, because all unfrozen
492+ * transactions in cluster will be considered as 'future' for given
493+ * and all subsequent transaction IDs.
494+ */
495+ pg_fatal ("transaction ID (-x) cannot be ahead of datfrozenxid by %u" , INT32_MAX );
496+ }
497+ else if (set_xid >= MaxTransactionId )
498+ {
499+ /*
500+ * Given transaction ID might exeed current epoch, so advance epoch
501+ * if needed.
502+ */
503+ current_epoch = set_xid / MaxTransactionId ;
504+ set_xid = set_xid % MaxTransactionId ;
505+ }
506+ else
507+ current_epoch = EpochFromFullTransactionId (current_fxid );
508+
444509 ControlFile .checkPointCopy .nextXid =
445- FullTransactionIdFromEpochAndXid (EpochFromFullTransactionId (ControlFile .checkPointCopy .nextXid ),
446- set_xid );
510+ FullTransactionIdFromEpochAndXid (current_epoch , set_xid );
511+
512+ if (FullTransactionIdPrecedes (current_fxid , ControlFile .checkPointCopy .nextXid ) &&
513+ !noupdate )
514+ {
515+ AdvanceNextXid (XidFromFullTransactionId (current_fxid ),
516+ XidFromFullTransactionId (ControlFile .checkPointCopy .nextXid ));
517+ }
518+ }
447519
448520 if (set_oldest_commit_ts_xid != 0 )
449521 ControlFile .checkPointCopy .oldestCommitTsXid = set_oldest_commit_ts_xid ;
@@ -455,12 +527,19 @@ main(int argc, char *argv[])
455527
456528 if (set_mxid != 0 )
457529 {
530+ MultiXactId current_mxid = ControlFile .checkPointCopy .nextMulti ;
458531 ControlFile .checkPointCopy .nextMulti = set_mxid ;
459532
460533 ControlFile .checkPointCopy .oldestMulti = set_oldestmxid ;
461534 if (ControlFile .checkPointCopy .oldestMulti < FirstMultiXactId )
462535 ControlFile .checkPointCopy .oldestMulti += FirstMultiXactId ;
463536 ControlFile .checkPointCopy .oldestMultiDB = InvalidOid ;
537+
538+ /*
539+ * If current_mxid precedes set_mxid.
540+ */
541+ if (((int32 ) (current_mxid - set_mxid ) < 0 ) && !noupdate )
542+ AdvanceNextMultiXid (current_mxid , set_mxid );
464543 }
465544
466545 if (set_mxoff != -1 )
@@ -1218,3 +1297,256 @@ usage(void)
12181297 printf (_ ("\nReport bugs to <%s>.\n" ), PACKAGE_BUGREPORT );
12191298 printf (_ ("%s home page: <%s>\n" ), PACKAGE_NAME , PACKAGE_URL );
12201299}
1300+
1301+ /*
1302+ * Calculate how many xacts can fit one page of given SLRU type.
1303+ */
1304+ static int64
1305+ calculate_xacts_per_page (char * slru_type )
1306+ {
1307+ int64 result = -1 ;
1308+
1309+ if (strcmp (slru_type , "pg_xact" ) == 0 )
1310+ result = CLOG_XACTS_PER_PAGE ;
1311+ else if (strcmp (slru_type , "pg_commit_ts" ) == 0 )
1312+ result = COMMIT_TS_XACTS_PER_PAGE ;
1313+ else if (strcmp (slru_type , "pg_subtrans" ) == 0 )
1314+ result = SUBTRANS_XACTS_PER_PAGE ;
1315+ else if (strcmp (slru_type , "pg_multixact/offsets" ) == 0 )
1316+ result = MULTIXACT_OFFSETS_PER_PAGE ;
1317+ else
1318+ pg_fatal ("unknown SLRU type : %s" , slru_type );
1319+
1320+ return result ;
1321+ }
1322+
1323+ /*
1324+ * Fill given SLRU segment with zeroes.
1325+ */
1326+ static void
1327+ zero_segment (int fd , char * path )
1328+ {
1329+ char zeroes [BLCKSZ ] = {0 };
1330+
1331+ for (int i = 0 ; i < SLRU_PAGES_PER_SEGMENT ; i ++ )
1332+ {
1333+ errno = 0 ;
1334+ if (write (fd , zeroes , BLCKSZ ) != BLCKSZ )
1335+ {
1336+ if (errno == 0 )
1337+ errno = ENOSPC ;
1338+ pg_fatal ("could not write file \"%s\": %m" , path );
1339+ }
1340+ }
1341+ }
1342+
1343+ /*
1344+ * Fill entry for given transaction ID with zeroes in clog.
1345+ */
1346+ static void
1347+ zero_clog_xact_info (int fd , char * path , char * slru_type , TransactionId xid )
1348+ {
1349+ int64 pageno ;
1350+ int byteno = TransactionIdToByte (xid );
1351+ int bshift = TransactionIdToBIndex (xid ) * CLOG_BITS_PER_XACT ;
1352+ char * byteptr ;
1353+ char byteval ;
1354+ int status = 0x00 ;
1355+ char buff [BLCKSZ ];
1356+
1357+ pageno = (xid / CLOG_XACTS_PER_PAGE ) % SLRU_PAGES_PER_SEGMENT ;
1358+
1359+ if (lseek (fd , pageno * BLCKSZ , SEEK_SET ) != pageno * BLCKSZ )
1360+ pg_fatal ("could not iterate through file \"%s\": %m" , path );
1361+
1362+ if (read (fd , buff , BLCKSZ ) != BLCKSZ )
1363+ pg_fatal ("could not read file \"%s\": %m" , path );
1364+
1365+ byteptr = buff + byteno ;
1366+
1367+ byteval = * byteptr ;
1368+ byteval &= ~(((1 << CLOG_BITS_PER_XACT ) - 1 ) << bshift );
1369+ byteval |= (status << bshift );
1370+ * byteptr = byteval ;
1371+
1372+ if (write (fd , buff , BLCKSZ ) != BLCKSZ )
1373+ {
1374+ if (errno == 0 )
1375+ errno = ENOSPC ;
1376+ pg_fatal ("could not write file \"%s\": %m" , path );
1377+ }
1378+ }
1379+
1380+ /*
1381+ * Fill entry for given transaction ID with zeroes in specified SLRU type.
1382+ */
1383+ static void
1384+ zero_xact_info (int fd , char * path , char * slru_type , TransactionId xid )
1385+ {
1386+ int offset = 0 ,
1387+ entry_size = 0 ;
1388+ int64 pageno ;
1389+ char buff [BLCKSZ ];
1390+
1391+ if (strcmp (slru_type , "pg_xact" ) == 0 )
1392+ {
1393+ zero_clog_xact_info (fd , path , slru_type , xid );
1394+ return ;
1395+ }
1396+ else if (strcmp (slru_type , "pg_commit_ts" ) == 0 )
1397+ {
1398+ entry_size = SizeOfCommitTimestampEntry ;
1399+ offset = (xid % COMMIT_TS_XACTS_PER_PAGE ) * entry_size ;
1400+ pageno = xid / COMMIT_TS_XACTS_PER_PAGE ;
1401+ }
1402+ else if (strcmp (slru_type , "pg_subtrans" ) == 0 )
1403+ {
1404+ entry_size = sizeof (TransactionId );
1405+ offset = (xid % SUBTRANS_XACTS_PER_PAGE ) * entry_size ;
1406+ pageno = xid / SUBTRANS_XACTS_PER_PAGE ;
1407+ }
1408+ else if (strcmp (slru_type , "pg_multixact/offsets" ) == 0 )
1409+ {
1410+ entry_size = sizeof (MultiXactOffset );
1411+ offset = (xid % MULTIXACT_OFFSETS_PER_PAGE ) * entry_size ;
1412+ pageno = xid / MULTIXACT_OFFSETS_PER_PAGE ;
1413+ }
1414+ else
1415+ pg_fatal ("unknown SLRU type : %s" , slru_type );
1416+
1417+ if (lseek (fd , pageno * BLCKSZ , SEEK_SET ) != pageno * BLCKSZ )
1418+ pg_fatal ("could not iterate through file \"%s\": %m" , path );
1419+
1420+ if (read (fd , buff , BLCKSZ ) != BLCKSZ )
1421+ pg_fatal ("could not read file \"%s\": %m" , path );
1422+
1423+ memset (buff + offset , 0 , entry_size );
1424+
1425+ if (write (fd , buff , BLCKSZ ) != BLCKSZ )
1426+ {
1427+ if (errno == 0 )
1428+ errno = ENOSPC ;
1429+ pg_fatal ("could not write file \"%s\": %m" , path );
1430+ }
1431+ }
1432+
1433+ /*
1434+ * Make sure that given xid has entry in specified SLRU type.
1435+ */
1436+ static void
1437+ enlarge_slru (TransactionId xid , char * dir )
1438+ {
1439+ char path [MAXPGPATH ];
1440+ int fd ,
1441+ flags = O_RDWR | O_APPEND | O_EXCL | PG_BINARY ;
1442+ int64 segno ,
1443+ pageno ,
1444+ xacts_per_page ;
1445+
1446+ xacts_per_page = calculate_xacts_per_page (dir );
1447+ pageno = xid / xacts_per_page ;
1448+ segno = pageno / SLRU_PAGES_PER_SEGMENT ;
1449+
1450+ snprintf (path , MAXPGPATH , "%s/%04X" , dir , (unsigned int ) segno );
1451+
1452+ errno = 0 ;
1453+ if (access (path , F_OK ) != 0 )
1454+ {
1455+ if (errno != ENOENT )
1456+ pg_fatal ("cannot access file \"%s\" : %m" , path );
1457+
1458+ flags |= O_CREAT ;
1459+ }
1460+
1461+ /*
1462+ * Create or open segment file
1463+ */
1464+ fd = open (path , flags , pg_file_create_mode );
1465+ if (fd < 0 )
1466+ pg_fatal ("could not create/open file \"%s\": %m" , path );
1467+
1468+ /*
1469+ * If segment doen't exist - create segment and fill all it's pages
1470+ * with zeroes.
1471+ */
1472+ if (flags & O_CREAT )
1473+ zero_segment (fd , path );
1474+ /*
1475+ * If segment already exists - fill with zeroes given transaction's
1476+ * entry in it.
1477+ */
1478+ else
1479+ zero_xact_info (fd , path , dir , xid );
1480+
1481+ if (fsync (fd ) != 0 )
1482+ pg_fatal ("fsync error: %m" );
1483+
1484+ close (fd );
1485+ }
1486+
1487+ /*
1488+ * Extend clog so that is can accomodate statuses of all transactions from
1489+ * oldval to newval.
1490+ */
1491+ static void
1492+ AdvanceNextXid (TransactionId oldval , TransactionId newval )
1493+ {
1494+ int64 current_segno = -1 , /* last existing slru segment */
1495+ pageno ,
1496+ segno ;
1497+
1498+ if (newval < oldval ) /* handle wraparound */
1499+ oldval = FirstNormalTransactionId ;
1500+ else /* oldval already has entry in clog */
1501+ oldval += 1 ;
1502+
1503+ for (TransactionId xid = oldval ; xid <= newval ; xid ++ )
1504+ {
1505+ pageno = xid / CLOG_XACTS_PER_PAGE ;
1506+ segno = pageno / SLRU_PAGES_PER_SEGMENT ;
1507+
1508+ /*
1509+ * We already zeroed all necessary pages in this segment during
1510+ * previous xid processing.
1511+ */
1512+ if (segno == current_segno )
1513+ continue ;
1514+
1515+ enlarge_slru (xid , "pg_xact" );
1516+
1517+ current_segno = segno ;
1518+ }
1519+
1520+ pageno = newval / COMMIT_TS_XACTS_PER_PAGE ;
1521+ if (pageno > (oldval / COMMIT_TS_XACTS_PER_PAGE ))
1522+ {
1523+ enlarge_slru (newval , "pg_commit_ts" );
1524+ }
1525+
1526+ pageno = (newval / SUBTRANS_XACTS_PER_PAGE );
1527+ if (pageno > (oldval / SUBTRANS_XACTS_PER_PAGE ))
1528+ {
1529+ enlarge_slru (newval , "pg_subtrans" );
1530+ }
1531+ }
1532+
1533+ static void
1534+ AdvanceNextMultiXid (MultiXactId oldval , MultiXactId newval )
1535+ {
1536+ int64 current_segno = -1 ,
1537+ pageno ,
1538+ segno ;
1539+
1540+ for (MultiXactId mxid = oldval + 1 ; mxid <= newval ; mxid ++ )
1541+ {
1542+ pageno = mxid / MULTIXACT_OFFSETS_PER_PAGE ;
1543+ segno = pageno / SLRU_PAGES_PER_SEGMENT ;
1544+
1545+ if (segno == current_segno )
1546+ continue ;
1547+
1548+ enlarge_slru (mxid , "pg_multixact/offsets" );
1549+
1550+ current_segno = segno ;
1551+ }
1552+ }
0 commit comments