Piotr Kołaczkowski

Benchmarking Apache Cassandra with Rust

Performance of a database system depends on many factors: hardware, configuration, database schema, amount of data, workload type, network latency, and many others. Therefore, one typically can’t tell the actual performance of such system without first measuring it. In this blog post I’m describing how to build a benchmarking tool for Apache Cassandra from scratch in Rust and how to avoid many pitfalls. The techniques I show are applicable to any system with an async API.

Apache Cassandra is a popular, scalable, distributed, open-source database system. It comes with its own benchmarking tool cassandra-stress that can issue queries in parallel and measure throughtput and response times. It offers not just a few built-in standard benchmarks, but also allows defining custom schemas and workloads, making it really versatile. So why write another one?

Being written in Java, cassanda-stress has a few downsides:

A natively compiled, GC-less language like C, C++ or Rust addresses all of these issues. Let’s see how we can write a benchmarking tool in Rust.

Connecting to Cassandra

Before we can issue any queries, we need to establish a connection to the database and obtain a session. To access Cassandra, we’ll use cassandra_cpp crate which is a Rust wrapper over the official Cassandra driver for C++ from DataStax. There exist other third-party drivers developed natively in Rust from scratch, but at the time of writing this post, they weren’t production ready.

Installing the driver on Ubuntu is straightforward:

sudo apt install libuv1 libuv1-dev
sudo dpkg -i cassandra-cpp-driver_2.15.3-1_amd64.deb
sudo dpkg -i cassandra-cpp-driver-dev_2.15.3-1_amd64.deb

Then we need to add cassandra_cpp dependency to Cargo.toml:

cassandra-cpp = "0.15.1"

Configuring the connection is performed through Cluster type:

use cassandra_cpp::*;

let mut cluster = Cluster::default();
cluster.set_contact_points("localhost").unwrap();
cluster.set_core_connections_per_host(1).unwrap();
cluster.set_max_connections_per_host(1).unwrap();    
cluster.set_queue_size_io(1024).unwrap();
cluster.set_num_threads_io(4).unwrap();
cluster.set_connect_timeout(time::Duration::seconds(5));
cluster.set_load_balance_round_robin();

Finally, we can connect:

let session = match cluster.connect() {
  Ok(s) => s,
  Err(e) => {
      eprintln!("error: Failed to connect to Cassandra: {}", e);
      exit(1)
  }
}

Just a Loop

Now that we have the session, we can issue queries. How hard could writing a database benchmark be? It is just sending queries in a loop and measuring how long they take, isn’t it? For simplicity, let’s assume there already exists a table test in keyspace1 with the following schema:

CREATE TABLE keyspace1.test(pk BIGINT PRIMARY KEY, col BIGINT);

Let’s issue some reads from this table and measure how long they took:

use std::time::{Duration, Instant};

let session = // ... setup the session
let count = 100000;
let start = Instant::now();
for i in 0..count {
  session
    .execute(&stmt!("SELECT * FROM keyspace1.test WHERE pk = 1"))
    .wait()
    .unwrap();
}
let end = Instant::now();
println!(
    "Throughput: {:.1} request/s",
    1000000.0 * count as f64 / (end - start).as_micros() as f64
);

I bet you’ve seen similar benchmarking code in some benchmarks on the Internet. I’ve seen results from code like this being used to justify a choice of one database system over another. Unfortunately, this simple code has a few very serious issues and can lead to incorrect conclusions about performance of the system:

  1. The loop performs only one request at a time. In case of systems like Apache Cassandra which are optimised to handle many thousands of parallel requests, this leaves most of the available computing resources idle. Most (all?) modern CPUs have multiple cores. Hard drives also benefit from parallel access. Additionally, there is non-zero network latency for sending the request to the server and sending the response back to the client. Even if running this client code on the same computer, there is non-zero time needed for the operating system to deliver the data from one process to another over the loopback. During that time, the server has literally nothing to do. The result throughput you’ll get from such a naive benchmark loop will be significantly lower than the server is really capable of.

  2. Sending a single query at a time precludes the driver from automatically batching multiple requests. Batching can improve the network bandwidth by using a more efficient data representation and can reduce the number of syscalls, e.g. by writing many requests to the network socket at once. Reading requests from the socket on the server side is also much more efficient if there are many available in the socket buffer.

  3. The code doesn’t use prepared statements. Many database systems, not only Cassandra, but also many traditional relational database systems, have this feature for a reason: parsing and planning a query can be a substantial amount of work, and it is makes sense to do it only once.

  4. The code is reading the same row over and over again. Depending on what you wish to measure this could be a good or a bad thing. In this case, the database system would cache the data fully and serve it from RAM, so you might actually overestimate the performance, because real workloads rarely fetch a single row in a loop. On the other hand, such test would make some sense if you deliberately want to test the happy-path performance of the cache layer.

As a result, the reported throughput is abysmally poor:

Throughput: 2279.7 request/s

Prepared Statements

The problems #3 and #4 are the easiest to solve. Let’s change the code to use a prepared statement, and let’s introduce a parameter so we’re not fetching the same row all the time:

use cassandra_cpp::*;
use std::time::{Duration, Instant};

let session = // ... setup the session
let statement = session
    .prepare("SELECT * FROM keyspace1.test WHERE pk = ?")?
    .wait()?;
let count = 100000;
let start = Instant::now();
for i in 0..count {
    let mut statement = statement.bind();
    statement.bind(0, i as i64)?;
    session.execute(&statement).wait().unwrap();
}
let end = Instant::now();
println!(
    "Throughput: {:.1} request/s",
    1000000.0 * count as f64 / (end - start).as_micros() as f64
);

Unfortunately, the performance is still extremely low.

Throughput: 2335.9 request/s

Going async

To fix the problems #1 and #2, we need to send more than one query at a time. In the codes above we’re calling wait() on the futures returned from the driver and that call is blocking. And because our program is single-threaded, it can’t do anything else while being blocked.

There are two approaches we can take:

In order to be able to use async functions at all, first we need to initialize an async runtime. I decided to use a very popular crate tokio. Installation is adding just the following line to Cargo.toml:

tokio = { version = "0.2", features = ["full"] }

Now we can annotate the main function as async, replace wait with await, and call tokio::spawn to launch the requests. Although await looks like blocking, it doesn’t block the calling thread, but allows it to move on to the next task.

#[tokio::main]
async fn main() -> cassandra_cpp::Result<()> {
    let mut cluster = Cluster::default();
    // ... configure cluster
    let session = cluster.connect_async().await?;
    let statement = session
        .prepare("SELECT * FROM keyspace1.test WHERE pk = ?")?
        .await?;
    let count = 100000;
    let start = Instant::now();
    for i in 0..count {
        let mut statement = statement.bind();
        statement.bind(0, i as i64).unwrap();
        tokio::spawn(async {
            let result = session.execute(&statement);
            result.await.unwrap();
        });
    }
    let end = Instant::now();
    println!(
        "Throughput: {:.1} request/s",
        1000000.0 * count as f64 / (end - start).as_micros() as f64
    );
    Ok(())
}

Unfortunately, this doesn’t compile, because our friend borrow-checker correctly notices that the async code inside of the loop can live longer than the main() function and its local variables such as i, session and statement:

error[E0373]: async block may outlive the current function, but it borrows `i`, which is owned by the current function
262 |           tokio::spawn(async {
...
help: to force the async block to take ownership of `i` (and any other referenced variables), use the `move` keyword

error[E0373]: async block may outlive the current function, but it borrows `statement`, which is owned by the current function
help: to force the async block to take ownership of `statement` (and any other referenced variables), use the `move` keyword

error[E0373]: async block may outlive the current function, but it borrows `session`, which is owned by the current function
help: to force the async block to take ownership of `session` (and any other referenced variables), use the `move` keyword

The compiler advices us to use move to move these shared variables into the async code:

    // ...
    for i in 0..count {
        tokio::spawn(async move {
            //...
   

Fortunately, the problem with the loop counter i and statement is gone now. But that still doesn’t work for session:

error[E0382]: use of moved value: `session`
   --> src/main.rs:262:33
    |
255 |       let session = cluster.connect_async().await.unwrap();
    |           ------- move occurs because `session` has type `cassandra_cpp::Session`, which does not implement the `Copy` trait
...
262 |           tokio::spawn(async move {
    |  _________________________________^
263 | |             let result = session.execute(&statement);
    | |                          ------- use occurs due to use in generator
264 | |             result.await.unwrap();
265 | |         });
    | |_________^ value moved here, in previous iteration of loop

This is quite obvious – we’re spawning more than one async task here, but because session is not copyable, there can only exist one of each. Of course, we don’t want multiple sessions or statements here – we need a single one shared among all the tasks. But how to pass only session by reference but still use move for passing the loop counter i and statement?

Let’s take the reference to session before the loop – references are copyable:

    // ...
    let session = &session;
    for i in 0..count {
        // ...
        tokio::spawn(async move {
            // ...

But this brings us back to the first problem of insufficient lifetime, though:

error[E0597]: `session` does not live long enough
   --> src/main.rs:262:19
    |
262 |       let session = &session;
    |                     ^^^^^^^^ borrowed value does not live long enough
...
265 |           tokio::spawn(async move {
    |  ______________________-
266 | |             let result = session.execute(&statement);
267 | |             result.await.unwrap();
268 | |         });
    | |_________- argument requires that `session` is borrowed for `'static`
...
276 |   }
    |   - `session` dropped here while still borrowed

So it looks like we can’t pass the session by move, because we want sharing, but we also can’t pass it by reference because the session doesn’t live long enough.

Scopes?

In one of the earlier blog bosts I showed how this problem can be solved by using scoped threads. The concept of scope allows to force all background tasks to finish before the shared variables are dropped.

Unfortunately, I haven’t found anything like scopes inside of the tokio crate. A search reveals a ticket, but it has been closed and the conclusion is a bit disappointing:

As @Matthias247 pointed out, one should be able to establish scopes at any point. However, there is no way to enforce the scope without blocking the thread. The best we can do towards enforcing the scope is to panic when used “incorrectly”. This is the strategy @Matthias247 has taken in his PRs. However, dropping adhoc is currently a key async rust pattern. I think this prohibits pancing when dropping a scope that isn’t 100% complete. If we do this, using a scope within a select! would lead to panics. We are at an impasse. Maybe if AsyncDrop lands in Rust then we can investigate this again. Until then, we have no way forward, so I will close this. It is definitely an unfortunate outcome.

Of course, if you are fine with blocking the thread on scope exit, you can use the tokio_scoped crate.

ARC

The lifetime problem can be also solved with automatic reference counting. Let’s wrap the session and statement in Arc. Arc will keep the shared session and statement live as long as there exists at least one unfinished task:

    // ...
    let session = Arc::new(session);
    for i in 0..count {
        let mut statement = statement.bind();
        statement.bind(0, i as i64).unwrap();
        let session = session.clone();
        tokio::spawn(async move {
            // ...

This compiles fine and it wasn’t even that hard! Let’s run it:

thread 'tokio-runtime-worker' panicked at 'called `Result::unwrap()` on an `Err` value: 
Error(CassError(LIB_REQUEST_QUEUE_FULL, "The request queue has reached capacity"), State { next_error: None, backtrace: InternalBacktrace { backtrace: None } })', src/main.rs:271:26
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace

We submitted 100000 async queries all at once and the client driver’s queue can’t hold so many. Let’s do a quick temporary workaround: lower the count to 1000. The benchmark finishes fine now:

Throughput: 3095975.2 request/s

But is the result correct? Is Apache Cassandra really so fast? I’d really love to use a database that can do 3+ mln queries per second on a developer’s laptop, but unfortunately this isn’t the case. The benchmark is still incorrect. Now we don’t wait at all for the results to come back from the database. The benchmark submits the queries to the driver’s queue as fast as possible and then it immediately considers its job done. So we only measured how fast we can send the queries to the local driver’s queue (not even how fast we can push them to the database server).

Waiting for Everything to Finish

Look at what we’re doing with the result of the query:

            // ...
            let result = session.execute(&statement);
            result.await.unwrap();        
        });

The problem is: we’re doing nothing! After unwrapping, we’re just throwing the result away. Although the await might look like we were waiting for the result from the server, note this is all happening in the coroutine and the top-level code doesn’t wait for it.

Can we pass the results back from the nested tasks to the top level and wait for them at the end? Yes! Tokio provides its own, async implementation of a communication channel. Let’s setup a channel, plug its sending side to the coroutine and receive at the top-level at the end, but before computing the end time:

    let start = Instant::now();
    let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
    let session = Arc::new(session);
    for i in 0..count {
        let mut statement = statement.bind();
        statement.bind(0, i as i64).unwrap();

        let session = session.clone();
        let tx = tx.clone();
        tokio::spawn(async move {
            let query_start = Instant::now();
            let result = session.execute(&statement);
            result.await.unwrap();
            let query_end = Instant::now();
            let duration_micros= (query_end - query_start).as_micros();
            tx.send(duration_micros).unwrap();
        });
    }   
    // We need to drop the top-level tx we created at the beginning,
    // so all txs are dropped when all queries finish. 
    // Otherwise the following read loop would wait for more data forever.
    drop(tx);

    let mut successful_count = 0;
    while let Some(duration) = rx.next().await {
        // Here we get a sequence of durations
        // We could use it also to compute e.g. the mean duration or the histogram
        successful_count += 1;
    }

    let end = Instant::now();
    println!(
        "Throughput: {:.1} request/s",
        1000000.0 * successful_count as f64 / (end - start).as_micros() as f64
    );

This prints a much more accurate number:

Throughput: 91734.7 request/s

Can we now increase the count back to 100,000? Not yet. Although we’re waiting at the end, the loop still spins like crazy submitting all the queries at once and overflowing the queues. We need to slow it down.

Backpressure

We don’t want to exceed the capacity of the internal queues of the driver. Hence, we need to keep the number of submitted but unfinished queries limited. This is a good task for a semaphore. A semaphore is a structure that allows at most N parallel tasks. Tokio comes with a nice, asynchronous implementation of semaphore. The Semaphore structure allows a limited number of permits. The function of obtaining a permit is asynchronous, so it composes with the other elements we already use here. We’ll obtain a permit before spawning a task, and drop the permit after receiving the results.

    // ...
    let parallelism_limit = 1000;
    let semaphore = Arc::new(Semaphore::new(parallelism_limit));
    for i in 0..count {
        // ...
        let permit = semaphore.clone().acquire_owned().await;
        tokio::spawn(async move {
            // ... run the query and await the result

            // Now drop the permit; 
            // Actually it is sufficient to reference the permit value 
            // anywhere inside the coroutine so it is moved here and it would be dropped
            // automatically at the closing brace. But drop is more explicitly 
            // telling the intent.
            drop(permit);
        }
    });
    // ...

Because the permit is passed to the asynchronous code that may outlive the scope of main, here again we need to use Arc. We also needed to use an owned permit, rather than the standard one obtained by acquire(). An owned permit can be moved, a standard one cannot.

After putting it all together, and running the benchmark for a few times to warmup the server, the final throughput of running 100k queries was:

Throughput: 152374.3 request/s

The Final Word

Benchmarking is hard and it is easy to get incorrect results or arrive at incorrect conclusions.

Keep in mind that the way how you query data may severly affect the numbers you get. Watch for:

If you’d like to measure performance of your Cassandra cluster, you should try the tool I’m working on at the moment: Latte. Latte uses uses the approach described in this blog post to measure the throughput and response times. It is still very early-stage and I look forward to your feedback!

Share on: