Intel TBB
TBB (Threading Building Blocks) 是 Intel 出品的 C++ 并行编程库,也贡献给了 oneAPI 项目(oneAPI TBB)。它提供了高层抽象的任务并行、并行算法、并发容器和同步原语,让开发者不用直接操作 pthread/win32 thread 就能写出跨平台的高效并行程序。
1. 安装与配置
获取方式
- oneAPI TBB(推荐):
https://github.com/oneapi-src/oneTBB - 包管理器:Ubuntu 下
apt install libtbb-dev,macOS 下brew install tbb
CMake 集成
find_package(TBB REQUIRED)
target_link_libraries(my_app PRIVATE TBB::tbb)头文件引用
#include <tbb/tbb.h> // 全量包含(方便但不推荐)
#include <tbb/parallel_for.h> // 按需包含(推荐)
#include <tbb/blocked_range.h>
#include <tbb/concurrent_vector.h>
#include <tbb/global_control.h>TBB 使用 C++20 标准构建(oneAPI TBB 2021+),编译时需要 -std=c++20 并链接 -ltbb。
2. 并行算法
TBB 提供了多种并行算法模板,它们自动将工作负载划分给所有可用线程,无需手动管理线程生命周期。
2.1 parallel_for
对索引范围做并行迭代:
#include <tbb/parallel_for.h>
#include <tbb/blocked_range.h>
constexpr size_t n = 1'000'000;
std::vector<int> vec(n);
tbb::parallel_for(tbb::blocked_range<size_t>(0, n),
[&](const tbb::blocked_range<size_t>& r) {
for (size_t i = r.begin(); i != r.end(); ++i) {
vec[i] = i * i;
}
});更简洁的分区形式(自动分区):
tbb::parallel_for(size_t(0), n, [&](size_t i) {
vec[i] = i * i;
});2.2 parallel_reduce
并行规约(求和、求积等):
#include <tbb/parallel_reduce.h>
auto sum = tbb::parallel_reduce(
tbb::blocked_range<size_t>(0, n), 0.0,
[&](const tbb::blocked_range<size_t>& r, double init) {
for (size_t i = r.begin(); i != r.end(); ++i)
init += vec[i];
return init;
},
[](double a, double b) { return a + b; }
);parallel_reduce 的参数依次是:范围、初始值、映射函数(每个分片局部累加)、规约函数(合并分片结果)。
2.3 parallel_scan
并行前缀和:
#include <tbb/parallel_scan.h>
std::vector<int> input(n, 1), output(n);
int sum = tbb::parallel_scan(
tbb::blocked_range<size_t>(0, n), 0,
[&](const tbb::blocked_range<size_t>& r, int init, bool is_final_scan) {
for (size_t i = r.begin(); i != r.end(); ++i) {
init += input[i];
if (is_final_scan)
output[i] = init;
}
return init;
},
[](int a, int b) { return a + b; }
);parallel_scan 会执行两轮:第一轮计算部分和(不做最终写入),第二轮做最终写入。这个特性保证了缓存一致性的同时避免了大量同步开销。
2.4 parallel_sort
#include <tbb/parallel_sort.h>
std::vector<int> data = {9, 3, 5, 1, 8, 4, 7, 2, 6};
tbb::parallel_sort(data.begin(), data.end());
// 也支持自定义比较器
tbb::parallel_sort(data.begin(), data.end(), std::greater<int>());内部实现是 并行化的快速排序,当递归到足够小时会退化为串行排序。
2.5 parallel_for_each
对迭代器范围并行执行函数:
#include <tbb/parallel_for_each.h>
std::list<int> lst = {1, 2, 3, 4, 5};
tbb::parallel_for_each(lst.begin(), lst.end(), [](int& x) {
x *= 2;
});2.6 parallel_invoke
并行执行多个独立函数:
#include <tbb/parallel_invoke.h>
tbb::parallel_invoke(
[] { process_image_left_half(); },
[] { process_image_right_half(); },
[] { process_metadata(); }
);适合任务粒度较大且互不依赖的场景。内部实现把函数包成 task,通过 task scheduler 调度。
2.7 parallel_pipeline
并行流水线处理:
#include <tbb/parallel_pipeline.h>
tbb::parallel_pipeline(
8, // 流水线最大并发 token 数
tbb::make_filter<void, Frame>(
tbb::filter_mode::serial_in_order,
[&](tbb::flow_control& fc) -> Frame {
Frame f = read_frame();
if (!f.valid) fc.stop();
return f;
}
) &
tbb::make_filter<Frame, ProcessedFrame>(
tbb::filter_mode::parallel,
[](Frame f) { return decode(f); }
) &
tbb::make_filter<ProcessedFrame, void>(
tbb::filter_mode::serial_out_of_order,
[](ProcessedFrame f) { render(f); }
)
);流水线模型:第一阶段(读帧)串行有序 → 第二阶段(解码)并行 → 第三阶段(渲染)串行无序。parallel_pipeline 确保了不同阶段的并行度和顺序约束。
3. 分区器
TBB 提供多种分区策略,控制工作负载的划分方式。
| 分区器 | 行为 |
|---|---|
blocked_range<T> | 一维连续范围,按块粒度切分 |
blocked_range2D<T> | 二维范围(行列),在大图像处理时有用 |
blocked_range3D<T> | 三维范围 |
simple_partitioner | 严格按 grainsize 切分,粒度精确 |
auto_partitioner | 自动调整粒度,兼顾负载均衡与开销,默认推荐 |
static_partitioner | 一次性切分,不 work-stealing,适合各分片负载极度均匀的场景 |
affinity_partitioner | 利用缓存亲和性,适合每次范围相同的重复调用 |
// 手动指定粒度的分区
tbb::parallel_for(
tbb::blocked_range<size_t>(0, n, 256), // 每个分片至少 256 个元素
[&](const auto& r) { /* ... */ },
tbb::simple_partitioner{}
);
// 自动分区(默认行为)
tbb::parallel_for(
tbb::blocked_range<size_t>(0, n),
[&](const auto& r) { /* ... */ }
// 第三个参数不传等效于 auto_partitioner
);选择建议:
- 元素处理开销大时,
grainsize小一点或直接用auto_partitioner - 元素处理开销小时,
grainsize大一点,减少任务调度开销 - 各分片负载不均时,用
auto_partitioner让 TBB work-stealing 自动均衡
4. 任务分组
4.1 task_group
TBB 的 task_group 允许动态添加和等待任务:
#include <tbb/task_group.h>
tbb::task_group tg;
tg.run([&] { process_part1(data); });
tg.run([&] { process_part2(data); });
tg.run([&] { process_part3(data); });
tg.wait(); // 等待所有提交的任务完成也可以配合异常处理:
tbb::task_group tg;
try {
tg.run([] { might_throw(); });
tg.wait();
} catch (...) {
tg.cancel(); // 取消所有未开始的任务
// 重新抛出或处理异常
}4.2 structured_task_group(C++17 风格)
structured_task_group 在析构时自动 wait(),且只在单线程环境下使用(不能跨线程 run)。更轻量、安全检查更严格:
tbb::structured_task_group stg;
stg.run([] { do_work(); });
stg.run([] { do_more_work(); });
stg.wait(); // 也可以不调,析构自动 wait区别总结:
| 特性 | task_group | structured_task_group |
|---|---|---|
| 跨线程 run | 可以 | 不可以 |
| 外部取消 | 支持 cancel() | 支持 |
| 析构自动 wait | 不会,会触发 UB | 会 |
| 开销 | 稍大 | 极小 |
5. 并发容器
TBB 提供线程安全的容器,内部使用细粒度锁或 lock-free 技术,允许并发读写。
5.1 concurrent_vector
动态数组,支持并发 push_back 和随机访问,不会因为扩容而 invalidate 已有元素:
#include <tbb/concurrent_vector.h>
tbb::concurrent_vector<int> cv;
tbb::parallel_for(size_t(0), 10000, [&](size_t i) {
cv.push_back(i);
});
// cv.size() == 10000
// 随机访问(线程安全——读旧数据可以,同时写不同位置也安全)
int val = cv[42];
cv[42] = 999; // 这种方式并**不**安全——多个线程同时写同一位置会 data race重要:concurrent_vector 保证的是 grow 操作安全(push_back, emplace_back),不同线程写不同索引安全,但写相同索引需要外部加锁。
5.2 concurrent_queue / concurrent_bounded_queue
#include <tbb/concurrent_queue.h>
tbb::concurrent_bounded_queue<int> q;
q.set_capacity(1024);
// 生产者
tbb::parallel_for(size_t(0), 1000, [&](size_t i) {
q.push(i);
});
// 消费者
int item;
while (q.try_pop(item)) {
process(item);
}| 容器 | 特点 |
|---|---|
concurrent_queue<T> | 无界、多生产者多消费者安全 |
concurrent_bounded_queue<T> | 有界、try_push 可在满时立即失败、支持阻塞的 push/pop |
5.3 concurrent_hash_map
支持并发插入/查找/删除的哈希表:
#include <tbb/concurrent_hash_map.h>
tbb::concurrent_hash_map<std::string, int> chm;
// 线程安全的查找和修改
{
auto accessor = chm.accessor; // 获取写访问
if (chm.find(accessor, key))
accessor->second += 1;
else
chm.insert(accessor, {key, 1});
} // accessor 析构释放锁
// 只读查找
{
auto const_accessor = chm.const_accessor;
if (chm.find(const_accessor, key))
use(const_accessor->second);
}accessor 本质上是一个 scoped 锁,持有对应桶的读/写锁,析构时释放。
5.4 concurrent_unordered_map / concurrent_unordered_set
C++11 风格的无序关联容器(oneTBB 2021 引入):
#include <tbb/concurrent_unordered_map.h>
tbb::concurrent_unordered_map<int, std::string> cum;
tbb::parallel_for(size_t(0), 1000, [&](size_t i) {
cum.emplace(i, "value_" + std::to_string(i));
});
cum.for_each([](const auto& p) {
println("{} -> {}", p.first, p.second);
});与 concurrent_hash_map 的区别:
concurrent_hash_map使用 accessor(锁语义),更精细的控制concurrent_unordered_map接口更接近std::unordered_map,使用简单
5.5 concurrent_priority_queue
线程安全的优先队列:
#include <tbb/concurrent_priority_queue.h>
tbb::concurrent_priority_queue<int> cpq;
cpq.push(5);
cpq.push(10);
cpq.push(3);
int top;
cpq.try_pop(top); // top == 3(小顶堆,默认)
// 支持自定义比较器
tbb::concurrent_priority_queue<int, std::greater<int>> max_heap;6. 同步原语
TBB 提供多种互斥量,在不同场景下有不同的性能特征:
#include <tbb/spin_mutex.h>
#include <tbb/spin_rw_mutex.h>
#include <tbb/queuing_mutex.h>
#include <tbb/queuing_rw_mutex.h>
#include <tbb/mutex.h>| 类型 | 特点 | 适用场景 |
|---|---|---|
spin_mutex | 自旋锁,极高开销下直接 busy-wait | 临界区 极短(< 几十条指令) |
spin_rw_mutex | 读写自旋锁,读读不互斥 | 大量读、少量写,临界区短 |
queuing_mutex | 排队自旋锁,FIFO 公平 | 避免锁饥饿,临界区稍长 |
queuing_rw_mutex | 排队读写自旋锁 | 公平读写锁 |
mutex | 包装系统 mutex(pthread_mutex) | 临界区较长或有系统调用时 |
rw_mutex | 包装系统读写锁 | 长临界区的读写分离 |
用法:统一遵循 scoped_lock / RAllock 模式:
tbb::spin_mutex mtx;
int shared_counter = 0;
tbb::parallel_for(size_t(0), 10000, [&](size_t) {
tbb::spin_mutex::scoped_lock lock(mtx);
++shared_counter;
}); // lock 析构自动释放最佳实践:
- 如果临界区是 10~20 条指令级别的,用
spin_mutex最合适 - 如果可能等待较长(IO、malloc 等),用
mutex(避免浪费 CPU) - 永远不要
spin_mutex+ 阻塞操作,那是灾难 scoped_lock保证异常安全
7. 全局控制
控制线程数、优先级等全局属性:
#include <tbb/global_control.h>
// 限制线程数为 4
tbb::global_control gc(
tbb::global_control::max_allowed_parallelism, 4
);
tbb::parallel_for(size_t(0), n, [&](size_t i) { /* ... */ });
// 这里最多 4 个线程
// gc 析构后恢复默认线程数可以在代码中多次嵌套,内层取最小值:
{
tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 2);
{
tbb::global_control gc2(tbb::global_control::max_allowed_parallelism, 4);
// 实际生效 = min(2, 4) = 2
}
}另一种控制方式——设置线程栈大小:
tbb::global_control gc(tbb::global_control::thread_stack_size, 1024 * 1024); // 1MB8. 任务调度器高级用法
8.1 task_arena
限制任务在某组线程上执行:
#include <tbb/task_arena.h>
tbb::task_arena arena(2); // 2 个线程的 arena
arena.execute([&] {
tbb::parallel_for(size_t(0), n, [&](size_t i) {
/* 这段代码只在 arena 的 2 个线程上跑 */
});
});可以绑定到特定的 NUMA 节点:
tbb::task_arena arena(tbb::task_arena::constraints{}
.set_numa_id(0));8.2 task_scheduler_handle
自 oneTBB 2021 起,可以手动控制调度器的生命周期:
#include <tbb/task_scheduler_handle.h>
{
tbb::task_scheduler_handle handle;
tbb::finalize(handle); // 主动清理调度器资源
}9. 内存分配器
TBB 提供高性能的内存分配器,避免多线程下的 false-sharing 和锁竞争:
#include <tbb/scalable_allocator.h>
// 使用 TBB 的 scalable allocator
std::vector<int, tbb::scalable_allocator<int>> vec(1000);
// STL 容器开箱即用
using tbb_vector = std::vector<double, tbb::scalable_allocator<double>>;
tbb_vector v1, v2;
// cache_aligned_allocator —— 防止 false sharing
#include <tbb/cache_aligned_allocator.h>
struct alignas(64) PaddedCounter {
int64_t value;
};
std::vector<PaddedCounter, tbb::cache_aligned_allocator<PaddedCounter>> counters;| 分配器 | 用途 |
|---|---|
tbb::scalable_allocator<T> | 全局可伸缩分配器,减少堆锁竞争 |
tbb::cache_aligned_allocator<T> | 缓存行对齐,避免 false sharing |
tbb::zero_allocator<T> | 分配的同时零初始化 |
10. Flow Graph 简介
TBB Flow Graph 允许通过有向无环图定义计算,节点自动并行:
#include <tbb/flow_graph.h>
tbb::flow::graph g;
tbb::flow::function_node<int, int> square(g, tbb::flow::unlimited,
[](int x) { return x * x; });
tbb::flow::function_node<int, void> print(g, tbb::flow::serial,
[](int x) { println("result: {}", x); });
tbb::flow::make_edge(square, print);
for (int i = 0; i < 100; ++i)
square.try_put(i);
g.wait_for_all();节点类型:
| 类型 | 行为 |
|---|---|
function_node | 接收一个输入,产生一个输出 |
multifunction_node | 一个输入可以产生 0~N 个输出 |
join_node | 等待多个输入同时到达后组合 |
split_node | 将一个 tuple 拆成多路 |
broadcast_node | 广播到所有后继节点 |
buffer_node | 缓冲消息(可排序、无限容量) |
queue_node | 排队消息,FIFO |
并发度控制:tbb::flow::serial(串行)、tbb::flow::unlimited(无限并发)、或指定数字。
11. 性能建议
粒度控制
并行不是银弹。颗粒度太小,任务调度开销会淹没实际计算。
// 不要这样——每个元素一个任务(太细了)
tbb::parallel_for(size_t(0), vec.size(), [&](size_t i) {
vec[i] = light_op(vec[i]); // light_op 只有几条指令
});确保每个分片有 至少上万条指令 的工作量,或者使用 blocked_range 手动控制颗粒度。
避免 False Sharing
多线程写入同一缓存行的不同变量会导致严重的性能下降:
// BAD: counters[i] 很可能共享同一条缓存行
int counters[8];
// GOOD: 每个计数器独自占一个缓存行
alignas(64) int counters[8];
// 或使用 tbb::cache_aligned_allocator复用 Task Scheduler
TBB 的 task scheduler 在第一次调用并行算法时初始化,之后复用。如果有多个短促并行段,尽量在一个初始化周期内完成:
// BAD: 多次启动/停止 scheduler
for (int i = 0; i < 100; ++i)
tbb::parallel_for(...);
// GOOD: 在外部保持 scheduler 存活
tbb::parallel_for(0, 100, [&](int) {
// 直接写并行段
});选择合适的容器
- 并发插入而不删改已有数据 →
concurrent_vector - 生产者-消费者队列 →
concurrent_bounded_queue(有界背压是关键) - 并发字典 → 读多写少用
concurrent_hash_map,简单场景用concurrent_unordered_map - 快速分配内存 →
tbb::scalable_allocator
NUMA 感知
多路服务器场景:
tbb::task_arena numa_arena(tbb::task_arena::constraints{}
.set_numa_id(current_numa_node));
numa_arena.execute([&] {
tbb::parallel_for(/* 只处理本 NUMA 节点的数据 */);
});配合 hwinfo 或 libnuma 获取 NUMA 拓扑,按节点分配数据。
12. 与 C++17 并行算法对比
C++17 引入了 std::execution::parallel_policy:
std::sort(std::execution::par, data.begin(), data.end());
std::for_each(std::execution::par, data.begin(), data.end(), work);| 维度 | TBB | C++17 并行算法 |
|---|---|---|
| 编译器要求 | C++20 + TBB 库 | C++17 + TBB/别的后端 |
| 可控粒度 | blocked_range, 分区器 | 无(由实现决定) |
| 容器广度 | concurrent_* 全套 | 无 |
| 任务模型 | task_group, flow graph | 无 |
| 缓存亲和 | affinity_partitioner | 无 |
| 可移植性 | 需安装 TBB | 标准库自带(但具体后端可能不同) |
两者并不冲突:可以用 C++17 并行算法做简单并行,用 TBB 做复杂流水线、并发容器和任务图。
13. 综合示例
图像处理流水线——读取 → 并行处理 → 结果汇总:
#include <tbb/tbb.h>
#include <vector>
#include <cstdio>
struct Image {
int id;
std::vector<uint8_t> data;
bool valid;
};
Image read_image(int id) {
return Image{id, std::vector<uint8_t>(1024, 0), true};
}
void process_segment(Image& img, size_t start, size_t end) {
for (size_t i = start; i < end; ++i)
img.data[i] = static_cast<uint8_t>(img.data[i] * 1.5);
}
struct Result {
int id;
double avg;
};
int main() {
constexpr int count = 100;
tbb::concurrent_vector<Image> images;
tbb::parallel_for(0, count, [&](int i) {
images.push_back(read_image(i));
});
tbb::parallel_for(
tbb::blocked_range<size_t>(0, images.size(), 4),
[&](const auto& r) {
for (size_t i = r.begin(); i != r.end(); ++i) {
auto& img = images[i];
process_segment(img, 0, img.data.size());
}
},
tbb::auto_partitioner{}
);
auto total_avg = tbb::parallel_reduce(
tbb::blocked_range<size_t>(0, images.size()), 0.0,
[&](const auto& r, double init) {
for (size_t i = r.begin(); i != r.end(); ++i) {
double sum = 0;
for (auto v : images[i].data)
sum += v;
init += sum / images[i].data.size();
}
return init;
},
std::plus<double>{}
) / images.size();
println("average brightness: {}", total_avg);
}这个例子演示了 TBB 中最常用的几个组件:
concurrent_vector做线程安全的收集parallel_for+blocked_range做数据并行parallel_reduce做结果聚合- 所有同步由库自动完成,不需要显式创建或 join 任何线程
TBB 的核心理念是 让开发者关注任务逻辑,而不是线程管理。用好 TBB,你的并行代码既不会比手写 pthread 慢,又能保持代码的简洁和可移植性。







