Piotr Kołaczkowski

How a Single Line of Code Made a 24-core Server Slower Than a Laptop

Imagine you wrote a program for a pleasingly parallel problem, where each thread does its own independent piece of work, and the threads don’t need to coordinate except joining the results at the end. Obviously you’d expect the more cores it runs on, the faster it is. You benchmark it on a laptop first and indeed you find out it scales nearly perfectly on all of the 4 available cores. Then you run it on a big, fancy, multiprocessor machine, expecting even better performance, only to see it actually runs slower
than the laptop, no matter how many cores you give it. Uh. That has just happened to me recently.

I’ve been working recently on a Cassandra benchmarking tool Latte which is probably the most efficient Cassandra benchmarking tool you can get, both in terms of CPU use and memory use. The whole idea is very simple: a small piece of code generates data and executes a bunch of asynchronous CQL statements against Cassandra. Latte calls that code in a loop and records how long each iteration took. Finally, it makes a statistical analysis and displays it in various forms.

Benchmarking seems to be a very pleasant problem to parallelize. As long as the code under benchmark is stateless, it can be fairly trivially called from multiple threads. I’ve blogged about how to achieve that in Rust already here and here.

However, at the time I wrote those earlier blog posts, Latte’s workload definition capabilities were nonexistent quite limited. It came with only two predefined, hardcoded workloads, one for reading and another one for writing. There were a few things you could parameterize, e.g. the number or the sizes of table columns, but nothing really fancy. No secondary indexes. No custom filtering clauses. No control over the CQL text. Really nothing. So, overall, Latte at that time was more of a proof-of-concept rather than a universal tool for doing real work. Surely, you could fork it and write a new workload in Rust, then compile everything from source. But who wants to waste time on learning the internals of a niche benchmarking tool?

Rune scripting

So the last year, in order to be able to measure the performance of storage attached indexes in Cassandra, I decided to integrate Latte with a scripting engine that would allow me to easily define workloads without recompiling the whole program. After playing a bit with embedding CQL statements in TOML config files (which turned out to be both messy and limited at the same time), through having some fun with embedding Lua (which is probably great in C world, but didn’t play so nice with Rust as I expected, although it kinda worked), I eventually ended up with a design similar to that of sysbench but with an embedded Rune interpreter instead of Lua.

The main selling points of Rune that convinced me were painless Rust integration and support for async code. Thanks to async support, the user can execute CQL statements directly in the workload scripts, leveraging the asynchronous nature of the Cassandra driver. Additionally, the Rune team is amazingly helpful and removed anything that blocked me in virtually no time.

Here is an example of a complete workload that measures performance of selecting rows by random keys:

const ROW_COUNT = latte::param!("rows", 100000);

const KEYSPACE = "latte";
const TABLE = "basic";

pub async fn schema(ctx) {
    ctx.execute(`CREATE KEYSPACE IF NOT EXISTS ${KEYSPACE} \
                    WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }`).await?;
    ctx.execute(`CREATE TABLE IF NOT EXISTS ${KEYSPACE}.${TABLE}(id bigint PRIMARY KEY)`).await?;
}

pub async fn erase(ctx) {
    ctx.execute(`TRUNCATE TABLE ${KEYSPACE}.${TABLE}`).await?;
}

pub async fn prepare(ctx) {
    ctx.load_cycle_count = ROW_COUNT;
    ctx.prepare("insert", `INSERT INTO ${KEYSPACE}.${TABLE}(id) VALUES (:id)`).await?;
    ctx.prepare("select", `SELECT * FROM ${KEYSPACE}.${TABLE} WHERE id = :id`).await?;
}

pub async fn load(ctx, i) {
    ctx.execute_prepared("insert", [i]).await?;
}

pub async fn run(ctx, i) {
    ctx.execute_prepared("select", [latte::hash(i) % ROW_COUNT]).await?;
}

You can find more info on how to write those scripts in the README.

Benchmarking the benchmarking program

Although the scripts are not JIT-compiled to native code yet, they are acceptably fast, and thanks to the limited amount of code they typically contain, they don’t show up at the top of the profile. I’ve empirically found that the overhead of Rust-Rune FFI was lower than that of Rust-Lua provided by mlua, probably due to the safety checks employed by mlua.

Initially, to assess the performance of the benchmarking loop, I created an empty script:

pub async fn run(ctx, i) {
}

Even though there is no function body there, the benchmarking program needs to do some work to actually run it:

The results on my old 4-core laptop with Intel Xeon E3-1505M v6 locked at 3 GHz looked very promising:

Because there are 4 cores, the throughput increases linearly up to 4 threads. Then it increases slightly more up to 8 threads, thanks to hyper-threading that squeezes a bit more performance out of each core. Obviously there is no performance improvement beyond 8 threads, because all CPU resources are saturated at this point.

I was also satisfied with the absolute numbers I got. A few million of empty calls per second on a laptop sounds like the benchmarking loop is lightweight enough to not cause significant overhead in real measurements. A local Cassandra server launched on the same laptop can only do about 200k requests per second when fully loaded and that only if those requests are stupidly simple and all the data fits in memory.

By the way, after adding some real code for data generation in the body, but with no calls to the database, as expected everything got proportionally slower, but not more than 2x slower, so it was still in a “millions ops per second” range.

That was easy. I could have stopped here and announce victory. However, I was curious how fast it could go if tried on a bigger machine with more cores.

Running an empty loop on 24 cores

A server with two Intel Xeon CPU E5-2650L v3 processors, each with 12 cores running at 1.8 GHz should be obviously a lot faster than an old 4-core laptop, shouldn’t it? Well, maybe with 1 thread it would be slower because of lower CPU frequency (3 GHz vs 1.8 GHz), but it should make up for that by having many more cores.

Let the numbers speak for themselves:

You’ll agree there is something wrong here. Two threads are better than one… and that’s basically it. I couldn’t squeeze more throughput than about 2 million calls per second, which was about 4x worse than the throughput I got on the laptop. Either the server was a lemon or my program had a serious scalability issue.

Investigation

When you hit a performance problem, the most common way of investigating it is to run the code under profiler. In Rust, it is very easy to generate flamegraphs with cargo flamegraph. Let’s compare the flamegraphs collected when running the benchmark with 1 thread vs 12 threads:

flamegraph 1 thread

flamegraph 12 threads

I was expecting to find a single thing that was a bottleneck, e.g. a contended mutex or something similar, but to my surprise, there was nothing obvious there. There wasn’t even a single bottleneck! Rune’s VM::run code seemed to take about 1/3 of the time, but the rest was simply taken by polling futures and quite likely the culprit got inlined and disappeared from the profile.

Anyway, because of VM::run and the path rune::shared::assert_send::AssertSend leading also to Rune, I decided to disable the code responsible for calling the Rune function, and I reran the experiment with just a loop running an empty future, albeit with timing and statistics code still enabled:

// Executes a single iteration of a workload.
// This should be idempotent –
// the generated action should be a function of the iteration number.
// Returns the end time of the query.
pub async fn run(&self, iteration: i64) -> Result<Instant, LatteError> {
    let start_time = Instant::now();
    let session = SessionRef::new(&self.session);
    // let result = self
    //     .program
    //     .async_call(self.function, (session, iteration))
    //     .await
    //     .map(|_| ()); // erase Value, because Value is !Send
    let end_time = Instant::now();
    let mut state = self.state.try_lock().unwrap();
    state.fn_stats.operation_completed(end_time - start_time);
    // ... 
    Ok(end_time)   
}

That scaled fine to over 100M calls per second on 48 threads! So the problem must be somewhere below the Program::async_call function:

// Compiled workload program
pub struct Program {
    sources: Sources,
    context: Arc<RuntimeContext>, 
    unit: Arc<Unit>,
}

// Executes given async function with args.
// If execution fails, emits diagnostic messages, e.g. stacktrace to standard error stream.
// Also signals an error if the function execution succeeds, but the function returns
// an error value.    
pub async fn async_call(
    &self,
    fun: FnRef,
    args: impl Args + Send,
) -> Result<Value, LatteError> {
    let handle_err = |e: VmError| {
        let mut out = StandardStream::stderr(ColorChoice::Auto);
        let _ = e.emit(&mut out, &self.sources);
        LatteError::ScriptExecError(fun.name, e)
    };
    let execution = self.vm().send_execute(fun.hash, args).map_err(handle_err)?;
    let result = execution.async_complete().await.map_err(handle_err)?;
    self.convert_error(fun.name, result)
}

// Initializes a fresh virtual machine needed to execute this program.
// This is extremely lightweight.
fn vm(&self) -> Vm {
    Vm::new(self.context.clone(), self.unit.clone())
}

The async_call function does a few things:

My next idea was to just remove the send_execute and async_complete calls and leave just the VM preparation. So basically I wanted to benchmark that line:

Vm::new(self.context.clone(), self.unit.clone())

The code looks fairly innocent. No locks, no mutexes, no syscalls, no shared mutable data here. There are some read-only structures context and unit shared behind an Arc, but read-only sharing shouldn’t be a problem.

VM::new is also trivial:

impl Vm {

    // Construct a new virtual machine.
    pub const fn new(context: Arc<RuntimeContext>, unit: Arc<Unit>) -> Self {
        Self::with_stack(context, unit, Stack::new())
    }

    // Construct a new virtual machine with a custom stack.
    pub const fn with_stack(context: Arc<RuntimeContext>, unit: Arc<Unit>, stack: Stack) -> Self {
        Self {
            context,
            unit,
            ip: 0,
            stack,
            call_frames: vec::Vec::new(),
        }
    }

However, not matter how innocent the code looks, I like to double check my assumptions. I ran that with different numbers of threads and, although it was now faster than before, it didn’t scale at all again – it hit a throughput ceiling of about 4 million calls per second!

The problem

Although at first it doesn’t look like there is any sharing of mutable data in the code above, actually there is something slightly hidden that’s shared and mutated: the Arc reference counters themselves. Those counters are shared between all the invocations, performed from many threads, and they are the source of the congestion here.

Some may argue that atomically increasing or decreasing a shared atomic counter shouldn’t be a problem because those are “lockless” operations. They even translate to single assembly instructions (e.g. lock xadd)! If something is a single assembly instruction, it is not slow, isn’t it? That reasoning is unfortunately flawed.

The root of the problem is not really the computation, but the cost of maintaining the shared state.

The amount of time required to read or write data is mostly influenced by how far the CPU core needs to reach out for the data. Here are the typical latencies for the Intel Haswell Xeon CPUs according to this site:

L1 and L2 caches are typically local to a core (L2 may be shared by two cores). L3 cache is shared by all cores of a CPU. There is also a direct interconnect between L3 caches of different processors on the main board for managing L3 cache coherency, so L3 is logically shared between all processors.

As long as you don’t update the cache line and only read it from multiple threads, the line will be loaded by multiple cores and marked as shared. It is likely that frequent accesses to such data would be served from L1 cache, which is very fast. Therefore sharing read-only data is perfectly fine and scales well. Even using atomics for only reading will be plenty fast in that case.

However, once we introduce updates to the shared cache line, things start to complicate. The x86-amd64 architecture has coherent data caches. This means basically that what you write on one core, you can read back on another one. It is not possible to store a cache line with conflicting data in multiple cores. Once a thread decides to update a shared cache line, that line gets invalidated on all the other cores, so subsequent loads on those cores would have to fetch the data from at least L3. That is obviously a lot slower, and even slower if there are more processors than one on the main board.

The fact that our reference counters are atomic is an additional problem that makes things even more complex for the processor. Although using atomic instructions is often referred to as “lockless programming”, this is slightly misleading – in fact, atomic operations require some locking to happen at the hardware level. This locking is very fine-grained and cheap as long as there is no congestion, but as usual with locking, you may expect very poor performance if many things try to fight for the same lock at the same time. And it is of course much worse if those “things” are whole CPUs and not just single cores that are close to each other.

The fix

The obvious fix is to avoid sharing the reference counters. Latte has a very simple, hierarchical lifecycle structure, so all those Arc updates looked like an overkill to me and they could probably be replaced with simpler references and Rust lifetimes. However, this is easier said than done. Unfortunately Rune requires the references to the Unit and RuntimeContext to be passed wrapped in Arc for managing the lifetime (in probably more complex scenarios) and it also uses some Arc-wrapped values internally as part of those structures. Rewriting Rune just for my tiny use case was out of the question.

Therefore the Arc had to stay. Instead of using a single Arc value we can use one Arc per thread. That requires also separating the Unit and RuntimeContext values, so each thread would get their own. As a side effect, this guarantees there is no sharing at all, so even if Rune clones an Arc stored internally as a part of those values, that problem would be also fixed. The downside of this solution is higher memory use. Fortunately . Latte workload scripts are usually tiny, so higher memory use is likely not a big problem.

To be able to use separate Unit and RuntimeContext I submitted a patch to Rune to make them Clone. Then, on the Latte side, the whole fix was actually introducing a new function for “deep” cloning the Program struct and then making sure each thread gets its own copy:

    // Makes a deep copy of context and unit.
    // Calling this method instead of `clone` ensures that Rune runtime structures
    // are separate and can be moved to different CPU cores efficiently without accidental
    // sharing of Arc references.
    fn unshare(&self) -> Program {
        Program {
            sources: self.sources.clone(),
            context: Arc::new(self.context.as_ref().clone()),   // clones the value under Arc and wraps it in a new counter
            unit: Arc::new(self.unit.as_ref().clone()),         // clones the value under Arc and wraps it in a new counter
        }
    }

BTW: The sources field is not used during the execution, except for emitting diagnostics, so it could be left shared.

Note that the original line where I originally found the slowdown did not need any changes!

Vm::new(self.context.clone(), self.unit.clone())

This is because self.context and self.unit are not shared between threads anymore. Atomic updates to non-shared counters are fortunately very fast.

Final results

Now the throughput scales linearly up to 24 threads, as expected:

Takeaways

Share on: