多线程服务器编程[2]-线程同步精要

线程同步的四项原则

  • 最低限度的共享对象,减少需要同步的场合
  • 使用高级的并发构件,如TaskQueue,Producer-Consumer Queue,CountDownLatch等
  • 非不得已使用底层同步原语时,只使用非递归的互斥器和条件变量,慎用读写锁,不要使用信号量
  • 除了使用atomic整数外,不自己编写lock-free代码,也不要使用内核级同步原语

互斥器(mutex)

使用原则

  • 使用RAII手法封装mutex的创建、销毁、加锁、解锁

    • 保证了锁的生效期间等于一个作用域(Scoped Locking),在死锁发生时有助于定位
    • 保证了不手工调用lock和unlock
    • 保证了不跨进程和线程调用锁
    • 保证了不忘记解锁和重复加锁
  • 只使用非递归的mutex(即不可重入的mutex)
  • 每次构造Guard对象时,思考一路上已经持有的锁(函数调用栈),防止因枷锁顺序不同导致的死锁

为什么只使用非递归的mutex

  • 同一个线程可以重复对递归的mutex加锁,但不能重复对非递归的mutex加锁
  • 在同一个县城里多次对非递归的mutex加锁会导致死锁
  • 两者性能差别不大

原因

  • 非递归的锁能提早暴露出编程问题

死锁

  • 除了线程间的死锁,单个线程也会造成死锁
  • 在保证使用Scoped Locking时,可以通过分析调用栈得到原因

条件变量

使用条件变量实现的BlockingQueue

muduo::MutexLock mutex;
muduo::Condition cond(mutex);
std::deque queue;

int dequeue(){
    MutexLockGuard lock(mutex);
    while(queue.empty()){
        cond.wait(); // unlock mutex and wait (原子地)
    }
    assert(!queue.empty());
    int top = queue.front();
    queue.pop_front();
    return top;
}

void enqueue(int x){
    {
    MutexLockGuard lock(mutex);
    queue.push_back(x);
    }
    cond.notify(); // wakeup the wait side
}
  • 每次加入都调用notify(),而不是notifyall()是为了避免惊群效应
  • 不只在0->1的时候notify(),而是每加一次通知一次,是当存在多个消费者时,能高效地唤醒他们,不然只唤醒了一个
  • 详见 https://book.douban.com/annot...

使用条件变量实现CountDownLatch(倒计时)

class CountDownLatch : boost::noncopyable{
    public:
        explicit CountDownLatch(int count);
        void wait();
        void countDown();
    private:
        mutable MutexLock mutex_;
        Condition condition_;
        int count_;
}

void CountDownLatch::wait(){
    MutexLockGuard lock(mutex_);
    whilt(count_>0){
        condition_.wait();
    }
}

void CountDownLatch::countDown(){
    MutexLockGuard lock(mutex_);
    --count_;
    if(count_ == 0){
        condition_.notifyAll();
    }
}

封装MutexLock、MutexLockGuard、Condition的实现

class MutexLock : boost::noncopyable{
    public:
        MutexLock()
        :holder_(0)
        {
            pthread_mutex_init(&mutex_NULL);
        }

        ~MutexLock()
        {
            assert(hoilder_ == 0);
            pthread_mutex_destory(&mutex);
        }

        bool isLockedByThisThread(){
            return holder_ == CurrentThread::tid();
        }
        
        void assertLocked(){
            assert(isLockedByThisThread());
        }
        
        void lock(){
            pthread_mutex_lock(&mutex);
            holder_ = CurrentThread::tid();
        }

        void unlock(){
            holder_ = 0;
            pthread_mutex_unlock();
        }

        pthread_mutex_t* getPthreadMutex(){
            return &mutex;
        }
    private:
        pthread_mutex_t mutex_;
        pid_t holder_; 
}

class MutexLockGuard : boost::noncopyable{
    public:
        explicit MutexLockGuard(MutexLock& mutex)
        :mutex_(mutex) 
        {
            mutex_.lock();
        }
        
        ~MutexLockGuard(){
            mutex_.unlock;
        }
    private:
        MutexLock& mutex_;
}

#define MutexLockGuard(x) static_assert(false,"Missing mutex guard variable name!")

class Condition : boost::noncopyable{
    public:
        explicit Condition(MutexLock& mutex)
        :mutex_(mutex)
        {
            pthread_cond_init(&pcond_,NULL);
        }

        ~Condition(){
            pthread_cond_destory(&pcond_);
        }

        void wait(){
            pthread_cond_wait(&pcond_,mutex_.getPthreadMutex());
        }

        void notify(){
            pthread_cond_signal(&pcond_);
        }
        
        void notifyAll(){
            pthread_cond_boardcast(&pcond);
        }

    private:
        MutexLock& mutex_;
        pthread_cond_t pcond_;
}

小结

  • 线程同步的四项原则:尽量使用高级同步设施
  • 对于其他的任务,使用普通互斥器和条件变量,采用RAII手法和Scooped Locking

你可能感兴趣的