-
Notifications
You must be signed in to change notification settings - Fork 194
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(torii): add SSE support for mcp #3075
base: main
Are you sure you want to change the base?
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## main #3075 +/- ##
==========================================
- Coverage 57.22% 57.05% -0.18%
==========================================
Files 441 441
Lines 60098 60287 +189
==========================================
+ Hits 34394 34399 +5
- Misses 25704 25888 +184 ☔ View full report in Codecov by Sentry. |
WalkthroughOhayo! The update introduces several enhancements primarily in the Changes
Suggested reviewers
Possibly related PRs
✨ Finishing Touches
🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
Documentation and Community
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Nitpick comments (4)
crates/torii/server/src/handlers/mod.rs (1)
17-17
: Key interface change that improves handler designOhayo, sensei! This is the central change that drives modifications in all handler implementations. Moving the client IP from being a stored field to a method parameter is a better design because:
- Client IP is request-specific data rather than handler configuration
- It eliminates state that needs to be managed in handler instances
- It makes the data flow more explicit and traceable
This is a solid architectural improvement.
crates/torii/server/src/handlers/mcp.rs (3)
84-89
: Ohayo sensei! Introducing theSseSession
data structure is a nice step for SSE.
Just watch memory usage if many sessions are spawned but not cleaned up.
244-244
: Ohayo sensei! The warning log is valuable.
Ensure that repeated logs don’t flood logs in production if spamming or large throughput occurs.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 244-244: crates/torii/server/src/handlers/mcp.rs#L244
Added line #L244 was not covered by tests
376-378
: Ohayo sensei! Broadcasting the response to SSE is beneficial.
Keep an eye on performance if large volumes of messages are broadcast frequently.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 376-378: crates/torii/server/src/handlers/mcp.rs#L376-L378
Added lines #L376 - L378 were not covered by tests
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock
is excluded by!**/*.lock
📒 Files selected for processing (8)
crates/torii/server/Cargo.toml
(1 hunks)crates/torii/server/src/handlers/graphql.rs
(2 hunks)crates/torii/server/src/handlers/grpc.rs
(2 hunks)crates/torii/server/src/handlers/mcp.rs
(11 hunks)crates/torii/server/src/handlers/mod.rs
(1 hunks)crates/torii/server/src/handlers/sql.rs
(3 hunks)crates/torii/server/src/handlers/static_files.rs
(2 hunks)crates/torii/server/src/proxy.rs
(5 hunks)
🧰 Additional context used
🪛 GitHub Check: codecov/patch
crates/torii/server/src/handlers/graphql.rs
[warning] 17-18: crates/torii/server/src/handlers/graphql.rs#L17-L18
Added lines #L17 - L18 were not covered by tests
[warning] 28-28: crates/torii/server/src/handlers/graphql.rs#L28
Added line #L28 was not covered by tests
[warning] 31-31: crates/torii/server/src/handlers/graphql.rs#L31
Added line #L31 was not covered by tests
crates/torii/server/src/handlers/grpc.rs
[warning] 17-18: crates/torii/server/src/handlers/grpc.rs#L17-L18
Added lines #L17 - L18 were not covered by tests
[warning] 32-32: crates/torii/server/src/handlers/grpc.rs#L32
Added line #L32 was not covered by tests
[warning] 35-35: crates/torii/server/src/handlers/grpc.rs#L35
Added line #L35 was not covered by tests
crates/torii/server/src/handlers/mcp.rs
[warning] 113-151: crates/torii/server/src/handlers/mcp.rs#L113-L151
Added lines #L113 - L151 were not covered by tests
[warning] 163-164: crates/torii/server/src/handlers/mcp.rs#L163-L164
Added lines #L163 - L164 were not covered by tests
[warning] 193-205: crates/torii/server/src/handlers/mcp.rs#L193-L205
Added lines #L193 - L205 were not covered by tests
[warning] 244-244: crates/torii/server/src/handlers/mcp.rs#L244
Added line #L244 was not covered by tests
[warning] 252-257: crates/torii/server/src/handlers/mcp.rs#L252-L257
Added lines #L252 - L257 were not covered by tests
[warning] 261-289: crates/torii/server/src/handlers/mcp.rs#L261-L289
Added lines #L261 - L289 were not covered by tests
[warning] 291-292: crates/torii/server/src/handlers/mcp.rs#L291-L292
Added lines #L291 - L292 were not covered by tests
[warning] 294-300: crates/torii/server/src/handlers/mcp.rs#L294-L300
Added lines #L294 - L300 were not covered by tests
[warning] 304-304: crates/torii/server/src/handlers/mcp.rs#L304
Added line #L304 was not covered by tests
[warning] 306-317: crates/torii/server/src/handlers/mcp.rs#L306-L317
Added lines #L306 - L317 were not covered by tests
[warning] 320-323: crates/torii/server/src/handlers/mcp.rs#L320-L323
Added lines #L320 - L323 were not covered by tests
[warning] 326-329: crates/torii/server/src/handlers/mcp.rs#L326-L329
Added lines #L326 - L329 were not covered by tests
[warning] 331-339: crates/torii/server/src/handlers/mcp.rs#L331-L339
Added lines #L331 - L339 were not covered by tests
[warning] 345-351: crates/torii/server/src/handlers/mcp.rs#L345-L351
Added lines #L345 - L351 were not covered by tests
[warning] 355-361: crates/torii/server/src/handlers/mcp.rs#L355-L361
Added lines #L355 - L361 were not covered by tests
[warning] 366-366: crates/torii/server/src/handlers/mcp.rs#L366
Added line #L366 was not covered by tests
[warning] 368-373: crates/torii/server/src/handlers/mcp.rs#L368-L373
Added lines #L368 - L373 were not covered by tests
[warning] 376-378: crates/torii/server/src/handlers/mcp.rs#L376-L378
Added lines #L376 - L378 were not covered by tests
[warning] 380-385: crates/torii/server/src/handlers/mcp.rs#L380-L385
Added lines #L380 - L385 were not covered by tests
[warning] 389-393: crates/torii/server/src/handlers/mcp.rs#L389-L393
Added lines #L389 - L393 were not covered by tests
[warning] 399-408: crates/torii/server/src/handlers/mcp.rs#L399-L408
Added lines #L399 - L408 were not covered by tests
[warning] 410-410: crates/torii/server/src/handlers/mcp.rs#L410
Added line #L410 was not covered by tests
[warning] 413-415: crates/torii/server/src/handlers/mcp.rs#L413-L415
Added lines #L413 - L415 were not covered by tests
[warning] 417-422: crates/torii/server/src/handlers/mcp.rs#L417-L422
Added lines #L417 - L422 were not covered by tests
🔇 Additional comments (53)
crates/torii/server/Cargo.toml (1)
37-38
: Ohayo! Your dependencies look well chosen for SSE implementation, sensei!The addition of
tokio-stream
anduuid
makes perfect sense for this feature:
tokio-stream
provides asynchronous stream primitives that are ideal for SSE implementationuuid
will be useful for generating unique identifiers for sessions or resourcesThe exact version pinning is also good practice for reproducible builds.
crates/torii/server/src/handlers/sql.rs (3)
1-1
: IpAddr import aligns with handler interface changes, sensei!This import is necessary to support the updated Handler trait requirements.
13-13
: Ohayo! Adding Debug trait is excellent for troubleshooting, sensei!The
#[derive(Debug)]
addition allows better logging and debugging capabilities, which aligns with the updated Handler trait requirements.
116-116
: Client address parameter properly implemented but currently unusedThe underscore prefix in
_client_addr
correctly indicates an unused parameter. This is acceptable as part of standardizing the handler interface across the codebase.Would you consider documenting why this parameter is currently unused but required by the trait? A small comment might help future developers understand the design decision.
crates/torii/server/src/handlers/graphql.rs (4)
11-11
: Ohayo! Debug trait addition is consistent with other handlers, sensei!This change properly aligns with the updated Handler trait requirements.
13-13
: Increased visibility of graphql_addr is appropriateMaking
graphql_addr
public within the crate (pub(crate)
) improves the module's API design while maintaining encapsulation at the crate level.
17-19
:✅ Verification successful
Constructor simplified by removing client_ip dependency, sensei!
The updated constructor properly reflects the removal of the
client_ip
field from the struct, which is now passed directly to thehandle
method. This design is cleaner as it avoids storing request-specific data in the handler instance.
🏁 Script executed:
#!/bin/bash # Verify that all usages of the GraphQLHandler constructor have been updated rg -A 2 "GraphQLHandler::new\(" --type rustLength of output: 479
Constructors Verified: GraphQLHandler ctor cleanup confirmed, sensei!
Ohayo, sensei! The constructor in
crates/torii/server/src/handlers/graphql.rs
has been simplified correctly by removing theclient_ip
dependency. The verification shows that all usage sites incrates/torii/server/src/proxy.rs
are updated to pass only theOption<SocketAddr>
, ensuring that request-specific data isn’t stored in the handler instance.
- The constructor now accepts an
Option<SocketAddr>
, aligning with the updated design.- Usages in
proxy.rs
(e.g., passinggraphql_addr
andSome(addr)
) correctly reflect the removal ofclient_ip
.Nice job streamlining the code!
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 17-18: crates/torii/server/src/handlers/graphql.rs#L17-L18
Added lines #L17 - L18 were not covered by tests
28-31
:❓ Verification inconclusive
Client address parameter now correctly passed to proxy client
Ohayo, sensei! The implementation now properly uses the
client_addr
parameter passed to thehandle
method instead of storing it in the struct. This is a better design as the client IP address is request-specific.Note that static analysis shows these lines lack test coverage. Consider adding tests for this critical path.
🏁 Script executed:
#!/bin/bash # Check for test coverage of GraphQLHandler rg "test.*GraphQLHandler" --type rustLength of output: 39
Ohayo, sensei!
- The updated implementation now correctly passes the
client_addr
parameter to the proxy client incrates/torii/server/src/handlers/graphql.rs
(lines 28–31), which is a neat improvement.- However, our static analysis didn’t locate any tests covering this critical GraphQL handling path. This could lead to potential regressions if any future changes touch this functionality.
Please add tests to verify this critical flow—ensuring we have sufficient test coverage will help maintain the robustness of the proxy logic.
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 28-28: crates/torii/server/src/handlers/graphql.rs#L28
Added line #L28 was not covered by tests
[warning] 31-31: crates/torii/server/src/handlers/graphql.rs#L31
Added line #L31 was not covered by testscrates/torii/server/src/handlers/mod.rs (2)
7-8
: Ohayo! IpAddr import added for updated handler interface, sensei!This import is necessary for the updated Handler trait definition.
12-12
: Debug trait requirement is a great addition for handler diagnosticsAdding the
std::fmt::Debug
trait requirement to the Handler interface enables better logging and debugging capabilities throughout the system. This is especially valuable for a server component where diagnostics are important.crates/torii/server/src/handlers/static_files.rs (4)
10-10
: Ohayo sensei! Good addition of Debug trait.
No issues spotted here.
16-17
: Ohayo sensei! Consider verifying test coverage for this constructor.
Adding a quick unit test can prevent regressions and boost confidence that this initialization behaves as expected.
27-27
: Ohayo sensei! Parameter update is consistent with the Handler trait.
Looks correct and consistent with the global approach of passingclient_addr
.
30-30
: Ohayo sensei! Ensure proper error handling coverage.
It might be beneficial to verify that any failures fromGRAPHQL_PROXY_CLIENT.call
are properly logged and tested.crates/torii/server/src/handlers/grpc.rs (4)
11-11
: Ohayo sensei! Debug trait derivation confirmed.
Glad to see you’re keeping things consistent with the rest of the handlers.
17-18
: Ohayo sensei! Please ensure this constructor is tested.
Static analysis indicates no coverage for these lines. Consider a small test verifyingGrpcHandler::new
logic.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 17-18: crates/torii/server/src/handlers/grpc.rs#L17-L18
Added lines #L17 - L18 were not covered by tests
32-32
: Ohayo sensei! The handle method’s new parameter is well integrated.
This aligns with the broader refactoring; no immediate issues.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 32-32: crates/torii/server/src/handlers/grpc.rs#L32
Added line #L32 was not covered by tests
35-35
: Ohayo sensei! Double-check robust error logging.
TheGRPC_PROXY_CLIENT.call
might fail under network or config issues. Confirm test coverage for error paths.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 35-35: crates/torii/server/src/handlers/grpc.rs#L35
Added line #L35 was not covered by testscrates/torii/server/src/proxy.rs (8)
67-67
: Ohayo sensei! Consolidating handler references into a vector is a neat design.
It promotes easier extension of new handlers in the future. Nicely done.
79-85
: Ohayo sensei! Great approach for initializing multiple handlers in one place.
Consider verifying coverage: ensure there's a test that checks the presence/ordering of these handlers.
87-87
: Ohayo sensei! The Proxy struct initialization is tidy.
No issues detected.
91-92
: Ohayo sensei!set_graphql_addr
approach is consistent with the new Handler vector.
Just confirm that index[0]
usage remains correct if the handlers’ order changes eventually.
104-104
: Ohayo sensei! Good usage capturing the remote IP directly from the connection.
No concerns here.
138-138
: Ohayo sensei! Cloning the Arc for handlers is appropriate.
No synchronization issues are apparent.
140-143
: Ohayo sensei! Thehandle
call withremote_addr
looks straightforward.
Ensure you have coverage for the scenario when no handler matches.
163-167
: Ohayo sensei! Iterating over the handlers and invoking the first match is standard.
This short-circuits after handling, which is desired for this architecture. No issues.crates/torii/server/src/handlers/mcp.rs (27)
1-19
: Ohayo sensei! Thanks for bringing inIpAddr
and these additional crates.
No immediate concerns regarding imports and constants here, but do watch out for any untested lines.
43-50
: Ohayo sensei! Extended JSON-RPC response struct is well-organized.
Skipping serialization onNone
fields is a neat approach. Just ensure thorough test coverage.
53-59
: Ohayo sensei! The custom JSON-RPC error struct is a good addition.
This keeps error details consistent across the codebase.
91-96
: Ohayo sensei! TheTool
struct is straightforward.
Ensure theinput_schema
is validated somewhere to avoid invalid or ambiguous definitions.
98-100
: Ohayo sensei!Resource
struct introduced.
No issues; left open for growth, presumably.
103-103
: Ohayo sensei! The#[derive(Clone, Debug)]
forMcpHandler
is consistent.
This will help with concurrency scenarios.
106-108
: Ohayo sensei! Maintaining SSE sessions in a RwLock’d HashMap is sensible.
Double-check concurrency usage and potential leftover sessions.
163-164
: Ohayo sensei!resources/list
andresources/read
enhance your API surface.
Be sure to add coverage around them.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 163-164: crates/torii/server/src/handlers/mcp.rs#L163-L164
Added lines #L163 - L164 were not covered by tests
193-205
: Ohayo sensei! Thehandle_tools_list
method returns detailed tool info.
It’s well-structured. Possibly confirm the shape oftools_json
in tests.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 193-205: crates/torii/server/src/handlers/mcp.rs#L193-L205
Added lines #L193 - L205 were not covered by tests
252-257
: Ohayo sensei! Creating a broadcast channel for SSE with capacity 100.
Seems balanced, might want to confirm sizes are adequate under load.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 252-257: crates/torii/server/src/handlers/mcp.rs#L252-L257
Added lines #L252 - L257 were not covered by tests
291-292
: Ohayo sensei! Good logic for error serialization fallback.
No direct issues here.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 291-292: crates/torii/server/src/handlers/mcp.rs#L291-L292
Added lines #L291 - L292 were not covered by tests
294-300
: Ohayo sensei! Generating an SSE error event is thoughtful.
This helps debugging on the client side.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 294-300: crates/torii/server/src/handlers/mcp.rs#L294-L300
Added lines #L294 - L300 were not covered by tests
304-304
: Ohayo sensei! Guarding for broadcast channel closure or client disconnection is wise.
No further concerns.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 304-304: crates/torii/server/src/handlers/mcp.rs#L304
Added line #L304 was not covered by tests
306-317
: Ohayo sensei! Thorough SSE response setup with appropriate headers.
Great job returning the session ID in the headers too.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 306-317: crates/torii/server/src/handlers/mcp.rs#L306-L317
Added lines #L306 - L317 were not covered by tests
320-323
: Ohayo sensei! Extractingsession_id
from query is correct, but watch for query parsing edge cases.
Maybe verify what happens ifuri.query()
is missing or if param is incomplete.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 320-323: crates/torii/server/src/handlers/mcp.rs#L320-L323
Added lines #L320 - L323 were not covered by tests
326-329
: Ohayo sensei! Checking for an existing SSE session is crucial.
Ensure you handle multiple concurrent message requests for the same session.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 326-329: crates/torii/server/src/handlers/mcp.rs#L326-L329
Added lines #L326 - L329 were not covered by tests
331-339
: Ohayo sensei! Returning an error response when the session is not found is user-friendly.
No further issues.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 331-339: crates/torii/server/src/handlers/mcp.rs#L331-L339
Added lines #L331 - L339 were not covered by tests
345-351
: Ohayo sensei! Handling errors from reading the request body is well-defined.
Just confirm it’s tested with invalid inputs.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 345-351: crates/torii/server/src/handlers/mcp.rs#L345-L351
Added lines #L345 - L351 were not covered by tests
355-361
: Ohayo sensei! UTF-8 conversion error handling is thorough.
This helps avoid cryptic failures.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 355-361: crates/torii/server/src/handlers/mcp.rs#L355-L361
Added lines #L355 - L361 were not covered by tests
366-366
: Ohayo sensei! Preliminary parse as raw JSON is a flexible approach.
No immediate concerns, just ensure you log parse failures.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 366-366: crates/torii/server/src/handlers/mcp.rs#L366
Added line #L366 was not covered by tests
368-373
: Ohayo sensei! A clean approach to separate Request vs Notification.
Cover both in your tests to confirm they handle all JSON correctly.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 368-373: crates/torii/server/src/handlers/mcp.rs#L368-L373
Added lines #L368 - L373 were not covered by tests
380-385
: Ohayo sensei! Good job including typical JSON-RPC headers for the response.
Consistency with your SSE approach is a plus.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 380-385: crates/torii/server/src/handlers/mcp.rs#L380-L385
Added lines #L380 - L385 were not covered by tests
389-393
: Ohayo sensei! Correctly responding 202 for notifications.
This design is simple and fits JSON-RPC usage.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 389-393: crates/torii/server/src/handlers/mcp.rs#L389-L393
Added lines #L389 - L393 were not covered by tests
399-408
: Ohayo sensei! Smart fallback for “raw request” style inputs.
Ensures broader compatibility with partial JSON-RPC clients.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 399-408: crates/torii/server/src/handlers/mcp.rs#L399-L408
Added lines #L399 - L408 were not covered by tests
410-410
: Ohayo sensei! Passing along a default JSON-RPC version is a nice fallback.
No issues found.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 410-410: crates/torii/server/src/handlers/mcp.rs#L410
Added line #L410 was not covered by tests
413-415
: Ohayo sensei! Good check for SSE broadcast errors.
Always good to surface these warnings for debugging.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 413-415: crates/torii/server/src/handlers/mcp.rs#L413-L415
Added lines #L413 - L415 were not covered by tests
417-422
: Ohayo sensei! Graceful handling of invalid requests with a JSON-RPC error.
Clear user feedback is essential.🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 417-422: crates/torii/server/src/handlers/mcp.rs#L417-L422
Added lines #L417 - L422 were not covered by tests
let mut sessions = self.sse_sessions.write().await; | ||
sessions.insert( | ||
session_id.clone(), | ||
SseSession { tx: tx.clone(), _session_id: session_id.clone() }, | ||
); | ||
} | ||
|
||
// Create the message endpoint path | ||
let message_endpoint = format!("/mcp/message?sessionId={}", session_id); | ||
|
||
// Create initial endpoint info event - using full URL format | ||
let endpoint_info = format!("event: endpoint\ndata: {}\n\n", message_endpoint); | ||
|
||
// Create the streaming body with the endpoint information followed by event data | ||
let stream = futures_util::stream::once(async move { | ||
Ok::<_, hyper::Error>(hyper::body::Bytes::from(endpoint_info)) | ||
}) | ||
.chain(futures_util::stream::unfold(rx, move |mut rx| { | ||
async move { | ||
match rx.recv().await { | ||
Ok(msg) => { | ||
match serde_json::to_string(&msg) { | ||
Ok(json) => { | ||
// Format SSE data with event name and proper line breaks | ||
let sse_data = format!("event: message\ndata: {}\n\n", json); | ||
Some(( | ||
Ok::<_, hyper::Error>(hyper::body::Bytes::from(sse_data)), | ||
rx, | ||
)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Ohayo sensei! Inserting SseSession
in a shared map for SSE.
Be mindful of session cleanup. If sessions remain after clients disconnect, memory can grow.
🧰 Tools
🪛 GitHub Check: codecov/patch
[warning] 261-289: crates/torii/server/src/handlers/mcp.rs#L261-L289
Added lines #L261 - L289 were not covered by tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
♻️ Duplicate comments (2)
crates/torii/server/src/handlers/mcp.rs (2)
84-89
: 🛠️ Refactor suggestionOhayo sensei! Session cleanup mechanism needed.
Be mindful of session cleanup. If sessions remain after clients disconnect, memory can grow unbounded over time as more clients connect and disconnect.
Consider adding a cleanup mechanism like:
- Session expiration timestamps
- Client heartbeat checks
- Periodic cleanup of inactive sessions
#[derive(Clone, Debug)] struct SseSession { tx: broadcast::Sender<JsonRpcResponse>, _session_id: String, + created_at: std::time::Instant, // Add timestamp for expiration checks }
261-266
: 🛠️ Refactor suggestionSession management needs improvement.
The sessions are stored in a shared map, but there's no mechanism to clean them up when clients disconnect. This could lead to a memory leak over time.
{ let mut sessions = self.sse_sessions.write().await; sessions.insert( session_id.clone(), - SseSession { tx: tx.clone(), _session_id: session_id.clone() }, + SseSession { + tx: tx.clone(), + _session_id: session_id.clone(), + created_at: std::time::Instant::now(), + }, ); + + // Remove old sessions that are older than 1 hour + let expired = sessions + .iter() + .filter(|(_, session)| session.created_at.elapsed() > std::time::Duration::from_secs(3600)) + .map(|(id, _)| id.clone()) + .collect::<Vec<_>>(); + + for id in expired { + sessions.remove(&id); + } }
🧹 Nitpick comments (3)
crates/torii/server/src/handlers/mcp.rs (3)
19-19
: Ohayo sensei! Consider increasing SSE_CHANNEL_CAPACITY for high-traffic applications.The current capacity of 100 messages might be sufficient for development, but in production with many concurrent users, this could lead to message drops if the channel fills up faster than clients can consume messages.
-const SSE_CHANNEL_CAPACITY: usize = 100; +const SSE_CHANNEL_CAPACITY: usize = 1000; // Increased capacity for high-traffic scenarios
598-619
: Ohayo sensei! Add implementation plan for resources feature.The
handle_resources_read
method always returns a "Resource not found" error. Consider adding a TODO comment with implementation details or actual implementation.// New method to handle resources/read async fn handle_resources_read(&self, request: JsonRpcRequest) -> JsonRpcResponse { let Some(params) = &request.params else { return JsonRpcResponse::invalid_params(request.id, "Missing params"); }; let Some(uri) = params.get("uri").and_then(Value::as_str) else { return JsonRpcResponse::invalid_params(request.id, "Missing uri parameter"); }; + // TODO: Implement resource reading functionality + // 1. Add resource loading from filesystem or database + // 2. Support different resource types (text, binary, etc.) + // 3. Add caching for frequently accessed resources + // 4. Implement access control if needed // For now, we don't have any resources to read JsonRpcResponse { jsonrpc: JSONRPC_VERSION.to_string(), id: request.id, result: None, error: Some(JsonRpcError { code: -32601, message: "Resource not found".to_string(), data: Some(json!({ "details": format!("No resource found with URI: {}", uri) })), }), } }Would you like me to suggest a complete implementation for resource reading functionality?
319-448
: Refactor the long handle_message_request method for better maintainability.Ohayo sensei! This method is quite long (130 lines) and handles multiple responsibilities, making it harder to maintain and test. Consider breaking it down into smaller, focused methods.
Extract the JSON parsing and request handling logic into separate methods:
async fn handle_message_request(&self, req: Request<Body>) -> Response<Body> { // Extract the session ID and validate + let session_id = self.extract_session_id_from_request(&req); + if let Err(response) = &session_id { + return response.clone(); + } + let session_id = session_id.unwrap(); // Check if the session exists + let tx = match self.get_session_tx(&session_id).await { + Some(tx) => tx, + None => { + return self.create_session_not_found_response(); + } + }; // Parse and process the request body + let body_bytes = match self.read_request_body(req).await { + Ok(bytes) => bytes, + Err(response) => return response, + }; + let body_str = match self.validate_utf8(body_bytes) { + Ok(str) => str, + Err(response) => return response, + }; + self.process_json_request(body_str, tx).await } // Helper methods + async fn extract_session_id_from_request(&self, req: &Request<Body>) -> Result<String, Response<Body>> { + // Implementation + } + async fn get_session_tx(&self, session_id: &str) -> Option<broadcast::Sender<JsonRpcResponse>> { + // Implementation + } + fn create_session_not_found_response(&self) -> Response<Body> { + // Implementation + } + async fn read_request_body(&self, req: Request<Body>) -> Result<hyper::body::Bytes, Response<Body>> { + // Implementation + } + fn validate_utf8(&self, bytes: hyper::body::Bytes) -> Result<String, Response<Body>> { + // Implementation + } + async fn process_json_request(&self, body_str: String, tx: broadcast::Sender<JsonRpcResponse>) -> Response<Body> { + // Implementation + }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
crates/torii/server/src/handlers/mcp.rs
(9 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
- GitHub Check: ensure-wasm
- GitHub Check: clippy
- GitHub Check: build
- GitHub Check: docs
🔇 Additional comments (3)
crates/torii/server/src/handlers/mcp.rs (3)
591-596
: Ohayo sensei! Great implementation of the resources listing feature.The implementation is clean and follows the same pattern as the tools listing method. Good job!
251-317
: Well-structured SSE connection handling!The implementation correctly sets up an SSE stream with proper headers and formatting. The use of broadcast channels for distributing messages to clients is a good approach.
659-700
: Clean and comprehensive request routing.Ohayo sensei! The handler implementation provides clear routing for different types of connections (SSE, WebSocket, etc.) and methods. Good use of pattern matching for readability.
async fn handle_message_request(&self, req: Request<Body>) -> Response<Body> { | ||
// Extract the session ID from the query parameters | ||
let uri = req.uri(); | ||
let session_id = uri.query().unwrap().split("=").collect::<Vec<_>>()[1]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ohayo sensei! URI query parameter parsing is unsafe.
The current query parameter parsing assumes a specific format without validation. This could panic if the query string is malformed.
- let session_id = uri.query().unwrap().split("=").collect::<Vec<_>>()[1];
+ let session_id = match uri.query() {
+ Some(query) => {
+ let params: Vec<&str> = query.split('=').collect();
+ if params.len() < 2 || params[0] != "sessionId" {
+ return Response::builder()
+ .status(StatusCode::BAD_REQUEST)
+ .body(Body::from("Invalid session ID parameter"))
+ .unwrap();
+ }
+ params[1]
+ },
+ None => {
+ return Response::builder()
+ .status(StatusCode::BAD_REQUEST)
+ .body(Body::from("Missing session ID parameter"))
+ .unwrap();
+ }
+ };
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let session_id = uri.query().unwrap().split("=").collect::<Vec<_>>()[1]; | |
let session_id = match uri.query() { | |
Some(query) => { | |
let params: Vec<&str> = query.split('=').collect(); | |
if params.len() < 2 || params[0] != "sessionId" { | |
return Response::builder() | |
.status(StatusCode::BAD_REQUEST) | |
.body(Body::from("Invalid session ID parameter")) | |
.unwrap(); | |
} | |
params[1] | |
}, | |
None => { | |
return Response::builder() | |
.status(StatusCode::BAD_REQUEST) | |
.body(Body::from("Missing session ID parameter")) | |
.unwrap(); | |
} | |
}; |
|
||
let args = params.get("arguments").and_then(Value::as_object); | ||
if let Some(query) = args.and_then(|args| args.get("query").and_then(Value::as_str)) { | ||
match sqlx::query(query).fetch_all(&*self.pool).await { | ||
Ok(rows) => { | ||
// Convert rows to JSON using shared mapping function | ||
let result = rows.iter().map(map_row_to_json).collect::<Vec<_>>(); | ||
|
||
JsonRpcResponse { | ||
jsonrpc: JSONRPC_VERSION.to_string(), | ||
id: request.id, | ||
result: None, | ||
error: Some(JsonRpcError { | ||
code: -32603, | ||
message: "Database error".to_string(), | ||
data: Some(json!({ "details": e.to_string() })), | ||
}), | ||
}, | ||
result: Some(json!({ | ||
"content": [{ | ||
"type": "text", | ||
"text": serde_json::to_string(&result).unwrap() | ||
}] | ||
})), | ||
error: None, | ||
} | ||
} | ||
} else { | ||
JsonRpcResponse { | ||
Err(e) => JsonRpcResponse { | ||
jsonrpc: JSONRPC_VERSION.to_string(), | ||
id: request.id, | ||
result: None, | ||
error: Some(JsonRpcError { | ||
code: -32602, | ||
message: "Invalid params".to_string(), | ||
data: Some(json!({ "details": "Missing query parameter" })), | ||
code: -32603, | ||
message: "Database error".to_string(), | ||
data: Some(json!({ "details": e.to_string() })), | ||
}), | ||
} | ||
}, | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
SQL Injection vulnerability in query tool.
Ohayo sensei! The query tool executes SQL directly from user input without proper validation, creating a SQL injection risk.
While sqlx::query()
provides some protection, add additional validation:
if let Some(query) = args.and_then(|args| args.get("query").and_then(Value::as_str)) {
+ // Validate query - prevent destructive operations
+ let query_upper = query.to_uppercase();
+ if query_upper.contains("DROP") || query_upper.contains("DELETE") || query_upper.contains("TRUNCATE") {
+ return JsonRpcResponse {
+ jsonrpc: JSONRPC_VERSION.to_string(),
+ id: request.id,
+ result: None,
+ error: Some(JsonRpcError {
+ code: -32602,
+ message: "Invalid query".to_string(),
+ data: Some(json!({ "details": "Destructive operations are not allowed" })),
+ }),
+ };
+ }
+
match sqlx::query(query).fetch_all(&*self.pool).await {
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
let args = params.get("arguments").and_then(Value::as_object); | |
if let Some(query) = args.and_then(|args| args.get("query").and_then(Value::as_str)) { | |
match sqlx::query(query).fetch_all(&*self.pool).await { | |
Ok(rows) => { | |
// Convert rows to JSON using shared mapping function | |
let result = rows.iter().map(map_row_to_json).collect::<Vec<_>>(); | |
JsonRpcResponse { | |
jsonrpc: JSONRPC_VERSION.to_string(), | |
id: request.id, | |
result: None, | |
error: Some(JsonRpcError { | |
code: -32603, | |
message: "Database error".to_string(), | |
data: Some(json!({ "details": e.to_string() })), | |
}), | |
}, | |
result: Some(json!({ | |
"content": [{ | |
"type": "text", | |
"text": serde_json::to_string(&result).unwrap() | |
}] | |
})), | |
error: None, | |
} | |
} | |
} else { | |
JsonRpcResponse { | |
Err(e) => JsonRpcResponse { | |
jsonrpc: JSONRPC_VERSION.to_string(), | |
id: request.id, | |
result: None, | |
error: Some(JsonRpcError { | |
code: -32602, | |
message: "Invalid params".to_string(), | |
data: Some(json!({ "details": "Missing query parameter" })), | |
code: -32603, | |
message: "Database error".to_string(), | |
data: Some(json!({ "details": e.to_string() })), | |
}), | |
} | |
}, | |
} | |
let args = params.get("arguments").and_then(Value::as_object); | |
if let Some(query) = args.and_then(|args| args.get("query").and_then(Value::as_str)) { | |
// Validate query - prevent destructive operations | |
let query_upper = query.to_uppercase(); | |
if query_upper.contains("DROP") || query_upper.contains("DELETE") || query_upper.contains("TRUNCATE") { | |
return JsonRpcResponse { | |
jsonrpc: JSONRPC_VERSION.to_string(), | |
id: request.id, | |
result: None, | |
error: Some(JsonRpcError { | |
code: -32602, | |
message: "Invalid query".to_string(), | |
data: Some(json!({ "details": "Destructive operations are not allowed" })), | |
}), | |
}; | |
} | |
match sqlx::query(query).fetch_all(&*self.pool).await { | |
Ok(rows) => { | |
// Convert rows to JSON using shared mapping function | |
let result = rows.iter().map(map_row_to_json).collect::<Vec<_>>(); | |
JsonRpcResponse { | |
jsonrpc: JSONRPC_VERSION.to_string(), | |
id: request.id, | |
result: Some(json!({ | |
"content": [{ | |
"type": "text", | |
"text": serde_json::to_string(&result).unwrap() | |
}] | |
})), | |
error: None, | |
} | |
} | |
Err(e) => JsonRpcResponse { | |
jsonrpc: JSONRPC_VERSION.to_string(), | |
id: request.id, | |
result: None, | |
error: Some(JsonRpcError { | |
code: -32603, | |
message: "Database error".to_string(), | |
data: Some(json!({ "details": e.to_string() })), | |
}), | |
}, | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (5)
crates/torii/server/src/handlers/mcp.rs (5)
84-89
: Ohayo sensei! Implement session cleanup mechanism.Your
SseSession
struct is well-designed, but there's no mechanism to clean up disconnected sessions. This could lead to memory leaks if clients disconnect without properly closing their connections.Consider adding a session cleanup mechanism either by:
- Implementing a heartbeat mechanism where clients regularly ping the server
- Adding a session timeout and cleanup task:
// Structure to hold SSE session information #[derive(Clone, Debug)] struct SseSession { tx: broadcast::Sender<JsonRpcResponse>, _session_id: String, + last_active: std::time::Instant, } // Add this method to McpHandler + async fn cleanup_stale_sessions(&self) { + let session_timeout = std::time::Duration::from_secs(300); // 5 minutes + loop { + tokio::time::sleep(std::time::Duration::from_secs(60)).await; + let now = std::time::Instant::now(); + let mut sessions_to_remove = Vec::new(); + + { + let sessions = self.sse_sessions.read().await; + for (id, session) in sessions.iter() { + if now.duration_since(session.last_active) > session_timeout { + sessions_to_remove.push(id.clone()); + } + } + } + + if !sessions_to_remove.is_empty() { + let mut sessions = self.sse_sessions.write().await; + for id in sessions_to_remove { + sessions.remove(&id); + } + } + } + }Also, start the cleanup task in the
new
method:pub fn new(pool: Arc<SqlitePool>) -> Self { // existing code... let handler = Self { pool, sse_sessions: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())), tools, resources, }; + let handler_clone = handler.clone(); + tokio::spawn(async move { + handler_clone.cleanup_stale_sessions().await; + }); handler }
598-619
: Ohayo sensei! Resources implementation is a placeholder.The
handle_resources_read
method always returns "Resource not found" since no resources are implemented yet. This is fine for a first iteration, but please consider adding a comment indicating this is a placeholder.// New method to handle resources/read async fn handle_resources_read(&self, request: JsonRpcRequest) -> JsonRpcResponse { let Some(params) = &request.params else { return JsonRpcResponse::invalid_params(request.id, "Missing params"); }; let Some(uri) = params.get("uri").and_then(Value::as_str) else { return JsonRpcResponse::invalid_params(request.id, "Missing uri parameter"); }; + // TODO: This is a placeholder. Actual resource implementation will be added in a future PR. // For now, we don't have any resources to read JsonRpcResponse { jsonrpc: JSONRPC_VERSION.to_string(), id: request.id, result: None, error: Some(JsonRpcError { code: -32601, message: "Resource not found".to_string(), data: Some(json!({ "details": format!("No resource found with URI: {}", uri) })), }), } }
312-315
: Ohayo sensei! Consider adding CORS headers for SSE response.The SSE response includes necessary headers for event streaming but lacks CORS headers that are present in other endpoint responses.
Response::builder() .header("Content-Type", "text/event-stream") .header("Cache-Control", "no-cache") .header("Connection", "keep-alive") .header("X-Session-Id", session_id) + .header("Access-Control-Allow-Origin", "*") + .header("Access-Control-Allow-Headers", "Content-Type") .body(Body::wrap_stream(stream)) .unwrap()
376-378
: Ohayo sensei! Add error details for SSE broadcast failures.When broadcasting to SSE fails, the current implementation logs the error but doesn't provide detailed information about what went wrong.
// Forward the response to the SSE channel if let Err(e) = tx.send(response.clone()) { - warn!("Error forwarding response to SSE: {}", e); + warn!("Error forwarding response to SSE: {}, session may be disconnected or channel full", e); + + // Optionally, we could trigger session cleanup here + // let session_id_clone = session_id.clone(); + // tokio::spawn(async move { + // let mut sessions = self.sse_sessions.write().await; + // sessions.remove(&session_id_clone); + // }); }
413-415
: Ohayo sensei! Consider consolidating duplicate error handling code.The error handling for SSE broadcasting is duplicated in two places (lines 376-378 and 413-415). Consider extracting this to a helper method.
+ // Add this helper method to McpHandler + fn handle_broadcast_error(&self, e: broadcast::error::SendError<JsonRpcResponse>, session_id: &str) { + warn!("Error forwarding response to SSE: {}, session may be disconnected or channel full", e); + + // Optionally, queue the session for cleanup if broadcasting consistently fails + } // Then replace both occurrences: if let Err(e) = tx.send(response.clone()) { - warn!("Error forwarding response to SSE: {}", e); + self.handle_broadcast_error(e, session_id); }
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
crates/torii/server/src/handlers/mcp.rs
(10 hunks)
🔇 Additional comments (3)
crates/torii/server/src/handlers/mcp.rs (3)
323-323
: Ohayo sensei! URI query parameter parsing is unsafe.The current query parameter parsing assumes a specific format without validation. This could panic if the query string is malformed.
- let session_id = uri.query().unwrap().split("=").collect::<Vec<_>>()[1]; + let session_id = match uri.query() { + Some(query) => { + let params: Vec<&str> = query.split('=').collect(); + if params.len() < 2 || params[0] != "sessionId" { + return Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from("Invalid session ID parameter")) + .unwrap(); + } + params[1] + }, + None => { + return Response::builder() + .status(StatusCode::BAD_REQUEST) + .body(Body::from("Missing session ID parameter")) + .unwrap(); + } + };
545-548
: Ohayo sensei! SQL Injection vulnerability in query tool.The query tool executes SQL directly from user input without proper validation, creating a SQL injection risk.
While
sqlx::query()
provides some protection, add additional validation:if let Some(query) = args.and_then(|args| args.get("query").and_then(Value::as_str)) { + // Validate query - prevent destructive operations + let query_upper = query.to_uppercase(); + if query_upper.contains("DROP") || query_upper.contains("DELETE") || query_upper.contains("TRUNCATE") { + return JsonRpcResponse { + jsonrpc: JSONRPC_VERSION.to_string(), + id: request.id, + result: None, + error: Some(JsonRpcError { + code: -32602, + message: "Invalid query".to_string(), + data: Some(json!({ "details": "Destructive operations are not allowed" })), + }), + }; + } + match sqlx::query(query).fetch_all(&*self.pool).await {
261-266
: Ohayo sensei! InsertingSseSession
in a shared map for SSE.Be mindful of session cleanup. If sessions remain after clients disconnect, memory can grow.
match req.method() { | ||
// Handle GET requests for SSE connection | ||
&hyper::Method::GET => { | ||
return self.handle_sse_connection().await; | ||
} | ||
// Return Method Not Allowed for other methods | ||
_ => Response::builder() | ||
.body(Body::from( | ||
serde_json::to_string(&JsonRpcResponse::method_not_found(Value::Null)).unwrap(), | ||
)) | ||
.unwrap(), | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Ohayo sensei! Add support for CORS preflight requests.
The current implementation handles GET requests for SSE but doesn't properly support OPTIONS preflight requests that browsers send for CORS.
match req.method() {
// Handle GET requests for SSE connection
&hyper::Method::GET => {
return self.handle_sse_connection().await;
}
+ // Handle OPTIONS preflight requests
+ &hyper::Method::OPTIONS => {
+ return Response::builder()
+ .header("Access-Control-Allow-Origin", "*")
+ .header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
+ .header("Access-Control-Allow-Headers", "Content-Type")
+ .header("Access-Control-Max-Age", "86400")
+ .body(Body::empty())
+ .unwrap();
+ }
// Return Method Not Allowed for other methods
_ => Response::builder()
.body(Body::from(
serde_json::to_string(&JsonRpcResponse::method_not_found(Value::Null)).unwrap(),
))
.unwrap(),
}
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
match req.method() { | |
// Handle GET requests for SSE connection | |
&hyper::Method::GET => { | |
return self.handle_sse_connection().await; | |
} | |
// Return Method Not Allowed for other methods | |
_ => Response::builder() | |
.body(Body::from( | |
serde_json::to_string(&JsonRpcResponse::method_not_found(Value::Null)).unwrap(), | |
)) | |
.unwrap(), | |
} | |
match req.method() { | |
// Handle GET requests for SSE connection | |
&hyper::Method::GET => { | |
return self.handle_sse_connection().await; | |
} | |
// Handle OPTIONS preflight requests | |
&hyper::Method::OPTIONS => { | |
return Response::builder() | |
.header("Access-Control-Allow-Origin", "*") | |
.header("Access-Control-Allow-Methods", "GET, POST, OPTIONS") | |
.header("Access-Control-Allow-Headers", "Content-Type") | |
.header("Access-Control-Max-Age", "86400") | |
.body(Body::empty()) | |
.unwrap(); | |
} | |
// Return Method Not Allowed for other methods | |
_ => Response::builder() | |
.body(Body::from( | |
serde_json::to_string(&JsonRpcResponse::method_not_found(Value::Null)).unwrap(), | |
)) | |
.unwrap(), | |
} |
pub fn new(pool: Arc<SqlitePool>) -> Self { | ||
Self { pool } | ||
let tools = vec![ | ||
Tool { | ||
name: "query", | ||
description: "Execute a SQL query on the database", | ||
input_schema: json!({ | ||
"type": "object", | ||
"properties": { | ||
"query": { | ||
"type": "string", | ||
"description": "SQL query to execute" | ||
} | ||
}, | ||
"required": ["query"] | ||
}), | ||
}, | ||
Tool { | ||
name: "schema", | ||
description: "Retrieve the database schema including tables, columns, and their \ | ||
types", | ||
input_schema: json!({ | ||
"type": "object", | ||
"properties": { | ||
"table": { | ||
"type": "string", | ||
"description": "Optional table name to get schema for. If omitted, returns schema for all tables." | ||
} | ||
} | ||
}), | ||
}, | ||
]; | ||
|
||
let resources = vec![]; // Add resources as needed | ||
|
||
Self { | ||
pool, | ||
sse_sessions: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())), | ||
tools, | ||
resources, | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Ohayo sensei! Consider adding unit tests for new SSE functionality.
The SSE implementation adds significant new functionality that would benefit from unit tests to ensure it works correctly and remains stable during future changes.
🏁 Script executed:
#!/bin/bash
# Check if there are any test files for mcp.rs
find crates/torii/server/src -name "test_*.rs" | grep -i mcp
Length of output: 62
Ohayo sensei!
It appears that the new SSE functionality in crates/torii/server/src/handlers/mcp.rs
(lines 112–152) isn’t covered by any unit tests. Since SSE is a critical new feature, adding tests—especially around session management and event flow—will help safeguard against regressions in the future. Please consider integrating these unit tests to ensure long-term stability.
🏁 Script executed:
#!/bin/bash
echo "Searching for inline tests in crates/torii/server/src/handlers/mcp.rs"
grep -R "#\[cfg(test)\]" -n crates/torii/server/src/handlers/mcp.rs || echo "No #[cfg(test)] blocks found in mcp.rs"
echo "Searching for #[test] functions in crates/torii/server/src/handlers/mcp.rs"
grep -R "#\[test\]" -n crates/torii/server/src/handlers/mcp.rs || echo "No #[test] functions found in mcp.rs"
Length of output: 612
Ohayo sensei!
The new SSE functionality in crates/torii/server/src/handlers/mcp.rs
(lines 112–152) isn't covered by any unit tests. Given that SSE is a critical and complex feature—especially with session management—it’s important to add tests verifying both session creation and event flow. Please include unit tests to fortify this functionality against future changes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Nitpick comments (9)
crates/torii/mcp/src/resources/mod.rs (1)
2-4
: Ohayo, sensei! Consider enhancing the Resource struct with additional metadata fields.The Resource struct is quite minimal with just a name field. For improved usability, consider adding more descriptive fields like:
pub struct Resource { pub name: &'static str, + pub description: &'static str, + pub version: &'static str, + pub capabilities: Vec<&'static str>, }This would provide better documentation and context for developers using these resources.
crates/torii/mcp/src/tools/mod.rs (1)
13-18
: Consider adding schema validation for tools.While the implementation is clean, there's no validation for the input schemas. It might be beneficial to add some form of schema validation to ensure that tools are correctly defined.
pub fn get_tools() -> Vec<Tool> { + // Validate tool schemas before returning + let tools = vec![ query::get_tool(), schema::get_tool(), ]; + + // Optional: Log the available tools at startup + log::info!("Loaded {} MCP tools", tools.len()); + + tools }crates/torii/sqlite/src/utils.rs (1)
188-229
: Ohayo sensei! Themap_row_to_json
function is a solid approach to converting rows into JSON.
It gracefully handles different SQLite data types (TEXT, INTEGER, REAL, BLOB, and fallback attempts). A few suggestions to consider:
• If you anticipate date/time columns, you might parse them into a structured format before turning them into strings.
• Repeatedtry_get
attempts might be slightly less performant; if performance becomes critical, consider an optimized approach or a custom type conversion method.crates/torii/mcp/src/tools/schema.rs (1)
27-116
: Ohayo sensei! Thehandle
function effectively retrieves schema details.
It properly parameterizes the query and usespragma_table_info
to fetch columns and their attributes. A few minor points:
• Consider logging or surfacing an empty result if the user specifies a table that doesn’t exist.
•serde_json::to_string_pretty(...).unwrap()
can theoretically fail; you could gracefully handle serialization errors, but it’s likely safe in typical usage.crates/torii/mcp/src/tools/query.rs (1)
29-76
: Ohayo sensei! Thehandle
function correctly executes arbitrary SQL and returns rows as JSON.
Just keep in mind that running arbitrary user-provided queries may be dangerous in certain environments—consider restricting queries to read-only or validating them if security is a concern.crates/torii/server/src/handlers/mcp.rs (3)
103-104
: Ohayo sensei! Delegation to tool handlers is clear.
These lines properly route queries to thequery
orschema
tool. Consider logging calls for debugging.
129-129
: Ohayo sensei! Good usage ofwarn!
.
This log statement helps catch message sending errors. Ensure logs remain adequately descriptive in production.
291-303
: Ohayo sensei! The resource read handler is incomplete.
Currently returningmethod_not_found
. If you need assistance implementing resource reading logic, let me know!crates/torii/mcp/src/types/mod.rs (1)
17-23
: Ohayo sensei!JsonRpcRequest
captures standard fields.
Ensure that any future optional fields (e.g.context
) are accounted for if needed.
📜 Review details
Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
Cargo.lock
is excluded by!**/*.lock
📒 Files selected for processing (12)
Cargo.toml
(1 hunks)crates/torii/mcp/Cargo.toml
(1 hunks)crates/torii/mcp/src/lib.rs
(1 hunks)crates/torii/mcp/src/resources/mod.rs
(1 hunks)crates/torii/mcp/src/tools/mod.rs
(1 hunks)crates/torii/mcp/src/tools/query.rs
(1 hunks)crates/torii/mcp/src/tools/schema.rs
(1 hunks)crates/torii/mcp/src/types/mod.rs
(1 hunks)crates/torii/server/Cargo.toml
(2 hunks)crates/torii/server/src/handlers/mcp.rs
(6 hunks)crates/torii/server/src/handlers/sql.rs
(1 hunks)crates/torii/sqlite/src/utils.rs
(3 hunks)
✅ Files skipped from review due to trivial changes (2)
- crates/torii/mcp/src/lib.rs
- crates/torii/mcp/Cargo.toml
🚧 Files skipped from review as they are similar to previous changes (1)
- crates/torii/server/Cargo.toml
🔇 Additional comments (28)
crates/torii/mcp/src/resources/mod.rs (1)
6-8
: This empty vector implementation is preparation for future resources.The empty vector return indicates this is preparatory work for future expansion. When adding actual resources, consider implementing a builder pattern or factory methods to construct them consistently.
Would you like me to suggest a more robust implementation for when you start adding real resources to this collection?
Cargo.toml (1)
132-132
: Ohayo! Dependency addition looks good, sensei.The addition of the torii-mcp workspace dependency is clean and follows the project's established pattern for internal dependencies.
crates/torii/mcp/src/tools/mod.rs (1)
6-11
: Tool struct looks well-designed, sensei!The Tool struct with name, description, and input_schema provides a good foundation for tool definitions. The use of serde_json::Value for input_schema offers flexibility for various schema formats.
crates/torii/server/src/handlers/sql.rs (1)
7-8
: Ohayo! Good refactoring move, sensei.Moving the
map_row_to_json
function to a utility module is a great refactoring decision. This centralizes the row-to-JSON conversion logic, making it reusable across different parts of the codebase.crates/torii/sqlite/src/utils.rs (2)
7-8
: Ohayo sensei! These import additions look good.
No issues spotted with the inclusion ofbase64::Engine
and theSTANDARD
encoder.
18-18
: Ohayo sensei! The additional imports fromsqlx
look correct.
This nicely streamlines row handling in the new function.crates/torii/mcp/src/tools/schema.rs (1)
11-25
: Ohayo sensei!get_tool
for theschema
functionality looks neat.
This clarifies the usage for retrieving table information with an optionaltable
parameter. The concise JSON schema is easy to follow.crates/torii/mcp/src/tools/query.rs (1)
12-27
: Ohayo sensei!get_tool
for thequery
functionality is well-defined.
Exposing a single requiredquery
parameter keeps this tool straightforward for running database commands.crates/torii/server/src/handlers/mcp.rs (11)
7-17
: Ohayo sensei! Solid import additions.
These added imports look correct for your SSE, resource, and tool functionalities.
24-26
: Ohayo sensei! Good approach to struct initialization.
Storingsse_sessions
,tools
, andresources
in the handler is neat. Just be mindful that your SSE sessions might accumulate if not removed on client disconnect.
31-36
: Ohayo sensei! The new constructor is straightforward.
No obvious issues; initialization matches the updated handler fields.
48-49
: Ohayo sensei! Resource routes introduced.
These new JSON-RPC methods forresources/list
andresources/read
expand your MCP protocol coverage. Ensure any existing client code is aware of these new routes.
59-71
: Ohayo sensei! Nice expansion of server capabilities.
Exposingtools
andresources
features here helps clients discover your new endpoints. The versioning approach forserverInfo
also looks tidy.
98-98
: Ohayo sensei! Checking tool name.
Looks good. Just confirm the front-end doesn't rely on older tool endpoints.
136-151
: Ohayo sensei! Remember session cleanup.
Inserting the session into theHashMap
is fine, but a removal step after client disconnect would prevent memory growth.
153-212
: Ohayo sensei! SSE streaming logic is well-structured.
The chain of asynchronous streams properly merges the initial endpoint event and subsequent broadcast messages. This is a robust approach for SSE.
214-282
: Ohayo sensei! Thorough handling of JSON-RPC over SSE.
Validates session presence, parses the request, and pushes the response into the broadcast channel. Looks consistent with your SSE design.
284-289
: Ohayo sensei! Resource listing function is concise.
Returning the resource names as JSON is straightforward. The approach looks good for the new endpoint.
312-345
: Ohayo sensei! Consider adding a preflight OPTIONS response.
Only GET and message requests are allowed. Modern browsers might try OPTIONS for CORS compliance, especially for SSE endpoints.crates/torii/mcp/src/types/mod.rs (9)
1-9
: Ohayo sensei! Good foundation for JSON-RPC constants.
JSONRPC_VERSION
,MCP_VERSION
, andSSE_CHANNEL_CAPACITY
define a clear protocol boundary.
10-15
: Ohayo sensei! IntroducingJsonRpcMessage
enum.
Untagged approach neatly captures Requests vs. Notifications.
25-30
: Ohayo sensei!JsonRpcNotification
structure is simple.
It’s a fine approach—just confirm notifications aren't being processed incorrectly downstream.
32-40
: Ohayo sensei!JsonRpcResponse
structure is well-designed.
Including optional error and result fields is aligned with the JSON-RPC 2.0 spec.
42-48
: Ohayo sensei!JsonRpcError
straightforward.
Captures code, message, and data for advanced error reporting.
50-54
: Ohayo sensei!Implementation
struct.
Handy for relaying server info to clients.
56-71
: Ohayo sensei! Capability structs look correct.
Clients can discover tool listing, resource subscription, etc. It's good practice to keep these separate.
73-79
: Ohayo sensei!SseSession
suits your broadcast approach.
Storing the broadcast sender along with a session ID is a neat abstraction.
80-109
: Ohayo sensei! Exhaustive JSON-RPC response helpers.
Methods likeok
,error
, and specialized errors match typical JSON-RPC patterns. Great coverage for consistent error handling.
Summary by CodeRabbit
New Features
Chores