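Kafka Retry Mechanism
Kafka's message retry mechanism is an important safeguard against data loss on the producer side, and understanding it helps a great deal in understanding Kafka as a whole. When we studied the Kafka producer earlier, we covered the Callback mechanism of the asynchronous producer; used properly, the Callback can also keep the producer from losing data. The Callback, however, usually comes into play after the retry mechanism: when a send still fails after all retries, the Callback serves as the producer's last line of defense.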
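Retry source code
Starting from KafkaProducer's send method, we can see that the client does not send data directly; it appends the record to a buffer queue instead. Instantiating a KafkaProducer also instantiates a RecordAccumulator, which maintains a Map whose key is the topic and partition and whose value is a queue of batches: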
private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;
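After the record is appended, the producer uses the append result (result) to decide whether a send should be triggered. Even then it only wakes up the sender object; the Sender polls these queues and does the actual sending.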
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
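If a send fails, the Sender checks whether the error is a retriable exception. If it is, the batch is put back into the queue to wait for the next poll. If it is not, the batch fails with an exception, which reaches the application through the Callback or the Future returned by send (for example, a try-catch around Future.get()); if nothing handles it, the send simply fails.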
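The buffering step described above can be sketched with plain JDK collections. This is a minimal illustration, not Kafka's RecordAccumulator: the class AccumulatorSketch, the string partition key, and the record-count capacity are all assumptions made for the example (Kafka sizes batches in bytes).

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified stand-in for RecordAccumulator; not Kafka's real class.
class AccumulatorSketch {
    // Mirrors the two flags that are checked before sender.wakeup() is called.
    static final class AppendResult {
        final boolean batchIsFull;
        final boolean newBatchCreated;

        AppendResult(boolean batchIsFull, boolean newBatchCreated) {
            this.batchIsFull = batchIsFull;
            this.newBatchCreated = newBatchCreated;
        }
    }

    // Assumed capacity in records per batch, to keep the example small.
    private final int batchCapacity;
    // One deque of batches per topic-partition, as in the batches field above.
    private final ConcurrentMap<String, Deque<List<String>>> batches = new ConcurrentHashMap<>();

    AccumulatorSketch(int batchCapacity) {
        this.batchCapacity = batchCapacity;
    }

    AppendResult append(String topicPartition, String record) {
        Deque<List<String>> deque = batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<>());
        synchronized (deque) {
            List<String> last = deque.peekLast();
            boolean created = false;
            if (last == null || last.size() >= batchCapacity) {
                // No open batch at the tail: start a new one.
                last = new ArrayList<>();
                deque.addLast(last);
                created = true;
            }
            last.add(record);
            boolean full = deque.size() > 1 || last.size() >= batchCapacity;
            return new AppendResult(full, created);
        }
    }

    int batchCount(String topicPartition) {
        Deque<List<String>> deque = batches.get(topicPartition);
        return deque == null ? 0 : deque.size();
    }
}
```

A caller would wake its sender thread whenever batchIsFull or newBatchCreated is true, which is the same condition the producer code checks.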
Sender
Sender is an implementation of Runnable, so we go straight to its run method, which ends up calling a method named sendProducerData to send data:
long pollTimeout = sendProducerData(now);
client.poll(pollTimeout, now);
This leads to the call chain sendProducerData -> sendProduceRequests -> handleProduceResponse -> completeBatch -> canRetry, and this is where the retry code lives.
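Let's first look at the completeBatch method. A retry only happens for specific error causes, since not every retry is meaningful. The first requirement is that the response actually carries an error, that is, the error is not Errors.NONE, which means the request succeeded.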
private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                           long now, long throttleUntilTimeMs) {
    Errors error = response.error;
    if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
            (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
        // If the batch is too large, we split the batch and send the split batches again. We do not decrement
        // the retry attempts in this case.
        log.warn(
            "Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
            correlationId,
            batch.topicPartition,
            this.retries - batch.attempts(),
            error);
        if (transactionManager != null)
            transactionManager.removeInFlightBatch(batch);
        this.accumulator.splitAndReenqueue(batch);
        this.accumulator.deallocate(batch);
        this.sensors.recordBatchSplit();
    } else if (error != Errors.NONE) {
        if (canRetry(batch, response, now)) {
            log.warn(
                "Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                correlationId,
                batch.topicPartition,
                this.retries - batch.attempts() - 1,
                error);
            if (transactionManager == null) {
                reenqueueBatch(batch, now);
            } else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
                // If idempotence is enabled only retry the request if the current producer id is the same as
                // the producer id of the batch.
                log.debug("Retrying batch to topic-partition {}. ProducerId: {}; Sequence number : {}",
                    batch.topicPartition, batch.producerId(), batch.baseSequence());
                // Re-enqueue via accumulator.reenqueue(batch, currentTimeMs); the batch then goes through
                // the normal send path again.
                reenqueueBatch(batch, now);
            } else {
                failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
                    "batch but the producer id changed from " + batch.producerId() + " to " +
                    transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."), false);
            }
        } else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
            // An interesting case: the retry resent a batch the broker already has, which the broker detects
            // through the idempotent producer's sequence number. No further retry is needed, so the batch
            // is completed as if it had succeeded.
            completeBatch(batch, response);
        } else {
            final RuntimeException exception;
            if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
                exception = new TopicAuthorizationException(batch.topicPartition.topic());
            else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
                exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
            else
                exception = error.exception();
            // tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
            // its retries -- if it did, we don't know whether the sequence number was accepted or not, and
            // thus it is not safe to reassign the sequence.
            failBatch(batch, response, exception, batch.attempts() < this.retries);
        }
        if (error.exception() instanceof InvalidMetadataException) {
            if (error.exception() instanceof UnknownTopicOrPartitionException) {
                log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
                    "topic-partition may not exist or the user may not have Describe access to it",
                    batch.topicPartition);
            } else {
                log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
                    "to request metadata update now", batch.topicPartition, error.exception().toString());
            }
            metadata.requestUpdate();
        }
    } else {
        completeBatch(batch, response);
    }

    // Unmute the completed partition.
    if (guaranteeMessageOrder)
        this.accumulator.unmutePartition(batch.topicPartition, throttleUntilTimeMs);
}
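After that, each error cause is handled specifically. One special case is an oversized batch, that is, Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1, which is handled by splitting the batch and re-enqueueing the pieces.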
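reenqueueBatch puts the batch back into the queue, which is the retry.
completeBatch means the send of the current batch is considered successful and needs no retry, for example on a DUPLICATE_SEQUENCE_NUMBER error.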
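failBatch marks the batch as failed, with no further retry. This happens in two cases:
1. The error is fatal, that is, not retriable.
2. The retry limit has been reached, so canRetry returns false. Note that in `failBatch(batch, response, exception, batch.attempts() < this.retries)` the last argument only decides whether sequence numbers are adjusted; the limit itself is checked in canRetry.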
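The branching described above can be condensed into a small decision function. This is a simplified sketch, not the Sender code: the enums ErrorKind and Outcome and the method decide are hypothetical names, and the checks on batch.isDone(), the magic value, compression, and the transactionManager are deliberately left out.

```java
// Hypothetical condensation of the completeBatch branching; names are illustrative only.
class CompleteBatchSketch {
    enum ErrorKind { NONE, MESSAGE_TOO_LARGE, DUPLICATE_SEQUENCE_NUMBER, RETRIABLE, FATAL }

    enum Outcome { SPLIT_AND_REENQUEUE, REENQUEUE, COMPLETE, FAIL }

    static Outcome decide(ErrorKind error, int recordCount, boolean canRetry) {
        if (error == ErrorKind.MESSAGE_TOO_LARGE && recordCount > 1) {
            // Oversized batch with more than one record: split it and send the pieces again.
            return Outcome.SPLIT_AND_REENQUEUE;
        }
        if (error == ErrorKind.NONE) {
            // No error: the batch was written successfully.
            return Outcome.COMPLETE;
        }
        if (canRetry) {
            // Retriable and still within limits: back into the queue.
            return Outcome.REENQUEUE;
        }
        if (error == ErrorKind.DUPLICATE_SEQUENCE_NUMBER) {
            // The broker already has this batch, so treat it as a success.
            return Outcome.COMPLETE;
        }
        // Fatal error, or retries exhausted.
        return Outcome.FAIL;
    }
}
```

Note that a single-record batch that is too large falls through to the ordinary error path, since splitting it would not help.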
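Now let's look at the implementation of reenqueue. So far we have not connected enqueueing to actual sending, but notice the method getOrCreateDeque: it is the same method that is called when a message is first appended to the buffer.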
public void reenqueue(ProducerBatch batch, long now) {
    batch.reenqueued(now);
    Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
    synchronized (deque) {
        if (transactionManager != null)
            insertInSequenceOrder(deque, batch);
        else
            deque.addFirst(batch);
    }
}
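Now the picture is roughly clear: KafkaProducer's send path ultimately calls getOrCreateDeque to add data to the buffer, and a retry also calls getOrCreateDeque to add data to the buffer. The only difference is that a retried batch goes to the head of the queue (or is inserted in sequence order when a transactionManager is present), while a new send is appended at the tail: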
private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque) {
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    }
    return null;
}
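The head-versus-tail difference is easy to see with a plain ArrayDeque. The sketch below is illustrative only: the batch labels are made up, and it assumes the consumer of the deque drains from the head, so a retried batch is picked up before batches that were appended later.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Illustrative only: strings stand in for ProducerBatch instances.
class RetryOrderSketch {
    static List<String> drainOrder() {
        Deque<String> deque = new ArrayDeque<>();
        // New sends are appended at the tail.
        deque.addLast("batch-2");
        deque.addLast("batch-3");
        // A failed batch that is retried goes back to the head.
        deque.addFirst("batch-1 (retry)");

        // Drain from the head, oldest work first.
        List<String> order = new ArrayList<>();
        while (!deque.isEmpty()) {
            order.add(deque.pollFirst());
        }
        return order;
    }
}
```

Putting the retry at the head keeps the retried batch ahead of newer data for the same partition, which limits how far it can fall behind.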
Next is the implementation of canRetry, which decides whether the failed batch can be retried:
private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response, long now) {
    return !batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now) &&
        batch.attempts() < this.retries &&
        !batch.isDone() &&
        ((response.error.exception() instanceof RetriableException) ||
            (transactionManager != null && transactionManager.canRetry(response, batch)));
}
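There are quite a few conditions here. We focus on two; when they hold, and the batch has neither reached the delivery timeout nor completed, the batch can be retried:
- The number of attempts is still below retries, which is a client-side configuration.
- The exception is a retriable exception, response.error.exception() instanceof RetriableException, that is, a subclass of RetriableException such as NetworkException, TimeoutException, or NotEnoughReplicasException.
Besides these, transactionManager.canRetry also plays a part.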
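The conditions above can be restated as a standalone predicate. This is a sketch under simplifying assumptions: the method and parameter names are hypothetical, the batch state is passed in as plain values, and the transactionManager.canRetry branch is omitted.

```java
// Hypothetical restatement of the canRetry conditions using plain values.
class CanRetrySketch {
    static boolean canRetry(long createdMs, long nowMs, long deliveryTimeoutMs,
                            int attempts, int retries, boolean done, boolean retriableError) {
        // The batch has been alive for at least the delivery timeout.
        boolean timedOut = deliveryTimeoutMs <= nowMs - createdMs;
        return !timedOut            // still within the delivery timeout
            && attempts < retries   // retry budget not exhausted
            && !done                // batch not already completed
            && retriableError;      // error is worth retrying
    }
}
```

For instance, canRetry(0, 1_000, 120_000, 1, 3, false, true) is true, while the same call with attempts equal to 3 is false because the retry budget is used up.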
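Impact on the idempotent producer
Throughout the retry process the partition does not need to be recomputed; it is computed only once, in the client's doSend method.
This means a retry is guaranteed to go to the same partition. Without that guarantee, idempotence would come with one more restriction: no retries.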
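A toy model makes the "computed once" point concrete. Everything here is hypothetical: the class PartitionOnceSketch, its Batch type, and the hashCode-based partition choice are stand-ins for illustration and do not reflect Kafka's actual partitioner.

```java
// Toy model: the partition is chosen at send time and stored on the batch.
class PartitionOnceSketch {
    private int partitionerCalls = 0;

    static final class Batch {
        final int partition; // fixed when the batch is created
        int attempts = 0;

        Batch(int partition) {
            this.partition = partition;
        }
    }

    // Assumed partition choice for the example; a real partitioner hashes differently.
    private int choosePartition(String key, int numPartitions) {
        partitionerCalls++;
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    Batch send(String key, int numPartitions) {
        return new Batch(choosePartition(key, numPartitions));
    }

    // A retry only bumps the attempt counter; the partition is left untouched.
    void retry(Batch batch) {
        batch.attempts++;
    }

    int partitionerCalls() {
        return partitionerCalls;
    }
}
```

However many times retry is called, the batch keeps the partition chosen by send, and the partitioner is invoked exactly once.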
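Global idempotence
An idempotent producer guarantees idempotence only per partition. A producer's messages go to multiple partitions of a topic, and since each partition guarantees idempotence on its own, the result is idempotence across all partitions, which is what is meant here by global idempotence.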
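Impact on message ordering
Suppose there are two messages, a and b. If a is sent first, fails, and is retried, it can end up after b. Setting max.in.flight.requests.per.connection=1 avoids this.
max.in.flight.requests.per.connection limits the number of unacknowledged requests the client may have outstanding on a single connection. A value of 1 means the client cannot send another request to the same broker until the broker has responded to the previous one, at the cost of lower throughput.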
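As a configuration sketch, the settings discussed here can be collected in a java.util.Properties object. The keys are standard producer configuration names; the broker address, the retries value, and the class and method names are placeholders chosen for the example.

```java
import java.util.Properties;

// Builds producer settings that favor ordering over throughput; values are examples.
class OrderedProducerConfigSketch {
    static Properties orderedRetryConfig() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");    // wait for all in-sync replicas
        props.put("retries", "3");   // example retry budget
        // Only one unacknowledged request per connection, so a retry cannot be overtaken.
        props.put("max.in.flight.requests.per.connection", "1");
        return props;
    }
}
```

The resulting Properties would then be handed to the KafkaProducer constructor when the producer is created.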
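Summary
- The retry mechanism is one safeguard for data reliability; it takes effect before the producer's Callback mechanism.
- Because the partition of a message is not recomputed during a retry, retries do not break idempotence.
- The producer-side acks setting is another way to keep data from being lost, but it covers a different point than the retry mechanism: acks protects both the producer side and the server side, while the retry mechanism only guarantees that the producer's sends are not lost.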