Boost无锁队列分析与实现

在多线程编程环境中,有时为了保证数据处理的效率会使用一些无锁(Lockfree)的数据结构,例如无锁队列(Lockfree Queue)、无锁栈(Lockfree Stack)、环形缓冲(Ring Buffer)、无锁哈希(Lockfree Hash)之类。本文分析Boost库(V1.57.0)中的无锁队列,并将其从复杂的Boost数据结构中剥离出来单独实现。

boost v1.57.0下载

改写的源码

std::memory_order一些用在本文的知识点

参考文献:http://wilburding.github.io/blog/2013/04/07/c-plus-plus-11-atomic-and-memory-model/
http://bartoszmilewski.com/2008/12/01/c-atomics-and-memory-ordering/

编译器从计算机的远古时代发展至今,已经单纯脱离原始的翻译代码,而是会进行非常多的优化,将“糟糕”的代码改写成高效的代码,在多线程环境下会导致指令重排序happens-befors问题std::memory_order作用就是告诉编译器哪些数据是线程共享的,编译器只在必要的情况下对其保守优化。

 enum memory_order {

memory_order_relaxed,
memory_order_consume,
memory_order_acquire,
memory_order_release,
memory_order_acq_rel,
memory_order_seq_cst

};

尽量用一句话在简明概括一下上面每条枚举的用处:

memory_order_relaxed – 不保证顺序性,只保证原子性和修改的一致性。

memory_order_consume – memory_order_acquire弱化版,C++专家特性,一般用不到。

memory_order_acquire – 不会把其他读操作移动它前面,保证读取到(其他线程store(memory_order_release))最新修改。

memory_order_release – 不会把其他写操作移动它后面,保证其他线程(load(memory_order_acquire))可以取到最新修改。

memory_order_acq_rel – 用在对内存值本身操作上,内置acquire/release合体。

memory_order_scq_cstmemory_order_acq_rel强化版,支持处理顺序一致性,最容易用的默认参数。

刚开始提到的memory_order那几个枚举就是告知编译器数据是在线程间共享的,并且是怎样共享的。memory_order_acquire和memory_order_release的关系大概就像mutex的acquire(lock)和release(unlock)。mutex使线程间串行执行,并且上一个线程unlock后另一个线程lock进入,保证可以看到上一个线程的所有数据修改而不会有读写操作被reorder后出问题的情况。

 

无锁队列Boost库Push的源码和注释

template <bool Bounded>;
bool do_push(T const & t)
{
    using detail::likely;

    node * n = pool.template construct<true, Bounded>(t, pool.null_handle());
    handle_type node_handle = pool.get_handle(n);

    if (n == NULL)
        return false;

    for (;;) {
        tagged_node_handle tail = tail_.load(memory_order_acquire);
        node * tail_node = pool.get_pointer(tail);
        tagged_node_handle next = tail_node-&gt;next.load(memory_order_acquire);
        node * next_ptr = pool.get_pointer(next);

        tagged_node_handle tail2 = tail_.load(memory_order_acquire);

        //进行比较是为了防止其他线程进行Push导致了tail变化
        //内存模型memory_order_acquire保证tail,tail2取值的顺序性
        if (likely(tail == tail2)) {

            //Tail.next必须指向0才进行push下一步关键操作
            //若不为0,可能另外一个线程提前push,我们进入else将Tail指向Tail.next
            if (next_ptr == 0) {
                tagged_node_handle new_tail_next(node_handle, next.get_next_tag());

                //这里next一般指向0,尝试将Tail.next赋值成新node.next的数值
                //若是第一个push node那么Head.next也会改变
                //赋值的目的是为了将新node与原有node形成链状关系
                if ( tail_node-&gt;next.compare_exchange_weak(next, new_tail_next) ) {
                    //成功赋值,将Tail前移(指向新node)
                    tagged_node_handle new_tail(node_handle, tail.get_next_tag());
                    tail_.compare_exchange_strong(tail, new_tail);
                    return true;
                }
            }
            else {
                //移动Tail
                tagged_node_handle new_tail(pool.get_handle(next_ptr), tail.get_next_tag());
                tail_.compare_exchange_strong(tail, new_tail);
            }
        }
    }
}

Push图示

lockfreeQpush

 

无锁队列Boost库Pop的源码和注释

 

template <typename U>
bool pop (U & ret)
{
    using detail::likely;
    for (;;) {
        tagged_node_handle head = head_.load(memory_order_acquire);
        node * head_ptr = pool.get_pointer(head);

        tagged_node_handle tail = tail_.load(memory_order_acquire);
        tagged_node_handle next = head_ptr->next.load(memory_order_acquire);
        node * next_ptr = pool.get_pointer(next);

        tagged_node_handle head2 = head_.load(memory_order_acquire);

        //进行比较是为了防止其他线程进行Pop导致了head变化
        //内存模型memory_order_acquire保证head,head2取值的顺序性
        if (likely(head == head2)) {

            //头尾一致
            if (pool.get_handle(head) == pool.get_handle(tail)) {
                //下一个node为0,证明没有可以pop的node
                if (next_ptr == 0)
                    return false;

                //还有可以pop的node我们先移动tail
                //这里可能出现的情况是有node在其他线程快速push进来
                tagged_node_handle new_tail(pool.get_handle(next), tail.get_next_tag());
                tail_.compare_exchange_strong(tail, new_tail);

            } else {
                if (next_ptr == 0)
                    /* this check is not part of the original algorithm as published
                     * by michael and scott
                     *
                     * however we reuse the tagged_ptr part for the freelist and clear
                     * the next part during node allocation. we can observe a null-pointer here.
                     * */
                    continue;

                //取出赋值,这里需要注意的是Head永远是用作哨兵,实际有意义数据的节点是Head.next
                detail::copy_payload(next_ptr->data, ret);

                tagged_node_handle new_head(pool.get_handle(next), head.get_next_tag());
                //为了防止这个节点已经被其他线程pop
                if (head_.compare_exchange_weak(head, new_head)) {
                    pool.template destruct<true>(head);
                    return true;
                }
            }
        }
    }
}

Push图示

lockfreeQpop

 

点击查看 – 改写的源码

(全文结束)

 


转载文章请注明出处:漫漫路 - lanindex.com

Leave a Comment

Your email address will not be published.

1 Trackback

  1. Boost无锁队列分析与实现 | fcoding (Pingback)