kafka消息消费异常会如何处理? | 开发者工具论坛-大发黄金版app下载
1.前言
当我们使用@kafkalistener注解声明一个消费者时,该消费者就会轮询去拉取对应分区消息记录,消费消息记录,正如你所知道的那样,正常场景下会执行ack操作,提交offset到kafka服务器。但是异常场景下会如何执行,不知你是否也了解?在了解之前,先一起来看下异常处理器,看完之后想必会有所收获。
2.异常处理器
2.1 创建异常处理器
在实例化org.springframework.kafka.listener.kafkamessagelistenercontainer.listenerconsumer#listenerconsumer的时候如果没有自定义异常处理器,会去创建seektocurrenterrorhandler作为默认异常处理器使用
typescript
复制代码
protected errorhandler determineerrorhandler(genericerrorhandler<?> errhandler) {
    return errhandler != null ? (errorhandler) errhandler
              : this.transactionmanager != null
              ? null : new seektocurrenterrorhandler();
}2.2 声明补偿策略
在构建默认异常处理器seektocurrenterrorhandler时会指定对应的补偿策略
csharp
复制代码
/**
 * construct an instance with the default recoverer which simply logs the record after
 * {@value seekutils#default_max_failures} (maxfailures) have occurred for a
 * topic/partition/offset, with the default back off (9 retries, no delay).
 * [@since](https://learnku.com/users/65735) 2.2
 */
public seektocurrenterrorhandler() {
    this(null, seekutils.default_back_off);
}从代码注释上我们可以了解到该补偿策略会进行9次无时间间隔重试
2.3 调用异常处理器
scss
复制代码
@nullable
private runtimeexception doinvokerecordlistener(final consumerrecord<k, v> record, // nosonar
 iterator<consumerrecord<k, v>> iterator) {
 object sample = startmicrometersample();
 try {
 // 1.消费消息
 invokeonmessage(record);
 successtimer(sample);
 recordinterceptafter(record, null);
 } catch (runtimeexception e) {
 try {
 // 2.执行异常处理器
 invokeerrorhandler(record, iterator, e);
 // 3.提交offset
 commitoffsetsifneeded(record);
 } catch (kafkaexception ke) {
 }
 }
 return null;
}这里可以看到当消息消费异常后,会调用默认异常处理器
seektocurrenterrorhandler
2.4 执行异常处理器
csharp
复制代码
public static boolean doseeks(list<consumerrecord<?, ?>> records, consumer<?, ?> consumer, exception exception,
 boolean recoverable, recoverystrategy recovery, @nullable messagelistenercontainer container,
 logaccessor logger) {
 map<topicpartition, long> partitions = new linkedhashmap<>();
 atomicboolean first = new atomicboolean(true);
 atomicboolean skipped = new atomicboolean();
 records.foreach(record -> {
 if (recoverable && first.get()) {
 try {
 // 1.判断该消息是否可重试
 boolean test = recovery.recovered(record, exception, container, consumer);
 skipped.set(test);
 }
 catch (exception ex) {
 }
 }
 if (!recoverable || !first.get() || !skipped.get()) {
 partitions.computeifabsent(new topicpartition(record.topic(), record.partition()),
 offset -> record.offset());
 }
 first.set(false);
 });
 // 2.重置分区偏移量,以便可以重复拉取异常消息
 seekpartitions(consumer, partitions, logger);
 return skipped.get();
}异常处理器执行过程:
判断当前消息是否可重试
如果当前消息可以重试,会将该消息对应
offset存储在partitions中,紧接着通过seekpartitions方法来将当前分区offset重置为当前消息offset,以至在下一次拉取消息的时候,仍然可以拉取到该异常消息。如果当前消息不可以重试,判断此次拉取的消息是否只有一条,如果是,不做处理;如果不是,则通过
partitions.computeifabsent方法设置分区offset为异常消息下一条消息对应offset,以至在下一次拉取的时候可以拉取到异常消息后的其它消息。
2.5 提交偏移量
csharp
复制代码
@nullable
private runtimeexception doinvokerecordlistener(final consumerrecord<k, v> record, // nosonar
 iterator<consumerrecord<k, v>> iterator) {
 object sample = startmicrometersample();
 try {
 invokeonmessage(record);
 }
 catch (runtimeexception e) {
 try {
 invokeerrorhandler(record, iterator, e);
 // 提交分区offset
 commitoffsetsifneeded(record);
 } catch (kafkaexception ke) {
 }
 }
 return null;
}当默认异常处理器重试达到最大次数
9次后,会执行commitoffsetsifneeded方法,手动提交分区offset
2.6 总结
当消费消息异常,在没有声明异常处理器的前提下会选择使用默认异常处理器seektocurrenterrorhandler,默认异常处理器会对异常消息进行重试,在达到最大重试次数9次后,会手动提交异常消息offset,然后继续消费异常消息之后的其它消息。
至此想必对消息消费异常有了一个大致认识,如有疑问,欢迎留言讨论。
作者:黑白搬砖工
链接:
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
本作品采用《cc 协议》,转载必须注明作者和本文链接
