Skip to content

Commit a84d066

Browse files
committed
Feat: Implemented a functional email client.
1 parent 921757c commit a84d066

File tree

8 files changed

+281
-192
lines changed

8 files changed

+281
-192
lines changed

src/configuration.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,16 @@ pub struct EmailClientSettings {
3434

3535
impl EmailClientSettings {
3636
pub fn client(self) -> EmailClient {
37-
let sender = self.sender().expect("Invalid sender email address");
37+
let sender_email = self.sender().expect("Invalid sender email address");
3838
let timeout = std::time::Duration::from_millis(self.timeout_ms);
39-
EmailClient::new(self.base_url, sender, self.authorization_token, timeout)
39+
EmailClient::new(
40+
self.base_url,
41+
sender_email,
42+
self.authorization_token,
43+
timeout,
44+
)
4045
}
46+
4147
pub fn sender(&self) -> Result<SubscriberEmail, String> {
4248
SubscriberEmail::parse(self.sender_email.clone())
4349
}

src/issue_delivery_worker.rs

Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
use crate::domain::SubscriberEmail;
2+
use crate::email_client::EmailClient;
3+
use crate::{configuration::Settings, startup::get_connection_pool};
4+
use sqlx::{PgPool, Postgres, Transaction};
5+
use std::time::Duration;
6+
use tracing::{field::display, Span};
7+
use uuid::Uuid;
8+
9+
pub enum ExecutionOutcome {
10+
TaskCompleted,
11+
EmptyQueue,
12+
}
13+
14+
#[tracing::instrument(
15+
skip_all,
16+
fields(
17+
newsletter_issue_id = tracing::field::Empty,
18+
subscriber_email = tracing::field::Empty,
19+
),
20+
err
21+
)]
22+
pub async fn try_execute_task(
23+
pool: &PgPool,
24+
email_client: &EmailClient,
25+
) -> Result<ExecutionOutcome, anyhow::Error> {
26+
let task = dequeue_task(pool).await?;
27+
if task.is_none() {
28+
return Ok(ExecutionOutcome::EmptyQueue);
29+
}
30+
31+
let (transaction, issue_id, email) = task.unwrap();
32+
Span::current()
33+
.record("newsletter_issue_id", &display(issue_id))
34+
.record("subscriber_email", &display(&email));
35+
36+
match SubscriberEmail::parse(email.clone()) {
37+
Ok(email) => {
38+
let issue = get_issue(pool, issue_id).await?;
39+
if let Err(e) = email_client
40+
.send_email(
41+
&email,
42+
&issue.title,
43+
&issue.html_content,
44+
&issue.text_content,
45+
)
46+
.await
47+
{
48+
tracing::error!(
49+
error.cause_chain = ?e,
50+
error.message = %e,
51+
"Failed to deliver issue to a confirmed subscriber. \
52+
Skipping.",
53+
);
54+
}
55+
}
56+
Err(e) => {
57+
tracing::error!(
58+
error.cause_chain = ?e,
59+
error.message = %e,
60+
"Skipping a confirmed subscriber. \
61+
Their stored contact details are invalid",
62+
);
63+
}
64+
}
65+
66+
delete_task(transaction, issue_id, &email).await?;
67+
Ok(ExecutionOutcome::TaskCompleted)
68+
}
69+
70+
type PgTransaction = Transaction<'static, Postgres>;
71+
72+
#[tracing::instrument(skip_all)]
73+
async fn dequeue_task(
74+
pool: &PgPool,
75+
) -> Result<Option<(PgTransaction, Uuid, String)>, anyhow::Error> {
76+
let mut transaction = pool.begin().await?;
77+
let r = sqlx::query!(
78+
r#"
79+
SELECT newsletter_issue_id, subscriber_email
80+
FROM issue_delivery_queue
81+
FOR UPDATE
82+
SKIP LOCKED
83+
LIMIT 1
84+
"#,
85+
)
86+
.fetch_optional(&mut transaction)
87+
.await?;
88+
if let Some(r) = r {
89+
Ok(Some((
90+
transaction,
91+
r.newsletter_issue_id,
92+
r.subscriber_email,
93+
)))
94+
} else {
95+
Ok(None)
96+
}
97+
}
98+
99+
#[tracing::instrument(skip_all)]
100+
async fn delete_task(
101+
mut transaction: PgTransaction,
102+
issue_id: Uuid,
103+
email: &str,
104+
) -> Result<(), anyhow::Error> {
105+
sqlx::query!(
106+
r#"
107+
DELETE FROM issue_delivery_queue
108+
WHERE
109+
newsletter_issue_id = $1 AND
110+
subscriber_email = $2
111+
"#,
112+
issue_id,
113+
email
114+
)
115+
.execute(&mut transaction)
116+
.await?;
117+
transaction.commit().await?;
118+
Ok(())
119+
}
120+
121+
struct NewsletterIssue {
122+
title: String,
123+
text_content: String,
124+
html_content: String,
125+
}
126+
127+
#[tracing::instrument(skip_all)]
128+
async fn get_issue(pool: &PgPool, issue_id: Uuid) -> Result<NewsletterIssue, anyhow::Error> {
129+
let issue = sqlx::query_as!(
130+
NewsletterIssue,
131+
r#"
132+
SELECT title, text_content, html_content
133+
FROM newsletter_issues
134+
WHERE
135+
newsletter_issue_id = $1
136+
"#,
137+
issue_id
138+
)
139+
.fetch_one(pool)
140+
.await?;
141+
Ok(issue)
142+
}
143+
144+
async fn worker_loop(pool: PgPool, email_client: EmailClient) -> Result<(), anyhow::Error> {
145+
loop {
146+
match try_execute_task(&pool, &email_client).await {
147+
Ok(ExecutionOutcome::EmptyQueue) => {
148+
tokio::time::sleep(Duration::from_secs(10)).await;
149+
}
150+
Err(_) => {
151+
tokio::time::sleep(Duration::from_secs(1)).await;
152+
}
153+
Ok(ExecutionOutcome::TaskCompleted) => {}
154+
}
155+
}
156+
}
157+
158+
pub async fn run_worker_until_stopped(configuration: Settings) -> Result<(), anyhow::Error> {
159+
let connection_pool = get_connection_pool(&configuration.database);
160+
let email_client = configuration.email_client.client();
161+
worker_loop(connection_pool, email_client).await
162+
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ pub mod configuration;
33
pub mod domain;
44
pub mod email_client;
55
pub mod idempotency;
6+
pub mod issue_delivery_worker;
67
pub mod routes;
78
pub mod session_state;
89
pub mod startup;

src/main.rs

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,47 @@
11
use robust_rust::{
2-
configuration::get_configuration, startup::Application, telemetry::get_subscriber,
3-
telemetry::init_subscriber,
2+
configuration::get_configuration, issue_delivery_worker::run_worker_until_stopped,
3+
startup::Application, telemetry::get_subscriber, telemetry::init_subscriber,
44
};
5+
use std::fmt::{Debug, Display};
6+
use tokio::task::JoinError;
57

68
#[tokio::main]
79
async fn main() -> anyhow::Result<()> {
810
let subscriber = get_subscriber("robust_rust".into(), "info".into(), std::io::stdout);
911
init_subscriber(subscriber);
1012
let configuration = get_configuration().expect("Failed to read configuration.");
11-
let server = Application::build(configuration).await?;
12-
server.run_until_stopped().await?;
13+
let application = Application::build(configuration.clone()).await?;
14+
let application_task = tokio::spawn(application.run_until_stopped());
15+
let worker_task = tokio::spawn(run_worker_until_stopped(configuration));
16+
17+
tokio::select! {
18+
o = application_task => report_exit("API", o),
19+
o = worker_task => report_exit("Background worker", o),
20+
};
21+
1322
Ok(())
1423
}
24+
25+
fn report_exit(task_name: &str, outcome: Result<Result<(), impl Debug + Display>, JoinError>) {
26+
match outcome {
27+
Ok(Ok(())) => {
28+
tracing::info!("{} has exited", task_name)
29+
}
30+
Ok(Err(e)) => {
31+
tracing::error!(
32+
error.cause_chain = ?e,
33+
error.message = %e,
34+
"{} failed",
35+
task_name
36+
)
37+
}
38+
Err(e) => {
39+
tracing::error!(
40+
error.cause_chain = ?e,
41+
error.message = %e,
42+
"{}' task failed to complete",
43+
task_name
44+
)
45+
}
46+
}
47+
}

src/routes/admin/newsletters/post.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,8 +70,10 @@ pub async fn publish_newsletter(
7070
}
7171

7272
fn success_message() -> FlashMessage {
73-
FlashMessage::info("The newsletter issue has been accepted - \
74-
emails will go out shortly.",)
73+
FlashMessage::info(
74+
"The newsletter issue has been accepted - \
75+
emails will go out shortly.",
76+
)
7577
}
7678

7779
#[tracing::instrument(skip_all)]

src/startup.rs

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -28,18 +28,7 @@ impl Application {
2828
pub async fn build(configuration: Settings) -> Result<Self, anyhow::Error> {
2929
let connection_pool = get_connection_pool(&configuration.database);
3030

31-
let sender_email = configuration
32-
.email_client
33-
.sender()
34-
.expect("Invalid sender email address.");
35-
36-
let timeout = configuration.email_client.timeout();
37-
let email_client = EmailClient::new(
38-
configuration.email_client.base_url,
39-
sender_email,
40-
configuration.email_client.authorization_token,
41-
timeout,
42-
);
31+
let email_client = configuration.email_client.client();
4332

4433
let address = format!(
4534
"{}:{}",

tests/api/helpers.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use once_cell::sync::Lazy;
44
use robust_rust::{
55
configuration::{get_configuration, DatabaseSettings},
66
email_client::EmailClient,
7+
issue_delivery_worker::{try_execute_task, ExecutionOutcome},
78
startup::{get_connection_pool, Application},
89
telemetry::{get_subscriber, init_subscriber},
910
};
@@ -41,6 +42,17 @@ pub struct ConfirmationLinks {
4142
}
4243

4344
impl TestApp {
45+
pub async fn dispatch_all_pending_emails(&self) {
46+
loop {
47+
if let ExecutionOutcome::EmptyQueue =
48+
try_execute_task(&self.db_pool, &self.email_client)
49+
.await
50+
.unwrap()
51+
{
52+
break;
53+
}
54+
}
55+
}
4456
pub async fn get_login_html(&self) -> String {
4557
self.api_client
4658
.get(&format!("{}/login", &self.address))

0 commit comments

Comments
 (0)