最近接入一个新的日志,需要导入历史数据。数据分析师跟我说的时候我爽快地答应了,不就是ETL么,没想到在调用flume接口上卡了一个星期,今天查flume源码终于才搞定。虽然周末都花费在debug上,不过第一次算是解决了一个jira上的issuse(尽管已经关闭),还是很开心的啦。
程序流程是这样的:从一个http server的文件列表下载数据文件,然后逐行ETL,发送至flume的avro source。Flume的版本是1.6.0。我简单实现之后,在发送flume数据时,出现如下错误:
显然是netty报出的解压错误。查看flume agent配置,发现的确是配了deflate压缩。如果是source端有压缩选项,sdk应该也有相应的压缩方法。出乎意料的是找遍FlumeDeveloperGuide都没有提到deflate压缩。估计是avro比较少用,毕竟它多用于hadoop内部通讯,其它案例好像真没怎么听过。然后RpcClient这个类也并没有设置压缩的方法。
好吧,既然官方没有提供压缩,那就自己实现。然而使用了 java.util.zip.Deflater 压缩,各种level都尝试过之后,问题依旧。
我在测试服另起了一个agent,开启flume debug模式并关闭压缩,发现收到的Avro的数据格式如下:
- 未压缩: {“headers”:{},”body”:{“bytes”:”明文”}}
- 压缩: {“headers”:{},”body”:{“bytes”:”序列化后的字节数组”}}
显然这是先压缩再包装,但估计avro source是先解压再解包,因为解压是在netty实现,而netty肯定是最先接触数据的。所以想要自己实现压缩,不能只压缩bytes里面的内容,而是要压缩完整的一条数据。这样的话就没法在sdk的框架之内实现这个压缩了。
于是我决定上google求救,很快就给我搜到了一个与我情况相同的case,就是我前面提到的Flume-2370,可惜的是当时并没有解决方案,这个issue也不了了之。其中提到
I found if I changed ‘deflate’ for ‘gzip’ it works。
其实只是没有启用压缩,因为avro source只支持deflate,gzip是无效参数。
看来必须查源码才可以解决了。一开始我在flume-ng-core中找AvroSource相关的类,包括ZlibUtil、ZlibDecoder等实现,最大的收获只是-3的返回码对应了DATA_ERROR。后来我转念一想,其实问题不在于source,而在于avro客户端。这么一来思路就清晰了很多,我直奔flume-sdk,找到了 org.apache.flume.api.RpcClientConfigurationConstants,这不就是RpcClient的配置参数么?仔细一看,终于被我找到配置的方法:
再看工厂类RpcClientFactory,getInstance()方法是可以设置properties的,于是我将客户端初始化代码改成如下:
测试发送,成功!
本文是原创文章,转载请注明:时间与精神的小屋 - Flume Avro source 解压异常的解决方法