Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
1e407ce
Client-side polling via `Last-Event-ID`
MackinnonBuck Dec 16, 2025
677d9ea
`ISseEventStreamStore` and test implementation
MackinnonBuck Dec 16, 2025
63fb022
Server-side resumability via `Last-Event-ID`
MackinnonBuck Dec 17, 2025
11494e1
Server-side disconnect
MackinnonBuck Dec 17, 2025
493062a
Integration tests
MackinnonBuck Dec 17, 2025
fe71286
Make `EnablePollingAsync` no-op in stateless mode
MackinnonBuck Dec 17, 2025
2065c0e
Merge remote-tracking branch 'origin/main' into mbuck/resumability-re…
MackinnonBuck Dec 17, 2025
66fcdb1
Update src/ModelContextProtocol.Core/Client/HttpClientTransportOption…
MackinnonBuck Dec 18, 2025
c9044c8
PR feedback: Don't dispose `_sseWriter` on final message
MackinnonBuck Dec 18, 2025
2f19044
PR feedback: Use `SseEventStreamMode.Default` for unsolicited messages
MackinnonBuck Dec 18, 2025
f6858ab
Return 405 on GET in statless mode
MackinnonBuck Dec 18, 2025
8e617bd
Make completing POST responses more robust
MackinnonBuck Dec 19, 2025
1fdec43
Fix `TestSseEventStreamStore` and make tests more robust
MackinnonBuck Dec 19, 2025
ef6dd62
Allow tests to configure their own `HttpClient`
MackinnonBuck Dec 19, 2025
58ee59e
Add test: Using an event ID for a different session
MackinnonBuck Dec 19, 2025
4dd7383
Require negotiated protocol version for priming
MackinnonBuck Dec 19, 2025
7262e5d
Improve `StreamableHttpPostTransport` thread safety
MackinnonBuck Dec 19, 2025
2f7a923
Acquire lock before writing to parent transport
MackinnonBuck Dec 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/ModelContextProtocol.AspNetCore/HttpServerTransportOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,34 @@ public class HttpServerTransportOptions
/// </remarks>
public bool Stateless { get; set; }

/// <summary>
/// Gets or sets the event store for resumability support.
/// When set, events are stored and can be replayed when clients reconnect with a Last-Event-ID header.
/// </summary>
/// <remarks>
/// When configured, the server will:
/// <list type="bullet">
/// <item><description>Generate unique event IDs for each SSE message</description></item>
/// <item><description>Store events for later replay</description></item>
/// <item><description>Replay missed events when a client reconnects with a Last-Event-ID header</description></item>
/// <item><description>Send priming events to establish resumability before any actual messages</description></item>
/// </list>
/// </remarks>
public ISseEventStreamStore? EventStreamStore { get; set; }

/// <summary>
/// Gets or sets the retry interval to suggest to clients in SSE retry field.
/// </summary>
/// <value>
/// The retry interval. The default is 1 second.
/// </value>
/// <remarks>
/// When <see cref="EventStreamStore"/> is set, the server will include a retry field in priming events.
/// This value suggests to clients how long to wait before attempting to reconnect after a connection is lost.
/// Clients may use this value to implement polling behavior during long-running operations.
/// </remarks>
public TimeSpan RetryInterval { get; set; } = TimeSpan.FromSeconds(1);

/// <summary>
/// Gets or sets a value that indicates whether the server uses a single execution context for the entire session.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,12 @@ public static IEndpointConventionBuilder MapMcp(this IEndpointRouteBuilder endpo

if (!streamableHttpHandler.HttpServerTransportOptions.Stateless)
{
// The GET and DELETE endpoints are not mapped in Stateless mode since there's no way to send unsolicited messages
// for the GET to handle, and there is no server-side state for the DELETE to clean up.
// The GET endpoint is not mapped in Stateless mode since there's no way to send unsolicited messages.
// Resuming streams via GET is currently not supported in Stateless mode.
streamableHttpGroup.MapGet("", streamableHttpHandler.HandleGetRequestAsync)
.WithMetadata(new ProducesResponseTypeMetadata(StatusCodes.Status200OK, contentTypes: ["text/event-stream"]));

// The DELETE endpoint is not mapped in Stateless mode since there is no server-side state for the DELETE to clean up.
streamableHttpGroup.MapDelete("", streamableHttpHandler.HandleDeleteRequestAsync);

// Map legacy HTTP with SSE endpoints only if not in Stateless mode, because we cannot guarantee the /message requests
Expand Down
87 changes: 83 additions & 4 deletions src/ModelContextProtocol.AspNetCore/StreamableHttpHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ internal sealed class StreamableHttpHandler(
ILoggerFactory loggerFactory)
{
private const string McpSessionIdHeaderName = "Mcp-Session-Id";
private const string LastEventIdHeaderName = "Last-Event-ID";

private static readonly JsonTypeInfo<JsonRpcMessage> s_messageTypeInfo = GetRequiredJsonTypeInfo<JsonRpcMessage>();
private static readonly JsonTypeInfo<JsonRpcError> s_errorTypeInfo = GetRequiredJsonTypeInfo<JsonRpcError>();
Expand Down Expand Up @@ -88,10 +89,57 @@ await WriteJsonRpcErrorAsync(context,
return;
}

var lastEventId = context.Request.Headers[LastEventIdHeaderName].ToString();
if (!string.IsNullOrEmpty(lastEventId))
{
await HandleResumedStreamAsync(context, session, lastEventId);
}
else
{
await HandleUnsolicitedMessageStreamAsync(context, session);
}
}

private async Task HandleResumedStreamAsync(HttpContext context, StreamableHttpSession session, string lastEventId)
{
if (HttpServerTransportOptions.Stateless)
{
await WriteJsonRpcErrorAsync(context,
"Bad Request: The Last-Event-ID header is not supported in stateless mode.",
StatusCodes.Status400BadRequest);
return;
}

var eventStreamReader = await GetEventStreamReaderAsync(context, lastEventId);
if (eventStreamReader is null)
{
// There was an error obtaining the event stream; consider the request failed.
return;
}

if (!string.Equals(session.Id, eventStreamReader.SessionId, StringComparison.Ordinal))
{
await WriteJsonRpcErrorAsync(context,
"Bad Request: The Last-Event-ID header refers to a session with a different session ID.",
StatusCodes.Status400BadRequest);
return;
}

using var sseCts = CancellationTokenSource.CreateLinkedTokenSource(context.RequestAborted, hostApplicationLifetime.ApplicationStopping);
var cancellationToken = sseCts.Token;

await using var _ = await session.AcquireReferenceAsync(cancellationToken);

InitializeSseResponse(context);
await eventStreamReader.CopyToAsync(context.Response.Body, context.RequestAborted);
}

private async Task HandleUnsolicitedMessageStreamAsync(HttpContext context, StreamableHttpSession session)
{
if (!session.TryStartGetRequest())
{
await WriteJsonRpcErrorAsync(context,
"Bad Request: This server does not support multiple GET requests. Start a new session to get a new GET SSE response.",
"Bad Request: This server does not support multiple GET requests. Start a new session or use Last-Event-ID header to resume.",
StatusCodes.Status400BadRequest);
return;
}
Expand Down Expand Up @@ -120,6 +168,12 @@ await WriteJsonRpcErrorAsync(context,
}
}

private static async Task HandleResumePostResponseStreamAsync(HttpContext context, ISseEventStreamReader eventStreamReader)
{
InitializeSseResponse(context);
await eventStreamReader.CopyToAsync(context.Response.Body, context.RequestAborted);
}

public async Task HandleDeleteRequestAsync(HttpContext context)
{
var sessionId = context.Request.Headers[McpSessionIdHeaderName].ToString();
Expand All @@ -131,14 +185,13 @@ public async Task HandleDeleteRequestAsync(HttpContext context)

private async ValueTask<StreamableHttpSession?> GetSessionAsync(HttpContext context, string sessionId)
{
StreamableHttpSession? session;

if (string.IsNullOrEmpty(sessionId))
{
await WriteJsonRpcErrorAsync(context, "Bad Request: Mcp-Session-Id header is required", StatusCodes.Status400BadRequest);
return null;
}
else if (!sessionManager.TryGetValue(sessionId, out session))

if (!sessionManager.TryGetValue(sessionId, out var session))
{
// -32001 isn't part of the MCP standard, but this is what the typescript-sdk currently does.
// One of the few other usages I found was from some Ethereum JSON-RPC documentation and this
Expand Down Expand Up @@ -194,12 +247,16 @@ private async ValueTask<StreamableHttpSession> StartNewSessionAsync(HttpContext
{
SessionId = sessionId,
FlowExecutionContextFromRequests = !HttpServerTransportOptions.PerSessionExecutionContext,
EventStreamStore = HttpServerTransportOptions.EventStreamStore,
RetryInterval = HttpServerTransportOptions.RetryInterval,
};
context.Response.Headers[McpSessionIdHeaderName] = sessionId;
}
else
{
// In stateless mode, each request is independent. Don't set any session ID on the transport.
// If in the future we support resuming stateless requests, we should populate
// the event stream store and retry interval here as well.
sessionId = "";
transport = new()
{
Expand Down Expand Up @@ -246,6 +303,28 @@ private async ValueTask<StreamableHttpSession> CreateSessionAsync(
return session;
}

private async ValueTask<ISseEventStreamReader?> GetEventStreamReaderAsync(HttpContext context, string lastEventId)
{
if (HttpServerTransportOptions.EventStreamStore is not { } eventStreamStore)
{
await WriteJsonRpcErrorAsync(context,
"Bad Request: This server does not support resuming streams.",
StatusCodes.Status400BadRequest);
return null;
}

var eventStreamReader = await eventStreamStore.GetStreamReaderAsync(lastEventId, context.RequestAborted);
if (eventStreamReader is null)
{
await WriteJsonRpcErrorAsync(context,
"Bad Request: The specified Last-Event-ID is either invalid or expired.",
StatusCodes.Status400BadRequest);
return null;
}

return eventStreamReader;
}

private static Task WriteJsonRpcErrorAsync(HttpContext context, string errorMessage, int statusCode, int errorCode = -32000)
{
var jsonRpcError = new JsonRpcError
Expand Down
13 changes: 13 additions & 0 deletions src/ModelContextProtocol.Core/Client/HttpClientTransportOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,4 +108,17 @@ public required Uri Endpoint
/// Gets sor sets the authorization provider to use for authentication.
/// </summary>
public ClientOAuthOptions? OAuth { get; set; }

/// <summary>
/// Gets or sets the maximum number of consecutive reconnection attempts when an SSE stream is disconnected.
/// </summary>
/// <value>
/// The maximum number of reconnection attempts. The default is 2.
/// </value>
/// <remarks>
/// When an SSE stream is disconnected (e.g., due to a network issue), the client will attempt to
/// reconnect using the Last-Event-ID header to resume from where it left off. This property controls
/// how many reconnection attempts are made before giving up.
/// </remarks>
public int MaxReconnectionAttempts { get; set; } = 2;
}
Loading