Flume Avro source 解压异常的解决方法

最近接入一个新的日志,需要导入历史数据。数据分析师跟我说的时候我爽快地答应了,不就是ETL么,没想到在调用flume接口上卡了一个星期,今天查flume源码终于才搞定。虽然周末都花费在debug上,不过第一次算是解决了一个jira上的issuse(尽管已经关闭),还是很开心的啦。

程序流程是这样的:从一个http server的文件列表下载数据文件,然后逐行ETL,发送至flume的avro source。Flume的版本是1.6.0。我简单实现之后,在发送flume数据时,出现如下错误:

服务端异常栈信息

显然是netty报出的解压错误。查看flume agent配置,发现的确是配了deflate压缩。如果是source端有压缩选项,sdk应该也有相应的压缩方法。出乎意料的是找遍FlumeDeveloperGuide都没有提到deflate压缩。估计是avro比较少用,毕竟它多用于hadoop内部通讯,其它案例好像真没怎么听过。然后RpcClient这个类也并没有设置压缩的方法。

好吧,既然官方没有提供压缩,那就自己实现。然而使用了 java.util.zip.Deflater 压缩,各种level都尝试过之后,问题依旧。

我在测试服另起了一个agent,开启flume debug模式并关闭压缩,发现收到的Avro的数据格式如下:

  • 未压缩: {“headers”:{},”body”:{“bytes”:”明文”}}
  • 压缩: {“headers”:{},”body”:{“bytes”:”序列化后的字节数组”}}

显然这是先压缩再包装,但估计avro source是先解压再解包,因为解压是在netty实现,而netty肯定是最先接触数据的。所以想要自己实现压缩,不能只压缩bytes里面的内容,而是要压缩完整的一条数据。这样的话就没法在sdk的框架之内实现这个压缩了。

于是我决定上google求救,很快就给我搜到了一个与我情况相同的case,就是我前面提到的Flume-2370,可惜的是当时并没有解决方案,这个issue也不了了之。其中提到

I found if I changed ‘deflate’ for ‘gzip’ it works。

其实只是没有启用压缩,因为avro source只支持deflate,gzip是无效参数。

看来必须查源码才可以解决了。一开始我在flume-ng-core中找AvroSource相关的类,包括ZlibUtil、ZlibDecoder等实现,最大的收获只是-3的返回码对应了DATA_ERROR。后来我转念一想,其实问题不在于source,而在于avro客户端。这么一来思路就清晰了很多,我直奔flume-sdk,找到了 org.apache.flume.api.RpcClientConfigurationConstants,这不就是RpcClient的配置参数么?仔细一看,终于被我找到配置的方法:

1
2
3
4
5
6
7
8
/**
* The following are const for the NettyAvro Client。 To enable compression
* and set a compression level
*/
public static final String CONFIG_COMPRESSION_TYPE = "compression-type";
public static final String CONFIG_COMPRESSION_LEVEL = "compression-level";
public static final int DEFAULT_COMPRESSION_LEVEL = 6;
`

再看工厂类RpcClientFactory,getInstance()方法是可以设置properties的,于是我将客户端初始化代码改成如下:

1
2
3
4
5
6
Properties props = new Properties();
props.put("hosts","h1");
props.put("hosts.h1",socket);
props.put("compression-type","deflate");
props.put("compression-level","6");
RpcClient client = RpcClientFactory.getInstance(props);

测试发送,成功!

本文是原创文章,转载请注明:时间与精神的小屋 - Flume Avro source 解压异常的解决方法