Zero Downtime Deployments

This article is a sample from Zero To Production In Rust, a book on backend development in Rust.
You can get a copy of the book on zero2prod.com.
Subscribe to the newsletter to be notified when a new episode is published.

TL;DR

We are ready to stitch together our email confirmation flow, but how do we roll it out without disrupting our service?

We approach the topic of zero downtime deployments: what it takes (💖 load balancers), how it impacts the way we sequence our work (e.g. database migrations).
By the end of the chapter, after a few red-green-refactor iterations, we'll finally get to deploy the new feature.

Chapter 7 - Part 2

  1. Confirmation Emails
  2. Zero Downtime Deployments
  3. Database Migrations
  4. Sending A Confirmation Email
  5. Database Transactions
  6. Summary

Confirmation Emails

Time to go back to the plan we drafted at the beginning of chapter 7:

  • write a module to send an email;
  • adapt the logic of our existing POST /subscriptions request handler to match the new requirements;
  • write a GET /subscriptions/confirm request handler from scratch.

The first item is done, time to move on to the remaining two on the list.
We had a sketch of how the two handlers should work:

POST /subscriptions will:

  • add the subscriber details to the database in the subscriptions table, with status equal to pending_confirmation;
  • generate a (unique) subscription_token;
  • store subscription_token in our database against the subscriber id in a subscription_tokens table;
  • send an email to the new subscriber containing a link structured as https://<our-api-domain>/subscriptions/confirm?token=<subscription_token>;
  • return a 200 OK.

Once they click on the link, a browser tab will open up and a GET request will be fired to our GET /subscriptions/confirm endpoint. The request handler will:

  • retrieve subscription_token from the query parameters;
  • retrieve the subscriber id associated with subscription_token from the subscription_tokens table;
  • update the subscriber status from pending_confirmation to active in the subscriptions table;
  • return a 200 OK.

This gives us a fairly precise picture of how the application is going to work once we are done with the implementation.
It does not help us much to figure out how to get there.

Where should we start from?
Should we immediately tackle the changes to /subscriptions?
Should we get /subscriptions/confirm out of the way?

We need to find an implementation route that can be rolled out with zero downtime.

Zero Downtime Deployments

Reliability

In Chapter 5 we deployed our application to a public cloud provider.
It is live: we are not sending out newsletter issues yet, but people can subscribe while we figure that part out.

Once an application is serving production traffic, we need to make sure it is reliable.

Reliable means different things in different contexts. If you are selling a data storage solution, for example, it should not lose (or corrupt!) customers' data.

In a commercial setting, the definition of reliability for your application will often be encoded in a Service Level Agreement (SLA).
An SLA is a contractual obligation: you guarantee a certain level of reliability and commit to compensate your customers (usually with discounts or credits) if your service fails to live up to the expectations.

If you are selling access to an API, for example, you will usually have something related to availability - e.g. the API should successfully respond to at least 99.99% of well-formed incoming requests, often referred to as "four nines of availability".
Phrased differently (and assuming a uniform distribution of incoming requests over time), you can only afford up to 52 minutes of downtime over a whole year. Achieving four nines of availability is tough.

There is no silver bullet to build a highly available solution: it requires work from the application layer all the way down to the infrastructure layer.
One thing is certain, though: if you want to operate a highly available service, you should master zero downtime deployments - users should be able to use the service before, during and after the rollout of a new version of the application to production.
This is even more important if you are practising continuous deployment: you cannot release multiple times a day if every release triggers a small outage.

Deployment Strategies

Naive Deployment

Before diving deeper into zero downtime deployments, let's have a look at the "naive" approach.
Version A of our service is running in production and we want to roll out version B:

There is a non-zero amount of time where there is no application running in the cluster able to serve user traffic - we are experiencing downtime!

To do better we need to take a closer look at how our infrastructure is set up.

Load Balancers

A load-balanced application with three replicas

We have multiple copies1 of our application running behind a load balancer.
Each replica of our application is registered with the load balancer as a backend.
Every time somebody sends a request to our API, they hit our load balancer which is then in charge of choosing one of the available backends to fulfill the incoming request.

Load balancers usually support adding (and removing) backends dynamically.
This enables a few interesting patterns.

Horizontal Scaling

We can add more capacity when experiencing a traffic spike by spinning up more replicas of our application (i.e. horizontal scaling).
It helps to spread the load until the work expected of a single instance becomes manageable.

We will get back to this topic later in the book when discussing metrics and autoscaling.

Health Checks

We can ask the load balancer to keep an eye on the health of the registered backends.
Oversimplifying, health checking can be:

This is a critical capability to achieve self-healing in a cloud-native environment: the platform can detect if an application is not behaving as expected and automatically remove it from the list of available backends to mitigate or nullify the impact on users2.

Rolling Update Deployments

We can leverage our load balancer to perform zero downtime deployments.

Let's look at a snapshot of our production environments: we have three replicas of version A of our application registered as backends for our load balancer.
We want to deploy version B.

A load-balanced application with three replicas on version A

We start by spinning up one replica of version B of our application.
When the application is ready to serve traffic (i.e. a few health check requests have succeeded) we register it as a backend with our load balancer.

A load-balanced application with three replicas on version A and one on version B

We have four replicas of our application now: 3 running version A, 1 running version B. All four are serving live traffic.
If all is well, we switch off one of the replicas running version A.

A load-balanced application with two replicas on version A and one on version B

We follow the same process to replace all replicas running version A until all registered backends are running version B.

This deployment strategy is called rolling update: we run the old and the new version of the application side by side, serving live traffic with both.
Throughout the process we always have three or more healthy backends: users should not experience any kind of service degradation (assuming version B is not buggy).

Digital Ocean App Platform

We are running our application on Digital Ocean App Platform.
Their documentation boasts of offering zero downtime deployments out of the box, but they do not provide details on how it is achieved.

A few experiments confirmed that they are indeed relying on a rolling update deployment strategy.

A rolling update is not the only possible strategy for a zero downtime deployment - blue-green and canary deployments are equally popular variations over the same underlying principles.
Choose the most appropriate solution for your application based on the capabilities offered by your platform and your requirements.

Database Migrations

State Is Kept Outside The Application

Load balancing relies on a strong assumption: no matter which backend is used to serve an incoming request, the outcome will be the same.

This is something we discussed already in Chapter 3: to ensure high availability in a fault-prone environment, cloud-native applications are stateless - they delegate all persistence concerns to external systems (i.e. databases).

That's why load balancing works: all backends are talking to the same database to query and manipulate the same state.

Think of a database as a single gigantic global variable. Continuously accessed and mutated by all replicas of our application.
State is hard.

Deployments And Migrations

During a rolling update deployment, the old and the new version of the application are both serving live traffic, side by side.
From a different perspective: the old and the new version of the application are using the same database at the same time.

To avoid downtime, we need a database schema that is understood by both versions.
This is not an issue for most of our deployments, but it is a serious constraint when we need to evolve the schema.

Let's circle back to the job we set out to do, confirmation emails.
To move forward with the implementation strategy we identified, we need to evolve our database schema as follows:

Let's go over the possible scenarios to convince ourselves that we cannot possibly deploy confirmation emails all at once without incurring downtime.

We could first migrate the database and then deploy the new version.
This implies that the current version is running against the migrated database for some time: our current implementation of POST /subscriptions does not know about status and it tries to insert new rows into subscriptions without populating it. Given that status is constrained to be NOT NULL (i.e. it's mandatory), all inserts would fail - we would not be able to accept new subscribers until the new version of the application is deployed.
Not good.

We could first deploy the new version and then migrate the database.
We get the opposite scenario: the new version of the application is running against the old database schema. When POST /subscriptions is called, it tries to insert a row into subscriptions with a status field that does not exist - all inserts fail and we cannot accept new subscribers until the database is migrated.
Once again, not good.

Multi-step Migrations

A big bang release won't cut it - we need to get there in multiple, smaller steps.
The pattern is somewhat similar to what we see in test-driven development: we don't change code and tests at the same time - one of the two needs to stay still while the other changes.

The same applies to database migrations and deployments: if we want to evolve the database schema we cannot change the application behaviour at the same time.
Think of it as a database refactoring: we are laying down the foundations in order to build the behaviour we need later on.

A New Mandatory Column

Let's start by looking at the status column.

Step 1: Add As Optional

We start by keeping the application code stable.
On the database side, we generate a new migration script:

sqlx migrate add add_status_to_subscriptions
Creating migrations/20210307181858_add_status_to_subscriptions.sql

We can now edit the migration script to add status as an optional column to subscriptions:

ALTER TABLE subscriptions ADD COLUMN status TEXT NULL;

Run the migration against your local database (SKIP_DOCKER=true ./scripts/init_db.sh): we can now run our test suite to make sure that the code works as is even against the new database schema.

It should pass: go ahead and migrate the production database.

Step 2: Start Using The New Column

status now exists: we can start using it!
To be precise, we can start writing to it: every time a new subscriber is inserted, we will set status to confirmed.

We just need to change our insertion query from

//! src/routes/subscriptions.rs
// [...]

pub async fn insert_subscriber([...]) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"INSERT INTO subscriptions (id, email, name, subscribed_at)
        VALUES ($1, $2, $3, $4)"#,
        // [...]
    )
    // [...]
}

to

//! src/routes/subscriptions.rs
// [...]

pub async fn insert_subscriber([...]) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"INSERT INTO subscriptions (id, email, name, subscribed_at, status)
        VALUES ($1, $2, $3, $4, 'confirmed')"#,
        // [...]
    )
    // [...]
}

Tests should pass - deploy the new version of the application to production.

Step 3: Backfill And Mark As NOT NULL

The latest version of the application ensures that status is populated for all new subscribers.
To mark status as NOT NULL we just need to backfill the value for historical records: we'll then be free to alter the column.

Let's generate a new migration script:

sqlx migrate add make_status_not_null_in_subscriptions
Creating migrations/20210307184428_make_status_not_null_in_subscriptions.sql

The SQL migration looks like this:

-- We wrap the whole migration in a transaction to make sure
-- it succeeds or fails atomically. We will discuss SQL transactions
-- in more details towards the end of this chapter!
-- `sqlx` does not do it automatically for us.
BEGIN;
    -- Backfill `status` for historical entries
    UPDATE subscriptions
        SET status = 'confirmed'
        WHERE status IS NULL;
    -- Make `status` mandatory
    ALTER TABLE subscriptions ALTER COLUMN status SET NOT NULL;
COMMIT;

We can migrate our local database, run our test suite and then deploy our production database.
We made it, we added status as a new mandatory column!

A New Table

What about subscription_tokens? Do we need three steps there as well?

No, it is much simpler: we add the new table in a migration while the application keeps ignoring it.
We can then deploy a new version of the application that uses it to enable confirmation emails.

Let's generate a new migration script:

sqlx migrate add create_subscription_tokens_table
Creating migrations/20210307185410_create_subscription_tokens_table.sql

The migration is similar to the very first one we wrote to add subscriptions:

-- Create Subscription Tokens Table
CREATE TABLE subscription_tokens(
   subscription_token TEXT NOT NULL,
   subscriber_id uuid NOT NULL
      REFERENCES subscriptions (id),
   PRIMARY KEY (subscription_token)
);

Pay attention to the details here: the subscriber_id column in subscription_tokens is a foreign key.
For each row in subscription_tokens there must exist a row in subscriptions whose id field has the same value of subscriber_id, otherwise the insertion fails. This guarantees that all tokens are attached to a legitimate subscriber.

Migrate the production database again - we are done!

Sending A Confirmation Email

It took us a while, but the groundwork is done: our production database is ready to accommodate the new feature we want to build, confirmation emails.
Time to focus on the application code.

We will build the whole feature in a proper test-driven fashion: small steps in a tight red-green-refactor loop. Get ready!

A Static Email

We will start simple: we will test that POST /subscriptions is sending out an email.
We will not be looking, at this stage, at the body of the email - in particular, we will not check that it contains a confirmation link.

Red test

To write this test we need to enhance our TestApp.

It currently holds our application and a handle to a pool of connections to the database:

//! tests/api/helpers.rs
// [...]

pub struct TestApp {
  pub address: String,
  pub db_pool: PgPool,
}

We need to spin up a mock server to stand in for Postmark's API and intercept outgoing requests, just like we did when we built the email client.

Let's edit spawn_app accordingly:

//! tests/api/helpers.rs

// New import!
use wiremock::MockServer;
// [...]

pub struct TestApp {
    pub address: String,
    pub db_pool: PgPool,
    // New field!
    pub email_server: MockServer,
}

pub async fn spawn_app() -> TestApp {
    // [...]
    // Launch a mock server to stand in for Postmark's API
    let email_server = MockServer::start().await;

    // Randomise configuration to ensure test isolation
    let configuration = {
        let mut c = get_configuration().expect("Failed to read configuration.");
        // [...]
        // Use the mock server as email API
        c.email_client.base_url = email_server.uri();
        c
    };
      
    // [...]

    TestApp {
        // [...],
        email_server,
    }
}

We can now write the new test case:

//! tests/api/subscriptions.rs
// New imports!
use wiremock::matchers::{method, path};
use wiremock::{Mock, ResponseTemplate};
// [...]

#[actix_rt::test]
async fn subscribe_sends_a_confirmation_email_for_valid_data() {
  // Arrange
  let app = spawn_app().await;
  let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";

  Mock::given(path("/email"))
          .and(method("POST"))
          .respond_with(ResponseTemplate::new(200))
          .expect(1)
          .mount(&app.email_server)
          .await;

  // Act
  app.post_subscriptions(body.into()).await;

  // Assert
  // Mock asserts on drop
}

The test, as expected, fails:

failures:

---- subscriptions::subscribe_sends_a_confirmation_email_for_valid_data stdout ----
thread 'subscriptions::subscribe_sends_a_confirmation_email_for_valid_data' panicked at 
'Verifications failed:
- Mock #0.
        Expected range of matching incoming requests: == 1
        Number of matched incoming requests: 0'

Notice that, on failure, wiremock gives us a detailed breakdown of what happened: we expected an incoming request, we received none.
Let's fix that.

Green test

Our handler looks like this right now:

//! src/routes/subscriptions.rs
// [...]

#[tracing::instrument([...])]
pub async fn subscribe(
    form: web::Form<FormData>,
    pool: web::Data<PgPool>,
) -> Result<HttpResponse, HttpResponse> {
    let new_subscriber = form
        .0
        .try_into()
        .map_err(|_| HttpResponse::BadRequest().finish())?;
    insert_subscriber(&pool, &new_subscriber)
        .await
        .map_err(|_| HttpResponse::InternalServerError().finish())?;
    Ok(HttpResponse::Ok().finish())
}

To send an email we need to get our hands on an instance of EmailClient.
As part of the work we did when writing the module, we also registered it in the application context:

//! src/startup.rs
// [...]

fn run([...]) -> Result<Server, std::io::Error> {
    // [...]
    let email_client = Data::new(email_client);
    let server = HttpServer::new(move || {
        App::new()
            .wrap(TracingLogger)
            // [...]
            // Here!
            .app_data(email_client.clone())
    })
    .listen(listener)?
    .run();
    Ok(server)
}

We can therefore access it in our handler using web::Data, just like we did for pool:

//! src/routes/subscriptions.rs
// New import!
use crate::email_client::EmailClient;
// [...]

#[tracing::instrument(
    name = "Adding a new subscriber",
    skip(form, pool, email_client),
    fields(
        email = %form.email,
        name = %form.name
    )
)]
pub async fn subscribe(
    form: web::Form<FormData>,
    pool: web::Data<PgPool>,
    // Get the email client from the app context
    email_client: web::Data<EmailClient>,
) -> Result<HttpResponse, HttpResponse> {
    // [...]
    // Send a (useless) email to the new subscriber.
    // We are ignoring email delivery errors for now.
    let _ = email_client
        .send_email(
            new_subscriber.email,
            "Welcome!",
            "Welcome to our newsletter!",
            "Welcome to our newsletter!",
        )
        .await;
    Ok(HttpResponse::Ok().finish())
}

Not a big change, but the test passes!
There is not much to refactor at the moment, let's press forward.

Let's raise the bar a bit - we will scan the body of the email to retrieve a confirmation link.

Red Test

We don't care (yet) about the link being dynamic or actually meaningful - we just want to be sure that there is something in the body that looks like a link.
We should also have the same link in both the plain text and the HTML version of the email body.

How do we get the body of a request intercepted by wiremock::MockServer?
We can use its received_requests method - it returns a vector of all the requests intercepted by the server as long as request recording was enabled (the default).

//! tests/api/subscriptions.rs
// [...]

#[actix_rt::test]
async fn subscribe_sends_a_confirmation_email_with_a_link() {
    // Arrange
    let app = spawn_app().await;
    let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";
  
    Mock::given(path("/email"))
        .and(method("POST"))
        .respond_with(ResponseTemplate::new(200))
        // We are not setting an expectation here anymore
        // The test is focused on another aspect of the app
        // behaviour.    
        .mount(&app.email_server)
        .await;
  
    // Act
    app.post_subscriptions(body.into()).await;
  
    // Assert
    // Get the first intercepted request
    let email_request = &app.email_server.received_requests().await.unwrap()[0];
    // Parse the body as JSON, starting from raw bytes
    let body: serde_json::Value = serde_json::from_slice(&email_request.body).unwrap();
}

We now need to extract links out of it.
The most obvious way forward would be a regular expression. Let's face it though: regexes are a messy business and it takes a while to get them right.
Once again, we can leverage the work done by the larger Rust ecosystem - let's add linkify as a development dependency:

cargo add linkify --vers 0.5.0 --dev

We can use linkify to scan text and return an iterator of extracted links.

//! tests/api/subscriptions.rs
// [...]

#[actix_rt::test]
async fn subscribe_sends_a_confirmation_email_with_a_link() {
    // [...] 
    let body: serde_json::Value = serde_json::from_slice(&email_request.body).unwrap();
   
    // Extract the link from one of the request fields.
    let get_link = |s: &str| {
        let links: Vec<_> = linkify::LinkFinder::new()
            .links(s)
            .filter(|l| *l.kind() == linkify::LinkKind::Url)
            .collect();
            assert_eq!(links.len(), 1);
        links[0].as_str().to_owned()
    };

    let html_link = get_link(&body["HtmlBody"].as_str().unwrap());
    let text_link = get_link(&body["TextBody"].as_str().unwrap());
    // The two links should be identical
    assert_eq!(html_link, text_link);
}

If we run the test suite, we should see the new test case failing:

failures:

thread 'subscriptions::subscribe_sends_a_confirmation_email_with_a_link' 
panicked at 'assertion failed: `(left == right)`
  left: `0`,
 right: `1`', tests/api/subscriptions.rs:71:9

Green Test

We need to tweak our request handler again to satisfy the new test case:

//! src/routes/subscriptions.rs 
// [...]

#[tracing::instrument([...])]
pub async fn subscribe([...]) -> Result<HttpResponse, HttpResponse> {
    // [...]
    let confirmation_link = 
        "https://my-api.com/subscriptions/confirm";
    let _ = email_client
        .send_email(
            new_subscriber.email,
            "Welcome!",
            &format!(
                "Welcome to our newsletter!<br />\
                Click <a href=\"{}\">here</a> to confirm your subscription.",
                confirmation_link
            ),
            &format!(
                "Welcome to our newsletter!\nVisit {} to confirm your subscription.",
                confirmation_link
        ),
    )
    .await;
  Ok(HttpResponse::Ok().finish())
}

The test should pass straight away.

Refactor

Our request handler is getting a bit busy - there is a lot of code dealing with our confirmation email now.
Let's extract it into a separate function:

//! src/routes/subscriptions.rs 
// [...]

#[tracing::instrument([...])]
pub async fn subscribe([...]) -> Result<HttpResponse, HttpResponse> {
    let new_subscriber = form
        .0
        .try_into()
        .map_err(|_| HttpResponse::BadRequest().finish())?;
    insert_subscriber(&pool, &new_subscriber)
        .await
        .map_err(|_| HttpResponse::InternalServerError().finish())?;
    // We are swallowing the error for the time being.
    let _ = send_confirmation_email(&email_client, new_subscriber).await;
    Ok(HttpResponse::Ok().finish())
}

#[tracing::instrument(
    name = "Send a confirmation email to a new subscriber",
    skip(email_client, new_subscriber)
)]
pub async fn send_confirmation_email(
    email_client: &EmailClient,
    new_subscriber: NewSubscriber,
) -> Result<(), reqwest::Error> {
    let confirmation_link = "https://my-api.com/subscriptions/confirm";
    let plain_body = format!(
        "Welcome to our newsletter!\nVisit {} to confirm your subscription.",
        confirmation_link
    );
    let html_body = format!(
        "Welcome to our newsletter!<br />\
        Click <a href=\"{}\">here</a> to confirm your subscription.",
      confirmation_link
    );
    email_client
        .send_email(
            new_subscriber.email,
            "Welcome!",
            &html_body,
            &plain_body,
        )
       .await
}

subscribe is once again focused on the overall flow, without bothering with details of any of its steps.

Pending Confirmation

Let's look at the status for a new subscriber now.
We are currently setting their status to confirmed in POST /subscriptions, while it should be pending_confirmation until they click on the confirmation link.

Time to fix it.

Red test

We can start by having a second look at our first "happy path" test:

//! tests/api/subscriptions.rs
// [...]

#[actix_rt::test]
async fn subscribe_returns_a_200_for_valid_form_data() {
    // Arrange
    let app = spawn_app().await;
    let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";

    // Act
    let response = app.post_subscriptions(body.into()).await;

    // Assert
    assert_eq!(200, response.status().as_u16());

    let saved = sqlx::query!("SELECT email, name FROM subscriptions",)
        .fetch_one(&app.db_pool)
        .await
        .expect("Failed to fetch saved subscription.");

    assert_eq!(saved.email, "ursula_le_guin@gmail.com");
    assert_eq!(saved.name, "le guin");
}

The name is a bit of a lie - it is checking the status code and performing some assertions against the state stored in the database.
Let's split it into two separate test cases:

//! tests/api/subscriptions.rs
// [...]

#[actix_rt::test]
async fn subscribe_returns_a_200_for_valid_form_data() {
    // Arrange
    let app = spawn_app().await;
    let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";
  
    // Act
    let response = app.post_subscriptions(body.into()).await;
  
    // Assert
    assert_eq!(200, response.status().as_u16());
}

#[actix_rt::test]
async fn subscribe_persists_the_new_subscriber() {
    // Arrange
    let app = spawn_app().await;
    let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";
  
    // Act
    app.post_subscriptions(body.into()).await;
  
    // Assert
    let saved = sqlx::query!("SELECT email, name FROM subscriptions",)
        .fetch_one(&app.db_pool)
        .await
        .expect("Failed to fetch saved subscription.");
  
    assert_eq!(saved.email, "ursula_le_guin@gmail.com");
    assert_eq!(saved.name, "le guin");
}

We can now modify the second test case to check the status as well.

//! tests/api/subscriptions.rs
// [...]

#[actix_rt::test]
async fn subscribe_persists_the_new_subscriber() {
    // [...]
  
    // Assert
    let saved = sqlx::query!("SELECT email, name, status FROM subscriptions",)
        .fetch_one(&app.db_pool)
        .await
        .expect("Failed to fetch saved subscription.");
  
    assert_eq!(saved.email, "ursula_le_guin@gmail.com");
    assert_eq!(saved.name, "le guin");
    assert_eq!(saved.status, "pending_confirmation");
}

The test fails as expected:

failures:

---- subscriptions::subscribe_persists_the_new_subscriber stdout ----
thread 'subscriptions::subscribe_persists_the_new_subscriber' 
panicked at 'assertion failed: `(left == right)`
  left: `"confirmed"`,
 right: `"pending_confirmation"`'

Green Test

We can turn it green by touching again our insert query:

//! src/routes/subscriptions.rs

#[tracing::instrument([...])]
pub async fn insert_subscriber([...]) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"INSERT INTO subscriptions (id, email, name, subscribed_at, status)
        VALUES ($1, $2, $3, $4, 'confirmed')"#,
        // [...]
    )
    // [...]
}

We just need to change confirmed into pending_confirmation:

//! src/routes/subscriptions.rs

#[tracing::instrument([...])]
pub async fn insert_subscriber([...]) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"INSERT INTO subscriptions (id, email, name, subscribed_at, status)
        VALUES ($1, $2, $3, $4, 'pending_confirmation')"#,
        // [...]
    )
  // [...]
}

Tests should be green.

Skeleton of GET /subscriptions/confirm

We have done most of the groundwork on POST /subscriptions - time to shift our focus to the other half of the journey, GET /subscriptions/confirm.
We want to build up the skeleton of the endpoint - we need to register the handler against the path in src/startup.rs and reject incoming requests without the required query parameter, subscription_token.
This will allow us to then build the happy path without having to write a massive amount of code all at once - baby steps!

Red Test

Let's add a new module to our tests project to host all test cases dealing with the confirmation callback.

//! tests/api/main.rs

mod health_check;
mod helpers;
mod subscriptions;
// New module!
mod subscriptions_confirm;
//! tests/api/subscriptions_confirm.rs
use crate::helpers::spawn_app;

#[actix_rt::test]
async fn confirmations_without_token_are_rejected_with_a_400() {
    // Arrange
    let app = spawn_app().await;

    // Act
    let response = reqwest::get(&format!("{}/subscriptions/confirm", app.address))
        .await
        .unwrap();

    // Assert
    assert_eq!(response.status().as_u16(), 400);
}

Which fails as expected, given that we have no handler yet:

---- subscriptions_confirm::confirmations_without_token_are_rejected_with_a_400 stdout ----
thread 'subscriptions_confirm::confirmations_without_token_are_rejected_with_a_400' 
panicked at 'assertion failed: `(left == right)`
  left: `404`,
 right: `400`'

Green Test

Let's start with a dummy handler that returns 200 OK regardless of the incoming request:

//! src/routes/mod.rs
 
mod health_check;
mod subscriptions;
// New module!
mod subscriptions_confirm;

pub use health_check::*;
pub use subscriptions::*;
pub use subscriptions_confirm::*;
//! src/routes/subscriptions_confirm.rs

use actix_web::HttpResponse;

#[tracing::instrument(
    name = "Confirm a pending subscriber",
)]
pub async fn confirm() -> HttpResponse {
    HttpResponse::Ok().finish()
}
//! src/startup.rs
// [...]
use crate::routes::confirm;

fn run([...]) -> Result<Server, std::io::Error> {
    // [...]
    let server = HttpServer::new(move || {
        App::new()
            // [...]
            .route("/subscriptions/confirm", web::get().to(confirm))
            // [...]
    })
    // [...]
}

We should get a different error now when running cargo test:

---- subscriptions_confirm::confirmations_without_token_are_rejected_with_a_400 stdout ----
thread 'subscriptions_confirm::confirmations_without_token_are_rejected_with_a_400' 
panicked at 'assertion failed: `(left == right)`
  left: `200`,
 right: `400`'

It worked!
Time to turn that 200 OK in a 400 Bad Request.
We want to ensure that there is a subscription_token query parameter: we can rely on another one actix-web's extractors - Query.

//! src/routes/subscriptions_confirm.rs
use actix_web::{HttpResponse, web};

#[derive(serde::Deserialize)]
pub struct Parameters {
    subscription_token: String
}

#[tracing::instrument(
    name = "Confirm a pending subscriber",
    skip(_parameters)
)]
pub async fn confirm(_parameters: web::Query<Parameters>) -> HttpResponse {
    HttpResponse::Ok().finish()
}

The Parameters struct defines all the query parameters that we expect to see in the incoming request. It needs to implement serde::Deserialize to enable actix-web to build it from the incoming request path. It is enough to add a function parameter of type web::Query<Parameter> to confirm to instruct actix-web to only call the handler if the extraction was successful. If the extraction failed a 400 Bad Request is automatically returned to the caller.

Our test should now pass.

Connecting The Dots

Now that we have a GET /subscriptions/confirm handler we can try to perform the fully journey!

Red Test

We will behave like a user: we will call POST /subscriptions, we will extract the confirmation link from the outgoing email request (using the linkify machinery we already built) and then call it to confirm our subscription - expecting a 200 OK.
We will not be checking the status from the database (yet) - that is going to be our grand finale.

Let's write it down:

//! tests/api/subscriptions_confirm.rs
// [...]
use reqwest::Url;
use wiremock::{ResponseTemplate, Mock};
use wiremock::matchers::{path, method};

#[actix_rt::test]
async fn the_link_returned_by_subscribe_returns_a_200_if_called() {
    // Arrange
    let app = spawn_app().await;
    let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";

    Mock::given(path("/email"))
        .and(method("POST"))
        .respond_with(ResponseTemplate::new(200))
        .mount(&app.email_server)
        .await;

    app.post_subscriptions(body.into()).await;
    let email_request = &app.email_server.received_requests().await.unwrap()[0];
    let body: serde_json::Value = serde_json::from_slice(&email_request.body).unwrap();

    // Extract the link from one of the request fields.
    let get_link = |s: &str| {
        let links: Vec<_> = linkify::LinkFinder::new()
            .links(s)
            .filter(|l| *l.kind() == linkify::LinkKind::Url)
            .collect();
        assert_eq!(links.len(), 1);
        links[0].as_str().to_owned()
    };
    let raw_confirmation_link = &get_link(&body["HtmlBody"].as_str().unwrap());
    let confirmation_link = Url::parse(raw_confirmation_link).unwrap();
    // Let's make sure we don't call random APIs on the web
    assert_eq!(confirmation_link.host_str().unwrap(), "127.0.0.1");

    // Act
    let response = reqwest::get(confirmation_link)
        .await
        .unwrap();

    // Assert
    assert_eq!(response.status.as_u16(), 200);
}

It fails with

thread 'subscriptions_confirm::the_link_returned_by_subscribe_returns_a_200_if_called' 
panicked at 'assertion failed: `(left == right)`
  left: `"my-api.com"`,
 right: `"127.0.0.1"`'

There is a fair amount of code duplication going on here, but we will take care of it in due time.
Our primary focus is getting the test to pass now.

Green Test

Let's begin by taking care of that URL issue.
It is currently hard-coded in

//! src/routes/subscriptions.rs
// [...] 

#[tracing::instrument([...])]
pub async fn send_confirmation_email([...]) -> Result<(), reqwest::Error> {
    let confirmation_link = "https://my-api.com/subscriptions/confirm";
    // [...]
}

The domain and the protocol are going to vary according to the environment the application is running into: it will be http://127.0.0.1 for our tests, it should be a proper DNS record with HTTPS when our application is running in production.
The easiest way to get it right is to pass the domain in as a configuration value.
Let's add a new field to ApplicationSettings:

//! src/configuration.rs
// [...]

#[derive(serde::Deserialize, Clone)]
pub struct ApplicationSettings {
    #[serde(deserialize_with = "deserialize_number_from_string")]
    pub port: u16,
    pub host: String,
    // New field!
    pub base_url: String
}
# configuration/local.yaml
application:
    base_url: "http://127.0.0.1"
# [...]
#! spec.yaml
# [...]
services:
  - name: zero2prod
    # [...]
    envs:
      # We use DO's APP_URL to inject the dynamically
      # provisioned base url as an environment variable
      - key: APP_APPLICATION__BASE_URL
        scope: RUN_TIME
        value: ${_self.APP_URL}
# [...]

We then need to register the value in the application context - you should be familiar with the process at this point:

//! src/startup.rs
// [...]

impl Application {
    pub async fn build(configuration: Settings) -> Result<Self, std::io::Error> {
        // [...]
        let server = run(
            listener,
            connection_pool,
            email_client,
            // New parameter!
            configuration.application.base_url,
        )?;

        Ok(Self { port, server })
    }
    
    // [...]
}

// We need to define a wrapper type in order to retrieve the URL
// in the `subscribe` handler. 
// Retrieval from the context, in actix-web, is type-based: using
// a raw `String` would expose us to conflicts.
pub struct ApplicationBaseUrl(pub String);

fn run(
    listener: TcpListener,
    db_pool: PgPool,
    email_client: EmailClient,
    // New parameter!
    base_url: String,
) -> Result<Server, std::io::Error> {
    // [...]
    let server = HttpServer::new(move || {
        App::new()
            // [...]
            .data(ApplicationBaseUrl(base_url.clone()))
    })
    // [...]
}

We can now access it in the request handler:

//! src/routes/subscriptions.rs
use crate::startup::ApplicationBaseUrl;
// [...]

#[tracing::instrument(
    skip(form, pool, email_client, base_url),
    [...]
)]
pub async fn subscribe(
    // [...]
    // New parameter!
    base_url: web::Data<ApplicationBaseUrl>,
) -> Result<HttpResponse, HttpResponse> {
    // [...]
    // Pass the application url
    let _ = send_confirmation_email(
        &email_client, 
        new_subscriber, 
        &base_url.0
    ).await;
    // [...]
}

#[tracing::instrument(
    skip(email_client, new_subscriber, base_url)
    [...]
)]
pub async fn send_confirmation_email(
    // [...]
    // New parameter!
    base_url: &str,
) -> Result<(), reqwest::Error> {
    // Build a confirmation link with a dynamic root
    let confirmation_link = format!("{}/subscriptions/confirm", base_url);
    // [...]
}

Let's run the test suite again:

thread 'subscriptions_confirm::the_link_returned_by_subscribe_returns_a_200_if_called' 
panicked at 'called `Result::unwrap()` on an `Err` value: 
    reqwest::Error { 
        kind: Request, 
        url: Url { 
            scheme: "http", 
            host: Some(Ipv4(127.0.0.1)), 
            port: None, 
            path: "/subscriptions/confirm", 
            query: None, 
            fragment: None }, 
        source: hyper::Error(
            Connect, 
            ConnectError(
                "tcp connect error", 
                Os { 
                    code: 111, 
                    kind: ConnectionRefused, 
                    message: "Connection refused" 
                }
            )
        ) 
    }'

The host is correct, but the reqwest::Client in our test is failing to establish a connection. What is going wrong?
If you look closely, you'll notice port: None - we are sending our request to http://127.0.0.1/subscriptions/confirm without specifying the port our test server is listening on.
The tricky bit, here, is the sequence of events: we pass in the application_url configuration value before spinning up the server, therefore we do not know what port it is going to listen to (given that the port is randomised using 0!).
This is non-issue for production workloads where the DNS domain is enough - we'll just patch it in the test.
Let's store the application port in its own field within TestApp:

//! tests/api/helpers.rs
// [...]

pub struct TestApp {
    // New field!
    pub port: u16,
    // [...]
}

pub async fn spawn_app() -> TestApp {
    // [...]

    let application = Application::build(configuration.clone())
        .await
        .expect("Failed to build application.");
    let application_port = application.port();
    let _ = tokio::spawn(application.run_until_stopped());

    TestApp {
        address: format!("http://localhost:{}", application_port),
        port: application_port,
        db_pool: get_connection_pool(&configuration.database)
            .await
            .expect("Failed to connect to the database"),
        email_server,
    }
}

We can then use it in the test logic to edit the confirmation link:

//! tests/api/subscriptions_confirm.rs
// [...]

#[actix_rt::test]
async fn the_link_returned_by_subscribe_returns_a_200_if_called() {
    // [...]
    let mut confirmation_link = Url::parse(raw_confirmation_link).unwrap();
    assert_eq!(confirmation_link.host_str().unwrap(), "127.0.0.1");
    // Let's rewrite the URL to include the port
    confirmation_link.set_port(Some(app.port)).unwrap();

    // [...]
}

Not the prettiest, but it gets the job done.
Let's run the test again:

thread 'subscriptions_confirm::the_link_returned_by_subscribe_returns_a_200_if_called' 
panicked at 'assertion failed: `(left == right)`
  left: `400`,
 right: `200`'

We get a 400 Bad Request back because our confirmation link does not have a subscription_token query parameter attached.
Let's fix it by hard-coding one for the time being:

//! src/routes/subscriptions.rs
// [...]

pub async fn send_confirmation_email([...]) -> Result<(), reqwest::Error> {
    let confirmation_link = format!(
        "{}/subscriptions/confirm?subscription_token=mytoken", 
        base_url
    );
    // [...]
}

The test passes!

Refactor

The logic to extract the two confirmation links from the outgoing email request is duplicated across two of our tests - we will likely add more that rely on it as we flesh out the remaining bits and pieces of this feature. It makes sense to extract it in its own helper function.

//! tests/api/helpers.rs
// [...]

/// Confirmation links embedded in the request to the email API.
pub struct ConfirmationLinks {
    pub html: reqwest::Url,
    pub plain_text: reqwest::Url
}

impl TestApp {
    // [...]

    /// Extract the confirmation links embedded in the request to the email API.
    pub fn get_confirmation_links(
        &self, 
        email_request: &wiremock::Request
    ) -> ConfirmationLinks {
        let body: serde_json::Value = serde_json::from_slice(
            &email_request.body
        ).unwrap();

        // Extract the link from one of the request fields.
        let get_link = |s: &str| {
            let links: Vec<_> = linkify::LinkFinder::new()
                .links(s)
                .filter(|l| *l.kind() == linkify::LinkKind::Url)
                .collect();
            assert_eq!(links.len(), 1);
            let raw_link = links[0].as_str().to_owned();
            let mut confirmation_link = reqwest::Url::parse(&raw_link).unwrap();
            // Let's make sure we don't call random APIs on the web
            assert_eq!(confirmation_link.host_str().unwrap(), "127.0.0.1");
            confirmation_link.set_port(Some(self.port)).unwrap();
            confirmation_link
        };

        let html = get_link(&body["HtmlBody"].as_str().unwrap());
        let plain_text = get_link(&body["TextBody"].as_str().unwrap());
        ConfirmationLinks {
            html,
            plain_text
        }
    }
}

We are adding it as a method on TestApp in order to get access to the application port, which we need to inject into the links.
It could as well have been a free function taking both wiremock::Request and TestApp (or u16) as parameters - a matter of taste.

We can now massively simplify our two test cases:

//! tests/api/subscriptions.rs
// [...]

#[actix_rt::test]
async fn subscribe_sends_a_confirmation_email_with_a_link() {
    // Arrange
    let app = spawn_app().await;
    let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";

    Mock::given(path("/email"))
        .and(method("POST"))
        .respond_with(ResponseTemplate::new(200))
        .mount(&app.email_server)
        .await;

    // Act
    app.post_subscriptions(body.into()).await;

    // Assert
    let email_request = &app.email_server.received_requests().await.unwrap()[0];
    let confirmation_links = app.get_confirmation_links(&email_request);

    // The two links should be identical
    assert_eq!(confirmation_links.html, confirmation_links.plain_text);
}
//! tests/api/subscriptions_confirm.rs
// [...]

#[actix_rt::test]
async fn the_link_returned_by_subscribe_returns_a_200_if_called() {
    // Arrange
    let app = spawn_app().await;
    let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";

    Mock::given(path("/email"))
        .and(method("POST"))
        .respond_with(ResponseTemplate::new(200))
        .mount(&app.email_server)
        .await;

    app.post_subscriptions(body.into()).await;
    let email_request = &app.email_server.received_requests().await.unwrap()[0];
    let confirmation_links = app.get_confirmation_links(&email_request);

    // Act
    let response = reqwest::get(confirmation_links.html)
        .await
        .unwrap();

    // Assert
    assert_eq!(response.status().as_u16(), 200);
}

The intent of those two test cases is much clearer now.

Subscription Tokens

We are ready to tackle the elephant in the room: we need to start generating subscription tokens.

Red Test

We will add a new test case which builds on top of the work we just did: instead of asserting against the returned status code we will check the status of the subscriber stored in the database.

//! tests/api/subscriptions_confirm.rs
// [...]

#[actix_rt::test]
async fn clicking_on_the_confirmation_link_confirms_a_subscriber() {
    // Arrange
    let app = spawn_app().await;
    let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";

    Mock::given(path("/email"))
        .and(method("POST"))
        .respond_with(ResponseTemplate::new(200))
        .mount(&app.email_server)
        .await;

    app.post_subscriptions(body.into()).await;
    let email_request = &app.email_server.received_requests().await.unwrap()[0];
    let confirmation_links = app.get_confirmation_links(&email_request);

    // Act
    reqwest::get(confirmation_links.html)
        .await
        .unwrap()
        .error_for_status()
        .unwrap();

    // Assert
    let saved = sqlx::query!("SELECT email, name, status FROM subscriptions",)
        .fetch_one(&app.db_pool)
        .await
        .expect("Failed to fetch saved subscription.");

    assert_eq!(saved.email, "ursula_le_guin@gmail.com");
    assert_eq!(saved.name, "le guin");
    assert_eq!(saved.status, "confirmed");
}

The test fails, as expected:

thread 'subscriptions_confirm::clicking_on_the_confirmation_link_confirms_a_subscriber' 
panicked at 'assertion failed: `(left == right)`
  left: `"pending_confirmation"`,
 right: `"confirmed"`'

Green Test

To get the previous test case to pass, we hard-coded a subscription token in the confirmation link:

//! src/routes/subscriptions.rs
// [...]

pub async fn send_confirmation_email([...]) -> Result<(), reqwest::Error> {
    let confirmation_link = format!(
        "{}/subscriptions/confirm?subscription_token=mytoken", 
        base_url
    );
    // [...]
}

Let's refactor send_confirmation_email to take the token as a parameter - it will make it easier to add the generation logic upstream.

//! src/routes/subscriptions.rs
// [...]

#[tracing::instrument([...])]
pub async fn subscribe([...]) -> Result<HttpResponse, HttpResponse> {
    // [...]
    let _ = send_confirmation_email(
        &email_client, 
        new_subscriber, 
        &base_url.0, 
        "mytoken"
    ).await;
    // [...]
}

#[tracing::instrument(
    name = "Send a confirmation email to a new subscriber",
    skip(email_client, new_subscriber, base_url, subscription_token)
)]
pub async fn send_confirmation_email(
    email_client: &EmailClient,
    new_subscriber: NewSubscriber,
    base_url: &str,
    subscription_token: &str
) -> Result<(), reqwest::Error> {
    let confirmation_link = format!(
        "{}/subscriptions/confirm?subscription_token={}", 
        base_url, 
        subscription_token
    );
    // [...]
}

Our subscription tokens are not passwords: they are single-use and they do not grant access to protected information.3 We need them to be hard enough to guess while keeping in mind that the worst-case scenario is an unwanted newsletter subscription landing in someone's inbox.

Given our requirements it should be enough to use a cryptographically secure pseudo-random number generator - a CSPRNG, if you are into obscure acronyms.
Every time we need to generate a subscription token we can sample a sufficiently-long sequence of alphanumeric characters.

To pull it off we need to add rand as a dependency:

#! Cargo.toml
# [...]

[dependencies]
# [...]
# We need the `std_rng` to get access to the PRNG we want
rand = { version = "0.8", features=["std_rng"] }
//! src/routes/subscriptions.rs
use rand::distributions::Alphanumeric;
use rand::{thread_rng, Rng};
// [...]

/// Generate a random 25-characters-long case-sensitive subscription token.
fn generate_subscription_token() -> String {
  let mut rng = thread_rng();
  std::iter::repeat_with(|| rng.sample(Alphanumeric))
          .map(char::from)
          .take(25)
          .collect()
}

Using 25 characters we get roughly ~10^45 possible tokens - it should be more than enough for our use case.
To check if a token is valid in GET /subscriptions/confirm we need POST /subscriptions to store the newly minted tokens in the database.
The table we added for this purpose, subscription_tokens, has two columns: subscription_token and subscriber_id.

We are currently generating the subscriber identifier in insert_subscriber but we never return it to the caller:

#[tracing::instrument([...])]
pub async fn insert_subscriber([...]) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"[...]"#,
        // The subscriber id, never returned or bound to a variable
        Uuid::new_v4(),
        // [...]
    )
    // [...]
}

Let's refactor insert_subscriber to give us back the identifier:

#[tracing::instrument([...])]
pub async fn insert_subscriber([...]) -> Result<Uuid, sqlx::Error> {
    let subscriber_id = Uuid::new_v4();
    sqlx::query!(
        r#"[...]"#,
        subscriber_id,
        // [...]
    )
    // [...]
    Ok(subscriber_id)
}

We can now tie everything together:

//! src/routes/subscriptions.rs
// [...]

pub async fn subscribe([...]) -> Result<HttpResponse, HttpResponse> {
    // [...]
    let subscriber_id = insert_subscriber(&pool, &new_subscriber)
        .await
        .map_err(|_| HttpResponse::InternalServerError().finish())?;
    let subscription_token = generate_subscription_token();
    store_token(&pool, subscriber_id, &subscription_token)
        .await
        .map_err(|_| HttpResponse::InternalServerError().finish())?;
    let _ = send_confirmation_email(
        &email_client,
        new_subscriber,
        &base_url.0,
        &subscription_token,
    )
    .await;
    Ok(HttpResponse::Ok().finish())
}


#[tracing::instrument(
    name = "Store subscription token in the database",
    skip(subscription_token, pool)
)]
pub async fn store_token(
    pool: &PgPool,
    subscriber_id: Uuid,
    subscription_token: &str,
) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"INSERT INTO subscription_tokens (subscription_token, subscriber_id)
        VALUES ($1, $2)"#,
        subscription_token,
        subscriber_id
    )
    .execute(pool)
    .await
    .map_err(|e| {
        tracing::error!("Failed to execute query: {:?}", e);
        e
    })?;
    Ok(())
}

We are done on POST /subscriptions, let's shift to GET /subscription/confirm:

//! routes/subscriptions_confirm.rs
use actix_web::{HttpResponse, web};

#[derive(serde::Deserialize)]
pub struct Parameters {
    subscription_token: String
}

#[tracing::instrument(
    name = "Confirm a pending subscriber",
    skip(_parameters)
)]
pub async fn confirm(_parameters: web::Query<Parameters>) -> HttpResponse {
    HttpResponse::Ok().finish()
}

We need to:

Nothing we haven't done before - let's get cracking!

use actix_web::{web, HttpResponse};
use sqlx::PgPool;
use uuid::Uuid;

#[derive(serde::Deserialize)]
pub struct Parameters {
    subscription_token: String,
}

#[tracing::instrument(
    name = "Confirm a pending subscriber", 
    skip(parameters, pool)
)]
pub async fn confirm(
    parameters: web::Query<Parameters>,
    pool: web::Data<PgPool>,
) -> Result<HttpResponse, HttpResponse> {
    let id = get_subscriber_id_from_token(&pool, &parameters.subscription_token)
        .await
        .map_err(|_| HttpResponse::InternalServerError().finish())?;
    match id {
        // Non-existing token!
        None => Err(HttpResponse::Unauthorized().finish()),
        Some(subscriber_id) => {
            confirm_subscriber(&pool, subscriber_id)
                .await
                .map_err(|_| HttpResponse::InternalServerError().finish())?;
            Ok(HttpResponse::Ok().finish())
        }
    }
}

#[tracing::instrument(
    name = "Mark subscriber as confirmed", 
    skip(subscriber_id, pool)
)]
pub async fn confirm_subscriber(
    pool: &PgPool, 
    subscriber_id: Uuid
) -> Result<(), sqlx::Error> {
    sqlx::query!(
        r#"UPDATE subscriptions SET status = 'confirmed' WHERE id = $1"#,
        subscriber_id,
        )
        .execute(pool)
        .await
        .map_err(|e| {
            tracing::error!("Failed to execute query: {:?}", e);
            e
        })?;
    Ok(())
}

#[tracing::instrument(
    name = "Get subscriber_id from token", 
    skip(subscription_token, pool)
)]
pub async fn get_subscriber_id_from_token(
    pool: &PgPool,
    subscription_token: &str,
) -> Result<Option<Uuid>, sqlx::Error> {
    let result = sqlx::query!(
      r#"SELECT subscriber_id FROM subscription_tokens WHERE subscription_token = $1"#,
      subscription_token,
    )
    .fetch_optional(pool)
    .await
    .map_err(|e| {
        tracing::error!("Failed to execute query: {:?}", e);
        e
    })?;
    Ok(result.map(|r| r.subscriber_id))
}

Is it enough? Did we miss anything during the journey?
There is only one way to find out.

cargo test 
     Running target/debug/deps/api-5a717281b98f7c41
running 10 tests
[...]

test result: ok. 10 passed; 0 failed; finished in 0.92s

Oh, yes! It works!

Database Transactions

All Or Nothing

It is too soon to declare victory though.
Our POST /subscriptions handler has grown in complexity - we are now performing two INSERT queries against our Postgres database: one to store the details of the new subscriber, one to store the newly generated subscription token.
What happens if the application crashes between those two operations?

The first query might complete successfully, but the second one might never be executed.

There are three possible states for our database after an invocation of POST /subscriptions:

The more queries you have, the worse it gets to reason about the possible end states of our database.

Relational databases (and a few others) provide a mechanism to mitigate this issue: transactions.

Transactions are a way to group together related operations in a single unit of work.
The database guarantees that all operations within a transaction will succeed or fail together: the database will never be left in a state where the effect of only a subset of the queries in a transaction is visible.

Going back to our example, if we wrap the two INSERT queries in a transaction we now have two possible end states:

Much easier to deal with.

Transactions In Postgres

To start a transaction in Postgres you use a BEGIN statement. All queries after BEGIN are part of the transaction.
The transaction is then finalised with a COMMIT statement.

We have actually already used a transaction in one of our migration scripts!

BEGIN;
UPDATE subscriptions SET status = 'confirmed' WHERE status IS NULL;
ALTER TABLE subscriptions ALTER COLUMN status SET NOT NULL;
COMMIT;

If any of the queries within a transaction fails the database rolls back: all changes performed by previous queries are reverted, the operation is aborted.
You can also explicitly trigger a rollback with the ROLLBACK statement.

Transactions are a deep topic: they not only provide a way to convert multiple statements into an all-or-nothing operation, they also hide the effect of uncommited changes from other queries that might be running, concurrently, against the same tables.
As your needs evolves, you will often want to explicitly choose the isolation level of your transactions to fine-tune the concurrency guarantees provided by the database on your operations. Getting a good grip on the different kinds of concurrency-related issues (e.g. dirty reads, phantom reads, etc.) becomes more and more important as your system grows in scale and complexity.

I can't recommend "Designing Data Intensive Applications" enough if you want to learn more about these topics.

Transactions In Sqlx

Back to the code: how do we leverage transactions in sqlx?

You don't have to manually write a BEGIN statement: transactions are so central to the usage of relational databases that sqlx provides a dedicated API.
By calling begin on our pool we acquire a connection from the pool and kick off a transaction:

//! src/routes/subscriptions.rs
// [...]

pub async fn subscribe([...]) -> Result<HttpResponse, HttpResponse> {
    let new_subscriber = // [...]
    let transaction = pool
        .begin()
        .await
        .map_err(|_| HttpResponse::InternalServerError().finish())?;
    // [...]

begin, if successful, returns a Transaction struct.
A mutable reference to a Transaction implements sqlx's Executor trait therefore it can be used to run queries. All queries run using a Transaction as executor become of the transaction.

Let's pass transaction down to insert_subscriber and store_token instead of pool:

//! src/routes/subscriptions.rs
use sqlx::Postgres;
// [...]

#[tracing::instrument([...])]
pub async fn subscribe([...]) -> Result<HttpResponse, HttpResponse> {
    // [...]
    let mut transaction = pool
        .begin()
        .await
        .map_err(|_| HttpResponse::InternalServerError().finish())?;
    let subscriber_id = insert_subscriber(&mut transaction, &new_subscriber)
        .await
        .map_err(|_| HttpResponse::InternalServerError().finish())?;
    let subscription_token = generate_subscription_token();
    store_token(&mut transaction, subscriber_id, &subscription_token)
        .await
        .map_err(|_| HttpResponse::InternalServerError().finish())?;
    // [...]
}

#[tracing::instrument(
    name = "Saving new subscriber details in the database",
    skip(new_subscriber, transaction)
)]
pub async fn insert_subscriber(
    transaction: &mut Transaction<'_, Postgres>,
    new_subscriber: &NewSubscriber,
) -> Result<Uuid, sqlx::Error> {
    let subscriber_id = Uuid::new_v4();
    sqlx::query!([...])
        .execute(transaction)
    // [...]
}

#[tracing::instrument(
    name = "Store subscription token in the database",
    skip(subscription_token, transaction)
)]
pub async fn store_token(
    transaction: &mut Transaction<'_, Postgres>,
    subscriber_id: Uuid,
    subscription_token: &str,
) -> Result<(), sqlx::Error> {
    sqlx::query!([..])
        .execute(transaction)
    // [...]
}

If you run cargo test now you will see something funny: some of our tests are failing!
Why is that happening?

As we discussed, a transaction has to either be committed or rolled back.
Transaction exposes two dedicated methods: Transaction::commit, to persist changes, and Transaction::rollback, to abort the whole operation.
We are not calling either - what happens in that case?

We can look at sqlx's source code to understand better.
In particular, Transaction's Drop implementation:

impl<'c, DB> Drop for Transaction<'c, DB>
where
    DB: Database,
{
    fn drop(&mut self) {
        if self.open {
            // starts a rollback operation

            // what this does depends on the database but generally 
            // this means we queue a rollback operation that will 
            // happen on the next asynchronous invocation of the 
            // underlying connection (including if the connection 
            // is returned to a pool)
            DB::TransactionManager::start_rollback(&mut self.connection);
        }
    }
}

self.open is an internal boolean flag attached to the connection used to begin the transaction and run the queries attached to it.
When a transaction is created, using begin, it is set to true until either rollback or commit are called:

impl<'c, DB> Transaction<'c, DB>
where
    DB: Database,
{
    pub(crate) fn begin(
        conn: impl Into<MaybePoolConnection<'c, DB>>,
    ) -> BoxFuture<'c, Result<Self, Error>> {
        let mut conn = conn.into();

        Box::pin(async move {
            DB::TransactionManager::begin(&mut conn).await?;

            Ok(Self {
                connection: conn,
                open: true,
            })
        })
    }

    pub async fn commit(mut self) -> Result<(), Error> {
        DB::TransactionManager::commit(&mut self.connection).await?;
        self.open = false;

        Ok(())
    }

    pub async fn rollback(mut self) -> Result<(), Error> {
        DB::TransactionManager::rollback(&mut self.connection).await?;
        self.open = false;

        Ok(())
    }
}

In other words: if commit or rollback have not been called before the Transaction object goes out of scope (i.e. Drop is invoked), a rollback command is queued to be executed as soon as an opportunity arises.4
That is why our tests are failing: we are using a transaction but we are not explicitly committing the changes. When the connection goes back into the pool, at the end of our request handler, all changes are rolled back and our test expectations are not met.

We can fix it by adding a one-liner to subscribe:

//! src/routes/subscriptions.rs
use sqlx::{Postgres, Transaction};
// [...]

#[tracing::instrument([...])]
pub async fn subscribe([...]) -> Result<HttpResponse, HttpResponse> {
    // [...]
    let mut transaction = pool
        .begin()
        .await
        .map_err(|_| HttpResponse::InternalServerError().finish())?;
    let subscriber_id = insert_subscriber(&mut transaction, &new_subscriber)
        .await
        .map_err(|_| HttpResponse::InternalServerError().finish())?;
    let subscription_token = generate_subscription_token();
    store_token(&mut transaction, subscriber_id, &subscription_token)
        .await
        .map_err(|_| HttpResponse::InternalServerError().finish())?;
    transaction
        .commit()
        .await
        .map_err(|_| HttpResponse::InternalServerError().finish())?;
    // [...]
}

The test suite should succeed once again.
Go ahead and deploy the application: seeing a feature working in a live environment adds a whole new level of satisfaction!

Summary

This chapter was a long journey, but you have come a long way as well!
The skeleton of our application has started to shape up, starting with our test suite. Features are moving along as well: we now have a functional subscription flow, with a proper confirmation email.
More importantly: we are getting into the rhythm of writing Rust code.
The very end of the chapter has been a long pair programming session where we have made significant progress without introducing many new concepts.

This is a great moment to go off and explore a bit on your own: improve on what we built so far!
There are plenty of opportunities:

It takes deliberate practice to achieve mastery.

As always, all the code we wrote in this chapter can be found on GitHub.

See you next time!


Zero To Production In Rust is a hands-on introduction to backend development in Rust.
Subscribe to the newsletter to be notified when a new episode is published.

Footnotes

Click to expand!
1

You might recall that we set the number of replicas to 1 in chapter 5 to reduce the bill while experimenting. Even if you are running a single replica there is load balancer between your users and your application. Deployments are still performed using a rolling update strategy.

2

This is true as long as the platform is also capable of automatically provisioning a new replica of the application when the number of healthy instances falls below a pre-determined threshold.

3

You could say that our token is a nonce.

4

Rust does not currently support asynchronous destructors, a.k.a. AsyncDrop. There have been some discussions on the topic, but there is no consensus yet. This is a constraint on sqlx: when Transaction goes out of scope it can enqueue a rollback operation, but it cannot execute it immediately! Is it ok? Is it a sound API? There are different views - see diesel's async issue for an overview. My personal view is that the benefits brought by sqlx to the table offset the risks, but you should make an informed decision taking into account the tradeoffs of your application and use case.

Book - Table Of Contents

Click to expand!

The Table of Contents is provisional and might change over time. The draft below is the most accurate picture at this point in time.

  1. Getting Started
    • Installing The Rust Toolchain
    • Project Setup
    • IDEs
    • Continuous Integration
  2. Our Driving Example
    • What Should Our Newsletter Do?
    • Working In Iterations
  3. Sign Up A New Subscriber
  4. Telemetry
    • Unknown Unknowns
    • Observability
    • Logging
    • Instrumenting /POST subscriptions
    • Structured Logging
  5. Go Live
    • We Must Talk About Deployments
    • Choosing Our Tools
    • A Dockerfile For Our Application
    • Deploy To DigitalOcean Apps Platform
  6. Rejecting Invalid Subscribers #1
    • Requirements
    • First Implementation
    • Validation Is A Leaky Cauldron
    • Type-Driven Development
    • Ownership Meets Invariants
    • Panics
    • Error As Values - Result
  7. Reject Invalid Subscribers #2
  8. Publish A Newsletter Issue
    • The Newsletter State Machine
    • /newsletters/draft
    • /newsletters/issue
  9. Securing Our API
    • Security Properties
    • Trasport Layer Security (TLS)
    • Authentication / Authorization
  10. Hardening Our Email Delivery Logic
    • Send Emails Asynchronously
  11. Monitoring
    • Metrics: Prometheus
    • Dashboards: Grafana
    • Alerts: AlertManager
  12. Performance
    • Micro-benchmarking
    • Load Testing