无锁队列的实现

无锁编程,称为lock-free,是利用处理器的一些特殊原子指令来避免传统并行设计中对锁的使用,例如x86平台下对应的指令是CMPXCHG。使用无锁编程至少能够带来两个方面的好处,首先是避免锁的争用,提高性能,其次通过避免锁的使用,避免死锁。无锁队列是无锁编程中最基础的,本文在这里给出一个实现。

无锁编程基础

无锁编程最基本的一个操作是CAS(compare and swap),无锁编程的操作用C++语言描述的话如下所示,简而言之就是看看*reg里面的内容是不是oldval,如果是的话,对其复制为newval

1
2
3
4
5
6
7
8
template <typename T>
bool compare_and_swap(T *reg, T oldval, T newval) {
if (*reg = oldval) {
*reg = newval;
return true;
}
return false;
}

CAS实现是依赖编译器的:

  • GCCCAS
    GCC4.1+版本支持CAS的原子操作。

    1
    2
    bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
    type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
  • WindowsCAS

    1
    2
    3
    InterlockedCompareExchange ( __inout LONG volatile *Target,
    __in LONG Exchange,
    __in LONG Comperand);
  • C++11中的CAS
    C++11中的STLatomic类的函数可以跨平台实现CAS

    1
    2
    3
    4
    5
    6
    template< class T >
    bool atomic_compare_exchange_weak( std::atomic* obj,
    T* expected, T desired );
    template< class T >
    bool atomic_compare_exchange_weak( volatile std::atomic* obj,
    T* expected, T desired );

无锁队列的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#include <iostream>
using namespace std;
struct ListNode {
ListNode *next;
int value;
ListNode (int val) : value(val), next(NULL) { }
};
class CASQueue {
private:
ListNode *head;
ListNode *tail;
public:
CASQueue() {
head = new ListNode(0);
tail = head;
}
void push(int value) {
ListNode *node = new ListNode(value);
ListNode *p = tail;
do {
while (p->next != NULL) {
p = p->next;
}
} while (__sync_bool_compare_and_swap(&p->next, NULL, node) != true);
__sync_bool_compare_and_swap(&tail, p, node);
}
int pop() {
ListNode *node = head;
ListNode *p;
do {
p = head;
} while (__sync_bool_compare_and_swap(&head, p, p->next) != true);
int val = node->next->value;
delete node->next;
node->next = NULL;
return val;
}
};
int main() {
CASQueue queue;
queue.push(3);
queue.push(4);
cout << queue.pop() << endl;
return 0;
}

CAS的ABA问题

CASABA问题可以概括如下:

  1. 进程P1在共享变量中读到值为A
  2. P1被抢占了,进程P2执行
  3. P2把共享变量里的值从A改成了B,再改回到A,此时被P1抢占
  4. P1回来看到共享变量里的值没有被改变,于是继续执行

虽然P1以为变量值没有改变,继续执行了,但是这个会引发一些潜在的问题。ABA问题最容易发生在lock free的算法中的,CAS首当其冲,因为CAS判断的是指针的地址。如果这个地址被重用了呢,问题就很大了。(地址被重用是很经常发生的,一个内存分配后释放了,再分配,很有可能还是原来的地址)

维基百科上有一个比较通俗的比喻:

你拿着一个装满钱的手提箱在飞机场,此时过来了一个火辣性感的美女,然后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个一模一样的手提箱和你那装满钱的箱子调了个包,然后就离开了,你看到你的手提箱还在那,于是就提着手提箱去赶飞机去了。


解决ABA问题

可以用double CAS,双保险CAS解决ABA问题,例如在32位系统上,我们要检查64位长的内容。

  • 一次用CAS检查双倍长度的值,前半部是指针,后半部分是一个计数器。
  • 只有这两个都一样,才算通过检查,要赋新的值。并把计数器累加1。
    这样一来,ABA发生时,虽然值一样,但是计数器就不一样(但是在32位的系统上,这个计数器会溢出回来又从1开始的,这还是会有ABA的问题)

参考文献:
无锁队列的实现
无锁编程基础