Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt(torii-server): initializing handlers #3078

Merged
merged 4 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 6 additions & 7 deletions crates/torii/server/src/handlers/graphql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@

pub(crate) const LOG_TARGET: &str = "torii::server::handlers::graphql";

#[derive(Debug)]
pub struct GraphQLHandler {
client_ip: IpAddr,
graphql_addr: Option<SocketAddr>,
pub(crate) graphql_addr: Option<SocketAddr>,
}

impl GraphQLHandler {
pub fn new(client_ip: IpAddr, graphql_addr: Option<SocketAddr>) -> Self {
Self { client_ip, graphql_addr }
pub fn new(graphql_addr: Option<SocketAddr>) -> Self {
Self { graphql_addr }

Check warning on line 18 in crates/torii/server/src/handlers/graphql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/handlers/graphql.rs#L17-L18

Added lines #L17 - L18 were not covered by tests
}
}

Expand All @@ -25,11 +25,10 @@
req.uri().path().starts_with("/graphql")
}

async fn handle(&self, req: Request<Body>) -> Response<Body> {
async fn handle(&self, req: Request<Body>, client_addr: IpAddr) -> Response<Body> {

Check warning on line 28 in crates/torii/server/src/handlers/graphql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/handlers/graphql.rs#L28

Added line #L28 was not covered by tests
if let Some(addr) = self.graphql_addr {
let graphql_addr = format!("http://{}", addr);
match crate::proxy::GRAPHQL_PROXY_CLIENT.call(self.client_ip, &graphql_addr, req).await
{
match crate::proxy::GRAPHQL_PROXY_CLIENT.call(client_addr, &graphql_addr, req).await {

Check warning on line 31 in crates/torii/server/src/handlers/graphql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/handlers/graphql.rs#L31

Added line #L31 was not covered by tests
Ok(response) => response,
Err(_error) => {
error!(target: LOG_TARGET, "GraphQL proxy error: {:?}", _error);
Expand Down
10 changes: 5 additions & 5 deletions crates/torii/server/src/handlers/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@

pub(crate) const LOG_TARGET: &str = "torii::server::handlers::grpc";

#[derive(Debug)]
pub struct GrpcHandler {
client_ip: IpAddr,
grpc_addr: Option<SocketAddr>,
}

impl GrpcHandler {
pub fn new(client_ip: IpAddr, grpc_addr: Option<SocketAddr>) -> Self {
Self { client_ip, grpc_addr }
pub fn new(grpc_addr: Option<SocketAddr>) -> Self {
Self { grpc_addr }

Check warning on line 18 in crates/torii/server/src/handlers/grpc.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/handlers/grpc.rs#L17-L18

Added lines #L17 - L18 were not covered by tests
}
}

Expand All @@ -29,10 +29,10 @@
.unwrap_or(false)
}

async fn handle(&self, req: Request<Body>) -> Response<Body> {
async fn handle(&self, req: Request<Body>, client_addr: IpAddr) -> Response<Body> {

Check warning on line 32 in crates/torii/server/src/handlers/grpc.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/handlers/grpc.rs#L32

Added line #L32 was not covered by tests
if let Some(grpc_addr) = self.grpc_addr {
let grpc_addr = format!("http://{}", grpc_addr);
match crate::proxy::GRPC_PROXY_CLIENT.call(self.client_ip, &grpc_addr, req).await {
match crate::proxy::GRPC_PROXY_CLIENT.call(client_addr, &grpc_addr, req).await {

Check warning on line 35 in crates/torii/server/src/handlers/grpc.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/handlers/grpc.rs#L35

Added line #L35 was not covered by tests
Ok(response) => response,
Err(_error) => {
error!(target: LOG_TARGET, "{:?}", _error);
Expand Down
9 changes: 5 additions & 4 deletions crates/torii/server/src/handlers/mcp.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::net::IpAddr;
use std::sync::Arc;

use futures_util::{SinkExt, StreamExt};
Expand Down Expand Up @@ -75,7 +76,7 @@
list_changed: bool,
}

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct McpHandler {
pool: Arc<SqlitePool>,
}
Expand Down Expand Up @@ -219,7 +220,7 @@
AND m.name = ?
ORDER BY m.name, p.cid"
.to_string(),
None => "SELECT
_ => "SELECT

Check warning on line 223 in crates/torii/server/src/handlers/mcp.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/handlers/mcp.rs#L223

Added line #L223 was not covered by tests
m.name as table_name,
p.*
FROM sqlite_master m
Expand All @@ -231,7 +232,7 @@

let rows = match table_filter {
Some(table) => sqlx::query(&schema_query).bind(table).fetch_all(&*self.pool).await,
None => sqlx::query(&schema_query).fetch_all(&*self.pool).await,
_ => sqlx::query(&schema_query).fetch_all(&*self.pool).await,

Check warning on line 235 in crates/torii/server/src/handlers/mcp.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/handlers/mcp.rs#L235

Added line #L235 was not covered by tests
};

match rows {
Expand Down Expand Up @@ -393,7 +394,7 @@
.unwrap_or(false)
}

async fn handle(&self, req: Request<Body>) -> Response<Body> {
async fn handle(&self, req: Request<Body>, _client_addr: IpAddr) -> Response<Body> {

Check warning on line 397 in crates/torii/server/src/handlers/mcp.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/handlers/mcp.rs#L397

Added line #L397 was not covered by tests
if hyper_tungstenite::is_upgrade_request(&req) {
let (response, websocket) = hyper_tungstenite::upgrade(req, None)
.expect("Failed to upgrade WebSocket connection");
Expand Down
6 changes: 4 additions & 2 deletions crates/torii/server/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@ pub mod mcp;
pub mod sql;
pub mod static_files;

use std::net::IpAddr;

use hyper::{Body, Request, Response};

#[async_trait::async_trait]
pub trait Handler: Send + Sync {
pub trait Handler: Send + Sync + std::fmt::Debug {
// Check if this handler should handle the given request
fn should_handle(&self, req: &Request<Body>) -> bool;

// Handle the request
async fn handle(&self, req: Request<Body>) -> Response<Body>;
async fn handle(&self, req: Request<Body>, client_addr: IpAddr) -> Response<Body>;
}
4 changes: 3 additions & 1 deletion crates/torii/server/src/handlers/sql.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use std::net::IpAddr;
use std::sync::Arc;

use base64::engine::general_purpose::STANDARD;
Expand All @@ -9,6 +10,7 @@

use super::Handler;

#[derive(Debug)]
pub struct SqlHandler {
pool: Arc<SqlitePool>,
}
Expand Down Expand Up @@ -111,7 +113,7 @@
req.uri().path().starts_with("/sql")
}

async fn handle(&self, req: Request<Body>) -> Response<Body> {
async fn handle(&self, req: Request<Body>, _client_addr: IpAddr) -> Response<Body> {

Check warning on line 116 in crates/torii/server/src/handlers/sql.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/handlers/sql.rs#L116

Added line #L116 was not covered by tests
self.handle_request(req).await
}
}
Expand Down
13 changes: 5 additions & 8 deletions crates/torii/server/src/handlers/static_files.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@

pub(crate) const LOG_TARGET: &str = "torii::server::handlers::static";

#[derive(Debug)]
pub struct StaticHandler {
client_ip: IpAddr,
artifacts_addr: Option<SocketAddr>,
}

impl StaticHandler {
pub fn new(client_ip: IpAddr, artifacts_addr: Option<SocketAddr>) -> Self {
Self { client_ip, artifacts_addr }
pub fn new(artifacts_addr: Option<SocketAddr>) -> Self {
Self { artifacts_addr }

Check warning on line 17 in crates/torii/server/src/handlers/static_files.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/handlers/static_files.rs#L16-L17

Added lines #L16 - L17 were not covered by tests
}
}

Expand All @@ -24,13 +24,10 @@
req.uri().path().starts_with("/static")
}

async fn handle(&self, req: Request<Body>) -> Response<Body> {
async fn handle(&self, req: Request<Body>, client_addr: IpAddr) -> Response<Body> {

Check warning on line 27 in crates/torii/server/src/handlers/static_files.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/handlers/static_files.rs#L27

Added line #L27 was not covered by tests
if let Some(artifacts_addr) = self.artifacts_addr {
let artifacts_addr = format!("http://{}", artifacts_addr);
match crate::proxy::GRAPHQL_PROXY_CLIENT
.call(self.client_ip, &artifacts_addr, req)
.await
{
match crate::proxy::GRAPHQL_PROXY_CLIENT.call(client_addr, &artifacts_addr, req).await {

Check warning on line 30 in crates/torii/server/src/handlers/static_files.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/handlers/static_files.rs#L30

Added line #L30 was not covered by tests
Ok(response) => response,
Err(_error) => {
error!(target: LOG_TARGET, "{:?}", _error);
Expand Down
58 changes: 20 additions & 38 deletions crates/torii/server/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,7 @@
pub struct Proxy {
addr: SocketAddr,
allowed_origins: Option<Vec<String>>,
grpc_addr: Option<SocketAddr>,
artifacts_addr: Option<SocketAddr>,
graphql_addr: Arc<RwLock<Option<SocketAddr>>>,
pool: Arc<SqlitePool>,
handlers: Arc<RwLock<Vec<Box<dyn Handler>>>>,
}

impl Proxy {
Expand All @@ -79,19 +76,20 @@
artifacts_addr: Option<SocketAddr>,
pool: Arc<SqlitePool>,
) -> Self {
Self {
addr,
allowed_origins,
grpc_addr,
graphql_addr: Arc::new(RwLock::new(graphql_addr)),
artifacts_addr,
pool,
}
let handlers: Arc<RwLock<Vec<Box<dyn Handler>>>> = Arc::new(RwLock::new(vec![
Box::new(GraphQLHandler::new(graphql_addr)),
Box::new(GrpcHandler::new(grpc_addr)),
Box::new(McpHandler::new(pool.clone())),
Box::new(SqlHandler::new(pool.clone())),
Box::new(StaticHandler::new(artifacts_addr)),
]));

Self { addr, allowed_origins, handlers }

Check warning on line 87 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L79-L87

Added lines #L79 - L87 were not covered by tests
}

pub async fn set_graphql_addr(&self, addr: SocketAddr) {
let mut graphql_addr = self.graphql_addr.write().await;
*graphql_addr = Some(addr);
let mut handlers = self.handlers.write().await;
handlers[0] = Box::new(GraphQLHandler::new(Some(addr)));

Check warning on line 92 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L91-L92

Added lines #L91 - L92 were not covered by tests
}

pub async fn start(
Expand All @@ -100,13 +98,10 @@
) -> Result<(), hyper::Error> {
let addr = self.addr;
let allowed_origins = self.allowed_origins.clone();
let grpc_addr = self.grpc_addr;
let graphql_addr = self.graphql_addr.clone();
let artifacts_addr = self.artifacts_addr;
let pool = self.pool.clone();

let make_svc = make_service_fn(move |conn: &AddrStream| {
let remote_addr = conn.remote_addr().ip();

Check warning on line 104 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L104

Added line #L104 was not covered by tests
let cors = CorsLayer::new()
.max_age(DEFAULT_MAX_AGE)
.allow_methods([Method::GET, Method::POST])
Expand Down Expand Up @@ -140,14 +135,12 @@
),
});

let pool_clone = pool.clone();
let graphql_addr_clone = graphql_addr.clone();
let handlers = self.handlers.clone();

Check warning on line 138 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L138

Added line #L138 was not covered by tests
let service = ServiceBuilder::new().option_layer(cors).service_fn(move |req| {
let pool = pool_clone.clone();
let graphql_addr = graphql_addr_clone.clone();
let handlers = handlers.clone();

Check warning on line 140 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L140

Added line #L140 was not covered by tests
async move {
let graphql_addr = graphql_addr.read().await;
handle(remote_addr, grpc_addr, artifacts_addr, *graphql_addr, pool, req).await
let handlers = handlers.read().await;
handle(remote_addr, req, &handlers).await

Check warning on line 143 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L142-L143

Added lines #L142 - L143 were not covered by tests
}
});

Expand All @@ -166,23 +159,12 @@

async fn handle(
client_ip: IpAddr,
grpc_addr: Option<SocketAddr>,
artifacts_addr: Option<SocketAddr>,
graphql_addr: Option<SocketAddr>,
pool: Arc<SqlitePool>,
req: Request<Body>,
handlers: &[Box<dyn Handler>],

Check warning on line 163 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L163

Added line #L163 was not covered by tests
) -> Result<Response<Body>, Infallible> {
let handlers: Vec<Box<dyn Handler>> = vec![
Box::new(SqlHandler::new(pool.clone())),
Box::new(GraphQLHandler::new(client_ip, graphql_addr)),
Box::new(GrpcHandler::new(client_ip, grpc_addr)),
Box::new(StaticHandler::new(client_ip, artifacts_addr)),
Box::new(McpHandler::new(pool.clone())),
];

for handler in handlers {
for handler in handlers.iter() {

Check warning on line 165 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L165

Added line #L165 was not covered by tests
if handler.should_handle(&req) {
return Ok(handler.handle(req).await);
return Ok(handler.handle(req, client_ip).await);

Check warning on line 167 in crates/torii/server/src/proxy.rs

View check run for this annotation

Codecov / codecov/patch

crates/torii/server/src/proxy.rs#L167

Added line #L167 was not covered by tests
}
}

Expand Down
Loading