cpp中的并发
cpp11之后,cpp本身对并发的支持大大增强,这里记录一下
1 std::thread class
cpp的thread类型,创建即运行,不像java那样需要start,thread表示一个系统资源,一个系统线程
构造函数
function | construct |
---|---|
constructor 1 | c++thread() noexcept; |
constructor 2 | thread( thread&& other ) noexcept; |
constructor 3 | template<class F, class... Args> explicit thread(F&& f, Args&&... args); |
copy constructor | thread(const thread&) = delete; |
assign copy operator | thread& operator=(const thread&) = delete; |
assign move operator | thread& operator=( thread&& other ) noexcept; |
- 默认构造函数,创建一个空的 thread 执行对象;
- 初始化构造函数,创建一个 thread对象,该 thread对象可被 joinable,新产生的线程会调用 fn 函数,该函数的参数由 args 给出;
- 拷贝构造函数被禁用,意味着 thread 不可被拷贝;
- 赋值操作被禁用,thread 对象不可被拷贝。
- move 构造函数, 调用成功之后 other 不代表任何 thread 执行对象;
- move 赋值操作,如果当前对象不可 joinable,需要传递一个右值引用(rhs)给 move 赋值操作;如果当前对象可被 joinable,则 terminate() 报错;
注意:可被 joinable 的 thread 对象必须在他们销毁之前被主线程 join 或者将其设置为 detached.
this_thread namespace
this_thread这个命名空间下定义了几个有用的静态函数
functional | detail |
---|---|
std::thread::id get_id() const noexcept; | 获取当前thread的id,这个id是cpp自己定义的 |
void yield() noexcept; | 把调度机会让给其他的thread |
void sleep_until( const std::chrono::time_point tp) | sleep到指定的 time_point |
void sleep_for( const std::chrono::duration duration) | sleep 指定的 duration |
#include <thread>
#include <chrono>
using namespace std;
int main(int argc, char *argv[]){
this_thread::sleep_until(std::chrono::steady_clock::now() + 1234ms);
this_thread::sleep_for(1234ms);
return 0;
}
其他相关函数
function | detail |
---|---|
bool joinable() const noexcept; | 检查thread是否可以 join |
native_handle_type native_handle(); | 获取平台相关联的id,posix系统一般是关联的pthread,这个函数要在thread未调用join之前用,之后只会返回0 |
static unsigned int hardware_concurrency() noexcept; | 系统支持的线程数,比如2核但是支持超线程,可能返回4 |
void detach(); | 脱离用户管理,类似pthread_detach |
void swap( std::thread& other ) noexcept; | 交换thread的ownership |
std::ref(T&) | 返回引用, |
std::cref(T&) | 返回const引用 |
线程退出
cpp 的 thread 缺少类似 java 的 interrupt,一般有2种方式来终止:
- 通过某个变量来控制是否退出
- 调用native_handle获取到pthread_id,然后再调用 pthread_cancel
#include <iostream>
#include <thread>
using namespace std;
void func1(atomic_bool& quit) {
while (!quit) {
this_thread::sleep_for(100ms);
}
cout << "thread1 quit\n";
}
void func2() {
while (true) {
this_thread::sleep_for(100ms);
}
cout << "thread2 quit\n";
}
int main(int argc, char *argv[]) {
atomic_bool quit(false);
cout << "start func1" << endl;
thread t1{func1, ref(quit)};
this_thread::sleep_for(5s);
quit.store(true);
t1.join();
cout << "func1 joined" << endl;
cout << "start func2" << endl;
thread t2{func2};
this_thread::sleep_for(5s);
pthread_cancel(t2.native_handle());
t2.join();
cout << "func2 joined" << endl;
return 0;
}
thread_local 数据
thread_local 的数据是一个线程独有的数据,其他线程无法访问(除非用指针),还可以是 extern 的。
#include <iostream>
#include <string>
#include <thread>
using namespace std;
thread_local unsigned int rage = 1;
void increase_rage(const std::string& thread_name) {
++rage; // modifying outside a lock is okay; this is a thread-local variable
cout << "Rage counter for " << thread_name << " : " << rage << '\n';
}
int main() {
thread a(increase_rage, "a");
a.join();
thread b(increase_rage, "b");
b.join();
}
2 mutex
互斥量和条件变量,
相关类或函数
class | describe |
---|---|
mutex | 基本的 mutex 类 |
recursive_mutex | 递归 mutex 类, 可以被同一个thread重复获取 |
timed_mutex | 定时非递归 mutex 类,在指定时长内获取mutex |
recursive_timed_mutex | 定时递归 mutex 类, |
lock_guard<M> | 析构时释放 M, lock 只能使用 adopt_lock(见表中) 标志 |
unique_lock<M> | movable,but not copyable, 灵活性高 区别与 mutex 的不可移动不可拷贝,但是性能低于 mutex 析构时释放 M,也可以提前调用unlock()来降低锁的粒度,unique_lock中保存了锁状态 |
shared_lock<M> c++14 | TODO |
std::lock(M1,M2…) | 同时获取多个锁, 使用了死锁避免算法,这是一个函数,不会自动释放全部获取到的锁 |
scoped_lock<M…> c++17 | std::lock的 RAII-style 的封装,析构时释放全部获取到的锁 |
defer_lock | 这是一个标志,表示只是获取m的ownership, 但是不调用lock()函数,线程还没有获取到锁 |
adopt_lock | 这是一个标志,表示只是获取m的ownership, 但是不调用lock()函数,这个需要线程先获取到锁 |
#include <mutex>
#include <iostream>
using namespace std;
int main(int argc, char *argv[])
{
mutex mtx1, mtx2;
{
lock_guard l(mtx1);
}
{
std::lock(mtx1, mtx2);
// mtx1, mtx2 already locked, so use adopt_lock
lock_guard l1(mtx1, adopt_lock);
lock_guard l2(mtx2, adopt_lock);
}
{
scoped_lock lock(mtx1, mtx2);
}
{
unique_lock l1(mtx1, std::defer_lock);
unique_lock l2(mtx2, std::defer_lock);
// mtx1, mtx2 is not locked, because use defer_lock
// donot use mtx1 and mtx2 directly
std::lock(l1, l2);
if(l1.owns_lock() && l2)
cout << "own metux" << endl;
else
cout << "onwership is moved!\n";
}
lock_guard l(mtx1);
return 0;
}
3 call_once
使用 std::once_flag 和 std::call_once 来保证单次执行, c++中双重检查也存在潜在竞争,C++和双重检查锁定模式(DCLP)的风险,所以还是推荐使用 call_once
void init(string file_path) {
static once_flag flag;
call_once(flag, init_from_file, file_path);
}
4 condition
条件变量(condition variables)是用于线程间同步的重要工具,允许线程在某个条件不满足时挂起等待,直到其他线程通知该条件已满足。
相关类
class | describe |
---|---|
condition_variable | 提供与std::unique_lock配合使用的条件变量 |
condition_variable_any | 提供与任何满足基本要求的锁配合使用的条件变量 |
notify_all_at_thread_exit | 安排在当前线程完全退出时通知所有等待的线程 |
cv_status | 列举条件变量状态的枚举类型 |
condition_variable
std::condition_variable
是最常用的条件变量类型,必须与 std::unique_lock<std::mutex>
一起使用。
主要成员函数
function | detail |
---|---|
void wait(unique_lock lock) | 阻塞当前线程直到条件变量被通知 |
template<class Predicate> void wait(unique_lock lock, Predicate pred) | 带谓词的等待,等价于while(!pred()) wait(lock); |
cv_status wait_for(unique_lock lock, const duration rel_time) | 等待指定时间或直到被通知 |
template<class Rep, class Period, class Predicate> bool wait_for(unique_lock lock, const duration rel_time, Predicate pred) | 带谓词和超时的等待 |
cv_status wait_until(unique_lock lock, const time_point abs_time) | 等待到指定时间点或直到被通知 |
template<class Predicate> bool wait_until(unique_lock lock,const time_point abs_time, Predicate pred) | 带谓词和时间点的等待 |
void notify_one() | 通知一个等待的线程 |
void notify_all() | 通知所有等待的线程 |
condition_variable_any
std::condition_variable_any
与 condition_variable
类似,但可以与任何满足基本要求的锁类型一起使用,而不仅限于 std::unique_lock<std::mutex>
。由于实现复杂度较高,通常推荐使用 condition_variable
。
使用示例
以下是一个经典的生产者-消费者模型示例:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
std::mutex mtx;
std::condition_variable cv;
std::queue<int> data_queue;
bool finished = false;
// 生产者线程函数
void producer() {
for (int i = 0; i < 5; ++i) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
{
std::lock_guard<std::mutex> lock(mtx);
data_queue.push(i);
std::cout << "Produced: " << i << std::endl;
}
cv.notify_one(); // 通知消费者
}
{
std::lock_guard<std::mutex> lock(mtx);
finished = true;
}
cv.notify_all(); // 通知所有消费者生产已完成
}
// 消费者线程函数
void consumer(int id) {
while (true) {
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, []{ return !data_queue.empty() || finished; });
if (!data_queue.empty()) {
int data = data_queue.front();
data_queue.pop();
std::cout << "Consumer " << id << " consumed: " << data << std::endl;
lock.unlock(); // 提前解锁以减少锁持有时间
} else if (finished) {
break;
}
}
}
int main() {
std::thread prod(producer);
std::thread cons1(consumer, 1);
std::thread cons2(consumer, 2);
prod.join();
cons1.join();
cons2.join();
return 0;
}
注意事项
虚假唤醒:线程可能在没有收到通知的情况下被唤醒,因此通常需要在循环中检查条件:
std::unique_lock<std::mutex> lock(mtx); while (!condition) { cv.wait(lock); }
谓词形式:使用带谓词的
wait()
std::unique_lock<std::mutex> lock(mtx); cv.wait(lock, []{ return condition; });
锁的顺序:调用
wait()
前必须持有锁,wait()
会自动释放锁并在返回前重新获取锁。通知方式:
notify_one()
:只唤醒一个等待线程notify_all()
:唤醒所有等待线程
高级用法和最佳实践
与多个条件配合使用
在复杂场景中,可能需要多个条件变量协调工作:
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
class ThreadPool {
private:
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
public:
ThreadPool() : stop(false) {}
void start() {
std::thread worker([this] {
while (true) {
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock, [this] { return this->stop; });
if (this->stop) {
std::cout << "Worker thread stopping..." << std::endl;
break;
}
}
});
worker.detach();
}
void shutdown() {
{
std::unique_lock<std::mutex> lock(queue_mutex);
stop = true;
}
condition.notify_all();
}
};
使用 std::notify_all_at_thread_exit
当需要确保在线程完全退出后才通知其他线程时,可以使用 std::notify_all_at_thread_exit
:
#include <chrono>
#include <iostream>
#include <mutex>
#include <condition_variable>
#include <thread>
std::mutex m;
std::condition_variable cv;
bool ready = false;
std::string result;
void worker_thread(){
std::unique_lock<std::mutex> lk(m);
result = "processing data";
// 在线程退出时通知所有等待的线程
std::notify_all_at_thread_exit(cv, std::move(lk));
// 执行一些清理工作
std::this_thread::sleep_for(std::chrono::seconds(1));
result = "data processed";
ready = true;
}
int main() {
std::thread t(worker_thread);
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, []{ return ready; });
std::cout << "Result: " << result << std::endl;
t.join();
}
性能优化建议
减少锁竞争:
- 尽可能缩短持有锁的时间
- 在调用
wait()
前预先准备好数据
选择合适的等待方式:
- 使用超时等待避免无限期阻塞
- 根据业务场景选择
notify_one()
还是notify_all()
s
避免死锁:
- 始终以相同顺序获取多个锁
- 使用
std::lock()
或std::scoped_lock
同时获取多个锁
C++20 中的改进
C++20 引入了更多现代化的并发工具,包括对条件变量的改进:
// C++20 中的改进等待接口
std::condition_variable cv;
std::mutex mtx;
bool condition = false;
// 新增的等待接口,支持停止令牌
std::stop_token st;
// cv.wait(st, lock, predicate); // 可以被外部请求停止
5 基于任务的并发
thread 类过于底层,使用起来会分散注意力的
class | describe |
---|---|
packaged_task<F> | 打包可调用对象F作为任务运行 |
promise<T> | |
future<T> | |
shared_future<T> | |
x=async(policy, f, args) | |
x=async(f, args) |
future & promise
promise 可以保存某一类型的值, 该值可被 future 对象读取(可能在另外一个线程中),因此 promise 也提供了一种线程同步的手段。在 promise 对象构造时可以和一个共享状态(通常是 future)相关联,并可以在相关联的共享状态(future)上保存一个类型为 T 的值。
可以通过 get_future 来获取与该 promise 对象相关联的 future 对象,调用该函数之后,两个对象共享相同的共享状态(shared state)
例子:
#include <chrono>
#include <functional> // std::ref
#include <future> // std::promise, std::future
#include <iostream> // std::cout
#include <thread> // std::thread
using namespace std;
void print_int(promise<int> promise) {
cout << "thred id : [" << this_thread::get_id() << "] start.\n";
promise.set_value_at_thread_exit(10);
this_thread::sleep_for(chrono::seconds(3));
cout << "thred id : [" << this_thread::get_id() << "] exit.\n";
}
int main() {
promise<int> prom; // 生成一个 std::promise<int> 对象.
future<int> fut = prom.get_future(); // 和 future 关联.
thread(print_int, std::move(prom)).detach(); // 将 promise 交给另外一个线程t.
auto val = fut.get(); // 等待结果.
cout << "get future [" << val << "]" << endl;
return 0;
}
packaged_task
packaged_task 是对一个任务和一对(promise/future)的封装,让使用更加的便捷
#include <string>
#include <iostream>
#include <thread>
#include <future>
using namespace std;
int demo(int i) {
cout << this_thread::get_id() <<" thread start (" << i << ")\n";
this_thread::sleep_for(3s);
if (i) return i*i;
throw runtime_error("demo(0)");
}
int main(int argc, char *argv[])
{
packaged_task<int(int)> task1{demo};
packaged_task<int(int)> task2{demo};
auto f1 = task1.get_future();
auto f2 = task2.get_future();
// bind with thread
thread(std::move(task1), 2).detach();
thread(std::move(task2), 0).detach();
f1.wait_for(0s);
cout << "------- [" << this_thread::get_id() << "] start call future.get() -------\n";
try {
cout << "[" << f1.get() << "]" << endl;
cout << f2.get() << endl;
} catch (const exception& e) {
cerr << "exception: " << e.what() << endl;
}
return 0;
}
运行结果为:
281473574039936 thread start (2)
------- [281473579782208] start call future.get() -------
281473565585792 thread start (0)
4
exception: demo(0)
线程启动器 async
线程启动器(thread launch)是一个函数, 它决定是否创建新线程、回收旧线程或者简单的在本thread上执行;async
可以看做是一个复杂的线程启动器的简单接口,调用 async 简单的返回一个future<R>
launch | method |
---|---|
fu = async(policy, f, args) | 根据启动策略执行 |
fu = async(f, args) | 等同于 async(launch::async|launch::deferred, f, args) |
表中有2种策略,这是目前支持的只有这2种:
- launch::async : 就像 创建了一个新线程一样执行
- launch::deferred : 延时执行,等到future执行get的时候在执行
launch::async 线程启动器有很大的决策权,是否创建新线程或者底层使用线程池,依赖各个系统的实现了。
std::future<int> f2 = std::async(std::launch::async, [](){ return 8; });
// can delete
f2.wait();
cout << f2.get() << endl;
6 atomic
C++11 引入了原子操作库,定义在
基本概念
原子类型是封装了一个值的类型,这个值的访问保证不会导致数据竞争,并且可以在并发环境中安全地修改。
常用原子类型
C++ 提供了多种原子类型,主要包括:
std::atomic_bool
或std::atomic<bool>
- 原子布尔类型std::atomic_char
或std::atomic<char>
- 原子字符类型std::atomic_int
或std::atomic<int>
- 原子整型std::atomic_long
或std::atomic<long>
- 原子长整型std::atomic_pointer
或std::atomic<T*>
- 原子指针类型
基本操作
1. load 和 store 操作
#include <atomic>
#include <iostream>
#include <thread>
std::atomic<int> atomic_var{0};
void writer() {
atomic_var.store(42); // 原子写入值
}
void reader() {
int value = atomic_var.load(); // 原子读取值
std::cout << "Value: " << value << std::endl;
}
2. exchange 操作
exchange 操作会用新值替换当前值,并返回旧值:
std::atomic<int> atomic_var{10};
int old_value = atomic_var.exchange(20); // old_value = 10, atomic_var = 20
3. compare_exchange 操作
compare_exchange_weak 和 compare_exchange_strong 是原子的比较并交换操作:
std::atomic<int> atomic_var{10};
int expected = 10;
int new_value = 20;
// 如果 atomic_var 的值等于 expected,则将其设置为 new_value,返回 true
// 否则,将 expected 设置为 atomic_var 的当前值,返回 false
bool success = atomic_var.compare_exchange_weak(expected, new_value);
内存顺序
原子操作可以指定不同的内存顺序约束,以平衡性能和一致性:
memory_order_relaxed
- 最宽松的内存顺序,仅保证原子性memory_order_consume
- 依赖关系释放memory_order_acquire
- 获取操作memory_order_release
- 释放操作memory_order_acq_rel
- 获取释放操作memory_order_seq_cst
- 顺序一致性(默认)
示例:
std::atomic<int> atomic_var{0};
// 使用不同内存顺序的操作
atomic_var.store(42, std::memory_order_relaxed);
int value = atomic_var.load(std::memory_order_acquire);
bool success = atomic_var.compare_exchange_strong(expected, new_value, std::memory_order_acq_rel);
应用示例
在前面的线程退出示例中,我们已经使用了 atomic_bool:
#include <iostream>
#include <thread>
#include <atomic>
void func1(std::atomic_bool& quit) {
while (!quit.load()) { // 原子读取
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
std::cout << "thread1 quit\n";
}
int main() {
std::atomic_bool quit{false};
std::cout << "start func1" << std::endl;
std::thread t1{func1, std::ref(quit)};
std::this_thread::sleep_for(std::chrono::seconds(5));
quit.store(true); // 原子写入,通知线程退出
t1.join();
std::cout << "func1 joined" << std::endl;
return 0;
}
原子操作提供了一种比互斥锁更轻量级的同步机制,在某些场景下可以显著提高性能.