@@ -130,6 +130,11 @@ typedef struct
 
     int         num_requests;   /* current # of requests */
     int         max_requests;   /* allocated array size */
+
+    int         head;           /* index of the first request in the ring
+                                 * buffer */
+    int         tail;           /* index one past the last request in the
+                                 * ring buffer (the next insertion point) */
    CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
 } CheckpointerShmemStruct;
 
@@ -138,6 +143,12 @@ static CheckpointerShmemStruct *CheckpointerShmem;
 /* interval for calling AbsorbSyncRequests in CheckpointWriteDelay */
 #define WRITES_PER_ABSORB 1000
 
+/* Maximum number of checkpointer requests to process in one batch */
+#define CKPT_REQ_BATCH_SIZE 10000
+
+/* Max number of requests the checkpointer request queue can hold */
+#define MAX_CHECKPOINT_REQUESTS 10000000
+
 /*
  * GUC parameters
  */
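Rough arithmetic for these constants, assuming a typical 64-bit build where sizeof(CheckpointerRequest) is on the order of 24 bytes (a ~20-byte FileTag plus a 4-byte SyncRequestType): a 10,000-entry batch costs roughly 240 kB of local memory per pass, while the 10,000,000-entry cap bounds the shared request array at roughly 240 MB. Without CKPT_REQ_BATCH_SIZE, draining a full queue of that size would have meant one palloc of comparable size.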
@@ -973,7 +984,8 @@ CheckpointerShmemInit(void)
      */
     MemSet(CheckpointerShmem, 0, size);
     SpinLockInit(&CheckpointerShmem->ckpt_lck);
-    CheckpointerShmem->max_requests = NBuffers;
+    CheckpointerShmem->max_requests = Min(NBuffers, MAX_CHECKPOINT_REQUESTS);
+    CheckpointerShmem->head = CheckpointerShmem->tail = 0;
     ConditionVariableInit(&CheckpointerShmem->start_cv);
     ConditionVariableInit(&CheckpointerShmem->done_cv);
 }
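Concretely, assuming the default 8 kB block size: shared_buffers = 16GB gives NBuffers = 2,097,152, well under the cap, so max_requests is unchanged; shared_buffers = 128GB gives NBuffers = 16,777,216, which the Min() clamps to 10,000,000.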
@@ -1201,6 +1213,7 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
 {
     CheckpointerRequest *request;
     bool        too_full;
+    int         insert_pos;
 
     if (!IsUnderPostmaster)
         return false;           /* probably shouldn't even get here */
@@ -1224,10 +1237,14 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
     }
 
     /* OK, insert request */
-    request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++];
+    insert_pos = CheckpointerShmem->tail;
+    request = &CheckpointerShmem->requests[insert_pos];
     request->ftag = *ftag;
     request->type = type;
 
+    CheckpointerShmem->tail = (CheckpointerShmem->tail + 1) % CheckpointerShmem->max_requests;
+    CheckpointerShmem->num_requests++;
+
     /* If queue is more than half full, nudge the checkpointer to empty it */
     too_full = (CheckpointerShmem->num_requests >=
                 CheckpointerShmem->max_requests / 2);
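The `% max_requests` advance in this hunk is what lets tail wrap past the end of the array while head lags behind. Continuing the illustrative Ring sketch from above (again, not the patch's own code):

/* Continuing the sketch above: indices wrap past the array end. */
static void
wraparound_demo(void)
{
    Ring    r = {0};
    int     v;

    for (int i = 0; i < 6; i++)
        ring_push(&r, i);       /* head = 0, tail = 6, count = 6 */

    ring_pop(&r, &v);           /* head = 1, count = 5 */
    ring_pop(&r, &v);           /* head = 2, count = 4 */

    ring_push(&r, 6);           /* tail = 7, count = 5 */
    ring_push(&r, 7);           /* tail wraps to 0, count = 6 */
    ring_push(&r, 8);           /* tail = 1, count = 7 */
}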
@@ -1262,22 +1279,30 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
  * Trying to do this every time the queue is full could lose if there
  * aren't any removable entries.  But that should be vanishingly rare in
  * practice: there's one queue entry per shared buffer.
+ *
+ * To avoid large memory allocations when the queue contains many entries,
+ * we process requests incrementally in batches of CKPT_REQ_BATCH_SIZE.
+ * This limits memory usage to O(batch_size) instead of O(num_requests).
+ * Note that duplicates spanning batch boundaries won't be detected, but
+ * this trade-off is acceptable for memory scalability.
  */
 static bool
 CompactCheckpointerRequestQueue(void)
 {
     struct CheckpointerSlotMapping
     {
         CheckpointerRequest request;
-        int         slot;
+        int         ring_idx;
     };
-
-    int         n,
-                preserve_count;
-    int         num_skipped = 0;
+    int         n;
+    int         total_num_skipped = 0;
+    int         head;
+    int         max_requests;
+    int         num_requests;
+    int         read_idx,
+                write_idx;
+    int         batch_start;
     HASHCTL     ctl;
-    HTAB       *htab;
-    bool       *skip_slot;
 
     /* must hold CheckpointerCommLock in exclusive mode */
     Assert(LWLockHeldByMe(CheckpointerCommLock));
@@ -1286,81 +1311,118 @@ CompactCheckpointerRequestQueue(void)
     if (CritSectionCount > 0)
         return false;
 
-    /* Initialize skip_slot array */
-    skip_slot = palloc0(sizeof(bool) * CheckpointerShmem->num_requests);
+    max_requests = CheckpointerShmem->max_requests;
+    num_requests = CheckpointerShmem->num_requests;
+    head = CheckpointerShmem->head;
 
-    /* Initialize temporary hash table */
+    /* Set up the hash table control structure once */
     ctl.keysize = sizeof(CheckpointerRequest);
     ctl.entrysize = sizeof(struct CheckpointerSlotMapping);
     ctl.hcxt = CurrentMemoryContext;
 
-    htab = hash_create("CompactCheckpointerRequestQueue",
-                       CheckpointerShmem->num_requests,
-                       &ctl,
-                       HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+    /* Process and compact in batches */
+    read_idx = head;
+    write_idx = head;
+    batch_start = 0;
 
-    /*
-     * The basic idea here is that a request can be skipped if it's followed
-     * by a later, identical request.  It might seem more sensible to work
-     * backwards from the end of the queue and check whether a request is
-     * *preceded* by an earlier, identical request, in the hopes of doing less
-     * copying.  But that might change the semantics, if there's an
-     * intervening SYNC_FORGET_REQUEST or SYNC_FILTER_REQUEST, so we do it
-     * this way.  It would be possible to be even smarter if we made the code
-     * below understand the specific semantics of such requests (it could blow
-     * away preceding entries that would end up being canceled anyhow), but
-     * it's not clear that the extra complexity would buy us anything.
-     */
-    for (n = 0; n < CheckpointerShmem->num_requests; n++)
+    while (batch_start < num_requests)
     {
-        CheckpointerRequest *request;
-        struct CheckpointerSlotMapping *slotmap;
-        bool        found;
+        int         batch_size = Min(num_requests - batch_start, CKPT_REQ_BATCH_SIZE);
+        HTAB       *htab;
+        bool       *skip_slot;
+        int         batch_num_skipped = 0;
+        int         batch_read_idx;
+
+        /* Allocate the skip array for this batch only */
+        skip_slot = palloc0(sizeof(bool) * batch_size);
+
+        htab = hash_create("CompactCheckpointerRequestQueue_Batch",
+                           batch_size,
+                           &ctl,
+                           HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
         /*
-         * We use the request struct directly as a hashtable key.  This
-         * assumes that any padding bytes in the structs are consistently the
-         * same, which should be okay because we zeroed them in
-         * CheckpointerShmemInit.  Note also that RelFileLocator had better
-         * contain no pad bytes.
+         * The basic idea here is that a request can be skipped if it's
+         * followed by a later, identical request within the same batch.  It
+         * might seem more sensible to work backwards from the end of the
+         * queue and check whether a request is *preceded* by an earlier,
+         * identical request, in the hopes of doing less copying.  But that
+         * might change the semantics, if there's an intervening
+         * SYNC_FORGET_REQUEST or SYNC_FILTER_REQUEST, so we do it this way.
+         * It would be possible to be even smarter if we made the code below
+         * understand the specific semantics of such requests (it could blow
+         * away preceding entries that would end up being canceled anyhow),
+         * but it's not clear that the extra complexity would buy us anything.
          */
-        request = &CheckpointerShmem->requests[n];
-        slotmap = hash_search(htab, request, HASH_ENTER, &found);
-        if (found)
+        batch_read_idx = read_idx;
+        for (n = 0; n < batch_size; n++)
         {
-            /* Duplicate, so mark the previous occurrence as skippable */
-            skip_slot[slotmap->slot] = true;
-            num_skipped++;
+            CheckpointerRequest *request;
+            struct CheckpointerSlotMapping *slotmap;
+            bool        found;
+
+            /*
+             * We use the request struct directly as a hashtable key.  This
+             * assumes that any padding bytes in the structs are consistently
+             * the same, which should be okay because we zeroed them in
+             * CheckpointerShmemInit.  Note also that RelFileLocator had
+             * better contain no pad bytes.
+             */
+            request = &CheckpointerShmem->requests[batch_read_idx];
+            slotmap = hash_search(htab, request, HASH_ENTER, &found);
+            if (found)
+            {
+                /* Duplicate, so mark the previous occurrence as skippable */
+                skip_slot[slotmap->ring_idx] = true;
+                batch_num_skipped++;
+            }
+            /* Remember the slot containing the latest occurrence of this value */
+            slotmap->ring_idx = n;  /* index within this batch */
+            batch_read_idx = (batch_read_idx + 1) % max_requests;
         }
-        /* Remember slot containing latest occurrence of this request value */
-        slotmap->slot = n;
-    }
 
-    /* Done with the hash table. */
-    hash_destroy(htab);
+        /* Done with the hash table. */
+        hash_destroy(htab);
 
-    /* If no duplicates, we're out of luck. */
-    if (!num_skipped)
-    {
+        /* Compact this batch: copy the non-skipped entries */
+        for (n = 0; n < batch_size; n++)
+        {
+            /* If this slot is NOT skipped, keep it */
+            if (!skip_slot[n])
+            {
+                /* If the read and write positions differ, copy the request */
+                if (write_idx != read_idx)
+                    CheckpointerShmem->requests[write_idx] = CheckpointerShmem->requests[read_idx];
+
+                /* Advance the write position */
+                write_idx = (write_idx + 1) % max_requests;
+            }
+
+            read_idx = (read_idx + 1) % max_requests;
+        }
+
+        total_num_skipped += batch_num_skipped;
+
+        /* Clean up the batch resources */
         pfree(skip_slot);
-        return false;
-    }
 
-    /* We found some duplicates; remove them. */
-    preserve_count = 0;
-    for (n = 0; n < CheckpointerShmem->num_requests; n++)
-    {
-        if (skip_slot[n])
-            continue;
-        CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n];
+        batch_start += batch_size;
     }
+
+    /* If no duplicates, we're out of luck. */
+    if (total_num_skipped == 0)
+        return false;
+
+    /*
+     * Update the ring buffer state: head stays the same, tail moves, and the
+     * count decreases.
+     */
+    CheckpointerShmem->tail = write_idx;
+    CheckpointerShmem->num_requests -= total_num_skipped;
+
     ereport(DEBUG1,
             (errmsg_internal("compacted fsync request queue from %d entries to %d entries",
-                             CheckpointerShmem->num_requests, preserve_count)));
-    CheckpointerShmem->num_requests = preserve_count;
+                             num_requests, CheckpointerShmem->num_requests)));
 
-    /* Cleanup. */
-    pfree(skip_slot);
     return true;
 }
 
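A minimal sketch of the dedup idea this hunk implements, with plain ints standing in for CheckpointerRequest and a quadratic scan standing in for the hash table (the patch uses the hash precisely to avoid this O(n^2) comparison): an entry is skippable when an identical entry follows it, and the survivors are compacted toward the front, preserving the position of each value's last occurrence.

#include <stdbool.h>

/* Returns the new element count; assumes n <= 64 for the sketch. */
static int
compact_keep_last(int *items, int n)
{
    bool    skip[64] = {false};
    int     write = 0;

    for (int i = 0; i < n; i++)
        for (int j = i + 1; j < n; j++)
            if (items[i] == items[j])
            {
                skip[i] = true;     /* a later duplicate exists; drop this one */
                break;
            }

    for (int i = 0; i < n; i++)
        if (!skip[i])
            items[write++] = items[i];

    return write;
}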
@@ -1378,40 +1440,61 @@ AbsorbSyncRequests(void)
 {
     CheckpointerRequest *requests = NULL;
     CheckpointerRequest *request;
-    int         n;
+    int         n,
+                i;
+    bool        loop;
 
     if (!AmCheckpointerProcess())
         return;
 
-    LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
-    /*
-     * We try to avoid holding the lock for a long time by copying the request
-     * array, and processing the requests after releasing the lock.
-     *
-     * Once we have cleared the requests from shared memory, we have to PANIC
-     * if we then fail to absorb them (eg, because our hashtable runs out of
-     * memory).  This is because the system cannot run safely if we are unable
-     * to fsync what we have been told to fsync.  Fortunately, the hashtable
-     * is so small that the problem is quite unlikely to arise in practice.
-     */
-    n = CheckpointerShmem->num_requests;
-    if (n > 0)
+    do
     {
-        requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
-        memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest));
-    }
+        LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
 
-    START_CRIT_SECTION();
+        /*
+         * We try to avoid holding the lock for a long time by copying the
+         * request array, and processing the requests after releasing the
+         * lock.
+         *
+         * Once we have cleared the requests from shared memory, we have to
+         * PANIC if we then fail to absorb them (eg, because our hashtable
+         * runs out of memory).  This is because the system cannot run safely
+         * if we are unable to fsync what we have been told to fsync.
+         * Fortunately, the hashtable is so small that the problem is quite
+         * unlikely to arise in practice.
+         *
+         * Note: we cannot palloc more than 1 GB at a time, so make sure that
+         * the maximum number of elements absorbed in one batch fits in the
+         * requests buffer.
+         */
+        n = Min(CheckpointerShmem->num_requests, CKPT_REQ_BATCH_SIZE);
+        if (n > 0)
+        {
+            if (!requests)
+                requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
 
-    CheckpointerShmem->num_requests = 0;
+            for (i = 0; i < n; i++)
+            {
+                requests[i] = CheckpointerShmem->requests[CheckpointerShmem->head];
+                CheckpointerShmem->head = (CheckpointerShmem->head + 1) % CheckpointerShmem->max_requests;
+            }
 
-    LWLockRelease(CheckpointerCommLock);
+            CheckpointerShmem->num_requests -= n;
+        }
+
+        START_CRIT_SECTION();
+
+        /* Are there any requests left in the queue? If so, keep going. */
+        loop = CheckpointerShmem->num_requests != 0;
+
+        LWLockRelease(CheckpointerCommLock);
 
-    for (request = requests; n > 0; request++, n--)
-        RememberSyncRequest(&request->ftag, request->type);
+        for (request = requests; n > 0; request++, n--)
+            RememberSyncRequest(&request->ftag, request->type);
 
-    END_CRIT_SECTION();
+        END_CRIT_SECTION();
+    } while (loop);
 
     if (requests)
         pfree(requests);
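The control flow of the new AbsorbSyncRequests boils down to: under the lock, copy out at most one batch and note whether anything remains; outside the lock, process the copies; repeat. A minimal sketch of that pattern, reusing the illustrative Ring type and ring_pop from earlier, with stub functions standing in for LWLockAcquire/LWLockRelease and RememberSyncRequest:

#include <stdbool.h>

#define BATCH 4

static void lock(void)   { }            /* stand-in for LWLockAcquire */
static void unlock(void) { }            /* stand-in for LWLockRelease */
static void process(int v) { (void) v; }    /* stand-in for RememberSyncRequest */

/* Drain the ring in batches: copy under the lock, process outside it. */
static void
drain_in_batches(Ring *r)
{
    int     copies[BATCH];
    bool    more;

    do
    {
        int     n = 0;

        lock();
        while (n < BATCH && ring_pop(r, &copies[n]))
            n++;
        more = (r->count != 0);         /* anything left for another pass? */
        unlock();

        for (int i = 0; i < n; i++)
            process(copies[i]);
    } while (more);
}

The point of the do/while shape, in the patch as in the sketch, is that requests enqueued while a batch is being processed are picked up by the next pass rather than waiting for the next absorb cycle.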