HTML forms, Databases, Integration tests

Zero To Production is an in-progress book on API development in Rust, published chapter by chapter on this blog. If you'd like to be notified when a new episode comes out you can subscribe to the mailing list.

Chapter #3.5

  1. Previously On Zero To Production
  2. Working with HTML forms
  3. Storing Data: Databases
  4. Next Up On Zero To Production

You can discuss the article on HackerNews or r/rust.

1. Previously On Zero To Production

Zero To Production focuses on the challenges of writing Cloud-native applications with Rust.
Zero To Production is built around the idea of problem-based learning: choose a problem you want to solve, let the problem drive the introduction of new concepts and techniques.
Our problem is building an email newsletter.

In the first part of Chapter 3 we covered a fair amount of ground - we set out to implement a /health_check endpoint and that gave us the opportunity to learn more about the fundamentals of our web framework, actix-web, as well as the basics of (integration) testing for Rust APIs.

It is now time to capitalise on what we learned to finally fulfill the first user story of our email newsletter project:

As a blog visitor,
I want to subscribe to the newsletter,
So that I can receive email updates when new content is published on the blog.

We expect our blog visitors to input their email address in a form embedded on a web page.
The form will trigger a POST /subscriptions call to our backend API that will actually process the information, store it and send back a response.

We will have to dig into:

  - how to read data collected in an HTML form from the body of a POST request in actix-web;
  - what crates are available to work with a PostgreSQL database in Rust;
  - how to set up our database and manage changes to its schema;
  - how to test for side-effects (i.e. stored data) in our integration tests.

If all goes well, we should be able to demo the subscription page at the end of the article.
Let's get started!

The source code of our email newsletter project is on GitHub!
If you haven't read the previous chapters yet (or you are not planning to) you can just get started from the code in the chapter03-0 folder.
The code for this chapter is in the chapter03-1 folder.

2. Working With HTML forms

2.1. Refining Our Requirements

What information should we collect from a visitor in order to enroll them as a subscriber of our email newsletter?

Well, we certainly need their email address (it is an email newsletter after all).
What else?

In a typical business setup, this question would spark a conversation between the engineers on the team and the product manager. In this case, we are both the technical leads and the product owners, so we get to call the shots!

Speaking from personal experience, people generally use throwaway or masked emails when subscribing to newsletters (or, at least, most of you did when subscribing to Zero To Production!).
It would thus be nice to collect a name that we could use for our email greetings (the infamous Hey {{subscriber.name}}!) as well as to spot mutuals or people we know in the list of subscribers.
We are not cops, we have no interest in the name field being authentic - we will let people input whatever they feel like using as their identifier in our newsletter system: DenverCoder9, we welcome you.

It is settled then: we want an email address and a name for all new subscribers.

Given that the data is collected via an HTML form, it will be passed to our backend API in the body of a POST request. How is the body going to be encoded?
There are a few options available when using HTML forms: application/x-www-form-urlencoded is the most suitable to our usecase.
Quoting MDN web docs, with application/x-www-form-urlencoded

the keys and values [in our form] are encoded in key-value tuples separated by '&', with a '=' between the key and the value. Non-alphanumeric characters in both keys and values are percent encoded.

For example: if the name is Le Guin and the email is ursula_le_guin@gmail.com the POST request body should be name=le%20guin&email=ursula_le_guin%40gmail.com (spaces are replaced by %20 while @ becomes %40 - a reference conversion table can be found here).
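As an illustration, the percent-encoding step can be hand-rolled in a few lines of Rust. This is a simplified sketch using only the standard library - real implementations follow the WHATWG URL standard more closely, and the set of "safe" characters below is an approximation (note that form encoding also allows ' ' to be written as '+'):

```rust
/// Percent-encode a form value: keep ASCII alphanumerics and a few
/// safe characters, escape everything else as %XX.
fn urlencode(value: &str) -> String {
    value
        .bytes()
        .map(|b| match b {
            b'0'..=b'9' | b'a'..=b'z' | b'A'..=b'Z' | b'-' | b'_' | b'.' | b'~' => {
                (b as char).to_string()
            }
            _ => format!("%{:02X}", b),
        })
        .collect()
}

fn main() {
    let body = format!(
        "name={}&email={}",
        urlencode("le guin"),
        urlencode("ursula_le_guin@gmail.com")
    );
    // Matches the request body we expect to send from the form.
    assert_eq!(body, "name=le%20guin&email=ursula_le_guin%40gmail.com");
    println!("{}", body);
}
```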

To summarise:

  - a valid POST /subscriptions request carries both a name and an email in its url-encoded body and should receive a 200 OK;
  - a request missing either field is invalid and should receive a 400 Bad Request.

2.2. Capturing Our Requirements As Tests

Now that we understand better what needs to happen, let's encode our expectations in a couple of integration tests.

Let's add the new tests to the existing tests/health_check.rs file - we will re-organise our test suite folder structure afterwards.

//! tests/health_check.rs
use zero2prod::run;
use std::net::TcpListener;

/// Spins up an instance of our application
/// and returns its address (i.e. http://localhost:XXXX)
fn spawn_app() -> String {
    [...]
}

#[actix_rt::test]
async fn health_check_works() {
    [...]
}

#[actix_rt::test]
async fn subscribe_returns_a_200_for_valid_form_data() {
    // Arrange
    let app_address = spawn_app();
    let client = reqwest::Client::new();
    let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";

    // Act
    let response = client
        .post(&format!("{}/subscriptions", &app_address))
        .header("Content-Type", "application/x-www-form-urlencoded")
        .body(body)
        .send()
        .await
        .expect("Failed to execute request.");

    // Assert
    assert_eq!(200, response.status().as_u16());
}


#[actix_rt::test]
async fn subscribe_returns_a_400_when_data_is_missing() {
    // Arrange
    let app_address = spawn_app();
    let client = reqwest::Client::new();
    let test_cases = vec![
        ("name=le%20guin", "missing the email"),
        ("email=ursula_le_guin%40gmail.com", "missing the name"),
        ("", "missing both name and email")
    ];

    for (invalid_body, error_message) in test_cases {
        // Act
        let response = client
            .post(&format!("{}/subscriptions", &app_address))
            .header("Content-Type", "application/x-www-form-urlencoded")
            .body(invalid_body)
            .send()
            .await
            .expect("Failed to execute request.");

        // Assert
        assert_eq!(
            400,
            response.status().as_u16(),
            // Additional customised error message on test failure
            "The API did not fail with 400 Bad Request when the payload was {}.",
            error_message
        );
    }
}

subscribe_returns_a_400_when_data_is_missing is an example of a table-driven test, also known as a parametrised test.
It is particularly helpful when dealing with bad inputs - instead of duplicating test logic several times we can simply run the same assertion against a collection of known invalid bodies that we expect to fail in the same way.
With parametrised tests it is important to have good error messages on failures: assertion failed on line XYZ is not great if you cannot tell which specific input is broken! On the flip side, that parametrised test is covering a lot of ground so it makes sense to invest a bit more time in generating a nice failure message.
Test frameworks in other languages sometimes have native support for this testing style (e.g. parametrised tests in pytest or InlineData in xUnit for C#) - there are a few crates in the Rust ecosystem that extend the basic test framework with similar features, but unfortunately they do not interop very well with the #[actix_rt::test] macro that we need to write asynchronous tests idiomatically (see rstest or test-case).

Let's run our test suite now:

---- health_check::subscribe_returns_a_200_for_valid_form_data stdout ----
thread 'health_check::subscribe_returns_a_200_for_valid_form_data'
panicked at 'assertion failed: `(left == right)`
  left: `200`,
 right: `404`: 

---- health_check::subscribe_returns_a_400_when_data_is_missing stdout ----
thread 'health_check::subscribe_returns_a_400_when_data_is_missing' 
panicked at 'assertion failed: `(left == right)`
  left: `400`,
 right: `404`: 
 The API did not fail with 400 Bad Request when the payload was missing the email.'

As expected, all our new tests are failing.
You can immediately spot a limitation of "roll-your-own" parametrised tests: as soon as one test case fails, the execution stops and we do not know the outcome for the following test cases.

Let's get started on the implementation.

2.3. Parsing Form Data From A POST Request

All tests are failing because the application returns a 404 NOT FOUND for POST requests hitting /subscriptions. Legitimate behaviour: we do not have a handler registered for that path.
Let's fix it by adding a matching route to our App in src/lib.rs:

//! src/lib.rs
use actix_web::dev::Server;
use actix_web::{web, App, HttpResponse, HttpServer};
use std::net::TcpListener;

async fn health_check() -> HttpResponse {
    HttpResponse::Ok().finish()
}

// Let's start simple: we always return a 200 OK
async fn subscribe() -> HttpResponse {
    HttpResponse::Ok().finish()
}

pub fn run(listener: TcpListener) -> Result<Server, std::io::Error> {
    let server = HttpServer::new(|| {
        App::new()
            .route("/health_check", web::get().to(health_check))
            // A new entry in our routing table for POST /subscriptions requests
            .route("/subscriptions", web::post().to(subscribe))
    })
    .listen(listener)?
    .run();
    Ok(server)
}

Running our test suite again:

running 3 tests
test health_check::health_check_works ... ok
test health_check::subscribe_returns_a_200_for_valid_form_data ... ok
test health_check::subscribe_returns_a_400_when_data_is_missing ... FAILED

failures:

---- health_check::subscribe_returns_a_400_when_data_is_missing stdout ----
thread 'health_check::subscribe_returns_a_400_when_data_is_missing' 
panicked at 'assertion failed: `(left == right)`
  left: `400`,
 right: `200`: 
 The API did not fail with 400 Bad Request when the payload was missing the email.'

failures:
    health_check::subscribe_returns_a_400_when_data_is_missing

test result: FAILED. 2 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out

subscribe_returns_a_200_for_valid_form_data now passes: well, our handler accepts all incoming data as valid, no surprises there.
subscribe_returns_a_400_when_data_is_missing, instead, is still red.
Time to do some real parsing on that request body. What does actix-web offer us?

2.3.1. Extractors

Quite prominent in actix-web's User Guide is the Extractors section.
Extractors are used, as the name implies, to tell the framework to extract certain pieces of information from an incoming request.
actix-web provides several extractors out of the box to cater for the most common usecases:

  - Path, to get dynamic path segments from a request's path;
  - Query, for query parameters;
  - Json, to deserialise a JSON-encoded request body;
  - etc.

Luckily enough, there is an extractor that serves exactly our usecase: Form.
Reading straight from its documentation:

Form data helper (application/x-www-form-urlencoded).
Can be used to extract url-encoded data from the request body, or send url-encoded data as the response.

That's music to my ears.
How do we use it?

Looking at actix-web's User Guide:

An extractor can be accessed as an argument to a handler function. Actix-web supports up to 10 extractors per handler function. Argument position does not matter.

Example:

use actix_web::web;

#[derive(serde::Deserialize)]
struct FormData {
    username: String,
}

/// Extract form data using serde.
/// This handler gets called only if the content type is *x-www-form-urlencoded*
/// and the content of the request could be deserialized to a `FormData` struct
fn index(form: web::Form<FormData>) -> String {
    format!("Welcome {}!", form.username)
}

So, basically... you just slap it there as an argument of your handler and actix-web, when a request comes in, will somehow do the heavy-lifting for you. Let's ride along for now and we will circle back later to understand what is happening under the hood.

Our subscribe handler currently looks like this:

//! src/lib.rs
// Let's start simple: we always return a 200 OK
async fn subscribe() -> HttpResponse {
    HttpResponse::Ok().finish()
}

Using the example as a blueprint, we probably want something along these lines:

//! src/lib.rs
// [...]

#[derive(serde::Deserialize)]
struct FormData {
    email: String,
    name: String
}

async fn subscribe(_form: web::Form<FormData>) -> HttpResponse {
    HttpResponse::Ok().finish()
}

cargo check is not happy:

error[E0433]: failed to resolve: use of undeclared type or module `serde`
 --> src/lib.rs:9:10
  |
9 | #[derive(serde::Deserialize)]
  |          ^^^^^ use of undeclared type or module `serde`

Fair enough: we need to add serde to our dependencies. Let's add a new line to our Cargo.toml:

[dependencies]
# We need the optional `derive` feature to use `serde`'s procedural macros: 
# `#[derive(Serialize)]` and `#[derive(Deserialize)]`.
# The feature is not enabled by default to avoid pulling in 
# unnecessary dependencies for projects that do not need it.
serde = { version = "1", features = ["derive"]}

cargo check should succeed now. What about cargo test?

running 3 tests
test health_check_works ... ok
test subscribe_returns_a_200_for_valid_form_data ... ok
test subscribe_returns_a_400_when_data_is_missing ... ok

test result: ok. 3 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

They are all green!

But why?

2.3.2. Form and FromRequest

Let's go straight to the source: what does Form look like?
You can find its source code here.

The definition seems fairly innocent:

#[derive(PartialEq, Eq, PartialOrd, Ord)]
pub struct Form<T>(pub T);

It is nothing more than a wrapper: it is generic over a type T which is then used to populate Form's only field.
Not much to see here.
Where does the extraction magic take place?

An extractor is a type that implements the FromRequest trait.
FromRequest's definition is a bit noisy because Rust does not yet support async fn in trait definitions. Reworking it slightly, it boils down to something that looks more or less like this:

/// Trait implemented by types that can be extracted from request.
///
/// Types that implement this trait can be used with `Route` handlers.
pub trait FromRequest: Sized {
    type Error: Into<actix_web::Error>;

    async fn from_request(req: &HttpRequest, payload: &mut Payload) -> Result<Self, Self::Error>;
    
    /// Omitting some ancillary methods that actix-web implements 
    /// out of the box for you and supporting associated types
    ///  [...]
}

from_request takes as inputs the head of the incoming HTTP request (i.e. HttpRequest) and the bytes of its payload (i.e. Payload). It then returns Self, if the extraction succeeds, or an error type that can be converted into actix_web::Error.
All arguments in the signature of a route handler must implement the FromRequest trait: actix-web will invoke from_request for each argument and, if the extraction succeeds for all of them, it will then run the actual handler function.
If one of the extractions fails, the corresponding error is returned to the caller and the handler is never invoked (actix_web::Error can be converted to a HttpResponse).

This is extremely convenient: your handler does not have to deal with the raw incoming request and can instead work directly with strongly-typed information, significantly simplifying the code that you need to write to handle a request.
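To make the mechanics concrete, here is a toy, synchronous model of the extractor pattern - hypothetical types, not actix-web's actual code: the "framework" builds each handler argument from the raw request and only invokes the handler if extraction succeeds.

```rust
// A toy model of the extractor pattern (not actix-web's real code).
struct Request {
    body: String,
}

trait FromRequest: Sized {
    fn from_request(req: &Request) -> Result<Self, String>;
}

struct FormData {
    name: String,
}

impl FromRequest for FormData {
    // Grossly simplified parsing of a `name=<value>` body.
    fn from_request(req: &Request) -> Result<Self, String> {
        req.body
            .strip_prefix("name=")
            .map(|name| FormData { name: name.to_string() })
            .ok_or_else(|| "400 Bad Request".to_string())
    }
}

fn subscribe(form: FormData) -> String {
    format!("200 OK: welcome {}!", form.name)
}

// The "framework": extract first, short-circuit on failure,
// invoke the handler only when extraction succeeded.
fn dispatch<T: FromRequest>(req: &Request, handler: fn(T) -> String) -> String {
    match T::from_request(req) {
        Ok(extracted) => handler(extracted),
        Err(error_response) => error_response,
    }
}

fn main() {
    let ok = Request { body: "name=DenverCoder9".to_string() };
    let bad = Request { body: "email=only".to_string() };
    assert_eq!(dispatch(&ok, subscribe), "200 OK: welcome DenverCoder9!");
    assert_eq!(dispatch(&bad, subscribe), "400 Bad Request");
}
```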

Let's look at Form's FromRequest implementation: what does it do?
Once again, I slightly reshaped the actual code to highlight the key elements and ignore the nitty-gritty implementation details.

impl<T> FromRequest for Form<T>
where
    T: DeserializeOwned + 'static,
{
    type Error = actix_web::Error;

    async fn from_request(req: &HttpRequest, payload: &mut Payload) -> Result<Self, Self::Error> {
        // Omitted stuff around extractor configuration (e.g. payload size limits)

        match UrlEncoded::new(req, payload).await {
            Ok(item) => Ok(Form(item)),
            // The error handler can be customised.
            // The default one will return a 400, which is what we want.
            Err(e) => Err(error_handler(e))
        }       
    }
}

All the heavy-lifting seems to be happening inside that UrlEncoded struct.
UrlEncoded does a lot: it transparently handles compressed and uncompressed payloads, it deals with the fact that the request body arrives a chunk at a time as a stream of bytes, etc.
The key passage, after all those things have been taken care of, is:

serde_urlencoded::from_bytes::<T>(&body).map_err(|_| UrlencodedError::Parse)

serde_urlencoded provides (de)serialisation support for the application/x-www-form-urlencoded data format.
from_bytes takes as input a contiguous slice of bytes and it deserialises an instance of type T from it according to rules of the URL-encoded format: the keys and values are encoded in key-value tuples separated by &, with a = between the key and the value; non-alphanumeric characters in both keys and values are percent encoded.
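Those rules are simple enough to hand-roll a naive decoder with the standard library - a sketch to illustrate the format, not what serde_urlencoded actually does (the real crate plugs into serde's data model and handles many more edge cases):

```rust
/// Decode a percent-encoded string ('+' also maps to a space,
/// per the urlencoded rules).
fn percent_decode(s: &str) -> String {
    let bytes = s.as_bytes();
    let mut out = Vec::with_capacity(bytes.len());
    let mut i = 0;
    while i < bytes.len() {
        match bytes[i] {
            b'%' if i + 2 < bytes.len()
                && bytes[i + 1].is_ascii_hexdigit()
                && bytes[i + 2].is_ascii_hexdigit() =>
            {
                let hex = std::str::from_utf8(&bytes[i + 1..i + 3]).unwrap();
                out.push(u8::from_str_radix(hex, 16).unwrap());
                i += 3;
                continue;
            }
            b'+' => out.push(b' '),
            other => out.push(other),
        }
        i += 1;
    }
    String::from_utf8_lossy(&out).into_owned()
}

/// Split a body into key-value pairs on '&' and '=', decoding each half.
fn parse_form(body: &str) -> Vec<(String, String)> {
    body.split('&')
        .filter_map(|pair| pair.split_once('='))
        .map(|(k, v)| (percent_decode(k), percent_decode(v)))
        .collect()
}

fn main() {
    let pairs = parse_form("name=le%20guin&email=ursula_le_guin%40gmail.com");
    assert_eq!(pairs[0], ("name".to_string(), "le guin".to_string()));
    assert_eq!(pairs[1], ("email".to_string(), "ursula_le_guin@gmail.com".to_string()));
}
```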

How does it know how to do it for a generic type T?
It is because T implements the DeserializeOwned trait from serde:

impl<T> FromRequest for Form<T>
where
    T: DeserializeOwned + 'static,
{
// [...]
}

To understand what is actually going on under the hood we need to take a closer look at serde itself.

2.3.3. Serialisation in Rust: serde

Why do we need serde? What does serde actually do for us?
Quoting from its guide:

Serde is a framework for serializing and deserializing Rust data structures efficiently and generically.

2.3.3.1. Generically

serde does not, by itself, provide support for (de)serialisation from/to any specific data format: you will not find any code inside serde that deals with the specifics of JSON, Avro or MessagePack. If you need support for a specific data format, you need to pull in another crate (e.g. serde_json for JSON or avro-rs for Avro).
serde defines a set of interfaces or, as they themselves call it, a data model.

If you want to implement a library to support serialisation for a new data format, you have to provide an implementation of the Serializer trait.
Each method on the Serializer trait corresponds to one of the 29 types that form serde's data model - your implementation of Serializer specifies how each of those types maps to your specific data format.
For example, if you were adding support for JSON serialisation, your serialize_seq implementation would output an opening square bracket [ and return a type which can be used to serialize sequence elements.1

On the other side, you have the Serialize trait: your implementation of Serialize::serialize for a Rust type is meant to specify how to decompose it according to serde's data model using the methods available on the Serializer trait.
Using again the sequence example, this is how Serialize is implemented for a Rust vector:

use serde::ser::{Serialize, Serializer, SerializeSeq};

impl<T> Serialize for Vec<T>
where
    T: Serialize,
{
    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let mut seq = serializer.serialize_seq(Some(self.len()))?;
        for element in self {
            seq.serialize_element(element)?;
        }
        seq.end()
    }
}

That is what allows serde to be agnostic with respect to data formats: once your type implements Serialize, you are then free to use any concrete implementation of Serializer to actually perform the serialisation step - i.e. you can serialize your type to any format for which there is an available Serializer implementation on crates.io (spoiler: almost all commonly used data formats).
The same is true for deserialisation, via Deserialize and Deserializer, with a few additional details around lifetimes to support zero-copy deserialisation.

2.3.3.2. Efficiently

What about speed?
Is serde slower due to the fact that it is generic over the underlying data formats?

No, thanks to a process called monomorphization.
Every time a generic function is called with a concrete set of types, the Rust compiler will create a copy of the function body replacing the generic type parameters with the concrete types. This allows the compiler to optimize each instance of the function body with respect to the concrete types involved: the result is no different from what we would have achieved writing down separate functions for each type, without using generics or traits. In other words, we do not pay any runtime costs for using generics2.
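A tiny example of what monomorphization means in practice: the generic `largest` function below is compiled into a separate, fully specialised copy for each concrete type it is called with, exactly as if we had written `largest_i32` and `largest_f64` by hand (the names are hypothetical - the compiler's internal instances are `largest::<i32>` and `largest::<f64>`).

```rust
/// One generic source definition...
fn largest<T: PartialOrd + Copy>(items: &[T]) -> T {
    let mut max = items[0];
    for &item in &items[1..] {
        if item > max {
            max = item;
        }
    }
    max
}

fn main() {
    // ...two independent machine-code copies, each optimised for
    // its concrete type, with no runtime dispatch involved.
    assert_eq!(largest(&[3, 7, 2]), 7);   // instantiates largest::<i32>
    assert_eq!(largest(&[0.5, 0.1]), 0.5); // instantiates largest::<f64>
}
```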

This concept is extremely powerful and it’s often referred to as zero-cost abstraction: using higher-level language constructs results in the same machine code you would have obtained with uglier/more "hand-rolled" implementations. We can therefore write code that is easier to read for a human (as it’s supposed to be!) without having to compromise on the quality of the final artifact.

serde is also extremely careful when it comes to memory usage: the intermediate data model that we spoke about is implicitly defined via trait methods, there is no real intermediate serialised struct. If you want to learn more about it, Josh Mcguigan wrote an amazing deep-dive titled Understanding Serde.

It is also worth pointing out that all the information required to (de)serialize a specific type for a specific data format is available at compile-time; there is no runtime overhead.
(De)serializers in other languages often leverage runtime reflection to fetch information about the type you want to (de)serialize (e.g. the list of their field names). Rust does not provide runtime reflection and everything has to be specified upfront.

2.3.3.3. Conveniently

This is where #[derive(Serialize)] and #[derive(Deserialize)] come into the picture.
You really do not want to spell out, manually, how serialisation should be performed for every single type defined in your project. It is tedious, error-prone and it takes time away from the application-specific logic that you are supposed to be focused on.

Those two procedural macros, bundled with serde behind the derive feature flag, will parse the definition of your type and automatically generate for you the right Serialize/Deserialize implementation.

2.3.4. Putting everything together

Given everything we learned so far, let's take a second look at our subscribe handler:

#[derive(serde::Deserialize)]
pub struct FormData {
    email: String,
    name: String,
}

// Let's start simple: we always return a 200 OK
async fn subscribe(_form: web::Form<FormData>) -> HttpResponse {
    HttpResponse::Ok().finish()
}

We now have a good picture of what is happening:

  - before calling subscribe, actix-web invokes from_request for all of subscribe's input arguments - in our case, Form::from_request;
  - Form::from_request tries to deserialise the body into FormData according to the rules of URL-encoding, leveraging serde_urlencoded and the Deserialize implementation of FormData, automatically generated for us by #[derive(serde::Deserialize)];
  - if Form::from_request fails, a 400 Bad Request is returned to the caller; if it succeeds, subscribe is invoked and we return a 200 OK.

Take a moment to be amazed: it looks so deceptively simple, yet there is so much going on in there - we are leaning heavily on Rust's strength as well as some of the most polished crates in its ecosystem.

3. Storing Data: Databases

Our POST /subscriptions endpoint passes our tests but its usefulness is fairly limited: we are not storing valid emails and names anywhere.
There is no permanent record of the information that we collected from our HTML form.
How do we fix it?

When we defined what Cloud-native stands for we listed some of the emergent behaviour that we expect to see in our system: in particular, we want it to achieve high availability while running in a fault-prone environment. Our application is therefore forced to be distributed - there should be multiple instances of it running on multiple machines in order to survive hardware failures.

This has consequences when it comes to data persistence: we cannot rely on the filesystem of our host as a storage layer for incoming data.
Anything that we save on disk would only be available to one of the many replicas of our application3. Furthermore, it would probably disappear if the underlying host crashed.

This explains why Cloud-native applications are usually stateless: their persistence needs are delegated to specialised external systems - databases.

3.1. Choosing A Database

What database should we use for our newsletter project?

I will lay down my personal rule-of-thumb, which might sound controversial:

If you are uncertain about your persistence requirements, use a relational database.
If you have no reason to expect massive scale, use PostgreSQL.

The offering when it comes to databases has exploded in the last twenty years.
From a data-model perspective, the NoSQL movement has brought us document-stores (e.g. MongoDB), key-value stores (e.g. AWS DynamoDB), graph databases (e.g. Neo4J), etc.
We have databases that use RAM as their primary storage (e.g. Redis).
We have databases that are optimised for analytical queries via columnar storage (e.g. AWS RedShift).

There is a world of possibilities and you should definitely leverage this richness when designing systems.
Nonetheless, it is much easier to design yourself into a corner by using a specialised data storage solution when you still do not have a clear picture of the data access patterns used by your application.
Relational databases are reasonably good as jack-of-all-trades: they will often be a good choice when building the first version of your application, supporting you along the way while you explore the constraints of your domain.

Even when it comes to relational databases there is plenty of choice.
Alongside classics like PostgreSQL and MySQL you will find some exciting new entries like AWS Aurora, Google Spanner and CockroachDB.
What do they all have in common?
They are built to scale. Way beyond what traditional SQL databases were supposed to be able to handle.
If scale is a problem of yours, by all means, take a look there. If it isn't, you do not need to take onboard the additional complexity.

This is how we end up with PostgreSQL: a battle-tested piece of technology, widely supported across all cloud providers if you need a managed offering, opensource, exhaustive documentation, easy to run locally and in CI via Docker, well-supported within the Rust ecosystem.

3.2. Choosing A Database Crate

As of August 2020, there are three top-of-mind options when it comes to interacting with PostgreSQL in a Rust project:

  - tokio-postgres;
  - diesel;
  - sqlx.

All three are massively popular projects that have seen significant adoption with a fair share of production usage. How do you pick one?

It boils down to how you feel about three topics:

  - compile-time safety;
  - query interface;
  - async support.

3.2.1. Compile-time Safety

When interacting with a relational database it is fairly easy to make mistakes - we might, for example:

  - misspell the name of a column or table in one of our queries;
  - perform operations that are rejected by the database engine (e.g. summing a string and a number);
  - expect a certain field to be present in the returned data when it actually is not.

The key question is: when do we realise we made a mistake?

In most programming languages, it will be at runtime: when we try to execute our query the database will reject it and we will get an error or an exception. This is what happens when using tokio-postgres.

diesel and sqlx try to speed up the feedback cycle by detecting at compile-time most of these mistakes.
diesel leverages its CLI to generate a representation of the database schema as Rust code, which is then used to check assumptions on all of your queries.
sqlx, instead, uses procedural macros to connect to a database at compile-time and check if the provided query is indeed sound4.

3.2.2. Query Interface

Both tokio-postgres and sqlx expect you to use SQL directly to write your queries.

diesel, instead, provides its own query builder: queries are represented as Rust types and you add filters, perform joins and similar operations by calling methods on them. This is often referred to with the name of Domain Specific Language (DSL).

Which one is better?
As always, it depends.

SQL is extremely portable - you can use it in any project where you have to interact with a relational database, regardless of the programming language or the framework the application is written with.
diesel's DSL, instead, is only relevant when working with diesel: you have to pay an upfront learning cost to become fluent with it and it only pays off if you stick to diesel for your current and future projects. It is also worth pointing out that expressing complex queries using diesel's DSL can be difficult and you might end up having to write raw SQL anyway.

On the flip side, diesel's DSL makes it easier to write reusable components: you can split your complex queries into smaller units and leverage them in multiple places, as you would do with a normal Rust function.

3.2.3. Async Support

I remember reading somewhere a killer explanation of async IO that more or less sounded like this:

Threads are for working in parallel, async is for waiting in parallel.

Your database is not sitting next to your application on the same physical host: to run queries you have to perform network calls.
An asynchronous database driver will not reduce how long it takes to process a single query, but it will enable your application to leverage all CPU cores to perform other meaningful work (e.g. serve another HTTP request) while waiting for the database to return results.
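The "waiting in parallel" idea is easy to demonstrate with threads and simulated network waits - an async runtime achieves the same overlap with far fewer OS resources. A sketch using only the standard library:

```rust
use std::thread;
use std::time::{Duration, Instant};

/// Simulate a database query that spends 50ms waiting on the network.
fn fake_query() {
    thread::sleep(Duration::from_millis(50));
}

fn main() {
    let start = Instant::now();
    // Four "queries" whose waits overlap: total wall time is roughly
    // 50ms, not 200ms, because nobody is burning CPU while waiting.
    let handles: Vec<_> = (0..4).map(|_| thread::spawn(fake_query)).collect();
    for handle in handles {
        handle.join().unwrap();
    }
    let elapsed = start.elapsed();
    // Well under the 200ms a sequential execution would take.
    assert!(elapsed < Duration::from_millis(200));
    println!("4 overlapping 50ms waits finished in {:?}", elapsed);
}
```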

Is this a significant enough benefit to take onboard the additional complexity introduced by asynchronous code?
It depends on the performance requirements of your application.
Generally speaking, running queries on a separate threadpool should be more than enough for most usecases. At the same time, if your web framework is already asynchronous, using an asynchronous database driver will actually give you fewer headaches5.

Both sqlx and tokio-postgres provide an asynchronous interface, while diesel is synchronous and does not plan to roll out async support in the near future.

It is also worth mentioning that tokio-postgres is, at the moment, the only crate that supports query pipelining. The feature is still at the design stage for sqlx while I could not find it mentioned anywhere in diesel's docs or issue tracker.

3.2.4. Summary

Let's summarise everything we covered in a comparison matrix:

| Crate          | Compile-time safety | Query interface | Async |
|----------------|---------------------|-----------------|-------|
| tokio-postgres | No                  | SQL             | Yes   |
| sqlx           | Yes                 | SQL             | Yes   |
| diesel         | Yes                 | DSL             | No    |

3.2.5. Our Pick: sqlx

For Zero To Production we will use sqlx: its asynchronous support simplifies the integration with actix-web without forcing us to compromise on compile-time guarantees. It also limits the API surface that we have to cover and become proficient with thanks to its usage of raw SQL for queries.

3.3. Integration Testing With Side-effects

What do we want to accomplish?
Let's look again at our "happy case" test:

//! tests/health_check.rs
// [...]

#[actix_rt::test]
async fn subscribe_returns_a_200_for_valid_form_data() {
    // Arrange
    let app_address = spawn_app();
    let client = reqwest::Client::new();
    let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";

    // Act
    let response = client
        .post(&format!("{}/subscriptions", &app_address))
        .header("Content-Type", "application/x-www-form-urlencoded")
        .body(body)
        .send()
        .await
        .expect("Failed to execute request.");

    // Assert
    assert_eq!(200, response.status().as_u16());
}

The assertion we have there is not enough.
We have no way to tell, just by looking at the API response, if the desired business outcome has been achieved - we are interested to know if a side-effect has taken place, i.e. data storage.

We want to check if the details of our new subscriber have actually been persisted.
How do we go about it?

We have two options:

  1. leverage another endpoint of our public API to inspect the application state;
  2. query directly the database in our test case.

Option 1 should be your go-to when possible: your tests remain oblivious to the implementation details of the API (e.g. the underlying database technology and its schema) and are therefore less likely to be disrupted by future refactorings.

Unfortunately we do not have any public endpoint on our API that allows us to verify if a subscriber exists.
We could add a GET /subscriptions endpoint to fetch the list of existing subscribers, but we would then have to worry about securing it: we do not want to have the names and emails of our subscribers exposed on the public internet without any form of authentication.
We will probably end up writing a GET /subscriptions endpoint down the line (i.e. we do not want to log into our production database to check the list of our subscribers), but we should not start writing a new feature just to test the one we are working on.

Let's bite the bullet and write a small query in our test. We will remove it down the line when a better testing strategy becomes available.

3.4. Database Setup

To run queries in our test suite we need:

  1. a running Postgres instance;
  2. a table to store the details of our subscribers.

3.4.1. Docker

To run Postgres we will use Docker - before launching our test suite we will launch a new Docker container using Postgres' official Docker image.
You can follow the instructions on Docker's website to install it on your machine.

Let's create a small bash script for it, scripts/init_db.sh, with a few knobs to customise Postgres' default settings:

#!/usr/bin/env bash
set -x
set -eo pipefail

# Check if a custom user has been set, otherwise default to 'postgres'
DB_USER=${POSTGRES_USER:=postgres}
# Check if a custom password has been set, otherwise default to 'password'
DB_PASSWORD="${POSTGRES_PASSWORD:=password}"
# Check if a custom database name has been set, otherwise default to 'newsletter'
DB_NAME="${POSTGRES_DB:=newsletter}"
# Check if a custom port has been set, otherwise default to '5432'
DB_PORT="${POSTGRES_PORT:=5432}"

# Launch postgres using Docker
docker run \
  -e POSTGRES_USER=${DB_USER} \
  -e POSTGRES_PASSWORD=${DB_PASSWORD} \
  -e POSTGRES_DB=${DB_NAME} \
  -p "${DB_PORT}":5432 \
  -d postgres \
  postgres -N 1000
  # ^ Increased maximum number of connections for testing purposes

Let's make it executable:

chmod +x scripts/init_db.sh

We can then launch Postgres with

./scripts/init_db.sh

If you run docker ps you should see something along the lines of

IMAGE            PORTS                   STATUS 
postgres   127.0.0.1:5432->5432/tcp   Up 12 seconds   [...]

3.4.2. Database Migrations

To store our subscribers details we need to create our first table.
To add a new table to our database we need to change its schema - this is commonly referred to as a database migration.

3.4.2.1. sqlx-cli

The latest stable release of sqlx, 0.3.x, does not provide any tooling to manage migrations. A dedicated CLI, instead, is coming alongside their 0.4.0-beta.1 beta release.
We will therefore use sqlx's beta release: by the time I finish writing Zero To Production I expect it to be more than battle-tested.

We can install the CLI with

# We are only going to use Postgres
cargo install --version=0.1.0-beta.1 sqlx-cli --no-default-features --features postgres

Run sqlx --help to check that everything is working as expected.

3.4.2.2. Database Creation

The first command we will usually want to run is sqlx database create. According to the help docs:

sqlx-database-create
Creates the database specified in your DATABASE_URL

USAGE:
    sqlx database create

FLAGS:
    -h, --help       Prints help information
    -V, --version    Prints version information

In our case, this is not strictly necessary: our Postgres Docker instance already comes with a default database named newsletter, thanks to the settings we specified when launching it using environment variables. Nonetheless, you will have to go through the creation step in your CI pipeline and in your production environment, so it is worth covering anyway.

As the help docs imply, sqlx database create relies on the DATABASE_URL environment variable to know what to do.
DATABASE_URL is expected to be a valid Postgres connection string - the format is as follows:

postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:${DB_PORT}/${DB_NAME}

We can therefore add a couple more lines to our scripts/init_db.sh script:

# [...]

export DATABASE_URL=postgres://${DB_USER}:${DB_PASSWORD}@localhost:${DB_PORT}/${DB_NAME}
sqlx database create

You might run into an annoying issue from time to time: the Postgres container will not be ready to accept connections when we try to run sqlx database create.
It happened to me often enough to look for a workaround: we need to wait for Postgres to be healthy before starting to run commands against it. Let's update our script to:

#!/usr/bin/env bash
set -x
set -eo pipefail

DB_USER=${POSTGRES_USER:=postgres}
DB_PASSWORD="${POSTGRES_PASSWORD:=password}"
DB_NAME="${POSTGRES_DB:=newsletter}"
DB_PORT="${POSTGRES_PORT:=5432}"

docker run \
  -e POSTGRES_USER=${DB_USER} \
  -e POSTGRES_PASSWORD=${DB_PASSWORD} \
  -e POSTGRES_DB=${DB_NAME} \
  -p "${DB_PORT}":5432 \
  -d postgres \
  postgres -N 1000

# Keep pinging Postgres until it's ready to accept commands
until PGPASSWORD="${DB_PASSWORD}" psql -h "localhost" -U "${DB_USER}" -p "${DB_PORT}" -d "postgres" -c '\q'; do
  >&2 echo "Postgres is still unavailable - sleeping"
  sleep 1
done

>&2 echo "Postgres is up and running on port ${DB_PORT}!"

export DATABASE_URL=postgres://${DB_USER}:${DB_PASSWORD}@localhost:${DB_PORT}/${DB_NAME}
sqlx database create

Problem solved!
The health check uses psql, the command line client for Postgres. Check these instructions on how to install it on your OS.

3.4.2.3. Adding A Migration

Let's create our first migration now with

sqlx migrate add create_subscriptions_table

A new top-level directory should have now appeared in your project - migrations. This is where all migrations for our project will be stored by sqlx's CLI.
Under migrations you should already have one file called {timestamp}_create_subscriptions_table.sql - this is where we have to write the SQL code for our first migration.

Let's quickly sketch the query we need:

-- migrations/{timestamp}_create_subscriptions_table.sql
-- Create Subscriptions Table
CREATE TABLE subscriptions(
   id uuid NOT NULL,
   PRIMARY KEY (id),
   email TEXT NOT NULL UNIQUE,
   name TEXT NOT NULL,
   subscribed_at timestamptz NOT NULL
);

There is an endless debate when it comes to primary keys: some people prefer to use columns with a business meaning (e.g. email, a natural key), others feel safer with a synthetic key without any business meaning (e.g. id, a randomly generated UUID, a surrogate key).
I generally default to a synthetic identifier unless I have a very compelling reason not to - feel free to disagree with me here.

A couple of other things to make a note of:

  1. we are keeping track of when a subscription is created with subscribed_at (timestamptz is a time-zone aware date and time type);
  2. we are enforcing email uniqueness at the database level with a UNIQUE constraint;
  3. we are requiring all fields to be populated with NOT NULL constraints.

Database constraints are useful as a last line of defence from application bugs but they come at a cost - the database has to ensure all checks pass before writing new data into the table. Therefore constraints impact our write-throughput, i.e. the number of rows we can INSERT per unit of time in a table.
UNIQUE, in particular, introduces an additional B-tree index on our email column: the index has to be updated on every INSERT/UPDATE/DELETE query and it takes space on disk.

In our specific case, I would not be too worried: our mailing list would have to be incredibly popular for us to encounter issues with our write throughput. Definitely a good problem to have, if it comes to that.

3.4.2.4. Running Migrations

We can run migrations against our database with

sqlx migrate run

It has the same behaviour as sqlx database create - it will look at the DATABASE_URL environment variable to understand what database needs to be migrated.

Let's add it to our scripts/init_db.sh script:


#!/usr/bin/env bash
set -x
set -eo pipefail

DB_USER=${POSTGRES_USER:=postgres}
DB_PASSWORD="${POSTGRES_PASSWORD:=password}"
DB_NAME="${POSTGRES_DB:=newsletter}"
DB_PORT="${POSTGRES_PORT:=5432}"

# Allow skipping Docker if a dockerized Postgres database is already running
if [[ -z "${SKIP_DOCKER}" ]]
then
  docker run \
      -e POSTGRES_USER=${DB_USER} \
      -e POSTGRES_PASSWORD=${DB_PASSWORD} \
      -e POSTGRES_DB=${DB_NAME} \
      -p "${DB_PORT}":5432 \
      -d postgres \
      postgres -N 1000
fi

until PGPASSWORD="${DB_PASSWORD}" psql -h "localhost" -U "${DB_USER}" -p "${DB_PORT}" -d "postgres" -c '\q'; do
  >&2 echo "Postgres is still unavailable - sleeping"
  sleep 1
done

>&2 echo "Postgres is up and running on port ${DB_PORT} - running migrations now!"

export DATABASE_URL=postgres://${DB_USER}:${DB_PASSWORD}@localhost:${DB_PORT}/${DB_NAME}
sqlx database create
sqlx migrate run

>&2 echo "Postgres has been migrated, ready to go!"

We have put the docker run command behind a SKIP_DOCKER flag to make it easy to run migrations against an existing Postgres instance without having to tear it down manually and re-create it with scripts/init_db.sh. It will also be useful in CI, if Postgres is not spun up by our script.

We can now migrate the database with

SKIP_DOCKER=true ./scripts/init_db.sh

You should be able to spot, in the output, something like

+ sqlx migrate run
20200823135036/migrate create subscriptions table (7.563944ms)

If you check your database using your favourite graphical client for Postgres you will now see a subscriptions table alongside a brand new _sqlx_migrations table: this is where sqlx keeps track of what migrations have been run against your database - it should contain a single row now for our create_subscriptions_table migration.

3.5. Writing Our First Query

We have a migrated database up and running. How do we talk to it?

3.5.1. Sqlx Features Flags

We installed sqlx-cli, but we have actually not yet added sqlx itself as a dependency of our application.
Let's append a new line to our Cargo.toml:

[dependencies]
# [...]
sqlx = { version = "0.4.0-beta.1", default-features = false, features = [ "runtime-tokio", "macros", "postgres", "uuid", "chrono", "migrate"] }

Yeah, there are a lot of feature flags. Let's go through all of them one by one:

  1. runtime-tokio tells sqlx to use the tokio runtime for its futures;
  2. macros gives us access to sqlx::query! and sqlx::query_as!, which we will be using extensively;
  3. postgres unlocks Postgres-specific functionality (e.g. non-standard SQL types);
  4. uuid adds support for mapping SQL UUIDs to the Uuid type from the uuid crate;
  5. chrono adds support for mapping SQL timestamptz to the DateTime<Utc> type from the chrono crate;
  6. migrate gives us access to the same functions used under the hood by sqlx-cli to manage migrations.

These should be enough for what we need to do in this chapter.

3.5.2. Configuration Management

The simplest entrypoint to connect to a Postgres database is PgConnection.
PgConnection implements the Connection trait which provides us with a connect method: it takes as input a connection string and returns us, asynchronously, a Result<PgConnection, sqlx::Error>.

Where do we get a connection string?

We could hard-code one in our application and then use it for our tests as well.
Or we could choose to introduce immediately some basic mechanism of configuration management.

It is simpler than it sounds and it will save us the cost of tracking down a bunch of hard-coded values across the whole application.
The config crate is Rust's swiss-army knife when it comes to configuration: it supports multiple file formats and it lets you combine different sources hierarchically (e.g. environment variables, configuration files, etc.) to easily customise the behaviour of your application for each deployment environment.

We do not need anything fancy for the time being: a single configuration file will do.

3.5.2.1. Making Space

Right now all our application code lives in a single file, lib.rs.
Let's quickly split it into multiple sub-modules to avoid chaos now that we are adding new functionality. We want to land on this folder structure:

src/
  configuration.rs
  lib.rs
  main.rs
  routes/
    mod.rs
    health_check.rs
    subscriptions.rs
  startup.rs

Our lib.rs file becomes

//! src/lib.rs
pub mod configuration;
pub mod routes;
pub mod startup;

startup.rs will host our run function, health_check goes into routes/health_check.rs, subscribe and FormData into routes/subscriptions.rs, configuration.rs starts empty. Both handlers are re-exported in routes/mod.rs:

//! src/routes/mod.rs
mod health_check;
mod subscriptions;

pub use health_check::*;
pub use subscriptions::*;

You might have to add a few pub visibility modifiers here and there, as well as performing a few corrections to use statements in main.rs and tests/health_check.rs.

Make sure cargo test comes out green before moving forward.

3.5.2.2. Reading A Configuration File

To manage configuration with config we must represent our application settings as a Rust type that implements serde's Deserialize trait.
Let's create a new Settings struct:

//! src/configuration.rs
#[derive(serde::Deserialize)]
pub struct Settings {}

We have two groups of configuration values at the moment:

  1. the application port, where actix-web is listening for incoming requests;
  2. the database connection parameters.

Let's add a field for each of them to Settings:

//! src/configuration.rs
#[derive(serde::Deserialize)]
pub struct Settings {
    pub database: DatabaseSettings,
    pub application_port: u16
}

#[derive(serde::Deserialize)]
pub struct DatabaseSettings {
    pub username: String,
    pub password: String,
    pub port: u16,
    pub host: String,
    pub database_name: String,
}

We need #[derive(serde::Deserialize)] on top of DatabaseSettings otherwise the compiler will complain with

error[E0277]: the trait bound 
`configuration::DatabaseSettings: configuration::_::_serde::Deserialize<'_>` 
is not satisfied
 --> src/configuration.rs:3:5
  |
3 |     pub database: DatabaseSettings,
  |     ^^^ the trait `configuration::_::_serde::Deserialize<'_>` 
  |         is not implemented for `configuration::DatabaseSettings`
  |
  = note: required by `configuration::_::_serde::de::SeqAccess::next_element`

It makes sense: all fields in a type have to be deserialisable in order for the type as a whole to be deserialisable.

We have our configuration type, what now?

First of all, let's add config to our dependencies with

cargo add config

We want to read our application settings from a configuration file named configuration:

//! src/configuration.rs
// [...]

pub fn get_configuration() -> Result<Settings, config::ConfigError> {
    // Initialise our configuration reader
    let mut settings = config::Config::default();

    // Add configuration values from a file named `configuration`.
    // It will look for any top-level file with an extension
    // that `config` knows how to parse: yaml, json, etc.
    settings.merge(config::File::with_name("configuration"))?;

    // Try to convert the configuration values it read into
    // our Settings type
    settings.try_into()
}

Let's modify our main function to read configuration as its first step:

//! src/main.rs
use std::net::TcpListener;
use zero2prod::startup::run;
use zero2prod::configuration::get_configuration;

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    // Panic if we can't read configuration
    let configuration = get_configuration().expect("Failed to read configuration.");
    // We have removed the hard-coded `8000` - it's now coming from our settings!
    let address = format!("127.0.0.1:{}", configuration.application_port);
    let listener = TcpListener::bind(address)?;
    run(listener)?.await
}

If you try to launch the application with cargo run it should crash:

Running `target/debug/zero2prod`

thread 'main' panicked at 'Failed to read configuration.: 
configuration file "configuration" not found', src/main.rs:7:25

note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Panic in Arbiter thread.

Let's fix it by adding a configuration file.
We can use any file format for it, as long as config knows how to deal with it: we will go for YAML.

# configuration.yaml
application_port: 8000
database:
  host: "localhost"
  port: 5432
  username: "postgres"
  password: "password"
  database_name: "newsletter"

cargo run should now execute smoothly.

3.5.3. Connecting To Postgres

PgConnection::connect wants a single connection string as input, while DatabaseSettings provides us with granular access to all the connection parameters. Let's add a convenient connection_string method to do it:

impl DatabaseSettings {
    pub fn connection_string(&self) -> String {
        format!(
            "postgres://{}:{}@{}:{}/{}",
            self.username, self.password, self.host, self.port, self.database_name
        )
    }
}
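As a sanity check, the method boils down to string interpolation - a free-function sketch of the same logic (the standalone function is illustrative, our project keeps it as a method on DatabaseSettings):

```rust
// Illustrative stand-in for DatabaseSettings::connection_string -
// the same format! call, extracted as a free function.
fn connection_string(
    username: &str,
    password: &str,
    host: &str,
    port: u16,
    database_name: &str,
) -> String {
    format!(
        "postgres://{}:{}@{}:{}/{}",
        username, password, host, port, database_name
    )
}

fn main() {
    let url = connection_string("postgres", "password", "localhost", 5432, "newsletter");
    assert_eq!(url, "postgres://postgres:password@localhost:5432/newsletter");
}
```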

We are finally ready to connect!
Let's tweak our happy case test:

//! tests/health_check.rs
use sqlx::{PgConnection, Connection};
// [...]

#[actix_rt::test]
async fn subscribe_returns_a_200_for_valid_form_data() {
    // Arrange
    let app_address = spawn_app();
    let configuration = get_configuration().expect("Failed to read configuration");
    let connection_string = configuration.database.connection_string();
    let connection = PgConnection::connect(&connection_string)
        .await
        .expect("Failed to connect to Postgres.");
    let client = reqwest::Client::new();
    let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";

    // Act
    let response = client
        .post(&format!("{}/subscriptions", &app_address))
        .header("Content-Type", "application/x-www-form-urlencoded")
        .body(body)
        .send()
        .await
        .expect("Failed to execute request.");

    // Assert
    assert_eq!(200, response.status().as_u16());
}

And... cargo test works!
We just confirmed that we can successfully connect to Postgres from our tests!
A small step for the world, a huge leap forward for us.

3.5.4. Our Test Assertion

Now that we are connected, we can finally write the test assertions we have been dreaming about for the past 10 pages.
We will use sqlx's query! macro:

#[actix_rt::test]
async fn subscribe_returns_a_200_for_valid_form_data() {
    // [...]
    // The connection has to be marked as mutable!
    let mut connection = ...

    // Assert
    assert_eq!(200, response.status().as_u16());

    let saved = sqlx::query!("SELECT email, name FROM subscriptions",)
        .fetch_one(&mut connection)
        .await
        .expect("Failed to fetch saved subscription.");

    assert_eq!(saved.email, "ursula_le_guin@gmail.com");
    assert_eq!(saved.name, "le guin");
}

What is the type of saved? The query! macro returns an anonymous record type: a struct definition is generated at compile-time after having verified that the query is valid, with a member for each column on the result (i.e. saved.email for the email column).
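Conceptually, you can picture the macro's output as if sqlx had generated a one-off struct for us - the Record name below is made up, the real generated type is anonymous:

```rust
// Made-up name: the actual type generated by query! is anonymous.
// It has one field per column in our SELECT statement.
struct Record {
    email: String,
    name: String,
}

fn main() {
    let saved = Record {
        email: "ursula_le_guin@gmail.com".to_string(),
        name: "le guin".to_string(),
    };
    // Field access mirrors the columns we selected.
    assert_eq!(saved.email, "ursula_le_guin@gmail.com");
    assert_eq!(saved.name, "le guin");
}
```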

If we try to run cargo test we will get an error:

error: `DATABASE_URL` must be set to use query macros
  --> tests/health_check.rs:59:17
   |
59 |     let saved = sqlx::query!("SELECT email, name FROM subscriptions",)
   |                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |
   = note: this error originates in a macro (in Nightly builds, 
     run with -Z macro-backtrace for more info)

As we discussed before, sqlx reaches out to Postgres at compile-time to check that queries are well-formed. Just like sqlx-cli commands, it relies on the DATABASE_URL environment variable to know where to find the database.
We could export DATABASE_URL manually, but we would then run into the same issue every time we boot our machine and start working on this project. Let's take the advice of sqlx's authors - we'll add a top-level .env file:

DATABASE_URL="postgres://postgres:password@localhost:5432/newsletter"

sqlx will read DATABASE_URL from it and save us the hassle of re-exporting the environment variable every single time.
It feels a bit dirty to have the database connection parameters in two places (.env and configuration.yaml), but it is not a major problem: configuration.yaml can be used to alter the runtime behaviour of the application after it has been compiled, while .env is only relevant for our development process, build and test steps.

Let's try to run cargo test again:

running 3 tests
test health_check_works ... ok
test subscribe_returns_a_400_when_data_is_missing ... ok
test subscribe_returns_a_200_for_valid_form_data ... FAILED

failures:

---- subscribe_returns_a_200_for_valid_form_data stdout ----
thread 'subscribe_returns_a_200_for_valid_form_data' panicked at 
'Failed to fetch saved subscription.: RowNotFound', tests/health_check.rs:59:17

failures:
    subscribe_returns_a_200_for_valid_form_data

It failed, which is exactly what we wanted!
We can now focus on patching the application to turn it green.

3.6. Persisting A New Subscriber

Just as we wrote a SELECT query to inspect what subscriptions had been persisted to the database in our test, we now need to write an INSERT query to actually store the details of a new subscriber when we receive a valid POST /subscriptions request.

Let's have a look at our request handler:

//! src/routes/subscriptions.rs 
use actix_web::{web, HttpResponse};

#[derive(serde::Deserialize)]
pub struct FormData {
    email: String,
    name: String,
}

// Let's start simple: we always return a 200 OK
pub async fn subscribe(_form: web::Form<FormData>) -> HttpResponse {
    HttpResponse::Ok().finish()
}

To execute a query within subscribe we need to get our hands on a database connection.
Let's figure out how to get one.

3.6.1. Application State In Actix-web

So far our application has been entirely stateless: our handlers work solely with the data from the incoming request.
actix-web gives us the possibility to attach to the application other pieces of data that are not related to the lifecycle of a single incoming request - the so-called application state.

You can add information to the application state using two methods on App: data and app_data.
Handlers can then access the application state using the web::Data extractor.

Let's try to use data to register a PgConnection as part of our application state. We need to modify our run method to accept a PgConnection alongside the TcpListener:

use crate::routes::{health_check, subscribe};
use actix_web::dev::Server;
use actix_web::{web, App, HttpServer};
use sqlx::PgConnection;
use std::net::TcpListener;

pub fn run(listener: TcpListener, connection: PgConnection) -> Result<Server, std::io::Error> {
    let server = HttpServer::new(|| {
        App::new()
            .route("/health_check", web::get().to(health_check))
            .route("/subscriptions", web::post().to(subscribe))
            // Register the connection as part of the application state
            .data(connection)
    })
    .listen(listener)?
    .run();
    Ok(server)
}

cargo check is screaming at us:

error[E0277]: the trait bound `PgConnection: std::clone::Clone` 
is not satisfied in `[closure@src/startup.rs:8:34: 13:6 PgConnection]`
  --> src/startup.rs:8:18
   |
8  |       let server = HttpServer::new(|| {
   |  __________________^^^^^^^^^^^^^^^_-
   | |                  |
   | |                  within `[closure@src/startup.rs:8:34: 13:6 PgConnection]`, 
   | |                  the trait `std::clone::Clone` is not implemented for `PgConnection`
9  | |         App::new()
10 | |             .route("/health_check", web::get().to(health_check))
11 | |             .route("/subscriptions", web::post().to(subscribe))
12 | |             .data(connection)
13 | |     })
   | |_____- within this `[closure@src/startup.rs:8:34: 13:6 PgConnection]`
   |
   = note: required because it appears within the type `[closure@src/startup.rs:8:34: 13:6 PgConnection]`
   = note: required by `actix_web::server::HttpServer::<F, I, S, B>::new`

error[E0277]: the trait bound `PgConnection: std::clone::Clone` 
is not satisfied in `[closure@src/startup.rs:8:34: 13:6 PgConnection]`
  --> src/startup.rs:8:18
   |
8  |        let server = HttpServer::new(|| {
   |   __________________^_______________-
   |  |__________________|
   | ||
9  | ||         App::new()
10 | ||             .route("/health_check", web::get().to(health_check))
11 | ||             .route("/subscriptions", web::post().to(subscribe))
12 | ||             .data(connection)
13 | ||     })
   | ||_____- within this `[closure@src/startup.rs:8:34: 13:6 ::PgConnection]`
14 | |      .listen(listener)?
   | |_______________________^ within `[closure@src/startup.rs:8:34: 13:6 ::PgConnection]`, 
   |                           the trait `std::clone::Clone` is not implemented for `PgConnection`
   |
   |
56 |        F: Fn() -> I + Send + Clone + 'static,
   |                              ----- required by this bound in `actix_web::server::HttpServer`
   |
   = note: required because it appears within the type `[closure@src/startup.rs:8:34: 13:6 PgConnection]`

HttpServer expects PgConnection to be cloneable, which unfortunately is not the case.
Why does it need to implement Clone in the first place though?

3.6.2. Actix-web Workers

Let's zoom in on our invocation of HttpServer::new:

let server = HttpServer::new(|| {
    App::new()
        .route("/health_check", web::get().to(health_check))
        .route("/subscriptions", web::post().to(subscribe))
})

HttpServer::new does not take App as argument - it wants a closure that returns an App struct.
This is to support actix-web's runtime model: actix-web will spin up a worker process for each available core on your machine.
Each worker runs its own copy of the application, built by calling the very same closure that HttpServer::new takes as argument.
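A toy model of that runtime contract, assuming nothing about actix-web's internals: the factory closure is invoked once per worker, so each worker ends up with its own independent copy of whatever the closure builds.

```rust
// Toy model: each "worker" calls the same factory closure to build its
// own copy of the "application" (a Vec of routes stands in for App).
fn main() {
    let factory = || vec!["/health_check", "/subscriptions"];
    let num_workers = 4usize;
    let apps: Vec<Vec<&str>> = (0..num_workers).map(|_| factory()).collect();
    // One independent application copy per worker.
    assert_eq!(apps.len(), num_workers);
    assert!(apps.iter().all(|app| app.len() == 2));
}
```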

That is why connection has to be cloneable - we need to have one for every copy of App.
But, as we said, PgConnection does not implement Clone because it sits on top of a non-cloneable system resource, a TCP connection with Postgres. What do we do?

We can wrap our connection in an atomic reference counted pointer, an Arc: each instance of the application, instead of getting a raw copy of a PgConnection, will get a pointer to one.
Arc<T> is always cloneable, no matter what T is: cloning an Arc increments the number of active references and hands over a new copy of the memory address of the wrapped value.
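We can verify those Arc semantics with a minimal standard-library example (a String stands in for our non-cloneable connection):

```rust
use std::sync::Arc;

fn main() {
    let original = Arc::new(String::from("connection"));
    // Cloning bumps the reference count instead of copying the value.
    let copy = Arc::clone(&original);
    assert_eq!(Arc::strong_count(&original), 2);
    // Both handles point to the very same allocation.
    assert!(std::ptr::eq(original.as_ref(), copy.as_ref()));
}
```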

Let's give it a try:

//! src/startup.rs
use crate::routes::{health_check, subscribe};
use actix_web::dev::Server;
use actix_web::{web, App, HttpServer};
use sqlx::PgConnection;
use std::net::TcpListener;
use std::sync::Arc;

pub fn run(listener: TcpListener, connection: PgConnection) -> Result<Server, std::io::Error> {
    // Wrap the connection in an Arc smart pointer
    let connection = Arc::new(connection);
    // Capture `connection` from the surrounding environment
    let server = HttpServer::new(move || {
        App::new()
            .route("/health_check", web::get().to(health_check))
            .route("/subscriptions", web::post().to(subscribe))
            // Get a pointer copy and attach it to the application state
            .data(connection.clone())
    })
    .listen(listener)?
    .run();
    Ok(server)
}

It doesn't compile yet, but we just need to do a bit of house-keeping:

error[E0061]: this function takes 2 arguments but 1 argument was supplied
  --> src/main.rs:11:5
   |
11 |     run(listener)?.await
   |     ^^^ -------- supplied 1 argument
   |     |
   |     expected 2 arguments

Let's fix the issue real quick:

//! src/main.rs
use zero2prod::configuration::get_configuration;
use zero2prod::startup::run;
use sqlx::{Connection, PgConnection};
use std::net::TcpListener;

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    let configuration = get_configuration().expect("Failed to read configuration.");
    let connection = PgConnection::connect(&configuration.database.connection_string())
        .await
        .expect("Failed to connect to Postgres.");
    let address = format!("127.0.0.1:{}", configuration.application_port);
    let listener = TcpListener::bind(address)?;
    run(listener, connection)?.await
}

Perfect, it compiles.

3.6.3. The Data Extractor

We can now get our hands on an Arc<PgConnection> in our request handler, subscribe, using the web::Data extractor:

//! src/routes/subscriptions.rs
use actix_web::{web, HttpResponse};
use sqlx::PgConnection;
use std::sync::Arc;

#[derive(serde::Deserialize)]
pub struct FormData {
    email: String,
    name: String,
}

pub async fn subscribe(
    _form: web::Form<FormData>,
    // Retrieving a connection from the application state!
    _connection: web::Data<Arc<PgConnection>>,
) -> HttpResponse {
    HttpResponse::Ok().finish()
}

We called Data an extractor, but what is it extracting a PgConnection from?
actix-web uses a type-map to represent its application state: a HashMap that stores arbitrary data (using the Any type) against their unique type identifier (obtained via TypeId::of).

web::Data, when a new request comes in, computes the TypeId of the type you specified in the signature (in our case Arc<PgConnection>) and checks if there is a record corresponding to it in the type-map. If there is one, it casts the retrieved Any value to the type you specified (TypeId is unique, nothing to worry about) and passes it to your handler.
It is an interesting technique to perform what in other language ecosystems might be referred to as dependency injection.
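The type-map pattern can be sketched with the standard library alone - a simplified model, not actix-web's actual implementation:

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;

// Minimal type-map: at most one value per type, keyed by the type's TypeId.
struct TypeMap {
    map: HashMap<TypeId, Box<dyn Any>>,
}

impl TypeMap {
    fn new() -> Self {
        Self { map: HashMap::new() }
    }
    fn insert<T: 'static>(&mut self, value: T) {
        self.map.insert(TypeId::of::<T>(), Box::new(value));
    }
    fn get<T: 'static>(&self) -> Option<&T> {
        self.map
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
    }
}

fn main() {
    let mut state = TypeMap::new();
    state.insert(String::from("fake connection"));
    // Retrieval is driven purely by the requested type.
    assert_eq!(state.get::<String>().map(String::as_str), Some("fake connection"));
    assert!(state.get::<u32>().is_none());
}
```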

3.6.4. The INSERT query

We finally have a connection in subscribe: let's try to persist the details of our new subscriber.
We will use again the query! macro that we leveraged in our happy-case test.

//! src/routes/subscriptions.rs
use actix_web::{web, HttpResponse};
use chrono::Utc;
use sqlx::PgConnection;
use std::sync::Arc;
use uuid::Uuid;
use std::ops::Deref;

#[derive(serde::Deserialize)]
pub struct FormData {
    email: String,
    name: String,
}

pub async fn subscribe(
    form: web::Form<FormData>,
    connection: web::Data<Arc<PgConnection>>,
) -> Result<HttpResponse, HttpResponse> {
    sqlx::query!(
        r#"
        INSERT INTO subscriptions (id, email, name, subscribed_at)
        VALUES ($1, $2, $3, $4)
        "#,
        Uuid::new_v4(),
        form.email,
        form.name,
        Utc::now()
    )
    // There is a bit of ceremony here to get our hands on a &PgConnection.
    // web::Data<Arc<PgConnection>> is equivalent to Arc<Arc<PgConnection>>
    // Therefore connection.get_ref() returns a &Arc<PgConnection> 
    // which we can then deref to a &PgConnection.
    // We could have avoided the double Arc wrapping using .app_data()
    // instead of .data() in src/startup.rs
    .execute(connection.get_ref().deref())
    .await
    .map_err(|e| {
        eprintln!("Failed to execute query: {}", e);
        HttpResponse::InternalServerError().finish()
    })?;
    Ok(HttpResponse::Ok().finish())
}

Let's unpack what is happening:

  1. we are binding dynamic data to our INSERT query using $1-style positional parameters;
  2. we are generating a random Uuid for the id column and using the current timestamp in the Utc timezone for subscribed_at;
  3. if the query fails, we print the error and return a 500 Internal Server Error to the caller.

We have to add two new dependencies as well to our Cargo.toml to fix the obvious compiler errors:

[dependencies]
# [...]
uuid = { version = "0.8.1", features = ["v4"] }
chrono = "0.4.15"

What happens if we try to compile it again?

error[E0277]: the trait bound `&PgConnection: sqlx_core::executor::Executor<'_>` is not satisfied
  --> src/routes/subscriptions.rs:29:14
   |
29 |     .execute(connection.get_ref().deref())
   |              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 
   |              the trait `sqlx_core::executor::Executor<'_>` is not implemented for `&PgConnection`
   |
   = help: the following implementations were found:
             <&'c mut PgConnection as sqlx_core::executor::Executor<'c>>
   = note: `sqlx_core::executor::Executor<'_>` is implemented for `&mut PgConnection`, 
            but not for `&PgConnection`

error: aborting due to previous error

execute wants an argument that implements sqlx's Executor trait and it turns out, as we should have probably remembered from the query we wrote in our test, that &PgConnection does not implement Executor - only &mut PgConnection does.
Why is that the case?

sqlx has an asynchronous interface, but it does not allow you to run multiple queries concurrently over the same database connection.
Requiring a mutable reference allows them to enforce this guarantee in their API. You can think of a mutable reference as a unique reference: the compiler guarantees that execute has indeed exclusive access to that PgConnection, because there cannot be two active mutable references to the same value at the same time in the whole program. Quite neat.
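A minimal illustration of that exclusivity, with a Vec standing in for the connection:

```rust
// A Vec stands in for a database connection: &mut grants exclusive access.
fn run_query(conn: &mut Vec<&'static str>, query: &'static str) {
    // While this borrow is alive, no other reference can touch `conn`.
    conn.push(query);
}

fn main() {
    let mut connection = Vec::new();
    run_query(&mut connection, "INSERT ...");
    // A second mutable borrow is only legal because the first one has ended.
    run_query(&mut connection, "SELECT ...");
    assert_eq!(connection.len(), 2);
}
```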

Nonetheless it might look like we designed ourselves in a corner: web::Data will never give us mutable access to the application state.
We could leverage interior mutability - e.g. putting our PgConnection behind a lock (e.g. a Mutex) would allow us to synchronise access to the underlying TCP socket and get a mutable reference to the wrapped connection once the lock has been acquired.
We could make it work, but it would not be ideal: we would basically be constrained to run at most one query at a time. Not great.

Let's take a second look at the documentation for sqlx's Executor trait: what else implements Executor apart from &mut PgConnection?
Bingo: a shared reference to PgPool.

PgPool is a pool of connections to a Postgres database. How does it bypass the concurrency issue that we just discussed for PgConnection?
There is still interior mutability at play, but of a different kind: when you run a query against a &PgPool, sqlx will borrow a PgConnection from the pool and use it to execute the query; if no connection is available, it will create a new one or wait until one frees up.
This increases the number of concurrent queries that our application can run and it also improves its resiliency: a single slow query will not impact the performance of all incoming requests by creating contention on the connection lock.

Let's refactor run, main and subscribe to work with a PgPool instead of a single PgConnection:

//! src/main.rs
use zero2prod::configuration::get_configuration;
use zero2prod::startup::run;
use sqlx::PgPool;
use std::net::TcpListener;

#[actix_rt::main]
async fn main() -> std::io::Result<()> {
    let configuration = get_configuration().expect("Failed to read configuration.");
    let connection = PgPool::connect(&configuration.database.connection_string())
        .await
        .expect("Failed to connect to Postgres.");
    let address = format!("127.0.0.1:{}", configuration.application_port);
    let listener = TcpListener::bind(address)?;
    run(listener, connection)?.await
}
//! src/startup.rs
use crate::routes::{health_check, subscribe};
use actix_web::dev::Server;
use actix_web::{web, App, HttpServer};
use sqlx::PgPool;
use std::net::TcpListener;

pub fn run(listener: TcpListener, db_pool: PgPool) -> Result<Server, std::io::Error> {
    // Wrap the pool in an Arc smart pointer
    let db_pool = web::Data::new(db_pool);
    let server = HttpServer::new(move || {
        App::new()
            .route("/health_check", web::get().to(health_check))
            .route("/subscriptions", web::post().to(subscribe))
            // Our pool is already wrapped in an Arc pointer: 
            // using .data would add another Arc pointer on top 
            // of the existing one - an unnecessary indirection.
            // .app_data instead does not perform an additional layer of wrapping.
            .app_data(db_pool.clone())
    })
    .listen(listener)?
    .run();
    Ok(server)
}
//! src/routes/subscriptions.rs
use actix_web::{web, HttpResponse};
use chrono::Utc;
use sqlx::PgPool;
use uuid::Uuid;

#[derive(serde::Deserialize)]
pub struct FormData {
    email: String,
    name: String,
}

pub async fn subscribe(
    form: web::Form<FormData>,
    connection: web::Data<PgPool>,
) -> Result<HttpResponse, HttpResponse> {
    sqlx::query!(
        r#"
        INSERT INTO subscriptions (id, email, name, subscribed_at)
        VALUES ($1, $2, $3, $4)
        "#,
        Uuid::new_v4(),
        form.email,
        form.name,
        Utc::now()
    )
    // We got rid of the double-wrapping using .app_data()
    .execute(connection.get_ref())
    .await
    .map_err(|e| {
        println!("Failed to execute query: {}", e);
        HttpResponse::InternalServerError().finish()
    })?;
    Ok(HttpResponse::Ok().finish())
}

The compiler is happy: cargo check completes successfully.
The same cannot be said for cargo test:

error[E0061]: this function takes 2 arguments but 1 argument was supplied
  --> tests/health_check.rs:10:18
   |
10 |     let server = run(listener).expect("Failed to bind address");
   |                  ^^^ -------- supplied 1 argument
   |                  |
   |                  expected 2 arguments

error: aborting due to previous error

3.7. Updating Our Tests

The error is in our spawn_app helper function:

//! tests/health_check.rs
use zero2prod::startup::run;
use std::net::TcpListener;
// [...]

fn spawn_app() -> String {
    let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind random port");
    // We retrieve the port assigned to us by the OS
    let port = listener.local_addr().unwrap().port();
    let server = run(listener).expect("Failed to bind address");
    let _ = tokio::spawn(server);
    // We return the application address to the caller!
    format!("http://127.0.0.1:{}", port)
}

We need to pass a connection pool to run.
Given that we are then going to need that very same connection pool in subscribe_returns_a_200_for_valid_form_data to perform our SELECT query, it makes sense to generalise spawn_app a bit: instead of returning a raw String, we will give the caller a struct, TestApp. TestApp will hold both the address of our test application instance and a handle to the connection pool, simplifying the arrange steps in our test cases.

//! tests/health_check.rs
use zero2prod::configuration::get_configuration;
use zero2prod::startup::run;
use sqlx::PgPool;
use std::net::TcpListener;

pub struct TestApp {
    pub address: String,
    pub db_pool: PgPool,
}

// The function is asynchronous now!
async fn spawn_app() -> TestApp {
    let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind random port");
    let port = listener.local_addr().unwrap().port();
    let address = format!("http://127.0.0.1:{}", port);

    let configuration = get_configuration().expect("Failed to read configuration.");
    let connection_pool = PgPool::connect(&configuration.database.connection_string())
        .await
        .expect("Failed to connect to Postgres.");
    
    let server = run(listener, connection_pool.clone()).expect("Failed to bind address");
    let _ = tokio::spawn(server);
    TestApp {
        address,
        db_pool: connection_pool,
    }
}

All test cases then have to be updated accordingly - an off-screen exercise that I leave to you, my dear reader.
Let's just have a look together at what subscribe_returns_a_200_for_valid_form_data looks like after the required changes:

//! tests/health_check.rs
// [...]
#[actix_rt::test]
async fn subscribe_returns_a_200_for_valid_form_data() {
    // Arrange
    let app = spawn_app().await;
    let client = reqwest::Client::new();
    let body = "name=le%20guin&email=ursula_le_guin%40gmail.com";

    // Act
    let response = client
        .post(&format!("{}/subscriptions", &app.address))
        .header("Content-Type", "application/x-www-form-urlencoded")
        .body(body)
        .send()
        .await
        .expect("Failed to execute request.");

    // Assert
    assert_eq!(200, response.status().as_u16());

    let saved = sqlx::query!("SELECT email, name FROM subscriptions",)
        .fetch_one(&app.db_pool)
        .await
        .expect("Failed to fetch saved subscription.");

    assert_eq!(saved.email, "ursula_le_guin@gmail.com");
    assert_eq!(saved.name, "le guin");
}

The test intent is much clearer now that we got rid of most of the boilerplate related to establishing the connection with the database.
TestApp is the foundation we will be building on going forward to pull out supporting functionality that is useful to most of our integration tests.

The moment of truth has finally come: is our updated subscribe implementation enough to turn subscribe_returns_a_200_for_valid_form_data green?

running 3 tests
test health_check_works ... ok
test subscribe_returns_a_400_when_data_is_missing ... ok
test subscribe_returns_a_200_for_valid_form_data ... ok

test result: ok. 3 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

Yesssssssss!
Success!

Let's run it again to bathe in the light of this glorious moment!

cargo test
running 3 tests
test health_check_works ... ok
Failed to execute query: error returned from database: 
duplicate key value violates unique constraint "subscriptions_email_key"
thread 'subscribe_returns_a_200_for_valid_form_data' panicked at 'assertion failed: `(left == right)`
  left: `200`,
 right: `500`', tests/health_check.rs:66:5
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
Panic in Arbiter thread.
test subscribe_returns_a_400_when_data_is_missing ... ok
test subscribe_returns_a_200_for_valid_form_data ... FAILED

failures:

failures:
    subscribe_returns_a_200_for_valid_form_data

test result: FAILED. 2 passed; 1 failed; 0 ignored; 0 measured; 0 filtered out

Wait, no, what the fuck! Don't do this to us!

Ok, I lied - I knew this was going to happen.
I am sorry, I let you taste the sweet flavour of victory and then I threw you back into the mud.
There is an important lesson to be learned here, trust me.

3.7.1. Test Isolation

Your database is a gigantic global variable: all your tests are interacting with it and whatever they leave behind will be available to other tests in the suite as well as to the following test runs.
This is precisely what happened to us a moment ago: our first test run commanded our application to register a new subscriber with ursula_le_guin@gmail.com as their email; the application obliged.
When we re-ran our test suite we tried again to perform another INSERT using the same email, but our UNIQUE constraint on the email column raised a unique key violation and rejected the query, forcing the application to return us a 500 INTERNAL_SERVER_ERROR.

You really do not want to have any kind of interaction between your tests: it makes your test runs non-deterministic and it leads down the line to spurious test failures that are extremely tricky to hunt down and fix.

There are two techniques I am aware of to ensure test isolation when interacting with a relational database in a test:

  • wrap the whole test in a SQL transaction and rollback at the end of it;
  • spin up a brand-new logical database for each integration test.

The first is clever and will generally be faster: rolling back a SQL transaction takes less time than spinning up a new logical database. It works quite well when writing unit tests for your queries but it is tricky to pull off in an integration test like ours: our application will borrow a PgConnection from a PgPool and we have no way to "capture" that connection in a SQL transaction context.
Which leads us to the second option: potentially slower, yet much easier to implement.

How?
Before each test run, we want to:

  • create a new logical database with a unique name;
  • run database migrations on it.

The best place to do this is spawn_app, before launching our actix-web test application.
Let's look at it again:

//! tests/health_check.rs
use zero2prod::configuration::get_configuration;
use zero2prod::startup::run;
use sqlx::PgPool;
use std::net::TcpListener;

pub struct TestApp {
    pub address: String,
    pub db_pool: PgPool,
}

// The function is asynchronous now!
async fn spawn_app() -> TestApp {
    let listener = TcpListener::bind("127.0.0.1:0").expect("Failed to bind random port");
    let port = listener.local_addr().unwrap().port();
    let address = format!("http://127.0.0.1:{}", port);

    let configuration = get_configuration().expect("Failed to read configuration.");
    let connection_pool = PgPool::connect(&configuration.database.connection_string())
        .await
        .expect("Failed to connect to Postgres.");
    
    let server = run(listener, connection_pool.clone()).expect("Failed to bind address");
    let _ = tokio::spawn(server);
    TestApp {
        address,
        db_pool: connection_pool,
    }
}

configuration.database.connection_string() uses the database_name specified in our configuration.yaml file - the same for all tests.
Let's randomise it with

let mut configuration = get_configuration().expect("Failed to read configuration.");
configuration.database.database_name = Uuid::new_v4().to_string();

let connection_pool = PgPool::connect(&configuration.database.connection_string())
    .await
    .expect("Failed to connect to Postgres.");

cargo test will fail: there is no database ready to accept connections using the name we generated.
Let's add a connection_string_without_db method to our DatabaseSettings:

//! src/configuration.rs
// [...]

impl DatabaseSettings {
    pub fn connection_string(&self) -> String {
        format!(
            "postgres://{}:{}@{}:{}/{}",
            self.username, self.password, self.host, self.port, self.database_name
        )
    }

    pub fn connection_string_without_db(&self) -> String {
        format!(
            "postgres://{}:{}@{}:{}",
            self.username, self.password, self.host, self.port
        )
    }
}

By omitting the database name we connect to the Postgres instance itself, not a specific logical database.
We can now use that connection to create the database we need and run migrations on it:

//! tests/health_check.rs
use sqlx::{Connection, Executor, PgConnection, PgPool};
use uuid::Uuid;
use zero2prod::configuration::{get_configuration, DatabaseSettings};
// [...]

async fn spawn_app() -> TestApp {
    // [...]
    let mut configuration = get_configuration().expect("Failed to read configuration.");
    configuration.database.database_name = Uuid::new_v4().to_string();
    let connection_pool = configure_database(&configuration.database).await;
    // [...]
}

pub async fn configure_database(config: &DatabaseSettings) -> PgPool {
    // Create database
    let mut connection = PgConnection::connect(&config.connection_string_without_db())
        .await
        .expect("Failed to connect to Postgres");
    connection
        .execute(&*format!(r#"CREATE DATABASE "{}";"#, config.database_name))
        .await
        .expect("Failed to create database.");

    // Migrate database
    let connection_pool = PgPool::connect(&config.connection_string())
        .await
        .expect("Failed to connect to Postgres.");
    sqlx::migrate!("./migrations")
        .run(&connection_pool)
        .await
        .expect("Failed to migrate the database");

    connection_pool
}

sqlx::migrate! is the same macro used by sqlx-cli when executing sqlx migrate run - no need to throw bash scripts into the mix to achieve the same result.

Let's try again to run cargo test:

running 3 tests
test subscribe_returns_a_200_for_valid_form_data ... ok
test subscribe_returns_a_400_when_data_is_missing ... ok
test health_check_works ... ok

test result: ok. 3 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out

It works, this time for good. Try

repeat 100 { cargo test }

if you are skeptical.

You might have noticed that we do not perform any clean-up step at the end of our tests - the logical databases we create are not being deleted. This is intentional: we could add a clean-up step, but our Postgres instance is used only for test purposes and it's easy enough to restart it if, after hundreds of test runs, performance starts to suffer due to the number of lingering (almost empty) databases.

4. Next Up On Zero To Production

We covered a large number of topics in this chapter: actix-web extractors and HTML forms, (de)serialisation with serde, an overview of the available database crates in the Rust ecosystem, the fundamentals of sqlx as well as basic techniques to ensure test isolation when dealing with databases.
Take your time to digest the material and go back to review individual sections if necessary.

All the code we wrote in this chapter can be found on GitHub - toss a star to your witcher, o' valley of plenty!

We started under the promise of a demo, but you will have to wait another two weeks for it: our email newsletter prototype, at the moment, is as silent as it gets for a web application. We have no visibility over what is going on after we input cargo run. No logs, nothing.
Next time we will work on getting an observability baseline using tracing as we set off to debug our demo.

See you for the next episode!

You can discuss the article on HackerNews or r/rust.


If you'd like to be notified when a new episode Zero To Production comes out you can subscribe to the mailing list:

Book ToC

The Table of Contents is provisional and might change over time. The draft below is the most accurate picture at this point in time.

  1. Getting Started
    • Installing The Rust Toolchain
    • Project Setup
    • IDEs
    • Continuous Integration
  2. Our Driving Example
    • What Should Our Newsletter Do?
    • Working In Iterations
  3. Sign Up A New Subscriber
  4. Publish A Newsletter Issue
    • Writing A REST Client
    • Mocking Third-Party APIs
  5. Logging
    • The Facade Pattern
    • Logging Levels
    • Log Setup
  6. Reject Invalid Subscribers
    • Result
    • Modeling With Types #1
  7. Survive Delivery Failures
    • Simulating API Errors
  8. Tracing
    • Structured Logging
    • Spans
    • OpenTelemetry
    • Jaeger
  9. Send A Confirmation Email On Sign Up
    • Migrating Your Database
    • Modeling With Types #2
    • Handling Confirmations
    • Send Newsletter Only To Confirmed Subscribers
  10. Metrics
    • Prometheus
    • Grafana
  11. Send Emails Asynchronously
    • Adding A Message Broker
    • Enqueueing Tasks
  12. Fulfilling Email Tasks
    • Adding An Actor Queue Worker
    • Basic Retries
    • Failure Injection
    • Idempotency
  13. Benchmarking
    • Cargo bench
    • Criterion
    • Load testing
1

You can look at serde_json's serialize_seq implementation for confirmation: here. There is an optimisation for empty sequences (you immediately output []), but that is pretty much what is happening.

2

At the same time, it must be said that writing a serializer that is specialised for a single data format and a single usecase (e.g. batch-serialisation) might give you a chance to leverage algorithmic choices that are not compatible with the structure of serde's data model, meant to support several formats for a variety of usecases. An example in this vein would be simd-json.

3

Unless we implement some kind of synchronisation protocol between our replicas, which would quickly turn into a badly-written poor-man-copy of a database.

4

Performing IO in a procedural macro is somewhat controversial and forces you to always have a database up and running when working on a sqlx project; sqlx is adding support for "offline" builds by caching the retrieved query metadata in its upcoming 0.4.0 release.

5

Async runtimes are based around the assumption that futures, when polled, will yield control back to the executor "very quickly". If you run blocking IO code by mistake on the same threadpool used by the runtime to poll asynchronous tasks you get yourself in trouble - e.g. your application might mysteriously hang under load. You have to be careful and always make sure that blocking IO is performed on a separate threadpool using functions like tokio::task::spawn_blocking or async_std::task::spawn_blocking.

6

I do not belong to the "in-memory test database" school of thought: whenever possible you should strive to use the same database both for your tests and your production environment. I have been burned one time too many by differences between the in-memory stub and the real database engine to believe it provides any kind of benefit over using "the real thing".