Actix Web - database as an actor


This is part 1 of a 3-part series about implementing a web server with Actix.

Actix is an actor-based framework, and I love that it's done this way. Every resource in the system is an actor, and you can access that actor only via messages. In this model, the file system, external services, and the database are all actors.

The actor model is used primarily when you have a critical section where you could otherwise create a deadlock or a race condition. But Actix offers much more than this: you can use it to build a well-organized codebase.
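To make the idea concrete without Actix, here is a minimal std-only sketch of the actor pattern: one thread owns a counter, and other threads can reach it only by sending messages over a channel, so the state needs no Mutex. `CounterMsg`, `spawn_counter` and `get` are hypothetical names for illustration, not Actix APIs.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Messages the counter actor understands (hypothetical names, not Actix types).
enum CounterMsg {
    Increment,
    /// Ask for the current value; the reply travels back on its own channel.
    Get(Sender<u64>),
}

/// Spawns an "actor": a thread that exclusively owns `count`.
/// The only way to touch that state is to send it a `CounterMsg`.
fn spawn_counter() -> Sender<CounterMsg> {
    let (tx, rx) = channel::<CounterMsg>();
    thread::spawn(move || {
        let mut count: u64 = 0; // private state, never shared, so no Mutex
        // Messages are handled one at a time, in arrival order.
        for msg in rx {
            match msg {
                CounterMsg::Increment => count += 1,
                CounterMsg::Get(reply) => {
                    let _ = reply.send(count);
                }
            }
        }
        // The loop ends once every Sender has been dropped.
    });
    tx
}

/// Hypothetical helper: request the current value and wait for the reply.
fn get(counter: &Sender<CounterMsg>) -> u64 {
    let (reply_tx, reply_rx) = channel();
    counter.send(CounterMsg::Get(reply_tx)).unwrap();
    reply_rx.recv().unwrap()
}

fn main() {
    let counter = spawn_counter();
    // Ten client threads increment concurrently without any lock.
    let clients: Vec<_> = (0..10)
        .map(|_| {
            let c = counter.clone();
            thread::spawn(move || {
                for _ in 0..100 {
                    c.send(CounterMsg::Increment).unwrap();
                }
            })
        })
        .collect();
    for h in clients {
        h.join().unwrap();
    }
    println!("count = {}", get(&counter)); // count = 1000
}
```

Because the actor handles one message at a time, the ten concurrent clients cannot race on `count`; the channel serializes them.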

Current state

Official Actix examples are, sadly, elementary and messy. They usually consist of a single main.rs file, with many functions calling each other in a spaghetti manner.

Let's look at the official example for Postgres:

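mod db {
    use deadpool_postgres::Client;
    use tokio_pg_mapper::FromTokioPostgresRow;

    use crate::{errors::MyError, models::User};
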
    pub async fn add_user(client: &Client, user_info: User) -> Result<User, MyError> {
        let _stmt = include_str!("../sql/add_user.sql");
        let _stmt = _stmt.replace("$table_fields", &User::sql_table_fields());
        let stmt = client.prepare(&_stmt).await.unwrap();

        client
            .query(
                &stmt,
                &[
                    &user_info.email,
                    &user_info.first_name,
                    &user_info.last_name,
                    &user_info.username,
                ],
            )
            .await?
            .iter()
            .map(|row| User::from_row_ref(row).unwrap())
            .collect::<Vec<User>>()
            .pop()
            .ok_or(MyError::NotFound) // more applicable for SELECTs
    }
}

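mod handlers {
    use actix_web::{web, Error, HttpResponse};
    use deadpool_postgres::{Client, Pool};

    use crate::{db, errors::MyError, models::User};
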
    pub async fn add_user(
        user: web::Json<User>,
        db_pool: web::Data<Pool>,
    ) -> Result<HttpResponse, Error> {
        let user_info: User = user.into_inner();
    
        let client: Client = db_pool.get().await.map_err(MyError::PoolError)?;
    
        let new_user = db::add_user(&client, user_info).await?;
    
        Ok(HttpResponse::Ok().json(new_user))
    }
}

It's just two functions, one calling the other. This is the fastest async approach you can take.

But we are immediately struck by the following problems:

Database as an actor

Let's wrap the database connection in an actor that receives messages through its mailbox and processes requests asynchronously.

This way, instead of putting SQL queries into endpoints or into loose functions with no unified interface, we will have a single place, with a single form, for every database operation.

We will start by creating a wrapper around the database connection pool. Then we will add a specific handler for every action the database can perform.

/// This is a database wrapper. It has only one field and could be simplified to a newtype, but for clarity
/// I decided to write it as a full struct. In production code you will likely have more fields than this one.
pub struct Database {
    conn: DatabasePool,
}

impl actix::Actor for Database {
    type Context = actix::Context<Self>;
}

#[derive(actix::Message)]
#[rtype(result = "Result<User, DatabaseError>")]
pub struct AddUser {
    /* all you need to create user */
}

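// in handlers module
use actix::Addr;
use actix_web::{web, HttpResponse};

async fn add_user(db: web::Data<Addr<db::Database>>, /* arguments */) -> HttpResponse {
    // prepare data
    let msg = db::AddUser { /* all you need to create user */ };
    let _user = match db.send(msg).await {
        Ok(Ok(user)) => user,
        Ok(Err(_db_err)) => {
            /* handle db error */
            return HttpResponse::InternalServerError().finish();
        }
        Err(_mailbox_err) => {
            /* handle actor (mailbox) error */
            return HttpResponse::InternalServerError().finish();
        }
    };
    // send response
    HttpResponse::Ok().finish()
}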
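The two layers in the match mirror two different failures: the outer `Result` says whether the message reached the actor and a reply came back (Actix reports this as a `MailboxError`), and the inner one is the handler's own result. A std-only sketch of that shape, where `AddUserRequest`, `DbError`, `MailboxError`, `spawn_db` and `send_add_user` are hypothetical stand-ins rather than Actix types:

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

#[derive(Debug, PartialEq)]
struct User {
    id: i32,
    username: String,
}

/// Inner error: the handler ran, but the "database" refused the request.
#[derive(Debug, PartialEq)]
enum DbError {
    Duplicate,
}

/// Outer error: the message never got a reply (actor gone, mailbox closed).
#[derive(Debug, PartialEq)]
struct MailboxError;

/// Hypothetical request: the payload plus a channel for the reply.
struct AddUserRequest {
    username: String,
    reply: Sender<Result<User, DbError>>,
}

/// Hypothetical database actor backed by an in-memory list of usernames.
fn spawn_db() -> Sender<AddUserRequest> {
    let (tx, rx) = channel::<AddUserRequest>();
    thread::spawn(move || {
        let mut users: Vec<String> = Vec::new();
        for msg in rx {
            let res = if users.contains(&msg.username) {
                Err(DbError::Duplicate)
            } else {
                users.push(msg.username.clone());
                Ok(User { id: users.len() as i32, username: msg.username })
            };
            let _ = msg.reply.send(res);
        }
    });
    tx
}

/// Same shape as `addr.send(msg).await`: Result<Result<T, E>, MailboxError>.
fn send_add_user(
    db: &Sender<AddUserRequest>,
    username: &str,
) -> Result<Result<User, DbError>, MailboxError> {
    let (reply_tx, reply_rx) = channel();
    db.send(AddUserRequest { username: username.to_string(), reply: reply_tx })
        .map_err(|_| MailboxError)?;
    reply_rx.recv().map_err(|_| MailboxError)
}

fn main() {
    let db = spawn_db();
    for name in ["alice", "alice"] {
        match send_add_user(&db, name) {
            Ok(Ok(user)) => println!("created {:?}", user),
            Ok(Err(db_err)) => println!("database error: {:?}", db_err),
            Err(mailbox_err) => println!("actor unreachable: {:?}", mailbox_err),
        }
    }
}
```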

For convenience, I'll use sqlx in the following examples. The reasons for this are:

Let's define more precisely what we want to achieve: we want to create a record in the database containing the user's information.

The users table could look like this:

CREATE TABLE users
(
    id        serial  not null primary key,
    username  varchar not null unique,
    email     varchar not null unique,
    pass_hash varchar not null
);

And a model for this entity:

#[derive(sqlx::Type)]
#[sqlx(transparent)]
pub struct UserId(pub i32);

#[derive(sqlx::Type)]
#[sqlx(transparent)]
pub struct Username(pub String);

#[derive(sqlx::Type)]
#[sqlx(transparent)]
pub struct Email(pub String);

#[derive(sqlx::Type)]
#[sqlx(transparent)]
pub struct Pass(String);

#[derive(sqlx::FromRow)]
pub struct User {
    pub id: UserId,
    pub username: Username,
    pub email: Email,
    pub pass_hash: Pass,
}
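The newtype wrappers are more than decoration: because `Username` and `Email` are distinct types, passing them in the wrong order is a compile error instead of a silently swapped column. A small sketch without the sqlx derives; `register` is a hypothetical function used only for illustration:

```rust
/// Plain newtypes, mirroring the model above minus the sqlx derives.
#[derive(Debug, Clone, PartialEq)]
pub struct Username(pub String);

#[derive(Debug, Clone, PartialEq)]
pub struct Email(pub String);

/// Hypothetical function: the signature documents which string is which.
pub fn register(username: Username, email: Email) -> String {
    format!("{} <{}>", username.0, email.0)
}

fn main() {
    let name = Username("ferris".to_string());
    let email = Email("ferris@example.com".to_string());
    // register(email, name); // does not compile: expected `Username`, found `Email`
    println!("{}", register(name, email)); // ferris <ferris@example.com>
}
```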

Now we know we will use:

The id is generated automatically by the database, so we don't need to take care of it.

So our message will look like this:

#[derive(actix::Message)]
#[rtype(result = "Result<User, DatabaseError>")]
pub struct AddUser {
    pub username: Username,
    pub email: Email,
    pub pass_hash: Pass,
}

static ADD_USER_QUERY: &str = r#"
INSERT INTO users (username, email, pass_hash)
VALUES ($1, $2, $3)
RETURNING id, username, email, pass_hash
"#;

How the password is hashed is not the database's business. The database only needs to communicate with the service and, when required, run multiple queries in a transaction.
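One way to keep hashing out of the database layer is to let a service hash the password and hand the database only the finished hash. In this std-only sketch, `UserService`, `NewUser` and the injected `hash` closure are hypothetical; real code would plug a dedicated password-hashing library into that closure, never the toy string reversal used below.

```rust
/// What the database layer receives: the hash, never the plain password.
#[derive(Debug, PartialEq)]
pub struct NewUser {
    pub username: String,
    pub email: String,
    pub pass_hash: String,
}

/// Hypothetical service that owns the hashing strategy and builds the
/// message that would be sent to the database actor.
pub struct UserService<H: Fn(&str) -> String> {
    hash: H,
}

impl<H: Fn(&str) -> String> UserService<H> {
    pub fn new(hash: H) -> Self {
        UserService { hash }
    }

    /// Hashes the password before anything reaches the database layer.
    pub fn prepare_add_user(&self, username: &str, email: &str, password: &str) -> NewUser {
        NewUser {
            username: username.to_string(),
            email: email.to_string(),
            pass_hash: (self.hash)(password),
        }
    }
}

fn main() {
    // Toy "hash" (reverses the string) for demonstration only. NOT secure.
    let service = UserService::new(|p: &str| p.chars().rev().collect::<String>());
    let msg = service.prepare_add_user("ferris", "ferris@example.com", "secret");
    println!("{:?}", msg.pass_hash); // "terces"
}
```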

Handling messages in the actor

So we have our message and our actor:

#[derive(Clone)]
pub struct Database {
    conn: sqlx::PgPool,
}

impl actix::Actor for Database {
    type Context = actix::Context<Self>;
}

Next, we need to define how the database handles this message.

Actix lets us do this by implementing the Handler trait: we write a handle method on Database that does the required work and returns the result.
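Stripped of the runtime, the Message/Handler pair is typed dispatch: each message type declares its result type, and the actor implements `Handler<M>` once per message it accepts. The sketch below imitates that shape with hypothetical, std-only `Message` and `Handler` traits (not the real `actix` ones, which also take a context and run behind a mailbox) and an in-memory `Vec` instead of Postgres, assigning ids the way a `serial` column would:

```rust
/// Each message declares what handling it produces (like `#[rtype(...)]`).
trait Message {
    type Result;
}

/// An actor implements one `Handler<M>` per message type it accepts.
trait Handler<M: Message> {
    fn handle(&mut self, msg: M) -> M::Result;
}

#[derive(Debug, Clone, PartialEq)]
struct User {
    id: i32,
    username: String,
}

#[derive(Debug, PartialEq)]
enum DatabaseError {
    NotFound,
}

struct AddUser {
    username: String,
}
impl Message for AddUser {
    type Result = Result<User, DatabaseError>;
}

struct GetUser {
    id: i32,
}
impl Message for GetUser {
    type Result = Result<User, DatabaseError>;
}

/// Hypothetical in-memory stand-in for the database actor.
#[derive(Default)]
struct Database {
    users: Vec<User>,
}

impl Handler<AddUser> for Database {
    fn handle(&mut self, msg: AddUser) -> Result<User, DatabaseError> {
        // The store assigns the id, like a `serial` column would.
        let user = User { id: self.users.len() as i32 + 1, username: msg.username };
        self.users.push(user.clone());
        Ok(user)
    }
}

impl Handler<GetUser> for Database {
    fn handle(&mut self, msg: GetUser) -> Result<User, DatabaseError> {
        self.users
            .iter()
            .find(|u| u.id == msg.id)
            .cloned()
            .ok_or(DatabaseError::NotFound)
    }
}

fn main() {
    let mut db = Database::default();
    let created = db.handle(AddUser { username: "ferris".to_string() }).unwrap();
    let fetched = db.handle(GetUser { id: created.id });
    println!("{:?}", fetched); // Ok(User { id: 1, username: "ferris" })
}
```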

impl actix::Handler<AddUser> for Database {
    type Result = Result<User, DatabaseError>;

    fn handle(&mut self, msg: AddUser, _ctx: &mut Self::Context) -> Self::Result {
        let conn = self.conn.clone();
        let _future = sqlx::query_as::<_, User>(ADD_USER_QUERY)
            .bind(&msg.username)
            .bind(&msg.email)
            .bind(&msg.pass_hash)
            .fetch_one(&conn); // <- async job: a future we can't .await here
        // return result somehow??
    }
}

And here we hit our first problem. Actix handlers aren't async, so we can't just .await the result. At the time of writing they couldn't be, because Rust did not yet support async functions in traits. We could use the async-trait crate to get async trait methods, but we don't want to do that here; we want to keep using the actor's mailbox.

So how does Actix deal with async jobs inside a handler?

Actix provides ResponseActFuture, a type alias for a pinned, boxed ActorFuture. A handler can return it, and the actor's context then runs the future to completion.

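use actix::{ActorFutureExt, ResponseActFuture, WrapFuture};

impl actix::Handler<AddUser> for Database {
    type Result = ResponseActFuture<Self, Result<User, DatabaseError>>;

    fn handle(&mut self, msg: AddUser, _ctx: &mut Self::Context) -> Self::Result {
        // clone the pool handle so the future owns it and doesn't borrow `self`
        let conn = self.conn.clone();
        Box::pin(
            async move {
                sqlx::query_as::<_, User>(ADD_USER_QUERY)
                    .bind(&msg.username)
                    .bind(&msg.email)
                    .bind(&msg.pass_hash)
                    .fetch_one(&conn)
                    .await
                    .map_err(DatabaseError::from) // assumes DatabaseError: From<sqlx::Error>
            }
            .into_actor(self)
            .map(|res, _actor, _ctx| res),
        )
    }
}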

So what happens here? We create a future and pin it in a Box. The heap allocation and dynamic dispatch make it slightly slower to run, but that's an acceptable tradeoff.

Then we turn it into an actor future with into_actor, so the actor can handle the message asynchronously, and when the future completes, the map step takes only the SQL query result, ignoring the actor and context arguments.
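The key point is that `handle` itself stays synchronous: it clones whatever the future needs (here the pool handle, so the future does not borrow `self`) and returns a pinned, boxed future that something else polls later. A std-only sketch of that mechanism, assuming a hypothetical `BoxFuture` alias, an `Arc<Mutex<Vec<String>>>` standing in for the pool, and a toy `block_on` executor in place of the Actix/Tokio runtime:

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Rough analogue of `ResponseActFuture`: a pinned, boxed, type-erased future.
type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + 'static>>;

/// Hypothetical stand-in for `sqlx::PgPool`: cheap to clone, shared via Arc.
#[derive(Clone, Default)]
struct Pool(Arc<Mutex<Vec<String>>>);

struct Database {
    conn: Pool,
}

impl Database {
    /// Synchronous, like `Handler::handle`: no `.await` allowed in here.
    fn handle_add_user(&mut self, username: String) -> BoxFuture<usize> {
        // Clone the handle so the future owns it and can be 'static.
        let conn = self.conn.clone();
        Box::pin(async move {
            let mut rows = conn.0.lock().unwrap();
            rows.push(username);
            rows.len() // pretend this is what the query returned
        })
    }
}

/// Toy executor: polls one future, parking the thread until it is woken.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

fn main() {
    let mut db = Database { conn: Pool::default() };
    let fut = db.handle_add_user("ferris".to_string()); // nothing has run yet
    println!("rows after insert: {}", block_on(fut)); // rows after insert: 1
}
```

Because the returned futures own their data, several can be created from the same `&mut self` before any of them runs, which is what lets the actor keep accepting messages.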

This could be the end of it, but we also want other handlers to be able to reuse this code so that it can run inside a transaction.

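impl actix::Handler<AddUser> for Database {
    type Result = ResponseActFuture<Self, Result<User, DatabaseError>>;

    fn handle(&mut self, msg: AddUser, _ctx: &mut Self::Context) -> Self::Result {
        let conn = self.conn.clone();
        Box::pin(
            async move {
                let mut t = conn.begin().await?;
                // on error, `?` returns early and dropping `t` rolls the transaction back
                let user = add_user(msg, &mut t).await?;
                t.commit().await?;
                Ok::<_, DatabaseError>(user)
            }
            .into_actor(self)
            .map(|res, _actor, _ctx| res),
        )
    }
}

/// Reusable query: runs on any open transaction and leaves committing to the caller.
async fn add_user(
    msg: AddUser,
    conn: &mut sqlx::Transaction<'_, sqlx::Postgres>,
) -> Result<User, DatabaseError> {
    sqlx::query_as::<_, User>(ADD_USER_QUERY)
        .bind(&msg.username)
        .bind(&msg.email)
        .bind(&msg.pass_hash)
        .fetch_one(&mut **conn) // deref to the underlying PgConnection
        .await
        .map_err(DatabaseError::from) // assumes DatabaseError: From<sqlx::Error>
}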

It may look like we've come full circle, with two functions again, but now we have an abstraction layer with a bounded mailbox and a simple, unified interface for communicating between actors.
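The transactional version relies on two properties of `sqlx::Transaction`: writes are persisted only by `commit()`, and a transaction dropped without committing is rolled back. That is why the reusable helper takes `&mut Transaction` and leaves committing to the caller. A std-only sketch of the same contract, with a hypothetical in-memory `Store` and `Tx` in place of Postgres:

```rust
/// Hypothetical committed state: the "database".
#[derive(Default)]
struct Store {
    users: Vec<String>,
}

/// Hypothetical transaction: buffers writes and applies them only on commit.
/// Dropping a `Tx` without committing discards `pending`, i.e. rolls back.
struct Tx<'a> {
    store: &'a mut Store,
    pending: Vec<String>,
}

impl<'a> Tx<'a> {
    fn begin(store: &'a mut Store) -> Self {
        Tx { store, pending: Vec::new() }
    }

    fn commit(self) {
        let Tx { store, pending } = self;
        store.users.extend(pending);
    }
}

#[derive(Debug, PartialEq)]
enum DbError {
    EmptyName,
}

/// Reusable step: works on any open transaction and never commits by itself.
fn add_user(tx: &mut Tx<'_>, username: &str) -> Result<(), DbError> {
    if username.is_empty() {
        return Err(DbError::EmptyName);
    }
    tx.pending.push(username.to_string());
    Ok(())
}

/// Handler-level function: one transaction around several steps.
/// `?` returns early on error, dropping `tx` before `commit`, so nothing is saved.
fn add_users(store: &mut Store, names: &[&str]) -> Result<(), DbError> {
    let mut tx = Tx::begin(store);
    for name in names {
        add_user(&mut tx, name)?;
    }
    tx.commit();
    Ok(())
}

fn main() {
    let mut store = Store::default();
    add_users(&mut store, &["alice", "bob"]).unwrap();
    let failed = add_users(&mut store, &["carol", ""]); // second name is invalid
    println!("{:?} {:?}", failed, store.users); // Err(EmptyName) ["alice", "bob"]
}
```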

Performance

It's a nice abstraction and fits the Actix philosophy well. But is it fast?

Let's run some benchmarks with siege! I prepared a small application backed by an sqlx database.

use actix::Actor;
use actix_web::web::Data;
use actix_web::{App, HttpServer};

mod db {
    use actix::{Actor, Context, Handler, ResponseActFuture, WrapFuture};
    use sqlx::PgPool;

    static QUERY: &str = r#"
INSERT INTO rows (a, b, c, d)
VALUES ($1, $2, $3, $4)
RETURNING a, b, c, d
"#;

    #[derive(sqlx::FromRow)]
    pub struct Row {
        pub a: i32,
        pub b: i32,
        pub c: i32,
        pub d: String,
    }

    pub async fn db() -> PgPool {
        PgPool::connect("postgres://postgres@localhost/part_1")
            .await
            .unwrap()
    }

    #[derive(Clone)]
    pub struct Database {
        pub db: PgPool,
    }

    impl Actor for Database {
        type Context = Context<Self>;
    }

    /// Test query database without additional abstraction
    pub async fn test1(msg: Row, db: PgPool) -> Row {
        sqlx::query_as(QUERY)
            .bind(msg.a)
            .bind(msg.b)
            .bind(msg.c)
            .bind(msg.d)
            .fetch_one(&db)
            .await
            .unwrap()
    }

    /// Test query database with actor abstraction but inside async closure
    #[derive(actix::Message)]
    #[rtype(result = "Row")]
    pub struct Test2 {
        pub row: Row,
    }

    impl Handler<Test2> for Database {
        type Result = ResponseActFuture<Self, Row>;

        fn handle(&mut self, msg: Test2, _ctx: &mut Self::Context) -> Self::Result {
            let db = self.db.clone();
            Box::pin(
                async move {
                    sqlx::query_as::<_, Row>(QUERY)
                        .bind(msg.row.a)
                        .bind(msg.row.b)
                        .bind(msg.row.c)
                        .bind(msg.row.d)
                        .fetch_one(&db)
                        .await
                        .unwrap()
                }
                .into_actor(self),
            )
        }
    }

    #[derive(actix::Message)]
    #[rtype(result = "Row")]
    pub struct Test3 {
        pub row: Row,
    }

    impl Handler<Test3> for Database {
        type Result = ResponseActFuture<Self, Row>;

        fn handle(&mut self, msg: Test3, _ctx: &mut Self::Context) -> Self::Result {
            let db = self.db.clone();
            Box::pin(async move { test1(msg.row, db).await }.into_actor(self))
        }
    }
}

mod api {
    use crate::db;
    use crate::db::{Database, Row, Test2, Test3};
    use actix::Addr;
    use actix_web::web::Data;
    use actix_web::{get, HttpResponse};
    use sqlx::PgPool;

    /// Request test
    ///
    /// ```bash
    /// /usr/bin/time siege -r3000 -c30 -j 'http://0.0.0.0:7845/test1' | jq '.elapsed_time'
    /// ```
    ///
    /// Bench test
    ///
    /// ```bash
    /// /usr/bin/time siege -r3000 -c30 --benchmark -j 'http://0.0.0.0:7845/test1' | jq '.elapsed_time'
    /// ```
    #[get("/test1")]
    pub async fn test_1(pool: Data<PgPool>) -> HttpResponse {
        db::test1(
            Row {
                a: 0,
                b: 12,
                c: 435,
                d: "test1".to_string(),
            },
            (&*pool.into_inner()).clone(),
        )
        .await;
        HttpResponse::Ok().finish()
    }

    /// Request test
    ///
    /// ```bash
    /// /usr/bin/time siege -r3000 -c30 -j 'http://0.0.0.0:7845/test2' | jq '.elapsed_time'
    /// ```
    /// 
    /// Bench test
    ///
    /// ```bash
    /// /usr/bin/time siege -r3000 -c30 --benchmark -j 'http://0.0.0.0:7845/test2' | jq '.elapsed_time'
    /// ```
    #[get("/test2")]
    pub async fn test_2(database: Data<Addr<Database>>) -> HttpResponse {
        database
            .send(Test2 {
                row: Row {
                    a: 0,
                    b: 12,
                    c: 435,
                    d: "test2".to_string(),
                },
            })
            .await
            .unwrap();
        HttpResponse::Ok().finish()
    }

    /// Request test
    ///
    /// ```bash
    /// /usr/bin/time siege -r3000 -c30 -j 'http://0.0.0.0:7845/test3' | jq '.elapsed_time'
    /// ```
    ///
    /// Bench test
    ///
    /// ```bash
    /// /usr/bin/time siege -r3000 -c30 --benchmark -j 'http://0.0.0.0:7845/test3' | jq '.elapsed_time'
    /// ```
    #[get("/test3")]
    pub async fn test_3(database: Data<Addr<Database>>) -> HttpResponse {
        database
            .send(Test3 {
                row: Row {
                    a: 0,
                    b: 12,
                    c: 435,
                    d: "test3".to_string(),
                },
            })
            .await
            .unwrap();
        HttpResponse::Ok().finish()
    }


    /// Request test
    ///
    /// ```bash
    /// /usr/bin/time siege -r3000 -c30 -j 'http://0.0.0.0:7845/test4' | jq '.elapsed_time'
    /// ```
    ///
    /// Bench test
    ///
    /// ```bash
    /// /usr/bin/time siege -r3000 -c30 --benchmark -j 'http://0.0.0.0:7845/test4' | jq '.elapsed_time'
    /// ```
    #[get("/test4")]
    pub async fn test_4(pool: Data<PgPool>) -> HttpResponse {
        async {
            async {
                async {
                    db::test1(
                        Row {
                            a: 0,
                            b: 12,
                            c: 435,
                            d: "test1".to_string(),
                        },
                        (&*pool.into_inner()).clone(),
                    )
                    .await;
                }
                .await
            }
            .await
        }
        .await;
        HttpResponse::Ok().finish()
    }
}

#[actix_rt::main]
async fn main() {
    let db = db::db().await;
    HttpServer::new(move || {
        App::new()
            .app_data(Data::new(db.clone()))
            .app_data(Data::new(db::Database { db: db.clone() }.start()))
            .service(api::test_1)
            .service(api::test_2)
            .service(api::test_3)
            .service(api::test_4)
    })
    .bind(("0.0.0.0", 7845))
    .unwrap()
    .run()
    .await
    .unwrap()
}

I ran each siege command ten times and collected the timings. Already in the first row of results we can see that going through the actor and delegating the job to a separate async function (test 3) is a bad idea: it's three times slower than doing everything inside the async block (test 2).

Beyond that, the actor approach (test 2) is fairly harmless until we hit 40 000 requests, where it becomes as slow as the delegated approach.

I even tried increasing the mailbox capacity to 10 000, but it didn't help.

[Benchmark results: first chart and second chart]

This looks really bad, but can we improve it? Of course we can. First, we set the maximum number of database connections, so we know exactly how many queries can run at once.

pub async fn db() -> PgPool {
    sqlx::postgres::PgPoolOptions::new()
        .min_connections(20)
        .max_connections(20)
        .connect("postgres://postgres@localhost/part_1")
        .await
        .unwrap()
}

Then we align the mailbox size with the number of connections that can be open at once.

impl Actor for Database {
    type Context = Context<Self>;

    fn started(&mut self, ctx: &mut Self::Context) {
        ctx.set_mailbox_capacity(20);
    }
}
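A bounded mailbox behaves like a bounded queue: once it holds `capacity` messages, new ones have to wait (roughly what Actix's `Addr::send` does) or are rejected as full (what `try_send` reports). Rust's `std::sync::mpsc::sync_channel` shows the same effect without Actix; `fill` is a hypothetical helper for this sketch:

```rust
use std::sync::mpsc::{sync_channel, TrySendError};

/// Tries to push `attempts` messages into a bounded queue of `capacity`
/// while nobody receives; returns (accepted, rejected).
fn fill(capacity: usize, attempts: u32) -> (usize, usize) {
    let (tx, _rx) = sync_channel::<u32>(capacity); // `_rx` keeps the receiver alive
    let (mut accepted, mut rejected) = (0, 0);
    for i in 0..attempts {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => rejected += 1,
            Err(TrySendError::Disconnected(_)) => unreachable!("receiver is still alive"),
        }
    }
    (accepted, rejected)
}

fn main() {
    // 20 matches the pool size used above; 16 is Actix's default mailbox capacity.
    println!("{:?}", fill(20, 30)); // (20, 10)
    println!("{:?}", fill(16, 30)); // (16, 14)

    // Once the consumer takes a message, there is room again.
    let (tx, rx) = sync_channel::<u32>(1);
    tx.try_send(1).unwrap();
    assert!(tx.try_send(2).is_err());
    rx.recv().unwrap();
    assert!(tx.try_send(2).is_ok());
}
```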

Up to 30 000 requests, using sqlx directly in the HTTP handler (test 1) is three times faster, but at 30 000 requests even test 1, the only one that had performed well, suddenly dropped to the level of every other solution.

What happened here, and why? By default, sqlx opens at most 10 concurrent connections, and an Actix actor's mailbox holds 16 messages. So messages pile up and simply wait until the database finishes its work.
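To see why requests queue up, here is a small std-only simulation. It does not model Actix or sqlx internals; it only assumes 30 concurrent clients (matching siege's `-c30`) competing for a pool of 10 connections, modelled by a hypothetical counting semaphore built from `Mutex` and `Condvar`:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;
use std::time::Duration;

/// Hypothetical counting semaphore standing in for a connection pool.
struct Pool {
    free: Mutex<usize>,
    cv: Condvar,
}

impl Pool {
    fn new(size: usize) -> Self {
        Pool { free: Mutex::new(size), cv: Condvar::new() }
    }

    fn acquire(&self) {
        let mut free = self.free.lock().unwrap();
        while *free == 0 {
            free = self.cv.wait(free).unwrap(); // wait until a connection frees up
        }
        *free -= 1;
    }

    fn release(&self) {
        *self.free.lock().unwrap() += 1;
        self.cv.notify_one();
    }
}

/// Runs `clients` threads against a pool of `size`; returns peak concurrency.
fn simulate(size: usize, clients: usize) -> usize {
    let pool = Arc::new(Pool::new(size));
    let in_flight = Arc::new(Mutex::new((0usize, 0usize))); // (current, peak)
    let handles: Vec<_> = (0..clients)
        .map(|_| {
            let pool = Arc::clone(&pool);
            let in_flight = Arc::clone(&in_flight);
            thread::spawn(move || {
                pool.acquire();
                {
                    let mut s = in_flight.lock().unwrap();
                    s.0 += 1;
                    s.1 = s.1.max(s.0);
                }
                thread::sleep(Duration::from_millis(5)); // the "query"
                in_flight.lock().unwrap().0 -= 1;
                pool.release();
            })
        })
        .collect();
    for h in handles {
        h.join().unwrap();
    }
    let peak = in_flight.lock().unwrap().1;
    peak
}

fn main() {
    println!("peak in-flight queries: {}", simulate(10, 30));
}
```

The peak never exceeds the pool size, so the remaining clients simply wait, just as messages wait in the mailbox for a free connection.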

Moreover, as test 4 shows (it only wraps the direct call in extra nested async blocks), the slowdown isn't even the fault of the Actix actor; it's just how tokio's scheduler works.

Put simply, tokio tries to give every task a fair share of CPU time. This is enforced by several mechanisms inside the crate, and it is not something you can easily bypass.

Machine

I was using my PC with: