An In-Depth Introduction To Idempotency

This article is a sample from Zero To Production In Rust, a book on backend development in Rust.
You can get a copy of the book on zero2prod.com.

TL;DR

We kept the first iteration of our newsletter endpoint very simple: emails are immediately sent out to all subscribers via Postmark, one API call at a time.
This is good enough if the audience is small - it breaks down, in a variety of ways, when dealing with hundreds of subscribers.

We want our application to be fault-tolerant.
Newsletter delivery should not be disrupted by transient failures like application crashes, Postmark API errors or network timeouts. To deliver a reliable service in the face of failure we will have to explore new concepts: idempotency, locking, queues and background jobs.

Chapter 11

1. POST /admin/newsletters - A Refresher

Let's refresh our memory before jumping straight into the task: what does POST /admin/newsletters look like?

The endpoint is invoked when a logged-in newsletter author submits the HTML form served at GET /admin/newsletters.
We parse the form data out of the HTTP request body and, if nothing is amiss, kick off the processing.

//! src/routes/admin/newsletter/post.rs
// [...]

#[derive(serde::Deserialize)]
pub struct FormData {
    title: String,
    text_content: String,
    html_content: String,
}

#[tracing::instrument(/* */)]
pub async fn publish_newsletter(
    form: web::Form<FormData>,
    pool: web::Data<PgPool>,
    email_client: web::Data<EmailClient>,
) -> Result<HttpResponse, actix_web::Error> {
    // [...]
}

We start by fetching all confirmed subscribers from our Postgres database.

//! src/routes/admin/newsletter/post.rs
// [...]

#[tracing::instrument(/* */)]
pub async fn publish_newsletter(/* */) -> Result<HttpResponse, actix_web::Error> {
    // [...]
    let subscribers = get_confirmed_subscribers(&pool).await.map_err(e500)?;
    // [...]
}

struct ConfirmedSubscriber {
    email: SubscriberEmail,
}

#[tracing::instrument(/* */)]
async fn get_confirmed_subscribers(
    pool: &PgPool,
) -> Result<Vec<Result<ConfirmedSubscriber, anyhow::Error>>, anyhow::Error> {
    /* */
}

We iterate over the retrieved subscribers, sequentially.
For each user, we try to send out an email with the new newsletter issue.

//! src/routes/admin/newsletter/post.rs
// [...]

#[tracing::instrument(/* */)]
pub async fn publish_newsletter(/* */) -> Result<HttpResponse, actix_web::Error> {
    // [...]
    let subscribers = get_confirmed_subscribers(&pool).await.map_err(e500)?;
    for subscriber in subscribers {
        match subscriber {
            Ok(subscriber) => {
                email_client
                    .send_email(/* */)
                    .await
                    .with_context(/* */)
                    .map_err(e500)?;
            }
            Err(error) => {
                tracing::warn!(/* */);
            }
        }
    }
    FlashMessage::info("The newsletter issue has been published!").send();
    Ok(see_other("/admin/newsletters"))
}

Once all subscribers have been taken care of, we redirect the author back to the newsletter form - they will be shown a flash message confirming that the issue was published successfully.

2. Our Goal

We want to ensure best-effort delivery: we strive to deliver the new newsletter issue to all subscribers.
We cannot guarantee that all emails will be delivered: some accounts might just have been deleted.

At the same time, we should try to minimize duplicates - i.e. a subscriber receiving the same issue multiple times. We cannot rule out duplicates entirely (we will later discuss why), but our implementation should minimize their frequency.

3. Failure Modes

Let's have a look at the possible failure modes of our POST /admin/newsletters endpoint.
Can we still achieve best-effort delivery when something goes awry?

3.1. Invalid Inputs

There might be issues with the incoming request: the body is malformed or the user has not authenticated.
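Both scenarios are already handled appropriately: malformed form data is rejected by the web::Form extractor with a 400 Bad Request, while unauthenticated requests are redirected to the login page before they reach our handler.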

3.2. Network I/O

Problems might arise when we interact with other machines over the network.

3.2.1. Postgres

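The database might misbehave when we try to retrieve the current list of subscribers. We do not have a lot of options apart from retrying. We can either retry in process, by adding some retry logic around the get_confirmed_subscribers call, or give up by returning an error to the caller, who can then decide whether to retry.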

The first option makes our application more resilient to spurious failures. Nonetheless, you can only perform a finite number of retries; you will have to give up eventually.

Our implementation opts for the second strategy from the get-go. It might result in a few more 500s, but it is not incompatible with our over-arching objective.
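For comparison, the first option could look like the helper below. It is a minimal, synchronous sketch built around a hypothetical retry function: our real handler is async, and a production version would also wait with some backoff between attempts. It makes the finite budget explicit: after max_attempts failures, the last error is returned and we give up.

```rust
/// Call `operation` up to `max_attempts` times.
/// Returns the first success, or the last error once the budget is exhausted.
/// Hypothetical helper: synchronous and without backoff, for illustration only.
fn retry<T, E>(max_attempts: u32, mut operation: impl FnMut() -> Result<T, E>) -> Result<T, E> {
    assert!(max_attempts > 0, "at least one attempt is required");
    let mut attempt = 1;
    loop {
        match operation() {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error to the caller.
            Err(e) if attempt >= max_attempts => return Err(e),
            // Transient failure: try again.
            Err(_) => attempt += 1,
        }
    }
}
```

A spurious failure is absorbed as long as it clears up within the budget; a persistent one still ends in an error, which is where the second option (returning a 500) takes over.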

3.2.2. Postmark - API Errors

What about email delivery issues?
Let's start with the simplest scenario: Postmark returns an error when we try to email one of our subscribers.
Our current implementation bails out: we abort the processing and return a 500 Internal Server Error to the caller.

We are sending emails out sequentially. We will never get a chance to deliver the new issue to the subscribers at the end of the list if we abort as soon as an API error is encountered. This is far from being "best-effort delivery".

This is not the end of our problems either - can the newsletter author retry the form submission?

It depends on where the error occurred.

Was it the first subscriber in the list returned by our database query?
No problem, nothing has happened yet.

What if it were the third subscriber in the list? Or the fifth? Or the one-hundredth?
We have a problem: some subscribers have been sent the new issue, others haven't.
If the author retries, some subscribers are going to receive the issue twice.
If they don't retry, some subscribers might never receive the issue.

Damned if you do, damned if you don't.
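A toy simulation makes the dilemma concrete. The sketch below is hypothetical and fully in-memory (no Postmark involved; send_newsletter and times_delivered are names made up for the example): a failure on the third send leaves the list half-processed, and a full retry then delivers the issue twice to the subscribers at the top of the list.

```rust
/// Hypothetical stand-in for our sequential delivery loop.
/// `fail_at` makes the n-th send (0-based) fail, mimicking a Postmark API error.
/// Successful sends are recorded in `deliveries`.
fn send_newsletter(
    subscribers: &[&str],
    fail_at: Option<usize>,
    deliveries: &mut Vec<String>,
) -> Result<(), String> {
    for (i, subscriber) in subscribers.iter().enumerate() {
        if fail_at == Some(i) {
            // Bail out like our handler does: the remaining subscribers are skipped.
            return Err(format!("Postmark error while emailing {subscriber}"));
        }
        deliveries.push(subscriber.to_string());
    }
    Ok(())
}

/// How many times `subscriber` received the issue.
fn times_delivered(deliveries: &[String], subscriber: &str) -> usize {
    deliveries.iter().filter(|d| d.as_str() == subscriber).count()
}
```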

You might recognize the struggle: we are dealing with a workflow, a combination of multiple sub-tasks.
We faced something similar in chapter 7 when we had to execute a sequence of SQL queries to create a new subscriber. Back then, we opted for all-or-nothing semantics using SQL transactions: nothing happens unless all queries succeed. Postmark's API does not provide any kind of transactional semantics - each API call is its own unit of work, and we have no way to link them together.

3.3. Application Crashes

Our application could crash at any point in time. It might, for example, run out of memory or the server it is running on might be abruptly terminated (welcome to the cloud!).

A crash, in particular, might happen after we started to process the subscribers list but before we got to the end of it. The author will receive an error message in the browser.
Re-submitting the form is likely to result in a high number of redundant deliveries, just like we observed when discussing the consequences of Postmark's API errors.

3.4. Author Actions

Last but not least, we might have issues in the interaction between the author and the API.

If we are dealing with a large audience, it might take minutes to process the entire subscribers list. The author might get impatient and choose to re-submit the form. The browser might decide to give up (client-side timeout). Or, equally problematic, the author might click on the Submit button more than once, by mistake.

Once again, we end up in a corner because our implementation is not retry-safe.

4. Idempotency: An Introduction

POST /admin/newsletters is, all things considered, a pretty simple endpoint. Nonetheless, our investigation highlighted several scenarios where the current implementation fails to meet our expectations.
Most of our problems boil down to a specific limitation: it is not safe to retry.

Retry-safety has a dramatic impact on the ergonomics of an API. It is substantially easier to write a reliable API client if you can safely retry when something goes wrong.
But what does retry-safety actually entail?

We built an intuitive understanding of what it means in our domain, newsletter delivery - send the content to every subscriber no more than once. How does that transfer to another domain?
You might be surprised to find out that we do not have a clear industry-accepted definition. It is a tricky subject.
For the purpose of this book, we will define retry-safety as follows:

An API endpoint is retry-safe (or idempotent) if the caller has no way to observe if a request has been sent to the server once or multiple times.

We will probe and explore this definition for a few sections: it is important to fully understand its ramifications.

If you have been in the industry long enough, you have probably heard another term used to describe the concept of retry-safety: idempotency. The two are mostly used as synonyms - we will use idempotency going forward, primarily to align with other industry terminology that will be relevant to our implementation (i.e. idempotency keys).

4.1. Idempotency In Action: Payments

Let's explore the implications of our idempotency definition in another domain, payments.
Our fictional payments API exposes three endpoints:

POST /payments, in particular, takes as input the beneficiary details and the payment amount. An API call triggers a money transfer from your account to the specified beneficiary; your balance is reduced accordingly (i.e. new_balance = old_balance - payment_amount).

Let's consider this scenario: your balance is 400 USD and you send a request to transfer 20 USD. The request succeeds: the API returned a 200 OK, your balance was updated to 380 USD and the beneficiary received 20 USD.
You then retry the same request - e.g. you click twice on the Pay now button.
What should happen if POST /payments is idempotent?

Our idempotency definition is built around the concept of observability - properties of the system state that the caller can inspect by interacting with the system itself.
For example: you could easily determine that the second call is a retry by going through the logs emitted by the API. But the caller is not an operator - they have no way to inspect those logs. They are invisible to the users of the API - in so far as idempotency is concerned, they don't exist. They are not part of the domain model exposed and manipulated by the API.

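The domain model in our example includes your account balance and the payments initiated from your account.

Given the above, we can say that POST /payments is idempotent if, when the request is retried, your balance stays at 380 USD and no more money is sent to the beneficiary.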

There is one more detail to sort out - what HTTP response should the server return for the retried request?
The caller should not be able to observe that the second request was a retry. The payment succeeded, therefore the server should return a success response that is semantically equivalent to the HTTP response used to answer the initial request.
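To see why the retry is observable without idempotency, consider a naive, hypothetical implementation (the Account type below is ours) where every call debits the account. Starting from 400 USD, the first 20 USD payment leaves 380 USD; the accidental second click leaves 360 USD and records a second payment, both visible through the domain model.

```rust
/// Hypothetical, non-idempotent version of POST /payments:
/// every call moves money, retries included.
struct Account {
    balance_usd: u64,
    payments: Vec<u64>,
}

impl Account {
    fn new(balance_usd: u64) -> Self {
        Self { balance_usd, payments: Vec::new() }
    }

    /// Returns the new balance on success.
    fn pay(&mut self, amount_usd: u64) -> Result<u64, &'static str> {
        if amount_usd > self.balance_usd {
            return Err("insufficient funds");
        }
        // new_balance = old_balance - payment_amount
        self.balance_usd -= amount_usd;
        self.payments.push(amount_usd);
        Ok(self.balance_usd)
    }
}
```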

4.2. Idempotency Keys

There is room for ambiguity in our definition of idempotency: how do we distinguish between a retry and a user trying to perform two distinct payments for the same amount to the same beneficiary?

We need to understand the caller's intent.

We could try to use a heuristic - e.g. the second request is a duplicate if it was sent no more than 5 minutes later.
This could be a good starting point, but it is not bulletproof. The consequences of misclassification could be dire, both for the caller and our reputation as an organization (e.g. a late retry causing a double payment).

Given that this is all about understanding the caller's intent, there is no better strategy than empowering the caller themselves to tell us what they are trying to do. This is commonly accomplished using idempotency keys.
The caller generates a unique identifier, the idempotency key, for every state-altering operation they want to perform. The idempotency key is attached to the outgoing request, usually as an HTTP header (e.g. Idempotency-Key).
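The server can now easily spot duplicates: two requests carrying the same idempotency key are two attempts at the same operation, while a request with a new key is a new operation, even if its payload is identical.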

We will start requiring an idempotency key in POST /admin/newsletters as part of our idempotency implementation.
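Applied to the payments example, the idea can be sketched as follows. This is a hypothetical in-memory service (the Payments type and its fields are ours, and keys are assumed to be generated by the client, e.g. as UUIDs): the server remembers the outcome associated with each key, so a retry carrying the same key gets the same answer and moves no money, while a fresh key is treated as a new payment even if the amount matches.

```rust
use std::collections::HashMap;

/// Hypothetical payments service honouring caller-supplied idempotency keys.
struct Payments {
    balance_usd: u64,
    // idempotency key -> outcome returned the first time around
    responses: HashMap<String, Result<u64, String>>,
}

impl Payments {
    fn new(balance_usd: u64) -> Self {
        Self { balance_usd, responses: HashMap::new() }
    }

    fn pay(&mut self, idempotency_key: &str, amount_usd: u64) -> Result<u64, String> {
        if let Some(saved) = self.responses.get(idempotency_key) {
            // Duplicate: replay the original outcome without moving money again.
            return saved.clone();
        }
        let outcome = if amount_usd > self.balance_usd {
            Err("insufficient funds".to_string())
        } else {
            self.balance_usd -= amount_usd;
            Ok(self.balance_usd)
        };
        self.responses.insert(idempotency_key.to_string(), outcome.clone());
        outcome
    }
}
```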

4.3. Concurrent Requests

What should happen when two duplicate requests are fired concurrently - i.e. the second request reaches the server before it finishes processing the first one?

We do not yet know the outcome of the first request. Processing both requests in parallel might also introduce the risk of performing side effects more than once (e.g. initiating two distinct payments).

It is common to introduce synchronization: the second request should not be processed until the first one has completed.
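We have two options: reject the second request by returning a 409 Conflict status code, or wait until the first request completes processing and then return the same response to the second caller.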

Both are viable.
The latter is fully transparent to the caller, making it easier to consume the API - they don't have to handle yet another transient failure mode. There is a price to pay though: both the client and the server need to keep an open connection while spinning idle, waiting for the other task to complete.
Considering our use case (processing forms), we will go for the second strategy in order to minimize the number of user-visible errors - browsers do not automatically retry 409s.
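The waiting strategy can be sketched, within a single process, with a mutex and a condition variable. This is a hypothetical illustration (Coordinator and Slot are our names, and a panic inside process would leave the key stuck as InProgress), not how we will implement it against Postgres: the first request marks its key as in progress, duplicates block until it completes, and every caller ends up with the same response.

```rust
use std::collections::HashMap;
use std::sync::{Condvar, Mutex};

/// Processing state for a given idempotency key.
enum Slot {
    InProgress,
    Completed(String),
}

/// Hypothetical in-memory coordinator implementing the "wait" strategy.
#[derive(Default)]
struct Coordinator {
    slots: Mutex<HashMap<String, Slot>>,
    completed: Condvar,
}

impl Coordinator {
    fn handle(&self, key: &str, process: impl FnOnce() -> String) -> String {
        let mut slots = self.slots.lock().unwrap();
        loop {
            match slots.get(key) {
                // Already processed: replay the saved response.
                Some(Slot::Completed(response)) => return response.clone(),
                Some(Slot::InProgress) => {}
                None => break,
            }
            // Another request owns this key: wait until it completes.
            slots = self.completed.wait(slots).unwrap();
        }
        slots.insert(key.to_string(), Slot::InProgress);
        drop(slots); // do not hold the lock while processing
        let response = process();
        self.slots
            .lock()
            .unwrap()
            .insert(key.to_string(), Slot::Completed(response.clone()));
        // Wake up any duplicate waiting on this key.
        self.completed.notify_all();
        response
    }
}
```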

5. Requirements As Tests #1

Let's start by focusing on the simplest scenario: a request was received and processed successfully, then a retry is performed.
We expect a success response with no duplicate newsletter delivery:

//! tests/api/newsletter.rs
// [...]

#[tokio::test]
async fn newsletter_creation_is_idempotent() {
    // Arrange
    let app = spawn_app().await;
    create_confirmed_subscriber(&app).await;
    app.test_user.login(&app).await;

    Mock::given(path("/email"))
        .and(method("POST"))
        .respond_with(ResponseTemplate::new(200))
        .expect(1)
        .mount(&app.email_server)
        .await;

    // Act - Part 1 - Submit newsletter form
    let newsletter_request_body = serde_json::json!({
        "title": "Newsletter title",
        "text_content": "Newsletter body as plain text",
        "html_content": "<p>Newsletter body as HTML</p>",
        // We expect the idempotency key as part of the
        // form data, not as a header
        "idempotency_key": uuid::Uuid::new_v4().to_string()
    });
    let response = app.post_publish_newsletter(&newsletter_request_body).await;
    assert_is_redirect_to(&response, "/admin/newsletters");

    // Act - Part 2 - Follow the redirect
    let html_page = app.get_publish_newsletter_html().await;
    assert!(
        html_page.contains("<p><i>The newsletter issue has been published!</i></p>")
    );

    // Act - Part 3 - Submit newsletter form **again**
    let response = app.post_publish_newsletter(&newsletter_request_body).await;
    assert_is_redirect_to(&response, "/admin/newsletters");

    // Act - Part 4 - Follow the redirect
    let html_page = app.get_publish_newsletter_html().await;
    assert!(
        html_page.contains("<p><i>The newsletter issue has been published!</i></p>")
    );

    // Mock verifies on Drop that we have sent the newsletter email **once**
}

cargo test should fail:

thread 'newsletter::newsletter_creation_is_idempotent' panicked at 
'Verifications failed:
- Mock #1.
    Expected range of matching incoming requests: == 1
    Number of matched incoming requests: 2
[...]'

The retry succeeded, but it resulted in the newsletter being delivered twice to our subscriber - the problematic behaviour we identified during the failure analysis at the very beginning of this chapter.

6. Implementation Strategies

How do we prevent the retried request from dispatching a new round of emails to our subscribers? We have two options - one requires state, the other doesn't.

6.1. Stateful Idempotency: Save And Replay

In the stateful approach, we process the first request and then store its idempotency key next to the HTTP response we are about to return. When a retry comes in, we look for a match in the store against its idempotency key, fetch the saved HTTP response and return it to the caller.
The entire handler logic is short-circuited - it never gets executed. Postmark's API is never called again, preventing duplicate deliveries.
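Stripped of HTTP and databases, save-and-replay boils down to the hypothetical in-memory store below (IdempotencyStore and SavedResponse are illustrative names). Responses are keyed by (user id, idempotency key), so the same key used by two different users does not collide. The check-then-insert is not safe under concurrent duplicates, the problem discussed in section 4.3.

```rust
use std::collections::HashMap;

/// Simplified stand-in for an HTTP response.
#[derive(Clone, Debug, PartialEq)]
struct SavedResponse {
    status: u16,
    body: String,
}

/// Hypothetical in-memory idempotency store, keyed by (user id, idempotency key).
#[derive(Default)]
struct IdempotencyStore {
    responses: HashMap<(u64, String), SavedResponse>,
}

impl IdempotencyStore {
    /// Run `handler` only if no response was saved for this key; otherwise replay it.
    fn save_and_replay(
        &mut self,
        user_id: u64,
        idempotency_key: &str,
        handler: impl FnOnce() -> SavedResponse,
    ) -> SavedResponse {
        let key = (user_id, idempotency_key.to_string());
        if let Some(saved) = self.responses.get(&key) {
            // Short-circuit: the handler (and its side effects) never runs.
            return saved.clone();
        }
        let response = handler();
        self.responses.insert(key, response.clone());
        response
    }
}
```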

6.2. Stateless Idempotency: Deterministic Key Generation

The stateless approach tries to achieve the same outcome without relying on persistence.
For every subscriber, we deterministically generate a new idempotency key using their subscriber id, the newsletter content and the idempotency key attached to the incoming request. Every time we call Postmark to send an email we make sure to pass along the subscriber-specific idempotency key.
When a retry comes in, we execute the same processing logic - this leads to the same sequence of HTTP calls to Postmark, using exactly the same idempotency keys. Assuming their idempotency implementation is sound, no new email is going to be dispatched.
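A deterministic derivation could look like the sketch below. It is purely illustrative: subscriber_idempotency_key is a hypothetical name, and FNV-1a is used only because it fits in a few lines of std-only Rust; it is not cryptographic, and a real implementation would more likely use a cryptographic hash or name-based UUIDs. Fields are length-prefixed so that different inputs cannot produce the same byte sequence through concatenation.

```rust
/// 64-bit FNV-1a: a simple, deterministic, non-cryptographic hash.
fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325; // FNV offset basis
    for byte in bytes {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(0x100000001b3); // FNV prime
    }
    hash
}

/// Hypothetical per-subscriber key, forwarded to the email provider.
/// Same inputs always yield the same key, so a retry replays the same calls.
fn subscriber_idempotency_key(request_key: &str, subscriber_id: &str, content: &str) -> String {
    let mut input = Vec::new();
    for part in [request_key, subscriber_id, content] {
        // Length prefix keeps field boundaries unambiguous.
        input.extend_from_slice(&(part.len() as u64).to_le_bytes());
        input.extend_from_slice(part.as_bytes());
    }
    format!("{:016x}", fnv1a_64(&input))
}
```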

6.3. Time Is a Tricky Beast

The stateless and the stateful approach are not 100% equivalent.

Let's consider what happens, for example, when a new person subscribes to our newsletter between the initial request and the following retry.
The stateless approach executes the handler logic in order to process the retried request. In particular, it re-generates the list of current subscribers before kicking off the email dispatching for loop. As a result, the new subscriber will receive the newsletter issue.
This is not the case when following the stateful approach - we retrieve the HTTP response from the store and return it to the caller without performing any kind of processing.

This is a symptom of a deeper discrepancy - the elapsed time between the initial request and the following retry affects the processing outcome when following the stateless approach.
We cannot execute our handler logic against the same snapshot of the state seen by the first request - therefore, the view of the world in the stateless approach is impacted by all the operations that have been committed since the first request was processed (e.g. new subscribers joining the mailing list).

Whether this is acceptable or not depends on the domain.
In our case, the fallout is quite minor - we are just sending extra newsletters out. We could live with it if the stateless approach led us to a dramatically simpler implementation.

6.4. Making A Choice

Unfortunately, the circumstances leave us with no wiggle room: Postmark's API does not provide any idempotency mechanism; therefore, we cannot follow the stateless approach.
The stateful approach happens to be trickier to implement - rejoice, we'll have a chance to learn some new patterns!

7. Idempotency Store

7.1. Which Database Should We Use?

For each idempotency key, we must store the associated HTTP response.
Our application currently uses two different data sources: Postgres, our main database, and Redis, where we store user sessions.

We do not want to store idempotency keys forever - it would be impractical and wasteful.
We also do not want actions performed by a user A to influence the outcome of actions performed by user B - there is a concrete security risk (cross-user data leakage) if proper isolation is not enforced.

Storing idempotency keys and responses into the session state of the user would guarantee both isolation and expiry out of the box. At the same time, it doesn't feel right to tie the lifespan of idempotency keys to the lifespan of the corresponding user sessions.

Based on our current requirements, Redis looks like the best solution to store our (user_id, idempotency_key, http_response) triplets. They would have their own time-to-live policy, with no ties to session states, and Redis would take care of cleaning old entries for us.

Unfortunately, new requirements will soon emerge and turn Redis into a limiting choice. There is not much to learn by taking the wrong turn here, so I'll cheat and force our hand towards Postgres.
Spoiler: we will leverage the possibility of modifying the idempotency triplets and our application state within a single SQL transaction.

7.2. Schema

We need to define a new table to store the following information: the user id, the idempotency key and the HTTP response.

The user id and the idempotency key can be used as a composite primary key. We should also record when each row was created in order to evict old idempotency keys.
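Eviction based on created_at can be pictured with the hypothetical, in-memory sketch below (IdempotencyRow and evict_expired are illustrative names, timestamps are plain Unix seconds, and the retention window is an arbitrary parameter). Against Postgres, the same policy could be a periodic DELETE FROM idempotency WHERE created_at < now() - interval '1 day', with whatever window suits the application.

```rust
/// Hypothetical row of the idempotency table (response columns omitted).
struct IdempotencyRow {
    user_id: u64,
    idempotency_key: String,
    created_at_unix_secs: u64,
}

/// Keep only rows younger than `retention_secs` relative to `now_unix_secs`.
/// Returns how many rows were evicted.
fn evict_expired(rows: &mut Vec<IdempotencyRow>, now_unix_secs: u64, retention_secs: u64) -> usize {
    let before = rows.len();
    rows.retain(|row| now_unix_secs.saturating_sub(row.created_at_unix_secs) < retention_secs);
    before - rows.len()
}
```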

There is a major unknown though: what type should be used to store HTTP responses?

We could treat the whole HTTP response as a blob of bytes, using bytea as column type.
Unfortunately, it'd be tricky to re-hydrate the bytes into an HttpResponse object - actix-web does not provide any serialization/deserialization implementation for HttpResponse.
We are going to write our own (de)serialization code, working with the core components of an HTTP response: its status code, its headers and its body.

We are not going to store the HTTP version - the assumption is that we are working exclusively with HTTP/1.1.
We can use smallint for the status code - its maximum value is 32767, which is more than enough for three-digit HTTP status codes. bytea will do for the body.
What about headers? What is their type?

We can have multiple header values associated with the same header name, therefore it makes sense to represent them as an array of (name, value) pairs.
We can use TEXT for the name (see http's implementation), while value will require BYTEA because it allows opaque octets (see http's test cases).
Postgres does not support arrays of tuples, but there is a workaround: we can define a Postgres composite type - i.e. a named collection of fields, the equivalent of a struct in our Rust code.

CREATE TYPE header_pair AS (
    name TEXT,
    value BYTEA
);
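On the Rust side, the composite type mirrors naturally into a struct with a String name and a Vec<u8> value. The hypothetical sketch below (HeaderPair and to_header_pairs are our names, and headers are modelled as plain tuples rather than actix-web types) shows why an ordered list of pairs is the right shape: a repeated header such as set-cookie keeps both values, and a value that is not valid UTF-8 survives as raw bytes.

```rust
/// Rust mirror of the header_pair composite type: name as text, value as raw bytes.
#[derive(Debug, Clone, PartialEq)]
struct HeaderPair {
    name: String,
    value: Vec<u8>,
}

/// Collect headers as an ordered list of pairs.
/// Unlike a map keyed by name, this keeps every value of a repeated header.
fn to_header_pairs(headers: &[(&str, &[u8])]) -> Vec<HeaderPair> {
    headers
        .iter()
        .map(|(name, value)| HeaderPair { name: name.to_string(), value: value.to_vec() })
        .collect()
}
```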

We can now put together the migration script:

sqlx migrate add create_idempotency_table
-- migrations/20220211080603_create_idempotency_table.sql
CREATE TYPE header_pair AS (
    name TEXT,
    value BYTEA
);

CREATE TABLE idempotency (
   user_id uuid NOT NULL REFERENCES users(user_id),
   idempotency_key TEXT NOT NULL,
   response_status_code SMALLINT NOT NULL,
   response_headers header_pair[] NOT NULL,
   response_body BYTEA NOT NULL,
   created_at timestamptz NOT NULL,
   PRIMARY KEY(user_id, idempotency_key)
);
sqlx migrate run

We could have defined an overall http_response composite type, but we would have run into a bug in sqlx which is in turn caused by a bug in the Rust compiler. Best to avoid nested composite types for the time being.

8. Save And Replay

8.1. Read Idempotency Key

Our POST /admin/newsletters endpoint is being triggered by an HTML form submission, therefore we do not have control over the headers that are being sent to the server.
The most practical choice is to embed the idempotency key inside the form data:

//! src/routes/admin/newsletter/post.rs
// [...]

#[derive(serde::Deserialize)]
pub struct FormData {
    title: String,
    text_content: String,
    html_content: String,
    // New field!
    idempotency_key: String
}

We do not care about the exact format of the idempotency key, as long as it's not empty and not too long.
Let's define a new type to enforce minimal validation:

//! src/lib.rs
// [...]
// New module!
pub mod idempotency;
//! src/idempotency/mod.rs
mod key;
pub use key::IdempotencyKey;
//! src/idempotency/key.rs
#[derive(Debug)]
pub struct IdempotencyKey(String);

impl TryFrom<String> for IdempotencyKey {
    type Error = anyhow::Error;

    fn try_from(s: String) -> Result<Self, Self::Error> {
        if s.is_empty() {
            anyhow::bail!("The idempotency key cannot be empty");
        }
        let max_length = 50;
        if s.len() >= max_length {
            anyhow::bail!(
                // The trailing `\` joins the two lines without
                // embedding a newline in the error message.
                "The idempotency key must be shorter \
                than {max_length} characters"
            );
        }
        Ok(Self(s))
    }
}

impl From<IdempotencyKey> for String {
    fn from(k: IdempotencyKey) -> Self {
        k.0
    }
}

impl AsRef<str> for IdempotencyKey {
    fn as_ref(&self) -> &str {
        &self.0
    }
}
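One subtlety: String::len counts bytes, not characters, so the limit enforced above is a byte limit. For the UUIDs we generate (ASCII only) the two coincide, but non-ASCII input reaches the limit sooner. A quick std-only check (byte_and_char_len is a hypothetical helper):

```rust
/// Returns (length in bytes, length in characters) of a candidate key.
fn byte_and_char_len(key: &str) -> (usize, usize) {
    (key.len(), key.chars().count())
}
```

A hyphenated UUID such as 67e55044-10b1-426f-9247-bb680e5fe0c8 yields (36, 36), while "é" yields (2, 1).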

We can now use it in publish_newsletter:

//! src/utils.rs
use actix_web::http::StatusCode;
// [...]

// Return a 400 with the user-representation of the validation error as body.
// The error root cause is preserved for logging purposes.
pub fn e400<T>(e: T) -> actix_web::Error
where
    T: std::fmt::Debug + std::fmt::Display + 'static,
{
    actix_web::error::ErrorBadRequest(e)
}
//! src/routes/admin/newsletter/post.rs
use crate::idempotency::IdempotencyKey;
use crate::utils::e400;
// [...]

pub async fn publish_newsletter(/* */) -> Result<HttpResponse, actix_web::Error> {
    // We must destructure the form to avoid upsetting the borrow-checker
    let FormData { title, text_content, html_content, idempotency_key } = form.0;
    let idempotency_key: IdempotencyKey = idempotency_key.try_into().map_err(e400)?;
    let subscribers = get_confirmed_subscribers(&pool).await.map_err(e500)?;
    for subscriber in subscribers {
        match subscriber {
            Ok(subscriber) => {
                // No longer using `form.<X>`
                email_client
                    .send_email(&subscriber.email, &title, &html_content, &text_content)
                    // [...]
            }
            // [...]
        }
    }
    // [...]
}

Success! The idempotency key has been parsed and validated.
Some of our old tests, though, are not particularly happy:

thread 'newsletter::you_must_be_logged_in_to_publish_a_newsletter' 
panicked at 'assertion failed: `(left == right)`
  left: `400`,
 right: `303`'

thread 'newsletter::newsletters_are_not_delivered_to_unconfirmed_subscribers' 
panicked at 'assertion failed: `(left == right)`
  left: `400`,
 right: `303`'

thread 'newsletter::newsletters_are_delivered_to_confirmed_subscribers' 
panicked at 'assertion failed: `(left == right)`
  left: `400`,
 right: `303`'

Our test requests are being rejected because they do not include an idempotency key.
Let's update them:

//! tests/api/newsletter.rs
// [...]

#[tokio::test]
async fn newsletters_are_not_delivered_to_unconfirmed_subscribers() {
    // [...]
    let newsletter_request_body = serde_json::json!({
        // [...]
        "idempotency_key": uuid::Uuid::new_v4().to_string()
    });
}

#[tokio::test]
async fn newsletters_are_delivered_to_confirmed_subscribers() {
    // [...] 
    let newsletter_request_body = serde_json::json!({
        // [...]
        "idempotency_key": uuid::Uuid::new_v4().to_string()
    });
}

#[tokio::test]
async fn you_must_be_logged_in_to_publish_a_newsletter() {
    // [...]
    let newsletter_request_body = serde_json::json!({
        // [...]
        "idempotency_key": uuid::Uuid::new_v4().to_string()
    });
    // [...]
}

Those three tests should now pass again, leaving newsletter::newsletter_creation_is_idempotent as the only failing test.

We also need to update GET /admin/newsletters to embed a randomly-generated idempotency key in the HTML form:

//! src/routes/admin/newsletter/get.rs
// [...]

pub async fn publish_newsletter_form(/* */) -> Result<HttpResponse, actix_web::Error> {
    // [...]
    let idempotency_key = uuid::Uuid::new_v4();
    Ok(HttpResponse::Ok()
        .content_type(ContentType::html())
        .body(format!(
            r#"<!-- ... -->   
    <form action="/admin/newsletters" method="post">
        <!-- ... -->
        <input hidden type="text" name="idempotency_key" value="{idempotency_key}">
        <button type="submit">Publish</button>
    </form>
    <!-- ... -->"#,
        )))
}

8.2. Retrieve Saved Responses

The next step is trying to fetch a saved HTTP response from the store, assuming one exists.

It boils down to a single SQL query:

//! src/idempotency/mod.rs
// [...]
mod persistence;
pub use persistence::get_saved_response;
//! src/idempotency/persistence.rs
use super::IdempotencyKey;
use actix_web::HttpResponse;
use sqlx::PgPool;
use uuid::Uuid;

pub async fn get_saved_response(
    pool: &PgPool,
    idempotency_key: &IdempotencyKey,
    user_id: Uuid,
) -> Result<Option<HttpResponse>, anyhow::Error> {
    let saved_response = sqlx::query!(
        r#"
        SELECT 
            response_status_code, 
            response_headers,
            response_body
        FROM idempotency
        WHERE 
          user_id = $1 AND
          idempotency_key = $2
        "#,
        user_id,
        idempotency_key.as_ref()
    )
    .fetch_optional(pool)
    .await?;
    todo!()
}

There is a caveat - sqlx does not know how to handle our custom header_pair type:

error: unsupported type _header_pair of column #2 ("response_headers")
 |
 |       let saved_response = sqlx::query!(
 |  __________________________^
 | |         r#"
 | |         SELECT 
.. |
 | |         idempotency_key.as_ref()
 | |     )
 | |_____^

It might not be supported out of the box, but there is a mechanism for us to specify how it should be handled - the Type, Decode and Encode traits.
Luckily enough, we do not have to implement them manually - we can derive them with a macro!
We just need to specify the type fields and the name of the composite type as it appears in Postgres; the macro should take care of the rest:

//! src/idempotency/persistence.rs
// [...]

#[derive(Debug, sqlx::Type)]
#[sqlx(type_name = "header_pair")]
struct HeaderPairRecord {
    name: String,
    value: Vec<u8>,
}

Unfortunately, the error is still there.

error: unsupported type _header_pair of column #2 ("response_headers")
 |
 |       let saved_response = sqlx::query!(
 |  __________________________^
 | |         r#"
 | |         SELECT 
.. |
 | |         idempotency_key.as_ref()
 | |     )
 | |_____^

// [...] <new error> [...]

It turns out that sqlx::query! does not handle custom types automatically - we need to explain how we want the custom column to be handled by using an explicit type annotation.
The query becomes:

//! src/idempotency/persistence.rs
// [...]

pub async fn get_saved_response(/* */) -> Result</* */> {
    let saved_response = sqlx::query!(
        r#"
        SELECT 
            response_status_code, 
            response_headers as "response_headers: Vec<HeaderPairRecord>",
            response_body
        // [...]
        "#,
        // [...]
    )
    // [...]
}

At last, it compiles!
Let's map the retrieved data back into a proper HttpResponse:

//! src/idempotency/persistence.rs
use actix_web::http::StatusCode;
// [...]

pub async fn get_saved_response(/* */) -> Result<Option<HttpResponse>, anyhow::Error> {
    let saved_response = sqlx::query!(/* */)
        .fetch_optional(pool)
        .await?;
    if let Some(r) = saved_response {
        let status_code = StatusCode::from_u16(
            r.response_status_code.try_into()?
        )?;
        let mut response = HttpResponse::build(status_code);
        for HeaderPairRecord { name, value } in r.response_headers {
            response.append_header((name, value));
        }
        Ok(Some(response.body(r.response_body)))
    } else {
        Ok(None)
    }
}

We can now plug get_saved_response into our request handler:

//! src/routes/admin/newsletter/post.rs
// [...] 
use crate::idempotency::get_saved_response;

pub async fn publish_newsletter(
    // [...]
    // Inject the user id extracted from the user session
    user_id: ReqData<UserId>,
) -> Result<HttpResponse, actix_web::Error> {
    let user_id = user_id.into_inner();
    let FormData {
        title,
        text_content,
        html_content,
        idempotency_key,
    } = form.0;
    let idempotency_key: IdempotencyKey = idempotency_key.try_into().map_err(e400)?;
    // Return early if we have a saved response in the database 
    if let Some(saved_response) = get_saved_response(&pool, &idempotency_key, *user_id)
        .await
        .map_err(e500)?
    {
        return Ok(saved_response);
    }
    // [...]
}

8.3. Save Responses

We have code to retrieve saved responses, but we don't have code yet to save responses - that's what we will be focusing on next.

Let's add a new function skeleton to our idempotency module:

//! src/idempotency/mod.rs
// [...]
pub use persistence::save_response;
//! src/idempotency/persistence.rs
// [...]

pub async fn save_response(
    _pool: &PgPool,
    _idempotency_key: &IdempotencyKey,
    _user_id: Uuid,
    _http_response: &HttpResponse
) -> Result<(), anyhow::Error> {
    todo!()
}

We need to break HttpResponse into its separate components before we write the INSERT query.
We can use .status() for the status code, .headers() for the headers... what about the body?
There is a .body() method - this is its signature:

/// Returns a reference to this response's body.
pub fn body(&self) -> &B {
    self.res.body()
}

What is B? We need to bring the definition of the impl block into the picture to make sense of it:

impl<B> HttpResponse<B> {
    /// Returns a reference to this response's body.
    pub fn body(&self) -> &B {
        self.res.body()
    }
}

Well, well, it turns out that HttpResponse is generic over the body type!
But, you may ask, "we have been using HttpResponse for 400 pages without specifying any generic parameter, what's going on?"

There is a default generic parameter which kicks in if B is left unspecified:

/// An outgoing response.
pub struct HttpResponse<B = BoxBody> {/* */}
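Default type parameters are a plain Rust feature and are not specific to actix-web. The sketch below uses hypothetical `Response` and `DefaultBody` types, not actix-web's. Writing `Response` with no parameter picks up the default, while `Response<String>` overrides it.

```rust
/// Hypothetical stand-in for a type-erased default body (playing the role of `BoxBody`).
#[derive(Debug, PartialEq)]
pub struct DefaultBody(pub Vec<u8>);

/// `B` falls back to `DefaultBody` when left unspecified,
/// in the same way as `HttpResponse<B = BoxBody>`.
pub struct Response<B = DefaultBody> {
    pub status: u16,
    pub body: B,
}

impl<B> Response<B> {
    /// Returns a reference to this response's body, whatever its type.
    pub fn body(&self) -> &B {
        &self.body
    }
}

/// No generic parameter written: the return type is `Response<DefaultBody>`.
pub fn not_found() -> Response {
    Response {
        status: 404,
        body: DefaultBody(b"Not Found".to_vec()),
    }
}

/// The default can still be overridden explicitly.
pub fn plain_text() -> Response<String> {
    Response {
        status: 200,
        body: "hello".to_string(),
    }
}
```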

8.3.1. MessageBody and HTTP Streaming

Why does HttpResponse need to be generic over the body type in the first place? Can't it just use Vec<u8> or a similar bytes container?

We have always worked with responses that were fully formed on the server before being sent back to the caller. HTTP/1.1 supports another mechanism to transfer data - Transfer-Encoding: chunked, also known as HTTP streaming.
The server breaks the payload down into multiple chunks and sends them to the caller one at a time, instead of accumulating the entire body in memory first. This can significantly reduce the server's memory usage. It is especially useful for large payloads such as files or the results of a large query (streaming all the way through!).
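The wire format is easier to picture with a concrete example. Below is a minimal std-only sketch of HTTP/1.1 chunked transfer coding (RFC 9112). Each chunk is preceded by its size in hexadecimal, and a zero-size chunk marks the end of the body. `encode_chunked` is a hypothetical helper written for illustration; in practice actix-web takes care of this for you.

```rust
/// Encodes `chunks` with HTTP/1.1 chunked transfer coding (no trailer fields).
pub fn encode_chunked(chunks: &[&[u8]]) -> Vec<u8> {
    let mut out = Vec::new();
    for chunk in chunks {
        // A zero-size chunk would signal the end of the body, so skip empty ones.
        if chunk.is_empty() {
            continue;
        }
        // Chunk size in hexadecimal, then CRLF, then the data, then CRLF.
        out.extend_from_slice(format!("{:X}\r\n", chunk.len()).as_bytes());
        out.extend_from_slice(chunk);
        out.extend_from_slice(b"\r\n");
    }
    // Last chunk (size 0) followed by an empty trailer section.
    out.extend_from_slice(b"0\r\n\r\n");
    out
}
```

Each chunk can be written to the socket as soon as it is produced. The server only needs to know the size of the current chunk, not the size of the whole body.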

With HTTP streaming in mind, it becomes easier to understand the design of MessageBody, the trait that must be implemented to use a type as body in actix-web:

pub trait MessageBody {
    type Error: Into<Box<dyn Error>>;
    fn size(&self) -> BodySize;
    fn poll_next(
        self: Pin<&mut Self>, 
        cx: &mut Context<'_>
    ) -> Poll<Option<Result<Bytes, Self::Error>>>;
    // [...]
}

You pull data, one chunk at a time, until you have fetched it all.
When the response is not being streamed, the data is available all at once - poll_next returns it all in one go.
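The pull model can be illustrated without any async machinery. The sketch below is a simplified, synchronous analogue of `MessageBody`; the `PullBody` trait and the `Full` and `Streamed` types are hypothetical. A fully formed body hands everything over on the first pull, and a streamed body yields one chunk per pull. Once a body has been drained, there is nothing left to read.

```rust
/// Simplified, synchronous stand-in for `MessageBody::poll_next`:
/// `None` means the body has been fully consumed.
pub trait PullBody {
    fn next_chunk(&mut self) -> Option<Vec<u8>>;
}

/// A fully formed body: all data is returned by the first pull.
pub struct Full(pub Option<Vec<u8>>);

impl PullBody for Full {
    fn next_chunk(&mut self) -> Option<Vec<u8>> {
        // `take` leaves `None` behind, so the second pull signals the end.
        self.0.take()
    }
}

/// A streamed body: each pull returns the next chunk.
pub struct Streamed(pub std::vec::IntoIter<Vec<u8>>);

impl PullBody for Streamed {
    fn next_chunk(&mut self) -> Option<Vec<u8>> {
        self.0.next()
    }
}

/// Pulls chunks until the body reports that it is exhausted.
pub fn pull_all(body: &mut impl PullBody) -> Vec<Vec<u8>> {
    let mut chunks = Vec::new();
    while let Some(chunk) = body.next_chunk() {
        chunks.push(chunk);
    }
    chunks
}
```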

Let's try to understand BoxBody, the default body type used by HttpResponse. It is the body type we have been using, unknowingly, for several chapters!
BoxBody abstracts away the specific payload delivery mechanism. Under the hood, it is nothing more than an enum with a variant for each strategy, with a special case catering for body-less responses:

#[derive(Debug)]
pub struct BoxBody(BoxBodyInner);

enum BoxBodyInner {
    None(body::None),
    Bytes(Bytes),
    Stream(Pin<Box<dyn MessageBody<Error = Box<dyn StdError>>>>),
}

This worked for so long because we did not really care about how the response was being sent back to the caller.
Implementing save_response forces us to look closer - we need to collect the response in memory12 in order to save it in the idempotency table of our database.

actix-web has a dedicated function for situations like ours: to_bytes.
It calls poll_next until there is no more data to fetch, then returns the entire response body to us inside a Bytes container13.
I'd normally advise caution when using to_bytes - with huge payloads, there is a risk of putting the server under significant memory pressure.
That is not a concern here - all our response bodies are small and don't take advantage of HTTP streaming, so to_bytes will not have to do any real work.
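The sketch below mirrors the idea behind BoxBody with a hypothetical std-only `Body` enum. It has one variant per delivery strategy and a size hint that is only known upfront for non-streamed bodies. Its `into_bytes` method buffers everything in memory, as `to_bytes` does. The method takes `self` by value because draining a stream consumes it. None of these names come from actix-web.

```rust
/// Hypothetical body type with one variant per delivery strategy.
pub enum Body {
    /// Body-less response.
    Empty,
    /// Fully formed payload, available all at once.
    Bytes(Vec<u8>),
    /// Payload produced one chunk at a time.
    Stream(Box<dyn Iterator<Item = Vec<u8>>>),
}

#[derive(Debug, PartialEq)]
pub enum SizeHint {
    Empty,
    Known(usize),
    Unknown,
}

impl Body {
    /// Only non-streamed bodies know their length before being sent.
    pub fn size(&self) -> SizeHint {
        match self {
            Body::Empty => SizeHint::Empty,
            Body::Bytes(bytes) => SizeHint::Known(bytes.len()),
            Body::Stream(_) => SizeHint::Unknown,
        }
    }

    /// Buffers the whole body in memory, consuming it.
    pub fn into_bytes(self) -> Vec<u8> {
        match self {
            Body::Empty => Vec::new(),
            // Nothing to do: the bytes are already in memory.
            Body::Bytes(bytes) => bytes,
            // Every chunk has to be pulled and concatenated.
            Body::Stream(chunks) => chunks.flatten().collect(),
        }
    }
}
```

For the `Bytes` variant, `into_bytes` simply hands back the vector it already owns. That is why buffering our small, non-streamed responses costs next to nothing.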

Enough with the theory - let's piece it together:

//! src/idempotency/persistence.rs
use actix_web::body::to_bytes;
// [...]

pub async fn save_response(
    pool: &PgPool,
    idempotency_key: &IdempotencyKey,
    user_id: Uuid,
    http_response: &HttpResponse,
) -> Result<(), anyhow::Error> {
    let status_code = http_response.status().as_u16() as i16;
    let headers = {
        let mut h = Vec::with_capacity(http_response.headers().len());
        for (name, value) in http_response.headers().iter() {
            let name = name.as_str().to_owned();
            let value = value.as_bytes().to_owned();
            h.push(HeaderPairRecord { name, value });
        }
        h
    };
    let body = to_bytes(http_response.body()).await.unwrap();
    todo!()
}

The compiler is not happy:

error[E0277]: the trait bound `&BoxBody: MessageBody` is not satisfied
--> src/idempotency/persistence.rs
 |
 |     let body = to_bytes(http_response.body()).await.unwrap();
 |                -------- ^^^^^^^^^^^^^^^^^^^^ 
 the trait `MessageBody` is not implemented for `&BoxBody`
 |                |
 |                required by a bound introduced by this call
 |
 = help: the following implementations were found:
           <BoxBody as MessageBody>

BoxBody implements MessageBody, but &BoxBody doesn't - and .body() returns a reference, it does not give us ownership over the body.
Why do we need ownership? It's because of HTTP streaming, once again!

Pulling a chunk of data from the payload stream requires a mutable reference to the stream itself - once the chunk has been read, there is no way to "replay" the stream and read it again.

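There is a common pattern to work around this:

- get ownership of the body via .into_parts();
- buffer the whole body in memory via to_bytes;
- do whatever you have to do with the body;
- re-assemble the response using .set_body() on the response head.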

.into_parts() requires ownership of HttpResponse - we'll have to change the signature of save_response to accommodate it. Instead of asking for a reference, we'll take ownership of the response and then return another owned HttpResponse in case of success.

Let's go for it:

//! src/idempotency/persistence.rs
// [...]

pub async fn save_response(
    // [...]
    // No longer a reference!
    http_response: HttpResponse,
) -> Result<HttpResponse, anyhow::Error> {
    let (response_head, body) = http_response.into_parts();
    // `MessageBody::Error` is not `Send` + `Sync`, 
    // therefore it doesn't play nicely with `anyhow`
    let body = to_bytes(body).await.map_err(|e| anyhow::anyhow!("{}", e))?;
    let status_code = response_head.status().as_u16() as i16;
    let headers = {
        let mut h = Vec::with_capacity(response_head.headers().len());
        for (name, value) in response_head.headers().iter() {
            let name = name.as_str().to_owned();
            let value = value.as_bytes().to_owned();
            h.push(HeaderPairRecord { name, value });
        }
        h
    };

    // TODO: SQL query

    // We need `.map_into_boxed_body` to go from 
    // `HttpResponse<Bytes>` to `HttpResponse<BoxBody>`
    let http_response = response_head.set_body(body).map_into_boxed_body();
    Ok(http_response)
}

That should compile, although it isn't particularly useful (yet).
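The take-apart, buffer, re-assemble pattern is not specific to actix-web. Here is a std-only sketch of it. `ResponseHead`, `Response` and `buffer_and_record` are hypothetical types and functions chosen for illustration. A `Vec` stands in for the idempotency table.

```rust
/// Hypothetical response head: everything except the body.
#[derive(Debug)]
pub struct ResponseHead {
    pub status: u16,
    pub headers: Vec<(String, Vec<u8>)>,
}

/// Hypothetical response, generic over its body type.
pub struct Response<B> {
    pub head: ResponseHead,
    pub body: B,
}

impl ResponseHead {
    /// Re-assembles a response around a new body.
    pub fn set_body<B>(self, body: B) -> Response<B> {
        Response { head: self, body }
    }
}

impl<B> Response<B> {
    /// Splits the response; giving up the response is what grants ownership of the body.
    pub fn into_parts(self) -> (ResponseHead, B) {
        (self.head, self.body)
    }
}

/// Records a copy of the response and returns an equivalent response.
pub fn buffer_and_record(
    response: Response<Box<dyn Iterator<Item = Vec<u8>>>>,
    store: &mut Vec<(u16, Vec<u8>)>,
) -> Response<Vec<u8>> {
    // 1. Take ownership of the body.
    let (head, body) = response.into_parts();
    // 2. Buffer it in memory: the stream is drained and cannot be read again.
    let bytes: Vec<u8> = body.flatten().collect();
    // 3. Do what we need with the bytes.
    store.push((head.status, bytes.clone()));
    // 4. Re-assemble the response around the buffered body.
    head.set_body(bytes)
}
```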

8.3.2. Array Of Composite Postgres Types

Let's add the insertion query:

//! src/idempotency/persistence.rs
// [...]

pub async fn save_response(
    // [...]
) -> Result<HttpResponse, anyhow::Error> {
    // [...]
    sqlx::query!(
        r#"
        INSERT INTO idempotency (
            user_id, 
            idempotency_key, 
            response_status_code, 
            response_headers, 
            response_body,
            created_at
        )
        VALUES ($1, $2, $3, $4, $5, now())
        "#,
        user_id,
        idempotency_key.as_ref(),
        status_code,
        headers,
        body.as_ref()
    )
    .execute(pool)
    .await?;

    let http_response = response_head.set_body(body).map_into_boxed_body();
    Ok(http_response)
}

Compilation fails with an error:

error: unsupported type _header_pair for param #4
--> src/idempotency/persistence.rs
 |
 | /     sqlx::query!(
 | |         r#"
 | |         INSERT INTO idempotency (
 | |             user_id,
.. |
 | |         body.as_ref()
 | |     )
 | |_____^

It does make sense - we are using a custom type and sqlx::query! is not powerful enough to learn about it at compile-time in order to check our query. We will have to disable compile-time verification - use query_unchecked! instead of query!:

//! src/idempotency/persistence.rs
// [...]

pub async fn save_response(
    // [...]
) -> Result<HttpResponse, anyhow::Error> {
    // [...]
    sqlx::query_unchecked!(/* */)
    // [...]
}

We are getting closer - a different error!

error[E0277]: the trait bound `HeaderPairRecord: PgHasArrayType` is not satisfied
--> src/idempotency/persistence.rs
 |
 | /     sqlx::query_unchecked!(
 | |         r#"
 | |         INSERT INTO idempotency (
 | |             user_id,
.. |
 | |         body.as_ref()
 | |     )
 | |_____^ the trait `PgHasArrayType` is not implemented for `HeaderPairRecord`

sqlx knows, via our #[sqlx(type_name = "header_pair")] attribute, the name of the composite type itself. It does not know the name of the type for arrays containing header_pair elements.
Postgres creates an array type implicitly when we run a CREATE TYPE statement - it is simply the composite type name prefixed by an underscore14.

We can provide this information to sqlx by implementing the PgHasArrayType trait, just like the compiler suggested:

//! src/idempotency/persistence.rs
use sqlx::postgres::PgHasArrayType;
// [...]

impl PgHasArrayType for HeaderPairRecord {
    fn array_type_info() -> sqlx::postgres::PgTypeInfo {
        sqlx::postgres::PgTypeInfo::with_name("_header_pair")
    }
}

The code should finally compile.

8.3.3. Plug It In

It's a milestone, but it is a bit early to cheer - we don't know if it works yet. Our integration test is still red.
Let's plug save_response into our request handler:

//! src/routes/admin/newsletter/post.rs
use crate::idempotency::save_response;
// [...]

pub async fn publish_newsletter(/* */) -> Result</* */> {
    // [...]
    for subscriber in subscribers {
        // [...]
    }
    FlashMessage::info("The newsletter issue has been published!").send();
    let response = see_other("/admin/newsletters");
    let response = save_response(&pool, &idempotency_key, *user_id, response)
        .await
        .map_err(e500)?;
    Ok(response)
}

Lo and behold, cargo test succeeds! We made it!

9. Concurrent Requests

We dealt with the "easy" scenario when it comes to idempotency: a request arrives, it's fully processed, then a retry comes in.

We will now deal with the more troublesome scenario - the retry arrives before the first request is fully processed.
We expect the second request to be queued behind the first one - once that finishes, it will retrieve the saved HTTP response from the store and return it to the caller.

9.1. Requirements As Tests #2

We can, once again, encode our requirements as tests:

//! tests/api/newsletter.rs
use std::time::Duration;
// [...]

#[tokio::test]
async fn concurrent_form_submission_is_handled_gracefully() {
    // Arrange
    let app = spawn_app().await;
    create_confirmed_subscriber(&app).await;
    app.test_user.login(&app).await;

    Mock::given(path("/email"))
        .and(method("POST"))
        // Setting a long delay to ensure that the second request 
        // arrives before the first one completes
        .respond_with(ResponseTemplate::new(200).set_delay(Duration::from_secs(2)))
        .expect(1)
        .mount(&app.email_server)
        .await;

    // Act - Submit two newsletter forms concurrently
    let newsletter_request_body = serde_json::json!({
        "title": "Newsletter title",
        "text_content": "Newsletter body as plain text",
        "html_content": "<p>Newsletter body as HTML</p>",
        "idempotency_key": uuid::Uuid::new_v4().to_string()
    });
    let response1 = app.post_publish_newsletter(&newsletter_request_body);
    let response2 = app.post_publish_newsletter(&newsletter_request_body);
    let (response1, response2) = tokio::join!(response1, response2);

    assert_eq!(response1.status(), response2.status());
    assert_eq!(response1.text().await.unwrap(), response2.text().await.unwrap());

    // Mock verifies on Drop that we have sent the newsletter email **once**
}

The test fails - our server returned a 500 Internal Server Error to one of the two requests:

thread 'newsletter::concurrent_form_submission_is_handled_gracefully' 
panicked at 'assertion failed: `(left == right)`
  left: `303`,
 right: `500`'

The logs explain what happened:

exception.details: 
    error returned from database: 
    duplicate key value violates unique constraint "idempotency_pkey"

    Caused by:
        duplicate key value violates unique constraint "idempotency_pkey"

The slowest request fails to insert into the idempotency table due to our uniqueness constraint.
The error response is not the only issue: both requests executed the email dispatch code (otherwise we wouldn't have seen the constraint violation!), resulting in duplicate delivery.

9.2. Synchronization

The second request is not aware of the first until it tries to insert into the database.
If we want to prevent duplicate delivery, we need to introduce cross-request synchronization before we start processing subscribers.

In-memory locks (e.g. tokio::sync::Mutex) would work if all incoming requests were being served by a single API instance. This is not our case: our API is replicated, therefore the two requests might end up being processed by two different instances.

Our synchronization mechanism will have to live out-of-process - our database being the natural candidate.
Let's think about it: we have an idempotency table, it contains one row for each unique combination of user id and idempotency key. Can we do something with it?

Our current implementation inserts a row into the idempotency table after processing the request, just before returning the response to the caller. We are going to change that: we will insert a new row as soon as the handler is invoked.
We don't know the final response at that point - we haven't started processing yet! We must relax the NOT NULL constraints on some of the columns:

sqlx migrate add relax_null_checks_on_idempotency
ALTER TABLE idempotency ALTER COLUMN response_status_code DROP NOT NULL;
ALTER TABLE idempotency ALTER COLUMN response_body DROP NOT NULL;
ALTER TABLE idempotency ALTER COLUMN response_headers DROP NOT NULL;
sqlx migrate run

We can now insert a row as soon as the handler gets invoked using the information we have up to that point - the user id and the idempotency key, our composite primary key.

The first request will succeed in inserting a row into idempotency. The second request, instead, will fail due to our uniqueness constraint.
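That is not what we want:

- if the first request has already completed, the second request should return the saved response;
- if the first request is still being processed, the second request should wait for it to complete.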

The first scenario can be accommodated by using Postgres' ON CONFLICT clause - it allows us to define what should happen when an INSERT fails due to a constraint violation (e.g. uniqueness).
We have two options: ON CONFLICT DO NOTHING and ON CONFLICT DO UPDATE.
ON CONFLICT DO NOTHING, as you might guess, does nothing - it simply swallows the error. We can detect that the row was already there by checking the number of rows that were affected by the statement.
ON CONFLICT DO UPDATE, instead, can be used to modify the pre-existing row - e.g. ON CONFLICT DO UPDATE SET updated_at = now().

We will use ON CONFLICT DO NOTHING - if no new row was inserted, we will try to fetch the saved response.
Before we start implementing, there is an issue we need to solve: our code no longer compiles. The query in get_saved_response has not been updated to account for the fact that a few columns in idempotency are now nullable. We must update it to ask sqlx to assume that those columns will not be null - if we are wrong, it will cause an error at runtime.
The syntax is similar to the type casting syntax we used previously to deal with header pairs - we must append a ! to the column alias name:

//! src/idempotency/persistence.rs
// [...]

pub async fn get_saved_response(/* */) -> Result</* */> {
    let saved_response = sqlx::query!(
        r#"
        SELECT 
            response_status_code as "response_status_code!", 
            response_headers as "response_headers!: Vec<HeaderPairRecord>",
            response_body as "response_body!"
        [...]
        "#,
        user_id,
        idempotency_key.as_ref()
    )
    // [...]
}

Let's now define the skeleton of a new function, the one we will invoke at the beginning of our request handler - try_processing.
It will try to perform the insertion we just discussed - if it fails because a row already exists, we will assume that a response has been saved and try to return it.

//! src/idempotency/mod.rs
// [...]
pub use persistence::{try_processing, NextAction};

//! src/idempotency/persistence.rs
// [...]

pub enum NextAction {
    StartProcessing,
    ReturnSavedResponse(HttpResponse)
}

pub async fn try_processing(
    pool: &PgPool, 
    idempotency_key: &IdempotencyKey, 
    user_id: Uuid
) -> Result<NextAction, anyhow::Error> {
    todo!()
}

Our handler will invoke try_processing instead of get_saved_response:

//! src/routes/admin/newsletter/post.rs
use crate::idempotency::{try_processing, NextAction};
// [...]

pub async fn publish_newsletter(/* */) -> Result<HttpResponse, actix_web::Error> {
    // [...]
    let idempotency_key: IdempotencyKey = idempotency_key.try_into().map_err(e400)?;
    match try_processing(&pool, &idempotency_key, *user_id)
        .await
        .map_err(e500)?
    {
        NextAction::StartProcessing => {}
        NextAction::ReturnSavedResponse(saved_response) => {
            success_message().send();
            return Ok(saved_response);
        }
    }
    // [...]
    success_message().send();
    let response = see_other("/admin/newsletters");
    let response = save_response(&pool, &idempotency_key, *user_id, response)
        .await
        .map_err(e500)?;
    Ok(response)
}

fn success_message() -> FlashMessage {
    FlashMessage::info("The newsletter issue has been published!")
}

We can now flesh out try_processing:

//! src/idempotency/persistence.rs
// [...]

pub async fn try_processing(
    pool: &PgPool,
    idempotency_key: &IdempotencyKey,
    user_id: Uuid,
) -> Result<NextAction, anyhow::Error> {
    let n_inserted_rows = sqlx::query!(
        r#"
        INSERT INTO idempotency (
            user_id, 
            idempotency_key,
            created_at
        ) 
        VALUES ($1, $2, now()) 
        ON CONFLICT DO NOTHING
        "#,
        user_id,
        idempotency_key.as_ref()
    )
    .execute(pool)
    .await?
    .rows_affected();
    if n_inserted_rows > 0 {
        Ok(NextAction::StartProcessing)
    } else {
        let saved_response = get_saved_response(pool, idempotency_key, user_id)
            .await?
            .ok_or_else(|| 
                anyhow::anyhow!("We expected a saved response, we didn't find it")
            )?;
        Ok(NextAction::ReturnSavedResponse(saved_response))
    }
}

A bunch of our tests will start failing. What is going on?
Log inspection highlights a duplicate key value violates unique constraint "idempotency_pkey" error. Guess what? We forgot to update save_response! It is trying to insert another row into idempotency for the same combination of user id and idempotency key - it needs to perform an UPDATE instead of an INSERT.

//! src/idempotency/persistence.rs
// [...]

pub async fn save_response(/* */) -> Result</* */> {
    // [...]
    sqlx::query_unchecked!(
        r#"
        UPDATE idempotency
        SET 
            response_status_code = $3, 
            response_headers = $4,
            response_body = $5
        WHERE
            user_id = $1 AND
            idempotency_key = $2
        "#,
        user_id,
        idempotency_key.as_ref(),
        status_code,
        headers,
        body.as_ref()
    )
    // [...]
}

We are back to square one - concurrent_form_submission_is_handled_gracefully is the only failing test. What have we gained?
Very little - the second request returns an error instead of sending emails out twice. An improvement, but not yet where we want to land.
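Without Postgres, the logic we have at this point can be modelled with a `HashMap`. The sketch below is a hypothetical in-memory analogue: `IdempotencyStore`, `SavedResponse` and the `u64` user id are assumptions, not the book's types. It is single-threaded, so nothing ever waits. It reproduces the behaviour we just observed: a retry that arrives while the first request is still in flight finds a row without a saved response and errors out.

```rust
use std::collections::hash_map::{Entry, HashMap};

/// Hypothetical stand-in for a saved HTTP response.
#[derive(Debug, Clone, PartialEq)]
pub struct SavedResponse {
    pub status_code: u16,
    pub body: Vec<u8>,
}

#[derive(Debug, PartialEq)]
pub enum NextAction {
    StartProcessing,
    ReturnSavedResponse(SavedResponse),
}

/// In-memory stand-in for the `idempotency` table, keyed by (user id, idempotency key).
/// `None` plays the role of the now-nullable response columns.
#[derive(Default)]
pub struct IdempotencyStore {
    rows: HashMap<(u64, String), Option<SavedResponse>>,
}

impl IdempotencyStore {
    pub fn try_processing(&mut self, user_id: u64, key: &str) -> Result<NextAction, String> {
        match self.rows.entry((user_id, key.to_string())) {
            // Like `INSERT ... ON CONFLICT DO NOTHING` affecting one row.
            Entry::Vacant(slot) => {
                slot.insert(None);
                Ok(NextAction::StartProcessing)
            }
            // Zero rows affected: look for a saved response.
            Entry::Occupied(row) => match row.get() {
                Some(saved) => Ok(NextAction::ReturnSavedResponse(saved.clone())),
                None => Err("We expected a saved response, we didn't find it".to_string()),
            },
        }
    }

    /// Like the `UPDATE` in `save_response`: fills in the placeholder row.
    pub fn save_response(&mut self, user_id: u64, key: &str, response: SavedResponse) {
        self.rows.insert((user_id, key.to_string()), Some(response));
    }
}
```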

We need to find a way to cause the INSERT in try_processing to wait instead of erroring out when a retry arrives before the first request has completed processing.

9.2.1. Transaction Isolation Levels

Let's do an experiment: we will wrap the INSERT in try_processing and the UPDATE in save_response in a single SQL transaction.
What do you think is going to happen?

//! src/idempotency/persistence.rs
use sqlx::{Postgres, Transaction};
// [...]

#[allow(clippy::large_enum_variant)]
pub enum NextAction {
    // Return transaction for later usage
    StartProcessing(Transaction<'static, Postgres>),
    // [...]
}


pub async fn try_processing(/* */) -> Result</* */> {
    let mut transaction = pool.begin().await?;
    let n_inserted_rows = sqlx::query!(/* */)
        .execute(&mut transaction)
        .await?
        .rows_affected();
    if n_inserted_rows > 0 {
        Ok(NextAction::StartProcessing(transaction))
    } else {
        // [...]
    }
}

pub async fn save_response(
    // No longer a `Pool`!
    mut transaction: Transaction<'static, Postgres>,
    // [...]
) -> Result</* */> {
    // [...]
    sqlx::query_unchecked!(/* */)
        .execute(&mut transaction)
        .await?;
    transaction.commit().await?;
    // [...]
}

//! src/routes/admin/newsletter/post.rs
// [...]

pub async fn publish_newsletter(/* */) -> Result</* */> {
    // [...]
    let transaction = match try_processing(&pool, &idempotency_key, *user_id)
        .await
        .map_err(e500)?
    {
        NextAction::StartProcessing(t) => t,
        // [...]
    };
    // [...]
    let response = save_response(transaction, /* */)
        .await
        .map_err(e500)?;
    // [...]
}

All our tests are passing! But why?
It boils down to locks and transaction isolation levels!

READ COMMITTED is the default isolation level in Postgres. We have not tuned this setting, therefore this is the case for the queries in our application as well.
Postgres' documentation describes the behaviour at this isolation level as follows:

[...] a SELECT query (without a FOR UPDATE/SHARE clause) sees only data committed before the query began; it never sees either uncommitted data or changes committed during query execution by concurrent transactions. In effect, a SELECT query sees a snapshot of the database as of the instant the query begins to run.

Data-altering statements, instead, will be influenced by uncommitted transactions that are trying to alter the same set of rows:

UPDATE, DELETE, SELECT FOR UPDATE [...] will only find target rows that were committed as of the command start time. However, such a target row might have already been updated (or deleted or locked) by another concurrent transaction by the time it is found. In this case, the would-be updater will wait for the first updating transaction to commit or roll back (if it is still in progress).

This is exactly what is happening in our case.
The INSERT statement fired by the second request must wait for the outcome of the SQL transaction started by the first request.
If the latter commits, the former will DO NOTHING.
If the latter rolls back, the former will actually perform the insertion.
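To build intuition, the waiting behaviour can be emulated in-process. The sketch below is a hypothetical std-only model and does not reflect how Postgres implements row locks. A `Mutex`-protected map stands in for the idempotency table. A `Condvar` lets the second INSERT wait while the first row is uncommitted. The waiting insert then either does nothing (if the first transaction committed) or performs the insertion itself (if it rolled back).

```rust
use std::collections::HashMap;
use std::sync::{Condvar, Mutex};

#[derive(Debug, Clone, PartialEq)]
enum RowState {
    /// Inserted by a transaction that has not committed yet.
    Uncommitted,
    /// Committed, together with the saved response.
    Committed(String),
}

#[derive(Debug, PartialEq)]
pub enum InsertOutcome {
    Inserted,
    /// Conflict with a committed row; carries its saved response for convenience.
    DidNothing(String),
}

#[derive(Default)]
pub struct Table {
    rows: Mutex<HashMap<String, RowState>>,
    changed: Condvar,
}

impl Table {
    /// Emulates `INSERT ... ON CONFLICT DO NOTHING` under READ COMMITTED.
    pub fn insert_on_conflict_do_nothing(&self, key: &str) -> InsertOutcome {
        let mut rows = self.rows.lock().unwrap();
        loop {
            let state = rows.get(key).cloned();
            match state {
                None => {
                    rows.insert(key.to_string(), RowState::Uncommitted);
                    return InsertOutcome::Inserted;
                }
                // Another transaction holds the row: wait for it to commit or roll back.
                Some(RowState::Uncommitted) => rows = self.changed.wait(rows).unwrap(),
                // The conflicting row is committed: do nothing.
                Some(RowState::Committed(response)) => return InsertOutcome::DidNothing(response),
            }
        }
    }

    pub fn commit(&self, key: &str, response: &str) {
        self.rows
            .lock()
            .unwrap()
            .insert(key.to_string(), RowState::Committed(response.to_string()));
        self.changed.notify_all();
    }

    pub fn rollback(&self, key: &str) {
        self.rows.lock().unwrap().remove(key);
        self.changed.notify_all();
    }
}
```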

It is worth highlighting that this strategy will not work if using stricter isolation levels.
We can test this pretty easily:

//! src/idempotency/persistence.rs
// [...]

pub async fn try_processing(/* */) -> Result</* */> {
    let mut transaction = pool.begin().await?;
    sqlx::query!("SET TRANSACTION ISOLATION LEVEL repeatable read")
        .execute(&mut transaction)
        .await?;
    let n_inserted_rows = sqlx::query!(/* */)
    // [...]
}

The second concurrent request will fail due to a database error: could not serialize access due to concurrent update.
repeatable read is designed to prevent non-repeatable reads (who would have guessed?): the same SELECT query, if run twice in a row within the same transaction, should return the same data.
This has consequences for statements such as UPDATE: if they are executed within a repeatable read transaction, they cannot modify or lock rows changed by other transactions after the repeatable read transaction began.
This is why the transaction initiated by the second request fails to commit in our little experiment above. The same would have happened if we had chosen serializable, the strictest isolation level available in Postgres.

10. Dealing With Errors

We made some solid progress - our implementation handles duplicated requests gracefully, no matter if they arrive concurrently or sequentially.
What about errors?

Let's add another test case:

#! Cargo.toml
# [...]
[dev-dependencies]
serde_urlencoded = "0.7.1"
# [...]

//! tests/api/newsletter.rs
use fake::faker::internet::en::SafeEmail;
use fake::faker::name::en::Name;
use fake::Fake;
use wiremock::MockBuilder;
// [...]

// Short-hand for a common mocking setup
fn when_sending_an_email() -> MockBuilder {
    Mock::given(path("/email")).and(method("POST"))
}

#[tokio::test]
async fn transient_errors_do_not_cause_duplicate_deliveries_on_retries() {
    // Arrange
    let app = spawn_app().await;
    let newsletter_request_body = serde_json::json!({
        "title": "Newsletter title",
        "text_content": "Newsletter body as plain text",
        "html_content": "<p>Newsletter body as HTML</p>",
        "idempotency_key": uuid::Uuid::new_v4().to_string()
    });
    // Two subscribers instead of one!
    create_confirmed_subscriber(&app).await;
    create_confirmed_subscriber(&app).await;
    app.test_user.login(&app).await;

    // Part 1 - Submit newsletter form
    // Email delivery fails for the second subscriber
    when_sending_an_email()
        .respond_with(ResponseTemplate::new(200))
        .up_to_n_times(1)
        .expect(1)
        .mount(&app.email_server)
        .await;
    when_sending_an_email()
        .respond_with(ResponseTemplate::new(500))
        .up_to_n_times(1)
        .expect(1)
        .mount(&app.email_server)
        .await;

    let response = app.post_publish_newsletter(&newsletter_request_body).await;
    assert_eq!(response.status().as_u16(), 500);

    // Part 2 - Retry submitting the form
    // Email delivery will succeed for both subscribers now
    when_sending_an_email()
        .respond_with(ResponseTemplate::new(200))
        .expect(1)
        .named("Delivery retry")
        .mount(&app.email_server)
        .await;
    let response = app.post_publish_newsletter(&newsletter_request_body).await;
    assert_eq!(response.status().as_u16(), 303);

    // Mock verifies on Drop that we did not send out duplicates
}

async fn create_unconfirmed_subscriber(app: &TestApp) -> ConfirmationLinks {
    // We are working with multiple subscribers now,
    // their details must be randomised to avoid conflicts!
    let name: String = Name().fake();
    let email: String = SafeEmail().fake();
    let body = serde_urlencoded::to_string(&serde_json::json!({
        "name": name,
        "email": email
    }))
    .unwrap();
    // [...]
}

The test does not pass - we are seeing yet another instance of duplicated delivery:

thread 'newsletter::transient_errors_do_not_cause_duplicate_deliveries_on_retries' 
panicked at 'Verifications failed:
- Delivery retry.
        Expected range of matching incoming requests: == 1
        Number of matched incoming requests: 2

It makes sense if you think again about our idempotency implementation: the SQL transaction inserting into the idempotency table only commits when processing succeeds.
Errors lead to an early return - this triggers a rollback when the Transaction<'static, Postgres> value is dropped.
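The rollback-on-drop behaviour is an instance of the RAII pattern. It can be reproduced with a hypothetical `FakeTransaction` that records what happens to it, which shows how a `?` early return wipes out the idempotency row. `FakeTransaction`, `send_emails` and `handle_request` are illustrative names, not sqlx APIs.

```rust
use std::cell::RefCell;

/// Hypothetical stand-in for a database transaction that logs its lifecycle.
pub struct FakeTransaction<'a> {
    events: &'a RefCell<Vec<&'static str>>,
    committed: bool,
}

impl<'a> FakeTransaction<'a> {
    pub fn begin(events: &'a RefCell<Vec<&'static str>>) -> Self {
        events.borrow_mut().push("begin");
        Self { events, committed: false }
    }

    /// Consumes the transaction, so it cannot be used after committing.
    pub fn commit(mut self) {
        self.events.borrow_mut().push("commit");
        self.committed = true;
        // `self` is dropped here; `committed` prevents a rollback.
    }
}

impl Drop for FakeTransaction<'_> {
    fn drop(&mut self) {
        if !self.committed {
            self.events.borrow_mut().push("rollback");
        }
    }
}

/// Hypothetical email dispatch that can fail.
fn send_emails(fail: bool) -> Result<(), String> {
    if fail {
        Err("email delivery failed".to_string())
    } else {
        Ok(())
    }
}

pub fn handle_request(events: &RefCell<Vec<&'static str>>, fail: bool) -> Result<(), String> {
    let transaction = FakeTransaction::begin(events);
    // On error, `?` returns early and `transaction` is dropped: rollback.
    send_emails(fail)?;
    transaction.commit();
    Ok(())
}
```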

Can we do better?

10.1. Distributed Transactions

The pain we are feeling is a common issue in real-world applications - you lose transactionality15 when executing logic that touches, at the same time, your local state and a remote state managed by another system16.

I happen to be fascinated by the technical challenges in distributed systems, but I am well aware that users do not share my passion. They want to get something done, they do not care about the internals - and rightly so.
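A newsletter author expects one of the following scenarios after clicking on Submit:

- the issue was delivered to all subscribers;
- the issue could not be published, and none of the subscribers received it.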

Our implementation allows for a third scenario at the moment: the issue could not be published (500 Internal Server Error), but some subscribers received it anyway.
That won't do - partial execution is not acceptable, the system must end up in a sensible state.
There are two common approaches to solve this problem: backward recovery and forward recovery.

10.2. Backward Recovery

Backward recovery tries to achieve a semantic rollback by executing compensating actions.
Let's imagine we are working on an e-commerce checkout system: we have already charged the customer for the products in their basket but, when trying to authorize the shipment, we discover that one of the items is now out of stock.
We can perform a backward recovery by cancelling all shipment instructions and refunding the customer for the entire amount of their basket.
The recovery mechanism is not transparent to the customer - they will still see two payments in their transaction history, the original charge and the subsequent refund. We are also likely to send out an email to explain what happened. But their balance, the state they care about, has been restored.
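Backward recovery is often structured as a list of steps, each paired with a compensating action. When a step fails, the compensations of the steps that have already completed run in reverse order. The sketch below is a hypothetical std-only version of the checkout example, and every name in it is made up for illustration. Real systems also need to persist progress and make the compensations themselves retryable.

```rust
/// One step of a multi-step workflow, paired with the action that semantically undoes it.
pub struct Step {
    pub name: &'static str,
    pub action: fn(&mut Vec<String>) -> Result<(), String>,
    pub compensation: fn(&mut Vec<String>),
}

pub fn run_with_backward_recovery(steps: &[Step], log: &mut Vec<String>) -> Result<(), String> {
    let mut completed: Vec<&Step> = Vec::new();
    for step in steps {
        match (step.action)(log) {
            Ok(()) => completed.push(step),
            Err(e) => {
                // Undo what already happened, most recent step first.
                for done in completed.iter().rev() {
                    (done.compensation)(log);
                }
                return Err(format!("{} failed: {}", step.name, e));
            }
        }
    }
    Ok(())
}

// Hypothetical checkout steps.
fn charge_customer(log: &mut Vec<String>) -> Result<(), String> {
    log.push("charged customer".to_string());
    Ok(())
}

fn refund_customer(log: &mut Vec<String>) {
    log.push("refunded customer".to_string());
}

fn authorize_shipment(_log: &mut Vec<String>) -> Result<(), String> {
    Err("item out of stock".to_string())
}

fn cancel_shipment(log: &mut Vec<String>) {
    log.push("cancelled shipment".to_string());
}

pub fn checkout_steps() -> Vec<Step> {
    vec![
        Step { name: "charge", action: charge_customer, compensation: refund_customer },
        Step { name: "shipment", action: authorize_shipment, compensation: cancel_shipment },
    ]
}
```

The failed step's own compensation is not run, because that step never took effect.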

10.3. Forward Recovery

Backward recovery is not a good fit for our newsletter delivery system - we cannot "unsend" an email nor would it make sense to send a follow-up email asking subscribers to ignore the email we sent before (it'd be funny though).

We must try to perform forward recovery - drive the overall workflow to completion even if one or more sub-tasks did not succeed.
We have two options: active and passive recovery.

Passive recovery pushes the responsibility for driving the workflow to completion onto the API caller.
The request handler leverages checkpoints to keep track of its progress - e.g. "123 emails have been sent out". If the handler crashes, the next API call will resume processing from the latest checkpoint, minimizing the amount of duplicated work (if any). After enough retries, the workflow will eventually complete.
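A checkpoint can be as simple as the index of the next subscriber to process. The hypothetical sketch below advances the checkpoint only after a successful send, so a retry resumes where the previous attempt stopped. In a real system the checkpoint would have to be persisted (e.g. in Postgres). A crash between sending an email and saving the checkpoint would still cause one duplicate, hence the "(if any)" above.

```rust
/// Sends the issue to every subscriber from `*checkpoint` onwards,
/// advancing the checkpoint after each successful delivery.
pub fn deliver_from_checkpoint<F>(
    subscribers: &[&str],
    checkpoint: &mut usize,
    mut send: F,
) -> Result<(), String>
where
    F: FnMut(&str) -> Result<(), String>,
{
    while *checkpoint < subscribers.len() {
        // Stop at the first failure, leaving the checkpoint on the failed subscriber.
        send(subscribers[*checkpoint])?;
        *checkpoint += 1;
    }
    Ok(())
}
```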

Active recovery, instead, does not require the caller to do anything apart from kicking off the workflow. The system must self-heal.
We would rely on a background process - e.g. a background task on our API - to detect newsletter issues whose delivery stopped halfway. The process would then drive the delivery to completion.
Healing would happen asynchronously - outside the lifecycle of the original POST /admin/newsletters request.

Passive recovery makes for a poor user experience - the newsletter author has to submit the form over and over again until they receive a success response back. The author is in an awkward position - is the error they are seeing related to a transient issue encountered during delivery? Or is it the database failing when trying to fetch the list of subscribers? In other words, will retries actually lead to a success, eventually?
If they choose to give up retrying, while in the middle of delivery, the system is once again left in an inconsistent state.

We will therefore opt for active recovery in our implementation.

10.4. Asynchronous Processing

Active recovery has its rough edges as well.
We do not want the author to receive an error back from the API while, under the hood, newsletter delivery has been kicked off.

We can improve the user experience by changing the expectations for POST /admin/newsletters.
A successful form submission currently implies that the new newsletter issue has been validated and delivered to all subscribers.
We can reduce its scope: a successful form submission will mean that the newsletter has been validated and will be delivered to all subscribers, asynchronously.

In other words, a successful form submission guarantees to the author that the delivery workflow has been correctly kicked off. They just need to wait for all emails to go out, but they have nothing to worry about - it will happen17.

The request handler of POST /admin/newsletters is no longer going to dispatch emails - it will simply enqueue a list of tasks that will be fulfilled asynchronously by a set of background workers. We will use another Postgres table as our task queue - it will be named issue_delivery_queue.

At a glance, it might look like a small difference - we are just shifting around when work needs to happen. But it has a powerful implication: we recover transactionality.
Our subscribers' data, our idempotency records, the task queue - they all live in Postgres. All the operations performed by POST /admin/newsletters can be wrapped in a single SQL transaction - either they all succeed, or none of them takes effect.
The caller no longer needs to second guess the response of our API or try to reason about its implementation!
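The all-or-nothing property can be illustrated without a database. Below is a toy, std-only model built under stated assumptions:

- `Db`, `Tx` and `publish` are hypothetical names.
- Issue ids are plain integers.
- `Tx` discards its staged writes when dropped without `commit`. This loosely mirrors how an sqlx `Transaction` is rolled back when it goes out of scope.

```rust
use std::collections::BTreeSet;

/// Toy stand-in for the two tables written by POST /admin/newsletters.
#[derive(Debug, Default)]
struct Db {
    newsletter_issues: BTreeSet<u32>,
    issue_delivery_queue: BTreeSet<(u32, String)>,
}

/// Writes are staged here and only reach `Db` on `commit`.
/// Dropping a `Tx` without committing throws the staged writes away.
struct Tx<'a> {
    db: &'a mut Db,
    issues: Vec<u32>,
    tasks: Vec<(u32, String)>,
}

impl<'a> Tx<'a> {
    fn begin(db: &'a mut Db) -> Self {
        Tx { db, issues: Vec::new(), tasks: Vec::new() }
    }

    fn insert_newsletter_issue(&mut self, issue_id: u32) {
        self.issues.push(issue_id);
    }

    fn enqueue_delivery_task(&mut self, issue_id: u32, email: &str) {
        self.tasks.push((issue_id, email.to_string()));
    }

    fn commit(self) {
        let Tx { db, issues, tasks } = self;
        db.newsletter_issues.extend(issues);
        db.issue_delivery_queue.extend(tasks);
    }
}

/// Hypothetical handler body: store the issue, enqueue one task per subscriber,
/// then commit. `fail_before_commit` simulates an error after the writes were issued.
fn publish(db: &mut Db, issue_id: u32, subscribers: &[&str], fail_before_commit: bool) -> Result<(), String> {
    let mut tx = Tx::begin(db);
    tx.insert_newsletter_issue(issue_id);
    for email in subscribers {
        tx.enqueue_delivery_task(issue_id, email);
    }
    if fail_before_commit {
        // `tx` is dropped here: neither the issue nor the tasks become visible.
        return Err("simulated failure".to_string());
    }
    tx.commit();
    Ok(())
}

fn main() {
    let mut db = Db::default();
    assert!(publish(&mut db, 1, &["a@example.com"], true).is_err());
    assert!(db.newsletter_issues.is_empty() && db.issue_delivery_queue.is_empty());

    publish(&mut db, 1, &["a@example.com", "b@example.com"], false).unwrap();
    println!("{} issue(s), {} task(s)", db.newsletter_issues.len(), db.issue_delivery_queue.len()); // 1 issue(s), 2 task(s)
}
```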

10.4.1. newsletter_issues

By dispatching eagerly, we never needed to store the details of the issues we were sending out. To pursue our new strategy, this has to change: we will start persisting newsletter issues in a dedicated newsletter_issues table.
The schema should not come as a surprise:

sqlx migrate add create_newsletter_issues_table
-- migrations/20220211080603_create_newsletter_issues_table.sql
CREATE TABLE newsletter_issues (
   newsletter_issue_id uuid NOT NULL,
   title TEXT NOT NULL,
   text_content TEXT NOT NULL,
   html_content TEXT NOT NULL,
   published_at timestamptz NOT NULL,
   PRIMARY KEY(newsletter_issue_id)
);
sqlx migrate run

Let's write a matching insert_newsletter_issue function - we'll need it soon:

//! src/routes/admin/newsletter/post.rs
use sqlx::{Postgres, Transaction};
use uuid::Uuid;
// [...]

#[tracing::instrument(skip_all)]
async fn insert_newsletter_issue(
    transaction: &mut Transaction<'_, Postgres>,
    title: &str,
    text_content: &str,
    html_content: &str,
) -> Result<Uuid, sqlx::Error> {
    let newsletter_issue_id = Uuid::new_v4();
    sqlx::query!(
        r#"
        INSERT INTO newsletter_issues (
            newsletter_issue_id, 
            title, 
            text_content, 
            html_content,
            published_at
        )
        VALUES ($1, $2, $3, $4, now())
        "#,
        newsletter_issue_id,
        title,
        text_content,
        html_content 
    )
    .execute(transaction)
    .await?;
    Ok(newsletter_issue_id)
}

10.4.2. issue_delivery_queue

When it comes to tasks, we are going to keep it simple:

sqlx migrate add create_issue_delivery_queue_table
-- migrations/20220211080603_create_issue_delivery_queue_table.sql
CREATE TABLE issue_delivery_queue (
   newsletter_issue_id uuid NOT NULL REFERENCES newsletter_issues (newsletter_issue_id),
   subscriber_email TEXT NOT NULL,
   PRIMARY KEY(newsletter_issue_id, subscriber_email)
);
sqlx migrate run

We can create the task set using a single insert query:

//! src/routes/admin/newsletter/post.rs
// [...]

#[tracing::instrument(skip_all)]
async fn enqueue_delivery_tasks(
    transaction: &mut Transaction<'_, Postgres>,
    newsletter_issue_id: Uuid,
) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"
        INSERT INTO issue_delivery_queue (
            newsletter_issue_id, 
            subscriber_email
        )
        SELECT $1, email
        FROM subscriptions
        WHERE status = 'confirmed'
        "#,
        newsletter_issue_id,
    )
    .execute(transaction)
    .await?;
    Ok(())
}
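To spell out what the INSERT ... SELECT statement does, here is an in-memory sketch of the same logic. It illustrates the query and is not a replacement for it. It keeps only confirmed subscribers and produces one (issue id, email) row each, with a `BTreeSet` standing in for the composite primary key. `Subscription`, `Status` and integer issue ids are simplifications assumed for the example. A single SQL statement is atomic, so, as in Postgres, a key collision rejects the whole batch instead of inserting part of it.

```rust
use std::collections::BTreeSet;

#[derive(Debug, Clone, Copy, PartialEq)]
enum Status {
    Confirmed,
    PendingConfirmation,
}

struct Subscription {
    email: &'static str,
    status: Status,
}

/// In-memory analogue of:
///   INSERT INTO issue_delivery_queue (newsletter_issue_id, subscriber_email)
///   SELECT $1, email FROM subscriptions WHERE status = 'confirmed'
fn enqueue_delivery_tasks(
    queue: &mut BTreeSet<(u32, String)>,
    issue_id: u32,
    subscriptions: &[Subscription],
) -> Result<usize, String> {
    // SELECT $1, email ... WHERE status = 'confirmed'
    let rows: Vec<(u32, String)> = subscriptions
        .iter()
        .filter(|s| s.status == Status::Confirmed)
        .map(|s| (issue_id, s.email.to_string()))
        .collect();
    // PRIMARY KEY(newsletter_issue_id, subscriber_email): reject the whole statement on a collision.
    if let Some(duplicate) = rows.iter().find(|row| queue.contains(*row)) {
        return Err(format!("duplicate key value: {:?}", duplicate));
    }
    let inserted = rows.len();
    queue.extend(rows);
    Ok(inserted)
}

fn main() {
    let subscriptions = [
        Subscription { email: "a@example.com", status: Status::Confirmed },
        Subscription { email: "b@example.com", status: Status::PendingConfirmation },
    ];
    let mut queue = BTreeSet::new();
    println!("{:?}", enqueue_delivery_tasks(&mut queue, 1, &subscriptions)); // Ok(1)
    println!("{:?}", enqueue_delivery_tasks(&mut queue, 1, &subscriptions).is_err()); // true
}
```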

10.4.3. POST /admin/newsletters

We are ready to overhaul our request handler by putting together the pieces we just built:

//! src/routes/admin/newsletter/post.rs
// [...]

#[tracing::instrument(
    name = "Publish a newsletter issue",
    skip_all,
    fields(user_id=%&*user_id)
)]
pub async fn publish_newsletter(
    form: web::Form<FormData>,
    pool: web::Data<PgPool>,
    user_id: web::ReqData<UserId>,
) -> Result<HttpResponse, actix_web::Error> {
    let user_id = user_id.into_inner();
    let FormData {
        title,
        text_content,
        html_content,
        idempotency_key,
    } = form.0;
    let idempotency_key: IdempotencyKey = idempotency_key.try_into().map_err(e400)?;
    let mut transaction = match try_processing(&pool, &idempotency_key, *user_id)
        .await
        .map_err(e500)?
    {
        NextAction::StartProcessing(t) => t,
        NextAction::ReturnSavedResponse(saved_response) => {
            success_message().send();
            return Ok(saved_response);
        }
    };
    let issue_id = insert_newsletter_issue(&mut transaction, &title, &text_content, &html_content)
        .await
        .context("Failed to store newsletter issue details")
        .map_err(e500)?;
    enqueue_delivery_tasks(&mut transaction, issue_id)
        .await
        .context("Failed to enqueue delivery tasks")
        .map_err(e500)?;
    let response = see_other("/admin/newsletters");
    let response = save_response(transaction, &idempotency_key, *user_id, response)
        .await
        .map_err(e500)?;
    success_message().send();
    Ok(response)
}


fn success_message() -> FlashMessage {
    FlashMessage::info(
        "The newsletter issue has been accepted - \
        emails will go out shortly.",
    )
}

We can also delete get_confirmed_subscribers and ConfirmedSubscriber.

The logic in the request handler is now quite linear. The author is also going to have a quicker feedback loop - the endpoint no longer has to iterate over hundreds of subscribers before redirecting them to a success page.

10.4.4. Email Processing

Let's move our focus to the delivery instead.
We need to consume tasks from issue_delivery_queue. There are going to be multiple delivery workers running at the same time - at least one per API instance.
A naive approach would get us into trouble:

SELECT newsletter_issue_id, subscriber_email
FROM issue_delivery_queue
LIMIT 1

Multiple workers would pick the same task and we would end up with a lot of duplicated emails.

We need synchronization. Once again, we are going to leverage the database - we will use row-level locks.

Postgres 9.5 introduced the SKIP LOCKED clause - it allows SELECT statements to ignore all rows that are currently locked by another concurrent operation.
FOR UPDATE, on the other hand, locks the rows returned by a SELECT until the end of the current transaction.
We are going to combine them:

SELECT newsletter_issue_id, subscriber_email
FROM issue_delivery_queue
FOR UPDATE
SKIP LOCKED
LIMIT 1

This gives us a concurrency-safe queue.
Each worker is going to select an uncontested task (SKIP LOCKED and LIMIT 1); the task itself is going to become unavailable to other workers (FOR UPDATE) for the duration of the overarching SQL transaction.
When the task is complete (i.e. the email has been sent), we are going to delete the corresponding row from issue_delivery_queue and commit our changes.
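Before touching SQL, the locking discipline can be simulated with threads. This is a toy, std-only model and not how Postgres implements row locks. The `locked` set plays the role of the row-level locks, and `dequeue_task` mimics FOR UPDATE SKIP LOCKED LIMIT 1 by taking the first row nobody has locked. `delete_task` mimics the DELETE plus COMMIT. The `Queue` type and the helper names are hypothetical.

```rust
use std::collections::{BTreeSet, HashSet};
use std::sync::{Arc, Mutex};
use std::thread;

type Task = (u32, String); // (newsletter_issue_id, subscriber_email)

/// Toy queue: `rows` plays the role of issue_delivery_queue,
/// `locked` the role of the row-level locks held by open transactions.
#[derive(Default)]
struct Queue {
    rows: BTreeSet<Task>,
    locked: HashSet<Task>,
}

/// Analogue of `SELECT ... FOR UPDATE SKIP LOCKED LIMIT 1`:
/// take the first row nobody has locked, and lock it.
fn dequeue_task(queue: &Mutex<Queue>) -> Option<Task> {
    let mut guard = queue.lock().unwrap();
    let task = guard.rows.iter().find(|row| !guard.locked.contains(*row)).cloned()?;
    guard.locked.insert(task.clone());
    Some(task)
}

/// Analogue of `DELETE FROM issue_delivery_queue ...` followed by COMMIT:
/// the row disappears and its lock is released.
fn delete_task(queue: &Mutex<Queue>, task: &Task) {
    let mut guard = queue.lock().unwrap();
    guard.rows.remove(task);
    guard.locked.remove(task);
}

/// Runs `n_workers` threads that drain the queue and returns every task each one handled.
fn run_workers(queue: Arc<Mutex<Queue>>, n_workers: usize) -> Vec<Task> {
    let handles: Vec<_> = (0..n_workers)
        .map(|_| {
            let queue = Arc::clone(&queue);
            thread::spawn(move || {
                let mut handled = Vec::new();
                while let Some(task) = dequeue_task(&queue) {
                    // The email would be sent here, while the row is "locked"
                    // but without holding the Mutex, so other workers keep going.
                    delete_task(&queue, &task);
                    handled.push(task);
                }
                handled
            })
        })
        .collect();
    handles
        .into_iter()
        .flat_map(|handle| handle.join().unwrap())
        .collect()
}

fn main() {
    let mut queue = Queue::default();
    for i in 0..100 {
        queue.rows.insert((1, format!("subscriber{}@example.com", i)));
    }
    let handled = run_workers(Arc::new(Mutex::new(queue)), 4);
    let distinct: BTreeSet<&Task> = handled.iter().collect();
    println!("{} tasks handled, {} distinct", handled.len(), distinct.len()); // 100 tasks handled, 100 distinct
}
```

Dropping the `locked` check from `dequeue_task` reproduces the naive query. Several workers could then grab the same row and send the same email more than once.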

Let's code it up:

//! src/lib.rs
// [...]
pub mod issue_delivery_worker;

//! src/issue_delivery_worker.rs
use crate::email_client::EmailClient;
use sqlx::{PgPool, Postgres, Transaction};
use tracing::{field::display, Span};
use uuid::Uuid;

#[tracing::instrument(
    skip_all,
    fields(
        newsletter_issue_id=tracing::field::Empty,
        subscriber_email=tracing::field::Empty
    ),
    err
)]
async fn try_execute_task(
    pool: &PgPool, 
    email_client: &EmailClient
) -> Result<(), anyhow::Error> {
    if let Some((transaction, issue_id, email)) = dequeue_task(pool).await? {
        Span::current()
            .record("newsletter_issue_id", &display(issue_id))
            .record("subscriber_email", &display(&email));
        // TODO: send email
        delete_task(transaction, issue_id, &email).await?;
    }
    Ok(())
}

type PgTransaction = Transaction<'static, Postgres>;

#[tracing::instrument(skip_all)]
async fn dequeue_task(
    pool: &PgPool,
) -> Result<Option<(PgTransaction, Uuid, String)>, anyhow::Error> {
    let mut transaction = pool.begin().await?;
    let r = sqlx::query!(
        r#"
        SELECT newsletter_issue_id, subscriber_email
        FROM issue_delivery_queue
        FOR UPDATE
        SKIP LOCKED
        LIMIT 1
        "#,
    )
    .fetch_optional(&mut transaction)
    .await?;
    if let Some(r) = r {
        Ok(Some((
            transaction,
            r.newsletter_issue_id,
            r.subscriber_email,
        )))
    } else {
        Ok(None)
    }
}

#[tracing::instrument(skip_all)]
async fn delete_task(
    mut transaction: PgTransaction,
    issue_id: Uuid,
    email: &str,
) -> Result<(), anyhow::Error> {
    sqlx::query!(
        r#"
        DELETE FROM issue_delivery_queue
        WHERE 
            newsletter_issue_id = $1 AND
            subscriber_email = $2 
        "#,
        issue_id,
        email
    )
    .execute(&mut transaction)
    .await?;
    transaction.commit().await?;
    Ok(())
}

To actually send the email, we need to fetch the newsletter content first:

//! src/issue_delivery_worker.rs
// [...]

struct NewsletterIssue {
    title: String,
    text_content: String,
    html_content: String,
}

#[tracing::instrument(skip_all)]
async fn get_issue(
    pool: &PgPool,
    issue_id: Uuid
) -> Result<NewsletterIssue, anyhow::Error> {
    let issue = sqlx::query_as!(
        NewsletterIssue,
        r#"
        SELECT title, text_content, html_content
        FROM newsletter_issues
        WHERE
            newsletter_issue_id = $1
        "#,
        issue_id
    )
    .fetch_one(pool)
    .await?;
    Ok(issue)
}

We can then recover the dispatch logic that used to live in POST /admin/newsletters:

//! src/issue_delivery_worker.rs
use crate::domain::SubscriberEmail;
// [...]

#[tracing::instrument(/* */)]
async fn try_execute_task(
    pool: &PgPool, 
    email_client: &EmailClient
) -> Result<(), anyhow::Error> {
    if let Some((transaction, issue_id, email)) = dequeue_task(pool).await? {
        // [...]
        match SubscriberEmail::parse(email.clone()) {
            Ok(email) => {
                let issue = get_issue(pool, issue_id).await?;
                if let Err(e) = email_client
                    .send_email(
                        &email,
                        &issue.title,
                        &issue.html_content,
                        &issue.text_content,
                    )
                    .await
                {
                    tracing::error!(
                        error.cause_chain = ?e,
                        error.message = %e,
                        "Failed to deliver issue to a confirmed subscriber. \
                         Skipping.",
                    );
                }
            }
            Err(e) => {
                tracing::error!(
                    error.cause_chain = ?e,
                    error.message = %e,
                    "Skipping a confirmed subscriber. \
                     Their stored contact details are invalid",
                );
            }
        }
        delete_task(transaction, issue_id, &email).await?;
    }
    Ok(())
}

As you can see, we do not retry when the delivery attempt fails due to a Postmark error.
This could be changed by enhancing issue_delivery_queue - e.g. adding n_retries and execute_after columns to keep track of how many attempts have already taken place and how long we should wait before trying again. Try implementing it as an exercise!
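As a possible starting point for that exercise, here is the bookkeeping a worker could do after a failed delivery. The two column names come from the text. Everything else is an assumption of this sketch:

- the `DeliveryTask` and `OnFailure` names;
- the `MAX_RETRIES` cap;
- integer seconds standing in for timestamps.

```rust
use std::time::Duration;

/// Hypothetical issue_delivery_queue row extended with the two suggested columns.
/// `execute_after` is a plain number of seconds to keep the sketch deterministic;
/// in Postgres it would more naturally be a `timestamptz`.
#[derive(Debug, Clone, PartialEq)]
struct DeliveryTask {
    newsletter_issue_id: u32,
    subscriber_email: String,
    n_retries: u32,
    execute_after: u64,
}

#[derive(Debug, PartialEq)]
enum OnFailure {
    /// Keep the row, bump `n_retries`, push `execute_after` into the future.
    Retry(DeliveryTask),
    /// Out of attempts: delete the row, as the current implementation always does.
    GiveUp,
}

/// Hypothetical cap on delivery attempts.
const MAX_RETRIES: u32 = 3;

fn on_delivery_failure(mut task: DeliveryTask, now: u64, retry_after: Duration) -> OnFailure {
    if task.n_retries >= MAX_RETRIES {
        return OnFailure::GiveUp;
    }
    task.n_retries += 1;
    task.execute_after = now + retry_after.as_secs();
    OnFailure::Retry(task)
}

/// The dequeue query would need an extra filter along the lines of `execute_after <= now()`.
fn is_ready(task: &DeliveryTask, now: u64) -> bool {
    task.execute_after <= now
}

fn main() {
    let task = DeliveryTask {
        newsletter_issue_id: 1,
        subscriber_email: "a@example.com".to_string(),
        n_retries: 0,
        execute_after: 0,
    };
    if let OnFailure::Retry(rescheduled) = on_delivery_failure(task, 100, Duration::from_secs(30)) {
        println!("{} {} {}", rescheduled.n_retries, rescheduled.execute_after, is_ready(&rescheduled, 120)); // 1 130 false
    }
}
```

In the SQL version, `Retry` would become an UPDATE on the locked row instead of the DELETE, committed in the same transaction.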

10.4.5. Worker loop

try_execute_task tries to deliver a single email - we need a background task that keeps pulling from issue_delivery_queue and fulfills tasks as they become available.

We can use an infinite loop:

//! src/issue_delivery_worker.rs
use std::time::Duration;
// [...]

async fn worker_loop(
    pool: PgPool, 
    email_client: EmailClient
) -> Result<(), anyhow::Error> {
    loop {
        if try_execute_task(&pool, &email_client).await.is_err() {
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    }
}

If we experience a transient failure18, we need to sleep for a while to improve our future chances of success. This could be further refined by introducing an exponential backoff with jitter.
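A sketch of exponential backoff with "full jitter" follows. The sleep is a random duration between zero and an exponentially growing ceiling, capped at a maximum. To stay dependency-free it uses a hand-rolled xorshift64 generator. `XorShift64` and `backoff_with_jitter` are hypothetical names, and the base and cap values are arbitrary.

```rust
use std::time::Duration;

/// Minimal xorshift64 generator so the sketch has no dependencies.
/// Not suitable for anything security-related; the seed must be non-zero.
struct XorShift64(u64);

impl XorShift64 {
    fn next_u64(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }
}

/// Exponential backoff with "full jitter": wait a random duration in
/// [0, min(cap, base * 2^attempt)], where `attempt` counts consecutive failures.
fn backoff_with_jitter(attempt: u32, base: Duration, cap: Duration, rng: &mut XorShift64) -> Duration {
    // Saturating arithmetic keeps large `attempt` values from overflowing.
    let ceiling = base.saturating_mul(2u32.saturating_pow(attempt)).min(cap);
    let ceiling_ms = ceiling.as_millis() as u64;
    Duration::from_millis(rng.next_u64() % ceiling_ms.saturating_add(1))
}

fn main() {
    let mut rng = XorShift64(0x9E37_79B9_7F4A_7C15);
    let (base, cap) = (Duration::from_millis(500), Duration::from_secs(30));
    for attempt in 0..8 {
        let delay = backoff_with_jitter(attempt, base, cap, &mut rng);
        println!("attempt {}: sleeping {:?}", attempt, delay);
    }
}
```

In worker_loop, this would mean keeping a counter of consecutive failures and resetting it after any `Ok`. The `Err` branch would then sleep for the computed delay instead of a fixed second. A real project would normally reach for a crate such as `rand` rather than the hand-rolled generator.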

There is another scenario we need to keep in mind, apart from failure: issue_delivery_queue might be empty.
When that is the case, try_execute_task is going to be invoked continuously. That translates into an avalanche of unnecessary queries to the database.
We can mitigate this risk by changing the signature of try_execute_task - we need to know if it actually managed to dequeue something.

//! src/issue_delivery_worker.rs
// [...]

enum ExecutionOutcome {
    TaskCompleted,
    EmptyQueue,
}

#[tracing::instrument(/* */)]
async fn try_execute_task(/* */) -> Result<ExecutionOutcome, anyhow::Error> {
    let (transaction, issue_id, email) = match dequeue_task(pool).await? {
        Some(task) => task,
        None => return Ok(ExecutionOutcome::EmptyQueue),
    };
    // [...]
    Ok(ExecutionOutcome::TaskCompleted)
}

worker_loop can now become smarter:

//! src/issue_delivery_worker.rs
// [...]

async fn worker_loop(/* */) -> Result</* */> {
    loop {
        match try_execute_task(&pool, &email_client).await {
            Ok(ExecutionOutcome::EmptyQueue) => {
                tokio::time::sleep(Duration::from_secs(10)).await;
            }
            Err(_) => {
                tokio::time::sleep(Duration::from_secs(1)).await;
            }
            Ok(ExecutionOutcome::TaskCompleted) => {}
        }
    }
}

No more busy looping, yay!
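The sleep decision depends only on the outcome, so it can be pulled out of worker_loop and checked without waiting at all. This is a synchronous sketch:

- `delay_before_next_poll` and `total_idle_time` are hypothetical helpers.
- Errors are plain `String`s instead of `anyhow::Error`.

```rust
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq)]
enum ExecutionOutcome {
    TaskCompleted,
    EmptyQueue,
}

/// The decision taken by the `match` in `worker_loop`, as a pure function.
fn delay_before_next_poll(outcome: &Result<ExecutionOutcome, String>) -> Option<Duration> {
    match outcome {
        Ok(ExecutionOutcome::TaskCompleted) => None, // more work may be waiting: poll again at once
        Ok(ExecutionOutcome::EmptyQueue) => Some(Duration::from_secs(10)),
        Err(_) => Some(Duration::from_secs(1)),
    }
}

/// Total idle time the loop would spend for a scripted sequence of outcomes.
fn total_idle_time(outcomes: &[Result<ExecutionOutcome, String>]) -> Duration {
    outcomes.iter().filter_map(delay_before_next_poll).sum()
}

fn main() {
    let outcomes = vec![
        Ok(ExecutionOutcome::TaskCompleted),
        Ok(ExecutionOutcome::TaskCompleted),
        Err("connection reset".to_string()),
        Ok(ExecutionOutcome::EmptyQueue),
    ];
    println!("{:?}", total_idle_time(&outcomes)); // 11s
}
```

A busy queue is drained back-to-back, while an idle one costs a single query every ten seconds per worker.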

10.4.6. Launching Background Workers

We have a worker loop - but it is not launched anywhere.
Let's start by building the required dependencies based on the configuration values19:

//! src/issue_delivery_worker.rs
use crate::{configuration::Settings, startup::get_connection_pool};
// [...]

pub async fn run_worker_until_stopped(
    configuration: Settings
) -> Result<(), anyhow::Error> {
    let connection_pool = get_connection_pool(&configuration.database);

    let sender_email = configuration
        .email_client
        .sender()
        .expect("Invalid sender email address.");
    let timeout = configuration.email_client.timeout();
    let email_client = EmailClient::new(
        configuration.email_client.base_url,
        sender_email,
        configuration.email_client.authorization_token,
        timeout,
    );
    worker_loop(connection_pool, email_client).await
}

To run our background worker and the API side-by-side we need to restructure our main function.
We are going to build the Future for each of the two long-running tasks - Futures are lazy in Rust, so nothing happens until they are actually awaited.
We will use tokio::select! to get both tasks to make progress concurrently. tokio::select! returns as soon as one of the two tasks completes or errors out:

//! src/main.rs
use zero2prod::issue_delivery_worker::run_worker_until_stopped;
// [...]

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let subscriber = get_subscriber("zero2prod".into(), "info".into(), std::io::stdout);
    init_subscriber(subscriber);

    let configuration = get_configuration().expect("Failed to read configuration.");
    let application = Application::build(configuration.clone())
        .await?
        .run_until_stopped();
    let worker = run_worker_until_stopped(configuration);

    tokio::select! {
        _ = application => {},
        _ = worker => {},
    };

    Ok(())
}

There is a pitfall to be mindful of when using tokio::select! - all selected Futures are polled as a single task. This has consequences, as tokio's documentation highlights:

By running all async expressions on the current task, the expressions are able to run concurrently but not in parallel. This means all expressions are run on the same thread and if one branch blocks the thread, all other expressions will be unable to continue. If parallelism is required, spawn each async expression using tokio::spawn and pass the join handle to select!.

We should definitely follow their recommendation:

//! src/main.rs
// [...]

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // [...]
    let application = Application::build(configuration.clone()).await?;
    let application_task = tokio::spawn(application.run_until_stopped());
    let worker_task = tokio::spawn(run_worker_until_stopped(configuration));

    tokio::select! {
        _ = application_task => {},
        _ = worker_task => {},
    };

    Ok(())
}

As it stands, we have no visibility into which task completed first or if they completed successfully at all. Let's add some logging:

//! src/main.rs
use std::fmt::{Debug, Display};
use tokio::task::JoinError;
// [...]

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // [...]
    tokio::select! {
        o = application_task => report_exit("API", o),
        o = worker_task => report_exit("Background worker", o),
    };

    Ok(())
}


fn report_exit(
    task_name: &str, 
    outcome: Result<Result<(), impl Debug + Display>, JoinError>
) {
    match outcome {
        Ok(Ok(())) => {
            tracing::info!("{} has exited", task_name)
        }
        Ok(Err(e)) => {
            tracing::error!(
                error.cause_chain = ?e, 
                error.message = %e, 
                "{} failed", 
                task_name
            )
        }
        Err(e) => {
            tracing::error!(
                error.cause_chain = ?e, 
                error.message = %e,
                "'{}' task failed to complete",
                task_name
            )
        }
    }
}
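The "first one to exit wins, then report how" logic can be reproduced with plain threads. This helps when reasoning about it in isolation. The sketch below is a std-only analogue, not a replacement for tokio:

- each task runs on its own thread and reports on a shared `mpsc` channel;
- a panic is caught and turned into an error, playing the role of `JoinError`;
- unlike `tokio::select!`, the losing thread is not cancelled.

`Exit`, `spawn_reporting`, `first_to_exit` and `describe` are hypothetical names.

```rust
use std::panic;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Which long-running task exited first, and how.
#[derive(Debug, PartialEq)]
struct Exit {
    task_name: &'static str,
    outcome: Result<(), String>,
}

/// Runs `task` on its own thread and reports on `tx` when it exits.
/// A panic becomes an `Err`, playing the role of tokio's `JoinError`.
fn spawn_reporting<F>(task_name: &'static str, task: F, tx: mpsc::Sender<Exit>)
where
    F: FnOnce() -> Result<(), String> + Send + 'static,
{
    thread::spawn(move || {
        let outcome = panic::catch_unwind(panic::AssertUnwindSafe(task))
            .unwrap_or_else(|_| Err(format!("'{}' task failed to complete", task_name)));
        let _ = tx.send(Exit { task_name, outcome });
    });
}

/// Thread-based analogue of `tokio::select!` over two spawned tasks: return the first exit.
/// The other thread is not cancelled; it keeps running detached.
fn first_to_exit<A, B>(name_a: &'static str, a: A, name_b: &'static str, b: B) -> Exit
where
    A: FnOnce() -> Result<(), String> + Send + 'static,
    B: FnOnce() -> Result<(), String> + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    spawn_reporting(name_a, a, tx.clone());
    spawn_reporting(name_b, b, tx);
    rx.recv().expect("each thread reports exactly once")
}

/// Same messages as `report_exit`, returned as a string instead of logged.
fn describe(exit: &Exit) -> String {
    match &exit.outcome {
        Ok(()) => format!("{} has exited", exit.task_name),
        Err(e) => format!("{} failed: {}", exit.task_name, e),
    }
}

fn main() {
    let exit = first_to_exit(
        "API",
        // A server that never returns on its own.
        || loop {
            thread::sleep(Duration::from_secs(3600));
        },
        "Background worker",
        || Err("database is unreachable".to_string()),
    );
    println!("{}", describe(&exit)); // Background worker failed: database is unreachable
}
```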

It is looking pretty solid!

10.4.7. Updating The Test Suite

We have one little problem left - many of our tests are failing. They were written when emails were being delivered synchronously, which is no longer the case. How should we deal with them?

Launching a background worker would mimic the behaviour of our application, but it would make for a fragile test suite - we would have to sleep for arbitrary time intervals waiting for the background worker to process the email tasks we just enqueued. We are bound to have flaky tests sooner or later.
An alternative approach relies on launching the worker on demand, asking it to consume all available tasks. It deviates slightly from the behaviour in our main function, but it manages to exercise most of the code while being significantly more robust. This is what we are going for!

Let's add an EmailClient instance to our TestApp:

//! src/configuration.rs
use crate::email_client::EmailClient;
// [...]

impl EmailClientSettings {
    pub fn client(self) -> EmailClient {
        let sender_email = self.sender().expect("Invalid sender email address.");
        let timeout = self.timeout();
        EmailClient::new(
            self.base_url,
            sender_email,
            self.authorization_token,
            timeout,
        )
    }

    // [...]
}
//! tests/api/helpers.rs
use zero2prod::email_client::EmailClient;
// [...]

pub struct TestApp {
    // [...]
    pub email_client: EmailClient
}

pub async fn spawn_app() -> TestApp {
    // [...]
    let test_app = TestApp {
        // [...]
        email_client: configuration.email_client.client()
    };
    // [...]
}
//! src/issue_delivery_worker.rs
// [...]

pub async fn run_worker_until_stopped(
    configuration: Settings
) -> Result<(), anyhow::Error> {
    let connection_pool = get_connection_pool(&configuration.database);
    // Use helper function!
    let email_client = configuration.email_client.client();
    worker_loop(connection_pool, email_client).await
}
//! src/startup.rs
// [...]
impl Application {
    pub async fn build(configuration: Settings) -> Result<Self, anyhow::Error> {
        let connection_pool = get_connection_pool(&configuration.database);
        // Use helper function!
        let email_client = configuration.email_client.client();
        // [...]
    }
    // [...]
}

We can then write a helper to consume all enqueued tasks:

//! tests/api/helpers.rs
use zero2prod::issue_delivery_worker::{try_execute_task, ExecutionOutcome};
// [...]

impl TestApp {
    pub async fn dispatch_all_pending_emails(&self) {
        loop {
            if let ExecutionOutcome::EmptyQueue =
                try_execute_task(&self.db_pool, &self.email_client)
                    .await
                    .unwrap()
            {
                break;
            }
        }
    }
    // [...]
}
//! src/issue_delivery_worker.rs
// [...] 

// Mark as pub
pub enum ExecutionOutcome {/* */}

#[tracing::instrument(/* */)]
// Mark as pub
pub async fn try_execute_task(/* */) -> Result</* */> {/* */}
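Stripped of Postgres and HTTP, the on-demand pattern boils down to calling the worker until it reports an empty queue. Below is a std-only sketch of that idea:

- an in-memory `VecDeque` replaces issue_delivery_queue;
- `FakeEmailClient` (hypothetical) replaces the mock email server used by the test suite;
- the loop mirrors dispatch_all_pending_emails.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
enum ExecutionOutcome {
    TaskCompleted,
    EmptyQueue,
}

/// Stand-in for the mock email server: remembers every address it was asked to email.
#[derive(Default)]
struct FakeEmailClient {
    sent: Vec<String>,
}

/// Synchronous analogue of `try_execute_task`: handles at most one task per call.
fn try_execute_task(
    queue: &mut VecDeque<String>,
    email_client: &mut FakeEmailClient,
) -> Result<ExecutionOutcome, String> {
    match queue.pop_front() {
        None => Ok(ExecutionOutcome::EmptyQueue),
        Some(email) => {
            email_client.sent.push(email);
            Ok(ExecutionOutcome::TaskCompleted)
        }
    }
}

/// Analogue of `TestApp::dispatch_all_pending_emails`: drive the worker on demand
/// until the queue reports empty, so no test ever has to sleep and hope.
fn dispatch_all_pending_emails(queue: &mut VecDeque<String>, email_client: &mut FakeEmailClient) {
    loop {
        if let ExecutionOutcome::EmptyQueue = try_execute_task(queue, email_client).unwrap() {
            break;
        }
    }
}

fn main() {
    let mut queue: VecDeque<String> = ["a@example.com", "b@example.com"]
        .iter()
        .map(|email| email.to_string())
        .collect();
    let mut email_client = FakeEmailClient::default();
    dispatch_all_pending_emails(&mut queue, &mut email_client);
    println!("{:?} {}", email_client.sent, queue.is_empty()); // ["a@example.com", "b@example.com"] true
}
```

Calling the helper on an already drained queue is a cheap no-op. This is why it can be appended to every test that enqueues work.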

We can update all the impacted test cases:

//! tests/api/newsletter.rs
// [...]

#[tokio::test]
async fn newsletters_are_not_delivered_to_unconfirmed_subscribers() {
    // [...]
    assert!(html_page.contains(
        "<p><i>The newsletter issue has been accepted - \
        emails will go out shortly.</i></p>"
    ));
    app.dispatch_all_pending_emails().await;
    // Mock verifies on Drop that we haven't sent the newsletter email
}

#[tokio::test]
async fn newsletters_are_delivered_to_confirmed_subscribers() {
    // [...]
    assert!(html_page.contains(
        "<p><i>The newsletter issue has been accepted - \
        emails will go out shortly.</i></p>"
    ));
    app.dispatch_all_pending_emails().await;
    // Mock verifies on Drop that we have sent the newsletter email
}

#[tokio::test]
async fn newsletter_creation_is_idempotent() {
    // [...]
    // Act - Part 2 - Follow the redirect
    let html_page = app.get_publish_newsletter_html().await;
    assert!(html_page.contains(
        "<p><i>The newsletter issue has been accepted - \
        emails will go out shortly.</i></p>"
    ));
    // [...]
    // Act - Part 4 - Follow the redirect
    let html_page = app.get_publish_newsletter_html().await;
    assert!(html_page.contains(
        "<p><i>The newsletter issue has been accepted - \
        emails will go out shortly.</i></p>"
    ));
    app.dispatch_all_pending_emails().await;
    // Mock verifies on Drop that we have sent the newsletter email **once**
}

#[tokio::test]
async fn concurrent_form_submission_is_handled_gracefully() {
    // [...]
    app.dispatch_all_pending_emails().await;
    // Mock verifies on Drop that we have sent the newsletter email **once**
}

// We deleted `transient_errors_do_not_cause_duplicate_deliveries_on_retries`
// It is no longer relevant given the redesign

The tests are passing, we made it!

Well, we almost made it.
We neglected one detail: there is no expiry mechanism for our idempotency keys. Try designing one as an exercise, using what we learned on background workers as a reference.
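One possible starting point for that exercise is sketched below under explicit assumptions:

- each idempotency record is assumed to carry a creation timestamp, modelled here as an integer number of seconds;
- the table is replaced by a `HashMap`;
- `expire_idempotency_keys` is a hypothetical pass that a background loop, shaped like worker_loop, would run periodically.

Against Postgres, the pass would become a single DELETE filtered on that timestamp.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical in-memory stand-in for the idempotency table:
/// (user_id, idempotency_key) -> creation time, in seconds since an arbitrary epoch.
type IdempotencyStore = HashMap<(u32, String), u64>;

/// One pass of a hypothetical expiry worker: forget every key older than `ttl`.
/// Returns the number of records removed.
fn expire_idempotency_keys(store: &mut IdempotencyStore, now: u64, ttl: Duration) -> usize {
    let cutoff = now.saturating_sub(ttl.as_secs());
    let before = store.len();
    store.retain(|_, created_at| *created_at >= cutoff);
    before - store.len()
}

fn main() {
    let mut store = IdempotencyStore::new();
    store.insert((1, "old-key".to_string()), 0);
    store.insert((1, "fresh-key".to_string()), 150_000);
    let removed = expire_idempotency_keys(&mut store, 200_000, Duration::from_secs(24 * 60 * 60));
    println!("removed {}, kept {}", removed, store.len()); // removed 1, kept 1
}
```

The TTL should comfortably exceed the window in which clients may retry. A key that expires while a client is still retrying would let a duplicate newsletter through.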

11. Epilogue

This is where our journey together comes to an end.

We started from an empty skeleton. Look at our project now: fully functional, well tested, reasonably secure - a proper minimum viable product. The project was never the goal though - it was an excuse, an opportunity to see what it feels like to write a production-ready API using Rust.

Zero To Production In Rust started with a question, a question I hear every other day:

Can Rust be a productive language for API development?

I have taken you on a tour. I showed you a little corner of the Rust ecosystem, an opinionated yet powerful toolkit. I tried to explain, to the best of my abilities, the key language features.
The choice is now yours: you have learned enough to keep walking on your own, if you wish to do so.

Rust's adoption in the industry is taking off: we are living through an inflection point. It was my ambition to write a book that could serve as a ticket for this rising tide - an onboarding guide for those who want to be a part of this story.
This is just the beginning - the future of this community is yet to be written, but it is looking bright.


Zero To Production In Rust is a hands-on introduction to backend development in Rust.

12. Footnotes

1

At the end of chapter 10 you were asked to convert POST /newsletters (JSON + 'Basic' auth) into POST /admin/newsletters (HTML Form data + session-based auth) as a take-home exercise. Your implementation might differ slightly from mine, therefore the code blocks here might not match exactly what you see in your IDE. Check the book's GitHub repository to compare solutions.

2

It is debatable whether this is actually the best way to handle an invalid body. Assuming no mistakes were made on our side, submitting the HTML form we serve on GET /admin/newsletters should always result in a request body that passes the basic validation done by the Form extractor - a.k.a. we get all the fields we expect. But mistakes are a possibility - we cannot rule out that some of the types used as fields in FormData might start doing more advanced validation in the future - it'd be safer to redirect the user back to the form page with a proper error message when body validation fails. You can try it out as an exercise.

3

Postmark provides a batch email API - it is not clear, from their documentation, if they retry messages within a batch to ensure best-effort delivery. Regardless, there is a maximum batch size (500) - if your audience is big enough you have to think about how to batch batches: back to square one. From a learning perspective, we can safely ignore their batch API entirely.

4

Client-side JavaScript can be used to disable buttons after they have been clicked, reducing the likelihood of this scenario.

5

Real-world payment systems would most likely return a 202 Accepted - payment authorization, execution and settlement happen at different points in time. We are keeping things simple for the sake of our example.

6

The other accounts on the payment network are not exposed to us via the API (i.e. we can't query for John's payment history), but we can still observe if one of our payments did or did not reach its beneficiary - e.g. by calling them or if they reach out to us to complain they haven't received the money yet! API calls can have a material effect on the physical world around us - that's why this whole computer thing is so powerful and scary at the same time!

7

An Internet-Draft in the httpapi IETF working group proposed to standardize the Idempotency-Key header, but the conversation does not seem to have moved forward (the draft expired in January 2022).

8

We are not actually going to implement a likeness check for incoming requests - it can get quite tricky: do the headers matter? All of them? A subset of them? Does the body need to match byte-by-byte? Is it enough if it's semantically equivalent (e.g. two JSON objects with identical keys and values)? It can be done, but it is beyond the scope of this chapter.

9

If we are being pessimistic, this could be abused to mount a denial of service attack against the API. It can be avoided by enforcing fair-usage limitations - e.g. it's OK to have a handful of concurrent requests for the same idempotency key, but the server will start returning errors if we end up dealing with tens of duplicates.

10

Two different newsletter issues should not generate the same subscriber-specific idempotency key. If that were to happen, you wouldn't be able to send two different issues one after the other because Postmark's idempotency logic would prevent the second set of emails from going out. This is why we must include a fingerprint of the incoming request content in the generation logic for the subscriber-specific idempotency key - it ensures a unique outcome for each subscriber-newsletter issue pair. Alternatively, we would have to implement a likeness check to ensure that the same idempotency key cannot be used for two different requests to POST /admin/newsletters - i.e. requests sharing an idempotency key are then guaranteed to carry the same newsletter content, so the idempotency key alone is enough to tell different issues apart.

11

This is equivalent to a non-repeatable read in a relational database.

12

We technically have another option: stream the response body directly to the database and then stream it back from the database directly to the caller.

13

You can think of Bytes as a Vec<u8> with extra perks - check out the documentation of the bytes crate for more details.

14

If the type name ends up being too long, some truncation takes place as well.

15

Protocols like 2-phase commit would allow us to have all-or-nothing semantics in a distributed system, but they are not widely supported due to their complexity and drawbacks.

16

More often than not, the other system lives within your organization - it's just a different microservice, with its own isolated data store. You have traded the inner complexity of the monolith for the complexity of orchestrating changes across multiple sub-systems - complexity has to live somewhere.

17

The author would still benefit from having visibility into the delivery process - e.g. a page to track how many emails are still outstanding for a certain newsletter issue. Workflow observability is out of scope for the book, but it might be an interesting exercise to pursue on your own.

18

Almost all errors returned by try_execute_task are transient in nature, except for invalid subscriber emails - sleeping is not going to fix those. Try refining the implementation to distinguish between transient and fatal failures, empowering worker_loop to react appropriately.

19

We are not re-using the dependencies we built for our actix_web application. This separation enables us, for example, to precisely control how many database connections are allocated to background tasks vs our API workloads. At the same time, this is clearly unnecessary at this stage: we could have built a single pool and HTTP client, passing Arc pointers to both sub-systems (API and worker). The right choice depends on the circumstances and the overall set of constraints.

Book - Table Of Contents


The Table of Contents is provisional and might change over time. The draft below is the most accurate picture at this point in time.

  1. Getting Started
    • Installing The Rust Toolchain
    • Project Setup
    • IDEs
    • Continuous Integration
  2. Our Driving Example
    • What Should Our Newsletter Do?
    • Working In Iterations
  3. Sign Up A New Subscriber
  4. Telemetry
    • Unknown Unknowns
    • Observability
    • Logging
    • Instrumenting POST /subscriptions
    • Structured Logging
  5. Go Live
    • We Must Talk About Deployments
    • Choosing Our Tools
    • A Dockerfile For Our Application
    • Deploy To DigitalOcean Apps Platform
  6. Rejecting Invalid Subscribers #1
    • Requirements
    • First Implementation
    • Validation Is A Leaky Cauldron
    • Type-Driven Development
    • Ownership Meets Invariants
    • Panics
    • Error As Values - Result
  7. Reject Invalid Subscribers #2
  8. Error Handling
    • What Is The Purpose Of Errors?
    • Error Reporting For Operators
    • Errors For Control Flow
    • Avoid "Ball Of Mud" Error Enums
    • Who Should Log Errors?
  9. Naive Newsletter Delivery
    • User Stories Are Not Set In Stone
    • Do Not Spam Unconfirmed Subscribers
    • All Confirmed Subscribers Receive New Issues
    • Implementation Strategy
    • Body Schema
    • Fetch Confirmed Subscribers List
    • Send Newsletter Emails
    • Validation Of Stored Data
    • Limitations Of The Naive Approach
  10. Securing Our API
  11. Fault-tolerant Newsletter Delivery