Expert Topics
Multithreading in C++
Table of Contents
Thread Basics
#include <thread>
// Create and start thread
void task() {
cout << "Running in thread" << endl;
}
thread t(task);
t.join(); // Wait for completion
// Lambda
thread t2([]() {
cout << "Lambda thread" << endl;
});
t2.join();
// With arguments
void greet(const string& name) {
cout << "Hello, " << name << endl;
}
thread t3(greet, "Alice");
t3.join();
Thread Management
Join vs Detach
thread t(task);
// join() - Wait for thread to finish
t.join();
// detach() - Run independently
thread t2(task);
t2.detach(); // Daemon thread
// Check if joinable
if (t.joinable()) {
t.join();
}
Thread ID and Hardware
// Get thread ID
thread::id id = this_thread::get_id();
// Number of cores
unsigned int cores = thread::hardware_concurrency();
// Sleep
this_thread::sleep_for(chrono::seconds(1));
this_thread::sleep_until(chrono::steady_clock::now() + chrono::seconds(1));
// Yield
this_thread::yield();
Mutex and Locks
Basic Mutex
#include <mutex>
mutex mtx;
int counter = 0;
void increment() {
mtx.lock();
++counter;
mtx.unlock();
}
lock_guard (RAII)
mutex mtx;
void safeIncrement() {
lock_guard<mutex> lock(mtx);
++counter;
// Automatically unlocks when scope ends
}
unique_lock (Flexible)
mutex mtx;
void flexibleLock() {
unique_lock<mutex> lock(mtx);
// Can unlock and relock
lock.unlock();
// ... do other work
lock.lock();
// Can transfer ownership
unique_lock<mutex> lock2 = move(lock);
}
Multiple Mutexes
mutex mtx1, mtx2;
void transferSafe() {
// Lock multiple mutexes without deadlock
scoped_lock lock(mtx1, mtx2);
// Or: lock(mtx1, mtx2); with defer_lock
}
Shared Mutex (Reader-Writer)
#include <shared_mutex>
shared_mutex rwMutex;
void reader() {
shared_lock lock(rwMutex); // Multiple readers OK
// Read data
}
void writer() {
unique_lock lock(rwMutex); // Exclusive access
// Write data
}
Condition Variables
#include <condition_variable>
mutex mtx;
condition_variable cv;
bool ready = false;
void worker() {
unique_lock<mutex> lock(mtx);
cv.wait(lock, []{ return ready; }); // Wait until ready
cout << "Working" << endl;
}
void signaler() {
{
lock_guard<mutex> lock(mtx);
ready = true;
}
cv.notify_one(); // or notify_all()
}
Producer-Consumer
queue<int> buffer;
mutex mtx;
condition_variable cv;
bool done = false;
void producer() {
for (int i = 0; i < 10; i++) {
{
lock_guard<mutex> lock(mtx);
buffer.push(i);
}
cv.notify_one();
}
done = true;
cv.notify_all();
}
void consumer() {
while (true) {
unique_lock<mutex> lock(mtx);
cv.wait(lock, []{ return !buffer.empty() || done; });
if (buffer.empty() && done) break;
int val = buffer.front();
buffer.pop();
lock.unlock();
process(val);
}
}
Async and Futures
async
#include <future>
int compute() {
return 42;
}
// Launch async task
future<int> result = async(launch::async, compute);
// Get result (blocks if not ready)
int value = result.get();
// Launch policies
async(launch::async, func); // New thread
async(launch::deferred, func); // Lazy evaluation
async(launch::async | launch::deferred, func); // Implementation chooses
promise and future
void worker(promise<int> p) {
// Do work
p.set_value(42); // Or set_exception
}
promise<int> p;
future<int> f = p.get_future();
thread t(worker, move(p));
int result = f.get();
t.join();
packaged_task
packaged_task<int(int, int)> task([](int a, int b) {
return a + b;
});
future<int> result = task.get_future();
thread t(move(task), 3, 4);
cout << result.get() << endl; // 7
t.join();
Wait Operations
future<int> f = async(compute);
// Check status
if (f.wait_for(chrono::seconds(0)) == future_status::ready) {
cout << "Ready!" << endl;
}
// Wait with timeout
auto status = f.wait_for(chrono::seconds(5));
if (status == future_status::ready) { }
else if (status == future_status::timeout) { }
else if (status == future_status::deferred) { }
Atomic Operations
#include <atomic>
atomic<int> counter(0);
void increment() {
counter++; // Atomic increment
counter.fetch_add(1); // Same thing
}
// Atomic operations
counter.store(10); // Write
int val = counter.load(); // Read
int old = counter.exchange(20); // Swap
// Compare and swap
int expected = 10;
counter.compare_exchange_strong(expected, 20);
// Atomic flag
atomic_flag flag = ATOMIC_FLAG_INIT;
while (flag.test_and_set()) { } // Spinlock
flag.clear();
Thread Pool Pattern
A common pattern for managing worker threads:
class ThreadPool {
vector<thread> workers;
queue<function<void()>> tasks;
mutex queueMutex;
condition_variable cv;
bool stop = false;
public:
ThreadPool(size_t numThreads) {
for (size_t i = 0; i < numThreads; i++) {
workers.emplace_back([this] {
while (true) {
function<void()> task;
{
unique_lock<mutex> lock(queueMutex);
cv.wait(lock, [this] {
return stop || !tasks.empty();
});
if (stop && tasks.empty()) return;
task = move(tasks.front());
tasks.pop();
}
task();
}
});
}
}
template<class F>
void enqueue(F&& f) {
{
lock_guard<mutex> lock(queueMutex);
tasks.emplace(forward<F>(f));
}
cv.notify_one();
}
~ThreadPool() {
{
lock_guard<mutex> lock(queueMutex);
stop = true;
}
cv.notify_all();
for (auto& worker : workers) worker.join();
}
};
// Usage
ThreadPool pool(4);
pool.enqueue([]{ cout << "Task 1" << endl; });
pool.enqueue([]{ cout << "Task 2" << endl; });
Recommended Conventions
✅ Do
// 1. Use RAII locks
{
lock_guard<mutex> lock(mtx);
// Protected code
}
// 2. Minimize lock scope
{
lock_guard<mutex> lock(mtx);
auto copy = shared_data;
}
process(copy); // Outside lock
// 3. Use async for simple parallel tasks
auto f = async(launch::async, heavyComputation);
// 4. Use atomics for simple counters
atomic<int> counter{0};
❌ Don't
// 1. Don't forget to join/detach
thread t(func);
// Must call t.join() or t.detach()!
// 2. Don't hold locks while blocking
mtx.lock();
cin >> input; // BAD - blocks while holding lock
mtx.unlock();
// 3. Don't access shared data without synchronization
int shared = 0;
thread t1([&]{ shared++; }); // Race condition!
thread t2([&]{ shared++; });
Cheat Sheet
// Thread
thread t(func, args...);
t.join(); t.detach(); t.joinable();
this_thread::get_id(); sleep_for(); yield();
// Mutex
mutex mtx;
lock_guard<mutex> lg(mtx);
unique_lock<mutex> ul(mtx);
scoped_lock sl(mtx1, mtx2);
// Condition Variable
condition_variable cv;
cv.wait(lock, predicate);
cv.notify_one(); cv.notify_all();
// Async/Future
future<T> f = async(launch::async, func);
T result = f.get();
f.wait(); f.wait_for(); f.wait_until();
// Promise
promise<T> p;
future<T> f = p.get_future();
p.set_value(val);
// Atomic
atomic<T> a;
a.load(); a.store(v);
a.fetch_add(n); a.compare_exchange_strong(e, v);
Compile & Run
g++ -std=c++17 -pthread -Wall examples.cpp -o examples && ./examples