* The objects we actually sort are SortTuple structs.  These contain
  * a pointer to the tuple proper (might be a MinimalTuple or IndexTuple),
  * which is a separate palloc chunk --- we assume it is just one chunk and
- * can be freed by a simple pfree().  SortTuples also contain the tuple's
+ * can be freed by a simple pfree() (except during final on-the-fly merge,
+ * when memory is used in batch).  SortTuples also contain the tuple's
  * first key column in Datum/nullflag format, and an index integer.
  *
  * Storing the first key column lets us save heap_getattr or index_getattr
                                 * tuples to return? */
    bool        boundUsed;      /* true if we made use of a bounded heap */
    int         bound;          /* if bounded, the maximum number of tuples */
+   bool        tuples;         /* Can SortTuple.tuple ever be set? */
    int64       availMem;       /* remaining memory available, in bytes */
    int64       allowedMem;     /* total memory allowed, in bytes */
    int         maxTapes;       /* number of tapes (Knuth's T) */
    int         tapeRange;      /* maxTapes-1 (Knuth's P) */
-   MemoryContext sortcontext;  /* memory context holding all sort data */
+   MemoryContext sortcontext;  /* memory context holding most sort data */
+   MemoryContext tuplecontext; /* memory context holding tuple data */
    LogicalTapeSet *tapeset;    /* logtape.c object for tapes in a temp file */
 
    /*
    int         memtupsize;     /* allocated length of memtuples array */
    bool        growmemtuples;  /* memtuples' growth still underway? */
 
+   /*
+    * Memory for tuples is sometimes allocated in batch, rather than
+    * incrementally.  This implies that incremental memory accounting has been
+    * abandoned.  Currently, this only happens for the final on-the-fly merge
+    * step.  Large batch allocations can store tuples (e.g. IndexTuples)
+    * without palloc() fragmentation and other overhead.
+    */
+   bool        batchUsed;
+
    /*
     * While building initial runs, this is the current output run number
     * (starting at 0).  Afterwards, it is the number of initial runs we made.
    int         mergefreelist;  /* head of freelist of recycled slots */
    int         mergefirstfree; /* first slot never used in this merge */
 
+   /*
+    * Per-tape batch state, when final on-the-fly merge consumes memory from
+    * just a few large allocations.
+    *
+    * Aside from the general benefits of performing fewer individual retail
+    * palloc() calls, this also helps make merging more cache efficient, since
+    * each tape's tuples must naturally be accessed sequentially (in sorted
+    * order).
+    */
+   int64       spacePerTape;   /* Space (memory) for tuples (not slots) */
+   char      **mergetuples;    /* Each tape's memory allocation */
+   char      **mergecurrent;   /* Current offset into each tape's memory */
+   char      **mergetail;      /* Last item's start point for each tape */
+   char      **mergeoverflow;  /* Retail palloc() "overflow" for each tape */
+
    /*
     * Variables for Algorithm D.  Note that destTape is a "logical" tape
     * number, ie, an index into the tp_xxx[] arrays.  Be careful to keep
     * tuplesort_begin_datum and used only by the DatumTuple routines.
     */
    Oid         datumType;
-   /* we need typelen and byval in order to know how to copy the Datums. */
+   /* we need typelen in order to know how to copy the Datums. */
    int         datumTypeLen;
-   bool        datumTypeByVal;
 
    /*
     * Resource snapshot for time of sort start.
 #define COPYTUP(state,stup,tup) ((*(state)->copytup) (state, stup, tup))
 #define WRITETUP(state,tape,stup)  ((*(state)->writetup) (state, tape, stup))
 #define READTUP(state,stup,tape,len) ((*(state)->readtup) (state, stup, tape, len))
-#define LACKMEM(state)     ((state)->availMem < 0)
+#define LACKMEM(state)     ((state)->availMem < 0 && !(state)->batchUsed)
 #define USEMEM(state,amt)  ((state)->availMem -= (amt))
 #define FREEMEM(state,amt) ((state)->availMem += (amt))
 
  * rather than the originally-requested size.  This is important since
  * palloc can add substantial overhead.  It's not a complete answer since
  * we won't count any wasted space in palloc allocation blocks, but it's
- * a lot better than what we were doing before 7.3.
+ * a lot better than what we were doing before 7.3.  As of 9.6, a
+ * separate memory context is used for caller passed tuples.  Resetting
+ * it at certain key increments significantly ameliorates fragmentation.
+ * Note that this places a responsibility on readtup and copytup routines
+ * to use the right memory context for these tuples (and to not use the
+ * reset context for anything whose lifetime needs to span multiple
+ * external sort runs).
  */
 
 /* When using this macro, beware of double evaluation of len */
 static void selectnewtape(Tuplesortstate *state);
 static void mergeruns(Tuplesortstate *state);
 static void mergeonerun(Tuplesortstate *state);
-static void beginmerge(Tuplesortstate *state);
+static void beginmerge(Tuplesortstate *state, bool finalMerge);
+static void batchmemtuples(Tuplesortstate *state);
+static void mergebatch(Tuplesortstate *state, int64 spacePerTape);
+static void mergebatchone(Tuplesortstate *state, int srcTape,
+                     SortTuple *stup, bool *should_free);
+static void mergebatchfreetape(Tuplesortstate *state, int srcTape,
+                              SortTuple *rtup, bool *should_free);
+static void *mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen);
 static void mergepreread(Tuplesortstate *state);
 static void mergeprereadone(Tuplesortstate *state, int srcTape);
 static void dumptuples(Tuplesortstate *state, bool alltuples);
 static void reversedirection(Tuplesortstate *state);
 static unsigned int getlen(Tuplesortstate *state, int tapenum, bool eofOK);
 static void markrunend(Tuplesortstate *state, int tapenum);
+static void *readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen);
 static int comparetup_heap(const SortTuple *a, const SortTuple *b,
                Tuplesortstate *state);
 static void copytup_heap(Tuplesortstate *state, SortTuple *stup, void *tup);
 {
    Tuplesortstate *state;
    MemoryContext sortcontext;
+   MemoryContext tuplecontext;
    MemoryContext oldcontext;
 
    /*
     * needed by the sort will live inside this context.
     */
    sortcontext = AllocSetContextCreate(CurrentMemoryContext,
-                                       "TupleSort",
+                                       "TupleSort main",
                                        ALLOCSET_DEFAULT_MINSIZE,
                                        ALLOCSET_DEFAULT_INITSIZE,
                                        ALLOCSET_DEFAULT_MAXSIZE);
 
+   /*
+    * Caller tuple (e.g. IndexTuple) memory context.
+    *
+    * A dedicated child content used exclusively for caller passed tuples
+    * eases memory management.  Resetting at key points reduces fragmentation.
+    * Note that the memtuples array of SortTuples is allocated in the parent
+    * context, not this context, because there is no need to free memtuples
+    * early.
+    */
+   tuplecontext = AllocSetContextCreate(sortcontext,
+                                        "Caller tuples",
+                                        ALLOCSET_DEFAULT_MINSIZE,
+                                        ALLOCSET_DEFAULT_INITSIZE,
+                                        ALLOCSET_DEFAULT_MAXSIZE);
+
    /*
     * Make the Tuplesortstate within the per-sort context.  This way, we
     * don't need a separate pfree() operation for it at shutdown.
    state->status = TSS_INITIAL;
    state->randomAccess = randomAccess;
    state->bounded = false;
+   state->tuples = true;
    state->boundUsed = false;
    state->allowedMem = workMem * (int64) 1024;
    state->availMem = state->allowedMem;
    state->sortcontext = sortcontext;
+   state->tuplecontext = tuplecontext;
    state->tapeset = NULL;
 
    state->memtupcount = 0;
                        ALLOCSET_SEPARATE_THRESHOLD / sizeof(SortTuple) + 1);
 
    state->growmemtuples = true;
+   state->batchUsed = false;
    state->memtuples = (SortTuple *) palloc(state->memtupsize * sizeof(SortTuple));
 
    USEMEM(state, GetMemoryChunkSpace(state->memtuples));
    /* lookup necessary attributes of the datum type */
    get_typlenbyval(datumType, &typlen, &typbyval);
    state->datumTypeLen = typlen;
-   state->datumTypeByVal = typbyval;
+   state->tuples = !typbyval;
 
    /* Prepare SortSupport data */
    state->sortKeys = (SortSupport) palloc0(sizeof(SortSupportData));
                              ItemPointer self, Datum *values,
                              bool *isnull)
 {
-   MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+   MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
    SortTuple   stup;
    Datum       original;
    IndexTuple  tuple;
                             RelationGetDescr(state->indexRel),
                             &stup.isnull1);
 
+   MemoryContextSwitchTo(state->sortcontext);
+
    if (!state->sortKeys || !state->sortKeys->abbrev_converter || stup.isnull1)
    {
        /*
 void
 tuplesort_putdatum(Tuplesortstate *state, Datum val, bool isNull)
 {
-   MemoryContext oldcontext = MemoryContextSwitchTo(state->sortcontext);
+   MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
    SortTuple   stup;
 
    /*
     * identical to stup.tuple.
     */
 
-   if (isNull || state->datumTypeByVal)
+   if (isNull || !state->tuples)
    {
        /*
         * Set datum1 to zeroed representation for NULLs (to be consistent, and
        stup.datum1 = !isNull ? val : (Datum) 0;
        stup.isnull1 = isNull;
        stup.tuple = NULL;      /* no separate storage */
+       MemoryContextSwitchTo(state->sortcontext);
    }
    else
    {
        stup.isnull1 = false;
        stup.tuple = DatumGetPointer(original);
        USEMEM(state, GetMemoryChunkSpace(stup.tuple));
+       MemoryContextSwitchTo(state->sortcontext);
 
        if (!state->sortKeys->abbrev_converter)
        {
  * Internal routine to fetch the next tuple in either forward or back
  * direction into *stup.  Returns FALSE if no more tuples.
  * If *should_free is set, the caller must pfree stup.tuple when done with it.
+ * Otherwise, caller should not use tuple following next call here.
  */
 static bool
 tuplesort_gettuple_common(Tuplesortstate *state, bool forward,
    {
        case TSS_SORTEDINMEM:
            Assert(forward || state->randomAccess);
+           Assert(!state->batchUsed);
            *should_free = false;
            if (forward)
            {
 
        case TSS_SORTEDONTAPE:
            Assert(forward || state->randomAccess);
+           Assert(!state->batchUsed);
            *should_free = true;
            if (forward)
            {
 
        case TSS_FINALMERGE:
            Assert(forward);
-           *should_free = true;
+           Assert(state->batchUsed || !state->tuples);
+           /* For now, assume tuple is stored in tape's batch memory */
+           *should_free = false;
 
            /*
             * This code should match the inner loop of mergeonerun().
            if (state->memtupcount > 0)
            {
                int         srcTape = state->memtuples[0].tupindex;
-               Size        tuplen;
                int         tupIndex;
                SortTuple  *newtup;
 
+               /*
+                * Returned tuple is still counted in our memory space most
+                * of the time.  See mergebatchone() for discussion of why
+                * caller may occasionally be required to free returned
+                * tuple, and how preread memory is managed with regard to
+                * edge cases more generally.
+                */
                *stup = state->memtuples[0];
-               /* returned tuple is no longer counted in our memory space */
-               if (stup->tuple)
-               {
-                   tuplen = GetMemoryChunkSpace(stup->tuple);
-                   state->availMem += tuplen;
-                   state->mergeavailmem[srcTape] += tuplen;
-               }
                tuplesort_heap_siftup(state, false);
                if ((tupIndex = state->mergenext[srcTape]) == 0)
                {
                     * out of preloaded data on this tape, try to read more
                     *
                     * Unlike mergeonerun(), we only preload from the single
-                    * tape that's run dry.  See mergepreread() comments.
+                    * tape that's run dry, though not before preparing its
+                    * batch memory for a new round of sequential consumption.
+                    * See mergepreread() comments.
                     */
+                   if (state->batchUsed)
+                       mergebatchone(state, srcTape, stup, should_free);
+
                    mergeprereadone(state, srcTape);
 
                    /*
                     * if still no data, we've reached end of run on this tape
                     */
                    if ((tupIndex = state->mergenext[srcTape]) == 0)
+                   {
+                       /* Free tape's buffer, avoiding dangling pointer */
+                       if (state->batchUsed)
+                           mergebatchfreetape(state, srcTape, stup, should_free);
                        return true;
+                   }
                }
                /* pull next preread tuple from list, insert in heap */
                newtup = &state->memtuples[tupIndex];
  * Fetch the next tuple in either forward or back direction.
  * Returns NULL if no more tuples.  If *should_free is set, the
  * caller must pfree the returned tuple when done with it.
+ * If it is not set, caller should not use tuple following next
+ * call here.
  */
 HeapTuple
 tuplesort_getheaptuple(Tuplesortstate *state, bool forward, bool *should_free)
  * Fetch the next index tuple in either forward or back direction.
  * Returns NULL if no more tuples.  If *should_free is set, the
  * caller must pfree the returned tuple when done with it.
+ * If it is not set, caller should not use tuple following next
+ * call here.
  */
 IndexTuple
 tuplesort_getindextuple(Tuplesortstate *state, bool forward,
    if (state->sortKeys->abbrev_converter && abbrev)
        *abbrev = stup.datum1;
 
-   if (stup.isnull1 || state->datumTypeByVal)
+   if (stup.isnull1 || !state->tuples)
    {
        *val = stup.datum1;
        *isNull = stup.isnull1;
    state->mergelast = (int *) palloc0(maxTapes * sizeof(int));
    state->mergeavailslots = (int *) palloc0(maxTapes * sizeof(int));
    state->mergeavailmem = (int64 *) palloc0(maxTapes * sizeof(int64));
+   state->mergetuples = (char **) palloc0(maxTapes * sizeof(char *));
+   state->mergecurrent = (char **) palloc0(maxTapes * sizeof(char *));
+   state->mergetail = (char **) palloc0(maxTapes * sizeof(char *));
+   state->mergeoverflow = (char **) palloc0(maxTapes * sizeof(char *));
    state->tp_fib = (int *) palloc0(maxTapes * sizeof(int));
    state->tp_runs = (int *) palloc0(maxTapes * sizeof(int));
    state->tp_dummy = (int *) palloc0(maxTapes * sizeof(int));
                /* Tell logtape.c we won't be writing anymore */
                LogicalTapeSetForgetFreeSpace(state->tapeset);
                /* Initialize for the final merge pass */
-               beginmerge(state);
+               beginmerge(state, state->tuples);
                state->status = TSS_FINALMERGE;
                return;
            }
     * Start the merge by loading one tuple from each active source tape into
     * the heap.  We can also decrease the input run/dummy run counts.
     */
-   beginmerge(state);
+   beginmerge(state, false);
 
    /*
     * Execute merge by repeatedly extracting lowest tuple in heap, writing it
        state->mergeavailslots[srcTape]++;
    }
 
+   /*
+    * Reset tuple memory, now that no caller tuples are needed in memory.
+    * This prevents fragmentation.
+    */
+   MemoryContextReset(state->tuplecontext);
+
    /*
     * When the heap empties, we're done.  Write an end-of-run marker on the
     * output tape, and increment its count of real runs.
  * which tapes contain active input runs in mergeactive[].  Then, load
  * as many tuples as we can from each active input tape, and finally
  * fill the merge heap with the first tuple from each active tape.
+ *
+ * finalMergeBatch indicates if this is the beginning of a final on-the-fly
+ * merge where a batched allocation of tuple memory is required.
  */
 static void
-beginmerge(Tuplesortstate *state)
+beginmerge(Tuplesortstate *state, bool finalMergeBatch)
 {
    int         activeTapes;
    int         tapenum;
    state->mergefreelist = 0;   /* nothing in the freelist */
    state->mergefirstfree = activeTapes;        /* 1st slot avail for preread */
 
+   if (finalMergeBatch)
+   {
+       /* Free outright buffers for tape never actually allocated */
+       FREEMEM(state, (state->maxTapes - activeTapes) * TAPE_BUFFER_OVERHEAD);
+
+       /*
+        * Grow memtuples one last time, since the palloc() overhead no longer
+        * incurred can make a big difference
+        */
+       batchmemtuples(state);
+   }
+
    /*
     * Initialize space allocation to let each active input tape have an equal
     * share of preread space.
    Assert(activeTapes > 0);
    slotsPerTape = (state->memtupsize - state->mergefirstfree) / activeTapes;
    Assert(slotsPerTape > 0);
-   spacePerTape = state->availMem / activeTapes;
+   spacePerTape = MAXALIGN_DOWN(state->availMem / activeTapes);
    for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
    {
        if (state->mergeactive[srcTape])
        }
    }
 
+   /*
+    * Preallocate tuple batch memory for each tape.  This is the memory used
+    * for tuples themselves (not SortTuples), so it's never used by
+    * pass-by-value datum sorts.  Memory allocation is performed here at most
+    * once per sort, just in advance of the final on-the-fly merge step.
+    */
+   if (finalMergeBatch)
+       mergebatch(state, spacePerTape);
+
    /*
     * Preread as many tuples as possible (and at least one) from each active
     * tape
            tup->tupindex = state->mergefreelist;
            state->mergefreelist = tupIndex;
            state->mergeavailslots[srcTape]++;
+
+#ifdef TRACE_SORT
+           if (trace_sort && finalMergeBatch)
+           {
+               int64       perTapeKB = (spacePerTape + 1023) / 1024;
+               int64       usedSpaceKB;
+               int         usedSlots;
+
+               /*
+                * Report how effective batchmemtuples() was in balancing
+                * the number of slots against the need for memory for the
+                * underlying tuples (e.g. IndexTuples).  The big preread of
+                * all tapes when switching to FINALMERGE state should be
+                * fairly representative of memory utilization during the
+                * final merge step, and in any case is the only point at
+                * which all tapes are guaranteed to have depleted either
+                * their batch memory allowance or slot allowance.  Ideally,
+                * both will be completely depleted for every tape by now.
+                */
+               usedSpaceKB = (state->mergecurrent[srcTape] -
+                              state->mergetuples[srcTape] + 1023) / 1024;
+               usedSlots = slotsPerTape - state->mergeavailslots[srcTape];
+
+               elog(LOG, "tape %d initially used %ld KB of %ld KB batch "
+                    "(%2.3f) and %d out of %d slots (%2.3f)", srcTape,
+                    usedSpaceKB, perTapeKB,
+                    (double) usedSpaceKB / (double) perTapeKB,
+                    usedSlots, slotsPerTape,
+                    (double) usedSlots / (double) slotsPerTape);
+           }
+#endif
+       }
+   }
+}
+
+/*
+ * batchmemtuples - grow memtuples without palloc overhead
+ *
+ * When called, availMem should be approximately the amount of memory we'd
+ * require to allocate memtupsize - memtupcount tuples (not SortTuples/slots)
+ * that were allocated with palloc() overhead, and in doing so use up all
+ * allocated slots.  However, though slots and tuple memory is in balance
+ * following the last grow_memtuples() call, that's predicated on the observed
+ * average tuple size for the "final" grow_memtuples() call, which includes
+ * palloc overhead.
+ *
+ * This will perform an actual final grow_memtuples() call without any palloc()
+ * overhead, rebalancing the use of memory between slots and tuples.
+ */
+static void
+batchmemtuples(Tuplesortstate *state)
+{
+   int64           refund;
+   int64           availMemLessRefund;
+   int             memtupsize = state->memtupsize;
+
+   /* For simplicity, assume no memtuples are actually currently counted */
+   Assert(state->memtupcount == 0);
+
+   /*
+    * Refund STANDARDCHUNKHEADERSIZE per tuple.
+    *
+    * This sometimes fails to make memory use prefectly balanced, but it
+    * should never make the situation worse.  Note that Assert-enabled builds
+    * get a larger refund, due to a varying STANDARDCHUNKHEADERSIZE.
+    */
+   refund = memtupsize * STANDARDCHUNKHEADERSIZE;
+   availMemLessRefund = state->availMem - refund;
+
+   /*
+    * To establish balanced memory use after refunding palloc overhead,
+    * temporarily have our accounting indicate that we've allocated all
+    * memory we're allowed to less that refund, and call grow_memtuples()
+    * to have it increase the number of slots.
+    */
+   state->growmemtuples = true;
+   USEMEM(state, availMemLessRefund);
+   (void) grow_memtuples(state);
+   /* Should not matter, but be tidy */
+   FREEMEM(state, availMemLessRefund);
+   state->growmemtuples = false;
+
+#ifdef TRACE_SORT
+   if (trace_sort)
+   {
+       Size    OldKb = (memtupsize * sizeof(SortTuple) + 1023) / 1024;
+       Size    NewKb = (state->memtupsize * sizeof(SortTuple) + 1023) / 1024;
+
+       elog(LOG, "grew memtuples %1.2fx from %d (%zu KB) to %d (%zu KB) for final merge",
+            (double) NewKb / (double) OldKb,
+            memtupsize, OldKb,
+            state->memtupsize, NewKb);
+   }
+#endif
+}
+
+/*
+ * mergebatch - initialize tuple memory in batch
+ *
+ * This allows sequential access to sorted tuples buffered in memory from
+ * tapes/runs on disk during a final on-the-fly merge step.  Note that the
+ * memory is not used for SortTuples, but for the underlying tuples (e.g.
+ * MinimalTuples).
+ *
+ * Note that when batch memory is used, there is a simple division of space
+ * into large buffers (one per active tape).  The conventional incremental
+ * memory accounting (calling USEMEM() and FREEMEM()) is abandoned.  Instead,
+ * when each tape's memory budget is exceeded, a retail palloc() "overflow" is
+ * performed, which is then immediately detected in a way that is analogous to
+ * LACKMEM().  This keeps each tape's use of memory fair, which is always a
+ * goal.
+ */
+static void
+mergebatch(Tuplesortstate *state, int64 spacePerTape)
+{
+   int     srcTape;
+
+   Assert(state->activeTapes > 0);
+   Assert(state->tuples);
+
+   /*
+    * For the purposes of tuplesort's memory accounting, the batch allocation
+    * is special, and regular memory accounting through USEMEM() calls is
+    * abandoned (see mergeprereadone()).
+    */
+   for (srcTape = 0; srcTape < state->maxTapes; srcTape++)
+   {
+       char       *mergetuples;
+
+       if (!state->mergeactive[srcTape])
+           continue;
+
+       /* Allocate buffer for each active tape */
+       mergetuples = MemoryContextAllocHuge(state->tuplecontext,
+                                            spacePerTape);
+
+       /* Initialize state for tape */
+       state->mergetuples[srcTape] = mergetuples;
+       state->mergecurrent[srcTape] = mergetuples;
+       state->mergetail[srcTape] = mergetuples;
+       state->mergeoverflow[srcTape] = NULL;
+   }
+
+   state->batchUsed = true;
+   state->spacePerTape = spacePerTape;
+}
+
+/*
+ * mergebatchone - prepare batch memory for one merge input tape
+ *
+ * This is called following the exhaustion of preread tuples for one input
+ * tape.  All that actually occurs is that the state for the source tape is
+ * reset to indicate that all memory may be reused.
+ *
+ * This routine must deal with fixing up the tuple that is about to be returned
+ * to the client, due to "overflow" allocations.
+ */
+static void
+mergebatchone(Tuplesortstate *state, int srcTape, SortTuple *rtup,
+             bool *should_free)
+{
+   Assert(state->batchUsed);
+
+   /*
+    * Tuple about to be returned to caller ("stup") is final preread tuple
+    * from tape, just removed from the top of the heap.  Special steps around
+    * memory management must be performed for that tuple, to make sure it
+    * isn't overwritten early.
+    */
+   if (!state->mergeoverflow[srcTape])
+   {
+       Size    tupLen;
+
+       /*
+        * Mark tuple buffer range for reuse, but be careful to move final,
+        * tail tuple to start of space for next run so that it's available
+        * to caller when stup is returned, and remains available at least
+        * until the next tuple is requested.
+        */
+       tupLen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
+       state->mergecurrent[srcTape] = state->mergetuples[srcTape];
+       memmove(state->mergecurrent[srcTape], state->mergetail[srcTape],
+               tupLen);
+
+       /* Make SortTuple at top of the merge heap point to new tuple */
+       rtup->tuple = (void *) state->mergecurrent[srcTape];
+
+       state->mergetail[srcTape] = state->mergecurrent[srcTape];
+       state->mergecurrent[srcTape] += tupLen;
+   }
+   else
+   {
+       /*
+        * Handle an "overflow" retail palloc.
+        *
+        * This is needed when we run out of tuple memory for the tape.
+        */
+       state->mergecurrent[srcTape] = state->mergetuples[srcTape];
+       state->mergetail[srcTape] = state->mergetuples[srcTape];
+
+       if (rtup->tuple)
+       {
+           Assert(rtup->tuple == (void *) state->mergeoverflow[srcTape]);
+           /* Caller should free palloc'd tuple */
+           *should_free = true;
        }
+       state->mergeoverflow[srcTape] = NULL;
+   }
+}
+
+/*
+ * mergebatchfreetape - handle final clean-up for batch memory once tape is
+ * about to become exhausted
+ *
+ * All tuples are returned from tape, but a single final tuple, *rtup, is to be
+ * passed back to caller.  Free tape's batch allocation buffer while ensuring
+ * that the final tuple is managed appropriately.
+ */
+static void
+mergebatchfreetape(Tuplesortstate *state, int srcTape, SortTuple *rtup,
+                  bool *should_free)
+{
+   Assert(state->batchUsed);
+   Assert(state->status == TSS_FINALMERGE);
+
+   /*
+    * Tuple may or may not already be an overflow allocation from
+    * mergebatchone()
+    */
+   if (!*should_free && rtup->tuple)
+   {
+       /*
+        * Final tuple still in tape's batch allocation.
+        *
+        * Return palloc()'d copy to caller, and have it freed in a similar
+        * manner to overflow allocation.  Otherwise, we'd free batch memory
+        * and pass back a pointer to garbage.  Note that we deliberately
+        * allocate this in the parent tuplesort context, to be on the safe
+        * side.
+        */
+       Size        tuplen;
+       void       *oldTuple = rtup->tuple;
+
+       tuplen = state->mergecurrent[srcTape] - state->mergetail[srcTape];
+       rtup->tuple = MemoryContextAlloc(state->sortcontext, tuplen);
+       memcpy(rtup->tuple, oldTuple, tuplen);
+       *should_free = true;
+   }
+
+   /* Free spacePerTape-sized buffer */
+   pfree(state->mergetuples[srcTape]);
+}
+
+/*
+ * mergebatchalloc - allocate memory for one tuple using a batch memory
+ * "logical allocation".
+ *
+ * This is used for the final on-the-fly merge phase only.  READTUP() routines
+ * receive memory from here in place of palloc() and USEMEM() calls.
+ *
+ * Tuple tapenum is passed, ensuring each tape's tuples are stored in sorted,
+ * contiguous order (while allowing safe reuse of memory made available to
+ * each tape).  This maximizes locality of access as tuples are returned by
+ * final merge.
+ *
+ * Caller must not subsequently attempt to free memory returned here.  In
+ * general, only mergebatch* functions know about how memory returned from
+ * here should be freed, and this function's caller must ensure that batch
+ * memory management code will definitely have the opportunity to do the right
+ * thing during the final on-the-fly merge.
+ */
+static void *
+mergebatchalloc(Tuplesortstate *state, int tapenum, Size tuplen)
+{
+   Size        reserve_tuplen = MAXALIGN(tuplen);
+   char       *ret;
+
+   /* Should overflow at most once before mergebatchone() call: */
+   Assert(state->mergeoverflow[tapenum] == NULL);
+   Assert(state->batchUsed);
+
+   /* It should be possible to use precisely spacePerTape memory at once */
+   if (state->mergecurrent[tapenum] + reserve_tuplen <=
+       state->mergetuples[tapenum] + state->spacePerTape)
+   {
+       /*
+        * Usual case -- caller is returned pointer into its tape's buffer, and
+        * an offset from that point is recorded as where tape has consumed up
+        * to for current round of preloading.
+        */
+       ret = state->mergetail[tapenum] = state->mergecurrent[tapenum];
+       state->mergecurrent[tapenum] += reserve_tuplen;
    }
+   else
+   {
+       /*
+        * Allocate memory, and record as tape's overflow allocation.  This
+        * will be detected quickly, in a similar fashion to a LACKMEM()
+        * condition, and should not happen again before a new round of
+        * preloading for caller's tape.  Note that we deliberately allocate
+        * this in the parent tuplesort context, to be on the safe side.
+        *
+        * Sometimes, this does not happen because merging runs out of slots
+        * before running out of memory.
+        */
+       ret = state->mergeoverflow[tapenum] =
+           MemoryContextAlloc(state->sortcontext, tuplen);
+   }
+
+   return ret;
 }
 
 /*
  * that state and so no point in scanning through all the tapes to fix one.
  * (Moreover, there may be quite a lot of inactive tapes in that state, since
  * we might have had many fewer runs than tapes.  In a regular tape-to-tape
- * merge we can expect most of the tapes to be active.)
+ * merge we can expect most of the tapes to be active.  Plus, only
+ * FINALMERGE state has to consider memory management for a batch
+ * allocation.)
  */
 static void
 mergepreread(Tuplesortstate *state)
 
    if (!state->mergeactive[srcTape])
        return;                 /* tape's run is already exhausted */
+
+   /*
+    * Manage per-tape availMem.  Only actually matters when batch memory not
+    * in use.
+    */
    priorAvail = state->availMem;
    state->availMem = state->mergeavailmem[srcTape];
-   while ((state->mergeavailslots[srcTape] > 0 && !LACKMEM(state)) ||
+
+   /*
+    * When batch memory is used if final on-the-fly merge, only mergeoverflow
+    * test is relevant; otherwise, only LACKMEM() test is relevant.
+    */
+   while ((state->mergeavailslots[srcTape] > 0 &&
+           state->mergeoverflow[srcTape] == NULL && !LACKMEM(state)) ||
           state->mergenext[srcTape] == 0)
    {
        /* read next tuple, if any */
    LogicalTapeWrite(state->tapeset, tapenum, (void *) &len, sizeof(len));
 }
 
+/*
+ * Get memory for tuple from within READTUP() routine.  Allocate
+ * memory and account for that, or consume from tape's batch
+ * allocation.
+ *
+ * Memory returned here in the final on-the-fly merge case is recycled
+ * from tape's batch allocation.  Otherwise, callers must pfree() or
+ * reset tuple child memory context, and account for that with a
+ * FREEMEM().  Currently, this only ever needs to happen in WRITETUP()
+ * routines.
+ */
+static void *
+readtup_alloc(Tuplesortstate *state, int tapenum, Size tuplen)
+{
+   if (state->batchUsed)
+   {
+       /*
+        * No USEMEM() call, because during final on-the-fly merge
+        * accounting is based on tape-private state. ("Overflow"
+        * allocations are detected as an indication that a new round
+        * or preloading is required. Preloading marks existing
+        * contents of tape's batch buffer for reuse.)
+        */
+       return mergebatchalloc(state, tapenum, tuplen);
+   }
+   else
+   {
+       char       *ret;
+
+       /* Batch allocation yet to be performed */
+       ret = MemoryContextAlloc(state->tuplecontext, tuplen);
+       USEMEM(state, GetMemoryChunkSpace(ret));
+       return ret;
+   }
+}
+
 
 /*
  * Routines specialized for HeapTuple (actually MinimalTuple) case
    Datum       original;
    MinimalTuple tuple;
    HeapTupleData htup;
+   MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
 
    /* copy the tuple into sort storage */
    tuple = ExecCopySlotMinimalTuple(slot);
                            state->tupDesc,
                            &stup->isnull1);
 
+   MemoryContextSwitchTo(oldcontext);
+
    if (!state->sortKeys->abbrev_converter || stup->isnull1)
    {
        /*
 {
    unsigned int tupbodylen = len - sizeof(int);
    unsigned int tuplen = tupbodylen + MINIMAL_TUPLE_DATA_OFFSET;
-   MinimalTuple tuple = (MinimalTuple) palloc(tuplen);
+   MinimalTuple tuple = (MinimalTuple) readtup_alloc(state, tapenum, tuplen);
    char       *tupbody = (char *) tuple + MINIMAL_TUPLE_DATA_OFFSET;
    HeapTupleData htup;
 
-   USEMEM(state, GetMemoryChunkSpace(tuple));
    /* read in the tuple proper */
    tuple->t_len = tuplen;
    LogicalTapeReadExact(state->tapeset, tapenum,
 {
    HeapTuple   tuple = (HeapTuple) tup;
    Datum       original;
+   MemoryContext oldcontext = MemoryContextSwitchTo(state->tuplecontext);
 
    /* copy the tuple into sort storage */
    tuple = heap_copytuple(tuple);
    stup->tuple = (void *) tuple;
    USEMEM(state, GetMemoryChunkSpace(tuple));
 
+   MemoryContextSwitchTo(oldcontext);
+
    /*
     * set up first-column key value, and potentially abbreviate, if it's a
     * simple column
                int tapenum, unsigned int tuplen)
 {
    unsigned int t_len = tuplen - sizeof(ItemPointerData) - sizeof(int);
-   HeapTuple   tuple = (HeapTuple) palloc(t_len + HEAPTUPLESIZE);
+   HeapTuple   tuple = (HeapTuple) readtup_alloc(state,
+                                                 tapenum,
+                                                 t_len + HEAPTUPLESIZE);
 
-   USEMEM(state, GetMemoryChunkSpace(tuple));
    /* Reconstruct the HeapTupleData header */
    tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
    tuple->t_len = t_len;
    Datum       original;
 
    /* copy the tuple into sort storage */
-   newtuple = (IndexTuple) palloc(tuplen);
+   newtuple = (IndexTuple) MemoryContextAlloc(state->tuplecontext, tuplen);
    memcpy(newtuple, tuple, tuplen);
    USEMEM(state, GetMemoryChunkSpace(newtuple));
    stup->tuple = (void *) newtuple;
              int tapenum, unsigned int len)
 {
    unsigned int tuplen = len - sizeof(unsigned int);
-   IndexTuple  tuple = (IndexTuple) palloc(tuplen);
+   IndexTuple  tuple = (IndexTuple) readtup_alloc(state, tapenum, tuplen);
 
-   USEMEM(state, GetMemoryChunkSpace(tuple));
    LogicalTapeReadExact(state->tapeset, tapenum,
                         tuple, tuplen);
    if (state->randomAccess)    /* need trailing length word? */
        waddr = NULL;
        tuplen = 0;
    }
-   else if (state->datumTypeByVal)
+   else if (!state->tuples)
    {
        waddr = &stup->datum1;
        tuplen = sizeof(Datum);
        stup->isnull1 = true;
        stup->tuple = NULL;
    }
-   else if (state->datumTypeByVal)
+   else if (!state->tuples)
    {
        Assert(tuplen == sizeof(Datum));
        LogicalTapeReadExact(state->tapeset, tapenum,
    }
    else
    {
-       void       *raddr = palloc(tuplen);
+       void       *raddr = readtup_alloc(state, tapenum, tuplen);
 
        LogicalTapeReadExact(state->tapeset, tapenum,
                             raddr, tuplen);
        stup->datum1 = PointerGetDatum(raddr);
        stup->isnull1 = false;
        stup->tuple = raddr;
-       USEMEM(state, GetMemoryChunkSpace(raddr));
    }
 
    if (state->randomAccess)    /* need trailing length word? */