kafka系列之重试机制(15)

掘金 · · 943 次点击 · · 开始浏览    
这是一个创建于 的文章,其中的信息可能已经有所发展或是发生改变。

kafka 重试机制

kafka 消息的重试机制作为kafka 生产者端的数据不丢失的重要保障,对我们学习和理解kafka 大有裨益,前面我们学习kafka 生产者的时候,了解了kafka 异步生产者的Callback 机制,合理使用Callback机制也可以保证我们生产者端的数据不丢失,但是Callback我们往往是在重试机制之后使用的,也会是说重试之后依然失败的情况下,可以作为我们生产者端最后的保障。

重试源码

首先我们从KafkaProducer的send 方法入手我们看到其实客户端是不会直接发送数据的,而是将其加入到了一个缓存队列里面去,实例化KafkaProducer会实例RecordAccumulator,RecordAccumulator维护了一个Map,key为不同的主题和partition,value为数据队列

private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

然后当添加数据到队列之后,根据缓存的情况(result)判断要不要发送,发送也只是唤醒sender 对象,Sender会轮寻这个队列 进行发送

RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
        serializedValue, headers, interceptCallback, remainingWaitMs);
if (result.batchIsFull || result.newBatchCreated) {
    log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
    this.sender.wakeup();
}
复制代码

如果发送失败,判断是否属于可重试异常,如果可以重试,则放入队列等待再次轮寻,如果不可以重试则抛出异常,由业务方的try-catch代码处理或者程序失败

Sender

我们看到sender 是一个Runnable的实现类,那我们直接看它的run 方法,我们看到run 方法最后调用了一个叫做sendProducerData的方法来发送数据

long pollTimeout = sendProducerData(now);
client.poll(pollTimeout, now);
复制代码

接下来就有这样的一个调用链sendProducerData ->sendProduceRequests ->handleProduceResponse->completeBatch->canRetry 这里就是我们的重试代码了

首先我们还是先看一下completeBatch 方法,我们看到只有在特定的错误原因下我们才去重试的,毕竟不是所有的重试都是有意义的,所以首先要求错误不能是未知错误Errors.NONE

private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionResponse response, long correlationId,
                           long now, long throttleUntilTimeMs) {
    Errors error = response.error;

    if (error == Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1 && !batch.isDone() &&
            (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 || batch.isCompressed())) {
        // If the batch is too large, we split the batch and send the split batches again. We do not decrement
        // the retry attempts in this case.
        log.warn(
            "Got error produce response in correlation id {} on topic-partition {}, splitting and retrying ({} attempts left). Error: {}",
            correlationId,
            batch.topicPartition,
            this.retries - batch.attempts(),
            error);
        if (transactionManager != null)
            transactionManager.removeInFlightBatch(batch);
        this.accumulator.splitAndReenqueue(batch);
        this.accumulator.deallocate(batch);
        this.sensors.recordBatchSplit();
    } else if (error != Errors.NONE) {
        if (canRetry(batch, response, now)) {
            log.warn(
                "Got error produce response with correlation id {} on topic-partition {}, retrying ({} attempts left). Error: {}",
                correlationId,
                batch.topicPartition,
                this.retries - batch.attempts() - 1,
                error);
            if (transactionManager == null) {
                reenqueueBatch(batch, now);
            } else if (transactionManager.hasProducerIdAndEpoch(batch.producerId(), batch.producerEpoch())) {
                // If idempotence is enabled only retry the request if the current producer id is the same as
                // the producer id of the batch.
                log.debug("Retrying batch to topic-partition {}. ProducerId: {}; Sequence number : {}",
                        batch.topicPartition, batch.producerId(), batch.baseSequence());
              	// 重新加入队列`accumulator.reenqueue(batch, currentTimeMs);`,后面就会按照数据的发送流程重新发送
                reenqueueBatch(batch, now);
            } else {
                failBatch(batch, response, new OutOfOrderSequenceException("Attempted to retry sending a " +
                        "batch but the producer id changed from " + batch.producerId() + " to " +
                        transactionManager.producerIdAndEpoch().producerId + " in the mean time. This batch will be dropped."), false);
            }
        } else if (error == Errors.DUPLICATE_SEQUENCE_NUMBER) {
          	// 这个错误其实是比较有意思的,就是我们的重试机制发出去了重复的消息,其实就是我们学习的幂等的SEQUENCE_NUMBER,所以我们认为这个错误是不需要重试的
            completeBatch(batch, response);
        } else {
            final RuntimeException exception;
            if (error == Errors.TOPIC_AUTHORIZATION_FAILED)
                exception = new TopicAuthorizationException(batch.topicPartition.topic());
            else if (error == Errors.CLUSTER_AUTHORIZATION_FAILED)
                exception = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
            else
                exception = error.exception();
            // tell the user the result of their request. We only adjust sequence numbers if the batch didn't exhaust
            // its retries -- if it did, we don't know whether the sequence number was accepted or not, and
            // thus it is not safe to reassign the sequence.
            failBatch(batch, response, exception, batch.attempts() < this.retries);
        }
        if (error.exception() instanceof InvalidMetadataException) {
            if (error.exception() instanceof UnknownTopicOrPartitionException) {
                log.warn("Received unknown topic or partition error in produce request on partition {}. The " +
                        "topic-partition may not exist or the user may not have Describe access to it",
                    batch.topicPartition);
            } else {
                log.warn("Received invalid metadata error in produce request on partition {} due to {}. Going " +
                        "to request metadata update now", batch.topicPartition, error.exception().toString());
            }
            metadata.requestUpdate();
        }
    } else {
        completeBatch(batch, response);
    }

    // Unmute the completed partition.
    if (guaranteeMessageOrder)
        this.accumulator.unmutePartition(batch.topicPartition, throttleUntilTimeMs);
}
复制代码

接下来就是针对具体的错误原因进行处理了,有一个比较特殊的就是批次过大,也就是Errors.MESSAGE_TOO_LARGE && batch.recordCount > 1,有一个批次分割的操作

reenqueueBatch 就是重新加入队列,也就是重试

completeBatch 就是我们认为当前批次的发送是成功的,不需要重试,例如DUPLICATE_SEQUENCE_NUMBER错误

failBatch 就是失败的批次,且不能进行尝试的。

1. 比较致命的错误
2. 达到了重试的限制次数,例如`failBatch(batch, response, exception, batch.attempts() < this.retries)`
复制代码

我们看一下reenqueue 的代码实现,其实到现在我们还是没有将加入队列和发送消息关系串联起来,但是我们看到一个方法getOrCreateDeque,这个方法其实我们在发送消息的加入缓存的时候调用的一个方法

public void reenqueue(ProducerBatch batch, long now) {
    batch.reenqueued(now);
    Deque<ProducerBatch> deque = getOrCreateDeque(batch.topicPartition);
    synchronized (deque) {
        if (transactionManager != null)
            insertInSequenceOrder(deque, batch);
        else
            deque.addFirst(batch);
    }
}
复制代码

到这里我们的就大致明白了,KafkaProducer的发送方法是最终是调用了getOrCreateDeque将数据添加到了缓存,重试也是调用getOrCreateDeque将数据添加到了缓存,唯一不一样的地方是重试添加到了队列的头部,发送是添加到了队列的尾部

private RecordAppendResult tryAppend(long timestamp, byte[] key, byte[] value, Header[] headers,
                                     Callback callback, Deque<ProducerBatch> deque) {
    ProducerBatch last = deque.peekLast();
    if (last != null) {
        FutureRecordMetadata future = last.tryAppend(timestamp, key, value, headers, callback, time.milliseconds());
        if (future == null)
            last.closeForRecordAppends();
        else
            return new RecordAppendResult(future, deque.size() > 1 || last.isFull(), false);
    }
    return null;
}
复制代码

下面我们看一下canRetry的实现,其实这个方法就是判断当前失败的批次能不能重试

private boolean canRetry(ProducerBatch batch, ProduceResponse.PartitionResponse response, long now) {
    return !batch.hasReachedDeliveryTimeout(accumulator.getDeliveryTimeoutMs(), now) &&
        batch.attempts() < this.retries &&
        !batch.isDone() &&
        ((response.error.exception() instanceof RetriableException) ||
            (transactionManager != null && transactionManager.canRetry(response, batch)));
}
复制代码

这里限制条件挺多的,我们只关注两个,如果满足则可以重试

  1. 重试的次数,这个就是我们的客户端的配置
  2. 异常是不是可重试异常response.error.exception() instanceof RetriableException),下面就是全部的可重试异常

image-20210317103741837

当然这中间还有一个就是transactionManager.canRetry

幂等生产者的影响

其实我们看到在整个重试的过程中不不需要重新计算partition信息的,也就是说我们的partition信息只在客户端的doSend方法里面只计算一次的

image-20210317104300063

这就说明了retry会保证发送到同一个分区,如果不能保证的话,幂等就又会多了一个限制条件,那就是不可重试

全局幂等

一个幂等性的producer,只保证单分区的幂等性,而一个producer的消息会发给一个主题的多个分区,每个单分区都保证幂等性,其实就是实现了多分区的幂等性,也就是全局幂等。

消息乱序的影响

假设a,b两条消息,a先发送后由于发送失败重试,这时顺序就会在b的消息后面,可以设置max.in.flight.requests.per.connection=1来避免

max.in.flight.requests.per.connection的意思是限制客户端在单个连接上能够发送的未响应请求的个数。设置此值是1表示kafka broker在响应请求之前client不能再向同一个broker发送请求,但吞吐量会下降

总结

  1. 重试机制是数据可靠性的一种保障,这是在producer Callback 机制之前的一种保证

  2. 由于重试过程中不会重新计算消息的分区信息,所以重试可以保证幂等性不会被破坏

  3. producer 端的ack也是保证数据不丢失的一种方案,只不过它和producer的重试机制保证的点是不一样的,ack既可以保证producer 也可以保证server 端,但是重试机制仅仅是保证producer的数据发送不丢失。

本文来自:掘金

感谢作者:掘金

查看原文:kafka系列之重试机制(15)

943 次点击  
加入收藏 微博
暂无回复
添加一条新回复 (您需要 登录 后才能回复 没有账号 ?)
  • 请尽量让自己的回复能够对别人有帮助
  • 支持 Markdown 格式, **粗体**、~~删除线~~、`单行代码`
  • 支持 @ 本站用户;支持表情(输入 : 提示),见 Emoji cheat sheet
  • 图片支持拖拽、截图粘贴等方式上传