C++ 同步原语
同步原语(Synchronization Primitives)是操作系统和编程语言提供的基本工具,用于在多线程或并发环境中协调线程(或进程)之间的执行顺序,管理共享资源的访问,以避免数据竞争(data race)、死锁(deadlock)等问题。它们是实现线程同步和互斥的核心机制,广泛应用于多线程编程中。
同步原语是底层的同步机制,用于:
1.互斥(Mutual Exclusion):确保同一时间只有一个线程访问共享资源(如你的 counter
变量)。
2.同步(Synchronization):协调线程的执行顺序,例如等待某个条件满足后再继续。
3.通信:在线程间传递信号或数据。
C++ 的同步原语:
1.互斥锁(Mutexes)
作用:保证同一时间只有一个线程可以访问共享资源,防止数据竞争。
C++ 实现:
std::mutex
(C++11):基本的互斥锁,提供独占访问。
方法:lock()
:获取锁(阻塞直到锁可用)。unlock()
:释放锁。try_lock()
:尝试获取锁(非阻塞)。
示例:std::lock_guard
确保每次 ++counter
是原子操作,避免数据竞争,保证 counter
最终为 2000。
#include <iostream>
#include <thread>
#include <mutex>std::mutex mtx;
int counter = 0;void increment() {for (int i = 0; i < 1000; ++i) {std::lock_guard<std::mutex> lock(mtx); // RAII 自动加锁/解锁++counter;}
}int main() {std::thread t1(increment);std::thread t2(increment);t1.join();t2.join();std::cout << counter << std::endl; // 输出 2000return 0;
}
std::recursive_mutex
(C++11):允许同一线程多次锁定(递归加锁),但其他线程仍被阻塞,适合需要递归调用的场景。
方法:同 std::mutex
。
示例:
std::recursive_mutex rmtx;
void recursive_func(int n) {std::lock_guard<std::recursive_mutex> lock(rmtx);if (n > 0) recursive_func(n - 1);
}
std::timed_mutex
(C++11):支持超时尝试加锁的互斥锁。避免无限阻塞。
额外方法:try_lock_for()
, try_lock_until()
。
示例:
std::timed_mutex tmtx;
void try_access() {if (tmtx.try_lock_for(std::chrono::milliseconds(100))) {// 访问资源tmtx.unlock();}
}
std::recursive_timed_mutex
(C++11):结合 recursive_mutex
和 timed_mutex
的特性,支持递归加锁和超时。用于递归场景中需要超时控制。
示例:类似 timed_mutex
,但允许同一线程多次加锁。
std::shared_mutex
(C++17):支持读写锁,允许多个线程同时读取共享资源,但写操作独占访问,即允许多个线程共享读锁(lock_shared()
),但写锁(lock()
)独占。适用于读多写少的场景。
方法:lock()
/ unlock()
:独占写锁。lock_shared()
/ unlock_shared()
:共享读锁。try_lock()
, try_lock_shared()
。
示例:读写锁允许多个读者并发,但写者独占访问。
#include <shared_mutex>
#include <thread>
#include <iostream>std::shared_mutex rw_mutex;
int data = 0;void reader(int id) {std::shared_lock<std::shared_mutex> lock(rw_mutex);std::cout << "Reader " << id << " reads: " << data << std::endl;
}void writer(int id) {std::unique_lock<std::shared_mutex> lock(rw_mutex);++data;std::cout << "Writer " << id << " writes: " << data << std::endl;
}int main() {std::vector<std::thread> threads;for (int i = 0; i < 3; ++i) {threads.emplace_back(reader, i);threads.emplace_back(writer, i);}for (auto& t : threads) {t.join();}return 0;
}
std::shared_timed_mutex
(C++14):shared_mutex
的超时版本,支持超时尝试读锁或写锁。适用于读写锁需要超时控制。
额外方法:try_lock_for()
, try_lock_until()
, try_lock_shared_for()
, try_lock_shared_until()
。
示例:类似 shared_mutex
,但支持超时。
2.锁管理(Lock Management)
这些是 RAII 风格的工具,用于管理互斥锁的获取和释放,确保异常安全和简化代码。
std::lock_guard
(C++11):RAII 封装,构造时加锁,析构时解锁,不支持手动解锁。适用于简单、轻量级的锁管理。
std::unique_lock
(C++11):更灵活的 RAII 锁管理,支持手动加锁/解锁、延迟加锁、超时尝试。适用于需要灵活控制锁的场景,如条件变量。
方法:lock()
, unlock()
, try_lock()
, try_lock_for()
, try_lock_until()
.
示例:
std::unique_lock<std::mutex> lock(mtx);
cv.wait(lock, [] { return !q.empty(); });
std::shared_lock
(C++14):RAII 封装,用于管理 std::shared_mutex
的共享锁(读锁)。适合读写锁的读操作。
示例:
std::shared_lock<std::shared_mutex> lock(smtx);
// 读取共享资源
std::scoped_lock
(C++17):RAII 封装,支持同时锁定多个互斥锁,避免死锁。适合需要锁定多个锁的场景。
示例:
std::mutex m1, m2;
void func() {std::scoped_lock lock(m1, m2); // 同时锁定 m1 和 m2// 访问资源
}
3.条件变量(Condition Variables)
作用:允许线程等待某个条件成立(如队列非空、任务完成),并在条件满足时被唤醒。常与互斥锁配合使用。
C++ 实现:
std::condition_variable
(C++11):允许线程等待条件满足,支持通知机制。
方法:wait(lock, predicate)
:释放锁并等待,直到条件满足。wait_for()
, wait_until()
, notify_one()
:唤醒一个等待的线程。notify_all()
:唤醒所有等待的线程。
示例:生产者-消费者模型,条件变量协调生产者和消费者,确保消费者在队列非空时才处理数据。
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <iostream>std::mutex mtx;
std::condition_variable cv;
std::queue<int> q;void producer() {for (int i = 0; i < 5; ++i) {std::lock_guard<std::mutex> lock(mtx);q.push(i);std::cout << "Produced: " << i << std::endl;cv.notify_one(); // 通知消费者}
}void consumer() {while (true) {std::unique_lock<std::mutex> lock(mtx);cv.wait(lock, [] { return !q.empty(); }); // 等待队列非空int value = q.front();q.pop();std::cout << "Consumed: " << value << std::endl;lock.unlock();if (value == 4) break; // 退出条件}
}int main() {std::thread t1(producer);std::thread t2(consumer);t1.join();t2.join();return 0;
}
std::condition_variable_any
(C++11):更通用的条件变量,支持任何符合锁接口的类型(不仅限于 std::mutex
)。适用于需要非标准锁类型的场景。
示例:
std::condition_variable_any cv_any;
std::unique_lock<std::shared_mutex> lock(smtx);
cv_any.wait(lock, [] { return true; });
4.原子操作(Atomic Operations)
作用:提供无锁(lock-free)的线程安全的原子操作,适合简单变量(如计数器)的并发访问。
C++ 实现:
std::atomic<T>
(C++11):提供原子操作,支持基本类型(如 int
, bool
, pointer
)和用户定义类型(需满足要求)。用于高效的计数器、标志位。
方法:load()
, store()
, exchange()
, compare_exchange_strong()
, fetch_add()
, fetch_sub()
, 等。
示例:std::atomic
使 counter
的递增操作原子化,解决数据竞争,性能比互斥锁更高。
#include <iostream>
#include <thread>
#include <atomic>std::atomic<int> counter = 0;void increment() {for (int i = 0; i < 1000; ++i) {counter.fetch_add(1); // 原子递增}
}int main() {std::thread t1(increment);std::thread t2(increment);t1.join();t2.join();std::cout << counter << std::endl; // 输出 2000return 0;
}
std::atomic_flag
(C++11):最轻量级的原子类型,仅支持测试和设置操作,保证无锁(lock-free)。用于简单的标志或自旋锁。
方法:test_and_set()
, clear()
, test()
(C++20)。
示例:
std::atomic_flag flag = ATOMIC_FLAG_INIT;
while (flag.test_and_set()) {} // 自旋锁
原子内存序(Memory Ordering)(C++11):不是独立的原语,但与 std::atomic
密切相关,用于控制原子操作的内存序。用于优化性能或确保正确的同步。
类型:std::memory_order_relaxed
, std::memory_order_acquire
, std::memory_order_release
, std::memory_order_seq_cst
, 等。
示例:
std::atomic<int> x = 0;
x.store(1, std::memory_order_release);
5.信号量(Semaphores)
作用:控制有限资源的访问,或协调线程间的执行顺序。信号量是一个计数器,支持增(释放)和减(获取)操作,用于控制资源访问或线程同步。
std::counting_semaphore
(C++20):支持任意计数的信号量,控制有限资源的访问。用于限制并发线程数。
方法:acquire()
:减少计数,阻塞如果计数为 0。release()
:增加计数,唤醒等待的线程。try_acquire()
, try_acquire_for()
, try_acquire_until()
示例:信号量限制最多 3 个线程同时运行。
#include <semaphore>
#include <thread>
#include <vector>
#include <iostream>std::counting_semaphore<3> sem(3); // 最多 3 个线程并发void task(int id) {sem.acquire();std::cout << "Task " << id << " running" << std::endl;std::this_thread::sleep_for(std::chrono::milliseconds(100));std::cout << "Task " << id << " done" << std::endl;sem.release();
}int main() {std::vector<std::thread> threads;for (int i = 0; i < 5; ++i) {threads.emplace_back(task, i);}for (auto& t : threads) {t.join();}return 0;
}
std::binary_semaphore
(C++20):计数范围为 0 或 1 的信号量,相当于二进制锁。用于简单的互斥或同步。
示例:
std::binary_semaphore sem(1);
sem.acquire();
// 独占资源
sem.release();
6.屏障(Barriers)
作用:协调一组线程,使它们在某个点同步等待,直到所有线程都到达。
C++ 实现:
std::barrier
(C++20):允许多个线程在到达屏障点时等待,直到所有线程到达。用于多阶段任务同步。
方法:arrive_and_wait()
:线程到达屏障并等待。arrive_and_drop()
。
示例:屏障确保所有线程完成第一阶段后才进入第二阶段。
#include <barrier>
#include <thread>
#include <iostream>std::barrier sync_point(3);void task(int id) {std::cout << "Task " << id << " phase 1" << std::endl;sync_point.arrive_and_wait();std::cout << "Task " << id << " phase 2" << std::endl;
}int main() {std::vector<std::thread> threads;for (int i = 0; i < 3; ++i) {threads.emplace_back(task, i);}for (auto& t : threads) {t.join();}return 0;
}
7.闩锁(Latches)
闩锁是一次性屏障,等待所有线程完成特定操作。
C++ 实现:
std::latch
(C++20):计数器,线程减少计数,当计数为 0 时继续。用于一次性同步(如初始化完成)。
方法:count_down()
, wait()
, try_wait()
, arrive_and_wait()
。
示例:
std::latch latch(3);
void task() {// 初始化latch.count_down();
}
void main_thread() {latch.wait(); // 等待所有任务完成
}
8.其他同步工具
这些工具虽然不完全是传统意义上的同步原语,但与线程同步密切相关。
std::future
/ std::promise
(C++11):用于异步任务的结果传递和同步,等待异步操作完成。
示例:
std::promise<int> p;
std::future<int> f = p.get_future();
std::thread t([&p] { p.set_value(42); });
std::cout << f.get() << std::endl;
t.join();
std::async
(C++11):启动异步任务,返回 std::future
。用于简化异步编程。
示例:
auto f = std::async(std::launch::async, [] { return 42; });
std::cout << f.get() << std::endl;
std::thread::yield
(C++11):让当前线程放弃 CPU,允许其他线程运行,用于优化忙等待。
示例:
while (!ready) {std::this_thread::yield();
}
选择哪种同步原语取决于具体场景:
简单计数器:用 std::atomic
。
独占访问:用 std::mutex
和 std::lock_guard
。
读多写少:用 std::shared_mutex
。
线程等待条件:用 std::condition_variable
。
限制并发:用 std::counting_semaphore
。
多阶段同步:用 std::barrier
。
多个线程互相等待对方释放锁,可能导致死锁。解决:始终按固定顺序获取多个锁,或使用 std::lock
同时获取。
锁(如 mutex
)可能导致线程阻塞,降低并发性能。优先考虑无锁机制(如 std::atomic
)或减少锁的粒度。
使用 RAII(如 std::lock_guard
, std::unique_lock
)可以确保锁在异常时释放,保证异常安全。
数据竞争和死锁难以重现时,建议使用线程分析工具(如 ThreadSanitizer)。
选择合适的同步原语需要权衡功能、性能和复杂性。