// Module 10: Expert Topics
// Lesson: Modern Multithreading and Concurrency
// Build: g++ -std=c++17 -pthread 01_multithreading.cpp -o multithreading_demo
// Description: Demonstrates creating threads, protecting shared state, coordinating work, and composing asynchronous tasks.

#include <chrono>
#include <condition_variable>
#include <future>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <numeric>
#include <queue>
#include <random>
#include <string>
#include <thread>
#include <vector>

using namespace std;

// Utility to print the banner for each part.
void printPart(const string &title)
{
    cout << "\n========== " << title << " ==========" << endl;
}

// PART 1: Basic thread creation and joining.
void part1_basic_threads()
{
    printPart("PART 1: Creating and Joining Threads");

    auto worker = [](int id) {
        cout << "Worker " << id << " starting on thread " << this_thread::get_id() << endl;
        this_thread::sleep_for(chrono::milliseconds(200 + id * 50));
        cout << "Worker " << id << " finished." << endl;
    };

    thread t1(worker, 1);
    thread t2(worker, 2);

    cout << "Main thread does other work..." << endl;
    t1.join();
    t2.join();
    cout << "All workers joined.\n";
}
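
// Optional sketch (not invoked from main): sizing a worker group to the hardware.
// thread::hardware_concurrency() may return 0 when the count is unknown, so the
// sketch falls back to 2. All names below are illustrative only.
void part1_sketch_one_thread_per_core()
{
    unsigned int n = thread::hardware_concurrency();
    if (n == 0)
    {
        n = 2; // Conservative fallback when the runtime cannot report a count.
    }

    vector<thread> pool;
    for (unsigned int i = 0; i < n; ++i)
    {
        pool.emplace_back([i] {
            cout << "Core-sized worker " << i << " running." << endl;
        });
    }
    for (auto &t : pool)
    {
        t.join();
    }
}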

// PART 2: Shared state with mutex and lock_guard.
void part2_mutex_and_shared_state()
{
    printPart("PART 2: Protecting Shared State");

    vector<int> data(1'000, 1);
    mutex m;
    size_t processed = 0;

    auto accumulateChunk = [&](size_t start, size_t end) {
        int local_sum = accumulate(data.begin() + static_cast<long>(start),
                                   data.begin() + static_cast<long>(end), 0);
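        // Only the shared counter and the interleaved output need the mutex;
        // each thread sums its own chunk above without holding any lock.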
        lock_guard<mutex> guard(m);
        processed += (end - start);
        cout << "Chunk [" << start << ", " << end << ") sum = " << local_sum
             << " | processed so far = " << processed << endl;
    };

    vector<thread> workers;
    size_t chunk = data.size() / 4;
    for (size_t i = 0; i < data.size(); i += chunk)
    {
        workers.emplace_back(accumulateChunk, i, min(i + chunk, data.size()));
    }
    for (auto &t : workers)
    {
        t.join();
    }
}
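
// Optional sketch (not invoked from main), pointing toward exercise 1:
// std::scoped_lock (C++17) acquires several mutexes as one atomic step using a
// deadlock-avoidance algorithm, so threads that grab the same pair in opposite
// order cannot deadlock. The accounts and mutexes here are hypothetical.
void part2_sketch_scoped_lock_transfer()
{
    int accountA = 100, accountB = 100;
    mutex mA, mB;

    // The two lambdas deliberately name the mutexes in opposite order;
    // scoped_lock still locks both safely.
    thread t1([&] {
        scoped_lock lock(mA, mB);
        accountA -= 30;
        accountB += 30;
    });
    thread t2([&] {
        scoped_lock lock(mB, mA);
        accountB -= 10;
        accountA += 10;
    });
    t1.join();
    t2.join();
    cout << "Sketch balances: A = " << accountA << ", B = " << accountB << endl;
}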

// PART 3: Producer-consumer with condition_variable.
class SafeQueue
{
public:
    void push(int value)
    {
        {
            lock_guard<mutex> lock(m_);
            queue_.push(value);
        }
        cv_.notify_one();
    }

    bool pop(int &value)
    {
        unique_lock<mutex> lock(m_);
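        // The predicate form of wait() re-checks the condition after every wakeup,
        // which guards against spurious wakeups and notifications that arrive
        // before the consumer starts waiting.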
        cv_.wait(lock, [&] { return !queue_.empty() || done_; });
        if (queue_.empty())
        {
            return false;
        }
        value = queue_.front();
        queue_.pop();
        return true;
    }

    void signal_done()
    {
        {
            lock_guard<mutex> lock(m_);
            done_ = true;
        }
        cv_.notify_all();
    }

private:
    queue<int> queue_;
    mutex m_;
    condition_variable cv_;
    bool done_ = false;
};
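
// Optional sketch (not used by part3), pointing toward exercise 2: a bounded queue
// blocks producers once `capacity_` items are waiting, which needs a second
// condition_variable so producers and consumers can wake each other. The class
// name and its members are hypothetical.
class BoundedQueueSketch
{
public:
    explicit BoundedQueueSketch(size_t capacity) : capacity_(capacity) {}

    void push(int value)
    {
        unique_lock<mutex> lock(m_);
        not_full_.wait(lock, [&] { return queue_.size() < capacity_; });
        queue_.push(value);
        lock.unlock();
        not_empty_.notify_one();
    }

    int pop()
    {
        unique_lock<mutex> lock(m_);
        not_empty_.wait(lock, [&] { return !queue_.empty(); });
        int value = queue_.front();
        queue_.pop();
        lock.unlock();
        not_full_.notify_one();
        return value;
    }

private:
    queue<int> queue_;
    mutex m_;
    condition_variable not_full_;
    condition_variable not_empty_;
    size_t capacity_;
};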

void part3_producer_consumer()
{
    printPart("PART 3: Producer-Consumer Pipeline");

    SafeQueue q;
    thread producer([&] {
        random_device rd;
        mt19937 gen(rd());
        uniform_int_distribution<int> dist(1, 100);
        for (int i = 0; i < 5; ++i)
        {
            int value = dist(gen);
            cout << "Producing " << value << endl;
            q.push(value);
            this_thread::sleep_for(chrono::milliseconds(150));
        }
        q.signal_done();
    });

    thread consumer([&] {
        int value;
        while (q.pop(value))
        {
            cout << "  Consuming " << value << endl;
            this_thread::sleep_for(chrono::milliseconds(250));
        }
        cout << "No more work." << endl;
    });

    producer.join();
    consumer.join();
}

// PART 4: Futures, promises, and async tasks.
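// Sums i*i for i = 1..n. The closed form n(n+1)(2n+1)/6 gives 333'383'335'000
// for n = 10'000, which is a handy sanity check for the value printed in PART 4.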
long long heavyComputation(int n)
{
    long long sum = 0;
    for (int i = 1; i <= n; ++i)
    {
        sum += static_cast<long long>(i) * i;
    }
    this_thread::sleep_for(chrono::milliseconds(200));
    return sum;
}

void part4_async_tasks()
{
    printPart("PART 4: std::async and std::promise");

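    // launch::async guarantees the task starts immediately on a new thread;
    // with no explicit policy the implementation may choose to defer it instead.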
    future<long long> f1 = async(launch::async, heavyComputation, 10'000);

    promise<string> p;
    future<string> f2 = p.get_future();
    thread notifier([&p] {
        this_thread::sleep_for(chrono::milliseconds(300));
        p.set_value("Report ready!");
    });

    cout << "Heavy computation result = " << f1.get() << endl;
    cout << "Notification: " << f2.get() << endl;
    notifier.join();
}
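
// Optional sketch (not invoked from main), pointing toward exercise 4:
// launch::deferred starts no thread at all; the callable runs lazily inside the
// first get() or wait(). A zero-timeout wait_for() reports future_status::deferred
// for such futures, which is one way to observe the difference.
void part4_sketch_launch_policies()
{
    future<long long> eager = async(launch::async, heavyComputation, 1'000);
    future<long long> lazy = async(launch::deferred, heavyComputation, 1'000);

    if (lazy.wait_for(chrono::seconds(0)) == future_status::deferred)
    {
        cout << "Deferred task has not started yet." << endl;
    }

    cout << "Eager result: " << eager.get() << endl;
    cout << "Lazy result (runs now, on the calling thread): " << lazy.get() << endl;
}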

int main()
{
    part1_basic_threads();
    part2_mutex_and_shared_state();
    part3_producer_consumer();
    part4_async_tasks();

    cout << "\nExercises:\n"
         << "1. Extend PART 2 to use std::scoped_lock with multiple mutexes.\n"
         << "2. Modify SafeQueue to be bounded (fixed capacity).\n"
         << "3. Create a thread pool that reuses worker threads for a job queue.\n"
         << "4. Benchmark when std::async launches lazily vs eagerly using launch policies.\n";
    return 0;
}