大批量任务处理总结

10:47:00 AM 0 Comments

最近几天在做一个集群间数据迁移的任务,要做的事很简单,就是给定一个任务文件,文件中每一行对应一个source:dest形式的迁移任务(source和dest均为文件名),任务数在千万级别。要做的事情其实很简单,读取每一行,解析出source和dest,并根据给定的集群信息从源集群读取source,并写到目标集群的dest。

经历了写程序、执行任务、分析日志、改程序、再执行子任务...等多次折磨之后,发现自己在处理任务的过程中走了很多弯路,原因是刚开始没有意识到问题的复杂性,处理的方式太过简单,这里说下我对这种大批量任务处理的一些心得。

大批量任务处理主要从以下两个方面来考虑:性能和正确性。

性能主要通过任务执行的时间来衡量,主要是通过优化每条任务的执行时间并行处理实现。本次数据迁移的任务,每次迁移需要从源集群读取文件,并且写到目标集群,由于读写都是通过客户端接口实现,故能优化的地方仅仅是在读的时候合理的利用好客户端cache;文件名字空间是扁平化的,文件之间看不出任何的联系,也无法通过重新组织任务顺序的方式来进行优化。由于子任务之间没有任何依赖和联系,迁移任务是非常适合采用多进程或多线程的方式并行处理的,1000w条任务,每秒大约能处理30个任务(可理解为一次随机读和一次随机写的时间),单进程(单线程)顺序处理时间约为3.9天,如果10个进程(线程)同时处理,则处理时间约为10个小时。为了减少编码工作量,我采用的方式是编写一个简单的单线程读写程序,在外围将待处理任务文件分成10个子文件,启动10个进程分别处理各个子文件。这里划分成10个任务的原因:(1)方便计算;(2)10个小时的处理时间能够接受,刚好睡一觉程序就跑完了;(3)10个进程并行,网卡也快跑满了。

接下来说说正确性,其实这块是最不好处理的,每一条迁移任务执行失败可能有很多原因,如任务描述不合法、读取源(这里又可细分为多个阶段)失败,写目标失败(这里也可细分),对于发生错误的情况,有些错误时必然错误,如给定的源或目标不符合规则、源文件不存在等;有些错误可能只是偶然,如读取某个文件,只成功读到一部分,这种错误通过重新执行任务可以避免;还有些错误可能是由于程序(工具程序、甚至是客户端库)的bug造成,通过修改程序可避免这类错误。

通过日志的方式可以区分以上提到的不同情况,这几天发现打印日志其实是很需要技巧的,并不是随便输出点信息就叫打了日志(这里不讨论日志分级别打印的情况,只讨论日志内容输出的技巧)。

首先对于描述错误信息的日志,必须能从日志中迅速定位出错误位置及原因,在日志内容上的描述上,日志内容需要能很方便喂给grep、awk等工具分析,从而避免再去写日志分析工具。另外日志中最好详尽的描述错误的任务信息,这样再二次处理的时候就不用再去从任务文件中分析出错误的任务,我刚开始写迁移程序的时候,在读不到source的时候,只打印了source的信息,结果要二次处理这些任务项时,还另外写了一个python脚本,把这些项从任务文件分析出来,而如果把source:dest,只需awk下就能从日志中得到错误的任务,重新处理。

对于本文提到的数据迁移任务,我总结了一套行之有效的日志打印方法:

1. 区分错误日志和结果日志,错误日志记录发生错误时的具体信息,结果日志为任务执行的具体结果(成功还是失败)。处理一个任务可能对应很多条(包括使用到的库打印的错误日志),但每个任务只对一个一条结果日志,结果日志最好包含任务描述的所有信息;

2. 将任务划分成多个阶段,该信息在任务处理过程中不断更新,如果某个任务出错,在结果日志中打印任务执行到的阶段和错误信息(错误码),通过阶段信息快速定位错误位置,通过错误描述信息(错误码)进行错误分类。

3. 将错误日志和结果日志输出到不同的位置,简单的实现可将错误和结果日志分别输出到stderr和stdout,并将stderr和stdour重定向到不同的文件。

通过结果日志,可以将执行完的任务按照错误信息快速分类(grep),对不同的错误情况进行不同的处理;而当需要确切知道出错的具体原因时,则可分析错误日志。

最后,如果对迁移后的数据正确性要求很高,可在迁移过程中做下crc或md5检查,或是编写额外的check工具进行全面的检查工作,而这个工作的性质与迁移的工作非常类似。

Some say he’s half man half fish, others say he’s more of a seventy/thirty split. Either way he’s a fishy bastard.