Skip to content

Commit

Permalink
Merge branch 'main' of github.com:omnius-labs/core-rs
Browse files Browse the repository at this point in the history
  • Loading branch information
lyrise committed Jan 19, 2025
2 parents af0bbd0 + 0d67136 commit 521e3cb
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 175 deletions.
51 changes: 7 additions & 44 deletions modules/cloud/src/aws/s3/s3_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,8 @@ use tokio::io::AsyncWriteExt;

#[async_trait]
pub trait S3Client {
async fn gen_get_presigned_uri(
&self,
key: &str,
start_time: DateTime<Utc>,
expires_in: Duration,
file_name: &str,
) -> anyhow::Result<String>;
async fn gen_put_presigned_uri(
&self,
key: &str,
start_time: DateTime<Utc>,
expires_in: Duration,
) -> anyhow::Result<String>;
async fn gen_get_presigned_uri(&self, key: &str, start_time: DateTime<Utc>, expires_in: Duration, file_name: &str) -> anyhow::Result<String>;
async fn gen_put_presigned_uri(&self, key: &str, start_time: DateTime<Utc>, expires_in: Duration) -> anyhow::Result<String>;
async fn get_object(&self, key: &str, destination: &Path) -> anyhow::Result<()>;
async fn put_object(&self, key: &str, source: &Path) -> anyhow::Result<()>;
}
Expand All @@ -32,13 +21,7 @@ pub struct S3ClientImpl {

#[async_trait]
impl S3Client for S3ClientImpl {
async fn gen_get_presigned_uri(
&self,
key: &str,
start_time: DateTime<Utc>,
expires_in: Duration,
file_name: &str,
) -> anyhow::Result<String> {
async fn gen_get_presigned_uri(&self, key: &str, start_time: DateTime<Utc>, expires_in: Duration, file_name: &str) -> anyhow::Result<String> {
let presigning_config = PresigningConfig::builder()
.start_time(start_time.into())
.expires_in(expires_in.to_std()?)
Expand All @@ -60,12 +43,7 @@ impl S3Client for S3ClientImpl {
Ok(request.uri().to_string())
}

async fn gen_put_presigned_uri(
&self,
key: &str,
start_time: DateTime<Utc>,
expires_in: Duration,
) -> anyhow::Result<String> {
async fn gen_put_presigned_uri(&self, key: &str, start_time: DateTime<Utc>, expires_in: Duration) -> anyhow::Result<String> {
let presigning_config = PresigningConfig::builder()
.start_time(start_time.into())
.expires_in(expires_in.to_std()?)
Expand All @@ -84,13 +62,7 @@ impl S3Client for S3ClientImpl {
async fn get_object(&self, key: &str, destination: &Path) -> anyhow::Result<()> {
let mut file = File::create(destination).await?;

let mut object = self
.client
.get_object()
.bucket(self.bucket.as_str())
.key(key)
.send()
.await?;
let mut object = self.client.get_object().bucket(self.bucket.as_str()).key(key).send().await?;

while let Some(bytes) = object.body.try_next().await? {
file.write_all(&bytes).await?;
Expand All @@ -101,13 +73,7 @@ impl S3Client for S3ClientImpl {

async fn put_object(&self, key: &str, source: &Path) -> anyhow::Result<()> {
let body = ByteStream::from_path(source).await?;
self.client
.put_object()
.bucket(self.bucket.as_str())
.key(key)
.body(body)
.send()
.await?;
self.client.put_object().bucket(self.bucket.as_str()).key(key).body(body).send().await?;

Ok(())
}
Expand All @@ -131,10 +97,7 @@ mod tests {
client: aws_sdk_s3::Client::new(&sdk_config),
bucket: "opxs.v1.dev.file-convert".to_string(),
};
let uri = s3
.gen_put_presigned_uri("in/test.txt", Utc::now(), Duration::minutes(5))
.await
.unwrap();
let uri = s3.gen_put_presigned_uri("in/test.txt", Utc::now(), Duration::minutes(5)).await.unwrap();
println!("{:?}", uri);
let client = reqwest::Client::new();
let res = client.put(&uri).body("test").send().await.unwrap();
Expand Down
57 changes: 17 additions & 40 deletions modules/cloud/src/aws/s3/s3_client_mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,49 +49,26 @@ pub struct PutObject {

#[async_trait]
impl S3Client for S3ClientMock {
async fn gen_get_presigned_uri(
&self,
key: &str,
start_time: DateTime<Utc>,
expires_in: Duration,
file_name: &str,
) -> anyhow::Result<String> {
self.gen_get_presigned_uri_inputs
.lock()
.push(GenGetPresignedUriInput {
key: key.to_string(),
start_time,
expires_in,
file_name: file_name.to_string(),
});

let output = self
.gen_get_presigned_uri_outputs
.lock()
.pop_front()
.unwrap_or_default();
async fn gen_get_presigned_uri(&self, key: &str, start_time: DateTime<Utc>, expires_in: Duration, file_name: &str) -> anyhow::Result<String> {
self.gen_get_presigned_uri_inputs.lock().push(GenGetPresignedUriInput {
key: key.to_string(),
start_time,
expires_in,
file_name: file_name.to_string(),
});

let output = self.gen_get_presigned_uri_outputs.lock().pop_front().unwrap_or_default();
Ok(output)
}

async fn gen_put_presigned_uri(
&self,
key: &str,
start_time: DateTime<Utc>,
expires_in: Duration,
) -> anyhow::Result<String> {
self.gen_put_presigned_uri_inputs
.lock()
.push(GenPutPresignedUriInput {
key: key.to_string(),
start_time,
expires_in,
});

let output = self
.gen_put_presigned_uri_outputs
.lock()
.pop_front()
.unwrap_or_default();
async fn gen_put_presigned_uri(&self, key: &str, start_time: DateTime<Utc>, expires_in: Duration) -> anyhow::Result<String> {
self.gen_put_presigned_uri_inputs.lock().push(GenPutPresignedUriInput {
key: key.to_string(),
start_time,
expires_in,
});

let output = self.gen_put_presigned_uri_outputs.lock().pop_front().unwrap_or_default();
Ok(output)
}

Expand Down
7 changes: 1 addition & 6 deletions modules/cloud/src/aws/secrets/secrets_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,7 @@ pub struct SecretsReaderImpl {
#[async_trait]
impl SecretsReader for SecretsReaderImpl {
async fn read_value(&self, secret_id: &str) -> anyhow::Result<String> {
let output = self
.client
.get_secret_value()
.secret_id(secret_id)
.send()
.await?;
let output = self.client.get_secret_value().secret_id(secret_id).send().await?;

let res = output.secret_string().ok_or_else(|| anyhow!("not found"))?;
Ok(res.to_string())
Expand Down
33 changes: 5 additions & 28 deletions modules/cloud/src/aws/ses/ses_sender.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,7 @@ use aws_sdk_sesv2::types::{Body, Content, Destination, EmailContent, Message};

#[async_trait]
pub trait SesSender {
async fn send_mail_simple_text(
&self,
to_address: &str,
from_address: &str,
subject: &str,
body: &str,
) -> anyhow::Result<String>;
async fn send_mail_simple_text(&self, to_address: &str, from_address: &str, subject: &str, body: &str) -> anyhow::Result<String>;
}

pub struct SesSenderImpl {
Expand All @@ -19,13 +13,7 @@ pub struct SesSenderImpl {

#[async_trait]
impl SesSender for SesSenderImpl {
async fn send_mail_simple_text(
&self,
to_address: &str,
from_address: &str,
subject: &str,
text_body: &str,
) -> anyhow::Result<String> {
async fn send_mail_simple_text(&self, to_address: &str, from_address: &str, subject: &str, text_body: &str) -> anyhow::Result<String> {
let res = self
.client
.send_email()
Expand All @@ -36,11 +24,7 @@ impl SesSender for SesSenderImpl {
.simple(
Message::builder()
.subject(Content::builder().data(subject).build()?)
.body(
Body::builder()
.text(Content::builder().data(text_body).build()?)
.build(),
)
.body(Body::builder().text(Content::builder().data(text_body).build()?).build())
.build(),
)
.build(),
Expand All @@ -49,9 +33,7 @@ impl SesSender for SesSenderImpl {
.send()
.await?;

Ok(res
.message_id
.ok_or_else(|| anyhow::anyhow!("message_id is None"))?)
Ok(res.message_id.ok_or_else(|| anyhow::anyhow!("message_id is None"))?)
}
}

Expand All @@ -70,12 +52,7 @@ mod tests {
configuration_set_name: None,
};
let r = sender
.send_mail_simple_text(
"lyrise1984@gmail.com",
"no-reply@opxs-dev.omnius-labs.com",
"test subject",
"test body",
)
.send_mail_simple_text("lyrise1984@gmail.com", "no-reply@opxs-dev.omnius-labs.com", "test subject", "test body")
.await;
if let Err(e) = r {
println!("{:?}", e);
Expand Down
30 changes: 9 additions & 21 deletions modules/cloud/src/aws/ses/ses_sender_mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,15 @@ pub struct SendMailSimpleTextInput {

#[async_trait]
impl SesSender for SesSenderMock {
async fn send_mail_simple_text(
&self,
to_address: &str,
from_address: &str,
subject: &str,
text_body: &str,
) -> anyhow::Result<String> {
self.send_mail_simple_text_inputs
.lock()
.push(SendMailSimpleTextInput {
to_address: to_address.to_string(),
from_address: from_address.to_string(),
subject: subject.to_string(),
text_body: text_body.to_string(),
});

let output = self
.send_mail_simple_text_outputs
.lock()
.pop_front()
.unwrap_or_default();
async fn send_mail_simple_text(&self, to_address: &str, from_address: &str, subject: &str, text_body: &str) -> anyhow::Result<String> {
self.send_mail_simple_text_inputs.lock().push(SendMailSimpleTextInput {
to_address: to_address.to_string(),
from_address: from_address.to_string(),
subject: subject.to_string(),
text_body: text_body.to_string(),
});

let output = self.send_mail_simple_text_outputs.lock().pop_front().unwrap_or_default();
Ok(output)
}
}
Expand Down
4 changes: 1 addition & 3 deletions modules/cloud/src/aws/sqs/sqs_receiver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ impl SqsReceiver for SqsReceiverImpl {
.send()
.await?;

let res: Option<Vec<String>> = output
.messages
.map(|n| n.into_iter().flat_map(|m| m.body).collect::<Vec<_>>());
let res: Option<Vec<String>> = output.messages.map(|n| n.into_iter().flat_map(|m| m.body).collect::<Vec<_>>());
Ok(res)
}
}
Expand Down
13 changes: 3 additions & 10 deletions modules/cloud/src/gcp/secret/secret_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,9 @@ pub struct SecretReaderImpl {}
impl SecretReader for SecretReaderImpl {
async fn read_value(&self, secret_id: &str) -> anyhow::Result<String> {
let client: GoogleApi<SecretManagerServiceClient<GoogleAuthMiddleware>> =
GoogleApi::from_function(
SecretManagerServiceClient::new,
"https://secretmanager.googleapis.com",
None,
)
.await?;

let request = AccessSecretVersionRequest {
name: secret_id.to_string(),
};
GoogleApi::from_function(SecretManagerServiceClient::new, "https://secretmanager.googleapis.com", None).await?;

let request = AccessSecretVersionRequest { name: secret_id.to_string() };

let response = client.get().access_secret_version(request).await?;

Expand Down
30 changes: 7 additions & 23 deletions modules/migration/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,7 @@ pub struct PostgresMigrator {
}

impl PostgresMigrator {
pub async fn new(
url: &str,
path: &str,
username: &str,
description: &str,
) -> anyhow::Result<PostgresMigrator> {
pub async fn new(url: &str, path: &str, username: &str, description: &str) -> anyhow::Result<PostgresMigrator> {
// Get DB client and connection
let (client, connection) = tokio_postgres::connect(url, tokio_postgres::NoTls).await?;

Expand All @@ -42,10 +37,7 @@ impl PostgresMigrator {
let ignore_set: HashSet<String> = histories.iter().map(|n| n.file_name.clone()).collect();

let files: Vec<MigrationFile> = self.load_migration_files().await?;
let files: Vec<MigrationFile> = files
.into_iter()
.filter(|x| !ignore_set.contains(x.file_name.as_str()))
.collect();
let files: Vec<MigrationFile> = files.into_iter().filter(|x| !ignore_set.contains(x.file_name.as_str())).collect();

if files.is_empty() {
return Ok(());
Expand Down Expand Up @@ -92,10 +84,7 @@ CREATE TABLE IF NOT EXISTS _semaphores (

let name: String = path.file_name().unwrap().to_str().unwrap().to_string();
let queries: String = std::fs::read_to_string(path)?;
let result = MigrationFile {
file_name: name,
queries,
};
let result = MigrationFile { file_name: name, queries };

results.push(result);
}
Expand Down Expand Up @@ -127,9 +116,8 @@ SELECT file_name, executed_at FROM _migrations
async fn execute_migration_queries(&self, files: Vec<MigrationFile>) -> anyhow::Result<()> {
for f in files {
self.client.batch_execute(&f.queries).await?;
self.insert_migration_history(&f.file_name, &f.queries)
.await?;
info!({ f.file_name }, "processed migration file")
self.insert_migration_history(&f.file_name, &f.queries).await?;
info!(file_name = f.file_name, "processed migration file")
}

Ok(())
Expand All @@ -139,9 +127,7 @@ SELECT file_name, executed_at FROM _migrations
let statement = "\
INSERT INTO _migrations (file_name, queries) VALUES ($1, $2)
";
self.client
.execute(statement, &[&file_name, &queries])
.await?;
self.client.execute(statement, &[&file_name, &queries]).await?;

Ok(())
}
Expand All @@ -150,9 +136,7 @@ INSERT INTO _migrations (file_name, queries) VALUES ($1, $2)
let query = "\
INSERT INTO _semaphores (username, description) VALUES ($1, $2)
";
self.client
.execute(query, &[&self.username, &self.description])
.await?;
self.client.execute(query, &[&self.username, &self.description]).await?;

Ok(())
}
Expand Down
Loading

0 comments on commit 521e3cb

Please sign in to comment.