cpp11之后,cpp本身对并发的支持大大增强,这里记录一下

1 std::thread class

cpp的thread类型,创建即运行,不像java那样需要start,thread表示一个系统资源,一个系统线程

构造函数

functionconstruct
constructor 1c++thread() noexcept;
constructor 2thread( thread&& other ) noexcept;
constructor 3template<class F, class... Args>
explicit thread(F&& f, Args&&... args);
copy constructorthread(const thread&) = delete;
assign copy operatorthread& operator=(const thread&) = delete;
assign move operatorthread& operator=( thread&& other ) noexcept;
  1. 默认构造函数,创建一个空的 thread 执行对象;
  2. 初始化构造函数,创建一个 thread对象,该 thread对象可被 joinable,新产生的线程会调用 fn 函数,该函数的参数由 args 给出;
  3. 拷贝构造函数被禁用,意味着 thread 不可被拷贝;
  4. 赋值操作被禁用,thread 对象不可被拷贝。
  5. move 构造函数, 调用成功之后 other 不代表任何 thread 执行对象;
  6. move 赋值操作,如果当前对象不可 joinable,需要传递一个右值引用(rhs)给 move 赋值操作;如果当前对象可被 joinable,则 terminate() 报错;

注意:可被 joinable 的 thread 对象必须在他们销毁之前被主线程 join 或者将其设置为 detached.

this_thread namespace

this_thread这个命名空间下定义了几个有用的静态函数

functionaldetail
std::thread::id get_id() const noexcept;获取当前thread的id,这个id是cpp自己定义的
void yield() noexcept;把调度机会让给其他的thread
void sleep_until( const std::chrono::time_point tp)sleep到指定的 time_point
void sleep_for( const std::chrono::duration duration)sleep 指定的 duration
#include <thread>
#include <chrono>
using namespace std;

int main(int argc, char *argv[]){
    this_thread::sleep_until(std::chrono::steady_clock::now() + 1234ms);
    this_thread::sleep_for(1234ms);
    return 0;
}

其他相关函数

functiondetail
bool joinable() const noexcept;检查thread是否可以 join
native_handle_type native_handle();获取平台相关联的id,posix系统一般是关联的pthread,这个函数要在thread未调用join之前用,之后只会返回0
static unsigned int hardware_concurrency() noexcept;系统支持的线程数,比如2核但是支持超线程,可能返回4
void detach();脱离用户管理,类似pthread_detach
void swap( std::thread& other ) noexcept;交换thread的ownership
std::ref(T&)返回引用,
std::cref(T&)返回const引用

线程退出

cpp 的 thread 缺少类似 java 的 interrupt,一般有2种方式来终止:

  1. 通过某个变量来控制是否退出
  2. 调用native_handle获取到pthread_id,然后再调用 pthread_cancel
#include <iostream>
#include <thread>
using namespace std;

void func1(atomic_bool& quit) {
    while (!quit) {
        this_thread::sleep_for(100ms);
    }
    cout << "thread1 quit\n";
}

void func2() {
    while (true) {
        this_thread::sleep_for(100ms);
    }
    cout << "thread2 quit\n";
}

int main(int argc, char *argv[]) {
    atomic_bool quit(false);
    cout << "start func1" << endl;
    thread t1{func1, ref(quit)};
    this_thread::sleep_for(5s);
    quit.store(true);
    t1.join();
    cout << "func1 joined" << endl;
    
    cout << "start func2" << endl;
    thread t2{func2};
    this_thread::sleep_for(5s);
    pthread_cancel(t2.native_handle());
    t2.join();
    cout << "func2 joined" << endl;

    return 0;
}

thread_local 数据

thread_local 的数据是一个线程独有的数据,其他线程无法访问(除非用指针),还可以是 extern 的。

#include <iostream>
#include <string>
#include <thread>
using namespace std;

thread_local unsigned int rage = 1;

void increase_rage(const std::string& thread_name) {
    ++rage; // modifying outside a lock is okay; this is a thread-local variable
    cout << "Rage counter for " << thread_name << " : " << rage << '\n';
}

int main() {
    thread a(increase_rage, "a");
    a.join();
    thread b(increase_rage, "b");
    b.join();
}

2 mutex

互斥量和条件变量,

相关类或函数

classdescribe
mutex基本的 mutex 类
recursive_mutex递归 mutex 类, 可以被同一个thread重复获取
timed_mutex定时非递归 mutex 类,在指定时长内获取mutex
recursive_timed_mutex定时递归 mutex 类,
lock_guard<M>析构时释放 M, lock 只能使用 adopt_lock(见表中) 标志
unique_lock<M>movable,but not copyable, 灵活性高
区别与 mutex 的不可移动不可拷贝,但是性能低于 mutex
析构时释放 M,也可以提前调用unlock()来降低锁的粒度,unique_lock中保存了锁状态
shared_lock<M> c++14TODO
std::lock(M1,M2…)同时获取多个锁, 使用了死锁避免算法,这是一个函数,不会自动释放全部获取到的锁
scoped_lock<M…> c++17std::lock的 RAII-style 的封装,析构时释放全部获取到的锁
defer_lock这是一个标志,表示只是获取m的ownership, 但是不调用lock()函数,线程还没有获取到锁
adopt_lock这是一个标志,表示只是获取m的ownership, 但是不调用lock()函数,这个需要线程先获取到锁
#include <mutex>
#include <iostream>
using namespace std;

int main(int argc, char *argv[])
{
    mutex mtx1, mtx2;
    {
        lock_guard l(mtx1);
    }
    {
        std::lock(mtx1, mtx2);
        // mtx1, mtx2 already locked, so use adopt_lock
        lock_guard l1(mtx1, adopt_lock);
        lock_guard l2(mtx2, adopt_lock);
    }
    {
        scoped_lock lock(mtx1, mtx2);
    }
    {
        unique_lock l1(mtx1, std::defer_lock);
        unique_lock l2(mtx2, std::defer_lock);
        // mtx1, mtx2 is not locked, because use defer_lock
        // donot use mtx1 and mtx2 directly 
        std::lock(l1, l2);
        if(l1.owns_lock() && l2)
            cout << "own metux" << endl;
        else
            cout << "onwership is moved!\n";
    }

    lock_guard l(mtx1);

    return 0;
}

3 call_once

使用 std::once_flag 和 std::call_once 来保证单次执行, c++中双重检查也存在潜在竞争,C++和双重检查锁定模式(DCLP)的风险,所以还是推荐使用 call_once

void init(string file_path) {
    static once_flag flag;
    call_once(flag, init_from_file, file_path);
}

4 condition

条件变量(condition variables)是用于线程间同步的重要工具,允许线程在某个条件不满足时挂起等待,直到其他线程通知该条件已满足。

相关类

classdescribe
condition_variable提供与std::unique_lock配合使用的条件变量
condition_variable_any提供与任何满足基本要求的锁配合使用的条件变量
notify_all_at_thread_exit安排在当前线程完全退出时通知所有等待的线程
cv_status列举条件变量状态的枚举类型

condition_variable

std::condition_variable 是最常用的条件变量类型,必须与 std::unique_lock<std::mutex> 一起使用。

主要成员函数

functiondetail
void wait(unique_lock lock)阻塞当前线程直到条件变量被通知
template<class Predicate>
void wait(unique_lock lock, Predicate pred)
带谓词的等待,等价于while(!pred()) wait(lock);
cv_status wait_for(unique_lock lock, const duration rel_time)等待指定时间或直到被通知
template<class Rep, class Period, class Predicate>
bool wait_for(unique_lock lock, const duration rel_time, Predicate pred)
带谓词和超时的等待
cv_status wait_until(unique_lock lock, const time_point abs_time)等待到指定时间点或直到被通知
template<class Predicate>
bool wait_until(unique_lock lock,const time_point abs_time, Predicate pred)
带谓词和时间点的等待
void notify_one()通知一个等待的线程
void notify_all()通知所有等待的线程

condition_variable_any

std::condition_variable_anycondition_variable 类似,但可以与任何满足基本要求的锁类型一起使用,而不仅限于 std::unique_lock<std::mutex>。由于实现复杂度较高,通常推荐使用 condition_variable

使用示例

以下是一个经典的生产者-消费者模型示例:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>

std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue;
bool finished = false;

// 生产者线程函数
void producer() {
    for (int i = 0; i < 5; ++i) {
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        {
            std::lock_guard<std::mutex> lock(mtx);
            data_queue.push(i);
            std::cout << "Produced: " << i << std::endl;
        }
        cv.notify_one(); // 通知消费者
    }
    
    {
        std::lock_guard<std::mutex> lock(mtx);
        finished = true;
    }
    cv.notify_all(); // 通知所有消费者生产已完成
}

// 消费者线程函数
void consumer(int id) {
    while (true) {
        std::unique_lock<std::mutex> lock(mtx);
        cv.wait(lock, []{ return !data_queue.empty() || finished; });
        
        if (!data_queue.empty()) {
            int data = data_queue.front();
            data_queue.pop();
            std::cout << "Consumer " << id << " consumed: " << data << std::endl;
            lock.unlock(); // 提前解锁以减少锁持有时间
        } else if (finished) {
            break;
        }
    }
}

int main() {
    std::thread prod(producer);
    std::thread cons1(consumer, 1);
    std::thread cons2(consumer, 2);
    
    prod.join();
    cons1.join();
    cons2.join();
    
    return 0;
}

注意事项

  1. 虚假唤醒:线程可能在没有收到通知的情况下被唤醒,因此通常需要在循环中检查条件:

    std::unique_lock<std::mutex> lock(mtx);
    while (!condition) {
        cv.wait(lock);
    }
  2. 谓词形式:使用带谓词的 wait()

    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, []{ return condition; });
  3. 锁的顺序:调用 wait()前必须持有锁,wait() 会自动释放锁并在返回前重新获取锁。

  4. 通知方式

    • notify_one():只唤醒一个等待线程
    • notify_all():唤醒所有等待线程

高级用法和最佳实践

与多个条件配合使用

在复杂场景中,可能需要多个条件变量协调工作:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

class ThreadPool {
private:
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;

public:
    ThreadPool() : stop(false) {}
    
    void start() {
        std::thread worker([this] {
            while (true) {
                std::unique_lock<std::mutex> lock(this->queue_mutex);
                this->condition.wait(lock, [this] { return this->stop; });
                if (this->stop) {
                    std::cout << "Worker thread stopping..." << std::endl;
                    break;
                }
            }
        });
        
        worker.detach();
    }
    
    void shutdown() {
        {
            std::unique_lock<std::mutex> lock(queue_mutex);
            stop = true;
        }
        condition.notify_all();
    }
};

使用 std::notify_all_at_thread_exit

当需要确保在线程完全退出后才通知其他线程时,可以使用 std::notify_all_at_thread_exit

#include <chrono>
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>

std::mutex m;
std::condition_variable cv;

bool ready = false;
std::string result;

void worker_thread(){
    std::unique_lock<std::mutex> lk(m);
    result = "processing data";
    
    // 在线程退出时通知所有等待的线程
    std::notify_all_at_thread_exit(cv, std::move(lk));
    
    // 执行一些清理工作
    std::this_thread::sleep_for(std::chrono::seconds(1));
    result = "data processed";
    ready = true;
}

int main() {
    std::thread t(worker_thread);
    
    std::unique_lock<std::mutex> lk(m);
    cv.wait(lk, []{ return ready; });
    
    std::cout << "Result: " << result << std::endl;
    t.join();
}

性能优化建议

  1. 减少锁竞争

    • 尽可能缩短持有锁的时间
    • 在调用 wait() 前预先准备好数据
  2. 选择合适的等待方式

    • 使用超时等待避免无限期阻塞
    • 根据业务场景选择 notify_one() 还是 notify_all()s
  3. 避免死锁

    • 始终以相同顺序获取多个锁
    • 使用 std::lock()std::scoped_lock 同时获取多个锁

C++20 中的改进

C++20 引入了更多现代化的并发工具,包括对条件变量的改进:

// C++20 中的改进等待接口
std::condition_variable cv;
std::mutex mtx;
bool condition = false;

// 新增的等待接口,支持停止令牌
std::stop_token st;
// cv.wait(st, lock, predicate); // 可以被外部请求停止

5 基于任务的并发

thread 类过于底层,使用起来会分散注意力的

classdescribe
packaged_task<F>打包可调用对象F作为任务运行
promise<T>
future<T>
shared_future<T>
x=async(policy, f, args)
x=async(f, args)

future & promise

promise 可以保存某一类型的值, 该值可被 future 对象读取(可能在另外一个线程中),因此 promise 也提供了一种线程同步的手段。在 promise 对象构造时可以和一个共享状态(通常是 future)相关联,并可以在相关联的共享状态(future)上保存一个类型为 T 的值。

可以通过 get_future 来获取与该 promise 对象相关联的 future 对象,调用该函数之后,两个对象共享相同的共享状态(shared state)

例子:

#include <chrono>
#include <functional>  // std::ref
#include <future>      // std::promise, std::future
#include <iostream>    // std::cout
#include <thread>      // std::thread
using namespace std;

void print_int(promise<int> promise) {
    cout << "thred id : [" << this_thread::get_id() << "] start.\n";
    promise.set_value_at_thread_exit(10);
    this_thread::sleep_for(chrono::seconds(3));
    cout << "thred id : [" << this_thread::get_id() << "] exit.\n";
}

int main() {
    promise<int> prom;                            // 生成一个 std::promise<int> 对象.
    future<int>  fut = prom.get_future();         // 和 future 关联.
    thread(print_int, std::move(prom)).detach();  // 将 promise 交给另外一个线程t.

    auto val = fut.get();  // 等待结果.
    cout << "get future [" << val << "]" << endl;

    return 0;
}

packaged_task

packaged_task 是对一个任务和一对(promise/future)的封装,让使用更加的便捷

#include <string>
#include <iostream>
#include <thread>
#include <future>
using namespace std;

int demo(int i) {
    cout << this_thread::get_id() <<" thread start (" << i << ")\n";
    this_thread::sleep_for(3s);
    if (i) return i*i;
    throw runtime_error("demo(0)");
}

int main(int argc, char *argv[])
{
    packaged_task<int(int)> task1{demo};
    packaged_task<int(int)> task2{demo};
    auto f1 = task1.get_future();
    auto f2 = task2.get_future();
    // bind with thread
    thread(std::move(task1), 2).detach();
    thread(std::move(task2), 0).detach();
    f1.wait_for(0s);
    cout << "------- [" << this_thread::get_id() << "] start call future.get() -------\n";
    try {
        cout << "[" << f1.get() << "]" << endl;
        cout << f2.get() << endl;
    } catch (const exception& e) {
        cerr << "exception: " << e.what() << endl;
    }

    return 0;
}

运行结果为:

281473574039936 thread start (2)
------- [281473579782208] start call future.get() -------
281473565585792 thread start (0)
4
exception: demo(0)

线程启动器 async

线程启动器(thread launch)是一个函数, 它决定是否创建新线程、回收旧线程或者简单的在本thread上执行;
async 可以看做是一个复杂的线程启动器的简单接口,调用 async 简单的返回一个future<R>

launchmethod
fu = async(policy, f, args)根据启动策略执行
fu = async(f, args)等同于 async(launch::async|launch::deferred, f, args)

表中有2种策略,这是目前支持的只有这2种:

  • launch::async : 就像 创建了一个新线程一样执行
  • launch::deferred : 延时执行,等到future执行get的时候在执行

launch::async 线程启动器有很大的决策权,是否创建新线程或者底层使用线程池,依赖各个系统的实现了。

std::future<int> f2 = std::async(std::launch::async, [](){ return 8; });
// can delete
f2.wait(); 
cout << f2.get() << endl;

6 atomic

C++11 引入了原子操作库,定义在头文件中,提供了一种无锁的同步机制。原子操作是指不会被线程调度机制打断的操作,这种操作一旦开始,就一直运行到结束,中间不会有任何线程切换。

基本概念

原子类型是封装了一个值的类型,这个值的访问保证不会导致数据竞争,并且可以在并发环境中安全地修改。

常用原子类型

C++ 提供了多种原子类型,主要包括:

  • std::atomic_boolstd::atomic<bool> - 原子布尔类型
  • std::atomic_charstd::atomic<char> - 原子字符类型
  • std::atomic_intstd::atomic<int> - 原子整型
  • std::atomic_longstd::atomic<long> - 原子长整型
  • std::atomic_pointerstd::atomic<T*> - 原子指针类型

基本操作

1. load 和 store 操作

#include <atomic>
#include <iostream>
#include <thread>

std::atomic<int> atomic_var{0};

void writer() {
    atomic_var.store(42);  // 原子写入值
}

void reader() {
    int value = atomic_var.load();  // 原子读取值
    std::cout << "Value: " << value << std::endl;
}

2. exchange 操作

exchange 操作会用新值替换当前值,并返回旧值:

std::atomic<int> atomic_var{10};
int old_value = atomic_var.exchange(20);  // old_value = 10, atomic_var = 20

3. compare_exchange 操作

compare_exchange_weak 和 compare_exchange_strong 是原子的比较并交换操作:

std::atomic<int> atomic_var{10};
int expected = 10;
int new_value = 20;

// 如果 atomic_var 的值等于 expected,则将其设置为 new_value,返回 true
// 否则,将 expected 设置为 atomic_var 的当前值,返回 false
bool success = atomic_var.compare_exchange_weak(expected, new_value);

内存顺序

原子操作可以指定不同的内存顺序约束,以平衡性能和一致性:

  • memory_order_relaxed - 最宽松的内存顺序,仅保证原子性
  • memory_order_consume - 依赖关系释放
  • memory_order_acquire - 获取操作
  • memory_order_release - 释放操作
  • memory_order_acq_rel - 获取释放操作
  • memory_order_seq_cst - 顺序一致性(默认)

示例:

std::atomic<int> atomic_var{0};

// 使用不同内存顺序的操作
atomic_var.store(42, std::memory_order_relaxed);
int value = atomic_var.load(std::memory_order_acquire);
bool success = atomic_var.compare_exchange_strong(expected, new_value, std::memory_order_acq_rel);

应用示例

在前面的线程退出示例中,我们已经使用了 atomic_bool:

#include <iostream>
#include <thread>
#include <atomic>

void func1(std::atomic_bool& quit) {
    while (!quit.load()) {  // 原子读取
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
    std::cout << "thread1 quit\n";
}

int main() {
    std::atomic_bool quit{false};
    std::cout << "start func1" << std::endl;
    std::thread t1{func1, std::ref(quit)};
    std::this_thread::sleep_for(std::chrono::seconds(5));
    quit.store(true);  // 原子写入,通知线程退出
    t1.join();
    std::cout << "func1 joined" << std::endl;
    return 0;
}

原子操作提供了一种比互斥锁更轻量级的同步机制,在某些场景下可以显著提高性能.