Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(torii): add SSE support for mcp #3075

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
Open

Conversation

Larkooo
Copy link
Collaborator

@Larkooo Larkooo commented Feb 28, 2025

Summary by CodeRabbit

  • New Features

    • Enabled real-time notifications with streaming support.
    • Added endpoints for dynamic resource listing and retrieval.
    • Introduced new methods for handling Server-Sent Events (SSE) and improved resource management.
    • Added new modules for tools, resources, and types to enhance functionality.
  • Chores

    • Updated underlying dependencies to enhance performance and reliability.

Copy link

codecov bot commented Mar 4, 2025

Codecov Report

Attention: Patch coverage is 0% with 272 lines in your changes missing coverage. Please review.

Project coverage is 57.05%. Comparing base (296162d) to head (a57aed9).

Files with missing lines Patch % Lines
crates/torii/server/src/handlers/mcp.rs 0.00% 272 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #3075      +/-   ##
==========================================
- Coverage   57.22%   57.05%   -0.18%     
==========================================
  Files         441      441              
  Lines       60098    60287     +189     
==========================================
+ Hits        34394    34399       +5     
- Misses      25704    25888     +184     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

@Larkooo Larkooo marked this pull request as ready for review March 4, 2025 05:11
Copy link

coderabbitai bot commented Mar 4, 2025

Walkthrough

Ohayo! The update introduces several enhancements primarily in the Cargo.toml and mcp.rs files. In Cargo.toml, new dependencies tokio-stream (v0.1.17), uuid (v1.15.1), and torii-mcp are added. The mcp.rs file sees the implementation of Server-Sent Events (SSE) management through a new SseSession struct, along with improved handling of JSON-RPC requests and resource management. New methods facilitate SSE connections, resource listing, and reading, while error handling is refined.

Changes

File(s) Change Summary
crates/torii/.../Cargo.toml Added dependencies: torii-mcp = { path = "../../torii/mcp" }, tokio-stream = "0.1.17", and uuid = "1.15.1".
crates/torii/.../src/handlers/mcp.rs Introduced SseSession struct; updated McpHandler to manage SSE sessions and resource routes; added methods for handling SSE connections, JSON-RPC messages, and resource listing/reading; improved error logging.
crates/torii/mcp/Cargo.toml New package torii-mcp declared with dependencies: serde, serde_json, sqlx, tokio, torii-sqlite.
crates/torii/mcp/src/lib.rs Added new public modules: types, tools, resources.
crates/torii/mcp/src/resources/mod.rs Added Resource struct and get_resources function.
crates/torii/mcp/src/tools/mod.rs Added Tool struct and get_tools function.
crates/torii/mcp/src/tools/query.rs Introduced get_tool and handle functions for executing SQL queries.
crates/torii/mcp/src/tools/schema.rs Introduced get_tool and handle functions for retrieving database schema.
crates/torii/mcp/src/types/mod.rs Added multiple structs and enums for handling JSON-RPC messages and SSE session management.
crates/torii/server/src/handlers/sql.rs Removed map_row_to_json function, refactoring JSON mapping to torii_sqlite::utils.
crates/torii/sqlite/src/utils.rs Added new map_row_to_json function for converting SQLite rows to JSON.

Suggested reviewers

  • ohayo sensei glihm

Possibly related PRs

✨ Finishing Touches
  • 📝 Generate Docstrings

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (4)
crates/torii/server/src/handlers/mod.rs (1)

17-17: Key interface change that improves handler design

Ohayo, sensei! This is the central change that drives modifications in all handler implementations. Moving the client IP from being a stored field to a method parameter is a better design because:

  1. Client IP is request-specific data rather than handler configuration
  2. It eliminates state that needs to be managed in handler instances
  3. It makes the data flow more explicit and traceable

This is a solid architectural improvement.

crates/torii/server/src/handlers/mcp.rs (3)

84-89: Ohayo sensei! Introducing the SseSession data structure is a nice step for SSE.
Just watch memory usage if many sessions are spawned but not cleaned up.


244-244: Ohayo sensei! The warning log is valuable.
Ensure that repeated logs don’t flood logs in production if spamming or large throughput occurs.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 244-244: crates/torii/server/src/handlers/mcp.rs#L244
Added line #L244 was not covered by tests


376-378: Ohayo sensei! Broadcasting the response to SSE is beneficial.
Keep an eye on performance if large volumes of messages are broadcast frequently.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 376-378: crates/torii/server/src/handlers/mcp.rs#L376-L378
Added lines #L376 - L378 were not covered by tests

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 429f238 and 149d749.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (8)
  • crates/torii/server/Cargo.toml (1 hunks)
  • crates/torii/server/src/handlers/graphql.rs (2 hunks)
  • crates/torii/server/src/handlers/grpc.rs (2 hunks)
  • crates/torii/server/src/handlers/mcp.rs (11 hunks)
  • crates/torii/server/src/handlers/mod.rs (1 hunks)
  • crates/torii/server/src/handlers/sql.rs (3 hunks)
  • crates/torii/server/src/handlers/static_files.rs (2 hunks)
  • crates/torii/server/src/proxy.rs (5 hunks)
🧰 Additional context used
🪛 GitHub Check: codecov/patch
crates/torii/server/src/handlers/graphql.rs

[warning] 17-18: crates/torii/server/src/handlers/graphql.rs#L17-L18
Added lines #L17 - L18 were not covered by tests


[warning] 28-28: crates/torii/server/src/handlers/graphql.rs#L28
Added line #L28 was not covered by tests


[warning] 31-31: crates/torii/server/src/handlers/graphql.rs#L31
Added line #L31 was not covered by tests

crates/torii/server/src/handlers/grpc.rs

[warning] 17-18: crates/torii/server/src/handlers/grpc.rs#L17-L18
Added lines #L17 - L18 were not covered by tests


[warning] 32-32: crates/torii/server/src/handlers/grpc.rs#L32
Added line #L32 was not covered by tests


[warning] 35-35: crates/torii/server/src/handlers/grpc.rs#L35
Added line #L35 was not covered by tests

crates/torii/server/src/handlers/mcp.rs

[warning] 113-151: crates/torii/server/src/handlers/mcp.rs#L113-L151
Added lines #L113 - L151 were not covered by tests


[warning] 163-164: crates/torii/server/src/handlers/mcp.rs#L163-L164
Added lines #L163 - L164 were not covered by tests


[warning] 193-205: crates/torii/server/src/handlers/mcp.rs#L193-L205
Added lines #L193 - L205 were not covered by tests


[warning] 244-244: crates/torii/server/src/handlers/mcp.rs#L244
Added line #L244 was not covered by tests


[warning] 252-257: crates/torii/server/src/handlers/mcp.rs#L252-L257
Added lines #L252 - L257 were not covered by tests


[warning] 261-289: crates/torii/server/src/handlers/mcp.rs#L261-L289
Added lines #L261 - L289 were not covered by tests


[warning] 291-292: crates/torii/server/src/handlers/mcp.rs#L291-L292
Added lines #L291 - L292 were not covered by tests


[warning] 294-300: crates/torii/server/src/handlers/mcp.rs#L294-L300
Added lines #L294 - L300 were not covered by tests


[warning] 304-304: crates/torii/server/src/handlers/mcp.rs#L304
Added line #L304 was not covered by tests


[warning] 306-317: crates/torii/server/src/handlers/mcp.rs#L306-L317
Added lines #L306 - L317 were not covered by tests


[warning] 320-323: crates/torii/server/src/handlers/mcp.rs#L320-L323
Added lines #L320 - L323 were not covered by tests


[warning] 326-329: crates/torii/server/src/handlers/mcp.rs#L326-L329
Added lines #L326 - L329 were not covered by tests


[warning] 331-339: crates/torii/server/src/handlers/mcp.rs#L331-L339
Added lines #L331 - L339 were not covered by tests


[warning] 345-351: crates/torii/server/src/handlers/mcp.rs#L345-L351
Added lines #L345 - L351 were not covered by tests


[warning] 355-361: crates/torii/server/src/handlers/mcp.rs#L355-L361
Added lines #L355 - L361 were not covered by tests


[warning] 366-366: crates/torii/server/src/handlers/mcp.rs#L366
Added line #L366 was not covered by tests


[warning] 368-373: crates/torii/server/src/handlers/mcp.rs#L368-L373
Added lines #L368 - L373 were not covered by tests


[warning] 376-378: crates/torii/server/src/handlers/mcp.rs#L376-L378
Added lines #L376 - L378 were not covered by tests


[warning] 380-385: crates/torii/server/src/handlers/mcp.rs#L380-L385
Added lines #L380 - L385 were not covered by tests


[warning] 389-393: crates/torii/server/src/handlers/mcp.rs#L389-L393
Added lines #L389 - L393 were not covered by tests


[warning] 399-408: crates/torii/server/src/handlers/mcp.rs#L399-L408
Added lines #L399 - L408 were not covered by tests


[warning] 410-410: crates/torii/server/src/handlers/mcp.rs#L410
Added line #L410 was not covered by tests


[warning] 413-415: crates/torii/server/src/handlers/mcp.rs#L413-L415
Added lines #L413 - L415 were not covered by tests


[warning] 417-422: crates/torii/server/src/handlers/mcp.rs#L417-L422
Added lines #L417 - L422 were not covered by tests

🔇 Additional comments (53)
crates/torii/server/Cargo.toml (1)

37-38: Ohayo! Your dependencies look well chosen for SSE implementation, sensei!

The addition of tokio-stream and uuid makes perfect sense for this feature:

  • tokio-stream provides asynchronous stream primitives that are ideal for SSE implementation
  • uuid will be useful for generating unique identifiers for sessions or resources

The exact version pinning is also good practice for reproducible builds.

crates/torii/server/src/handlers/sql.rs (3)

1-1: IpAddr import aligns with handler interface changes, sensei!

This import is necessary to support the updated Handler trait requirements.


13-13: Ohayo! Adding Debug trait is excellent for troubleshooting, sensei!

The #[derive(Debug)] addition allows better logging and debugging capabilities, which aligns with the updated Handler trait requirements.


116-116: Client address parameter properly implemented but currently unused

The underscore prefix in _client_addr correctly indicates an unused parameter. This is acceptable as part of standardizing the handler interface across the codebase.

Would you consider documenting why this parameter is currently unused but required by the trait? A small comment might help future developers understand the design decision.

crates/torii/server/src/handlers/graphql.rs (4)

11-11: Ohayo! Debug trait addition is consistent with other handlers, sensei!

This change properly aligns with the updated Handler trait requirements.


13-13: Increased visibility of graphql_addr is appropriate

Making graphql_addr public within the crate (pub(crate)) improves the module's API design while maintaining encapsulation at the crate level.


17-19:

✅ Verification successful

Constructor simplified by removing client_ip dependency, sensei!

The updated constructor properly reflects the removal of the client_ip field from the struct, which is now passed directly to the handle method. This design is cleaner as it avoids storing request-specific data in the handler instance.


🏁 Script executed:

#!/bin/bash
# Verify that all usages of the GraphQLHandler constructor have been updated
rg -A 2 "GraphQLHandler::new\(" --type rust

Length of output: 479


Constructors Verified: GraphQLHandler ctor cleanup confirmed, sensei!

Ohayo, sensei! The constructor in crates/torii/server/src/handlers/graphql.rs has been simplified correctly by removing the client_ip dependency. The verification shows that all usage sites in crates/torii/server/src/proxy.rs are updated to pass only the Option<SocketAddr>, ensuring that request-specific data isn’t stored in the handler instance.

  • The constructor now accepts an Option<SocketAddr>, aligning with the updated design.
  • Usages in proxy.rs (e.g., passing graphql_addr and Some(addr)) correctly reflect the removal of client_ip.

Nice job streamlining the code!

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 17-18: crates/torii/server/src/handlers/graphql.rs#L17-L18
Added lines #L17 - L18 were not covered by tests


28-31:

❓ Verification inconclusive

Client address parameter now correctly passed to proxy client

Ohayo, sensei! The implementation now properly uses the client_addr parameter passed to the handle method instead of storing it in the struct. This is a better design as the client IP address is request-specific.

Note that static analysis shows these lines lack test coverage. Consider adding tests for this critical path.


🏁 Script executed:

#!/bin/bash
# Check for test coverage of GraphQLHandler
rg "test.*GraphQLHandler" --type rust

Length of output: 39


Ohayo, sensei!

  • The updated implementation now correctly passes the client_addr parameter to the proxy client in crates/torii/server/src/handlers/graphql.rs (lines 28–31), which is a neat improvement.
  • However, our static analysis didn’t locate any tests covering this critical GraphQL handling path. This could lead to potential regressions if any future changes touch this functionality.

Please add tests to verify this critical flow—ensuring we have sufficient test coverage will help maintain the robustness of the proxy logic.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 28-28: crates/torii/server/src/handlers/graphql.rs#L28
Added line #L28 was not covered by tests


[warning] 31-31: crates/torii/server/src/handlers/graphql.rs#L31
Added line #L31 was not covered by tests

crates/torii/server/src/handlers/mod.rs (2)

7-8: Ohayo! IpAddr import added for updated handler interface, sensei!

This import is necessary for the updated Handler trait definition.


12-12: Debug trait requirement is a great addition for handler diagnostics

Adding the std::fmt::Debug trait requirement to the Handler interface enables better logging and debugging capabilities throughout the system. This is especially valuable for a server component where diagnostics are important.

crates/torii/server/src/handlers/static_files.rs (4)

10-10: Ohayo sensei! Good addition of Debug trait.
No issues spotted here.


16-17: Ohayo sensei! Consider verifying test coverage for this constructor.
Adding a quick unit test can prevent regressions and boost confidence that this initialization behaves as expected.


27-27: Ohayo sensei! Parameter update is consistent with the Handler trait.
Looks correct and consistent with the global approach of passing client_addr.


30-30: Ohayo sensei! Ensure proper error handling coverage.
It might be beneficial to verify that any failures from GRAPHQL_PROXY_CLIENT.call are properly logged and tested.

crates/torii/server/src/handlers/grpc.rs (4)

11-11: Ohayo sensei! Debug trait derivation confirmed.
Glad to see you’re keeping things consistent with the rest of the handlers.


17-18: Ohayo sensei! Please ensure this constructor is tested.
Static analysis indicates no coverage for these lines. Consider a small test verifying GrpcHandler::new logic.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 17-18: crates/torii/server/src/handlers/grpc.rs#L17-L18
Added lines #L17 - L18 were not covered by tests


32-32: Ohayo sensei! The handle method’s new parameter is well integrated.
This aligns with the broader refactoring; no immediate issues.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 32-32: crates/torii/server/src/handlers/grpc.rs#L32
Added line #L32 was not covered by tests


35-35: Ohayo sensei! Double-check robust error logging.
The GRPC_PROXY_CLIENT.call might fail under network or config issues. Confirm test coverage for error paths.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 35-35: crates/torii/server/src/handlers/grpc.rs#L35
Added line #L35 was not covered by tests

crates/torii/server/src/proxy.rs (8)

67-67: Ohayo sensei! Consolidating handler references into a vector is a neat design.
It promotes easier extension of new handlers in the future. Nicely done.


79-85: Ohayo sensei! Great approach for initializing multiple handlers in one place.
Consider verifying coverage: ensure there's a test that checks the presence/ordering of these handlers.


87-87: Ohayo sensei! The Proxy struct initialization is tidy.
No issues detected.


91-92: Ohayo sensei! set_graphql_addr approach is consistent with the new Handler vector.
Just confirm that index [0] usage remains correct if the handlers’ order changes eventually.


104-104: Ohayo sensei! Good usage capturing the remote IP directly from the connection.
No concerns here.


138-138: Ohayo sensei! Cloning the Arc for handlers is appropriate.
No synchronization issues are apparent.


140-143: Ohayo sensei! The handle call with remote_addr looks straightforward.
Ensure you have coverage for the scenario when no handler matches.


163-167: Ohayo sensei! Iterating over the handlers and invoking the first match is standard.
This short-circuits after handling, which is desired for this architecture. No issues.

crates/torii/server/src/handlers/mcp.rs (27)

1-19: Ohayo sensei! Thanks for bringing in IpAddr and these additional crates.
No immediate concerns regarding imports and constants here, but do watch out for any untested lines.


43-50: Ohayo sensei! Extended JSON-RPC response struct is well-organized.
Skipping serialization on None fields is a neat approach. Just ensure thorough test coverage.


53-59: Ohayo sensei! The custom JSON-RPC error struct is a good addition.
This keeps error details consistent across the codebase.


91-96: Ohayo sensei! The Tool struct is straightforward.
Ensure the input_schema is validated somewhere to avoid invalid or ambiguous definitions.


98-100: Ohayo sensei! Resource struct introduced.
No issues; left open for growth, presumably.


103-103: Ohayo sensei! The #[derive(Clone, Debug)] for McpHandler is consistent.
This will help with concurrency scenarios.


106-108: Ohayo sensei! Maintaining SSE sessions in a RwLock’d HashMap is sensible.
Double-check concurrency usage and potential leftover sessions.


163-164: Ohayo sensei! resources/list and resources/read enhance your API surface.
Be sure to add coverage around them.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 163-164: crates/torii/server/src/handlers/mcp.rs#L163-L164
Added lines #L163 - L164 were not covered by tests


193-205: Ohayo sensei! The handle_tools_list method returns detailed tool info.
It’s well-structured. Possibly confirm the shape of tools_json in tests.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 193-205: crates/torii/server/src/handlers/mcp.rs#L193-L205
Added lines #L193 - L205 were not covered by tests


252-257: Ohayo sensei! Creating a broadcast channel for SSE with capacity 100.
Seems balanced, might want to confirm sizes are adequate under load.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 252-257: crates/torii/server/src/handlers/mcp.rs#L252-L257
Added lines #L252 - L257 were not covered by tests


291-292: Ohayo sensei! Good logic for error serialization fallback.
No direct issues here.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 291-292: crates/torii/server/src/handlers/mcp.rs#L291-L292
Added lines #L291 - L292 were not covered by tests


294-300: Ohayo sensei! Generating an SSE error event is thoughtful.
This helps debugging on the client side.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 294-300: crates/torii/server/src/handlers/mcp.rs#L294-L300
Added lines #L294 - L300 were not covered by tests


304-304: Ohayo sensei! Guarding for broadcast channel closure or client disconnection is wise.
No further concerns.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 304-304: crates/torii/server/src/handlers/mcp.rs#L304
Added line #L304 was not covered by tests


306-317: Ohayo sensei! Thorough SSE response setup with appropriate headers.
Great job returning the session ID in the headers too.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 306-317: crates/torii/server/src/handlers/mcp.rs#L306-L317
Added lines #L306 - L317 were not covered by tests


320-323: Ohayo sensei! Extracting session_id from query is correct, but watch for query parsing edge cases.
Maybe verify what happens if uri.query() is missing or if param is incomplete.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 320-323: crates/torii/server/src/handlers/mcp.rs#L320-L323
Added lines #L320 - L323 were not covered by tests


326-329: Ohayo sensei! Checking for an existing SSE session is crucial.
Ensure you handle multiple concurrent message requests for the same session.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 326-329: crates/torii/server/src/handlers/mcp.rs#L326-L329
Added lines #L326 - L329 were not covered by tests


331-339: Ohayo sensei! Returning an error response when the session is not found is user-friendly.
No further issues.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 331-339: crates/torii/server/src/handlers/mcp.rs#L331-L339
Added lines #L331 - L339 were not covered by tests


345-351: Ohayo sensei! Handling errors from reading the request body is well-defined.
Just confirm it’s tested with invalid inputs.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 345-351: crates/torii/server/src/handlers/mcp.rs#L345-L351
Added lines #L345 - L351 were not covered by tests


355-361: Ohayo sensei! UTF-8 conversion error handling is thorough.
This helps avoid cryptic failures.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 355-361: crates/torii/server/src/handlers/mcp.rs#L355-L361
Added lines #L355 - L361 were not covered by tests


366-366: Ohayo sensei! Preliminary parse as raw JSON is a flexible approach.
No immediate concerns, just ensure you log parse failures.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 366-366: crates/torii/server/src/handlers/mcp.rs#L366
Added line #L366 was not covered by tests


368-373: Ohayo sensei! A clean approach to separate Request vs Notification.
Cover both in your tests to confirm they handle all JSON correctly.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 368-373: crates/torii/server/src/handlers/mcp.rs#L368-L373
Added lines #L368 - L373 were not covered by tests


380-385: Ohayo sensei! Good job including typical JSON-RPC headers for the response.
Consistency with your SSE approach is a plus.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 380-385: crates/torii/server/src/handlers/mcp.rs#L380-L385
Added lines #L380 - L385 were not covered by tests


389-393: Ohayo sensei! Correctly responding 202 for notifications.
This design is simple and fits JSON-RPC usage.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 389-393: crates/torii/server/src/handlers/mcp.rs#L389-L393
Added lines #L389 - L393 were not covered by tests


399-408: Ohayo sensei! Smart fallback for “raw request” style inputs.
Ensures broader compatibility with partial JSON-RPC clients.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 399-408: crates/torii/server/src/handlers/mcp.rs#L399-L408
Added lines #L399 - L408 were not covered by tests


410-410: Ohayo sensei! Passing along a default JSON-RPC version is a nice fallback.
No issues found.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 410-410: crates/torii/server/src/handlers/mcp.rs#L410
Added line #L410 was not covered by tests


413-415: Ohayo sensei! Good check for SSE broadcast errors.
Always good to surface these warnings for debugging.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 413-415: crates/torii/server/src/handlers/mcp.rs#L413-L415
Added lines #L413 - L415 were not covered by tests


417-422: Ohayo sensei! Graceful handling of invalid requests with a JSON-RPC error.
Clear user feedback is essential.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 417-422: crates/torii/server/src/handlers/mcp.rs#L417-L422
Added lines #L417 - L422 were not covered by tests

Comment on lines 261 to 289
let mut sessions = self.sse_sessions.write().await;
sessions.insert(
session_id.clone(),
SseSession { tx: tx.clone(), _session_id: session_id.clone() },
);
}

// Create the message endpoint path
let message_endpoint = format!("/mcp/message?sessionId={}", session_id);

// Create initial endpoint info event - using full URL format
let endpoint_info = format!("event: endpoint\ndata: {}\n\n", message_endpoint);

// Create the streaming body with the endpoint information followed by event data
let stream = futures_util::stream::once(async move {
Ok::<_, hyper::Error>(hyper::body::Bytes::from(endpoint_info))
})
.chain(futures_util::stream::unfold(rx, move |mut rx| {
async move {
match rx.recv().await {
Ok(msg) => {
match serde_json::to_string(&msg) {
Ok(json) => {
// Format SSE data with event name and proper line breaks
let sse_data = format!("event: message\ndata: {}\n\n", json);
Some((
Ok::<_, hyper::Error>(hyper::body::Bytes::from(sse_data)),
rx,
))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo sensei! Inserting SseSession in a shared map for SSE.
Be mindful of session cleanup. If sessions remain after clients disconnect, memory can grow.

🧰 Tools
🪛 GitHub Check: codecov/patch

[warning] 261-289: crates/torii/server/src/handlers/mcp.rs#L261-L289
Added lines #L261 - L289 were not covered by tests

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (2)
crates/torii/server/src/handlers/mcp.rs (2)

84-89: 🛠️ Refactor suggestion

Ohayo sensei! Session cleanup mechanism needed.

Be mindful of session cleanup. If sessions remain after clients disconnect, memory can grow unbounded over time as more clients connect and disconnect.

Consider adding a cleanup mechanism like:

  1. Session expiration timestamps
  2. Client heartbeat checks
  3. Periodic cleanup of inactive sessions
#[derive(Clone, Debug)]
struct SseSession {
    tx: broadcast::Sender<JsonRpcResponse>,
    _session_id: String,
+   created_at: std::time::Instant, // Add timestamp for expiration checks
}

261-266: 🛠️ Refactor suggestion

Session management needs improvement.

The sessions are stored in a shared map, but there's no mechanism to clean them up when clients disconnect. This could lead to a memory leak over time.

{
    let mut sessions = self.sse_sessions.write().await;
    sessions.insert(
        session_id.clone(),
-       SseSession { tx: tx.clone(), _session_id: session_id.clone() },
+       SseSession { 
+           tx: tx.clone(), 
+           _session_id: session_id.clone(),
+           created_at: std::time::Instant::now(), 
+       },
    );
+   
+   // Remove old sessions that are older than 1 hour
+   let expired = sessions
+       .iter()
+       .filter(|(_, session)| session.created_at.elapsed() > std::time::Duration::from_secs(3600))
+       .map(|(id, _)| id.clone())
+       .collect::<Vec<_>>();
+   
+   for id in expired {
+       sessions.remove(&id);
+   }
}
🧹 Nitpick comments (3)
crates/torii/server/src/handlers/mcp.rs (3)

19-19: Ohayo sensei! Consider increasing SSE_CHANNEL_CAPACITY for high-traffic applications.

The current capacity of 100 messages might be sufficient for development, but in production with many concurrent users, this could lead to message drops if the channel fills up faster than clients can consume messages.

-const SSE_CHANNEL_CAPACITY: usize = 100;
+const SSE_CHANNEL_CAPACITY: usize = 1000; // Increased capacity for high-traffic scenarios

598-619: Ohayo sensei! Add implementation plan for resources feature.

The handle_resources_read method always returns a "Resource not found" error. Consider adding a TODO comment with implementation details or actual implementation.

// New method to handle resources/read
async fn handle_resources_read(&self, request: JsonRpcRequest) -> JsonRpcResponse {
    let Some(params) = &request.params else {
        return JsonRpcResponse::invalid_params(request.id, "Missing params");
    };

    let Some(uri) = params.get("uri").and_then(Value::as_str) else {
        return JsonRpcResponse::invalid_params(request.id, "Missing uri parameter");
    };

+   // TODO: Implement resource reading functionality
+   // 1. Add resource loading from filesystem or database
+   // 2. Support different resource types (text, binary, etc.)
+   // 3. Add caching for frequently accessed resources
+   // 4. Implement access control if needed

    // For now, we don't have any resources to read
    JsonRpcResponse {
        jsonrpc: JSONRPC_VERSION.to_string(),
        id: request.id,
        result: None,
        error: Some(JsonRpcError {
            code: -32601,
            message: "Resource not found".to_string(),
            data: Some(json!({ "details": format!("No resource found with URI: {}", uri) })),
        }),
    }
}

Would you like me to suggest a complete implementation for resource reading functionality?


319-448: Refactor the long handle_message_request method for better maintainability.

Ohayo sensei! This method is quite long (130 lines) and handles multiple responsibilities, making it harder to maintain and test. Consider breaking it down into smaller, focused methods.

Extract the JSON parsing and request handling logic into separate methods:

async fn handle_message_request(&self, req: Request<Body>) -> Response<Body> {
    // Extract the session ID and validate
+   let session_id = self.extract_session_id_from_request(&req);
+   if let Err(response) = &session_id {
+       return response.clone();
+   }
+   let session_id = session_id.unwrap();

    // Check if the session exists
+   let tx = match self.get_session_tx(&session_id).await {
+       Some(tx) => tx,
+       None => {
+           return self.create_session_not_found_response();
+       }
+   };

    // Parse and process the request body
+   let body_bytes = match self.read_request_body(req).await {
+       Ok(bytes) => bytes,
+       Err(response) => return response,
+   };

+   let body_str = match self.validate_utf8(body_bytes) {
+       Ok(str) => str,
+       Err(response) => return response,
+   };

+   self.process_json_request(body_str, tx).await
}

// Helper methods
+ async fn extract_session_id_from_request(&self, req: &Request<Body>) -> Result<String, Response<Body>> {
+   // Implementation
+ }

+ async fn get_session_tx(&self, session_id: &str) -> Option<broadcast::Sender<JsonRpcResponse>> {
+   // Implementation
+ }

+ fn create_session_not_found_response(&self) -> Response<Body> {
+   // Implementation
+ }

+ async fn read_request_body(&self, req: Request<Body>) -> Result<hyper::body::Bytes, Response<Body>> {
+   // Implementation
+ }

+ fn validate_utf8(&self, bytes: hyper::body::Bytes) -> Result<String, Response<Body>> {
+   // Implementation
+ }

+ async fn process_json_request(&self, body_str: String, tx: broadcast::Sender<JsonRpcResponse>) -> Response<Body> {
+   // Implementation
+ }
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 149d749 and 4f427cf.

📒 Files selected for processing (1)
  • crates/torii/server/src/handlers/mcp.rs (9 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (4)
  • GitHub Check: ensure-wasm
  • GitHub Check: clippy
  • GitHub Check: build
  • GitHub Check: docs
🔇 Additional comments (3)
crates/torii/server/src/handlers/mcp.rs (3)

591-596: Ohayo sensei! Great implementation of the resources listing feature.

The implementation is clean and follows the same pattern as the tools listing method. Good job!


251-317: Well-structured SSE connection handling!

The implementation correctly sets up an SSE stream with proper headers and formatting. The use of broadcast channels for distributing messages to clients is a good approach.


659-700: Clean and comprehensive request routing.

Ohayo sensei! The handler implementation provides clear routing for different types of connections (SSE, WebSocket, etc.) and methods. Good use of pattern matching for readability.

async fn handle_message_request(&self, req: Request<Body>) -> Response<Body> {
// Extract the session ID from the query parameters
let uri = req.uri();
let session_id = uri.query().unwrap().split("=").collect::<Vec<_>>()[1];
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo sensei! URI query parameter parsing is unsafe.

The current query parameter parsing assumes a specific format without validation. This could panic if the query string is malformed.

- let session_id = uri.query().unwrap().split("=").collect::<Vec<_>>()[1];
+ let session_id = match uri.query() {
+     Some(query) => {
+         let params: Vec<&str> = query.split('=').collect();
+         if params.len() < 2 || params[0] != "sessionId" {
+             return Response::builder()
+                 .status(StatusCode::BAD_REQUEST)
+                 .body(Body::from("Invalid session ID parameter"))
+                 .unwrap();
+         }
+         params[1]
+     },
+     None => {
+         return Response::builder()
+             .status(StatusCode::BAD_REQUEST)
+             .body(Body::from("Missing session ID parameter"))
+             .unwrap();
+     }
+ };
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let session_id = uri.query().unwrap().split("=").collect::<Vec<_>>()[1];
let session_id = match uri.query() {
Some(query) => {
let params: Vec<&str> = query.split('=').collect();
if params.len() < 2 || params[0] != "sessionId" {
return Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from("Invalid session ID parameter"))
.unwrap();
}
params[1]
},
None => {
return Response::builder()
.status(StatusCode::BAD_REQUEST)
.body(Body::from("Missing session ID parameter"))
.unwrap();
}
};

Comment on lines 545 to 575

let args = params.get("arguments").and_then(Value::as_object);
if let Some(query) = args.and_then(|args| args.get("query").and_then(Value::as_str)) {
match sqlx::query(query).fetch_all(&*self.pool).await {
Ok(rows) => {
// Convert rows to JSON using shared mapping function
let result = rows.iter().map(map_row_to_json).collect::<Vec<_>>();

JsonRpcResponse {
jsonrpc: JSONRPC_VERSION.to_string(),
id: request.id,
result: None,
error: Some(JsonRpcError {
code: -32603,
message: "Database error".to_string(),
data: Some(json!({ "details": e.to_string() })),
}),
},
result: Some(json!({
"content": [{
"type": "text",
"text": serde_json::to_string(&result).unwrap()
}]
})),
error: None,
}
}
} else {
JsonRpcResponse {
Err(e) => JsonRpcResponse {
jsonrpc: JSONRPC_VERSION.to_string(),
id: request.id,
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Invalid params".to_string(),
data: Some(json!({ "details": "Missing query parameter" })),
code: -32603,
message: "Database error".to_string(),
data: Some(json!({ "details": e.to_string() })),
}),
}
},
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

⚠️ Potential issue

SQL Injection vulnerability in query tool.

Ohayo sensei! The query tool executes SQL directly from user input without proper validation, creating a SQL injection risk.

While sqlx::query() provides some protection, add additional validation:

if let Some(query) = args.and_then(|args| args.get("query").and_then(Value::as_str)) {
+   // Validate query - prevent destructive operations
+   let query_upper = query.to_uppercase();
+   if query_upper.contains("DROP") || query_upper.contains("DELETE") || query_upper.contains("TRUNCATE") {
+       return JsonRpcResponse {
+           jsonrpc: JSONRPC_VERSION.to_string(),
+           id: request.id,
+           result: None,
+           error: Some(JsonRpcError {
+               code: -32602,
+               message: "Invalid query".to_string(),
+               data: Some(json!({ "details": "Destructive operations are not allowed" })),
+           }),
+       };
+   }
+
    match sqlx::query(query).fetch_all(&*self.pool).await {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let args = params.get("arguments").and_then(Value::as_object);
if let Some(query) = args.and_then(|args| args.get("query").and_then(Value::as_str)) {
match sqlx::query(query).fetch_all(&*self.pool).await {
Ok(rows) => {
// Convert rows to JSON using shared mapping function
let result = rows.iter().map(map_row_to_json).collect::<Vec<_>>();
JsonRpcResponse {
jsonrpc: JSONRPC_VERSION.to_string(),
id: request.id,
result: None,
error: Some(JsonRpcError {
code: -32603,
message: "Database error".to_string(),
data: Some(json!({ "details": e.to_string() })),
}),
},
result: Some(json!({
"content": [{
"type": "text",
"text": serde_json::to_string(&result).unwrap()
}]
})),
error: None,
}
}
} else {
JsonRpcResponse {
Err(e) => JsonRpcResponse {
jsonrpc: JSONRPC_VERSION.to_string(),
id: request.id,
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Invalid params".to_string(),
data: Some(json!({ "details": "Missing query parameter" })),
code: -32603,
message: "Database error".to_string(),
data: Some(json!({ "details": e.to_string() })),
}),
}
},
}
let args = params.get("arguments").and_then(Value::as_object);
if let Some(query) = args.and_then(|args| args.get("query").and_then(Value::as_str)) {
// Validate query - prevent destructive operations
let query_upper = query.to_uppercase();
if query_upper.contains("DROP") || query_upper.contains("DELETE") || query_upper.contains("TRUNCATE") {
return JsonRpcResponse {
jsonrpc: JSONRPC_VERSION.to_string(),
id: request.id,
result: None,
error: Some(JsonRpcError {
code: -32602,
message: "Invalid query".to_string(),
data: Some(json!({ "details": "Destructive operations are not allowed" })),
}),
};
}
match sqlx::query(query).fetch_all(&*self.pool).await {
Ok(rows) => {
// Convert rows to JSON using shared mapping function
let result = rows.iter().map(map_row_to_json).collect::<Vec<_>>();
JsonRpcResponse {
jsonrpc: JSONRPC_VERSION.to_string(),
id: request.id,
result: Some(json!({
"content": [{
"type": "text",
"text": serde_json::to_string(&result).unwrap()
}]
})),
error: None,
}
}
Err(e) => JsonRpcResponse {
jsonrpc: JSONRPC_VERSION.to_string(),
id: request.id,
result: None,
error: Some(JsonRpcError {
code: -32603,
message: "Database error".to_string(),
data: Some(json!({ "details": e.to_string() })),
}),
},
}
}

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (5)
crates/torii/server/src/handlers/mcp.rs (5)

84-89: Ohayo sensei! Implement session cleanup mechanism.

Your SseSession struct is well-designed, but there's no mechanism to clean up disconnected sessions. This could lead to memory leaks if clients disconnect without properly closing their connections.

Consider adding a session cleanup mechanism either by:

  1. Implementing a heartbeat mechanism where clients regularly ping the server
  2. Adding a session timeout and cleanup task:
// Structure to hold SSE session information
#[derive(Clone, Debug)]
struct SseSession {
    tx: broadcast::Sender<JsonRpcResponse>,
    _session_id: String,
+   last_active: std::time::Instant,
}

// Add this method to McpHandler
+ async fn cleanup_stale_sessions(&self) {
+     let session_timeout = std::time::Duration::from_secs(300); // 5 minutes
+     loop {
+         tokio::time::sleep(std::time::Duration::from_secs(60)).await;
+         let now = std::time::Instant::now();
+         let mut sessions_to_remove = Vec::new();
+         
+         {
+             let sessions = self.sse_sessions.read().await;
+             for (id, session) in sessions.iter() {
+                 if now.duration_since(session.last_active) > session_timeout {
+                     sessions_to_remove.push(id.clone());
+                 }
+             }
+         }
+         
+         if !sessions_to_remove.is_empty() {
+             let mut sessions = self.sse_sessions.write().await;
+             for id in sessions_to_remove {
+                 sessions.remove(&id);
+             }
+         }
+     }
+ }

Also, start the cleanup task in the new method:

pub fn new(pool: Arc<SqlitePool>) -> Self {
    // existing code...
    
    let handler = Self {
        pool,
        sse_sessions: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
        tools,
        resources,
    };
    
+   let handler_clone = handler.clone();
+   tokio::spawn(async move {
+       handler_clone.cleanup_stale_sessions().await;
+   });
    
    handler
}

598-619: Ohayo sensei! Resources implementation is a placeholder.

The handle_resources_read method always returns "Resource not found" since no resources are implemented yet. This is fine for a first iteration, but please consider adding a comment indicating this is a placeholder.

// New method to handle resources/read
async fn handle_resources_read(&self, request: JsonRpcRequest) -> JsonRpcResponse {
    let Some(params) = &request.params else {
        return JsonRpcResponse::invalid_params(request.id, "Missing params");
    };

    let Some(uri) = params.get("uri").and_then(Value::as_str) else {
        return JsonRpcResponse::invalid_params(request.id, "Missing uri parameter");
    };

+   // TODO: This is a placeholder. Actual resource implementation will be added in a future PR.
    // For now, we don't have any resources to read
    JsonRpcResponse {
        jsonrpc: JSONRPC_VERSION.to_string(),
        id: request.id,
        result: None,
        error: Some(JsonRpcError {
            code: -32601,
            message: "Resource not found".to_string(),
            data: Some(json!({ "details": format!("No resource found with URI: {}", uri) })),
        }),
    }
}

312-315: Ohayo sensei! Consider adding CORS headers for SSE response.

The SSE response includes necessary headers for event streaming but lacks CORS headers that are present in other endpoint responses.

Response::builder()
    .header("Content-Type", "text/event-stream")
    .header("Cache-Control", "no-cache")
    .header("Connection", "keep-alive")
    .header("X-Session-Id", session_id)
+   .header("Access-Control-Allow-Origin", "*")
+   .header("Access-Control-Allow-Headers", "Content-Type")
    .body(Body::wrap_stream(stream))
    .unwrap()

376-378: Ohayo sensei! Add error details for SSE broadcast failures.

When broadcasting to SSE fails, the current implementation logs the error but doesn't provide detailed information about what went wrong.

// Forward the response to the SSE channel
if let Err(e) = tx.send(response.clone()) {
-   warn!("Error forwarding response to SSE: {}", e);
+   warn!("Error forwarding response to SSE: {}, session may be disconnected or channel full", e);
+   
+   // Optionally, we could trigger session cleanup here
+   // let session_id_clone = session_id.clone();
+   // tokio::spawn(async move {
+   //     let mut sessions = self.sse_sessions.write().await;
+   //     sessions.remove(&session_id_clone);
+   // });
}

413-415: Ohayo sensei! Consider consolidating duplicate error handling code.

The error handling for SSE broadcasting is duplicated in two places (lines 376-378 and 413-415). Consider extracting this to a helper method.

+ // Add this helper method to McpHandler
+ fn handle_broadcast_error(&self, e: broadcast::error::SendError<JsonRpcResponse>, session_id: &str) {
+     warn!("Error forwarding response to SSE: {}, session may be disconnected or channel full", e);
+     
+     // Optionally, queue the session for cleanup if broadcasting consistently fails
+ }

// Then replace both occurrences:
if let Err(e) = tx.send(response.clone()) {
-   warn!("Error forwarding response to SSE: {}", e);
+   self.handle_broadcast_error(e, session_id);
}
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 4f427cf and a57aed9.

📒 Files selected for processing (1)
  • crates/torii/server/src/handlers/mcp.rs (10 hunks)
🔇 Additional comments (3)
crates/torii/server/src/handlers/mcp.rs (3)

323-323: Ohayo sensei! URI query parameter parsing is unsafe.

The current query parameter parsing assumes a specific format without validation. This could panic if the query string is malformed.

- let session_id = uri.query().unwrap().split("=").collect::<Vec<_>>()[1];
+ let session_id = match uri.query() {
+     Some(query) => {
+         let params: Vec<&str> = query.split('=').collect();
+         if params.len() < 2 || params[0] != "sessionId" {
+             return Response::builder()
+                 .status(StatusCode::BAD_REQUEST)
+                 .body(Body::from("Invalid session ID parameter"))
+                 .unwrap();
+         }
+         params[1]
+     },
+     None => {
+         return Response::builder()
+             .status(StatusCode::BAD_REQUEST)
+             .body(Body::from("Missing session ID parameter"))
+             .unwrap();
+     }
+ };

545-548: Ohayo sensei! SQL Injection vulnerability in query tool.

The query tool executes SQL directly from user input without proper validation, creating a SQL injection risk.

While sqlx::query() provides some protection, add additional validation:

if let Some(query) = args.and_then(|args| args.get("query").and_then(Value::as_str)) {
+   // Validate query - prevent destructive operations
+   let query_upper = query.to_uppercase();
+   if query_upper.contains("DROP") || query_upper.contains("DELETE") || query_upper.contains("TRUNCATE") {
+       return JsonRpcResponse {
+           jsonrpc: JSONRPC_VERSION.to_string(),
+           id: request.id,
+           result: None,
+           error: Some(JsonRpcError {
+               code: -32602,
+               message: "Invalid query".to_string(),
+               data: Some(json!({ "details": "Destructive operations are not allowed" })),
+           }),
+       };
+   }
+
    match sqlx::query(query).fetch_all(&*self.pool).await {

261-266: Ohayo sensei! Inserting SseSession in a shared map for SSE.

Be mindful of session cleanup. If sessions remain after clients disconnect, memory can grow.

Comment on lines +689 to 700
match req.method() {
// Handle GET requests for SSE connection
&hyper::Method::GET => {
return self.handle_sse_connection().await;
}
// Return Method Not Allowed for other methods
_ => Response::builder()
.body(Body::from(
serde_json::to_string(&JsonRpcResponse::method_not_found(Value::Null)).unwrap(),
))
.unwrap(),
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo sensei! Add support for CORS preflight requests.

The current implementation handles GET requests for SSE but doesn't properly support OPTIONS preflight requests that browsers send for CORS.

match req.method() {
    // Handle GET requests for SSE connection
    &hyper::Method::GET => {
        return self.handle_sse_connection().await;
    }
+   // Handle OPTIONS preflight requests
+   &hyper::Method::OPTIONS => {
+       return Response::builder()
+           .header("Access-Control-Allow-Origin", "*")
+           .header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
+           .header("Access-Control-Allow-Headers", "Content-Type")
+           .header("Access-Control-Max-Age", "86400")
+           .body(Body::empty())
+           .unwrap();
+   }
    // Return Method Not Allowed for other methods
    _ => Response::builder()
        .body(Body::from(
            serde_json::to_string(&JsonRpcResponse::method_not_found(Value::Null)).unwrap(),
        ))
        .unwrap(),
}
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
match req.method() {
// Handle GET requests for SSE connection
&hyper::Method::GET => {
return self.handle_sse_connection().await;
}
// Return Method Not Allowed for other methods
_ => Response::builder()
.body(Body::from(
serde_json::to_string(&JsonRpcResponse::method_not_found(Value::Null)).unwrap(),
))
.unwrap(),
}
match req.method() {
// Handle GET requests for SSE connection
&hyper::Method::GET => {
return self.handle_sse_connection().await;
}
// Handle OPTIONS preflight requests
&hyper::Method::OPTIONS => {
return Response::builder()
.header("Access-Control-Allow-Origin", "*")
.header("Access-Control-Allow-Methods", "GET, POST, OPTIONS")
.header("Access-Control-Allow-Headers", "Content-Type")
.header("Access-Control-Max-Age", "86400")
.body(Body::empty())
.unwrap();
}
// Return Method Not Allowed for other methods
_ => Response::builder()
.body(Body::from(
serde_json::to_string(&JsonRpcResponse::method_not_found(Value::Null)).unwrap(),
))
.unwrap(),
}

Comment on lines 112 to 152
pub fn new(pool: Arc<SqlitePool>) -> Self {
Self { pool }
let tools = vec![
Tool {
name: "query",
description: "Execute a SQL query on the database",
input_schema: json!({
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "SQL query to execute"
}
},
"required": ["query"]
}),
},
Tool {
name: "schema",
description: "Retrieve the database schema including tables, columns, and their \
types",
input_schema: json!({
"type": "object",
"properties": {
"table": {
"type": "string",
"description": "Optional table name to get schema for. If omitted, returns schema for all tables."
}
}
}),
},
];

let resources = vec![]; // Add resources as needed

Self {
pool,
sse_sessions: Arc::new(tokio::sync::RwLock::new(std::collections::HashMap::new())),
tools,
resources,
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Verification agent

🧩 Analysis chain

Ohayo sensei! Consider adding unit tests for new SSE functionality.

The SSE implementation adds significant new functionality that would benefit from unit tests to ensure it works correctly and remains stable during future changes.


🏁 Script executed:

#!/bin/bash
# Check if there are any test files for mcp.rs
find crates/torii/server/src -name "test_*.rs" | grep -i mcp

Length of output: 62



Ohayo sensei!
It appears that the new SSE functionality in crates/torii/server/src/handlers/mcp.rs (lines 112–152) isn’t covered by any unit tests. Since SSE is a critical new feature, adding tests—especially around session management and event flow—will help safeguard against regressions in the future. Please consider integrating these unit tests to ensure long-term stability.


🏁 Script executed:

#!/bin/bash
echo "Searching for inline tests in crates/torii/server/src/handlers/mcp.rs"
grep -R "#\[cfg(test)\]" -n crates/torii/server/src/handlers/mcp.rs || echo "No #[cfg(test)] blocks found in mcp.rs"

echo "Searching for #[test] functions in crates/torii/server/src/handlers/mcp.rs"
grep -R "#\[test\]" -n crates/torii/server/src/handlers/mcp.rs || echo "No #[test] functions found in mcp.rs"

Length of output: 612


Ohayo sensei!
The new SSE functionality in crates/torii/server/src/handlers/mcp.rs (lines 112–152) isn't covered by any unit tests. Given that SSE is a critical and complex feature—especially with session management—it’s important to add tests verifying both session creation and event flow. Please include unit tests to fortify this functionality against future changes.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (9)
crates/torii/mcp/src/resources/mod.rs (1)

2-4: Ohayo, sensei! Consider enhancing the Resource struct with additional metadata fields.

The Resource struct is quite minimal with just a name field. For improved usability, consider adding more descriptive fields like:

pub struct Resource {
    pub name: &'static str,
+   pub description: &'static str,
+   pub version: &'static str,
+   pub capabilities: Vec<&'static str>,
}

This would provide better documentation and context for developers using these resources.

crates/torii/mcp/src/tools/mod.rs (1)

13-18: Consider adding schema validation for tools.

While the implementation is clean, there's no validation for the input schemas. It might be beneficial to add some form of schema validation to ensure that tools are correctly defined.

pub fn get_tools() -> Vec<Tool> {
+   // Validate tool schemas before returning
+   let tools = vec![
        query::get_tool(),
        schema::get_tool(),
    ];
+   
+   // Optional: Log the available tools at startup
+   log::info!("Loaded {} MCP tools", tools.len());
+   
+   tools
}
crates/torii/sqlite/src/utils.rs (1)

188-229: Ohayo sensei! The map_row_to_json function is a solid approach to converting rows into JSON.
It gracefully handles different SQLite data types (TEXT, INTEGER, REAL, BLOB, and fallback attempts). A few suggestions to consider:
• If you anticipate date/time columns, you might parse them into a structured format before turning them into strings.
• Repeated try_get attempts might be slightly less performant; if performance becomes critical, consider an optimized approach or a custom type conversion method.

crates/torii/mcp/src/tools/schema.rs (1)

27-116: Ohayo sensei! The handle function effectively retrieves schema details.
It properly parameterizes the query and uses pragma_table_info to fetch columns and their attributes. A few minor points:
• Consider logging or surfacing an empty result if the user specifies a table that doesn’t exist.
serde_json::to_string_pretty(...).unwrap() can theoretically fail; you could gracefully handle serialization errors, but it’s likely safe in typical usage.

crates/torii/mcp/src/tools/query.rs (1)

29-76: Ohayo sensei! The handle function correctly executes arbitrary SQL and returns rows as JSON.
Just keep in mind that running arbitrary user-provided queries may be dangerous in certain environments—consider restricting queries to read-only or validating them if security is a concern.

crates/torii/server/src/handlers/mcp.rs (3)

103-104: Ohayo sensei! Delegation to tool handlers is clear.
These lines properly route queries to the query or schema tool. Consider logging calls for debugging.


129-129: Ohayo sensei! Good usage of warn!.
This log statement helps catch message sending errors. Ensure logs remain adequately descriptive in production.


291-303: Ohayo sensei! The resource read handler is incomplete.
Currently returning method_not_found. If you need assistance implementing resource reading logic, let me know!

crates/torii/mcp/src/types/mod.rs (1)

17-23: Ohayo sensei! JsonRpcRequest captures standard fields.
Ensure that any future optional fields (e.g. context) are accounted for if needed.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a57aed9 and 917e50d.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (12)
  • Cargo.toml (1 hunks)
  • crates/torii/mcp/Cargo.toml (1 hunks)
  • crates/torii/mcp/src/lib.rs (1 hunks)
  • crates/torii/mcp/src/resources/mod.rs (1 hunks)
  • crates/torii/mcp/src/tools/mod.rs (1 hunks)
  • crates/torii/mcp/src/tools/query.rs (1 hunks)
  • crates/torii/mcp/src/tools/schema.rs (1 hunks)
  • crates/torii/mcp/src/types/mod.rs (1 hunks)
  • crates/torii/server/Cargo.toml (2 hunks)
  • crates/torii/server/src/handlers/mcp.rs (6 hunks)
  • crates/torii/server/src/handlers/sql.rs (1 hunks)
  • crates/torii/sqlite/src/utils.rs (3 hunks)
✅ Files skipped from review due to trivial changes (2)
  • crates/torii/mcp/src/lib.rs
  • crates/torii/mcp/Cargo.toml
🚧 Files skipped from review as they are similar to previous changes (1)
  • crates/torii/server/Cargo.toml
🔇 Additional comments (28)
crates/torii/mcp/src/resources/mod.rs (1)

6-8: This empty vector implementation is preparation for future resources.

The empty vector return indicates this is preparatory work for future expansion. When adding actual resources, consider implementing a builder pattern or factory methods to construct them consistently.

Would you like me to suggest a more robust implementation for when you start adding real resources to this collection?

Cargo.toml (1)

132-132: Ohayo! Dependency addition looks good, sensei.

The addition of the torii-mcp workspace dependency is clean and follows the project's established pattern for internal dependencies.

crates/torii/mcp/src/tools/mod.rs (1)

6-11: Tool struct looks well-designed, sensei!

The Tool struct with name, description, and input_schema provides a good foundation for tool definitions. The use of serde_json::Value for input_schema offers flexibility for various schema formats.

crates/torii/server/src/handlers/sql.rs (1)

7-8: Ohayo! Good refactoring move, sensei.

Moving the map_row_to_json function to a utility module is a great refactoring decision. This centralizes the row-to-JSON conversion logic, making it reusable across different parts of the codebase.

crates/torii/sqlite/src/utils.rs (2)

7-8: Ohayo sensei! These import additions look good.
No issues spotted with the inclusion of base64::Engine and the STANDARD encoder.


18-18: Ohayo sensei! The additional imports from sqlx look correct.
This nicely streamlines row handling in the new function.

crates/torii/mcp/src/tools/schema.rs (1)

11-25: Ohayo sensei! get_tool for the schema functionality looks neat.
This clarifies the usage for retrieving table information with an optional table parameter. The concise JSON schema is easy to follow.

crates/torii/mcp/src/tools/query.rs (1)

12-27: Ohayo sensei! get_tool for the query functionality is well-defined.
Exposing a single required query parameter keeps this tool straightforward for running database commands.

crates/torii/server/src/handlers/mcp.rs (11)

7-17: Ohayo sensei! Solid import additions.
These added imports look correct for your SSE, resource, and tool functionalities.


24-26: Ohayo sensei! Good approach to struct initialization.
Storing sse_sessions, tools, and resources in the handler is neat. Just be mindful that your SSE sessions might accumulate if not removed on client disconnect.


31-36: Ohayo sensei! The new constructor is straightforward.
No obvious issues; initialization matches the updated handler fields.


48-49: Ohayo sensei! Resource routes introduced.
These new JSON-RPC methods for resources/list and resources/read expand your MCP protocol coverage. Ensure any existing client code is aware of these new routes.


59-71: Ohayo sensei! Nice expansion of server capabilities.
Exposing tools and resources features here helps clients discover your new endpoints. The versioning approach for serverInfo also looks tidy.


98-98: Ohayo sensei! Checking tool name.
Looks good. Just confirm the front-end doesn't rely on older tool endpoints.


136-151: Ohayo sensei! Remember session cleanup.
Inserting the session into the HashMap is fine, but a removal step after client disconnect would prevent memory growth.


153-212: Ohayo sensei! SSE streaming logic is well-structured.
The chain of asynchronous streams properly merges the initial endpoint event and subsequent broadcast messages. This is a robust approach for SSE.


214-282: Ohayo sensei! Thorough handling of JSON-RPC over SSE.
Validates session presence, parses the request, and pushes the response into the broadcast channel. Looks consistent with your SSE design.


284-289: Ohayo sensei! Resource listing function is concise.
Returning the resource names as JSON is straightforward. The approach looks good for the new endpoint.


312-345: Ohayo sensei! Consider adding a preflight OPTIONS response.
Only GET and message requests are allowed. Modern browsers might try OPTIONS for CORS compliance, especially for SSE endpoints.

crates/torii/mcp/src/types/mod.rs (9)

1-9: Ohayo sensei! Good foundation for JSON-RPC constants.
JSONRPC_VERSION, MCP_VERSION, and SSE_CHANNEL_CAPACITY define a clear protocol boundary.


10-15: Ohayo sensei! Introducing JsonRpcMessage enum.
Untagged approach neatly captures Requests vs. Notifications.


25-30: Ohayo sensei! JsonRpcNotification structure is simple.
It’s a fine approach—just confirm notifications aren't being processed incorrectly downstream.


32-40: Ohayo sensei! JsonRpcResponse structure is well-designed.
Including optional error and result fields is aligned with the JSON-RPC 2.0 spec.


42-48: Ohayo sensei! JsonRpcError straightforward.
Captures code, message, and data for advanced error reporting.


50-54: Ohayo sensei! Implementation struct.
Handy for relaying server info to clients.


56-71: Ohayo sensei! Capability structs look correct.
Clients can discover tool listing, resource subscription, etc. It's good practice to keep these separate.


73-79: Ohayo sensei! SseSession suits your broadcast approach.
Storing the broadcast sender along with a session ID is a neat abstraction.


80-109: Ohayo sensei! Exhaustive JSON-RPC response helpers.
Methods like ok, error, and specialized errors match typical JSON-RPC patterns. Great coverage for consistent error handling.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant