diff --git a/src/backend/postmaster/checkpointer.c b/src/backend/postmaster/checkpointer.c
index 2809e298a44f..452818ff152d 100644
--- a/src/backend/postmaster/checkpointer.c
+++ b/src/backend/postmaster/checkpointer.c
@@ -130,6 +130,11 @@ typedef struct
 
     int         num_requests;  /* current # of requests */
     int         max_requests;  /* allocated array size */
+
+    int         head;          /* Index of the first request in the ring
+                                * buffer */
+    int         tail;          /* Index of the last request in the ring
+                                * buffer */
     CheckpointerRequest requests[FLEXIBLE_ARRAY_MEMBER];
 } CheckpointerShmemStruct;
 
@@ -138,6 +143,12 @@ static CheckpointerShmemStruct *CheckpointerShmem;
 /* interval for calling AbsorbSyncRequests in CheckpointWriteDelay */
 #define WRITES_PER_ABSORB 1000
 
+/* Maximum number of checkpointer requests to process in one batch */
+#define CKPT_REQ_BATCH_SIZE 10000
+
+/* Max number of requests the checkpointer request queue can hold */
+#define MAX_CHECKPOINT_REQUESTS 10000000
+
 /*
  * GUC parameters
  */
@@ -973,7 +984,8 @@ CheckpointerShmemInit(void)
          */
         MemSet(CheckpointerShmem, 0, size);
         SpinLockInit(&CheckpointerShmem->ckpt_lck);
-        CheckpointerShmem->max_requests = NBuffers;
+        CheckpointerShmem->max_requests = Min(NBuffers, MAX_CHECKPOINT_REQUESTS);
+        CheckpointerShmem->head = CheckpointerShmem->tail = 0;
         ConditionVariableInit(&CheckpointerShmem->start_cv);
         ConditionVariableInit(&CheckpointerShmem->done_cv);
     }
@@ -1201,6 +1213,7 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
 {
     CheckpointerRequest *request;
     bool        too_full;
+    int         insert_pos;
 
     if (!IsUnderPostmaster)
         return false;           /* probably shouldn't even get here */
@@ -1224,10 +1237,14 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
     }
 
     /* OK, insert request */
-    request = &CheckpointerShmem->requests[CheckpointerShmem->num_requests++];
+    insert_pos = CheckpointerShmem->tail;
+    request = &CheckpointerShmem->requests[insert_pos];
     request->ftag = *ftag;
     request->type = type;
 
+    CheckpointerShmem->tail = (CheckpointerShmem->tail + 1) % CheckpointerShmem->max_requests;
+    CheckpointerShmem->num_requests++;
+
     /* If queue is more than half full, nudge the checkpointer to empty it */
     too_full = (CheckpointerShmem->num_requests >=
                 CheckpointerShmem->max_requests / 2);
@@ -1262,6 +1279,12 @@ ForwardSyncRequest(const FileTag *ftag, SyncRequestType type)
  * Trying to do this every time the queue is full could lose if there
  * aren't any removable entries.  But that should be vanishingly rare in
  * practice: there's one queue entry per shared buffer.
+ *
+ * To avoid large memory allocations when the queue contains many entries,
+ * we process requests incrementally in batches of CKPT_REQ_BATCH_SIZE.
+ * This limits memory usage to O(batch_size) instead of O(num_requests).
+ * Note that duplicates spanning batch boundaries won't be detected, but
+ * this trade-off is acceptable for memory scalability.
  */
 static bool
 CompactCheckpointerRequestQueue(void)
@@ -1269,15 +1292,17 @@ CompactCheckpointerRequestQueue(void)
     struct CheckpointerSlotMapping
     {
         CheckpointerRequest request;
-        int         slot;
+        int         ring_idx;
     };
-
-    int         n,
-                preserve_count;
-    int         num_skipped = 0;
+    int         n;
+    int         total_num_skipped = 0;
+    int         head;
+    int         max_requests;
+    int         num_requests;
+    int         read_idx,
+                write_idx;
+    int         batch_start;
     HASHCTL     ctl;
-    HTAB       *htab;
-    bool       *skip_slot;
 
     /* must hold CheckpointerCommLock in exclusive mode */
     Assert(LWLockHeldByMe(CheckpointerCommLock));
@@ -1286,81 +1311,118 @@ CompactCheckpointerRequestQueue(void)
     if (CritSectionCount > 0)
         return false;
 
-    /* Initialize skip_slot array */
-    skip_slot = palloc0(sizeof(bool) * CheckpointerShmem->num_requests);
+    max_requests = CheckpointerShmem->max_requests;
+    num_requests = CheckpointerShmem->num_requests;
+    head = CheckpointerShmem->head;
 
-    /* Initialize temporary hash table */
+    /* Setup hash table control structure once */
     ctl.keysize = sizeof(CheckpointerRequest);
     ctl.entrysize = sizeof(struct CheckpointerSlotMapping);
     ctl.hcxt = CurrentMemoryContext;
 
-    htab = hash_create("CompactCheckpointerRequestQueue",
-                       CheckpointerShmem->num_requests,
-                       &ctl,
-                       HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+    /* Process and compact in batches */
+    read_idx = head;
+    write_idx = head;
+    batch_start = 0;
 
-    /*
-     * The basic idea here is that a request can be skipped if it's followed
-     * by a later, identical request.  It might seem more sensible to work
-     * backwards from the end of the queue and check whether a request is
-     * *preceded* by an earlier, identical request, in the hopes of doing less
-     * copying.  But that might change the semantics, if there's an
-     * intervening SYNC_FORGET_REQUEST or SYNC_FILTER_REQUEST, so we do it
-     * this way.  It would be possible to be even smarter if we made the code
-     * below understand the specific semantics of such requests (it could blow
-     * away preceding entries that would end up being canceled anyhow), but
-     * it's not clear that the extra complexity would buy us anything.
-     */
-    for (n = 0; n < CheckpointerShmem->num_requests; n++)
+    while (batch_start < num_requests)
     {
-        CheckpointerRequest *request;
-        struct CheckpointerSlotMapping *slotmap;
-        bool        found;
+        int         batch_size = Min(num_requests - batch_start, CKPT_REQ_BATCH_SIZE);
+        HTAB       *htab;
+        bool       *skip_slot;
+        int         batch_num_skipped = 0;
+        int         batch_read_idx;
+
+        /* Allocate skip array for this batch only */
+        skip_slot = palloc0(sizeof(bool) * batch_size);
+
+        htab = hash_create("CompactCheckpointerRequestQueue_Batch",
+                           batch_size,
+                           &ctl,
+                           HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
 
         /*
-         * We use the request struct directly as a hashtable key.  This
-         * assumes that any padding bytes in the structs are consistently the
-         * same, which should be okay because we zeroed them in
-         * CheckpointerShmemInit.  Note also that RelFileLocator had better
-         * contain no pad bytes.
+         * The basic idea here is that a request can be skipped if it's followed
+         * by a later, identical request within the same batch.  It might seem more
+         * sensible to work backwards from the end of the queue and check whether a
+         * request is *preceded* by an earlier, identical request, in the hopes of
+         * doing less copying.  But that might change the semantics, if there's an
+         * intervening SYNC_FORGET_REQUEST or SYNC_FILTER_REQUEST, so we do it
+         * this way.  It would be possible to be even smarter if we made the code
+         * below understand the specific semantics of such requests (it could blow
+         * away preceding entries that would end up being canceled anyhow), but
+         * it's not clear that the extra complexity would buy us anything.
          */
-        request = &CheckpointerShmem->requests[n];
-        slotmap = hash_search(htab, request, HASH_ENTER, &found);
-        if (found)
+        batch_read_idx = read_idx;
+        for (n = 0; n < batch_size; n++)
         {
-            /* Duplicate, so mark the previous occurrence as skippable */
-            skip_slot[slotmap->slot] = true;
-            num_skipped++;
+            CheckpointerRequest *request;
+            struct CheckpointerSlotMapping *slotmap;
+            bool        found;
+
+            /*
+             * We use the request struct directly as a hashtable key.  This
+             * assumes that any padding bytes in the structs are consistently the
+             * same, which should be okay because we zeroed them in
+             * CheckpointerShmemInit.  Note also that RelFileLocator had better
+             * contain no pad bytes.
+             */
+            request = &CheckpointerShmem->requests[batch_read_idx];
+            slotmap = hash_search(htab, request, HASH_ENTER, &found);
+            if (found)
+            {
+                /* Duplicate, so mark the previous occurrence as skippable */
+                skip_slot[slotmap->ring_idx] = true;
+                batch_num_skipped++;
+            }
+            /* Remember slot containing latest occurrence of this request value */
+            slotmap->ring_idx = n;  /* Index within this batch */
+            batch_read_idx = (batch_read_idx + 1) % max_requests;
         }
-        /* Remember slot containing latest occurrence of this request value */
-        slotmap->slot = n;
-    }
 
-    /* Done with the hash table. */
-    hash_destroy(htab);
+        /* Done with the hash table. */
+        hash_destroy(htab);
 
-    /* If no duplicates, we're out of luck. */
-    if (!num_skipped)
-    {
+        /* Compact this batch: copy non-skipped entries */
+        for (n = 0; n < batch_size; n++)
+        {
+            /* If this slot is NOT skipped, keep it */
+            if (!skip_slot[n])
+            {
+                /* If the read and write positions are different, copy the request */
+                if (write_idx != read_idx)
+                    CheckpointerShmem->requests[write_idx] = CheckpointerShmem->requests[read_idx];
+
+                /* Advance the write position */
+                write_idx = (write_idx + 1) % max_requests;
+            }
+
+            read_idx = (read_idx + 1) % max_requests;
+        }
+
+        total_num_skipped += batch_num_skipped;
+
+        /* Cleanup batch resources */
         pfree(skip_slot);
-        return false;
-    }
 
-    /* We found some duplicates; remove them. */
-    preserve_count = 0;
-    for (n = 0; n < CheckpointerShmem->num_requests; n++)
-    {
-        if (skip_slot[n])
-            continue;
-        CheckpointerShmem->requests[preserve_count++] = CheckpointerShmem->requests[n];
+        batch_start += batch_size;
     }
+
+    /* If no duplicates, we're out of luck. */
+    if (total_num_skipped == 0)
+        return false;
+
+    /*
+     * Update ring buffer state: head remains the same, tail moves, count
+     * decreases
+     */
+    CheckpointerShmem->tail = write_idx;
+    CheckpointerShmem->num_requests -= total_num_skipped;
+
     ereport(DEBUG1,
             (errmsg_internal("compacted fsync request queue from %d entries to %d entries",
-                             CheckpointerShmem->num_requests, preserve_count)));
+                             num_requests, CheckpointerShmem->num_requests)));
-    CheckpointerShmem->num_requests = preserve_count;
 
-    /* Cleanup. */
-    pfree(skip_slot);
     return true;
 }
 
@@ -1378,40 +1440,61 @@ AbsorbSyncRequests(void)
 {
     CheckpointerRequest *requests = NULL;
     CheckpointerRequest *request;
-    int         n;
+    int         n,
+                i;
+    bool        loop;
 
     if (!AmCheckpointerProcess())
         return;
 
-    LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
-
-    /*
-     * We try to avoid holding the lock for a long time by copying the request
-     * array, and processing the requests after releasing the lock.
-     *
-     * Once we have cleared the requests from shared memory, we have to PANIC
-     * if we then fail to absorb them (eg, because our hashtable runs out of
-     * memory).  This is because the system cannot run safely if we are unable
-     * to fsync what we have been told to fsync.  Fortunately, the hashtable
-     * is so small that the problem is quite unlikely to arise in practice.
-     */
-    n = CheckpointerShmem->num_requests;
-    if (n > 0)
+    do
     {
-        requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
-        memcpy(requests, CheckpointerShmem->requests, n * sizeof(CheckpointerRequest));
-    }
+        LWLockAcquire(CheckpointerCommLock, LW_EXCLUSIVE);
 
-    START_CRIT_SECTION();
+        /*
+         * We try to avoid holding the lock for a long time by copying the
+         * request array, and processing the requests after releasing the
+         * lock.
+         *
+         * Once we have cleared the requests from shared memory, we have to
+         * PANIC if we then fail to absorb them (eg, because our hashtable
+         * runs out of memory).  This is because the system cannot run safely
+         * if we are unable to fsync what we have been told to fsync.
+         * Fortunately, the hashtable is so small that the problem is quite
+         * unlikely to arise in practice.
+         *
+         * Note: we cannot palloc more than 1 GB of memory, so make sure
+         * that the maximum number of elements will fit in the requests
+         * buffer.
+         */
+        n = Min(CheckpointerShmem->num_requests, CKPT_REQ_BATCH_SIZE);
+        if (n > 0)
+        {
+            if (!requests)
+                requests = (CheckpointerRequest *) palloc(n * sizeof(CheckpointerRequest));
 
-    CheckpointerShmem->num_requests = 0;
+            for (i = 0; i < n; i++)
+            {
+                requests[i] = CheckpointerShmem->requests[CheckpointerShmem->head];
+                CheckpointerShmem->head = (CheckpointerShmem->head + 1) % CheckpointerShmem->max_requests;
+            }
 
-    LWLockRelease(CheckpointerCommLock);
+            CheckpointerShmem->num_requests -= n;
+
+        }
+
+        START_CRIT_SECTION();
+
+        /* Are there any requests in the queue? If so, keep going. */
+        loop = CheckpointerShmem->num_requests != 0;
+
+        LWLockRelease(CheckpointerCommLock);
 
-    for (request = requests; n > 0; request++, n--)
-        RememberSyncRequest(&request->ftag, request->type);
+        for (request = requests; n > 0; request++, n--)
+            RememberSyncRequest(&request->ftag, request->type);
 
-    END_CRIT_SECTION();
+        END_CRIT_SECTION();
+    } while (loop);
 
     if (requests)
         pfree(requests);
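
Editor's note, not part of the patch: the following is a minimal standalone sketch of the head/tail ring-buffer arithmetic that ForwardSyncRequest (enqueue at tail) and AbsorbSyncRequests (drain from head in bounded batches) now share. All names here (RingQueue, ring_enqueue, ring_drain_batch, QUEUE_SIZE, BATCH_SIZE) are illustrative stand-ins for CheckpointerShmemStruct, max_requests, and CKPT_REQ_BATCH_SIZE; the real queue lives in shared memory under CheckpointerCommLock.

/* Standalone illustration only; compile with any C99 compiler. */
#include <stdio.h>
#include <stdbool.h>

#define QUEUE_SIZE 8            /* stands in for max_requests */
#define BATCH_SIZE 3            /* stands in for CKPT_REQ_BATCH_SIZE */

typedef struct
{
    int     items[QUEUE_SIZE];
    int     head;               /* index of the oldest request */
    int     tail;               /* index one past the newest request */
    int     num;                /* current number of requests */
} RingQueue;

/* Mirrors the insert path in ForwardSyncRequest: write at tail, wrap, count. */
static bool
ring_enqueue(RingQueue *q, int item)
{
    if (q->num >= QUEUE_SIZE)
        return false;           /* full; the real caller compacts or syncs itself */
    q->items[q->tail] = item;
    q->tail = (q->tail + 1) % QUEUE_SIZE;
    q->num++;
    return true;
}

/* Mirrors the batched drain in AbsorbSyncRequests: take at most BATCH_SIZE. */
static int
ring_drain_batch(RingQueue *q, int *out)
{
    int     n = (q->num < BATCH_SIZE) ? q->num : BATCH_SIZE;

    for (int i = 0; i < n; i++)
    {
        out[i] = q->items[q->head];
        q->head = (q->head + 1) % QUEUE_SIZE;
    }
    q->num -= n;
    return n;
}

int
main(void)
{
    RingQueue   q = {0};
    int         batch[BATCH_SIZE];
    int         n;

    for (int i = 1; i <= 7; i++)
        ring_enqueue(&q, i);

    /* Keep draining until empty, as the do/while loop in the patch does. */
    while ((n = ring_drain_batch(&q, batch)) > 0)
    {
        printf("batch of %d:", n);
        for (int i = 0; i < n; i++)
            printf(" %d", batch[i]);
        printf("\n");
    }
    return 0;
}

The point of the bounded batch is that each drain only ever copies BATCH_SIZE entries into local memory, so the transient allocation no longer scales with the total queue depth (which the patch caps at MAX_CHECKPOINT_REQUESTS anyway).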
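A second standalone sketch, again not part of the patch: the batch-at-a-time, in-place compaction that CompactCheckpointerRequestQueue performs over the ring. Duplicates are only detected within one batch and the last occurrence in a batch wins; plain ints stand in for CheckpointerRequest values, and a quadratic scan replaces the patch's hash table purely for brevity. compact_ring, QUEUE_SIZE, and BATCH_SIZE are invented names for this illustration.

#include <stdio.h>
#include <stdbool.h>

#define QUEUE_SIZE 16
#define BATCH_SIZE 4

static int
compact_ring(int *ring, int head, int num)
{
    int     read_idx = head;
    int     write_idx = head;
    int     batch_start = 0;
    int     total_skipped = 0;

    while (batch_start < num)
    {
        int     batch_size = (num - batch_start < BATCH_SIZE) ? num - batch_start : BATCH_SIZE;
        bool    skip[BATCH_SIZE] = {false};
        int     scan = read_idx;

        /*
         * Pass 1: mark earlier duplicates within this batch (O(n^2) here; the
         * patch uses a hash table keyed on the whole request struct instead).
         */
        for (int i = 0; i < batch_size; i++)
        {
            int     value = ring[(scan + i) % QUEUE_SIZE];

            for (int j = 0; j < i; j++)
                if (!skip[j] && ring[(scan + j) % QUEUE_SIZE] == value)
                {
                    skip[j] = true;
                    total_skipped++;
                }
        }

        /* Pass 2: slide surviving entries toward write_idx, wrapping as needed. */
        for (int i = 0; i < batch_size; i++)
        {
            if (!skip[i])
            {
                if (write_idx != read_idx)
                    ring[write_idx] = ring[read_idx];
                write_idx = (write_idx + 1) % QUEUE_SIZE;
            }
            read_idx = (read_idx + 1) % QUEUE_SIZE;
        }

        batch_start += batch_size;
    }

    return num - total_skipped;     /* new num_requests; tail becomes write_idx */
}

int
main(void)
{
    int     ring[QUEUE_SIZE] = {7, 7, 3, 7, 3, 9};
    int     remaining = compact_ring(ring, 0, 6);

    printf("kept %d entries:", remaining);
    for (int i = 0; i < remaining; i++)
        printf(" %d", ring[i]);
    printf("\n");
    /* prints "kept 4 entries: 3 7 3 9": the two leading 7s collapse within the
     * first batch, while the second 3 sits in the next batch and survives,
     * which is exactly the cross-batch limitation the patch comment accepts. */
    return 0;
}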