Skip to content

Commit 20c0bc2

Browse files
committed
test: run integration test
1 parent f01ba14 commit 20c0bc2

File tree

17 files changed

+190
-61
lines changed

17 files changed

+190
-61
lines changed

Cargo.lock

+27
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ clap = { version = "4.4" }
2121
feed-rs = { version = "1.4" }
2222
futures-util = { version = "0.3.30" }
2323
graphql_client = { version = "0.13.0", default-features = false }
24+
headers = { version = "0.4.0" }
2425
http = { version = "0.2" } # request use 0.2
2526
kvsd = { version = "0.1.2", default-features = false }
2627
# kvsd = { path = "../kvsd" }

crates/synd_api/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ clap = { workspace = true, features = ["derive"] }
2424
feed-rs = { workspace = true }
2525
futures-util = { workspace = true }
2626
graphql_client = { workspace = true }
27+
headers = { workspace = true }
2728
kvsd = { workspace = true }
2829
moka = { workspace = true, features = ["future"] }
2930
pin-project = "1.1.4"

crates/synd_api/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -9,4 +9,5 @@ pub mod principal;
99
pub mod repository;
1010
pub mod serve;
1111
pub mod service;
12+
pub mod shutdown;
1213
pub mod usecase;

crates/synd_api/src/main.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use synd_o11y::{opentelemetry::OpenTelemetryGuard, tracing_subscriber::otel_metrics};
22
use tracing::{error, info};
33

4-
use synd_api::{args, config, dependency::Dependency, serve::listen_and_serve};
4+
use synd_api::{args, config, dependency::Dependency, serve::listen_and_serve, shutdown::Shutdown};
55

66
fn init_tracing() -> Option<OpenTelemetryGuard> {
77
use synd_o11y::{
@@ -72,10 +72,11 @@ async fn main() {
7272
let _guard = init_tracing();
7373

7474
let dep = Dependency::new(args.kvsd).await.unwrap();
75+
let shutdown = Shutdown::watch_signal();
7576

7677
info!(version = config::VERSION, "Runinng...");
7778

78-
if let Err(err) = listen_and_serve(dep).await {
79+
if let Err(err) = listen_and_serve(dep, shutdown.notify()).await {
7980
error!("{err:?}");
8081
std::process::exit(1);
8182
}

crates/synd_api/src/repository/kvsd.rs

+12-8
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use async_trait::async_trait;
2+
use futures_util::TryFutureExt;
23
use kvsd::{
34
client::{tcp::Client, Api},
45
Key, Value,
@@ -17,20 +18,23 @@ pub struct KvsdClient {
1718
}
1819

1920
impl KvsdClient {
21+
pub fn new(client: Client<TcpStream>) -> Self {
22+
Self {
23+
client: Mutex::new(client),
24+
}
25+
}
26+
2027
pub async fn connect(
2128
host: impl AsRef<str>,
2229
port: u16,
2330
username: String,
2431
password: String,
2532
) -> anyhow::Result<Self> {
26-
let client =
27-
kvsd::client::tcp::UnauthenticatedClient::insecure_from_addr(host, port).await?;
28-
29-
let client = client.authenticate(username, password).await?;
30-
31-
Ok(Self {
32-
client: Mutex::new(client),
33-
})
33+
kvsd::client::tcp::UnauthenticatedClient::insecure_from_addr(host, port)
34+
.and_then(|client| client.authenticate(username, password))
35+
.await
36+
.map(Self::new)
37+
.map_err(Into::into)
3438
}
3539

3640
async fn get<'a, T>(

crates/synd_api/src/serve/layer/trace.rs

-2
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ impl<B> tower_http::trace::MakeSpan<B> for MakeSpan {
2222
%request_id,
2323
);
2424

25-
tracing::info!("Request headers: {:#?}", request.headers());
26-
2725
span.set_parent(cx);
2826
span
2927
}

crates/synd_api/src/serve/mod.rs

+16-5
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ use axum::{
66
routing::{get, post},
77
BoxError, Extension, Router,
88
};
9+
use futures_util::Future;
910
use tokio::net::TcpListener;
1011
use tower::{limit::ConcurrencyLimitLayer, timeout::TimeoutLayer, ServiceBuilder};
1112
use tower_http::{
@@ -26,18 +27,24 @@ mod probe;
2627
pub mod layer;
2728

2829
/// Bind tcp listener and serve.
29-
pub async fn listen_and_serve(dep: Dependency) -> anyhow::Result<()> {
30+
pub async fn listen_and_serve<Fut>(dep: Dependency, shutdown: Fut) -> anyhow::Result<()>
31+
where
32+
Fut: Future<Output = ()> + Send + 'static,
33+
{
3034
// should 127.0.0.1?
3135
let addr = ("0.0.0.0", config::PORT);
3236
let listener = TcpListener::bind(addr).await?;
3337

3438
info!(ip = addr.0, port = addr.1, "Listening...");
3539

36-
serve(listener, dep).await
40+
serve(listener, dep, shutdown).await
3741
}
3842

3943
/// Start api server
40-
pub async fn serve(listener: TcpListener, dep: Dependency) -> anyhow::Result<()> {
44+
pub async fn serve<Fut>(listener: TcpListener, dep: Dependency, shutdown: Fut) -> anyhow::Result<()>
45+
where
46+
Fut: Future<Output = ()> + Send + 'static,
47+
{
4148
let Dependency {
4249
authenticator,
4350
runtime,
@@ -64,8 +71,12 @@ pub async fn serve(listener: TcpListener, dep: Dependency) -> anyhow::Result<()>
6471
)
6572
.route("/healthcheck", get(probe::healthcheck));
6673

67-
// TODO: graceful shutdown
68-
axum::serve(listener, service).await?;
74+
axum::serve(listener, service)
75+
.with_graceful_shutdown(shutdown)
76+
.await?;
77+
78+
tracing::info!("Shutdown complete");
79+
6980
Ok(())
7081
}
7182

crates/synd_api/src/shutdown.rs

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use tokio::sync::broadcast::{self, Receiver, Sender};
2+
3+
pub struct Shutdown {
4+
tx: Sender<()>,
5+
rx: Receiver<()>,
6+
}
7+
8+
impl Shutdown {
9+
pub fn watch_signal() -> Self {
10+
let (tx, rx) = broadcast::channel(2);
11+
12+
let tx2 = tx.clone();
13+
tokio::spawn(async move {
14+
match tokio::signal::ctrl_c().await {
15+
Ok(()) => tracing::info!("Received ctrl-c signal"),
16+
Err(err) => tracing::error!("Failed to handle signal {err}"),
17+
}
18+
tx2.send(()).ok();
19+
});
20+
21+
Self { tx, rx }
22+
}
23+
24+
pub async fn notify(mut self) {
25+
self.rx.recv().await.ok();
26+
}
27+
}
28+
29+
impl Clone for Shutdown {
30+
fn clone(&self) -> Self {
31+
let rx = self.tx.subscribe();
32+
let tx = self.tx.clone();
33+
Self { tx, rx }
34+
}
35+
}

crates/synd_authn/src/device_flow/github.rs

-2
Original file line numberDiff line numberDiff line change
@@ -119,8 +119,6 @@ impl DeviceFlow {
119119
.send()
120120
.await?;
121121

122-
debug!("{:?}", response.status());
123-
124122
match response.status() {
125123
StatusCode::OK => {
126124
let full = response.bytes().await?;

crates/synd_term/tests/integration.rs

+8-4
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ mod test {
1010
style::{Modifier, Style},
1111
};
1212
use serial_test::file_serial;
13+
1314
use synd_authn::device_flow::github::DeviceFlow;
15+
1416
use synd_term::{
1517
application::{Application, Config},
1618
client::Client,
@@ -33,14 +35,16 @@ mod test {
3335

3436
tracing::info!("TEST hello_world run");
3537

36-
let _kvsd_client = helper::run_kvsd().await?;
37-
3838
let mock_port = 6000;
39+
let api_port = 6001;
3940
let oauth_addr = ("127.0.0.1", mock_port);
4041
let oauth_listener = TcpListener::bind(oauth_addr).await?;
41-
tokio::spawn(synd_test::oauth::serve(oauth_listener));
42+
tokio::spawn(synd_test::mock::serve(oauth_listener));
43+
helper::serve_api(mock_port, api_port).await?;
4244

43-
let endpoint = "http://localhost:5961/graphql".parse().unwrap();
45+
let endpoint = format!("http://localhost:{api_port}/graphql")
46+
.parse()
47+
.unwrap();
4448
let terminal = helper::new_test_terminal();
4549
let client = Client::new(endpoint).unwrap();
4650
let config = Config {

crates/synd_term/tests/test/helper.rs

+45-28
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,15 @@
1-
use std::future::pending;
1+
use std::{future::pending, sync::Arc, time::Duration};
22

3-
use kvsd::client::Api;
3+
use futures_util::TryFutureExt;
44
use ratatui::backend::TestBackend;
5-
use synd_api::{client::github::GithubClient, serve::auth::Authenticator};
5+
use synd_api::{
6+
client::github::GithubClient,
7+
dependency::Dependency,
8+
repository::kvsd::KvsdClient,
9+
serve::auth::Authenticator,
10+
usecase::{authorize::Authorizer, MakeUsecase, Runtime},
11+
};
12+
use synd_feed::feed::{cache::CacheLayer, parser::FeedService};
613
use synd_term::terminal::Terminal;
714
use tokio::net::{TcpListener, TcpStream};
815

@@ -12,25 +19,33 @@ pub fn new_test_terminal() -> Terminal {
1219
Terminal::with(terminal)
1320
}
1421

15-
// Dependency
16-
// * serve
17-
// * tcp listener
18-
// * dependency
19-
// * authenticator
20-
// * github client
21-
// * github endpoint
22-
// * usecase runtime
23-
// * make usecase
24-
// * datastore
25-
// * fetch cached feed
26-
// * authorizer
27-
#[allow(unused)]
28-
pub fn serve_api(mock_port: u16) -> anyhow::Result<()> {
22+
pub async fn serve_api(mock_port: u16, api_port: u16) -> anyhow::Result<()> {
2923
let github_endpoint: &'static str =
3024
format!("http://localhost:{mock_port}/github/graphql").leak();
3125
let github_client = GithubClient::new()?.with_endpoint(github_endpoint);
3226
let authenticator = Authenticator::new()?.with_client(github_client);
3327

28+
let kvsd_client = run_kvsd().await.map(KvsdClient::new)?;
29+
let feed_service = FeedService::new("synd_term_test", 1024 * 1024);
30+
let feed_service = CacheLayer::new(feed_service);
31+
let make_usecase = MakeUsecase {
32+
subscription_repo: Arc::new(kvsd_client),
33+
fetch_feed: Arc::new(feed_service),
34+
};
35+
let authorizer = Authorizer::new();
36+
let runtime = Runtime::new(make_usecase, authorizer);
37+
let dep = Dependency {
38+
authenticator,
39+
runtime,
40+
};
41+
let listener = TcpListener::bind(("localhost", api_port)).await?;
42+
43+
tokio::spawn(synd_api::serve::serve(
44+
listener,
45+
dep,
46+
std::future::pending::<()>(),
47+
));
48+
3449
Ok(())
3550
}
3651

@@ -57,17 +72,19 @@ pub async fn run_kvsd() -> anyhow::Result<kvsd::client::tcp::Client<TcpStream>>
5772

5873
let _server_handler = tokio::spawn(initializer.run_kvsd(pending::<()>()));
5974

60-
// TODO: retry
61-
let mut client = kvsd::client::tcp::UnauthenticatedClient::insecure_from_addr(addr.0, addr.1)
62-
.await
63-
.unwrap()
64-
.authenticate("test", "test")
65-
.await
66-
.unwrap();
67-
68-
// Ping
69-
let ping_duration = client.ping().await.unwrap();
70-
assert!(ping_duration.num_nanoseconds().unwrap() > 0);
75+
let handshake = async {
76+
loop {
77+
match kvsd::client::tcp::UnauthenticatedClient::insecure_from_addr(addr.0, addr.1)
78+
.and_then(|client| client.authenticate("test", "test"))
79+
.await
80+
{
81+
Ok(client) => break client,
82+
Err(_) => tokio::time::sleep(Duration::from_millis(500)).await,
83+
}
84+
}
85+
};
86+
87+
let client = tokio::time::timeout(Duration::from_secs(5), handshake).await?;
7188

7289
Ok(client)
7390
}

crates/synd_test/Cargo.toml

+6-4
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,12 @@ version = "0.1.0"
1414
[dependencies]
1515
synd_authn = { path = "../synd_authn" }
1616

17-
anyhow = { workspace = true }
18-
axum = { workspace = true }
19-
tokio = { workspace = true, features = ["rt-multi-thread", "net", "time"] }
20-
tracing = { workspace = true }
17+
anyhow = { workspace = true }
18+
axum = { workspace = true }
19+
headers = { workspace = true }
20+
serde_json = { workspace = true }
21+
tokio = { workspace = true, features = ["rt-multi-thread", "net", "time"] }
22+
tracing = { workspace = true }
2123

2224
[lints]
2325
workspace = true

crates/synd_test/src/lib.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
pub mod oauth;
1+
pub mod mock;

0 commit comments

Comments
 (0)