kafka消费者提交offset失败分析

8880阅读 0评论2021-02-25 zpf1218
分类:服务器与存储

问题描述

1、 测试反馈,设备有上报的数据,但管理页面显示设备状态离线

分析思路:

1、由于这个状态显示是实时计算的,之前业务判断逻辑是,当上报时间与当前系统处理时间超过15分钟,则认为设备异常。而当前设备上报是正常的,那么有可能是服务处理延迟。 

分析过程:

1、用kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9098 --describe --group sample 命令,查看kafka消息队列,发现消息队列有积压,说明消费者处理的速度,跟不上消息产生的速度。证实了之前的想法。
2、 增加消费线程的数量修改concurrency=3 ,重启服务继续观察。
3、 查看业务日志,消费者能正常处理,但消息队列的current-offset(记录当前消费的位置) 一直保持不变,且消费线程只有1个。也就表明,调整concurrency并没有生效,消费者提交offset失败,继续查看日志,发现提交offset失败日志:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
   大意是说,消费者处理消息耗时过长,超过session.timeout.ms时间,认为消费者异常,对消费group进行了重新平衡,建议减少max.poll.records(每次批处理记录数)值

解决过程:

1、 根据错误提示修改批处理每次拉取10条记录,即max.poll.records=10(默认值500条),同时调整会话超时时间session.timeout.ms=60000(默认10000ms),并增加每个业务逻辑处理耗时时长,以便找到耗时处理逻辑进行优化。
2、 更新服务后,继续观察消息队列,current-offset 在逐步增加,说明消费提交offset正常。 
3、 查看业务处理逻辑耗时时长,处理一条消息耗时大概700ms左右。继续查看,发现操作mongodb数据库耗时在680ms左右。
4、 Mongodb数据库操作由于业务需要,处理逻辑,先查询,再更新。怀疑查询比较耗时,考虑创建索引。创建索引后,发现操作数据库耗时缩减至30ms左右。一条完整的消息耗时,由700ms缩减至现在60ms左右。

总结:

1、Kafka消费者群组并发数是否生效,取决于主题topic的分区数,kafka规定在同一消费者组里,同一时刻一个分区只能有一个消费者。假设topic分区数是3,并发数 concurrency=5,则只有3个消费者线程,其余2个消费者线程不会生效。
 由于当前kafka配置num.partitions=1 ,即topic的分区数为1。所以,当调整消费并发数concurrency=3时,还是只有一个消费者线程生效,没有增加并发消费处理。
2、max.poll.records: 默认500 表示消费者批处理记录时,每次从kafka队列获取的消息条数,这一批消息处理完成,才会触发提交。
 session.timeout.ms :默认10秒 会话超时时间 如果消费者没有在session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者。
 因此,可以通减小max.poll.records值,同时增加session.timeout.ms时间来解决消费者提交offset失败问题
3、在没有建立索引的情况下,查询Mongodb数据表时,需要把数据都加载到内存,进行全表扫描。当数据量较大时,会非常耗时。建立索引以后,对索引字段进行查询时,仅会加载索引数据,并能提高查询速度。
  建立索引时,要考虑查询条件,否则建立的索引,查询时并不一定能充分利用。
  例如,本例中查询条件为:query.addCriteria(Criteria.where("sampleTime").is(sampleTime).and("deviceId").is(reportInfo.getDeviceId()).and("sampleType").is(frequencyEnums.ordinal()));
  所以,建立索引如下:
db.getCollection('sample').createIndex({"sampleTime": -1,"deviceId": 1, "sampleType":1},{"name":'idx_deviceId_time'});
   建立索引时,考虑以下几个因素:

上一篇:检查java进程占用内存
下一篇:关于kafka消费者处理消息异常实验