From 150e3a02ba593b2e41b16d2d621e770d292cfa23 Mon Sep 17 00:00:00 2001 From: Ali Hashemi <14126952+hashemix@users.noreply.github.com> Date: Wed, 15 Oct 2025 19:23:42 -0300 Subject: [PATCH 1/4] refactor: eventstore with better error handling and stability (#109) * refactor: eventstore trait * feat: enhance EventStore with better error handling and stability --- .../src/mcp_http/mcp_http_utils.rs | 10 +- .../tests/test_streamable_http_server.rs | 1 + crates/rust-mcp-transport/src/event_store.rs | 109 ++++++++++++++++-- .../src/event_store/in_memory_event_store.rs | 76 +++++++----- .../src/message_dispatcher.rs | 23 ++-- 5 files changed, 169 insertions(+), 50 deletions(-) diff --git a/crates/rust-mcp-sdk/src/mcp_http/mcp_http_utils.rs b/crates/rust-mcp-sdk/src/mcp_http/mcp_http_utils.rs index 608207a..6d003b9 100644 --- a/crates/rust-mcp-sdk/src/mcp_http/mcp_http_utils.rs +++ b/crates/rust-mcp-sdk/src/mcp_http/mcp_http_utils.rs @@ -187,7 +187,15 @@ async fn create_sse_stream( tokio::spawn(async move { if let Some(last_event_id) = last_event_id { if let Some(event_store) = state.event_store.as_ref() { - if let Some(events) = event_store.events_after(last_event_id).await { + let events = event_store + .events_after(last_event_id) + .await + .unwrap_or_else(|err| { + tracing::error!("{err}"); + None + }); + + if let Some(events) = events { for message_payload in events.messages { // skip storing replay messages let error = transport.write_str(&message_payload, true).await; diff --git a/crates/rust-mcp-sdk/tests/test_streamable_http_server.rs b/crates/rust-mcp-sdk/tests/test_streamable_http_server.rs index 79c9f00..3592c97 100644 --- a/crates/rust-mcp-sdk/tests/test_streamable_http_server.rs +++ b/crates/rust-mcp-sdk/tests/test_streamable_http_server.rs @@ -1428,6 +1428,7 @@ async fn should_store_and_include_event_ids_in_server_sse_messages() { .unwrap() .events_after(first_id) .await + .unwrap() .unwrap(); assert_eq!(events.messages.len(), 1); diff --git a/crates/rust-mcp-transport/src/event_store.rs b/crates/rust-mcp-transport/src/event_store.rs index fdc0734..35eaf40 100644 --- a/crates/rust-mcp-transport/src/event_store.rs +++ b/crates/rust-mcp-transport/src/event_store.rs @@ -1,27 +1,118 @@ mod in_memory_event_store; -use async_trait::async_trait; -pub use in_memory_event_store::*; use crate::{EventId, SessionId, StreamId}; +use async_trait::async_trait; +pub use in_memory_event_store::*; +use thiserror::Error; #[derive(Debug, Clone)] -pub struct EventStoreMessages { +pub struct EventStoreEntry { pub session_id: SessionId, pub stream_id: StreamId, pub messages: Vec, } +#[derive(Debug, Error)] +#[error("{message}")] +pub struct EventStoreError { + pub message: String, +} + +impl From<&str> for EventStoreError { + fn from(s: &str) -> Self { + EventStoreError { + message: s.to_string(), + } + } +} + +impl From for EventStoreError { + fn from(s: String) -> Self { + EventStoreError { message: s } + } +} + +type EventStoreResult = Result; + +/// Trait defining the interface for event storage and retrieval, used by the MCP server +/// to store and replay events for state reconstruction after client reconnection #[async_trait] pub trait EventStore: Send + Sync { + /// Stores a new event in the store and returns the generated event ID. + /// For MCP, this stores protocol messages, timestamp is the number of microseconds since UNIX_EPOCH. + /// The timestamp helps determine the order in which messages arrived. + /// + /// # Parameters + /// - `session_id`: The session identifier for the event. + /// - `stream_id`: The stream identifier within the session. + /// - `timestamp`: The u128 timestamp of the event. + /// - `message`: The event payload as json string. + /// + /// # Returns + /// - `Ok(EventId)`: The generated ID (format: session_id:stream_id:timestamp) on success. + /// - `Err(Self::Error)`: If input is invalid or storage fails. async fn store_event( &self, session_id: SessionId, stream_id: StreamId, - time_stamp: u128, + timestamp: u128, message: String, - ) -> EventId; - async fn remove_by_session_id(&self, session_id: SessionId); - async fn remove_stream_in_session(&self, session_id: SessionId, stream_id: StreamId); - async fn clear(&self); - async fn events_after(&self, last_event_id: EventId) -> Option; + ) -> EventStoreResult; + + /// Removes all events associated with a given session ID. + /// Used to clean up all events for a session when it is no longer needed (e.g., session ended). + /// + /// # Parameters + /// - `session_id`: The session ID whose events should be removed. + /// + async fn remove_by_session_id(&self, session_id: SessionId) -> EventStoreResult<()>; + /// Removes all events for a specific stream within a session. + /// Useful for cleaning up a specific stream without affecting others. + /// + /// # Parameters + /// - `session_id`: The session ID containing the stream. + /// - `stream_id`: The stream ID whose events should be removed. + /// + /// # Returns + /// - `Ok(())`: On successful deletion. + /// - `Err(Self::Error)`: If deletion fails. + async fn remove_stream_in_session( + &self, + session_id: SessionId, + stream_id: StreamId, + ) -> EventStoreResult<()>; + /// Clears all events from the store. + /// Used for resetting the store. + /// + async fn clear(&self) -> EventStoreResult<()>; + /// Retrieves events after a given event ID for a session and stream. + /// Critical for MCP server to replay events after a client reconnects, starting from the last known event. + /// Events are returned in chronological order (ascending timestamp) to reconstruct state. + /// + /// # Parameters + /// - `last_event_id`: The event ID to fetch events after. + /// + /// # Returns + /// - `Some(Some(EventStoreEntry))`: Events after the specified ID, if any. + /// - `None`: If no events exist after it OR the event ID is invalid. + async fn events_after( + &self, + last_event_id: EventId, + ) -> EventStoreResult>; + /// Prunes excess events to control storage usage. + /// Implementations may apply custom logic, such as limiting + /// the number of events per session or removing events older than a certain timestamp. + /// Default implementation logs a warning if not overridden by the store. + /// + /// # Parameters + /// - `session_id`: Optional session ID to prune a specific session; if None, prunes all sessions. + async fn prune_excess_events(&self, _session_id: Option) -> EventStoreResult<()> { + tracing::warn!("prune_excess_events() is not implemented for the event store."); + Ok(()) + } + /// Counts the total number of events in the store. + /// + /// # Returns + /// - The number of events across all sessions and streams. + async fn count(&self) -> EventStoreResult; } diff --git a/crates/rust-mcp-transport/src/event_store/in_memory_event_store.rs b/crates/rust-mcp-transport/src/event_store/in_memory_event_store.rs index 66e738c..095a014 100644 --- a/crates/rust-mcp-transport/src/event_store/in_memory_event_store.rs +++ b/crates/rust-mcp-transport/src/event_store/in_memory_event_store.rs @@ -1,13 +1,13 @@ +use crate::event_store::EventStoreResult; +use crate::{ + event_store::{EventStore, EventStoreEntry}, + EventId, SessionId, StreamId, +}; use async_trait::async_trait; use std::collections::HashMap; use std::collections::VecDeque; use tokio::sync::RwLock; -use crate::{ - event_store::{EventStore, EventStoreMessages}, - EventId, SessionId, StreamId, -}; - const MAX_EVENTS_PER_SESSION: usize = 64; const ID_SEPARATOR: &str = "-.-"; @@ -101,16 +101,19 @@ impl InMemoryEventStore { /// ); /// assert_eq!(store.parse_event_id("invalid"), None); /// ``` - pub fn parse_event_id<'a>(&self, event_id: &'a str) -> Option<(&'a str, &'a str, &'a str)> { + pub fn parse_event_id<'a>( + &self, + event_id: &'a str, + ) -> EventStoreResult<(&'a str, &'a str, u128)> { // Check for empty input or invalid characters (e.g., NULL) if event_id.is_empty() || event_id.contains('\0') { - return None; + return Err("Event ID is empty!".into()); } // Split into exactly three parts let parts: Vec<&'a str> = event_id.split(ID_SEPARATOR).collect(); if parts.len() != 3 { - return None; + return Err("Invalid Event ID format.".into()); } let session_id = parts[0]; @@ -119,10 +122,14 @@ impl InMemoryEventStore { // Ensure no part is empty if session_id.is_empty() || stream_id.is_empty() || time_stamp.is_empty() { - return None; + return Err("Invalid Event ID format.".into()); } - Some((session_id, stream_id, time_stamp)) + let time_stamp: u128 = time_stamp + .parse() + .map_err(|err| format!("Error parsing timestamp: {err}"))?; + + Ok((session_id, stream_id, time_stamp)) } } @@ -147,7 +154,7 @@ impl EventStore for InMemoryEventStore { stream_id: StreamId, time_stamp: u128, message: String, - ) -> EventId { + ) -> EventStoreResult { let event_id = self.generate_event_id(&session_id, &stream_id, time_stamp); let mut storage_map = self.storage_map.write().await; @@ -172,7 +179,7 @@ impl EventStore for InMemoryEventStore { session_map.push_back(entry); - event_id + Ok(event_id) } /// Removes all events associated with a given stream ID within a specific session. @@ -184,7 +191,11 @@ impl EventStore for InMemoryEventStore { /// # Arguments /// - `session_id`: The session identifier to target. /// - `stream_id`: The stream identifier to remove. - async fn remove_stream_in_session(&self, session_id: SessionId, stream_id: StreamId) { + async fn remove_stream_in_session( + &self, + session_id: SessionId, + stream_id: StreamId, + ) -> EventStoreResult<()> { let mut storage_map = self.storage_map.write().await; // Check if session exists @@ -194,9 +205,10 @@ impl EventStore for InMemoryEventStore { // Remove session if empty if events.is_empty() { storage_map.remove(&session_id); - } + }; } // No action if session_id doesn’t exist (idempotent) + Ok(()) } /// Removes all events associated with a given session ID. @@ -205,9 +217,10 @@ impl EventStore for InMemoryEventStore { /// /// # Arguments /// - `session_id`: The session identifier to remove. - async fn remove_by_session_id(&self, session_id: SessionId) { + async fn remove_by_session_id(&self, session_id: SessionId) -> EventStoreResult<()> { let mut storage_map = self.storage_map.write().await; storage_map.remove(&session_id); + Ok(()) } /// Retrieves events after a given `event_id` for a specific session and stream. @@ -221,23 +234,20 @@ impl EventStore for InMemoryEventStore { /// - `last_event_id`: The event ID (format: `session-.-stream-.-timestamp`) to start after. /// /// # Returns - /// An `Option` containing `EventStoreMessages` with the session ID, stream ID, and sorted messages, + /// An `Option` containing `EventStoreEntry` with the session ID, stream ID, and sorted messages, /// or `None` if no events are found or the input is invalid. - async fn events_after(&self, last_event_id: EventId) -> Option { - let Some((session_id, stream_id, time_stamp)) = self.parse_event_id(&last_event_id) else { - tracing::warn!("error parsing last event id: '{last_event_id}'"); - return None; - }; + async fn events_after( + &self, + last_event_id: EventId, + ) -> EventStoreResult> { + let (session_id, stream_id, time_stamp) = self.parse_event_id(&last_event_id)?; let storage_map = self.storage_map.read().await; + + // fail silently if session id does not exists let Some(events) = storage_map.get(session_id) else { tracing::warn!("could not find the session_id in the store : '{session_id}'"); - return None; - }; - - let Ok(time_stamp) = time_stamp.parse::() else { - tracing::warn!("could not parse the timestamp: '{time_stamp}'"); - return None; + return Ok(None); }; let events = match events @@ -260,15 +270,21 @@ impl EventStore for InMemoryEventStore { tracing::trace!("{} messages after '{last_event_id}'", events.len()); - Some(EventStoreMessages { + Ok(Some(EventStoreEntry { session_id: session_id.to_string(), stream_id: stream_id.to_string(), messages: events, - }) + })) } - async fn clear(&self) { + async fn clear(&self) -> EventStoreResult<()> { let mut storage_map = self.storage_map.write().await; storage_map.clear(); + Ok(()) + } + + async fn count(&self) -> EventStoreResult { + let storage_map = self.storage_map.read().await; + Ok(storage_map.len()) } } diff --git a/crates/rust-mcp-transport/src/message_dispatcher.rs b/crates/rust-mcp-transport/src/message_dispatcher.rs index cd9727c..62c591f 100644 --- a/crates/rust-mcp-transport/src/message_dispatcher.rs +++ b/crates/rust-mcp-transport/src/message_dispatcher.rs @@ -412,16 +412,19 @@ impl McpDispatch self.stream_id.as_ref(), self.event_store.as_ref(), ) { - event_id = Some( - event_store - .store_event( - session_id.clone(), - stream_id.clone(), - current_timestamp(), - payload.to_owned(), - ) - .await, - ) + event_id = event_store + .store_event( + session_id.clone(), + stream_id.clone(), + current_timestamp(), + payload.to_owned(), + ) + .await + .map(Some) + .unwrap_or_else(|err| { + tracing::error!("{err}"); + None + }); }; } From 18b1e6f3e9671bfffa4bd59f64dc12fc2e44d818 Mon Sep 17 00:00:00 2001 From: Ali Hashemi <14126952+hashemix@users.noreply.github.com> Date: Sun, 19 Oct 2025 16:14:38 -0300 Subject: [PATCH 2/4] feat: add middleware support to mcp_http_handler (#112) * feat: generic middlewares * feat: extend mc_hddt_handler to support middlewares * chore: whitelist typos * chore: build issue --- Cargo.toml | 3 + .../src/id_generator/nano_id_generator.rs | 2 +- .../id_generator/snow_flake_id_generator.rs | 4 +- crates/rust-mcp-sdk/Cargo.toml | 3 + .../rust-mcp-sdk/src/hyper_servers/routes.rs | 14 +- .../hyper_servers/routes/messages_routes.rs | 7 +- .../src/hyper_servers/routes/sse_routes.rs | 6 +- .../routes/streamable_http_routes.rs | 10 +- .../rust-mcp-sdk/src/hyper_servers/server.rs | 6 +- crates/rust-mcp-sdk/src/mcp_http.rs | 3 + .../src/mcp_http/mcp_http_handler.rs | 117 ++++-- .../src/mcp_http/mcp_http_middleware.rs | 389 ++++++++++++++++++ .../src/mcp_http/mcp_http_utils.rs | 11 + 13 files changed, 526 insertions(+), 49 deletions(-) create mode 100644 crates/rust-mcp-sdk/src/mcp_http/mcp_http_middleware.rs diff --git a/Cargo.toml b/Cargo.toml index 26fb067..718c9a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -99,3 +99,6 @@ todo = "deny" [workspace.metadata.cargo-machete] ignored = ["bindgen", "cbindgen", "prost_build", "serde"] + +[workspace.metadata.typos] +default.extend-ignore-re = ["clonable"] diff --git a/crates/rust-mcp-extra/src/id_generator/nano_id_generator.rs b/crates/rust-mcp-extra/src/id_generator/nano_id_generator.rs index a50ec2b..1ad5697 100644 --- a/crates/rust-mcp-extra/src/id_generator/nano_id_generator.rs +++ b/crates/rust-mcp-extra/src/id_generator/nano_id_generator.rs @@ -64,7 +64,7 @@ mod tests { for _ in 0..1000 { let id: String = generator.generate(); - assert!(seen.insert(id.clone()), "Duplicate ID: {}", id); + assert!(seen.insert(id.clone()), "Duplicate ID: {id}"); } } } diff --git a/crates/rust-mcp-extra/src/id_generator/snow_flake_id_generator.rs b/crates/rust-mcp-extra/src/id_generator/snow_flake_id_generator.rs index 39942ec..5ab2cb8 100644 --- a/crates/rust-mcp-extra/src/id_generator/snow_flake_id_generator.rs +++ b/crates/rust-mcp-extra/src/id_generator/snow_flake_id_generator.rs @@ -133,9 +133,7 @@ mod tests { let current_id: u64 = id.parse().expect("ID should be a valid u64"); assert!( current_id > prev_id, - "ID not strictly increasing: {} <= {}", - current_id, - prev_id + "ID not strictly increasing: {current_id} <= {prev_id}" ); prev_id = current_id; } diff --git a/crates/rust-mcp-sdk/Cargo.toml b/crates/rust-mcp-sdk/Cargo.toml index 0ecc527..27f8d6f 100644 --- a/crates/rust-mcp-sdk/Cargo.toml +++ b/crates/rust-mcp-sdk/Cargo.toml @@ -109,3 +109,6 @@ macros = ["rust-mcp-macros/sdk"] [lints] workspace = true + +[package.metadata.typos] +default.extend-ignore-re = ["clonable"] diff --git a/crates/rust-mcp-sdk/src/hyper_servers/routes.rs b/crates/rust-mcp-sdk/src/hyper_servers/routes.rs index 4ae274b..fcaa290 100644 --- a/crates/rust-mcp-sdk/src/hyper_servers/routes.rs +++ b/crates/rust-mcp-sdk/src/hyper_servers/routes.rs @@ -4,10 +4,9 @@ pub mod messages_routes; pub mod sse_routes; pub mod streamable_http_routes; -use crate::mcp_http::McpAppState; - use super::HyperServerOptions; -use axum::Router; +use crate::mcp_http::{McpAppState, McpHttpHandler}; +use axum::{Extension, Router}; use std::sync::Arc; /// Constructs the Axum router with all application routes @@ -21,7 +20,11 @@ use std::sync::Arc; /// /// # Returns /// * `Router` - An Axum router configured with all application routes and state -pub fn app_routes(state: Arc, server_options: &HyperServerOptions) -> Router { +pub fn app_routes( + state: Arc, + server_options: &HyperServerOptions, + http_handler: McpHttpHandler, +) -> Router { let router: Router = Router::new() .merge(streamable_http_routes::routes( server_options.streamable_http_endpoint(), @@ -42,7 +45,8 @@ pub fn app_routes(state: Arc, server_options: &HyperServerOptions) r }) .with_state(state) - .merge(fallback_routes::routes()); + .merge(fallback_routes::routes()) + .layer(Extension(Arc::new(http_handler))); router } diff --git a/crates/rust-mcp-sdk/src/hyper_servers/routes/messages_routes.rs b/crates/rust-mcp-sdk/src/hyper_servers/routes/messages_routes.rs index 65490a3..cc85254 100644 --- a/crates/rust-mcp-sdk/src/hyper_servers/routes/messages_routes.rs +++ b/crates/rust-mcp-sdk/src/hyper_servers/routes/messages_routes.rs @@ -3,7 +3,7 @@ use crate::{ mcp_http::{McpAppState, McpHttpHandler}, utils::remove_query_and_hash, }; -use axum::{extract::State, response::IntoResponse, routing::post, Router}; +use axum::{extract::State, response::IntoResponse, routing::post, Extension, Router}; use http::{HeaderMap, Method, Uri}; use std::sync::Arc; @@ -18,10 +18,13 @@ pub async fn handle_messages( uri: Uri, headers: HeaderMap, State(state): State>, + Extension(http_handler): Extension>, message: String, ) -> TransportServerResult { let request = McpHttpHandler::create_request(Method::POST, uri, headers, Some(&message)); - let generic_response = McpHttpHandler::handle_sse_message(request, state.clone()).await?; + let generic_response = http_handler + .handle_sse_message(request, state.clone()) + .await?; let (parts, body) = generic_response.into_parts(); let resp = axum::response::Response::from_parts(parts, axum::body::Body::new(body)); Ok(resp) diff --git a/crates/rust-mcp-sdk/src/hyper_servers/routes/sse_routes.rs b/crates/rust-mcp-sdk/src/hyper_servers/routes/sse_routes.rs index e13c724..c85d81f 100644 --- a/crates/rust-mcp-sdk/src/hyper_servers/routes/sse_routes.rs +++ b/crates/rust-mcp-sdk/src/hyper_servers/routes/sse_routes.rs @@ -36,11 +36,13 @@ pub fn routes(sse_endpoint: &str, sse_message_endpoint: &str) -> Router` - The SSE response stream or an error pub async fn handle_sse( Extension(sse_message_endpoint): Extension, + Extension(http_handler): Extension>, State(state): State>, ) -> TransportServerResult { let SseMessageEndpoint(sse_message_endpoint) = sse_message_endpoint; - let generic_response = - McpHttpHandler::handle_sse_connection(state.clone(), Some(&sse_message_endpoint)).await?; + let generic_response = http_handler + .handle_sse_connection(state.clone(), Some(&sse_message_endpoint)) + .await?; let (parts, body) = generic_response.into_parts(); let resp = axum::response::Response::from_parts(parts, axum::body::Body::new(body)); Ok(resp) diff --git a/crates/rust-mcp-sdk/src/hyper_servers/routes/streamable_http_routes.rs b/crates/rust-mcp-sdk/src/hyper_servers/routes/streamable_http_routes.rs index 6f2e470..69287d4 100644 --- a/crates/rust-mcp-sdk/src/hyper_servers/routes/streamable_http_routes.rs +++ b/crates/rust-mcp-sdk/src/hyper_servers/routes/streamable_http_routes.rs @@ -1,6 +1,7 @@ use crate::hyper_servers::error::TransportServerResult; use crate::mcp_http::{McpAppState, McpHttpHandler}; use axum::routing::get; +use axum::Extension; use axum::{ extract::{Query, State}, response::IntoResponse, @@ -24,9 +25,10 @@ pub async fn handle_streamable_http_get( headers: HeaderMap, uri: Uri, State(state): State>, + Extension(http_handler): Extension>, ) -> TransportServerResult { let request = McpHttpHandler::create_request(Method::GET, uri, headers, None); - let generic_res = McpHttpHandler::handle_streamable_http(request, state).await?; + let generic_res = http_handler.handle_streamable_http(request, state).await?; let (parts, body) = generic_res.into_parts(); let resp = axum::response::Response::from_parts(parts, axum::body::Body::new(body)); Ok(resp) @@ -36,12 +38,13 @@ pub async fn handle_streamable_http_post( headers: HeaderMap, uri: Uri, State(state): State>, + Extension(http_handler): Extension>, Query(_params): Query>, payload: String, ) -> TransportServerResult { let request = McpHttpHandler::create_request(Method::POST, uri, headers, Some(payload.as_str())); - let generic_res = McpHttpHandler::handle_streamable_http(request, state).await?; + let generic_res = http_handler.handle_streamable_http(request, state).await?; let (parts, body) = generic_res.into_parts(); let resp = axum::response::Response::from_parts(parts, axum::body::Body::new(body)); Ok(resp) @@ -51,9 +54,10 @@ pub async fn handle_streamable_http_delete( headers: HeaderMap, uri: Uri, State(state): State>, + Extension(http_handler): Extension>, ) -> TransportServerResult { let request = McpHttpHandler::create_request(Method::DELETE, uri, headers, None); - let generic_res = McpHttpHandler::handle_streamable_http(request, state).await?; + let generic_res = http_handler.handle_streamable_http(request, state).await?; let (parts, body) = generic_res.into_parts(); let resp = axum::response::Response::from_parts(parts, axum::body::Body::new(body)); Ok(resp) diff --git a/crates/rust-mcp-sdk/src/hyper_servers/server.rs b/crates/rust-mcp-sdk/src/hyper_servers/server.rs index 4cd8eb6..f3e0983 100644 --- a/crates/rust-mcp-sdk/src/hyper_servers/server.rs +++ b/crates/rust-mcp-sdk/src/hyper_servers/server.rs @@ -5,7 +5,7 @@ use crate::{ utils::{ DEFAULT_MESSAGES_ENDPOINT, DEFAULT_SSE_ENDPOINT, DEFAULT_STREAMABLE_HTTP_ENDPOINT, }, - McpAppState, + McpAppState, McpHttpHandler, }, mcp_server::hyper_runtime::HyperRuntime, mcp_traits::{mcp_handler::McpServerHandler, IdGenerator}, @@ -275,7 +275,9 @@ impl HyperServer { dns_rebinding_protection: server_options.dns_rebinding_protection, event_store: server_options.event_store.as_ref().map(Arc::clone), }); - let app = app_routes(Arc::clone(&state), &server_options); + + let http_handler = McpHttpHandler::new(); //TODO: add auth handlers + let app = app_routes(Arc::clone(&state), &server_options, http_handler); Self { app, state, diff --git a/crates/rust-mcp-sdk/src/mcp_http.rs b/crates/rust-mcp-sdk/src/mcp_http.rs index 3f443d5..2e5d8fd 100644 --- a/crates/rust-mcp-sdk/src/mcp_http.rs +++ b/crates/rust-mcp-sdk/src/mcp_http.rs @@ -2,8 +2,11 @@ mod app_state; mod mcp_http_handler; pub(crate) mod mcp_http_utils; +mod mcp_http_middleware; //TODO: + pub use app_state::*; pub use mcp_http_handler::*; +pub use mcp_http_middleware::Middleware; pub(crate) mod utils { pub use super::mcp_http_utils::*; diff --git a/crates/rust-mcp-sdk/src/mcp_http/mcp_http_handler.rs b/crates/rust-mcp-sdk/src/mcp_http/mcp_http_handler.rs index 8b7efcf..c60b4dc 100644 --- a/crates/rust-mcp-sdk/src/mcp_http/mcp_http_handler.rs +++ b/crates/rust-mcp-sdk/src/mcp_http/mcp_http_handler.rs @@ -1,8 +1,11 @@ #[cfg(feature = "sse")] use super::utils::handle_sse_connection; +use crate::mcp_http::mcp_http_middleware::MiddlewareChain; use crate::mcp_http::utils::{ - accepts_event_stream, error_response, query_param, validate_mcp_protocol_version_header, + accepts_event_stream, empty_response, error_response, query_param, + validate_mcp_protocol_version_header, }; +use crate::mcp_http::Middleware; use crate::mcp_runtimes::server_runtime::DEFAULT_STREAM_ID; use crate::mcp_server::error::TransportServerError; use crate::schema::schema_utils::SdkError; @@ -19,26 +22,32 @@ use crate::{ mcp_server::error::TransportServerResult, utils::valid_initialize_method, }; -use bytes::Bytes; use http::{self, HeaderMap, Method, StatusCode, Uri}; -use http_body_util::{BodyExt, Full}; use rust_mcp_transport::{SessionId, MCP_LAST_EVENT_ID_HEADER, MCP_SESSION_ID_HEADER}; use std::sync::Arc; -pub struct McpHttpHandler {} +#[derive(Clone)] +pub struct McpHttpHandler { + middleware_chain: MiddlewareChain, +} + +impl Default for McpHttpHandler { + fn default() -> Self { + Self::new() + } +} impl McpHttpHandler { - /// Creates a new HTTP request with the given method, URI, headers, and optional body. - /// - /// # Arguments - /// - /// * `method` - The HTTP method to use (e.g., GET, POST). - /// * `uri` - The target URI for the request. - /// * `headers` - A map of optional header keys and their corresponding values. - /// * `body` - An optional string slice representing the request body. - /// - /// # Returns - /// + pub fn new() -> Self { + McpHttpHandler { + middleware_chain: MiddlewareChain::new(), + } + } + + pub fn add_middleware(&mut self, middleware: M) { + self.middleware_chain.add_middleware(middleware); + } + /// An `http::Request<&str>` initialized with the specified method, URI, headers, and body. /// If the `body` is `None`, an empty string is used as the default. /// @@ -77,6 +86,7 @@ impl McpHttpHandler { /// This function is only available when the `sse` feature is enabled. #[cfg(feature = "sse")] pub async fn handle_sse_connection( + &self, state: Arc, sse_message_endpoint: Option<&str>, ) -> TransportServerResult> { @@ -104,6 +114,7 @@ impl McpHttpHandler { /// - `StreamIoError`: if an error occurs while writing to the stream. /// - `HttpError`: if constructing the HTTP response fails. pub async fn handle_sse_message( + &self, request: http::Request<&str>, state: Arc, ) -> TransportServerResult> { @@ -124,13 +135,9 @@ impl McpHttpHandler { TransportServerError::StreamIoError(err.to_string()) })?; - let body = Full::new(Bytes::new()) - .map_err(|err| TransportServerError::HttpError(err.to_string())) - .boxed(); - http::Response::builder() .status(StatusCode::ACCEPTED) - .body(body) + .body(empty_response()) .map_err(|err| TransportServerError::HttpError(err.to_string())) } @@ -156,9 +163,16 @@ impl McpHttpHandler { /// * A `TransportServerResult` wrapping an HTTP response indicating success or failure of the operation. /// pub async fn handle_streamable_http( + &self, request: http::Request<&str>, state: Arc, ) -> TransportServerResult> { + let request = self + .middleware_chain + .process_request(request) + .await + .map_err(|e| TransportServerError::HttpError(e.to_string()))?; + // Enforces DNS rebinding protection if required by state. // If protection fails, respond with HTTP 403 Forbidden. if state.needs_dns_protection() { @@ -168,24 +182,36 @@ impl McpHttpHandler { } let method = request.method(); - match method { - &http::Method::GET => return Self::handle_http_get(request, state).await, - &http::Method::POST => return Self::handle_http_post(request, state).await, - &http::Method::DELETE => return Self::handle_http_delete(request, state).await, + let response = match method { + &http::Method::GET => return self.handle_http_get(request, state).await, + &http::Method::POST => return self.handle_http_post(request, state).await, + &http::Method::DELETE => return self.handle_http_delete(request, state).await, other => { let error = SdkError::bad_request().with_message(&format!( "'{other}' is not a valid HTTP method for StreamableHTTP transport." )); error_response(StatusCode::METHOD_NOT_ALLOWED, error) } - } + }; + + self.middleware_chain + .process_response(response?) + .await + .map_err(|e| TransportServerError::HttpError(e.to_string())) } /// Processes POST requests for the Streamable HTTP Protocol async fn handle_http_post( + &self, request: http::Request<&str>, state: Arc, ) -> TransportServerResult> { + let request = self + .middleware_chain + .process_request(request) + .await + .map_err(|e| TransportServerError::HttpError(e.to_string()))?; + let headers = request.headers(); if !valid_streaming_http_accept_header(headers) { @@ -213,7 +239,7 @@ impl McpHttpHandler { let payload = *request.body(); - match session_id { + let response = match session_id { // has session-id => write to the existing stream Some(id) => { if state.enable_json_response { @@ -232,14 +258,26 @@ impl McpHttpHandler { error_response(StatusCode::BAD_REQUEST, error) } }, - } + }; + + self.middleware_chain + .process_response(response?) + .await + .map_err(|e| TransportServerError::HttpError(e.to_string())) } /// Processes GET requests for the Streamable HTTP Protocol async fn handle_http_get( + &self, request: http::Request<&str>, state: Arc, ) -> TransportServerResult> { + let request = self + .middleware_chain + .process_request(request) + .await + .map_err(|e| TransportServerError::HttpError(e.to_string()))?; + let headers = request.headers(); if !accepts_event_stream(headers) { @@ -264,7 +302,7 @@ impl McpHttpHandler { .and_then(|value| value.to_str().ok()) .map(|s| s.to_string()); - match session_id { + let response = match session_id { Some(session_id) => { let res = create_standalone_stream(session_id, last_event_id, state).await; res @@ -273,14 +311,26 @@ impl McpHttpHandler { let error = SdkError::bad_request().with_message("Bad request: session not found"); error_response(StatusCode::BAD_REQUEST, error) } - } + }; + + self.middleware_chain + .process_response(response?) + .await + .map_err(|e| TransportServerError::HttpError(e.to_string())) } /// Processes DELETE requests for the Streamable HTTP Protocol async fn handle_http_delete( + &self, request: http::Request<&str>, state: Arc, ) -> TransportServerResult> { + let request = self + .middleware_chain + .process_request(request) + .await + .map_err(|e| TransportServerError::HttpError(e.to_string()))?; + let headers = request.headers(); if let Err(parse_error) = validate_mcp_protocol_version_header(headers) { @@ -294,12 +344,17 @@ impl McpHttpHandler { .and_then(|value| value.to_str().ok()) .map(|s| s.to_string()); - match session_id { + let response = match session_id { Some(id) => delete_session(id, state).await, None => { let error = SdkError::bad_request().with_message("Bad Request: Session not found"); error_response(StatusCode::BAD_REQUEST, error) } - } + }; + + self.middleware_chain + .process_response(response?) + .await + .map_err(|e| TransportServerError::HttpError(e.to_string())) } } diff --git a/crates/rust-mcp-sdk/src/mcp_http/mcp_http_middleware.rs b/crates/rust-mcp-sdk/src/mcp_http/mcp_http_middleware.rs new file mode 100644 index 0000000..22027d7 --- /dev/null +++ b/crates/rust-mcp-sdk/src/mcp_http/mcp_http_middleware.rs @@ -0,0 +1,389 @@ +use crate::mcp_http::utils::GenericBody; +use crate::mcp_server::error::TransportServerResult; +use http::{Request, Response}; +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; + +/// Defines a middleware trait for processing HTTP requests and responses. +/// +/// Implementors of this trait can define custom logic to modify or inspect HTTP +/// requests before they reach the handler and HTTP responses before they are sent +/// back to the client. Middleware must be thread-safe (`Send + Sync`) and have a +/// static lifetime. +pub trait Middleware: Send + Sync + 'static { + /// Processes an incoming HTTP request. + /// + /// This method takes a request, applies middleware-specific logic, and returns + /// a future that resolves to a `TransportServerResult` containing the modified + /// request or an error. + /// + /// # Arguments + /// * `request` - The incoming HTTP request with a string body reference. + /// + /// # Returns + /// A pinned boxed future resolving to a `TransportServerResult` containing the + /// processed request. + fn process_request<'a, 'b>( + &'a self, + request: Request<&'b str>, + ) -> Pin>> + Send + 'a>> + where + 'b: 'a; // Ensure the request's lifetime outlives the future + + /// Processes an outgoing HTTP response. + /// + /// This method takes a response, applies middleware-specific logic, and returns + /// a future that resolves to a `TransportServerResult` containing the modified + /// response or an error. + /// + /// # Arguments + /// * `response` - The HTTP response with a `GenericBody`. + /// + /// # Returns + /// A pinned boxed future resolving to a `TransportServerResult` containing the + /// processed response. + fn process_response<'a, 'b>( + &'a self, + response: Response, + ) -> Pin>> + Send + 'a>> + where + 'b: 'a; // Optional, included for consistency +} + +/// A chain of middleware to process HTTP requests and responses sequentially. +/// +/// `MiddlewareChain` allows multiple middleware instances to be registered and +/// executed in order for requests (forward order) and responses (reverse order). +#[derive(Clone)] +pub struct MiddlewareChain { + middlewares: Vec>, +} + +impl MiddlewareChain { + /// Creates a new, empty middleware chain. + /// + /// # Returns + /// A new `MiddlewareChain` instance with no middleware registered. + pub fn new() -> Self { + MiddlewareChain { + middlewares: Vec::new(), + } + } + + /// Adds a middleware to the chain. + /// + /// The middleware is wrapped in an `Arc` to ensure thread-safety and shared + /// ownership. Middleware will be executed in the order they are added for + /// requests and in reverse order for responses. + /// + /// # Arguments + /// * `middleware` - The middleware to add to the chain. + pub fn add_middleware(&mut self, middleware: M) { + self.middlewares.push(Arc::new(middleware)); + } + + /// Processes an HTTP request through all registered middleware. + /// + /// Each middleware's `process_request` method is called in the order they + /// were added. If any middleware returns an error, processing stops and the + /// error is returned. + /// + /// # Arguments + /// * `request` - The HTTP request to process. + /// + /// # Returns + /// A `TransportServerResult` containing the processed request or an error. + pub async fn process_request<'a>( + &self, + request: http::Request<&'a str>, + ) -> TransportServerResult> { + let mut request = request; + for middleware in &self.middlewares { + request = middleware.process_request(request).await?; + } + Ok(request) + } + + /// Processes an HTTP response through all registered middleware. + /// + /// Each middleware's `process_response` method is called in the reverse order + /// of their addition. If any middleware returns an error, processing stops and + /// the error is returned. + /// + /// # Arguments + /// * `response` - The HTTP response to process. + /// + /// # Returns + /// A `TransportServerResult` containing the processed response or an error. + pub async fn process_response( + &self, + response: http::Response, + ) -> TransportServerResult> { + let mut response = response; + for middleware in self.middlewares.iter().rev() { + response = middleware.process_response(response).await?; + } + Ok(response) + } +} + +// Sample Middleware +pub struct LoggingMiddleware; + +impl Middleware for LoggingMiddleware { + fn process_request<'a, 'b>( + &'a self, + request: http::Request<&'b str>, + ) -> Pin>> + Send + 'a>> + where + 'b: 'a, + { + Box::pin(async move { + tracing::info!("Request: {} {}", request.method(), request.uri()); + Ok(request) + }) + } + + fn process_response<'a, 'b>( + &'a self, + response: http::Response, + ) -> Pin>> + Send + 'a>> + where + 'b: 'a, + { + Box::pin(async move { + tracing::info!("Response: {}", response.status()); + Ok(response) + }) + } +} + +#[cfg(test)] +mod tests { + use crate::{mcp_http::utils::empty_response, mcp_server::error::TransportServerError}; + + use super::*; + use async_trait::async_trait; + use bytes::Bytes; + use http::{Request, Response}; + use http_body_util::{BodyExt, Full}; + use std::sync::Mutex; + use thiserror::Error; + + /// Custom error type for test middleware. + #[derive(Error, Debug)] + enum TestMiddlewareError { + #[error("Request processing failed: {0}")] + RequestError(String), + #[error("Response processing failed: {0}")] + ResponseError(String), + } + + /// A test middleware that records its interactions with requests and responses. + struct TestMiddleware { + /// Tracks request calls with their input bodies. + request_calls: Arc>>, + /// Tracks response calls with their status codes. + response_calls: Arc>>, + /// Optional error to simulate failure in request processing. + request_error: Option, + /// Optional error to simulate failure in response processing. + response_error: Option, + } + + impl TestMiddleware { + fn new() -> Self { + TestMiddleware { + request_calls: Arc::new(Mutex::new(Vec::new())), + response_calls: Arc::new(Mutex::new(Vec::new())), + request_error: None, + response_error: None, + } + } + + fn with_errors(request_error: Option, response_error: Option) -> Self { + TestMiddleware { + request_calls: Arc::new(Mutex::new(Vec::new())), + response_calls: Arc::new(Mutex::new(Vec::new())), + request_error, + response_error, + } + } + } + + #[async_trait] + impl Middleware for TestMiddleware { + fn process_request<'a, 'b>( + &'a self, + request: Request<&'b str>, + ) -> Pin>> + Send + 'a>> + where + 'b: 'a, + { + Box::pin(async move { + if let Some(err) = &self.request_error { + return Err(TransportServerError::HttpError(err.to_string())); + } + self.request_calls + .lock() + .unwrap() + .push(request.body().to_string()); + Ok(request) + }) + } + + fn process_response<'a, 'b>( + &'a self, + response: Response, + ) -> Pin>> + Send + 'a>> + where + 'b: 'a, + { + Box::pin(async move { + if let Some(err) = &self.response_error { + return Err(TransportServerError::HttpError(err.to_string())); + } + self.response_calls + .lock() + .unwrap() + .push(response.status().as_u16()); + Ok(response) + }) + } + } + + #[tokio::test] + async fn test_empty_middleware_chain() { + let chain = MiddlewareChain::new(); + let request = Request::builder().body("test").unwrap(); + + let response = Response::builder() + .status(200) + .body(empty_response()) + .unwrap(); + + let result_request = chain.process_request(request).await.unwrap(); + let result_response = chain.process_response(response).await.unwrap(); + + assert_eq!(result_request.body().to_ascii_lowercase(), "test"); + assert_eq!(result_response.status(), 200); + } + + #[tokio::test] + async fn test_single_middleware() { + let mut chain = MiddlewareChain::new(); + let middleware = TestMiddleware::new(); + let request_calls = middleware.request_calls.clone(); + let response_calls = middleware.response_calls.clone(); + + chain.add_middleware(middleware); + + let request = Request::builder().body("test").unwrap(); + let response = Response::builder() + .status(200) + .body(empty_response()) + .unwrap(); + + let result_request = chain.process_request(request).await.unwrap(); + let result_response = chain.process_response(response).await.unwrap(); + + assert_eq!(result_request.body().to_ascii_lowercase(), "test"); + assert_eq!(result_response.status(), 200); + assert_eq!(request_calls.lock().unwrap().as_slice(), &["test"]); + assert_eq!(response_calls.lock().unwrap().as_slice(), &[200]); + } + + #[tokio::test] + async fn test_multiple_middlewares_request_order() { + let mut chain = MiddlewareChain::new(); + let middleware1 = TestMiddleware::new(); + let middleware2 = TestMiddleware::new(); + let request_calls1 = middleware1.request_calls.clone(); + let request_calls2 = middleware2.request_calls.clone(); + + chain.add_middleware(middleware1); + chain.add_middleware(middleware2); + + let request = Request::builder().body("test").unwrap(); + + let result = chain.process_request(request).await.unwrap(); + assert_eq!(result.body().to_ascii_lowercase(), "test"); + + // Check order of execution + assert_eq!(request_calls1.lock().unwrap().as_slice(), &["test"]); + assert_eq!(request_calls2.lock().unwrap().as_slice(), &["test"]); + } + + #[tokio::test] + async fn test_multiple_middlewares_response_reverse_order() { + let mut chain = MiddlewareChain::new(); + let middleware1 = TestMiddleware::new(); + let middleware2 = TestMiddleware::new(); + let response_calls1 = middleware1.response_calls.clone(); + let response_calls2 = middleware2.response_calls.clone(); + + chain.add_middleware(middleware1); + chain.add_middleware(middleware2); + + let response = Response::builder() + .status(200) + .body(empty_response()) + .unwrap(); + + let result = chain.process_response(response).await.unwrap(); + assert_eq!(result.status(), 200); + + // Check reverse order of execution + assert_eq!(response_calls2.lock().unwrap().as_slice(), &[200]); + assert_eq!(response_calls1.lock().unwrap().as_slice(), &[200]); + } + + #[tokio::test] + async fn test_middleware_request_error() { + let mut chain = MiddlewareChain::new(); + let middleware = TestMiddleware::with_errors(Some("request error".to_string()), None); + chain.add_middleware(middleware); + + let request = Request::builder().body("test").unwrap(); + + let result = chain.process_request(request).await; + assert!(result.is_err()); + assert_eq!(result.unwrap_err().to_string(), "request error"); + } + + #[tokio::test] + async fn test_middleware_response_error() { + let mut chain = MiddlewareChain::new(); + let middleware = TestMiddleware::with_errors(None, Some("response error".to_string())); + chain.add_middleware(middleware); + + let response = Response::builder() + .status(200) + .body(empty_response()) + .unwrap(); + + let result = chain.process_response(response).await; + assert!(result.is_err()); + assert_eq!(result.unwrap_err().to_string(), "response error"); + } + + #[tokio::test] + async fn test_middleware_chain_clone() { + let mut chain = MiddlewareChain::new(); + let middleware = TestMiddleware::new(); + let request_calls = middleware.request_calls.clone(); + + chain.add_middleware(middleware); + let chain_clone = chain.clone(); + + let request = Request::builder().body("test").unwrap(); + + // Process on original and clone + chain.process_request(request.clone()).await.unwrap(); + chain_clone.process_request(request).await.unwrap(); + + // Both should have processed the request + assert_eq!(request_calls.lock().unwrap().as_slice(), &["test", "test"]); + } +} diff --git a/crates/rust-mcp-sdk/src/mcp_http/mcp_http_utils.rs b/crates/rust-mcp-sdk/src/mcp_http/mcp_http_utils.rs index 6d003b9..06020d1 100644 --- a/crates/rust-mcp-sdk/src/mcp_http/mcp_http_utils.rs +++ b/crates/rust-mcp-sdk/src/mcp_http/mcp_http_utils.rs @@ -34,6 +34,17 @@ const DUPLEX_BUFFER_SIZE: usize = 8192; pub type GenericBody = BoxBody; +/// Creates an empty HTTP response body. +/// +/// This function constructs a `GenericBody` containing an empty `Bytes` buffer, +/// The body is wrapped in a `BoxBody` to ensure type erasure and compatibility +/// with the HTTP framework. +pub fn empty_response() -> GenericBody { + Full::new(Bytes::new()) + .map_err(|err| TransportServerError::HttpError(err.to_string())) + .boxed() +} + /// Creates an initial SSE event that returns the messages endpoint /// /// Constructs an SSE event containing the messages endpoint URL with the session ID. From 84a635ee445a08c08f4858d7663f2a26a0c79751 Mon Sep 17 00:00:00 2001 From: Ali Hashemi <14126952+hashemix@users.noreply.github.com> Date: Mon, 20 Oct 2025 18:51:15 -0300 Subject: [PATCH 3/4] fix: mcp client stderr handling (#113) --- crates/rust-mcp-transport/src/stdio.rs | 18 ++++-------------- 1 file changed, 4 insertions(+), 14 deletions(-) diff --git a/crates/rust-mcp-transport/src/stdio.rs b/crates/rust-mcp-transport/src/stdio.rs index 7678c65..d4c766a 100644 --- a/crates/rust-mcp-transport/src/stdio.rs +++ b/crates/rust-mcp-transport/src/stdio.rs @@ -6,7 +6,6 @@ use crate::schema::RequestId; use async_trait::async_trait; use serde::de::DeserializeOwned; use std::collections::HashMap; -use std::pin::Pin; use std::sync::Arc; use std::time::Duration; use tokio::process::Command; @@ -136,12 +135,9 @@ where *lock = Some(sender); } - pub(crate) async fn set_error_stream( - &self, - error_stream: Pin>, - ) { + pub(crate) async fn set_error_stream(&self, error_stream: IoStream) { let mut lock = self.error_stream.write().await; - *lock = Some(IoStream::Writable(error_stream)); + *lock = Some(error_stream); } } @@ -230,10 +226,7 @@ where ); self.set_message_sender(sender).await; - - if let IoStream::Writable(error_stream) = error_stream { - self.set_error_stream(error_stream).await; - } + self.set_error_stream(error_stream).await; Ok(stream) } else { @@ -247,10 +240,7 @@ where ); self.set_message_sender(sender).await; - - if let IoStream::Writable(error_stream) = error_stream { - self.set_error_stream(error_stream).await; - } + self.set_error_stream(error_stream).await; Ok(stream) } } From 2d4906cb9c758e27d615e23e0544fd0d6b7de761 Mon Sep 17 00:00:00 2001 From: Ali Hashemi <14126952+hashemix@users.noreply.github.com> Date: Mon, 20 Oct 2025 19:01:53 -0300 Subject: [PATCH 4/4] chore: release main (#110) * chore: release main * chore: update Cargo.toml for release --------- Co-authored-by: github-actions[bot] --- .release-manifest.json | 26 +++++++++---------- Cargo.lock | 26 +++++++++---------- Cargo.toml | 2 +- crates/rust-mcp-extra/CHANGELOG.md | 7 +++++ crates/rust-mcp-extra/Cargo.toml | 4 +-- crates/rust-mcp-sdk/CHANGELOG.md | 12 +++++++++ crates/rust-mcp-sdk/Cargo.toml | 2 +- crates/rust-mcp-transport/CHANGELOG.md | 12 +++++++++ crates/rust-mcp-transport/Cargo.toml | 2 +- .../Cargo.toml | 2 +- .../hello-world-mcp-server-stdio/Cargo.toml | 2 +- .../Cargo.toml | 2 +- .../Cargo.toml | 2 +- .../simple-mcp-client-sse-core/Cargo.toml | 2 +- examples/simple-mcp-client-sse/Cargo.toml | 2 +- .../simple-mcp-client-stdio-core/Cargo.toml | 2 +- examples/simple-mcp-client-stdio/Cargo.toml | 2 +- .../Cargo.toml | 2 +- .../Cargo.toml | 2 +- 19 files changed, 72 insertions(+), 41 deletions(-) diff --git a/.release-manifest.json b/.release-manifest.json index c030e49..aa27863 100644 --- a/.release-manifest.json +++ b/.release-manifest.json @@ -1,16 +1,16 @@ { - "crates/rust-mcp-sdk": "0.7.1", + "crates/rust-mcp-sdk": "0.7.2", "crates/rust-mcp-macros": "0.5.2", - "crates/rust-mcp-transport": "0.6.1", - "crates/rust-mcp-extra": "0.1.1", - "examples/hello-world-mcp-server-stdio": "0.1.30", - "examples/hello-world-mcp-server-stdio-core": "0.1.21", - "examples/simple-mcp-client-stdio": "0.1.30", - "examples/simple-mcp-client-stdio-core": "0.1.30", - "examples/hello-world-server-streamable-http-core": "0.1.21", - "examples/hello-world-server-streamable-http": "0.1.33", - "examples/simple-mcp-client-sse-core": "0.1.21", - "examples/simple-mcp-client-sse": "0.1.24", - "examples/simple-mcp-client-streamable-http": "0.1.2", - "examples/simple-mcp-client-streamable-http-core": "0.1.2" + "crates/rust-mcp-transport": "0.6.2", + "crates/rust-mcp-extra": "0.1.2", + "examples/hello-world-mcp-server-stdio": "0.1.31", + "examples/hello-world-mcp-server-stdio-core": "0.1.22", + "examples/simple-mcp-client-stdio": "0.1.31", + "examples/simple-mcp-client-stdio-core": "0.1.31", + "examples/hello-world-server-streamable-http-core": "0.1.22", + "examples/hello-world-server-streamable-http": "0.1.34", + "examples/simple-mcp-client-sse-core": "0.1.22", + "examples/simple-mcp-client-sse": "0.1.25", + "examples/simple-mcp-client-streamable-http": "0.1.3", + "examples/simple-mcp-client-streamable-http-core": "0.1.3" } diff --git a/Cargo.lock b/Cargo.lock index 6fc3e9d..ed55333 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -681,7 +681,7 @@ checksum = "5419bdc4f6a9207fbeba6d11b604d481addf78ecd10c11ad51e76c2f6482748d" [[package]] name = "hello-world-mcp-server-stdio" -version = "0.1.30" +version = "0.1.31" dependencies = [ "async-trait", "futures", @@ -695,7 +695,7 @@ dependencies = [ [[package]] name = "hello-world-mcp-server-stdio-core" -version = "0.1.21" +version = "0.1.22" dependencies = [ "async-trait", "futures", @@ -707,7 +707,7 @@ dependencies = [ [[package]] name = "hello-world-server-streamable-http" -version = "0.1.33" +version = "0.1.34" dependencies = [ "async-trait", "futures", @@ -721,7 +721,7 @@ dependencies = [ [[package]] name = "hello-world-server-streamable-http-core" -version = "0.1.21" +version = "0.1.22" dependencies = [ "async-trait", "futures", @@ -1693,7 +1693,7 @@ dependencies = [ [[package]] name = "rust-mcp-extra" -version = "0.1.1" +version = "0.1.2" dependencies = [ "base64 0.22.1", "nanoid", @@ -1727,7 +1727,7 @@ dependencies = [ [[package]] name = "rust-mcp-sdk" -version = "0.7.1" +version = "0.7.2" dependencies = [ "async-trait", "axum", @@ -1756,7 +1756,7 @@ dependencies = [ [[package]] name = "rust-mcp-transport" -version = "0.6.1" +version = "0.6.2" dependencies = [ "async-trait", "bytes", @@ -1951,7 +1951,7 @@ dependencies = [ [[package]] name = "simple-mcp-client-sse" -version = "0.1.24" +version = "0.1.25" dependencies = [ "async-trait", "colored", @@ -1967,7 +1967,7 @@ dependencies = [ [[package]] name = "simple-mcp-client-sse-core" -version = "0.1.21" +version = "0.1.22" dependencies = [ "async-trait", "colored", @@ -1983,7 +1983,7 @@ dependencies = [ [[package]] name = "simple-mcp-client-stdio" -version = "0.1.30" +version = "0.1.31" dependencies = [ "async-trait", "colored", @@ -1997,7 +1997,7 @@ dependencies = [ [[package]] name = "simple-mcp-client-stdio-core" -version = "0.1.30" +version = "0.1.31" dependencies = [ "async-trait", "colored", @@ -2011,7 +2011,7 @@ dependencies = [ [[package]] name = "simple-mcp-client-streamable-http" -version = "0.1.2" +version = "0.1.3" dependencies = [ "async-trait", "colored", @@ -2027,7 +2027,7 @@ dependencies = [ [[package]] name = "simple-mcp-client-streamable-http-core" -version = "0.1.2" +version = "0.1.3" dependencies = [ "async-trait", "colored", diff --git a/Cargo.toml b/Cargo.toml index 718c9a4..1b3f9b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,7 +20,7 @@ members = [ [workspace.dependencies] # Workspace member crates -rust-mcp-transport = { version = "0.6.1", path = "crates/rust-mcp-transport", default-features = false } +rust-mcp-transport = { version = "0.6.2", path = "crates/rust-mcp-transport", default-features = false } rust-mcp-sdk = { path = "crates/rust-mcp-sdk", default-features = false } rust-mcp-macros = { version = "0.5.2", path = "crates/rust-mcp-macros", default-features = false } rust-mcp-extra = { version="0.1.0", path = "crates/rust-mcp-extra", default-features = false } diff --git a/crates/rust-mcp-extra/CHANGELOG.md b/crates/rust-mcp-extra/CHANGELOG.md index 7fd2cb3..117f919 100644 --- a/crates/rust-mcp-extra/CHANGELOG.md +++ b/crates/rust-mcp-extra/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## [0.1.2](https://github.com/rust-mcp-stack/rust-mcp-sdk/compare/rust-mcp-extra-v0.1.1...rust-mcp-extra-v0.1.2) (2025-10-20) + + +### πŸš€ Features + +* Add middleware support to mcp_http_handler ([#112](https://github.com/rust-mcp-stack/rust-mcp-sdk/issues/112)) ([18b1e6f](https://github.com/rust-mcp-stack/rust-mcp-sdk/commit/18b1e6f3e9671bfffa4bd59f64dc12fc2e44d818)) + ## [0.1.1](https://github.com/rust-mcp-stack/rust-mcp-sdk/compare/rust-mcp-extra-v0.1.0...rust-mcp-extra-v0.1.1) (2025-10-13) diff --git a/crates/rust-mcp-extra/Cargo.toml b/crates/rust-mcp-extra/Cargo.toml index 982da64..dba3c26 100644 --- a/crates/rust-mcp-extra/Cargo.toml +++ b/crates/rust-mcp-extra/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rust-mcp-extra" -version = "0.1.1" +version = "0.1.2" authors = ["Ali Hashemi"] categories = ["api-bindings", "development-tools", "asynchronous", "parsing"] description = "A companion crate to rust-mcp-sdk offering extra implementations of core traits like SessionStore and EventStore, enabling integration with various database backends and third-party platforms such as AWS Lambda for serverless and cloud-native MCP applications." @@ -12,7 +12,7 @@ edition = "2024" exclude = ["assets/", "tests/"] [dependencies] -rust-mcp-sdk = { version = "0.7.1" , path = "../rust-mcp-sdk", default-features = false, features=["server","2025_06_18"] } +rust-mcp-sdk = { version = "0.7.2" , path = "../rust-mcp-sdk", default-features = false, features=["server","2025_06_18"] } base64 = {workspace = true, optional=true} nanoid = {version="0.4", optional=true} diff --git a/crates/rust-mcp-sdk/CHANGELOG.md b/crates/rust-mcp-sdk/CHANGELOG.md index 7d56e22..f5d8329 100644 --- a/crates/rust-mcp-sdk/CHANGELOG.md +++ b/crates/rust-mcp-sdk/CHANGELOG.md @@ -1,5 +1,17 @@ # Changelog +## [0.7.2](https://github.com/rust-mcp-stack/rust-mcp-sdk/compare/rust-mcp-sdk-v0.7.1...rust-mcp-sdk-v0.7.2) (2025-10-20) + + +### πŸš€ Features + +* Add middleware support to mcp_http_handler ([#112](https://github.com/rust-mcp-stack/rust-mcp-sdk/issues/112)) ([18b1e6f](https://github.com/rust-mcp-stack/rust-mcp-sdk/commit/18b1e6f3e9671bfffa4bd59f64dc12fc2e44d818)) + + +### 🚜 Code Refactoring + +* Eventstore with better error handling and stability ([#109](https://github.com/rust-mcp-stack/rust-mcp-sdk/issues/109)) ([150e3a0](https://github.com/rust-mcp-stack/rust-mcp-sdk/commit/150e3a02ba593b2e41b16d2d621e770d292cfa23)) + ## [0.7.1](https://github.com/rust-mcp-stack/rust-mcp-sdk/compare/rust-mcp-sdk-v0.7.0...rust-mcp-sdk-v0.7.1) (2025-10-13) diff --git a/crates/rust-mcp-sdk/Cargo.toml b/crates/rust-mcp-sdk/Cargo.toml index 27f8d6f..5b53148 100644 --- a/crates/rust-mcp-sdk/Cargo.toml +++ b/crates/rust-mcp-sdk/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rust-mcp-sdk" -version = "0.7.1" +version = "0.7.2" authors = ["Ali Hashemi"] categories = ["data-structures", "parser-implementations", "parsing"] description = "An asynchronous SDK and framework for building MCP-Servers and MCP-Clients, leveraging the rust-mcp-schema for type safe MCP Schema Objects." diff --git a/crates/rust-mcp-transport/CHANGELOG.md b/crates/rust-mcp-transport/CHANGELOG.md index d3170b9..a50b8cd 100644 --- a/crates/rust-mcp-transport/CHANGELOG.md +++ b/crates/rust-mcp-transport/CHANGELOG.md @@ -1,5 +1,17 @@ # Changelog +## [0.6.2](https://github.com/rust-mcp-stack/rust-mcp-sdk/compare/rust-mcp-transport-v0.6.1...rust-mcp-transport-v0.6.2) (2025-10-20) + + +### πŸ› Bug Fixes + +* Mcp client stderr handling ([#113](https://github.com/rust-mcp-stack/rust-mcp-sdk/issues/113)) ([84a635e](https://github.com/rust-mcp-stack/rust-mcp-sdk/commit/84a635ee445a08c08f4858d7663f2a26a0c79751)) + + +### 🚜 Code Refactoring + +* Eventstore with better error handling and stability ([#109](https://github.com/rust-mcp-stack/rust-mcp-sdk/issues/109)) ([150e3a0](https://github.com/rust-mcp-stack/rust-mcp-sdk/commit/150e3a02ba593b2e41b16d2d621e770d292cfa23)) + ## [0.6.1](https://github.com/rust-mcp-stack/rust-mcp-sdk/compare/rust-mcp-transport-v0.6.0...rust-mcp-transport-v0.6.1) (2025-10-13) diff --git a/crates/rust-mcp-transport/Cargo.toml b/crates/rust-mcp-transport/Cargo.toml index e9605fc..31c8d40 100644 --- a/crates/rust-mcp-transport/Cargo.toml +++ b/crates/rust-mcp-transport/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rust-mcp-transport" -version = "0.6.1" +version = "0.6.2" authors = ["Ali Hashemi"] categories = ["data-structures"] description = "Transport implementations for the MCP (Model Context Protocol) within the rust-mcp-sdk ecosystem, enabling asynchronous data exchange and efficient message handling between MCP clients and servers." diff --git a/examples/hello-world-mcp-server-stdio-core/Cargo.toml b/examples/hello-world-mcp-server-stdio-core/Cargo.toml index 5166763..915896c 100644 --- a/examples/hello-world-mcp-server-stdio-core/Cargo.toml +++ b/examples/hello-world-mcp-server-stdio-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hello-world-mcp-server-stdio-core" -version = "0.1.21" +version = "0.1.22" edition = "2021" publish = false license = "MIT" diff --git a/examples/hello-world-mcp-server-stdio/Cargo.toml b/examples/hello-world-mcp-server-stdio/Cargo.toml index 397371f..7880158 100644 --- a/examples/hello-world-mcp-server-stdio/Cargo.toml +++ b/examples/hello-world-mcp-server-stdio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hello-world-mcp-server-stdio" -version = "0.1.30" +version = "0.1.31" edition = "2021" publish = false license = "MIT" diff --git a/examples/hello-world-server-streamable-http-core/Cargo.toml b/examples/hello-world-server-streamable-http-core/Cargo.toml index 2e1010f..01580cc 100644 --- a/examples/hello-world-server-streamable-http-core/Cargo.toml +++ b/examples/hello-world-server-streamable-http-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hello-world-server-streamable-http-core" -version = "0.1.21" +version = "0.1.22" edition = "2021" publish = false license = "MIT" diff --git a/examples/hello-world-server-streamable-http/Cargo.toml b/examples/hello-world-server-streamable-http/Cargo.toml index db5212d..3cb8887 100644 --- a/examples/hello-world-server-streamable-http/Cargo.toml +++ b/examples/hello-world-server-streamable-http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "hello-world-server-streamable-http" -version = "0.1.33" +version = "0.1.34" edition = "2021" publish = false license = "MIT" diff --git a/examples/simple-mcp-client-sse-core/Cargo.toml b/examples/simple-mcp-client-sse-core/Cargo.toml index 46b6790..0defa1d 100644 --- a/examples/simple-mcp-client-sse-core/Cargo.toml +++ b/examples/simple-mcp-client-sse-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "simple-mcp-client-sse-core" -version = "0.1.21" +version = "0.1.22" edition = "2021" publish = false license = "MIT" diff --git a/examples/simple-mcp-client-sse/Cargo.toml b/examples/simple-mcp-client-sse/Cargo.toml index a2f4a73..0fb2308 100644 --- a/examples/simple-mcp-client-sse/Cargo.toml +++ b/examples/simple-mcp-client-sse/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "simple-mcp-client-sse" -version = "0.1.24" +version = "0.1.25" edition = "2021" publish = false license = "MIT" diff --git a/examples/simple-mcp-client-stdio-core/Cargo.toml b/examples/simple-mcp-client-stdio-core/Cargo.toml index 2db9211..29223ec 100644 --- a/examples/simple-mcp-client-stdio-core/Cargo.toml +++ b/examples/simple-mcp-client-stdio-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "simple-mcp-client-stdio-core" -version = "0.1.30" +version = "0.1.31" edition = "2021" publish = false license = "MIT" diff --git a/examples/simple-mcp-client-stdio/Cargo.toml b/examples/simple-mcp-client-stdio/Cargo.toml index 9560e88..a36dcec 100644 --- a/examples/simple-mcp-client-stdio/Cargo.toml +++ b/examples/simple-mcp-client-stdio/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "simple-mcp-client-stdio" -version = "0.1.30" +version = "0.1.31" edition = "2021" publish = false license = "MIT" diff --git a/examples/simple-mcp-client-streamable-http-core/Cargo.toml b/examples/simple-mcp-client-streamable-http-core/Cargo.toml index b53824c..c446bb6 100644 --- a/examples/simple-mcp-client-streamable-http-core/Cargo.toml +++ b/examples/simple-mcp-client-streamable-http-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "simple-mcp-client-streamable-http-core" -version = "0.1.2" +version = "0.1.3" edition = "2021" publish = false license = "MIT" diff --git a/examples/simple-mcp-client-streamable-http/Cargo.toml b/examples/simple-mcp-client-streamable-http/Cargo.toml index 3a5fa02..d4200c8 100644 --- a/examples/simple-mcp-client-streamable-http/Cargo.toml +++ b/examples/simple-mcp-client-streamable-http/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "simple-mcp-client-streamable-http" -version = "0.1.2" +version = "0.1.3" edition = "2021" publish = false license = "MIT"