Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion crates/rust-mcp-sdk/src/mcp_http/mcp_http_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,15 @@ async fn create_sse_stream(
tokio::spawn(async move {
if let Some(last_event_id) = last_event_id {
if let Some(event_store) = state.event_store.as_ref() {
if let Some(events) = event_store.events_after(last_event_id).await {
let events = event_store
.events_after(last_event_id)
.await
.unwrap_or_else(|err| {
tracing::error!("{err}");
None
});

if let Some(events) = events {
for message_payload in events.messages {
// skip storing replay messages
let error = transport.write_str(&message_payload, true).await;
Expand Down
1 change: 1 addition & 0 deletions crates/rust-mcp-sdk/tests/test_streamable_http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1428,6 +1428,7 @@ async fn should_store_and_include_event_ids_in_server_sse_messages() {
.unwrap()
.events_after(first_id)
.await
.unwrap()
.unwrap();
assert_eq!(events.messages.len(), 1);

Expand Down
109 changes: 100 additions & 9 deletions crates/rust-mcp-transport/src/event_store.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,118 @@
mod in_memory_event_store;
use async_trait::async_trait;
pub use in_memory_event_store::*;

use crate::{EventId, SessionId, StreamId};
use async_trait::async_trait;
pub use in_memory_event_store::*;
use thiserror::Error;

#[derive(Debug, Clone)]
pub struct EventStoreMessages {
pub struct EventStoreEntry {
pub session_id: SessionId,
pub stream_id: StreamId,
pub messages: Vec<String>,
}

#[derive(Debug, Error)]
#[error("{message}")]
pub struct EventStoreError {
pub message: String,
}

impl From<&str> for EventStoreError {
fn from(s: &str) -> Self {
EventStoreError {
message: s.to_string(),
}
}
}

impl From<String> for EventStoreError {
fn from(s: String) -> Self {
EventStoreError { message: s }
}
}

type EventStoreResult<T> = Result<T, EventStoreError>;

/// Trait defining the interface for event storage and retrieval, used by the MCP server
/// to store and replay events for state reconstruction after client reconnection
#[async_trait]
pub trait EventStore: Send + Sync {
/// Stores a new event in the store and returns the generated event ID.
/// For MCP, this stores protocol messages, timestamp is the number of microseconds since UNIX_EPOCH.
/// The timestamp helps determine the order in which messages arrived.
///
/// # Parameters
/// - `session_id`: The session identifier for the event.
/// - `stream_id`: The stream identifier within the session.
/// - `timestamp`: The u128 timestamp of the event.
/// - `message`: The event payload as json string.
///
/// # Returns
/// - `Ok(EventId)`: The generated ID (format: session_id:stream_id:timestamp) on success.
/// - `Err(Self::Error)`: If input is invalid or storage fails.
async fn store_event(
&self,
session_id: SessionId,
stream_id: StreamId,
time_stamp: u128,
timestamp: u128,
message: String,
) -> EventId;
async fn remove_by_session_id(&self, session_id: SessionId);
async fn remove_stream_in_session(&self, session_id: SessionId, stream_id: StreamId);
async fn clear(&self);
async fn events_after(&self, last_event_id: EventId) -> Option<EventStoreMessages>;
) -> EventStoreResult<EventId>;

/// Removes all events associated with a given session ID.
/// Used to clean up all events for a session when it is no longer needed (e.g., session ended).
///
/// # Parameters
/// - `session_id`: The session ID whose events should be removed.
///
async fn remove_by_session_id(&self, session_id: SessionId) -> EventStoreResult<()>;
/// Removes all events for a specific stream within a session.
/// Useful for cleaning up a specific stream without affecting others.
///
/// # Parameters
/// - `session_id`: The session ID containing the stream.
/// - `stream_id`: The stream ID whose events should be removed.
///
/// # Returns
/// - `Ok(())`: On successful deletion.
/// - `Err(Self::Error)`: If deletion fails.
async fn remove_stream_in_session(
&self,
session_id: SessionId,
stream_id: StreamId,
) -> EventStoreResult<()>;
/// Clears all events from the store.
/// Used for resetting the store.
///
async fn clear(&self) -> EventStoreResult<()>;
/// Retrieves events after a given event ID for a session and stream.
/// Critical for MCP server to replay events after a client reconnects, starting from the last known event.
/// Events are returned in chronological order (ascending timestamp) to reconstruct state.
///
/// # Parameters
/// - `last_event_id`: The event ID to fetch events after.
///
/// # Returns
/// - `Some(Some(EventStoreEntry))`: Events after the specified ID, if any.
/// - `None`: If no events exist after it OR the event ID is invalid.
async fn events_after(
&self,
last_event_id: EventId,
) -> EventStoreResult<Option<EventStoreEntry>>;
/// Prunes excess events to control storage usage.
/// Implementations may apply custom logic, such as limiting
/// the number of events per session or removing events older than a certain timestamp.
/// Default implementation logs a warning if not overridden by the store.
///
/// # Parameters
/// - `session_id`: Optional session ID to prune a specific session; if None, prunes all sessions.
async fn prune_excess_events(&self, _session_id: Option<SessionId>) -> EventStoreResult<()> {
tracing::warn!("prune_excess_events() is not implemented for the event store.");
Ok(())
}
/// Counts the total number of events in the store.
///
/// # Returns
/// - The number of events across all sessions and streams.
async fn count(&self) -> EventStoreResult<usize>;
}
76 changes: 46 additions & 30 deletions crates/rust-mcp-transport/src/event_store/in_memory_event_store.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use crate::event_store::EventStoreResult;
use crate::{
event_store::{EventStore, EventStoreEntry},
EventId, SessionId, StreamId,
};
use async_trait::async_trait;
use std::collections::HashMap;
use std::collections::VecDeque;
use tokio::sync::RwLock;

use crate::{
event_store::{EventStore, EventStoreMessages},
EventId, SessionId, StreamId,
};

const MAX_EVENTS_PER_SESSION: usize = 64;
const ID_SEPARATOR: &str = "-.-";

Expand Down Expand Up @@ -101,16 +101,19 @@ impl InMemoryEventStore {
/// );
/// assert_eq!(store.parse_event_id("invalid"), None);
/// ```
pub fn parse_event_id<'a>(&self, event_id: &'a str) -> Option<(&'a str, &'a str, &'a str)> {
pub fn parse_event_id<'a>(
&self,
event_id: &'a str,
) -> EventStoreResult<(&'a str, &'a str, u128)> {
// Check for empty input or invalid characters (e.g., NULL)
if event_id.is_empty() || event_id.contains('\0') {
return None;
return Err("Event ID is empty!".into());
}

// Split into exactly three parts
let parts: Vec<&'a str> = event_id.split(ID_SEPARATOR).collect();
if parts.len() != 3 {
return None;
return Err("Invalid Event ID format.".into());
}

let session_id = parts[0];
Expand All @@ -119,10 +122,14 @@ impl InMemoryEventStore {

// Ensure no part is empty
if session_id.is_empty() || stream_id.is_empty() || time_stamp.is_empty() {
return None;
return Err("Invalid Event ID format.".into());
}

Some((session_id, stream_id, time_stamp))
let time_stamp: u128 = time_stamp
.parse()
.map_err(|err| format!("Error parsing timestamp: {err}"))?;

Ok((session_id, stream_id, time_stamp))
}
}

Expand All @@ -147,7 +154,7 @@ impl EventStore for InMemoryEventStore {
stream_id: StreamId,
time_stamp: u128,
message: String,
) -> EventId {
) -> EventStoreResult<EventId> {
let event_id = self.generate_event_id(&session_id, &stream_id, time_stamp);

let mut storage_map = self.storage_map.write().await;
Expand All @@ -172,7 +179,7 @@ impl EventStore for InMemoryEventStore {

session_map.push_back(entry);

event_id
Ok(event_id)
}

/// Removes all events associated with a given stream ID within a specific session.
Expand All @@ -184,7 +191,11 @@ impl EventStore for InMemoryEventStore {
/// # Arguments
/// - `session_id`: The session identifier to target.
/// - `stream_id`: The stream identifier to remove.
async fn remove_stream_in_session(&self, session_id: SessionId, stream_id: StreamId) {
async fn remove_stream_in_session(
&self,
session_id: SessionId,
stream_id: StreamId,
) -> EventStoreResult<()> {
let mut storage_map = self.storage_map.write().await;

// Check if session exists
Expand All @@ -194,9 +205,10 @@ impl EventStore for InMemoryEventStore {
// Remove session if empty
if events.is_empty() {
storage_map.remove(&session_id);
}
};
}
// No action if session_id doesn’t exist (idempotent)
Ok(())
}

/// Removes all events associated with a given session ID.
Expand All @@ -205,9 +217,10 @@ impl EventStore for InMemoryEventStore {
///
/// # Arguments
/// - `session_id`: The session identifier to remove.
async fn remove_by_session_id(&self, session_id: SessionId) {
async fn remove_by_session_id(&self, session_id: SessionId) -> EventStoreResult<()> {
let mut storage_map = self.storage_map.write().await;
storage_map.remove(&session_id);
Ok(())
}

/// Retrieves events after a given `event_id` for a specific session and stream.
Expand All @@ -221,23 +234,20 @@ impl EventStore for InMemoryEventStore {
/// - `last_event_id`: The event ID (format: `session-.-stream-.-timestamp`) to start after.
///
/// # Returns
/// An `Option` containing `EventStoreMessages` with the session ID, stream ID, and sorted messages,
/// An `Option` containing `EventStoreEntry` with the session ID, stream ID, and sorted messages,
/// or `None` if no events are found or the input is invalid.
async fn events_after(&self, last_event_id: EventId) -> Option<EventStoreMessages> {
let Some((session_id, stream_id, time_stamp)) = self.parse_event_id(&last_event_id) else {
tracing::warn!("error parsing last event id: '{last_event_id}'");
return None;
};
async fn events_after(
&self,
last_event_id: EventId,
) -> EventStoreResult<Option<EventStoreEntry>> {
let (session_id, stream_id, time_stamp) = self.parse_event_id(&last_event_id)?;

let storage_map = self.storage_map.read().await;

// fail silently if session id does not exists
let Some(events) = storage_map.get(session_id) else {
tracing::warn!("could not find the session_id in the store : '{session_id}'");
return None;
};

let Ok(time_stamp) = time_stamp.parse::<u128>() else {
tracing::warn!("could not parse the timestamp: '{time_stamp}'");
return None;
return Ok(None);
};

let events = match events
Expand All @@ -260,15 +270,21 @@ impl EventStore for InMemoryEventStore {

tracing::trace!("{} messages after '{last_event_id}'", events.len());

Some(EventStoreMessages {
Ok(Some(EventStoreEntry {
session_id: session_id.to_string(),
stream_id: stream_id.to_string(),
messages: events,
})
}))
}

async fn clear(&self) {
async fn clear(&self) -> EventStoreResult<()> {
let mut storage_map = self.storage_map.write().await;
storage_map.clear();
Ok(())
}

async fn count(&self) -> EventStoreResult<usize> {
let storage_map = self.storage_map.read().await;
Ok(storage_map.len())
}
}
23 changes: 13 additions & 10 deletions crates/rust-mcp-transport/src/message_dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,16 +412,19 @@ impl McpDispatch<ClientMessages, ServerMessages, ClientMessage, ServerMessage>
self.stream_id.as_ref(),
self.event_store.as_ref(),
) {
event_id = Some(
event_store
.store_event(
session_id.clone(),
stream_id.clone(),
current_timestamp(),
payload.to_owned(),
)
.await,
)
event_id = event_store
.store_event(
session_id.clone(),
stream_id.clone(),
current_timestamp(),
payload.to_owned(),
)
.await
.map(Some)
.unwrap_or_else(|err| {
tracing::error!("{err}");
None
});
};
}

Expand Down
Loading