TBB (Threading Building Blocks) 是 Intel 出品的 C++ 并行编程库,也贡献给了 oneAPI 项目(oneAPI TBB)。它提供了高层抽象的任务并行、并行算法、并发容器和同步原语,让开发者不用直接操作 pthread/win32 thread 就能写出跨平台的高效并行程序。

1. 安装与配置

获取方式

  • oneAPI TBB(推荐):https://github.com/oneapi-src/oneTBB
  • 包管理器:Ubuntu 下 apt install libtbb-dev,macOS 下 brew install tbb

CMake 集成

find_package(TBB REQUIRED)
target_link_libraries(my_app PRIVATE TBB::tbb)

头文件引用

#include <tbb/tbb.h>          // 全量包含(方便但不推荐)
#include <tbb/parallel_for.h> // 按需包含(推荐)
#include <tbb/blocked_range.h>
#include <tbb/concurrent_vector.h>
#include <tbb/global_control.h>

TBB 使用 C++20 标准构建(oneAPI TBB 2021+),编译时需要 -std=c++20 并链接 -ltbb

2. 并行算法

TBB 提供了多种并行算法模板,它们自动将工作负载划分给所有可用线程,无需手动管理线程生命周期。

2.1 parallel_for

对索引范围做并行迭代:

#include <tbb/parallel_for.h>
#include <tbb/blocked_range.h>

constexpr size_t n = 1'000'000;
std::vector<int> vec(n);

tbb::parallel_for(tbb::blocked_range<size_t>(0, n),
    [&](const tbb::blocked_range<size_t>& r) {
        for (size_t i = r.begin(); i != r.end(); ++i) {
            vec[i] = i * i;
        }
    });

更简洁的分区形式(自动分区):

tbb::parallel_for(size_t(0), n, [&](size_t i) {
    vec[i] = i * i;
});

2.2 parallel_reduce

并行规约(求和、求积等):

#include <tbb/parallel_reduce.h>

auto sum = tbb::parallel_reduce(
    tbb::blocked_range<size_t>(0, n), 0.0,
    [&](const tbb::blocked_range<size_t>& r, double init) {
        for (size_t i = r.begin(); i != r.end(); ++i)
            init += vec[i];
        return init;
    },
    [](double a, double b) { return a + b; }
);

parallel_reduce 的参数依次是:范围、初始值、映射函数(每个分片局部累加)、规约函数(合并分片结果)。

2.3 parallel_scan

并行前缀和:

#include <tbb/parallel_scan.h>

std::vector<int> input(n, 1), output(n);
int sum = tbb::parallel_scan(
    tbb::blocked_range<size_t>(0, n), 0,
    [&](const tbb::blocked_range<size_t>& r, int init, bool is_final_scan) {
        for (size_t i = r.begin(); i != r.end(); ++i) {
            init += input[i];
            if (is_final_scan)
                output[i] = init;
        }
        return init;
    },
    [](int a, int b) { return a + b; }
);

parallel_scan 会执行两轮:第一轮计算部分和(不做最终写入),第二轮做最终写入。这个特性保证了缓存一致性的同时避免了大量同步开销。

2.4 parallel_sort

#include <tbb/parallel_sort.h>

std::vector<int> data = {9, 3, 5, 1, 8, 4, 7, 2, 6};
tbb::parallel_sort(data.begin(), data.end());
// 也支持自定义比较器
tbb::parallel_sort(data.begin(), data.end(), std::greater<int>());

内部实现是 并行化的快速排序,当递归到足够小时会退化为串行排序。

2.5 parallel_for_each

对迭代器范围并行执行函数:

#include <tbb/parallel_for_each.h>

std::list<int> lst = {1, 2, 3, 4, 5};
tbb::parallel_for_each(lst.begin(), lst.end(), [](int& x) {
    x *= 2;
});

2.6 parallel_invoke

并行执行多个独立函数:

#include <tbb/parallel_invoke.h>

tbb::parallel_invoke(
    [] { process_image_left_half();  },
    [] { process_image_right_half(); },
    [] { process_metadata();         }
);

适合任务粒度较大且互不依赖的场景。内部实现把函数包成 task,通过 task scheduler 调度。

2.7 parallel_pipeline

并行流水线处理:

#include <tbb/parallel_pipeline.h>

tbb::parallel_pipeline(
    8,  // 流水线最大并发 token 数
    tbb::make_filter<void, Frame>(
        tbb::filter_mode::serial_in_order,
        [&](tbb::flow_control& fc) -> Frame {
            Frame f = read_frame();
            if (!f.valid) fc.stop();
            return f;
        }
    ) &
    tbb::make_filter<Frame, ProcessedFrame>(
        tbb::filter_mode::parallel,
        [](Frame f) { return decode(f); }
    ) &
    tbb::make_filter<ProcessedFrame, void>(
        tbb::filter_mode::serial_out_of_order,
        [](ProcessedFrame f) { render(f); }
    )
);

流水线模型:第一阶段(读帧)串行有序 → 第二阶段(解码)并行 → 第三阶段(渲染)串行无序。parallel_pipeline 确保了不同阶段的并行度和顺序约束。

3. 分区器

TBB 提供多种分区策略,控制工作负载的划分方式。

分区器行为
blocked_range<T>一维连续范围,按块粒度切分
blocked_range2D<T>二维范围(行列),在大图像处理时有用
blocked_range3D<T>三维范围
simple_partitioner严格按 grainsize 切分,粒度精确
auto_partitioner自动调整粒度,兼顾负载均衡与开销,默认推荐
static_partitioner一次性切分,不 work-stealing,适合各分片负载极度均匀的场景
affinity_partitioner利用缓存亲和性,适合每次范围相同的重复调用
// 手动指定粒度的分区
tbb::parallel_for(
    tbb::blocked_range<size_t>(0, n, 256), // 每个分片至少 256 个元素
    [&](const auto& r) { /* ... */ },
    tbb::simple_partitioner{}
);

// 自动分区(默认行为)
tbb::parallel_for(
    tbb::blocked_range<size_t>(0, n),
    [&](const auto& r) { /* ... */ }
    // 第三个参数不传等效于 auto_partitioner
);

选择建议

  • 元素处理开销大时,grainsize 小一点或直接用 auto_partitioner
  • 元素处理开销小时,grainsize 大一点,减少任务调度开销
  • 各分片负载不均时,用 auto_partitioner 让 TBB work-stealing 自动均衡

4. 任务分组

4.1 task_group

TBB 的 task_group 允许动态添加和等待任务:

#include <tbb/task_group.h>

tbb::task_group tg;

tg.run([&] { process_part1(data); });
tg.run([&] { process_part2(data); });
tg.run([&] { process_part3(data); });

tg.wait(); // 等待所有提交的任务完成

也可以配合异常处理:

tbb::task_group tg;
try {
    tg.run([] { might_throw(); });
    tg.wait();
} catch (...) {
    tg.cancel(); // 取消所有未开始的任务
    // 重新抛出或处理异常
}

4.2 structured_task_group(C++17 风格)

structured_task_group 在析构时自动 wait(),且只在单线程环境下使用(不能跨线程 run)。更轻量、安全检查更严格:

tbb::structured_task_group stg;
stg.run([] { do_work(); });
stg.run([] { do_more_work(); });
stg.wait(); // 也可以不调,析构自动 wait

区别总结

特性task_groupstructured_task_group
跨线程 run可以不可以
外部取消支持 cancel()支持
析构自动 wait不会,会触发 UB
开销稍大极小

5. 并发容器

TBB 提供线程安全的容器,内部使用细粒度锁或 lock-free 技术,允许并发读写。

5.1 concurrent_vector

动态数组,支持并发 push_back 和随机访问,不会因为扩容而 invalidate 已有元素:

#include <tbb/concurrent_vector.h>

tbb::concurrent_vector<int> cv;

tbb::parallel_for(size_t(0), 10000, [&](size_t i) {
    cv.push_back(i);
});
// cv.size() == 10000

// 随机访问(线程安全——读旧数据可以,同时写不同位置也安全)
int val = cv[42];
cv[42] = 999; // 这种方式并**不**安全——多个线程同时写同一位置会 data race

重要concurrent_vector 保证的是 grow 操作安全(push_back, emplace_back),不同线程写不同索引安全,但写相同索引需要外部加锁。

5.2 concurrent_queue / concurrent_bounded_queue

#include <tbb/concurrent_queue.h>

tbb::concurrent_bounded_queue<int> q;
q.set_capacity(1024);

// 生产者
tbb::parallel_for(size_t(0), 1000, [&](size_t i) {
    q.push(i);
});

// 消费者
int item;
while (q.try_pop(item)) {
    process(item);
}
容器特点
concurrent_queue<T>无界、多生产者多消费者安全
concurrent_bounded_queue<T>有界、try_push 可在满时立即失败、支持阻塞的 push/pop

5.3 concurrent_hash_map

支持并发插入/查找/删除的哈希表:

#include <tbb/concurrent_hash_map.h>

tbb::concurrent_hash_map<std::string, int> chm;

// 线程安全的查找和修改
{
    auto accessor = chm.accessor; // 获取写访问
    if (chm.find(accessor, key))
        accessor->second += 1;
    else
        chm.insert(accessor, {key, 1});
} // accessor 析构释放锁

// 只读查找
{
    auto const_accessor = chm.const_accessor;
    if (chm.find(const_accessor, key))
        use(const_accessor->second);
}

accessor 本质上是一个 scoped 锁,持有对应桶的读/写锁,析构时释放。

5.4 concurrent_unordered_map / concurrent_unordered_set

C++11 风格的无序关联容器(oneTBB 2021 引入):

#include <tbb/concurrent_unordered_map.h>

tbb::concurrent_unordered_map<int, std::string> cum;

tbb::parallel_for(size_t(0), 1000, [&](size_t i) {
    cum.emplace(i, "value_" + std::to_string(i));
});

cum.for_each([](const auto& p) {
    println("{} -> {}", p.first, p.second);
});

concurrent_hash_map 的区别:

  • concurrent_hash_map 使用 accessor(锁语义),更精细的控制
  • concurrent_unordered_map 接口更接近 std::unordered_map,使用简单

5.5 concurrent_priority_queue

线程安全的优先队列:

#include <tbb/concurrent_priority_queue.h>

tbb::concurrent_priority_queue<int> cpq;
cpq.push(5);
cpq.push(10);
cpq.push(3);

int top;
cpq.try_pop(top); // top == 3(小顶堆,默认)

// 支持自定义比较器
tbb::concurrent_priority_queue<int, std::greater<int>> max_heap;

6. 同步原语

TBB 提供多种互斥量,在不同场景下有不同的性能特征:

#include <tbb/spin_mutex.h>
#include <tbb/spin_rw_mutex.h>
#include <tbb/queuing_mutex.h>
#include <tbb/queuing_rw_mutex.h>
#include <tbb/mutex.h>
类型特点适用场景
spin_mutex自旋锁,极高开销下直接 busy-wait临界区 极短(< 几十条指令)
spin_rw_mutex读写自旋锁,读读不互斥大量读、少量写,临界区短
queuing_mutex排队自旋锁,FIFO 公平避免锁饥饿,临界区稍长
queuing_rw_mutex排队读写自旋锁公平读写锁
mutex包装系统 mutex(pthread_mutex)临界区较长或有系统调用时
rw_mutex包装系统读写锁长临界区的读写分离

用法:统一遵循 scoped_lock / RAllock 模式:

tbb::spin_mutex mtx;
int shared_counter = 0;

tbb::parallel_for(size_t(0), 10000, [&](size_t) {
    tbb::spin_mutex::scoped_lock lock(mtx);
    ++shared_counter;
}); // lock 析构自动释放

最佳实践

  • 如果临界区是 10~20 条指令级别的,用 spin_mutex 最合适
  • 如果可能等待较长(IO、malloc 等),用 mutex(避免浪费 CPU)
  • 永远不要 spin_mutex + 阻塞操作,那是灾难
  • scoped_lock 保证异常安全

7. 全局控制

控制线程数、优先级等全局属性:

#include <tbb/global_control.h>

// 限制线程数为 4
tbb::global_control gc(
    tbb::global_control::max_allowed_parallelism, 4
);

tbb::parallel_for(size_t(0), n, [&](size_t i) { /* ... */ });
// 这里最多 4 个线程
// gc 析构后恢复默认线程数

可以在代码中多次嵌套,内层取最小值:

{
    tbb::global_control gc(tbb::global_control::max_allowed_parallelism, 2);
    {
        tbb::global_control gc2(tbb::global_control::max_allowed_parallelism, 4);
        // 实际生效 = min(2, 4) = 2
    }
}

另一种控制方式——设置线程栈大小:

tbb::global_control gc(tbb::global_control::thread_stack_size, 1024 * 1024); // 1MB

8. 任务调度器高级用法

8.1 task_arena

限制任务在某组线程上执行:

#include <tbb/task_arena.h>

tbb::task_arena arena(2); // 2 个线程的 arena

arena.execute([&] {
    tbb::parallel_for(size_t(0), n, [&](size_t i) {
        /* 这段代码只在 arena 的 2 个线程上跑 */
    });
});

可以绑定到特定的 NUMA 节点:

tbb::task_arena arena(tbb::task_arena::constraints{}
    .set_numa_id(0));

8.2 task_scheduler_handle

自 oneTBB 2021 起,可以手动控制调度器的生命周期:

#include <tbb/task_scheduler_handle.h>

{
    tbb::task_scheduler_handle handle;
    tbb::finalize(handle); // 主动清理调度器资源
}

9. 内存分配器

TBB 提供高性能的内存分配器,避免多线程下的 false-sharing 和锁竞争:

#include <tbb/scalable_allocator.h>

// 使用 TBB 的 scalable allocator
std::vector<int, tbb::scalable_allocator<int>> vec(1000);

// STL 容器开箱即用
using tbb_vector = std::vector<double, tbb::scalable_allocator<double>>;
tbb_vector v1, v2;

// cache_aligned_allocator —— 防止 false sharing
#include <tbb/cache_aligned_allocator.h>
struct alignas(64) PaddedCounter {
    int64_t value;
};
std::vector<PaddedCounter, tbb::cache_aligned_allocator<PaddedCounter>> counters;
分配器用途
tbb::scalable_allocator<T>全局可伸缩分配器,减少堆锁竞争
tbb::cache_aligned_allocator<T>缓存行对齐,避免 false sharing
tbb::zero_allocator<T>分配的同时零初始化

10. Flow Graph 简介

TBB Flow Graph 允许通过有向无环图定义计算,节点自动并行:

#include <tbb/flow_graph.h>

tbb::flow::graph g;

tbb::flow::function_node<int, int> square(g, tbb::flow::unlimited,
    [](int x) { return x * x; });

tbb::flow::function_node<int, void> print(g, tbb::flow::serial,
    [](int x) { println("result: {}", x); });

tbb::flow::make_edge(square, print);

for (int i = 0; i < 100; ++i)
    square.try_put(i);

g.wait_for_all();

节点类型:

类型行为
function_node接收一个输入,产生一个输出
multifunction_node一个输入可以产生 0~N 个输出
join_node等待多个输入同时到达后组合
split_node将一个 tuple 拆成多路
broadcast_node广播到所有后继节点
buffer_node缓冲消息(可排序、无限容量)
queue_node排队消息,FIFO

并发度控制:tbb::flow::serial(串行)、tbb::flow::unlimited(无限并发)、或指定数字。

11. 性能建议

粒度控制

并行不是银弹。颗粒度太小,任务调度开销会淹没实际计算。

// 不要这样——每个元素一个任务(太细了)
tbb::parallel_for(size_t(0), vec.size(), [&](size_t i) {
    vec[i] = light_op(vec[i]);  // light_op 只有几条指令
});

确保每个分片有 至少上万条指令 的工作量,或者使用 blocked_range 手动控制颗粒度。

避免 False Sharing

多线程写入同一缓存行的不同变量会导致严重的性能下降:

// BAD: counters[i] 很可能共享同一条缓存行
int counters[8];

// GOOD: 每个计数器独自占一个缓存行
alignas(64) int counters[8];
// 或使用 tbb::cache_aligned_allocator

复用 Task Scheduler

TBB 的 task scheduler 在第一次调用并行算法时初始化,之后复用。如果有多个短促并行段,尽量在一个初始化周期内完成:

// BAD: 多次启动/停止 scheduler
for (int i = 0; i < 100; ++i)
    tbb::parallel_for(...);

// GOOD: 在外部保持 scheduler 存活
tbb::parallel_for(0, 100, [&](int) {
    // 直接写并行段
});

选择合适的容器

  • 并发插入而不删改已有数据 → concurrent_vector
  • 生产者-消费者队列 → concurrent_bounded_queue(有界背压是关键)
  • 并发字典 → 读多写少用 concurrent_hash_map,简单场景用 concurrent_unordered_map
  • 快速分配内存 → tbb::scalable_allocator

NUMA 感知

多路服务器场景:

tbb::task_arena numa_arena(tbb::task_arena::constraints{}
    .set_numa_id(current_numa_node));

numa_arena.execute([&] {
    tbb::parallel_for(/* 只处理本 NUMA 节点的数据 */);
});

配合 hwinfolibnuma 获取 NUMA 拓扑,按节点分配数据。

12. 与 C++17 并行算法对比

C++17 引入了 std::execution::parallel_policy

std::sort(std::execution::par, data.begin(), data.end());
std::for_each(std::execution::par, data.begin(), data.end(), work);
维度TBBC++17 并行算法
编译器要求C++20 + TBB 库C++17 + TBB/别的后端
可控粒度blocked_range, 分区器无(由实现决定)
容器广度concurrent_* 全套
任务模型task_group, flow graph
缓存亲和affinity_partitioner
可移植性需安装 TBB标准库自带(但具体后端可能不同)

两者并不冲突:可以用 C++17 并行算法做简单并行,用 TBB 做复杂流水线、并发容器和任务图。

13. 综合示例

图像处理流水线——读取 → 并行处理 → 结果汇总:

#include <tbb/tbb.h>
#include <vector>
#include <cstdio>

struct Image {
    int id;
    std::vector<uint8_t> data;
    bool valid;
};

Image read_image(int id) {
    return Image{id, std::vector<uint8_t>(1024, 0), true};
}

void process_segment(Image& img, size_t start, size_t end) {
    for (size_t i = start; i < end; ++i)
        img.data[i] = static_cast<uint8_t>(img.data[i] * 1.5);
}

struct Result {
    int id;
    double avg;
};

int main() {
    constexpr int count = 100;
    tbb::concurrent_vector<Image> images;

    tbb::parallel_for(0, count, [&](int i) {
        images.push_back(read_image(i));
    });

    tbb::parallel_for(
        tbb::blocked_range<size_t>(0, images.size(), 4),
        [&](const auto& r) {
            for (size_t i = r.begin(); i != r.end(); ++i) {
                auto& img = images[i];
                process_segment(img, 0, img.data.size());
            }
        },
        tbb::auto_partitioner{}
    );

    auto total_avg = tbb::parallel_reduce(
        tbb::blocked_range<size_t>(0, images.size()), 0.0,
        [&](const auto& r, double init) {
            for (size_t i = r.begin(); i != r.end(); ++i) {
                double sum = 0;
                for (auto v : images[i].data)
                    sum += v;
                init += sum / images[i].data.size();
            }
            return init;
        },
        std::plus<double>{}
    ) / images.size();

    println("average brightness: {}", total_avg);
}

这个例子演示了 TBB 中最常用的几个组件:

  1. concurrent_vector 做线程安全的收集
  2. parallel_for + blocked_range 做数据并行
  3. parallel_reduce 做结果聚合
  4. 所有同步由库自动完成,不需要显式创建或 join 任何线程

TBB 的核心理念是 让开发者关注任务逻辑,而不是线程管理。用好 TBB,你的并行代码既不会比手写 pthread 慢,又能保持代码的简洁和可移植性。