Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: implement hyper service trait #240

Open
wants to merge 1 commit into
base: dev
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 41 additions & 29 deletions crates/hyper/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use http_body_util::BodyExt;
use hyper::body::Incoming;
use http_body_util::{BodyExt, Full};
use hyper::body::{Bytes, Incoming};
use hyper::server::conn::http1;
use hyper::{service::service_fn, Request};
use hyper::service::Service;
use hyper::Request;
use hyper::Response;
use hyper_util::rt::TokioIo;
use ngyn_shared::core::engine::{NgynHttpPlatform, PlatformData};
use ngyn_shared::server::NgynResponse;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::net::TcpListener;

Expand Down Expand Up @@ -43,11 +46,11 @@ impl HyperApplication {
let mut signal = std::pin::pin!(shutdown_signal());

loop {
let data = data.clone();
let service = HyperService { data: data.clone() };
tokio::select! {
Ok((stream, _)) = server.accept() => {
let io = TokioIo::new(stream);
let conn = http.serve_connection(io, service_fn(move |req| hyper_service(data.clone(), req)));
let conn = http.serve_connection(io, service);
let handle = graceful.watch(conn);

tokio::task::spawn(async move {
Expand Down Expand Up @@ -78,31 +81,40 @@ impl HyperApplication {
}
}

async fn hyper_service(
data: Arc<PlatformData>,
req: Request<Incoming>,
) -> Result<NgynResponse, hyper::Error> {
let (parts, mut body) = req.into_parts();
let body = {
let mut buf = Vec::new();
// TODO: change this approach. It's not efficient.
while let Some(frame) = body.frame().await {
if let Ok(bytes) = frame?.into_data() {
buf.extend_from_slice(&bytes);
} else {
break;
}
}
buf
};
let req = Request::from_parts(parts, body);
let res = data.respond(req).await;

Ok::<_, hyper::Error>(res)
}

async fn shutdown_signal() {
tokio::signal::ctrl_c()
.await
.expect("failed to listen for signal");
}

struct HyperService {
data: Arc<PlatformData>,
}

impl Service<Request<Incoming>> for HyperService {
type Response = Response<Full<Bytes>>;
type Error = hyper::Error;
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;

fn call(&self, req: Request<Incoming>) -> Self::Future {
let (parts, mut body) = req.into_parts();
let data = self.data.clone();

Box::pin(async move {
let body = {
let mut buf = Vec::new();
// TODO: change this approach. It's not efficient.
while let Some(Ok(frame)) = body.frame().await {
if let Ok(bytes) = frame.into_data() {
buf.extend_from_slice(&bytes);
} else {
break;
}
}
buf
};
let req = Request::from_parts(parts, body);
Ok(data.respond(req).await)
})
}
}