大批量任务处理总结

10:47:00 AM 0 Comments

最近几天在做一个集群间数据迁移的任务,要做的事很简单,就是给定一个任务文件,文件中每一行对应一个source:dest形式的迁移任务(source和dest均为文件名),任务数在千万级别。要做的事情其实很简单,读取每一行,解析出source和dest,并根据给定的集群信息从源集群读取source,并写到目标集群的dest。

经历了写程序、执行任务、分析日志、改程序、再执行子任务...等多次折磨之后,发现自己在处理任务的过程中走了很多弯路,原因是刚开始没有意识到问题的复杂性,处理的方式太过简单,这里说下我对这种大批量任务处理的一些心得。

大批量任务处理主要从以下两个方面来考虑:性能和正确性。

性能主要通过任务执行的时间来衡量,主要是通过优化每条任务的执行时间并行处理实现。本次数据迁移的任务,每次迁移需要从源集群读取文件,并且写到目标集群,由于读写都是通过客户端接口实现,故能优化的地方仅仅是在读的时候合理的利用好客户端cache;文件名字空间是扁平化的,文件之间看不出任何的联系,也无法通过重新组织任务顺序的方式来进行优化。由于子任务之间没有任何依赖和联系,迁移任务是非常适合采用多进程或多线程的方式并行处理的,1000w条任务,每秒大约能处理30个任务(可理解为一次随机读和一次随机写的时间),单进程(单线程)顺序处理时间约为3.9天,如果10个进程(线程)同时处理,则处理时间约为10个小时。为了减少编码工作量,我采用的方式是编写一个简单的单线程读写程序,在外围将待处理任务文件分成10个子文件,启动10个进程分别处理各个子文件。这里划分成10个任务的原因:(1)方便计算;(2)10个小时的处理时间能够接受,刚好睡一觉程序就跑完了;(3)10个进程并行,网卡也快跑满了。

接下来说说正确性,其实这块是最不好处理的,每一条迁移任务执行失败可能有很多原因,如任务描述不合法、读取源(这里又可细分为多个阶段)失败,写目标失败(这里也可细分),对于发生错误的情况,有些错误时必然错误,如给定的源或目标不符合规则、源文件不存在等;有些错误可能只是偶然,如读取某个文件,只成功读到一部分,这种错误通过重新执行任务可以避免;还有些错误可能是由于程序(工具程序、甚至是客户端库)的bug造成,通过修改程序可避免这类错误。

通过日志的方式可以区分以上提到的不同情况,这几天发现打印日志其实是很需要技巧的,并不是随便输出点信息就叫打了日志(这里不讨论日志分级别打印的情况,只讨论日志内容输出的技巧)。

首先对于描述错误信息的日志,必须能从日志中迅速定位出错误位置及原因,在日志内容上的描述上,日志内容需要能很方便喂给grep、awk等工具分析,从而避免再去写日志分析工具。另外日志中最好详尽的描述错误的任务信息,这样再二次处理的时候就不用再去从任务文件中分析出错误的任务,我刚开始写迁移程序的时候,在读不到source的时候,只打印了source的信息,结果要二次处理这些任务项时,还另外写了一个python脚本,把这些项从任务文件分析出来,而如果把source:dest,只需awk下就能从日志中得到错误的任务,重新处理。

对于本文提到的数据迁移任务,我总结了一套行之有效的日志打印方法:

1. 区分错误日志和结果日志,错误日志记录发生错误时的具体信息,结果日志为任务执行的具体结果(成功还是失败)。处理一个任务可能对应很多条(包括使用到的库打印的错误日志),但每个任务只对一个一条结果日志,结果日志最好包含任务描述的所有信息;

2. 将任务划分成多个阶段,该信息在任务处理过程中不断更新,如果某个任务出错,在结果日志中打印任务执行到的阶段和错误信息(错误码),通过阶段信息快速定位错误位置,通过错误描述信息(错误码)进行错误分类。

3. 将错误日志和结果日志输出到不同的位置,简单的实现可将错误和结果日志分别输出到stderr和stdout,并将stderr和stdour重定向到不同的文件。

通过结果日志,可以将执行完的任务按照错误信息快速分类(grep),对不同的错误情况进行不同的处理;而当需要确切知道出错的具体原因时,则可分析错误日志。

最后,如果对迁移后的数据正确性要求很高,可在迁移过程中做下crc或md5检查,或是编写额外的check工具进行全面的检查工作,而这个工作的性质与迁移的工作非常类似。

nginx中锁的设计以及惊群的处理

10:46:00 AM 0 Comments

nginx中使用的锁是自己来实现的,这里锁的实现分为两种情况,一种是支持原子操作的情况,也就是由NGX_HAVE_ATOMIC_OPS这个宏来进行控制的,一种是不支持原子操作,这是是使用文件锁来实现。

首先我们要知道在用户空间进程间锁实现的原理,起始原理很简单,就是能弄一个让所有进程共享的东西,比如mmap的内存,比如文件,然后通过这个东西来控制进程的互斥。

说起来锁很简单,就是共享一个变量,然后通过设置这个变量来控制进程的行为。

我们先来看核心的数据结构,也就是说用来控制进程的互斥的东西。

这个数据结构可以看到和我上面讲得一样,那就是通过宏来分成两种。

1 如果支持原子操作,则我们可以直接使用mmap,然后lock就保存mmap的内存区域的地址

2 如果不支持原子操作,则我们使用文件锁来实现,这里fd表示进程间共享的文件句柄,name表示文件名。

Java代码 收藏代码
  1. typedef struct {
  2. #if (NGX_HAVE_ATOMIC_OPS)
  3. ngx_atomic_t *lock;
  4. #else
  5. ngx_fd_t fd;
  6. u_char *name;
  7. #endif
  8. } ngx_shmtx_t;


接着来看代码,先来看支持原子操作的情况下的实现方式。这里要注意下,下面的函数基本都会有两个实现,一个是支持原子操作,一个是不支持的,我这里全部都是分开来分析的。

先来看初始化,初始化代码在ngx_event_module_init中。

下面这段代码是设置将要设置的共享区域的大小,这里cl的大小最好是要大于或者等于cache line。
通过代码可以看到这里将会有3个区域被所有进程共享,其中我们的锁将会用到的是第一个。
Java代码 收藏代码
  1. size_t size, cl;
  2. cl = 128;
  3. //可以看到三个区域。
  4. size = cl /* ngx_accept_mutex */
  5. + cl /* ngx_connection_counter */
  6. + cl; /* ngx_temp_number */



下面这段代码是初始化对应的共享内存区域。然后保存对应的互斥体指针。
Java代码 收藏代码
  1. //这个是一个全局变量,保存的是共享区域的指针。
  2. ngx_atomic_t *ngx_accept_mutex_ptr;
  3. //这个就是我们上面介绍的互斥体。
  4. ngx_shmtx_t ngx_accept_mutex;
  5. ngx_shm_t shm;
  6. //开始初始化
  7. shm.size = size;
  8. shm.name.len = sizeof("nginx_shared_zone");
  9. shm.name.data = (u_char *) "nginx_shared_zone";
  10. shm.log = cycle->log;
  11. //分配对应的内存,使用mmap或者shm之类的。
  12. if (ngx_shm_alloc(&shm) != NGX_OK) {
  13. return NGX_ERROR;
  14. }
  15. shared = shm.addr;
  16. ngx_accept_mutex_ptr = (ngx_atomic_t *) shared;
  17. //初始化互斥体。
  18. if (ngx_shmtx_create(&ngx_accept_mutex, shared, cycle->lock_file.data)
  19. != NGX_OK)
  20. {
  21. return NGX_ERROR;
  22. }


下面我们来看ngx_shmtx_create的实现。
可以看到如果支持原子操作的话,非常简单,就是将共享内存的地址付给loc这个域。
Java代码 收藏代码
  1. ngx_int_t
  2. ngx_shmtx_create(ngx_shmtx_t *mtx, void *addr, u_char *name)
  3. {
  4. mtx->lock = addr;
  5. return NGX_OK;
  6. }


然后来看nginx中如何来获得锁,以及释放锁。

我们先来看获得锁。

这里nginx分为两个函数,一个是trylock,它是非阻塞的,也就是说它会尝试的获得锁,如果没有获得的话,它会直接返回错误。

而第二个是lock,它也会尝试获得锁,而当没有获得他不会立即返回,而是开始进入循环然后不停的去获得锁,知道获得。不过nginx这里还有用到一个技巧,就是每次都会让当前的进程放到cpu的运行队列的最后一位,也就是自动放弃cpu。

先来看trylock

这个很简单,首先判断lock是否为0,为0的话表示可以获得锁,因此我们就调用ngx_atomic_cmp_set去获得锁,如果获得成功就会返回1,负责为0.

Java代码 收藏代码
  1. static ngx_inline ngx_uint_t
  2. ngx_shmtx_trylock(ngx_shmtx_t *mtx)
  3. {
  4. return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
  5. }


接下来详细描述下ngx_atomic_cmp_set,这里这个操作是一个原子操作,这是因为由于我们要进行比较+赋值两个操作,如果不是原子操作的话,有可能在比较之后被其他进程所抢占,此时再赋值的话就会有问题了,因此这里就必须是一个原子操作。

我们来看这个函数的实现,如果系统库不支持这个指令的话,nginx自己还用汇编实现了一个,其实实现也很简单,比如x86的话有一个cmpxchgl的指令,就是做这个的。

先来看如果系统库支持的情况,此时直接调用OSAtomicCompareAndSwap32Barrier。

Java代码 收藏代码
  1. #define ngx_atomic_cmp_set(lock, old, new) \
  2. OSAtomicCompareAndSwap32Barrier(old, new, (int32_t *) lock)


来看函数的原型:
Java代码 收藏代码
  1. OSAtomicCompareAndSwap32Barrier(old, new, addr)


然后这个函数翻译成伪码的话就是这个:

Java代码 收藏代码
  1. f (*addr == oldvalue) {
  2. *addr = newvalue;
  3. return 1;
  4. } else {
  5. return 0;
  6. }


这个代码就不解释了,很浅显易懂。

因此上面的trylock的代码:
Java代码 收藏代码
  1. ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid)
的意思就是如果lock的值是0的话,就把lock的值修改为当前的进程id,否则返回失败。

然后来看这个的汇编实现,这里nginx实现了多个平台的比如x86,sparc,ppc.
我们来看x86的:

Java代码 收藏代码
  1. static ngx_inline ngx_atomic_uint_t
  2. ngx_atomic_cmp_set(ngx_atomic_t *lock, ngx_atomic_uint_t old,
  3. ngx_atomic_uint_t set)
  4. {
  5. u_char res;
  6. __asm__ volatile (
  7. NGX_SMP_LOCK
  8. " cmpxchgl %3, %1; "
  9. " sete %0; "
  10. : "=a" (res) : "m" (*lock), "a" (old), "r" (set) : "cc", "memory");
  11. return res;
  12. }


具体的这些指令和锁可以去看intel的相关手册。

接下来来看lock的实现,lock最终会调用ngx_spinlock,因此下面我要主要来分析这个函数。
Java代码 收藏代码
  1. #define ngx_shmtx_lock(mtx) ngx_spinlock((mtx)->lock, ngx_pid, 1024)


我们来看spinklock,必须支持原子指令,才会有这个函数,这里nginx采用宏来控制的.

这里和trylock的处理差不多,都是利用原子指令来实现的,只不过这里如果无法获得锁,则会继续等待。

我们来看代码的实现:

Java代码 收藏代码
  1. void
  2. ngx_spinlock(ngx_atomic_t *lock, ngx_atomic_int_t value, ngx_uint_t spin)
  3. {
  4. #if (NGX_HAVE_ATOMIC_OPS)
  5. ngx_uint_t i, n;
  6. for ( ;; ) {
  7. //如果lock为0,则说明没有进程持有锁,因此设置lock为value(为当前进程id),然后返回。
  8. if (*lock == 0 && ngx_atomic_cmp_set(lock, 0, value)) {
  9. return;
  10. }
  11. //如果cpu个数大于1(也就是多核),则进入spin-wait loop阶段。
  12. if (ngx_ncpu > 1) {
  13. //开始进入循环。
  14. for (n = 1; n < spin; n <<= 1) {
  15. //下面这段就是纯粹的spin-loop wait。
  16. for (i = 0; i < n; i++) {
  17. //这个函数其实就是执行"PAUSE"指令,接下来会解释这个指令。
  18. ngx_cpu_pause();
  19. }
  20. //然后重新获取锁,如果获得则直接返回。
  21. if (*lock == 0 && ngx_atomic_cmp_set(lock, 0, value)) {
  22. return;
  23. }
  24. }
  25. }
  26. //这个函数调用的是sched_yield,它会强迫当前运行的进程放弃占有处理器。
  27. ngx_sched_yield();
  28. }
  29. #else
  30. #if (NGX_THREADS)
  31. #error ngx_spinlock() or ngx_atomic_cmp_set() are not defined !
  32. #endif
  33. #endif
  34. }


通过上面的代码可以看到spin lock实现的很简单,就是一个如果无法获得锁,就进入忙等的过程,不过这里nginx还多加了一个处理,就是如果忙等太长,就放弃cpu,直到下次任务再次占有cpu。

接下来来看下PAUSE指令,这条指令主要的功能就是告诉cpu,我现在是一个spin-wait loop,然后cpu就不会因为害怕循环退出时,内存的乱序而需要处理,所引起的效率损失问题。

下面就是intel手册的解释:

引用

Improves the performance of spin-wait loops. When executing a “spin-wait loop,” a
Pentium 4 or Intel Xeon processor suffers a severe performance penalty when exiting
the loop because it detects a possible memory order violation. The PAUSE instruction
provides a hint to the processor that the code sequence is a spin-wait loop. The
processor uses this hint to avoid the memory order violation in most situations,
which greatly improves processor performance. For this reason, it is recommended
that a PAUSE instruction be placed in all spin-wait loops.


内核的spin lock也有用到这条指令的。

接下来就是unlokck。unlock比较简单,就是和当前进程id比较,如果相等,就把lock改为0,说明放弃这个锁。

Java代码 收藏代码
  1. #define ngx_shmtx_unlock(mtx) (void) ngx_atomic_cmp_set((mtx)->lock, ngx_pid, 0)


然后就是不支持原子操作的情况,此时使用文件锁来实现的,这里就不介绍这种实现了,基本原来和上面的差不多,想要了解的,可以去看nginx的相关代码。

接下来我们来看nginx如何利用lock来控制子进程的负载均衡以及惊群。

先来大概解释下这两个概念。

负载均衡是为了解决有可能一个进程处理了多个连接,因此就需要让多个进程更平均的处理连接。

惊群也就是当我们多个进程阻塞在epoll这类调用的时候,当有数据可读的时候,多个进程会被同时唤醒,此时如果去accept的话,只能有一个进程accept到句柄。

在看代码之前,我们先来看ngx_use_accept_mutex这个变量,如果有这个变量,说明nginx有必要使用accept互斥体,这个变量的初始化在ngx_event_process_init中。

这里还有两个变量,一个是ngx_accept_mutex_held,一个是ngx_accept_mutex_delay,其中前一个表示当前是否已经持有锁,后一个表示,当获得锁失败后,再次去请求锁的间隔时间,这个时间可以看到可以在配置文件中设置的。

Java代码 收藏代码
  1. //如果使用了master worker,并且worker个数大于1,并且配置文件里面有设置使用accept_mutex.的话,设置ngx_use_accept_mutex
  2. if (ccf->master && ccf->worker_processes > 1 && ecf->accept_mutex) {
  3. ngx_use_accept_mutex = 1;
  4. //下面这两个变量后面会解释。
  5. ngx_accept_mutex_held = 0;
  6. ngx_accept_mutex_delay = ecf->accept_mutex_delay;
  7. } else {
  8. ngx_use_accept_mutex = 0;
  9. }



这里还有一个变量是ngx_accept_disabled,这个变量是一个阈值,如果大于0,说明当前的进程处理的连接过多。
下面就是这个值的初始化,可以看到初始值是全部连接的7/8(注意是负值0.

Java代码 收藏代码
  1. ngx_accept_disabled = ngx_cycle->connection_n / 8
  2. - ngx_cycle->free_connection_n;


然后来看ngx_process_events_and_timers中的处理。

Java代码 收藏代码
  1. //如果有使用mutex,则才会进行处理。
  2. if (ngx_use_accept_mutex) {
  3. //如果大于0,则跳过下面的锁的处理,并减一。
  4. if (ngx_accept_disabled > 0) {
  5. ngx_accept_disabled--;
  6. } else {
  7. //试着获得锁,如果出错则返回。
  8. if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
  9. return;
  10. }
  11. //如果ngx_accept_mutex_held为1,则说明已经获得锁,此时设置flag,这个flag后面会解释。
  12. if (ngx_accept_mutex_held) {
  13. flags |= NGX_POST_EVENTS;
  14. } else {
  15. //否则,设置timer,也就是定时器。接下来会解释这段。
  16. if (timer == NGX_TIMER_INFINITE
  17. || timer > ngx_accept_mutex_delay)
  18. {
  19. timer = ngx_accept_mutex_delay;
  20. }
  21. }
  22. }
  23. }


然后先来看NGX_POST_EVENTS标记,设置了这个标记就说明当socket有数据被唤醒时,我们并不会马上accept或者说读取,而是将这个事件保存起来,然后当我们释放锁之后,才会进行accept或者读取这个句柄。

Java代码 收藏代码
  1. //如果ngx_posted_accept_events不为NULL,则说明有accept event需要nginx处理。
  2. if (ngx_posted_accept_events) {
  3. ngx_event_process_posted(cycle, &ngx_posted_accept_events);
  4. }


而如果没有设置NGX_POST_EVENTS标记的话,nginx会立即accept或者读取句柄。

然后是定时器,这里如果nginx没有获得锁,并不会马上再去获得锁,而是设置定时器,然后在epoll休眠(如果没有其他的东西唤醒).此时如果有连接到达,当前休眠进程会被提前唤醒,然后立即accept。否则,休眠 ngx_accept_mutex_delay时间,然后继续try lock.

最后是核心的一个函数,那就是ngx_trylock_accept_mutex。这个函数用来尝试获得accept mutex.

Java代码 收藏代码
  1. ngx_int_t
  2. ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
  3. {
  4. //尝试获得锁
  5. if (ngx_shmtx_trylock(&ngx_accept_mutex)) {
  6. //如果本来已经获得锁,则直接返回Ok
  7. if (ngx_accept_mutex_held
  8. && ngx_accept_events == 0
  9. && !(ngx_event_flags & NGX_USE_RTSIG_EVENT))
  10. {
  11. return NGX_OK;
  12. }
  13. //到达这里,说明重新获得锁成功,因此需要打开被关闭的listening句柄。
  14. if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
  15. ngx_shmtx_unlock(&ngx_accept_mutex);
  16. return NGX_ERROR;
  17. }
  18. ngx_accept_events = 0;
  19. //设置获得锁的标记。
  20. ngx_accept_mutex_held = 1;
  21. return NGX_OK;
  22. }
  23. //如果我们前面已经获得了锁,然后这次获得锁失败,则说明当前的listen句柄已经被其他的进程锁监听,因此此时需要从epoll中移出调已经注册的listen句柄。这样就很好的控制了子进程的负载均衡
  24. if (ngx_accept_mutex_held) {
  25. if (ngx_disable_accept_events(cycle) == NGX_ERROR) {
  26. return NGX_ERROR;
  27. }
  28. //设置锁的持有为0.
  29. ngx_accept_mutex_held = 0;
  30. }
  31. return NGX_OK;
  32. }


这里可以看到大部分情况下,每次只会有一个进程在监听listen句柄,而只有当ngx_accept_disabled大于0的情况下,才会出现一定程度的惊群。

而nginx中,由于锁的控制(以及获得锁的定时器),每个进程都能相对公平的accept句柄,也就是比较好的解决了子进程负载均衡。