GCD 中 dispatch_once 的性能与实现

关于 dispatch_once 的性能,国外有开发者做过性能测试,原文在此。原文在单线程和多线程情况下测试了 @synchronizeddispatch_once 实现单例的性能对比,结果如下:

Single threaded results  
-----------------------
  @synchronized: 3.3829 seconds
  dispatch_once: 0.9891 seconds

Multi threaded results  
----------------------
  @synchronized: 33.5171 seconds
  dispatch_once: 1.6648 seconds

得到的结论是 dispatch_once 在线程竞争环境下性能显著优于 @synchronized


Objective-C 中,@synchronized 是用 NSRecursiveLock 实现的,并且隐式添加一个 exception handler,如果有异常抛出,handler 会自动释放互斥锁。而 dispatch_once 之所以拥有高性能是因为它省去了锁操作,代替的是大量的原子操作,该原子操作内部不是靠 pthread 等锁来实现,而是直接利用了 lock 的汇编指令,靠底层 CPU 指令来支持的。


dispatch_once 实现单例的用法如下:

static dispatch_once_t onceToken;  
dispatch_once(&onceToken, ^{  
    //some one-time task
});

onceToken 参数是 dispatch_once 实现中的关键所在,它保证了后面的 block 只被执行一遍。我们通过一个简单的单例实现来看看 onceTokendispatch_once 运行过程中的变化:

- (void)singletonObject
{
    static dispatch_once_t onceToken;
    static NSObject *obj;
    NSLog(@"Before dispatch_once, onceToken = %ld", onceToken);
    dispatch_once(&onceToken, ^{
        NSLog(@"When dispatch_once is running onceToken = %ld", onceToken);
        obj = [[NSObject alloc]init];
    });
    NSLog(@"After dispatch_once, onceToken = %ld", onceToken);
    return obj;
}

// output: 
// Before dispatch_once, onceToken = 0
// When dispatch_once is running onceToken = 140734723410256
// After dispatch_once, onceToken = -1

通过输出我们可以发现,在 dispatch_once 执行前,onceToken 的值是 0,因为 dispatch_once_t 是由 typedef long dispatch_once_t 而来,所以在 onceToken 还没被手动赋值的情况下,0 是编译器给 onceToken 的初始化赋值。在 dispatch_once 执行过程中,onceToken 是一个很大的数字,这个值是 dispath_once 内部实现中一个局部变量的地址,并不是一个固定的值。当 dispatch_once 执行完毕,onceToken 的值被赋为 -1。

所以 dispatch_once 的实现需要满足以下三种场景的需求:

  1. dispatch_once 第一次执行,block 被调用,调用结束需标记 onceToken
  2. dispatch_once 第一次执行过程中,有其它线程执行该 dispatch_once,则其它线程的请求需要等待 dispatch_once 的第一次执行结束才能被处理。
  3. dispatch_once 第一次执行已经结束,有其它线程执行该 dispatch_once,则其它线程直接跳过 block 执行后续任务。

由于场景 1 只会发生一次,场景 2 发生的次数也是有限的,甚至根本不会发生,而场景 3 的发生次数可能是非常高的数量级,也正是影响 dispatch_once 性能的关键所在。


场景 3 的性能优化是通过 dispatch_once 的读取端来实现,其头文件中的实现如下:

#ifdef __BLOCKS__
__OSX_AVAILABLE_STARTING(__MAC_10_6,__IPHONE_4_0)  
DISPATCH_EXPORT DISPATCH_NONNULL_ALL DISPATCH_NOTHROW  
void  
dispatch_once(dispatch_once_t *predicate, dispatch_block_t block);

DISPATCH_INLINE DISPATCH_ALWAYS_INLINE DISPATCH_NONNULL_ALL DISPATCH_NOTHROW  
void  
_dispatch_once(dispatch_once_t *predicate, dispatch_block_t block)  
{
    // 告诉 CPU *predicate 等于 ~0l 的可能性非常高,
    // 这就使得 CPU 预测不进入 if 分支,提前取后续指令,译码,
    // 甚至提前计算一些结果,提高效率,
    // 场景 3 的性能优化主要在此体现
    if (DISPATCH_EXPECT(*predicate, ~0l) != ~0l) {
        dispatch_once(predicate, block);
    }
}
#undef dispatch_once
#define dispatch_once _dispatch_once
#endif

通过宏定义 #define dispatch_once _dispatch_once 可知,我们实际调用的是 _dispatch_once 方法,并且是强制 inlineDISPATCH_EXPECT__builtin_expect((x), (v)) 的宏替换,long __builtin_expect (long EXP, long C)GCC 提供的内建函数来处理分支预测,EXP 为一个整型表达式,这个内建函数的返回值也是 EXPC 为一个编译期常量。这个函数相当于告诉编译器,EXP == C 的可能性非常高,其作用是帮助编译器判断条件跳转的预期值,编译器会产生相应的代码来优化 CPU 执行效率,CPU 遇到条件转移指令时会提前预测并装载某个分支的指令,避免跳转造成时间乱费,但并没有改变其对真值的判断,如果分支预测错了,就会丢弃之前的指令,从正确的分支重新开始执行。


至于场景 1 和 场景 2 的需求则是在 dispatch_once 的写入端来保证,实现如下:

struct _dispatch_once_waiter_s {  
    volatile struct _dispatch_once_waiter_s *volatile dow_next;
    // _dispatch_thread_semaphore_t 是 unsigned long 类型的别名,用来表示信号量
    _dispatch_thread_semaphore_t dow_sema;
};

// 将 DISPATCH_ONCE_DONE 定义为 _dispatch_once_waiter_s 类型的指针,
// ~0l 是 long 的 0 取反,也就是一大堆 1(输出为 -1),是个无效的指针,
// 即指向的地址不可能为一个有效的 _dispatch_once_waiter_s 类型,
// 用来标记 onceToken,表示 dispatch_once 第一次执行已经完成
#define DISPATCH_ONCE_DONE ((struct _dispatch_once_waiter_s *)~0l)

#ifdef __BLOCKS__
void  
dispatch_once(dispatch_once_t *val, dispatch_block_t block)  
{
    // dispatch_block_t 的类型定义:typedef void (^dispatch_block_t)(void)
    struct Block_basic *bb = (void *)block;
    // 执行 block 最终是调用 C 函数
    dispatch_once_f(val, block, (void *)bb->Block_invoke);
}
#endif

// val 即外部传入的 &onceToken,ctxt 传入指向 block 的指针,可取到 block 上下文,
// dispatch_function_t 的类型定义:typedef void (*dispatch_function_t)(void *)
// func 是 block 内部的函数指针,指向函数执行体,执行它就是执行 block
DISPATCH_NOINLINE  
void  
dispatch_once_f(dispatch_once_t *val, void *ctxt, dispatch_function_t func)  
{
    // volatile 是一个类型修饰符,用来修饰被不同线程访问和修改的变量,
    // 遇到这个关键字声明的变量,编译器对访问该变量的代码就不再进行优化,
    // 优化器在用到这个变量时必须重新从它所在的内存读取数据,而不是使用保存在寄存器里的备份
    struct _dispatch_once_waiter_s * volatile *vval =
            (struct _dispatch_once_waiter_s**)val;
    // dow 意为 dispatch_once waiter
    struct _dispatch_once_waiter_s dow = { NULL, 0 };
    struct _dispatch_once_waiter_s *tail, *tmp;
    _dispatch_thread_semaphore_t sema;

    // dispatch_atomic_cmpxchg 是原子比较交换函数 __sync_bool_compare_and_swap 的宏替换,
    // 原理是大致如下(真正的实现并非如此):
    //     if(*vval == NULL)
    //     {
    //         *vval = &dow;
    //         return true;
    //     }
    //     else
    //     {
    //         return false;
    //     }
    // 当 dispatch_once 第一次执行时,*vval 为 0,
    // 则 *vval 被值赋值为 &dow 并返回 true,
    // 此时 *vval 的值是类似上文中的 140734723410256
    if (dispatch_atomic_cmpxchg(vval, NULL, &dow)) {
        // 空的宏替换,什么都不做
        dispatch_atomic_acquire_barrier();
        // _dispatch_client_callout 实际上就是调用了func,执行了 block,即初始化并写入 obj
        _dispatch_client_callout(ctxt, func);

        dispatch_atomic_maximally_synchronizing_barrier();

        // dispatch_atomic_xchg 原子交换函数 __sync_swap 的宏替换,
        // 执行的操作是:
        //         temp = *vval;
        //         *vval = DISPATCH_ONCE_DONE;
        tmp = dispatch_atomic_xchg(vval, DISPATCH_ONCE_DONE);
        tail = &dow;
        // 若在 block 执行过程中,没有其它线程进入线程等待分支来等待,
        // 则 *vval == &dow,即 tmp == &dow,while 循环不会被执行,分支结束,
        // 若有其它线程进入线程等待分支来等待,那么会构造一个信号量链表,
        // *vval 变为信号量链的头部,&dow 为链表的尾部,
        // 则在此 while 循环中,遍历链表来 signal 每个信号量
        while (tail != tmp) {
            // 因为线程等待分支会中途将 val(即 *vval)赋值为 &dow,
            // 然后再为 val->dow_next 赋值,
            // 在 val->dow_next 赋值之前其值为 NULL,需要等待,
            // pause 就像 nop,延迟空等,主要是提高性能和节省 CPU 耗电
            while (!tmp->dow_next) {
                _dispatch_hardware_pause();
            }
            sema = tmp->dow_sema;
            tmp = (struct _dispatch_once_waiter_s*)tmp->dow_next;
            _dispatch_thread_semaphore_signal(sema);
        }
    } else {
        dow.dow_sema = _dispatch_get_thread_semaphore();
        for (;;) {
            tmp = *vval;
            // 如果发现 *vval 已经为 DISPATCH_ONCE_DONE,则直接break,
            // 然后调用 _dispatch_put_thread_semaphore 销毁信号量
            if (tmp == DISPATCH_ONCE_DONE) {
                break;
            }
            // 空的宏替换,什么都不做
            dispatch_atomic_store_barrier();
            // 如果 *vval 不为 DISPATCH_ONCE_DONE,则进行原子比较并交换操作,
            // 如果期间有其它线程同时进入线程等待分支并交错修改链表,则可能导致 *vval != tmp,
            // 则 for 循环重新开始,重新获取一次 vval 来进行同样的操作,
            // 若 *vval == tmp,则将 *vval 赋值为 &dow,
            // 接着执行 dow.dow_next = tmp 增加链表节点,然后等待信号量,
            // 当 block 执行分支完成并遍历链表来 signal 时,结束等待往下执行
            if (dispatch_atomic_cmpxchg(vval, tmp, &dow)) {
                dow.dow_next = tmp;
                _dispatch_thread_semaphore_wait(dow.dow_sema);
            }
        }
        _dispatch_put_thread_semaphore(dow.dow_sema);
    }
}


由于 CPU 的流水线特性,有一种边缘状况可能出现。如下图所示,假如线程 a 在初始化并写入 obj 尚未完成时,线程 b 读取了 obj,则此时 objnil,而线程 b 在线程 apredicateDISPATCH_ONCE_DONE 之后读取 predicate,线程 b 会认为 obj 初始化已经完成,将空的 obj 返回,那么接下来关于 obj 函数调用可能会导致程序崩溃。

multiAccessRisk

假如写入端能在 初始化并写入 objpredicateDISPATCH_ONCE_DONE 之间等待足够长的时间,即满足 Ta > Tb,那上述的问题就都解决了。因此 dispatch_once 在执行了 block 之后,会调用 dispatch_atomic_maximally_synchronizing_barrier() 宏函数,在 intel 处理器上,这个函数编译出的是 cpuid 指令,并强制将指令流串行化,在其他厂商处理器上,这个宏函数编译出的是合适的其它指令,这些指令都将耗费可观数量的 CPU 时钟周期,以保证 Ta > Tb


最后感慨一下,为了榨取 CPU 性能,dispatch_once 简直是太抠了!