Naive Newsletter Delivery

This article is a sample from Zero To Production In Rust, a hands-on introduction to backend development in Rust.
You can get a copy of the book at zero2prod.com.

TL;DR

Our project is not yet a viable newsletter service: it cannot send out a new episode!

We will use this chapter to bootstrap newsletter delivery using a naive implementation.
It will be an opportunity to deepen our understanding of techniques we touched upon in previous chapters while building the foundation for a production-ready solution that ticks all our boxes (i.e. security and fault tolerance).

Chapter 9

  1. User Stories Are Not Set In Stone
  2. Do Not Spam Unconfirmed Subscribers
  3. All Confirmed Subscribers Receive New Issues
  4. Implementation Strategy
  5. Body Schema
  6. Fetch Confirmed Subscribers List
  7. Send Newsletter Emails
  8. Validation Of Stored Data
  9. Limitations Of The Naive Approach
  10. Summary

User Stories Are Not Set In Stone

What are we trying to achieve, exactly?

We can go back to the user story we wrote down in Chapter 2:

As the blog author,
I want to send an email to all my subscribers,
So that I can notify them when new content is published.

It looks simple, at least on the surface. The devil, as always, is in the details.
For example, in Chapter 7 we refined our domain model of a subscriber - we now have confirmed and unconfirmed subscribers.
Which ones should receive our newsletter issues?

That user story, as it stands, cannot help us - it was written before we started to make the distinction!

Make a habit of revisiting user stories throughout the lifecycle of a project.
When you spend time working on a problem you end up deepening your understanding of its domain. You often acquire a more precise language that can be used to refine earlier attempts at describing the desired functionality.

For this specific case: we only want newsletter issues to be sent to confirmed subscribers. Let's amend the user story accordingly:

As the blog author,
I want to send an email to all my confirmed subscribers,
So that I can notify them when new content is published.

Do Not Spam Unconfirmed Subscribers

We can get started by writing an integration test that specifies what should not happen: unconfirmed subscribers should not receive newsletter issues.

In Chapter 7 we selected Postmark as our email delivery service. If we are not calling Postmark, we are not sending an email out.
We can build on this fact to orchestrate a scenario that allows us to verify our business rule: if all subscribers are unconfirmed, no request is fired to Postmark when we publish a newsletter issue.

Let's translate that into code:

//! tests/api/main.rs
// [...]
// New test module!
mod newsletter;

//! tests/api/newsletter.rs
use crate::helpers::{spawn_app, TestApp};
use wiremock::matchers::{any, method, path};
use wiremock::{Mock, ResponseTemplate};

#[tokio::test]
async fn newsletters_are_not_delivered_to_unconfirmed_subscribers() {
    // Arrange
    let app = spawn_app().await;
    create_unconfirmed_subscriber(&app).await;

    Mock::given(any())
        .respond_with(ResponseTemplate::new(200))
        // We assert that no request is fired at Postmark!
        .expect(0)
        .mount(&app.email_server)
        .await;

    // Act
    
    // A sketch of the newsletter payload structure.
    // We might change it later on.
    let newsletter_request_body = serde_json::json!({
         "title": "Newsletter title",
         "content": {
             "text": "Newsletter body as plain text",
             "html": "<p>Newsletter body as HTML</p>",
         }
    });
    let response = reqwest::Client::new()
         .post(&format!("{}/newsletters", &app.address))
         .json(&newsletter_request_body)
         .send()
         .await
         .expect("Failed to execute request.");

    // Assert
    assert_eq!(response.status().as_u16(), 200);
    // Mock verifies on Drop that we haven't sent the newsletter email 
}

/// Use the public API of the application under test to create
/// an unconfirmed subscriber.
async fn create_unconfirmed_subscriber(app: &TestApp) {
    let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";

    let _mock_guard = Mock::given(path("/email"))
        .and(method("POST"))
        .respond_with(ResponseTemplate::new(200))
        .named("Create unconfirmed subscriber")
        .expect(1)
        .mount_as_scoped(&app.email_server)
        .await;
    app.post_subscriptions(body.into())
        .await
        .error_for_status()
        .unwrap();
}

It fails, as expected:

thread 'newsletter::newsletters_are_not_delivered_to_unconfirmed_subscribers' 
panicked at 'assertion failed: `(left == right)`
  left: `404`,
 right: `200`'

There is no handler in our API for POST /newsletters: actix-web returns a 404 Not Found instead of the 200 OK the test is expecting.

Set Up State Using The Public API

Let's take a moment to look at the Arrange section for the test we just wrote.
Our test scenario makes some assumptions on the state of our application: we need to have one subscriber and they must be unconfirmed.

Each test spins up a brand-new application running on top of an empty database.

let app = spawn_app().await;

How do we fill it up according to the test requirements?

We stay true to the black-box approach we described in Chapter 3: when possible, we drive the application state by calling its public API.
That is what we are doing in create_unconfirmed_subscriber:

//! tests/api/newsletter.rs
// [...]

async fn create_unconfirmed_subscriber(app: &TestApp) {
    let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";

    let _mock_guard = Mock::given(path("/email"))
        .and(method("POST"))
        .respond_with(ResponseTemplate::new(200))
        .named("Create unconfirmed subscriber")
        .expect(1)
        .mount_as_scoped(&app.email_server)
        .await;
    app.post_subscriptions(body.into())
        .await
        .error_for_status()
        .unwrap();
}

We use the API client we built as part of TestApp to make a POST call to the /subscriptions endpoint.

Scoped Mocks

We know that POST /subscriptions will send a confirmation email out - we must make sure that our Postmark test server is ready to handle the incoming request by setting up the appropriate Mock.
The matching logic overlaps what we have in the test function body: how do we make sure the two mocks don't end up stepping on each other's toes?

We use a scoped mock:

let _mock_guard = Mock::given(path("/email"))
    .and(method("POST"))
    .respond_with(ResponseTemplate::new(200))
    .named("Create unconfirmed subscriber")
    .expect(1)
    // We are not using `mount`!
    .mount_as_scoped(&app.email_server)
    .await;

With mount, the behaviour we specify remains active as long as the underlying MockServer is up and running.
With mount_as_scoped, instead, we get back a guard object - a MockGuard.

MockGuard has a custom Drop implementation: when it goes out of scope, wiremock instructs the underlying MockServer to stop honouring the specified mock behaviour. In other words, we stop returning 200 to POST /email at the end of create_unconfirmed_subscriber.
The mock behaviour needed for our test helper stays local to the test helper itself.

One more thing happens when a MockGuard is dropped - we eagerly check that expectations on the scoped mock are verified.
This creates a useful feedback loop to keep our test helpers clean and up-to-date.

We have already witnessed how black-box testing pushes us to write an API client for our own application to keep our tests concise.
Over time, you build more and more helper functions to drive the application state - just like we just did with create_unconfirmed_subscriber. These helpers rely on mocks but, as the application evolves, some of those mocks end up no longer being necessary - a call gets removed, you stop using a certain provider, etc.
Eager evaluation of expectations for scoped mocks helps us to keep helper code in check and proactively clean up where possible.
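
The guard mechanics can be tried out in isolation. What follows is a minimal, std-only sketch of the same idea, not wiremock's implementation: FakeServer, ScopedMock and this mount_as_scoped function are hypothetical names made up for the illustration, and the eager verification of expectations is not modelled.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical stand-in for a mock server: it only tracks
/// the names of the mocks that are currently active.
#[derive(Default)]
struct FakeServer {
    active: RefCell<Vec<String>>,
}

/// Hypothetical guard: it deregisters its mock when dropped.
struct ScopedMock {
    server: Rc<FakeServer>,
    name: String,
}

impl Drop for ScopedMock {
    fn drop(&mut self) {
        // Runs automatically when the guard goes out of scope.
        self.server.active.borrow_mut().retain(|n| n != &self.name);
    }
}

fn mount_as_scoped(server: &Rc<FakeServer>, name: &str) -> ScopedMock {
    server.active.borrow_mut().push(name.to_owned());
    ScopedMock {
        server: Rc::clone(server),
        name: name.to_owned(),
    }
}

/// Test-helper lookalike: returns how many mocks were active
/// while the guard was alive.
fn helper(server: &Rc<FakeServer>) -> usize {
    let _guard = mount_as_scoped(server, "Create unconfirmed subscriber");
    let count = server.active.borrow().len();
    count
    // `_guard` is dropped here: the mock stops being active.
}

fn main() {
    let server = Rc::new(FakeServer::default());
    let during = helper(&server);
    let after = server.active.borrow().len();
    println!("active during helper: {during}, after helper: {after}");
}
```

Notice that the guard is bound to a named variable, _guard. Writing let _ = ... instead would drop the guard immediately, which is why the test helper uses _mock_guard rather than a bare underscore.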

Green Test

We can get the test to pass by providing a dummy implementation of POST /newsletters:

//! src/routes/mod.rs
// [...]
// New module!
mod newsletters;

pub use newsletters::*;

//! src/routes/newsletters.rs
use actix_web::HttpResponse;

// Dummy implementation
pub async fn publish_newsletter() -> HttpResponse {
    HttpResponse::Ok().finish()
}

//! src/startup.rs
// [...]
use crate::routes::{confirm, health_check, publish_newsletter, subscribe};

fn run(/* */) -> Result<Server, std::io::Error> {
    // [...]
    let server = HttpServer::new(move || {
        App::new()
            .wrap(TracingLogger::default())
            // Register the new handler!    
            .route("/newsletters", web::post().to(publish_newsletter))
            // [...]
    })
    // [...]
}

cargo test is happy again.

All Confirmed Subscribers Receive New Issues

Let's write another integration test, this time for a subset of the happy case: if we have one confirmed subscriber, they receive an email with the new issue of the newsletter.

Composing Test Helpers

As in the previous test, we need to get the application state where we want it to be before executing the test logic - it calls for another helper function, this time to create a confirmed subscriber.
By slightly reworking create_unconfirmed_subscriber we can avoid duplication:

//! tests/api/newsletter.rs
// [...]

async fn create_unconfirmed_subscriber(app: &TestApp) -> ConfirmationLinks {
    let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";

    let _mock_guard = Mock::given(path("/email"))
        .and(method("POST"))
        .respond_with(ResponseTemplate::new(200))
        .named("Create unconfirmed subscriber")
        .expect(1)
        .mount_as_scoped(&app.email_server)
        .await;
    app.post_subscriptions(body.into())
        .await
        .error_for_status()
        .unwrap();

    // We now inspect the requests received by the mock Postmark server
    // to retrieve the confirmation link and return it 
    let email_request = &app
        .email_server
        .received_requests()
        .await
        .unwrap()
        .pop()
        .unwrap();
    app.get_confirmation_links(&email_request)
}

async fn create_confirmed_subscriber(app: &TestApp) {
    // We can then reuse the same helper and just add 
    // an extra step to actually call the confirmation link!
    let confirmation_link = create_unconfirmed_subscriber(app).await;
    reqwest::get(confirmation_link.html)
        .await
        .unwrap()
        .error_for_status()
        .unwrap();
}

Nothing needs to change in our existing test and we can immediately leverage create_confirmed_subscriber in the new one:

//! tests/api/newsletter.rs
// [...]

#[tokio::test]
async fn newsletters_are_delivered_to_confirmed_subscribers() {
    // Arrange
    let app = spawn_app().await;
    create_confirmed_subscriber(&app).await;

    Mock::given(path("/email"))
        .and(method("POST"))
        .respond_with(ResponseTemplate::new(200))
        .expect(1)
        .mount(&app.email_server)
        .await;

    // Act
    let newsletter_request_body = serde_json::json!({
        "title": "Newsletter title",
        "content": {
             "text": "Newsletter body as plain text",
             "html": "<p>Newsletter body as HTML</p>",
        }
    });
    let response = reqwest::Client::new()
        .post(&format!("{}/newsletters", &app.address))
        .json(&newsletter_request_body)
        .send()
        .await
        .expect("Failed to execute request.");

    // Assert
    assert_eq!(response.status().as_u16(), 200);
    // Mock verifies on Drop that we have sent the newsletter email
}

It fails, as it should:

thread 'newsletter::newsletters_are_delivered_to_confirmed_subscribers' panicked at 
Verifications failed:
- Mock #1.
        Expected range of matching incoming requests: == 1
        Number of matched incoming requests: 0

Implementation Strategy

We have more than enough tests to give us feedback now - let's kick off the implementation work!

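We will start with a naive approach:

  - Retrieve the newsletter issue details from the body of the incoming API call;
  - Fetch the list of all confirmed subscribers from the database;
  - Iterate through the whole list, sending an email out to each subscriber via Postmark.

Let's do it!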

Body Schema

What do we need to know about a newsletter in order to deliver it?

If we are striving to keep it as simple as possible:

  - the title, to be used as email subject;
  - the content, in HTML and plain text.

We can encode our requirements using structs that derive serde::Deserialize, just like we did in POST /subscriptions with FormData.

//! src/routes/newsletters.rs
// [...]

#[derive(serde::Deserialize)]
pub struct BodyData {
    title: String,
    content: Content
}

#[derive(serde::Deserialize)]
pub struct Content {
    html: String,
    text: String
}

serde does not have any issue with our nested layout given that all field types in BodyData implement serde::Deserialize. We can then use an actix-web extractor to parse BodyData out of the incoming request body. There is just one question to answer: what serialization format are we using?

For POST /subscriptions, given that we were dealing with HTML forms, we used application/x-www-form-urlencoded as Content-Type.
For POST /newsletters we are not tied to a form embedded in a web page: we will use JSON, a common choice when building REST APIs.
The corresponding extractor is actix_web::web::Json:

//! src/routes/newsletters.rs
// [...]
use actix_web::web;

// We are prefixing `body` with a `_` to avoid 
// a compiler warning about unused arguments
pub async fn publish_newsletter(_body: web::Json<BodyData>) -> HttpResponse {
    HttpResponse::Ok().finish()
}

Test Invalid Inputs

Trust but verify: let's add a new test case that throws invalid data at our POST /newsletters endpoint.

//! tests/api/newsletter.rs
// [...]

#[tokio::test]
async fn newsletters_returns_400_for_invalid_data() {
    // Arrange
    let app = spawn_app().await;
    let test_cases = vec![
        (
            serde_json::json!({
                "content": {
                    "text": "Newsletter body as plain text",
                    "html": "<p>Newsletter body as HTML</p>",
                }
            }),
            "missing title",
        ),
        (
            serde_json::json!({"title": "Newsletter!"}),
            "missing content",
        ),
    ];

    for (invalid_body, error_message) in test_cases {
        let response = reqwest::Client::new()
            .post(&format!("{}/newsletters", &app.address))
            .json(&invalid_body)
            .send()
            .await
            .expect("Failed to execute request.");

        // Assert
        assert_eq!(
            400,
            response.status().as_u16(),
            "The API did not fail with 400 Bad Request when the payload was {}.",
            error_message
        );
    }
}

The new test passes - you can add a few more cases if you want to.
Let's seize the occasion to refactor a bit and remove some code duplication - we can extract the logic to fire a request to POST /newsletters into a shared helper method on TestApp, as we did for POST /subscriptions:

//! tests/api/helpers.rs
// [...]

impl TestApp {
    // [...]
    pub async fn post_newsletters(&self, body: serde_json::Value) -> reqwest::Response {
        reqwest::Client::new()
            .post(&format!("{}/newsletters", &self.address))
            .json(&body)
            .send()
            .await
            .expect("Failed to execute request.")
    }
}

//! tests/api/newsletter.rs
// [...]

#[tokio::test]
async fn newsletters_are_not_delivered_to_unconfirmed_subscribers() {
    // [...]
    let response = app.post_newsletters(newsletter_request_body).await;
    // [...]
}

#[tokio::test]
async fn newsletters_are_delivered_to_confirmed_subscribers() {
    // [...]
    let response = app.post_newsletters(newsletter_request_body).await;
    // [...]
}

#[tokio::test]
async fn newsletters_returns_400_for_invalid_data() {
    // [...] 
    for (invalid_body, error_message) in test_cases {
        let response = app.post_newsletters(invalid_body).await;
        // [...]
    }
}

Fetch Confirmed Subscribers List

We need to write a new query to retrieve the list of all confirmed subscribers.
A WHERE clause on the status column is enough to isolate the rows we care about:

//! src/routes/newsletters.rs
// [...]
use sqlx::PgPool;

struct ConfirmedSubscriber {
    email: String,
}

#[tracing::instrument(name = "Get confirmed subscribers", skip(pool))]
async fn get_confirmed_subscribers(
    pool: &PgPool,
) -> Result<Vec<ConfirmedSubscriber>, anyhow::Error> {
    let rows = sqlx::query_as!(
        ConfirmedSubscriber,
        r#"
        SELECT email
        FROM subscriptions
        WHERE status = 'confirmed'
        "#,
    )
    .fetch_all(pool)
    .await?;
    Ok(rows)
}

There is something new in there: we are using sqlx::query_as! instead of sqlx::query!.
sqlx::query_as! maps the retrieved rows to the type specified as its first argument, ConfirmedSubscriber, saving us a bunch of boilerplate.

Notice that ConfirmedSubscriber has a single field - email. We are minimising the amount of data we are fetching from the database, limiting our query to the columns we actually need to send a newsletter out. Less work for the database, less data to move over the network.
It won't make a noticeable difference in this case, but it is a good practice to keep in mind when working on bigger applications with a heavier data footprint.

To leverage get_confirmed_subscribers in our handler we need a PgPool - we can extract one from the application state, just like we did in POST /subscriptions.

//! src/routes/newsletters.rs
// [...]

pub async fn publish_newsletter(
    _body: web::Json<BodyData>,
    pool: web::Data<PgPool>,
) -> HttpResponse {
    let _subscribers = get_confirmed_subscribers(&pool).await?;
    HttpResponse::Ok().finish()
}

The compiler is not happy:

21 |   ) -> HttpResponse {
   |  ___________________-
22 | |     let subscribers = get_confirmed_subscribers(&pool).await?;
   | |                       ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 
   | |                       cannot use the `?` operator in an async function 
   | |                       that returns `actix_web::HttpResponse`
   | |
23 | |     HttpResponse::Ok().finish()
24 | | }
   | |__ this function should return `Result` or `Option` to accept `?`

SQL queries may fail and so does get_confirmed_subscribers - we need to change the return type of publish_newsletter.
We need to return a Result with an appropriate error type, just like we did in the last chapter:

//! src/routes/newsletters.rs
// [...]
use actix_web::ResponseError;
use sqlx::PgPool;
use crate::routes::error_chain_fmt;
use actix_web::http::StatusCode;

#[derive(thiserror::Error)]
pub enum PublishError {
    #[error(transparent)]
    UnexpectedError(#[from] anyhow::Error),
}

// Same logic to get the full error chain on `Debug`
impl std::fmt::Debug for PublishError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        error_chain_fmt(self, f)
    }
}

impl ResponseError for PublishError {
    fn status_code(&self) -> StatusCode {
        match self {
            PublishError::UnexpectedError(_) => StatusCode::INTERNAL_SERVER_ERROR,
        }
    }
}

pub async fn publish_newsletter(
    body: web::Json<BodyData>,
    pool: web::Data<PgPool>,
) -> Result<HttpResponse, PublishError> {
    let subscribers = get_confirmed_subscribers(&pool).await?;
    Ok(HttpResponse::Ok().finish())
}

Using what we learned in Chapter 8 it doesn't take that much to roll out a new error type!
Let me remark that we are future-proofing our code a bit: we modelled PublishError as an enumeration, but we only have one variant at the moment. A struct (or actix_web::error::InternalError) would have been more than enough for the time being.
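
If you are curious about what the derive is saving us from, here is a rough, hand-written sketch of a single-variant error enum that forwards Display and source to the wrapped error and converts via From. It is an approximation for illustration only, not thiserror's actual expansion: a boxed std error stands in for anyhow::Error, and parse_count is a hypothetical function that exists just to exercise the ? operator.

```rust
use std::error::Error;
use std::fmt;
use std::num::ParseIntError;

#[derive(Debug)]
enum PublishError {
    // Stand-in for `anyhow::Error`: any boxed error will do for this sketch.
    Unexpected(Box<dyn Error + Send + Sync>),
}

// "Transparent" behaviour: display whatever the wrapped error displays.
impl fmt::Display for PublishError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            PublishError::Unexpected(e) => write!(f, "{e}"),
        }
    }
}

// ...and forward `source` to the wrapped error as well.
impl Error for PublishError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            PublishError::Unexpected(e) => e.source(),
        }
    }
}

// The conversion that lets `?` turn a lower-level error into `PublishError`.
impl From<ParseIntError> for PublishError {
    fn from(e: ParseIntError) -> Self {
        PublishError::Unexpected(Box::new(e))
    }
}

/// Hypothetical fallible operation, used only to exercise `?`.
fn parse_count(raw: &str) -> Result<u32, PublishError> {
    let n = raw.trim().parse::<u32>()?;
    Ok(n)
}

fn main() {
    println!("{:?}", parse_count(" 42 "));
    if let Err(e) = parse_count("forty-two") {
        println!("error: {e}");
    }
}
```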

cargo check should succeed now.

Send Newsletter Emails

Time to send those emails out!
We can leverage the EmailClient we wrote a few chapters ago - just like PgPool, it is already part of the application state and we can extract it using web::Data.

//! src/routes/newsletters.rs
// [...]
use crate::email_client::EmailClient;

pub async fn publish_newsletter(
    body: web::Json<BodyData>,
    pool: web::Data<PgPool>,
    // New argument!
    email_client: web::Data<EmailClient>,
) -> Result<HttpResponse, PublishError> {
    let subscribers = get_confirmed_subscribers(&pool).await?;
    for subscriber in subscribers {
        email_client
            .send_email(
                subscriber.email,
                &body.title,
                &body.content.html,
                &body.content.text,
            )
            .await?;
    }
    Ok(HttpResponse::Ok().finish())
}

It almost works:

error[E0308]: mismatched types
  --> src/routes/newsletters.rs
   |
48 |                 subscriber.email,
   |                 ^^^^^^^^^^^^^^^^ 
   |   expected struct `SubscriberEmail`, 
   |   found struct `std::string::String`

error[E0277]: `?` couldn't convert the error to `PublishError`
  --> src/routes/newsletters.rs:53:19
   |
53 |             .await?;
   |                   ^ 
   |   the trait `From<reqwest::Error>` 
   |   is not implemented for `PublishError`

context Vs with_context

We can quickly fix the second one:

//! src/routes/newsletters.rs
// [...]
// Bring anyhow's extension trait into scope!
use anyhow::Context;

pub async fn publish_newsletter(/* */) -> Result<HttpResponse, PublishError> {
    // [...]
    for subscriber in subscribers {
        email_client
            .send_email(/* */)
            .await
            .with_context(|| {
                format!("Failed to send newsletter issue to {}", subscriber.email)
            })?;
    }
    // [...]
}

We are using a new method, with_context.

It is a close relative of context, the method we used extensively in Chapter 8 to convert the error variant of Result into anyhow::Error while enriching it with contextual information.

There is one key difference between the two: with_context is lazy.
It takes a closure as argument and the closure is only called in case of an error.

If the context you are adding is static - e.g. context("Oh no!") - they are equivalent.
If the context you are adding has a runtime cost, use with_context - you avoid paying for the error path when the fallible operation succeeds.

Let's look at our case, as an example: format! allocates memory on the heap to store its output string. Using context, we would be allocating that string every time we send an email out.
Using with_context, instead, we only invoke format! if email delivery fails.
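
The difference is easy to measure with a small experiment. The sketch below is a std-only imitation of the eager/lazy pair, assuming nothing about how anyhow implements them: ContextExt, describe and count_message_builds are hypothetical names, and the error type is a plain String to keep things short.

```rust
use std::cell::Cell;

/// Hypothetical, std-only imitation of an eager/lazy context pair.
/// This is not anyhow's implementation.
trait ContextExt<T> {
    fn context(self, msg: String) -> Result<T, String>;
    fn with_context<F: FnOnce() -> String>(self, f: F) -> Result<T, String>;
}

impl<T, E: std::fmt::Display> ContextExt<T> for Result<T, E> {
    // Eager: the caller has already built `msg` by the time we get here.
    fn context(self, msg: String) -> Result<T, String> {
        self.map_err(|e| format!("{msg}: {e}"))
    }

    // Lazy: `f` is only invoked on the error path.
    fn with_context<F: FnOnce() -> String>(self, f: F) -> Result<T, String> {
        self.map_err(|e| format!("{}: {e}", f()))
    }
}

/// Builds the context message and records that it was built.
fn describe(calls: &Cell<u32>, recipient: &str) -> String {
    calls.set(calls.get() + 1);
    format!("Failed to send newsletter issue to {recipient}")
}

/// Returns (eager_builds, lazy_builds) after three successful "sends".
fn count_message_builds() -> (u32, u32) {
    let eager = Cell::new(0);
    let lazy = Cell::new(0);
    for recipient in ["a@example.com", "b@example.com", "c@example.com"] {
        let ok: Result<(), String> = Ok(());
        ok.clone().context(describe(&eager, recipient)).unwrap();
        ok.with_context(|| describe(&lazy, recipient)).unwrap();
    }
    (eager.get(), lazy.get())
}

fn main() {
    let (eager, lazy) = count_message_builds();
    println!("messages built eagerly: {eager}, lazily: {lazy}");
}
```

With three successful deliveries the eager variant formats three messages that nobody reads, while the lazy variant formats none.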

Validation Of Stored Data

cargo check should return a single error now:

error[E0308]: mismatched types
  --> src/routes/newsletters.rs
   |
48 |                 subscriber.email,
   |                 ^^^^^^^^^^^^^^^^ 
   |   expected struct `SubscriberEmail`, 
   |   found struct `std::string::String`

We are not performing any validation on the data we retrieve from the database - ConfirmedSubscriber::email is of type String.
EmailClient::send_email, instead, expects a validated email address - a SubscriberEmail instance.

We can try the naive solution first - change ConfirmedSubscriber::email to be of type SubscriberEmail.

//! src/routes/newsletters.rs
// [...]
use crate::domain::SubscriberEmail;

struct ConfirmedSubscriber {
    email: SubscriberEmail,
}

#[tracing::instrument(name = "Get confirmed subscribers", skip(pool))]
async fn get_confirmed_subscribers(
    pool: &PgPool,
) -> Result<Vec<ConfirmedSubscriber>, anyhow::Error> {
    let rows = sqlx::query_as!(
        ConfirmedSubscriber,
        r#"
        SELECT email
        FROM subscriptions
        WHERE status = 'confirmed'
        "#,
    )
    .fetch_all(pool)
    .await?;
    Ok(rows)
}

error[E0308]: mismatched types
  --> src/routes/newsletters.rs
   |
69 |       let rows = sqlx::query_as!(
   |  ________________^
70 | |         ConfirmedSubscriber,
71 | |         r#"
72 | |         SELECT email
...  |
75 | |         "#,
76 | |     )
   | |_____^ expected struct `SubscriberEmail`, 
             found struct `std::string::String`

sqlx doesn't like it - it does not know how to convert a TEXT column into SubscriberEmail.
We could scan sqlx's documentation for a way to implement support for custom types - a lot of trouble for a minor upside.

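We can follow a similar approach to the one we deployed for our POST /subscriptions endpoint - we use two structs:

  - one struct encodes the raw data layout, as it comes out of the storage or off the wire;
  - another one is built by parsing that raw representation, using our domain types.

For our query, it looks like this: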

//! src/routes/newsletters.rs
// [...]

struct ConfirmedSubscriber {
    email: SubscriberEmail,
}

#[tracing::instrument(name = "Get confirmed subscribers", skip(pool))]
async fn get_confirmed_subscribers(
    pool: &PgPool,
) -> Result<Vec<ConfirmedSubscriber>, anyhow::Error> {
    // We only need `Row` to map the data coming out of this query.
    // Nesting its definition inside the function itself is a simple way
    // to clearly communicate this coupling (and to ensure it doesn't 
    // get used elsewhere by mistake).
    struct Row {
        email: String,
    }

    let rows = sqlx::query_as!(
        Row,
        r#"
        SELECT email
        FROM subscriptions
        WHERE status = 'confirmed'
        "#,
    )
    .fetch_all(pool)
    .await?;
    // Map into the domain type
    let confirmed_subscribers = rows
        .into_iter()
        .map(|r| ConfirmedSubscriber {
            // Just panic if validation fails
            email: SubscriberEmail::parse(r.email).unwrap(),
        })
        .collect();
    Ok(confirmed_subscribers)
}

Is that SubscriberEmail::parse(r.email).unwrap() a good idea?

The emails of all new subscribers go through the validation logic in SubscriberEmail::parse - it was a big focus topic for us in Chapter 6.
You might argue, then, that all the emails stored in our database are necessarily valid - there is no need to account for validation failures here. It is safe to just unwrap them all, knowing it will never panic.

This reasoning is sound assuming our software never changes. But we are optimising for high deployment frequency!

Data stored in our Postgres instance creates a temporal coupling between old and new versions of our application.
The emails we are retrieving from our database were marked as valid by a previous version of our application. The current version might disagree.

We might discover, for example, that our email validation logic is too lenient - some invalid emails are slipping through the cracks, leading to issues when attempting to deliver newsletters. We implement a stricter validation routine, deploy the patched version and, suddenly, email delivery does not work at all!
get_confirmed_subscribers panics when processing stored emails that were previously considered valid, but no longer are.
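
To make the scenario concrete, here is a toy sketch with two versions of a validation routine. Both parse_v1 and parse_v2 are hypothetical and far simpler than SubscriberEmail::parse; they only exist to show how a record accepted by an older release can be rejected by a newer one.

```rust
/// Hypothetical "old release" validation: anything with an `@` passes.
fn parse_v1(raw: &str) -> Result<&str, String> {
    if raw.contains('@') {
        Ok(raw)
    } else {
        Err(format!("{raw} is not a valid subscriber email."))
    }
}

/// Hypothetical "new release" validation: it also requires a non-empty
/// local part and a dot in the domain.
fn parse_v2(raw: &str) -> Result<&str, String> {
    match raw.split_once('@') {
        Some((local, domain)) if !local.is_empty() && domain.contains('.') => Ok(raw),
        _ => Err(format!("{raw} is not a valid subscriber email.")),
    }
}

/// Stored records that the old release accepted and the new one rejects.
/// Calling `unwrap` on `parse_v2` for any of these would panic.
fn rejected_after_upgrade<'a>(stored: &[&'a str]) -> Vec<&'a str> {
    stored
        .iter()
        .copied()
        .filter(|raw| parse_v1(raw).is_ok() && parse_v2(raw).is_err())
        .collect()
}

fn main() {
    let stored = ["ursula@example.com", "ursula@localhost"];
    println!("{:?}", rejected_after_upgrade(&stored));
}
```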

What should we do, then?
Should we skip validation entirely when retrieving data from the database?

There is no one-size-fits-all answer.
You need to evaluate the issue on a case-by-case basis given the requirements of your domain.

Sometimes it is unacceptable to process invalid records - the routine should fail and an operator must intervene to rectify the corrupt records.
Sometimes we need to process all historical records (e.g. analytics) and we should make minimal assumptions about the data - String is our safest bet.

In our case, we can meet half-way: we can skip invalid emails when fetching the list of recipients for our next newsletter issue. We will emit a warning for every invalid address we find, allowing an operator to identify the issue and correct the stored records at a certain point in the future.

//! src/routes/newsletters.rs
// [...]

async fn get_confirmed_subscribers(
    pool: &PgPool,
) -> Result<Vec<ConfirmedSubscriber>, anyhow::Error> {
    // [...]
  
    // Map into the domain type
    let confirmed_subscribers = rows
        .into_iter()
        .filter_map(|r| match SubscriberEmail::parse(r.email) {
            Ok(email) => Some(ConfirmedSubscriber { email }),
            Err(error) => {
                tracing::warn!(
                    "A confirmed subscriber is using an invalid email address.\n{}.",
                    error
                );
                None
            }
        })
        .collect();
    Ok(confirmed_subscribers)
}

filter_map is a handy combinator - it returns a new iterator containing only the items for which our closure returned a Some variant.
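
If you have not met filter_map before, this self-contained sketch shows the same skip-and-log shape on plain strings. parse_email is a hypothetical, deliberately simplistic stand-in for SubscriberEmail::parse, and eprintln! takes the place of tracing::warn!.

```rust
/// Hypothetical validator standing in for `SubscriberEmail::parse`.
fn parse_email(raw: String) -> Result<String, String> {
    if raw.contains('@') {
        Ok(raw)
    } else {
        Err(format!("{raw} is not a valid subscriber email."))
    }
}

/// Keeps the rows that parse, logs and drops the ones that do not.
fn valid_emails(rows: Vec<String>) -> Vec<String> {
    rows.into_iter()
        .filter_map(|raw| match parse_email(raw) {
            // `Some` values end up in the output...
            Ok(email) => Some(email),
            // ...while returning `None` discards the item.
            Err(error) => {
                eprintln!("Skipping an invalid record: {error}");
                None
            }
        })
        .collect()
}

fn main() {
    let rows = vec![
        "a@example.com".to_string(),
        "not-an-email".to_string(),
        "b@example.com".to_string(),
    ];
    println!("{:?}", valid_emails(rows));
}
```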

Responsibility Boundaries

We could get away with this, but it is worth taking a moment to reflect on who is doing what here.
Is get_confirmed_subscribers the most appropriate place to decide whether we should skip or abort when encountering an invalid email address?
It feels like a business-level decision that would be better placed in publish_newsletter, the driving routine of our delivery workflow.

get_confirmed_subscribers should simply act as an adapter between our storage layer and our domain layer. It deals with the database-specific bits (i.e. the query) and the mapping logic, but it delegates to the caller the decision on what to do if the mapping or the query fails.

Let's refactor:

//! src/routes/newsletters.rs
// [...]

async fn get_confirmed_subscribers(
    pool: &PgPool,
    // We are returning a `Vec` of `Result`s in the happy case.
    // This allows the caller to bubble up errors due to network issues or other
    // transient failures using the `?` operator, while the compiler 
    // forces them to handle the subtler mapping error.
    // See http://sled.rs/errors.html for a deep-dive about this technique.
) -> Result<Vec<Result<ConfirmedSubscriber, anyhow::Error>>, anyhow::Error> {
    // [...]
  
    let confirmed_subscribers = rows
        .into_iter()
        // No longer using `filter_map`!   
        .map(|r| match SubscriberEmail::parse(r.email) {
            Ok(email) => Ok(ConfirmedSubscriber { email }),
            Err(error) => Err(anyhow::anyhow!(error)),
        })
        .collect();
    Ok(confirmed_subscribers)
}

We now get a compiler error at the calling site:

error[E0609]: no field `email` on type `Result<ConfirmedSubscriber, anyhow::Error>`
  --> src/routes/newsletters.rs
   |
50 |                 subscriber.email,
   |

which we can immediately fix:

//! src/routes/newsletters.rs
// [...]

pub async fn publish_newsletter(/* */) -> Result<HttpResponse, PublishError> {
    let subscribers = get_confirmed_subscribers(&pool).await?;
    for subscriber in subscribers {
        // The compiler forces us to handle both the happy and unhappy case!
        match subscriber {
            Ok(subscriber) => {
                email_client
                    .send_email(
                        subscriber.email,
                        &body.title,
                        &body.content.html,
                        &body.content.text,
                    )
                    .await
                    .with_context(|| {
                        format!(
                            "Failed to send newsletter issue to {}", 
                            subscriber.email
                        )
                    })?;
            }
            Err(error) => {
                tracing::warn!(
                    // We record the error chain as a structured field 
                    // on the log record.
                    error.cause_chain = ?error,
                    // Using `\` to split a long string literal over
                    // two lines, without creating a `\n` character.
                    "Skipping a confirmed subscriber. \
                    Their stored contact details are invalid",
                );
            }
        }
    }
    Ok(HttpResponse::Ok().finish())
}
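
The nested Result shape can be explored without a database. In the sketch below, which is only an illustration under simplified assumptions, QueryError, fetch and deliverable are hypothetical names: an Option plays the role of a query that may fail as a whole, while each row may independently fail to parse.

```rust
/// Hypothetical stand-in for "the whole operation failed" (e.g. a query error).
#[derive(Debug, PartialEq)]
struct QueryError;

/// Hypothetical validator standing in for `SubscriberEmail::parse`.
fn parse_email(raw: &str) -> Result<String, String> {
    if raw.contains('@') {
        Ok(raw.to_owned())
    } else {
        Err(format!("{raw} is not a valid subscriber email."))
    }
}

/// Outer `Result`: did the fetch work at all?
/// Inner `Result`s: did each row map into the domain type?
fn fetch(rows: Option<Vec<&str>>) -> Result<Vec<Result<String, String>>, QueryError> {
    let rows = rows.ok_or(QueryError)?;
    Ok(rows.into_iter().map(parse_email).collect())
}

/// The caller owns the policy: bubble up the outer error, skip bad rows.
fn deliverable(rows: Option<Vec<&str>>) -> Result<Vec<String>, QueryError> {
    let mut recipients = Vec::new();
    for row in fetch(rows)? {
        match row {
            Ok(email) => recipients.push(email),
            Err(error) => eprintln!("Skipping a confirmed subscriber: {error}"),
        }
    }
    Ok(recipients)
}

fn main() {
    println!("{:?}", deliverable(Some(vec!["a@example.com", "oops"])));
    println!("{:?}", deliverable(None));
}
```

Swapping the skip policy for an abort policy only requires touching deliverable: fetch stays exactly the same.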

Follow The Compiler

The compiler is almost happy:

error[E0277]: `SubscriberEmail` doesn't implement `std::fmt::Display`
  --> src/routes/newsletters.rs:59:74
   |
59 |  format!("Failed to send newsletter issue to {}", subscriber.email)
   |                                                   ^^^^^^^^^^^^^^^^ 
   |   `SubscriberEmail` cannot be formatted with the default formatter

This is caused by our type change for email in ConfirmedSubscriber, from String to SubscriberEmail.
Let's implement Display for our new type:

//! src/domain/subscriber_email.rs
// [...]


impl std::fmt::Display for SubscriberEmail {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // We just forward to the Display implementation of 
        // the wrapped String.
        self.0.fmt(f)
    }
}
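
As a quick sanity check, here is a minimal, self-contained sketch of the same forwarding pattern. It uses a simplified stand-in for SubscriberEmail (a bare wrapper with no validation), so treat it as an illustration rather than the project's actual type. It spells the call out in fully-qualified form, which leaves no doubt about which fmt is invoked:

```rust
use std::fmt;

// Simplified stand-in for the domain type: the real `SubscriberEmail`
// validates its input in `parse`, which we skip here.
struct SubscriberEmail(String);

impl fmt::Display for SubscriberEmail {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward to the `Display` implementation of the wrapped `String`.
        fmt::Display::fmt(&self.0, f)
    }
}

fn main() {
    let email = SubscriberEmail("ursula@example.com".to_string());

    // `{}` now works in `format!`, `println!` and friends.
    let message = format!("Failed to send newsletter issue to {}", email);
    assert_eq!(
        message,
        "Failed to send newsletter issue to ursula@example.com"
    );

    // `to_string` comes for free via the blanket `ToString` implementation.
    assert_eq!(email.to_string(), "ursula@example.com");

    // Because we forward the formatter itself, width and alignment
    // flags are honoured by the inner `String` as well.
    assert_eq!(format!("[{:>20}]", email), "[  ursula@example.com]");
}
```

Forwarding the formatter, instead of writing the inner value with a fresh format string, is what keeps flags such as width and alignment working.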

Progress! Different compiler error, this time from the borrow checker!

error[E0382]: borrow of partially moved value: `subscriber`
  --> src/routes/newsletters.rs
   |
52 |      subscriber.email,
   |      ---------------- value partially moved here
...
58 |  .with_context(|| {
   |                ^^ value borrowed here after partial move
59 |      format!("Failed to send newsletter issue to {}", subscriber.email)
   |                                                       ----------
   |                                   borrow occurs due to use in closure

We could just slap a .clone() on the first usage and call it a day.
But let's try to be sophisticated: do we really need to take ownership of SubscriberEmail in EmailClient::send_email?

//! src/email_client.rs
// [...]

pub async fn send_email(
    &self,
    recipient: SubscriberEmail,
    /* */
) -> Result<(), reqwest::Error> {
    // [...]
    let request_body = SendEmailRequest {
        to: recipient.as_ref(),
        // [...]
    };
    // [...]
}

We just need to be able to call as_ref on it - a &SubscriberEmail would work just fine.
Let's change the signature accordingly:

//! src/email_client.rs
// [...]

pub async fn send_email(
    &self,
    recipient: &SubscriberEmail,
    /* */
) -> Result<(), reqwest::Error> {
    // [...]
}

There are a few calling sites that need to be updated - the compiler is gentle enough to point them out. I'll leave the fixes to you, the reader, as an exercise.
The test suite should pass when you are done.
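
If you want to convince yourself that borrowing is enough, the sketch below reproduces the situation with simplified, synchronous stand-ins: the free function send_email and the helper notify are hypothetical and only mimic the shape of our real code. The recipient is passed by reference, so subscriber.email is still available when the closure builds the error message:

```rust
// Simplified stand-ins for the project's types.
struct SubscriberEmail(String);

impl AsRef<str> for SubscriberEmail {
    fn as_ref(&self) -> &str {
        &self.0
    }
}

struct ConfirmedSubscriber {
    email: SubscriberEmail,
}

// Hypothetical stand-in for `EmailClient::send_email`: it only reads
// the recipient, so a shared reference is all it needs.
fn send_email(recipient: &SubscriberEmail, subject: &str) -> Result<String, String> {
    if recipient.as_ref().contains('@') {
        Ok(format!("To: {} | Subject: {}", recipient.as_ref(), subject))
    } else {
        Err("invalid recipient".to_string())
    }
}

// We own `subscriber`, just like inside the `for` loop of the handler.
fn notify(subscriber: ConfirmedSubscriber, subject: &str) -> Result<String, String> {
    // Passing `subscriber.email` by value here would move it and trigger
    // E0382 in the closure below. Borrowing leaves it intact.
    send_email(&subscriber.email, subject).map_err(|e| {
        format!(
            "Failed to send newsletter issue to {}: {}",
            subscriber.email.as_ref(),
            e
        )
    })
}

fn main() {
    let valid = ConfirmedSubscriber {
        email: SubscriberEmail("ursula@example.com".to_string()),
    };
    assert_eq!(
        notify(valid, "Issue #1").unwrap(),
        "To: ursula@example.com | Subject: Issue #1"
    );

    let invalid = ConfirmedSubscriber {
        email: SubscriberEmail("not-an-email".to_string()),
    };
    assert_eq!(
        notify(invalid, "Issue #1").unwrap_err(),
        "Failed to send newsletter issue to not-an-email: invalid recipient"
    );
}
```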

Remove Some Boilerplate

Before moving forward, let's take one last look at get_confirmed_subscribers:

//! src/routes/newsletters.rs
// [...]

#[tracing::instrument(name = "Get confirmed subscribers", skip(pool))]
async fn get_confirmed_subscribers(
    pool: &PgPool,
) -> Result<Vec<Result<ConfirmedSubscriber, anyhow::Error>>, anyhow::Error> {
    struct Row {
        email: String,
    }

    let rows = sqlx::query_as!(
        Row,
        r#"
        SELECT email
        FROM subscriptions
        WHERE status = 'confirmed'
        "#,
    )
    .fetch_all(pool)
    .await?;
    let confirmed_subscribers = rows
        .into_iter()
        .map(|r| match SubscriberEmail::parse(r.email) {
            Ok(email) => Ok(ConfirmedSubscriber { email }),
            Err(error) => Err(anyhow::anyhow!(error)),
        })
        .collect();
    Ok(confirmed_subscribers)
}

Is Row adding any value?
Not really - the query is simple enough that we do not benefit significantly from having a dedicated type to represent the returned data.
We can switch back to query! and remove Row entirely:


//! src/routes/newsletters.rs
// [...]

#[tracing::instrument(name = "Get confirmed subscribers", skip(pool))]
async fn get_confirmed_subscribers(
    pool: &PgPool,
) -> Result<Vec<Result<ConfirmedSubscriber, anyhow::Error>>, anyhow::Error> {
    let confirmed_subscribers = sqlx::query!(
        r#"
        SELECT email
        FROM subscriptions
        WHERE status = 'confirmed'
        "#,
    )
    .fetch_all(pool)
    .await?
    .into_iter()
    .map(|r| match SubscriberEmail::parse(r.email) {
        Ok(email) => Ok(ConfirmedSubscriber { email }),
        Err(error) => Err(anyhow::anyhow!(error)),
    })
    .collect();
    Ok(confirmed_subscribers)
}

We didn't even need to touch the remaining code - it compiled straight away.
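
It is worth pausing on the nested return type. The sketch below is not our production code: parse_email is a hypothetical stand-in for SubscriberEmail::parse (it only looks for an @), rows are plain strings and errors are Strings rather than anyhow::Error. It shows what we gain by collecting into a Vec of Results instead of a single Result:

```rust
#[derive(Debug, PartialEq)]
struct ConfirmedSubscriber {
    email: String,
}

// Hypothetical stand-in for `SubscriberEmail::parse`: the real check
// is stricter than looking for an `@`.
fn parse_email(raw: String) -> Result<String, String> {
    if raw.contains('@') {
        Ok(raw)
    } else {
        Err(format!("{} is not a valid subscriber email.", raw))
    }
}

// One entry per row: valid and invalid rows are both reported,
// which lets the caller skip the bad ones and carry on.
fn per_row(rows: Vec<String>) -> Vec<Result<ConfirmedSubscriber, String>> {
    rows.into_iter()
        // `Result::map` is a shorter spelling of the `match` used above.
        .map(|raw| parse_email(raw).map(|email| ConfirmedSubscriber { email }))
        .collect()
}

// All-or-nothing: `collect` stops at the first invalid row and the
// valid subscribers are lost together with it.
fn all_or_nothing(rows: Vec<String>) -> Result<Vec<ConfirmedSubscriber>, String> {
    rows.into_iter()
        .map(|raw| parse_email(raw).map(|email| ConfirmedSubscriber { email }))
        .collect()
}

fn main() {
    let rows = vec![
        "a@example.com".to_string(),
        "broken".to_string(),
        "b@example.com".to_string(),
    ];

    let results = per_row(rows.clone());
    assert_eq!(results.len(), 3);
    assert_eq!(results.iter().filter(|r| r.is_ok()).count(), 2);
    assert_eq!(
        results[1],
        Err("broken is not a valid subscriber email.".to_string())
    );

    // A single corrupted row would prevent delivery to everybody.
    assert!(all_or_nothing(rows).is_err());
}
```

The only difference between the two functions is the return type: collect picks its behaviour from it.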

Limitations Of The Naive Approach

We did it - we have an implementation that passes our two integration tests!
What now? Do we pat ourselves on the back and ship it to production?

Not so fast.
We said it at the very beginning - the approach we took is the simplest possible to get something up and running.
Is it good enough, though?

Let's have a hard look at its shortcomings!

  1. Security
    Our POST /newsletters endpoint is unprotected - anyone can fire a request to it and broadcast to our entire audience, unchecked.
  2. You Only Get One Shot
    As soon as you hit POST /newsletters, your content goes out to your entire mailing list. No chance to edit or review it in draft mode before giving the green light for publishing.
  3. Performance
    We are sending emails out one at a time.
    We wait for the current one to be dispatched successfully before moving on to the next in line.
    This is not a massive issue if you have 10 or 20 subscribers, but it becomes noticeable shortly afterwards: latency is going to be horrible for newsletters with a sizeable audience.
  4. Fault Tolerance
    If we fail to dispatch one email we bubble up the error using ? and return a 500 Internal Server Error to the caller.
    The remaining emails are never sent, nor do we retry dispatching the failed one.
  5. Retry Safety
    Many things can go wrong when communicating over the network. What should a consumer of our API do if they experience a timeout or a 500 Internal Server Error when calling our service?
    They cannot retry - they risk sending the newsletter issue twice to the entire mailing list.

Number 2. and 3. are annoying, but we could live with them for a while.
Number 4. and 5. are fairly serious limitations, with a visible impact on our audience.
Number 1. is simply non-negotiable: we must protect the endpoint before releasing our API.
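
To make points 4. and 5. concrete, here is a small simulation. Everything in it is hypothetical: FakeEmailClient is a synchronous stand-in that records deliveries and fails for one chosen recipient, while publish mirrors the shape of the loop in our handler.

```rust
// Hypothetical stand-in for the email client: it records what was sent
// and fails for one specific recipient to simulate a network error.
struct FakeEmailClient {
    failing_recipient: &'static str,
    sent: Vec<String>,
}

impl FakeEmailClient {
    fn send_email(&mut self, recipient: &str) -> Result<(), String> {
        if recipient == self.failing_recipient {
            return Err(format!(
                "Failed to send newsletter issue to {}",
                recipient
            ));
        }
        self.sent.push(recipient.to_string());
        Ok(())
    }
}

// Same shape as the loop in `publish_newsletter`: emails go out one at
// a time and `?` returns early on the first failure.
fn publish(client: &mut FakeEmailClient, subscribers: &[&str]) -> Result<(), String> {
    for subscriber in subscribers {
        client.send_email(subscriber)?;
    }
    Ok(())
}

fn main() {
    let subscribers = ["a@example.com", "b@example.com", "c@example.com"];
    let mut client = FakeEmailClient {
        failing_recipient: "b@example.com",
        sent: Vec::new(),
    };

    // Fault tolerance: the failure for `b` stops the loop, `c` is never reached.
    assert!(publish(&mut client, &subscribers).is_err());
    assert_eq!(client.sent, vec!["a@example.com"]);

    // Retry safety: the transient issue is gone, the caller retries
    // and `a` receives the same issue twice.
    client.failing_recipient = "nobody";
    assert!(publish(&mut client, &subscribers).is_ok());
    assert_eq!(
        client.sent,
        vec!["a@example.com", "a@example.com", "b@example.com", "c@example.com"]
    );
}
```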

Summary

We built a prototype of our newsletter delivery logic: it satisfies our functional requirements, but it is not yet ready for prime time.
The shortcomings of our MVP will become the focus of the next chapters, in priority order: we will tackle authentication/authorization first before moving on to fault tolerance.



Book - Table Of Contents

The Table of Contents is provisional and might change over time. The draft below is the most accurate picture at this point in time.

  1. Getting Started
    • Installing The Rust Toolchain
    • Project Setup
    • IDEs
    • Continuous Integration
  2. Our Driving Example
    • What Should Our Newsletter Do?
    • Working In Iterations
  3. Sign Up A New Subscriber
  4. Telemetry
    • Unknown Unknowns
    • Observability
    • Logging
    • Instrumenting POST /subscriptions
    • Structured Logging
  5. Go Live
    • We Must Talk About Deployments
    • Choosing Our Tools
    • A Dockerfile For Our Application
    • Deploy To DigitalOcean Apps Platform
  6. Rejecting Invalid Subscribers #1
    • Requirements
    • First Implementation
    • Validation Is A Leaky Cauldron
    • Type-Driven Development
    • Ownership Meets Invariants
    • Panics
    • Error As Values - Result
  7. Reject Invalid Subscribers #2
  8. Error Handling
    • What Is The Purpose Of Errors?
    • Error Reporting For Operators
    • Errors For Control Flow
    • Avoid "Ball Of Mud" Error Enums
    • Who Should Log Errors?
  9. Naive Newsletter Delivery
    • User Stories Are Not Set In Stone
    • Do Not Spam Unconfirmed Subscribers
    • All Confirmed Subscribers Receive New Issues
    • Implementation Strategy
    • Body Schema
    • Fetch Confirmed Subscribers List
    • Send Newsletter Emails
    • Validation Of Stored Data
    • Limitations Of The Naive Approach
  10. Securing Our API
  11. Fault-tolerant Newsletter Delivery