Exploring Kafka
Notes
The source code discussed in this post is from Kafka 2.8.
Source Code
Client
Shown below is the sample code that ships with the source tree.
public class Producer extends Thread {
private final KafkaProducer<Integer, String> producer;
private final String topic;
private final Boolean isAsync;
private int numRecords;
private final CountDownLatch latch;
public Producer(final String topic,
final Boolean isAsync,
final String transactionalId,
final boolean enableIdempotency,
final int numRecords,
final int transactionTimeoutMs,
final CountDownLatch latch) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaProperties.KAFKA_SERVER_URL + ":" + KafkaProperties.KAFKA_SERVER_PORT);
props.put(ProducerConfig.CLIENT_ID_CONFIG, "DemoProducer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
if (transactionTimeoutMs > 0) {
props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeoutMs);
}
if (transactionalId != null) {
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionalId);
}
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, enableIdempotency);
producer = new KafkaProducer<>(props);
this.topic = topic;
this.isAsync = isAsync;
this.numRecords = numRecords;
this.latch = latch;
}
KafkaProducer<Integer, String> get() {
return producer;
}
@Override
public void run() {
int messageKey = 0;
int recordsSent = 0;
while (recordsSent < numRecords) {
String messageStr = "Message_" + messageKey;
long startTime = System.currentTimeMillis();
if (isAsync) { // Send asynchronously
producer.send(new ProducerRecord<>(topic,
messageKey,
messageStr), new DemoCallBack(startTime, messageKey, messageStr));
} else { // Send synchronously
try {
producer.send(new ProducerRecord<>(topic,
messageKey,
messageStr)).get();
System.out.println("Sent message: (" + messageKey + ", " + messageStr + ")");
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
messageKey += 2;
recordsSent += 1;
}
System.out.println("Producer sent " + numRecords + " records successfully");
latch.countDown();
}
}
class DemoCallBack implements Callback {
private final long startTime;
private final int key;
private final String message;
public DemoCallBack(long startTime, int key, String message) {
this.startTime = startTime;
this.key = key;
this.message = message;
}
public void onCompletion(RecordMetadata metadata, Exception exception) {
long elapsedTime = System.currentTimeMillis() - startTime;
if (metadata != null) {
System.out.println(
"message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
"), " +
"offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
} else {
exception.printStackTrace();
}
}
}
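For completeness, a minimal driver for the sample above might look like the following sketch; the topic name `topic1` is an assumption, and the argument order follows the constructor shown above.

```java
import java.util.concurrent.CountDownLatch;

public class ProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        // topic "topic1" is an assumed name; -1 leaves the transaction timeout unset
        Producer producerThread = new Producer("topic1", true, null, false, 100, -1, latch);
        producerThread.start();  // run() sends 100 records asynchronously
        latch.await();           // released when the producer thread counts down
    }
}
```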
Building the producer
As the code shows, we first assemble a configuration object (which really just corresponds to a Map), and then pass that configuration to the KafkaProducer constructor.
public class KafkaProducer<K, V> implements Producer<K, V> {
KafkaProducer(ProducerConfig config,
Serializer<K> keySerializer,
Serializer<V> valueSerializer,
ProducerMetadata metadata,
KafkaClient kafkaClient,
ProducerInterceptors<K, V> interceptors,
Time time) {
try {
this.producerConfig = config;
this.time = time;
String transactionalId = config.getString(ProducerConfig.TRANSACTIONAL_ID_CONFIG);
this.clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
LogContext logContext;
if (transactionalId == null)
logContext = new LogContext(String.format("[Producer clientId=%s] ", clientId));
else
logContext = new LogContext(String.format("[Producer clientId=%s, transactionalId=%s] ", clientId, transactionalId));
log = logContext.logger(KafkaProducer.class);
log.trace("Starting the Kafka producer");
Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
.recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
JmxReporter jmxReporter = new JmxReporter();
jmxReporter.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)));
reporters.add(jmxReporter);
MetricsContext metricsContext = new KafkaMetricsContext(JMX_PREFIX,
config.originalsWithPrefix(CommonClientConfigs.METRICS_CONTEXT_PREFIX));
this.metrics = new Metrics(metricConfig, reporters, time, metricsContext);
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG,
Partitioner.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
long retryBackoffMs = config.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG);
if (keySerializer == null) {
this.keySerializer = config.getConfiguredInstance(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.keySerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), true);
} else {
config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG);
this.keySerializer = keySerializer;
}
if (valueSerializer == null) {
this.valueSerializer = config.getConfiguredInstance(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
Serializer.class);
this.valueSerializer.configure(config.originals(Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId)), false);
} else {
config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG);
this.valueSerializer = valueSerializer;
}
List<ProducerInterceptor<K, V>> interceptorList = (List) config.getConfiguredInstances(
ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
ProducerInterceptor.class,
Collections.singletonMap(ProducerConfig.CLIENT_ID_CONFIG, clientId));
if (interceptors != null)
this.interceptors = interceptors;
else
this.interceptors = new ProducerInterceptors<>(interceptorList);
ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer,
valueSerializer, interceptorList, reporters);
this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
this.maxBlockTimeMs = config.getLong(ProducerConfig.MAX_BLOCK_MS_CONFIG);
int deliveryTimeoutMs = configureDeliveryTimeout(config, log);
this.apiVersions = new ApiVersions();
this.transactionManager = configureTransactionState(config, logContext);
// the accumulator buffers the records waiting to be sent
this.accumulator = new RecordAccumulator(logContext,
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG),
this.compressionType,
lingerMs(config),
retryBackoffMs,
deliveryTimeoutMs,
metrics,
PRODUCER_METRIC_GROUP_NAME,
time,
apiVersions,
transactionManager,
new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME));
List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(
config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
// metadata stores the cluster information
if (metadata != null) {
this.metadata = metadata;
} else {
this.metadata = new ProducerMetadata(retryBackoffMs,
config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
config.getLong(ProducerConfig.METADATA_MAX_IDLE_CONFIG),
logContext,
clusterResourceListeners,
Time.SYSTEM);
// this goes off and fetches the latest metadata
this.metadata.bootstrap(addresses);
}
this.errors = this.metrics.sensor("errors");
this.sender = newSender(logContext, kafkaClient, this.metadata);
// create a thread to drive the sender
String ioThreadName = NETWORK_THREAD_PREFIX + " | " + clientId;
this.ioThread = new KafkaThread(ioThreadName, this.sender, true);
this.ioThread.start();
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
log.debug("Kafka producer started");
} catch (Throwable t) {
// call close methods if internal objects are already constructed this is to prevent resource leak. see KAFKA-2121
close(Duration.ofMillis(0), true);
// now propagate the exception
throw new KafkaException("Failed to construct kafka producer", t);
}
}
}
As you can see, the constructor does little more than build its member objects from the supplied configuration, so it needs no deep analysis. If you want to know which parameters are configurable, though, the source is worth a skim.
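Alternatively, the producer can enumerate its own configurable keys; a small sketch:

```java
import org.apache.kafka.clients.producer.ProducerConfig;

public class ListProducerConfigs {
    public static void main(String[] args) {
        // prints every key the producer accepts, e.g. "acks", "batch.size", "linger.ms"
        ProducerConfig.configNames().forEach(System.out::println);
    }
}
```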
The code of `newSender` is as follows:
public class KafkaProducer<K, V> implements Producer<K, V> {
Sender newSender(LogContext logContext, KafkaClient kafkaClient, ProducerMetadata metadata) {
// the maximum send window size; entries in the window may complete out of order, so set this to 1 if sends must stay in order
int maxInflightRequests = configureInflightRequests(producerConfig);
int requestTimeoutMs = producerConfig.getInt(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG);
ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(producerConfig, time, logContext);
ProducerMetrics metricsRegistry = new ProducerMetrics(this.metrics);
Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
new Selector(producerConfig.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
this.metrics, time, "producer", channelBuilder, logContext),
metadata,
clientId,
maxInflightRequests,
producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG),
producerConfig.getLong(ProducerConfig.RECONNECT_BACKOFF_MAX_MS_CONFIG),
producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
requestTimeoutMs,
producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG),
producerConfig.getLong(ProducerConfig.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG),
ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
time,
true,
apiVersions,
throttleTimeSensor,
logContext);
// acks controls when the broker acknowledges a message:
// 0: acknowledged as soon as the node receives the message
// 1: acknowledged once the message is written to the leader partition
// -1: acknowledged only after the message is written to every replica of the partition
// both 0 and 1 can lose data.
short acks = configureAcks(producerConfig, log);
return new Sender(logContext,
client,
metadata,
this.accumulator,
maxInflightRequests == 1,
producerConfig.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG),
acks,
producerConfig.getInt(ProducerConfig.RETRIES_CONFIG),
metricsRegistry.senderMetrics,
time,
requestTimeoutMs,
producerConfig.getLong(ProducerConfig.RETRY_BACKOFF_MS_CONFIG),
this.transactionManager,
apiVersions);
}
}
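Tying the two comments above together: a producer that must not lose or reorder data would typically pair `acks=all` with an in-flight window of 1. A config sketch (one reasonable combination, not the only one):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class SafeProducerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        // wait for all in-sync replicas, avoiding the loss window of acks=0/1
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        // a window of 1 means retries cannot reorder batches
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
    }
}
```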
How Kafka's Java types map onto concepts:
Java type | Concept |
---|---|
Node | A broker node |
Cluster | Cluster information |
PartitionInfo | Partition information, including leader/replica nodes and replica sync state |
TopicPartition | A specific partition of a specific topic |
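These types surface directly in the public API; for instance `partitionsFor` returns the `PartitionInfo` list for a topic. A sketch, assuming an already-configured producer and a topic named `topic1`:

```java
import java.util.Arrays;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

class TopicInspector {
    // assumes an already-configured producer and an existing topic
    static void describe(KafkaProducer<Integer, String> producer, String topic) {
        for (PartitionInfo info : producer.partitionsFor(topic)) {
            System.out.println("partition=" + info.partition()
                    + " leader=" + info.leader()
                    + " isr=" + Arrays.toString(info.inSyncReplicas()));
        }
        TopicPartition tp = new TopicPartition(topic, 0); // pin partition 0 of this topic
        System.out.println("pinned " + tp);
    }
}
```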
Sending
Next, let's look at the client's method for sending messages.
public class KafkaProducer<K, V> implements Producer<K, V> {
@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
// first run the record through the interceptors
ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
// the actual send logic
return doSend(interceptedRecord, callback);
}
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
TopicPartition tp = null;
try {
// first check the producer state
throwIfProducerClosed();
long nowMs = time.milliseconds();
ClusterAndWaitTime clusterAndWaitTime;
try {
clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
} catch (KafkaException e) {
if (metadata.isClosed())
throw new KafkaException("Producer closed while send in progress", e);
throw e;
}
nowMs += clusterAndWaitTime.waitedOnMetadataMs;
long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
Cluster cluster = clusterAndWaitTime.cluster;
// serialize key and value
byte[] serializedKey;
try {
serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
" specified in key.serializer", cce);
}
byte[] serializedValue;
try {
serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
} catch (ClassCastException cce) {
throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
" to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
" specified in value.serializer", cce);
}
// compute the concrete target partition
int partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
// mark the message headers read-only
setReadOnly(record.headers());
Header[] headers = record.headers().toArray();
// estimate the size of the message
int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
compressionType, serializedKey, serializedValue, headers);
// validate the message size; an oversized message throws an exception
ensureValidRecordSize(serializedSize);
long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
if (log.isTraceEnabled()) {
log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
}
// InterceptorCallback notifies both the user callback and the built-in interceptors once the message has been acked
Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
if (transactionManager != null && transactionManager.isTransactional()) {
transactionManager.failIfNotReadyForSend();
}
// first append the message to the accumulator
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
// if a new batch is needed, the append above fails, and we retry the append here
if (result.abortForNewBatch) {
int prevPartition = partition;
// notify the partitioner
partitioner.onNewBatch(record.topic(), cluster, prevPartition);
partition = partition(record, serializedKey, serializedValue, cluster);
tp = new TopicPartition(record.topic(), partition);
if (log.isTraceEnabled()) {
log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
}
interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);
result = accumulator.append(tp, timestamp, serializedKey,
serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
}
if (transactionManager != null && transactionManager.isTransactional())
transactionManager.maybeAddPartitionToTransaction(tp);
if (result.batchIsFull || result.newBatchCreated) {
log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
// a new batch was created or a batch is full, so wake the sender thread
this.sender.wakeup();
}
return result.future;
// handling exceptions and record the errors;
// for API exceptions return them in the future,
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
if (callback != null)
callback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
} catch (InterruptedException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw new InterruptException(e);
} catch (KafkaException e) {
this.errors.record();
this.interceptors.onSendError(record, tp, e);
throw e;
} catch (Exception e) {
// we notify interceptor about all exceptions, since onSend is called before anything else in this method
this.interceptors.onSendError(record, tp, e);
throw e;
}
}
}
`waitOnMetadata` is responsible for fetching metadata.
public class KafkaProducer<K, V> implements Producer<K, V> {
// maxWaitMs is the longest we will wait, one minute by default
private ClusterAndWaitTime waitOnMetadata(String topic, Integer partition, long nowMs, long maxWaitMs) throws InterruptedException {
// add topic to metadata topic list if it is not there already and reset expiry
Cluster cluster = metadata.fetch();
if (cluster.invalidTopics().contains(topic))
throw new InvalidTopicException(topic);
metadata.add(topic, nowMs);
Integer partitionsCount = cluster.partitionCountForTopic(topic);
// serve partition info from the cache when we can; if the partition index is out of range, the user may have added partitions and we must fetch
if (partitionsCount != null && (partition == null || partition < partitionsCount))
return new ClusterAndWaitTime(cluster, 0);
long remainingWaitMs = maxWaitMs;
long elapsed = 0;
do {
if (partition != null) {
log.trace("Requesting metadata update for partition {} of topic {}.", partition, topic);
} else {
log.trace("Requesting metadata update for topic {}.", topic);
}
metadata.add(topic, nowMs + elapsed);
// request that the sender pull this topic's metadata from the cluster
int version = metadata.requestUpdateForTopic(topic);
// wake the sender and wait for it to finish the job
sender.wakeup();
try {
// wait for the update
metadata.awaitUpdate(version, remainingWaitMs);
} catch (TimeoutException ex) {
// Rethrow with original maxWaitMs to prevent logging exception with remainingWaitMs
throw new TimeoutException(
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs));
}
cluster = metadata.fetch();
elapsed = time.milliseconds() - nowMs;
if (elapsed >= maxWaitMs) {
throw new TimeoutException(partitionsCount == null ?
String.format("Topic %s not present in metadata after %d ms.",
topic, maxWaitMs) :
String.format("Partition %d of topic %s with partition count %d is not present in metadata after %d ms.",
partition, topic, partitionsCount, maxWaitMs));
}
metadata.maybeThrowExceptionForTopic(topic);
remainingWaitMs = maxWaitMs - elapsed;
partitionsCount = cluster.partitionCountForTopic(topic);
} while (partitionsCount == null || (partition != null && partition >= partitionsCount));
return new ClusterAndWaitTime(cluster, elapsed);
}
}
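The `maxWaitMs` here comes from `max.block.ms` (60000 ms by default); if the metadata never arrives within that budget, `send()` surfaces the `TimeoutException` above. A sketch for tightening the budget:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class MetadataTimeoutConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        // fail send() after 10 s instead of the default 60 s if metadata cannot be fetched
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10_000);
    }
}
```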
The main appending logic, as you can see, is done inside `accumulator#append`:
public final class RecordAccumulator {
public RecordAppendResult append(TopicPartition tp,
long timestamp,
byte[] key,
byte[] value,
Header[] headers,
Callback callback,
long maxTimeToBlock,
boolean abortOnNewBatch,
long nowMs) throws InterruptedException {
appendsInProgress.incrementAndGet();
ByteBuffer buffer = null;
if (headers == null) headers = Record.EMPTY_HEADERS;
try {
// fetch the deque for this topic partition
Deque<ProducerBatch> dq = getOrCreateDeque(tp);
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null)
return appendResult;
}
// if abortOnNewBatch is set and a new batch would be needed, return early to give the caller another chance
// slightly odd logic; why not use a callback instead?
if (abortOnNewBatch) {
return new RecordAppendResult(null, false, false, true);
}
// below is the logic for creating a new batch
byte maxUsableMagic = apiVersions.maxUsableProduceMagic();
// the batch size is the configured size (16 KB by default) or the current message size, whichever is larger
int size = Math.max(this.batchSize, AbstractRecords.estimateSizeInBytesUpperBound(maxUsableMagic, compression, key, value, headers));
log.trace("Allocating a new {} byte message buffer for topic {} partition {} with remaining timeout {}ms", size, tp.topic(), tp.partition(), maxTimeToBlock);
buffer = free.allocate(size, maxTimeToBlock);
nowMs = time.milliseconds();
synchronized (dq) {
if (closed)
throw new KafkaException("Producer closed while send in progress");
// under concurrency another thread may have created the batch already; if so, append to its tail
RecordAppendResult appendResult = tryAppend(timestamp, key, value, headers, callback, dq, nowMs);
if (appendResult != null) {
return appendResult;
}
// create a brand-new batch
MemoryRecordsBuilder recordsBuilder = recordsBuilder(buffer, maxUsableMagic);
ProducerBatch batch = new ProducerBatch(tp, recordsBuilder, nowMs);
FutureRecordMetadata future = Objects.requireNonNull(batch.tryAppend(timestamp, key, value, headers,
callback, nowMs));
// add the batch to the tail of the deque
dq.addLast(batch);
incomplete.add(batch);
// the buffer is now in use, so null it out here to keep the finally block from releasing it
buffer = null;
return new RecordAppendResult(future, dq.size() > 1 || batch.isFull(), true, false);
}
} finally {
// release the allocated buffer if it went unused
if (buffer != null)
free.deallocate(buffer);
appendsInProgress.decrementAndGet();
}
}
}
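Both knobs that drive `append()`'s batching are configurable: `batch.size` (the 16 KB default above) and `linger.ms` (how long a non-full batch may wait). A tuning sketch:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class BatchTuning {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 32 * 1024); // allow batches up to 32 KB
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);         // let a batch wait up to 10 ms to fill
    }
}
```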
Whether we are fetching metadata or appending messages to the accumulator, the `sender` thread gets woken up to do the work asynchronously. Let's look at what `sender` actually does internally.
public class Sender implements Runnable {
@Override
public void run() {
log.debug("Starting Kafka producer I/O thread.");
// main loop, runs until close is called
while (running) {
try {
// as long as we are in the running state, keep executing runOnce, until shutdown
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
// cleanup work follows
log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
while (!forceClose && ((this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0) || hasPendingTransactionalRequests())) {
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
while (!forceClose && transactionManager != null && transactionManager.hasOngoingTransaction()) {
if (!transactionManager.isCompleting()) {
log.info("Aborting incomplete transaction due to shutdown");
transactionManager.beginAbort();
}
try {
runOnce();
} catch (Exception e) {
log.error("Uncaught error in kafka producer I/O thread: ", e);
}
}
if (forceClose) {
if (transactionManager != null) {
log.debug("Aborting incomplete transactional requests due to forced shutdown");
transactionManager.close();
}
log.debug("Aborting incomplete batches due to forced shutdown");
this.accumulator.abortIncompleteBatches();
}
try {
this.client.close();
} catch (Exception e) {
log.error("Failed to close network client", e);
}
log.debug("Shutdown of Kafka producer I/O thread has completed.");
}
void runOnce() {
if (transactionManager != null) {
try {
transactionManager.maybeResolveSequences();
// do not continue sending if the transaction manager is in a failed state
if (transactionManager.hasFatalError()) {
RuntimeException lastError = transactionManager.lastError();
if (lastError != null)
maybeAbortBatches(lastError);
client.poll(retryBackoffMs, time.milliseconds());
return;
}
// Check whether we need a new producerId. If so, we will enqueue an InitProducerId
// request which will be sent below
transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
if (maybeSendAndPollTransactionalRequest()) {
return;
}
} catch (AuthenticationException e) {
// This is already logged as error, but propagated here to perform any clean ups.
log.trace("Authentication exception while processing transactional request", e);
transactionManager.authenticationFailed(e);
}
}
long currentTimeMs = time.milliseconds();
// hand the accumulated data to the NetworkClient, packaged as requests
long pollTimeout = sendProducerData(currentTimeMs);
// perform the real I/O
client.poll(pollTimeout, currentTimeMs);
}
private long sendProducerData(long now) {
Cluster cluster = metadata.fetch();
// find which batches are ready to send
RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
// if any partition's leader is unknown, pull fresh leader information
if (!result.unknownLeaderTopics.isEmpty()) {
for (String topic : result.unknownLeaderTopics)
// record which topics need refreshing
this.metadata.add(topic, now);
log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}",
result.unknownLeaderTopics);
// set the dirty flag requesting an update
this.metadata.requestUpdate();
}
// drop nodes that are unavailable (no connection can be established)
Iterator<Node> iter = result.readyNodes.iterator();
long notReadyTimeout = Long.MAX_VALUE;
while (iter.hasNext()) {
Node node = iter.next();
if (!this.client.ready(node, now)) {
iter.remove();
notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
}
}
// drain the accumulator, grouping batches by node id
Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
addToInflightBatches(batches);
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
accumulator.resetNextBatchExpiryTime();
// in-flight batches whose node responses have timed out
List<ProducerBatch> expiredInflightBatches = getExpiredInflightBatches(now);
// batches that sat too long in the accumulator, controlled by DELIVERY_TIMEOUT_MS_CONFIG (two minutes by default)
List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(now);
// merge the two lists
expiredBatches.addAll(expiredInflightBatches);
if (!expiredBatches.isEmpty())
log.trace("Expired {} batches in accumulator", expiredBatches.size());
for (ProducerBatch expiredBatch : expiredBatches) {
String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition
+ ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
// mark the batch result as failed, with timeout as the reason
failBatch(expiredBatch, -1, NO_TIMESTAMP, new TimeoutException(errorMessage), false);
if (transactionManager != null && expiredBatch.inRetry()) {
transactionManager.markSequenceUnresolved(expiredBatch);
}
}
// update metrics
sensors.updateProduceRequestMetrics(batches);
long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
pollTimeout = Math.max(pollTimeout, 0);
if (!result.readyNodes.isEmpty()) {
log.trace("Nodes with data ready to send: {}", result.readyNodes);
pollTimeout = 0;
}
// send the batches
sendProduceRequests(batches, now);
return pollTimeout;
}
private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
for (Map.Entry<Integer, List<ProducerBatch>> entry : collated.entrySet())
sendProduceRequest(now, entry.getKey(), acks, requestTimeoutMs, entry.getValue());
}
private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
if (batches.isEmpty())
return;
final Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap<>(batches.size());
byte minUsedMagic = apiVersions.maxUsableProduceMagic();
for (ProducerBatch batch : batches) {
if (batch.magic() < minUsedMagic)
minUsedMagic = batch.magic();
}
ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
for (ProducerBatch batch : batches) {
TopicPartition tp = batch.topicPartition;
MemoryRecords records = batch.records();
if (!records.hasMatchingMagic(minUsedMagic))
records = batch.records().downConvert(minUsedMagic, 0, time).records();
ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
if (tpData == null) {
tpData = new ProduceRequestData.TopicProduceData().setName(tp.topic());
tpd.add(tpData);
}
tpData.partitionData().add(new ProduceRequestData.PartitionProduceData()
.setIndex(tp.partition())
.setRecords(records));
recordsByPartition.put(tp, batch);
}
String transactionalId = null;
if (transactionManager != null && transactionManager.isTransactional()) {
transactionalId = transactionManager.transactionalId();
}
ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic,
new ProduceRequestData()
.setAcks(acks)
.setTimeoutMs(timeout)
.setTransactionalId(transactionalId)
.setTopicData(tpd));
RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
String nodeId = Integer.toString(destination);
ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
requestTimeoutMs, callback);
// send the data
client.send(clientRequest, now);
log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
}
}
Next we need to look at `NetworkClient`, the class that implements the underlying network client with support for asynchronous I/O.
public class NetworkClient implements KafkaClient {
@Override
public List<ClientResponse> poll(long timeout, long now) {
ensureActive();
if (!abortedSends.isEmpty()) {
List<ClientResponse> responses = new ArrayList<>();
handleAbortedSends(responses);
completeResponses(responses);
return responses;
}
// enqueue a metadata update request if one is due
long metadataTimeout = metadataUpdater.maybeUpdate(now);
try {
// send all pending requests
this.selector.poll(Utils.min(timeout, metadataTimeout, defaultRequestTimeoutMs));
} catch (IOException e) {
log.error("Unexpected error during I/O", e);
}
// process completed requests
long updatedNow = this.time.milliseconds();
List<ClientResponse> responses = new ArrayList<>();
// completed sends
handleCompletedSends(responses, updatedNow);
// completed receives
handleCompletedReceives(responses, updatedNow);
// disconnections
handleDisconnections(responses, updatedNow);
// newly established connections
handleConnections();
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutConnections(responses, updatedNow);
handleTimedOutRequests(responses, updatedNow);
completeResponses(responses);
return responses;
}
}
The default implementation of `metadataUpdater` is `DefaultMetadataUpdater`.
class DefaultMetadataUpdater implements MetadataUpdater {
@Override
public long maybeUpdate(long now) {
long timeToNextMetadataUpdate = metadata.timeToNextUpdate(now);
long waitForMetadataFetch = hasFetchInProgress() ? defaultRequestTimeoutMs : 0;
long metadataTimeout = Math.max(timeToNextMetadataUpdate, waitForMetadataFetch);
if (metadataTimeout > 0) {
return metadataTimeout;
}
// pick the least-loaded node according to some policy
Node node = leastLoadedNode(now);
if (node == null) {
log.debug("Give up sending metadata request since no node is available");
return reconnectBackoffMs;
}
return maybeUpdate(now, node);
}
private long maybeUpdate(long now, Node node) {
String nodeConnectionId = node.idString();
// if a usable connection already exists
if (canSendRequest(nodeConnectionId, now)) {
Metadata.MetadataRequestAndVersion requestAndVersion = metadata.newMetadataRequestAndVersion(now);
MetadataRequest.Builder metadataRequest = requestAndVersion.requestBuilder;
log.debug("Sending metadata request {} to node {}", metadataRequest, node);
// send the metadata request
sendInternalMetadataRequest(metadataRequest, nodeConnectionId, now);
inProgress = new InProgressData(requestAndVersion.requestVersion, requestAndVersion.isPartialUpdate);
return defaultRequestTimeoutMs;
}
if (isAnyNodeConnecting()) {
return reconnectBackoffMs;
}
if (connectionStates.canConnect(nodeConnectionId, now)) {
log.debug("Initialize connection to node {} for sending metadata request", node);
initiateConnect(node, now);
return reconnectBackoffMs;
}
return Long.MAX_VALUE;
}
// called once the metadata update request receives a response
@Override
public void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse response) {
List<TopicPartition> missingListenerPartitions = response.topicMetadata().stream().flatMap(topicMetadata ->
topicMetadata.partitionMetadata().stream()
.filter(partitionMetadata -> partitionMetadata.error == Errors.LISTENER_NOT_FOUND)
.map(partitionMetadata -> new TopicPartition(topicMetadata.topic(), partitionMetadata.partition())))
.collect(Collectors.toList());
if (!missingListenerPartitions.isEmpty()) {
int count = missingListenerPartitions.size();
log.warn("{} partitions have leader brokers without a matching listener, including {}",
count, missingListenerPartitions.subList(0, Math.min(10, count)));
}
Map<String, Errors> errors = response.errors();
if (!errors.isEmpty())
log.warn("Error while fetching metadata with correlation id {} : {}", requestHeader.correlationId(), errors);
if (response.brokers().isEmpty()) {
log.trace("Ignoring empty metadata response with correlation id {}.", requestHeader.correlationId());
this.metadata.failedUpdate(now);
} else {
// apply the update
this.metadata.update(inProgress.requestVersion, response, inProgress.isPartialUpdate, now);
}
inProgress = null;
}
}
Internally this calls back into methods on `NetworkClient`.
public class NetworkClient implements KafkaClient {
void sendInternalMetadataRequest(MetadataRequest.Builder builder, String nodeConnectionId, long now) {
ClientRequest clientRequest = newClientRequest(nodeConnectionId, builder, now, true);
doSend(clientRequest, true, now);
}
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
ensureActive();
String nodeId = clientRequest.destination();
if (!isInternalRequest) {
if (!canSendRequest(nodeId, now))
throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
}
AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();
try {
NodeApiVersions versionInfo = apiVersions.get(nodeId);
short version;
if (versionInfo == null) {
version = builder.latestAllowedVersion();
if (discoverBrokerVersions && log.isTraceEnabled())
log.trace("No version information found when sending {} with correlation id {} to node {}. " +
"Assuming version {}.", clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version);
} else {
version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(),
builder.latestAllowedVersion());
}
// the actual send
doSend(clientRequest, isInternalRequest, now, builder.build(version));
} catch (UnsupportedVersionException unsupportedVersionException) {
log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", builder,
clientRequest.correlationId(), clientRequest.destination(), unsupportedVersionException);
ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()),
clientRequest.callback(), clientRequest.destination(), now, now,
false, unsupportedVersionException, null, null);
if (!isInternalRequest)
abortedSends.add(clientResponse);
else if (clientRequest.apiKey() == ApiKeys.METADATA)
metadataUpdater.handleFailedRequest(now, Optional.of(unsupportedVersionException));
}
}
private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
String destination = clientRequest.destination();
RequestHeader header = clientRequest.makeHeader(request.version());
if (log.isDebugEnabled()) {
log.debug("Sending {} request with header {} and timeout {} to node {}: {}",
clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request);
}
// build the wire-format send from the request and its header
Send send = request.toSend(header);
// wrap the request as an in-flight request and track it
InFlightRequest inFlightRequest = new InFlightRequest(
clientRequest,
header,
isInternalRequest,
request,
send,
now);
this.inFlightRequests.add(inFlightRequest);
// queue the send; it goes out on the next poll
selector.send(new NetworkSend(clientRequest.destination(), send));
}
}
`selector`'s send method binds the request to the channel and registers the channel for write events.
public class Selector implements Selectable, AutoCloseable {
public void send(NetworkSend send) {
String connectionId = send.destinationId();
KafkaChannel channel = openOrClosingChannelOrFail(connectionId);
if (closingChannels.containsKey(connectionId)) {
this.failedSends.add(connectionId);
} else {
try {
channel.setSend(send);
} catch (Exception e) {
channel.state(ChannelState.FAILED_SEND);
this.failedSends.add(connectionId);
close(channel, CloseMode.DISCARD_NO_NOTIFY);
if (!(e instanceof CancelledKeyException)) {
log.error("Unexpected exception during send, closing connection {} and rethrowing exception {}",
connectionId, e);
throw e;
}
}
}
}
}
public class KafkaChannel implements AutoCloseable {
public void setSend(NetworkSend send) {
if (this.send != null)
throw new IllegalStateException("Attempt to begin a send operation with prior send operation still in progress, connection id is " + id);
this.send = send;
this.transportLayer.addInterestOps(SelectionKey.OP_WRITE);
}
}
Now let's see how responses from the nodes are handled.
public class NetworkClient implements KafkaClient {
private void handleCompletedReceives(List<ClientResponse> responses, long now) {
for (NetworkReceive receive : this.selector.completedReceives()) {
String source = receive.source();
// take the oldest in-flight request sent to this node
InFlightRequest req = inFlightRequests.completeNext(source);
AbstractResponse response = parseResponse(receive.payload(), req.header);
if (throttleTimeSensor != null)
throttleTimeSensor.record(response.throttleTimeMs(), now);
if (log.isDebugEnabled()) {
log.debug("Received {} response from node {} for request with header {}: {}",
req.header.apiKey(), req.destination, req.header, response);
}
maybeThrottle(response, req.header.apiVersion(), req.destination, now);
// if this was a metadata update request
if (req.isInternalRequest && response instanceof MetadataResponse)
metadataUpdater.handleSuccessfulResponse(req.header, now, (MetadataResponse) response);
else if (req.isInternalRequest && response instanceof ApiVersionsResponse)
handleApiVersionsResponse(responses, req, now, (ApiVersionsResponse) response);
else
// an ordinary message delivery request
responses.add(req.completed(response, now));
}
}
}
`Metadata` consumes the metadata-related responses itself.
public class Metadata implements Closeable {
public synchronized void update(int requestVersion, MetadataResponse response, boolean isPartialUpdate, long nowMs) {
Objects.requireNonNull(response, "Metadata response cannot be null");
if (isClosed())
throw new IllegalStateException("Update requested after metadata close");
this.needPartialUpdate = requestVersion < this.requestVersion;
this.lastRefreshMs = nowMs;
this.updateVersion += 1;
if (!isPartialUpdate) {
this.needFullUpdate = false;
this.lastSuccessfulRefreshMs = nowMs;
}
String previousClusterId = cache.clusterResource().clusterId();
// replace the local cache with the contents of the response
this.cache = handleMetadataResponse(response, isPartialUpdate, nowMs);
Cluster cluster = cache.cluster();
maybeSetMetadataError(cluster);
this.lastSeenLeaderEpochs.keySet().removeIf(tp -> !retainTopic(tp.topic(), false, nowMs));
String newClusterId = cache.clusterResource().clusterId();
if (!Objects.equals(previousClusterId, newClusterId)) {
log.info("Cluster ID: {}", newClusterId);
}
clusterResourceListeners.onUpdate(cache.clusterResource());
log.debug("Updated cluster metadata updateVersion {} to {}", this.updateVersion, this.cache);
}
}
Notice that after updating the Metadata here, no notification is issued; the notification actually happens in the subclass's overriding method, shown below:
public class ProducerMetadata extends Metadata {
public synchronized void update(int requestVersion, MetadataResponse response, boolean isPartialUpdate, long nowMs) {
// run the parent class logic
super.update(requestVersion, response, isPartialUpdate, nowMs);
// remove every topic that was just updated from newTopics
if (!newTopics.isEmpty()) {
for (MetadataResponse.TopicMetadata metadata : response.topicMetadata()) {
newTopics.remove(metadata.topic());
}
}
// notify all waiting threads
notifyAll();
}
}
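This `notifyAll()` is what releases the producer thread blocked in `Metadata#awaitUpdate` (seen earlier in `waitOnMetadata`). Stripped of Kafka specifics, the handshake is a versioned wait/notify; a simplified illustration (the names are mine, not Kafka's):

```java
// Simplified illustration of the versioned wait/notify handshake; not Kafka's actual code.
class VersionedMonitor {
    private int version = 0;

    synchronized int requestUpdate() {
        return version + 1;  // the version a waiter should wait for
    }

    synchronized void update() {
        version += 1;
        notifyAll();         // wake every thread blocked in awaitUpdate
    }

    synchronized void awaitUpdate(int wanted, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (version < wanted) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                throw new IllegalStateException("timed out waiting for update");
            wait(remaining);
        }
    }
}
```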
`sender` calls `RecordAccumulator`'s `ready` and `drain` to obtain the messages that need sending.
public final class RecordAccumulator {
public ReadyCheckResult ready(Cluster cluster, long nowMs) {
Set<Node> readyNodes = new HashSet<>();
long nextReadyCheckDelayMs = Long.MAX_VALUE;
Set<String> unknownLeaderTopics = new HashSet<>();
boolean exhausted = this.free.queued() > 0;
for (Map.Entry<TopicPartition, Deque<ProducerBatch>> entry : this.batches.entrySet()) {
Deque<ProducerBatch> deque = entry.getValue();
synchronized (deque) {
ProducerBatch batch = deque.peekFirst();
if (batch != null) {
TopicPartition part = entry.getKey();
Node leader = cluster.leaderFor(part);
if (leader == null) {
// leader unknown
unknownLeaderTopics.add(part.topic());
} else if (!readyNodes.contains(leader) && !isMuted(part)) {
long waitedTimeMs = batch.waitedTimeMs(nowMs);
// whether the batch is inside its retry backoff window
boolean backingOff = batch.attempts() > 0 && waitedTimeMs < retryBackoffMs;
// when backing off, wait the retry backoff (100 ms by default); otherwise the post-creation linger time (0 by default)
long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs;
// at least one batch is already full
boolean full = deque.size() > 1 || batch.isFull();
boolean expired = waitedTimeMs >= timeToWaitMs;
// whether sending is allowed
boolean sendable = full || expired || exhausted || closed || flushInProgress();
if (sendable && !backingOff) {
// mark the leader node as ready to receive messages
readyNodes.add(leader);
} else {
long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0);
nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs);
}
}
}
}
}
return new ReadyCheckResult(readyNodes, nextReadyCheckDelayMs, unknownLeaderTopics);
}
}
Partitioning
When sending a message, `KafkaProducer` computes locally which partition the message goes to. The partitioning logic is as follows:
public class KafkaProducer<K, V> implements Producer<K, V> {
private int partition(ProducerRecord<K, V> record, byte[] serializedKey, byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
return partition != null ?
partition :
partitioner.partition(
record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster);
}
}
The default partitioner is `DefaultPartitioner`; a different one can be specified via `PARTITIONER_CLASS_CONFIG`.
public class DefaultPartitioner implements Partitioner {
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
return partition(topic, key, keyBytes, value, valueBytes, cluster, cluster.partitionsForTopic(topic).size());
}
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
int numPartitions) {
// with no key, sticky partitioning is used:
// a partition is picked at random and then stuck to (it only rolls over when a new batch is created),
// which keeps all such messages flowing in order to the same partition,
// while different producing processes spread evenly across partitions.
if (keyBytes == null) {
return stickyPartitionCache.partition(topic, cluster);
}
// with a key, hash it and take the hash modulo the partition count.
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
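To swap in your own strategy, implement the `Partitioner` interface and register it under `partitioner.class`; a minimal, purely illustrative sketch that pins everything to partition 0:

```java
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

// Illustrative only: routes every record to partition 0.
public class FixedPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
```

It would be registered with `props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, FixedPartitioner.class.getName())`.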
Below is the sticky partitioning implementation.
public class StickyPartitionCache {
private final ConcurrentMap<String, Integer> indexCache;
public StickyPartitionCache() {
this.indexCache = new ConcurrentHashMap<>();
}
public int partition(String topic, Cluster cluster) {
// return the cached choice if present; otherwise generate a new random partition
Integer part = indexCache.get(topic);
if (part == null) {
return nextPartition(topic, cluster, -1);
}
return part;
}
public int nextPartition(String topic, Cluster cluster, int prevPartition) {
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
Integer oldPart = indexCache.get(topic);
Integer newPart = oldPart;
if (oldPart == null || oldPart == prevPartition) {
List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
// choose a partition at random
if (availablePartitions.size() < 1) {
// no partition is available: pick randomly among all partitions
Integer random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = random % partitions.size();
}
// partitions are available: pick randomly among the available ones
else if (availablePartitions.size() == 1) {
newPart = availablePartitions.get(0).partition();
} else {
while (newPart == null || newPart.equals(oldPart)) {
int random = Utils.toPositive(ThreadLocalRandom.current().nextInt());
newPart = availablePartitions.get(random % availablePartitions.size()).partition();
}
}
// a CAS-style update, guaranteeing we never overwrite a newer value
if (oldPart == null) {
indexCache.putIfAbsent(topic, newPart);
} else {
indexCache.replace(topic, prevPartition, newPart);
}
return indexCache.get(topic);
}
return indexCache.get(topic);
}
}
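The sticky behavior is easy to observe from the callback metadata: keyless records keep landing on one partition until a batch rolls over. A sketch, assuming a configured producer and the topic `topic1`:

```java
// Assumes a configured KafkaProducer<Integer, String> named "producer".
for (int i = 0; i < 10; i++) {
    producer.send(new ProducerRecord<>("topic1", null, "no-key-" + i),
            (metadata, exception) -> {
                if (exception == null)
                    // with no key, these mostly print the same (sticky) partition
                    System.out.println("partition=" + metadata.partition());
            });
}
```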
Memory pool
Kafka designs its own memory pool, called `BufferPool`, 32 MB by default and controlled by `BUFFER_MEMORY_CONFIG`.
public class BufferPool {
private final ReentrantLock lock;
// memory is the total pool size, 32 MB by default
// poolableSize is the size of each pooled block, 16 KB by default
public BufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
this.poolableSize = poolableSize;
this.lock = new ReentrantLock();
this.free = new ArrayDeque<>();
this.waiters = new ArrayDeque<>();
this.totalMemory = memory;
this.nonPooledAvailableMemory = memory;
this.metrics = metrics;
this.time = time;
this.waitTime = this.metrics.sensor(WAIT_TIME_SENSOR_NAME);
MetricName rateMetricName = metrics.metricName("bufferpool-wait-ratio",
metricGrpName,
"The fraction of time an appender waits for space allocation.");
MetricName totalMetricName = metrics.metricName("bufferpool-wait-time-total",
metricGrpName,
"The total time an appender waits for space allocation.");
Sensor bufferExhaustedRecordSensor = metrics.sensor("buffer-exhausted-records");
MetricName bufferExhaustedRateMetricName = metrics.metricName("buffer-exhausted-rate", metricGrpName, "The average per-second number of record sends that are dropped due to buffer exhaustion");
MetricName bufferExhaustedTotalMetricName = metrics.metricName("buffer-exhausted-total", metricGrpName, "The total number of record sends that are dropped due to buffer exhaustion");
bufferExhaustedRecordSensor.add(new Meter(bufferExhaustedRateMetricName, bufferExhaustedTotalMetricName));
this.waitTime.add(new Meter(TimeUnit.NANOSECONDS, rateMetricName, totalMetricName));
this.closed = false;
}
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
if (size > this.totalMemory)
throw new IllegalArgumentException("Attempt to allocate " + size
+ " bytes, but there is a hard limit of "
+ this.totalMemory
+ " on memory allocations.");
ByteBuffer buffer = null;
// concurrency is controlled with a mutex
this.lock.lock();
if (this.closed) {
this.lock.unlock();
throw new KafkaException("Producer closed while allocating memory");
}
try {
// if exactly one block is requested and a free block exists, return it directly
if (size == poolableSize && !this.free.isEmpty())
return this.free.pollFirst();
// total capacity of the pooled free memory
int freeListSize = freeSize() * this.poolableSize;
// if the remaining memory is sufficient
if (this.nonPooledAvailableMemory + freeListSize >= size) {
// convert pooled free memory into non-pooled memory (bookkeeping only)
freeUp(size);
this.nonPooledAvailableMemory -= size;
} else {
// not enough memory; we must wait for some to be released
int accumulated = 0;
Condition moreMemory = this.lock.newCondition();
try {
long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);
// loop while the accumulated memory is still insufficient
while (accumulated < size) {
long startWaitNs = time.nanoseconds();
long timeNs;
boolean waitingTimeElapsed;
try {
waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
} finally {
long endWaitNs = time.nanoseconds();
timeNs = Math.max(0L, endWaitNs - startWaitNs);
recordWaitTime(timeNs);
}
if (this.closed)
throw new KafkaException("Producer closed while allocating memory");
// the wait timed out
if (waitingTimeElapsed) {
this.metrics.sensor("buffer-exhausted-records").record();
throw new BufferExhaustedException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms.");
}
remainingTimeToBlockNs -= timeNs;
// if size equals the block size and a block is free now, take it and we are done
if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) {
buffer = this.free.pollFirst();
accumulated = size;
} else {
// try to free up size - accumulated bytes
freeUp(size - accumulated);
// grab as much as possible from the non-pooled free memory, recording the amount in accumulated
int got = (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);
this.nonPooledAvailableMemory -= got;
accumulated += got;
}
}
accumulated = 0;
} finally {
// give back whatever we were holding (zero if the allocation succeeded)
this.nonPooledAvailableMemory += accumulated;
// remove our condition variable from the wait queue
this.waiters.remove(moreMemory);
}
}
} finally {
// memory may have been freed, so signal the first waiter to go claim it
try {
if (!(this.nonPooledAvailableMemory == 0 && this.free.isEmpty()) && !this.waiters.isEmpty())
this.waiters.peekFirst().signal();
} finally {
// finally, unlock
lock.unlock();
}
}
if (buffer == null)
// allocate a fresh buffer
return safeAllocateByteBuffer(size);
else
// we already grabbed a pooled buffer while waiting
return buffer;
}
protected ByteBuffer allocateByteBuffer(int size) {
return ByteBuffer.allocate(size);
}
private ByteBuffer safeAllocateByteBuffer(int size) {
boolean error = true;
try {
// allocate the memory
ByteBuffer buffer = allocateByteBuffer(size);
error = false;
return buffer;
} finally {
if (error) {
this.lock.lock();
try {
this.nonPooledAvailableMemory += size;
if (!this.waiters.isEmpty())
this.waiters.peekFirst().signal();
} finally {
this.lock.unlock();
}
}
}
}
public void deallocate(ByteBuffer buffer) {
deallocate(buffer, buffer.capacity());
}
public void deallocate(ByteBuffer buffer, int size) {
lock.lock();
try {
// only buffers exactly the block size get pooled
if (size == this.poolableSize && size == buffer.capacity()) {
buffer.clear();
this.free.add(buffer);
} else {
// credit it back to the non-pooled portion
this.nonPooledAvailableMemory += size;
}
// wake the first waiter to claim the memory
Condition moreMem = this.waiters.peekFirst();
if (moreMem != null)
moreMem.signal();
} finally {
lock.unlock();
}
}
}
Notice that an allocation larger than one block is served by releasing some free pooled blocks and then allocating a brand-new buffer, so memory is not actually reused in that case; only allocations no larger than the block size can reuse pooled memory.
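Since the full class is shown above, that reuse rule is easy to demonstrate: only buffers of exactly `poolableSize` return to the free list. A sketch against the constructor above (note `BufferPool` is an internal class, and the metric group name is arbitrary):

```java
import java.nio.ByteBuffer;
import org.apache.kafka.clients.producer.internals.BufferPool;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Time;

public class BufferPoolDemo {
    public static void main(String[] args) throws InterruptedException {
        // 32 MB total, 16 KB poolable blocks, mirroring the producer defaults
        BufferPool pool = new BufferPool(32 * 1024 * 1024, 16 * 1024,
                new Metrics(), Time.SYSTEM, "demo");
        ByteBuffer block = pool.allocate(16 * 1024, 1000); // exactly one poolable block
        pool.deallocate(block);                            // goes back on the free list for reuse
        ByteBuffer big = pool.allocate(64 * 1024, 1000);   // oversized: a fresh ByteBuffer
        pool.deallocate(big);                              // only its size is credited back
    }
}
```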
NetworkClient
`sender` calls `NetworkClient`'s `ready` method to decide whether a broker is usable.
public class NetworkClient implements KafkaClient {
@Override
public boolean ready(Node node, long now) {
if (node.isEmpty())
throw new IllegalArgumentException("Cannot connect to empty node " + node);
// is the connection ready for writing?
if (isReady(node, now))
return true;
if (connectionStates.canConnect(node.idString(), now))
// the connection is not yet established or has dropped: initiate one
initiateConnect(node, now);
return false;
}
@Override
public boolean isReady(Node node, long now) {
// true only when no metadata update is due and the connection can take a request
return !metadataUpdater.isUpdateDue(now) && canSendRequest(node.idString(), now);
}
private void initiateConnect(Node node, long now) {
String nodeConnectionId = node.idString();
try {
connectionStates.connecting(nodeConnectionId, now, node.host(), clientDnsLookup);
InetAddress address = connectionStates.currentAddress(nodeConnectionId);
log.debug("Initiating connection to node {} using address {}", node, address);
// establish the network connection
selector.connect(nodeConnectionId,
new InetSocketAddress(address, node.port()),
this.socketSendBuffer,
this.socketReceiveBuffer);
} catch (IOException e) {
log.warn("Error connecting to node {}", node, e);
// Attempt failed, we'll try again after the backoff
connectionStates.disconnected(nodeConnectionId, now);
// Notify metadata updater of the connection failure
metadataUpdater.handleServerDisconnect(now, nodeConnectionId, Optional.empty());
}
}
}
The connection initialization code is as follows:
public class Selector implements Selectable, AutoCloseable {
@Override
public void connect(String id, InetSocketAddress address, int sendBufferSize, int receiveBufferSize) throws IOException {
// make sure any previous connection is fully torn down
ensureNotRegistered(id);
SocketChannel socketChannel = SocketChannel.open();
SelectionKey key = null;
try {
configureSocketChannel(socketChannel, sendBufferSize, receiveBufferSize);
boolean connected = doConnect(socketChannel, address);
key = registerChannel(id, socketChannel, SelectionKey.OP_CONNECT);
// the connect may succeed immediately, e.g. for a local connection
if (connected) {
log.debug("Immediately connected to node {}", id);
immediatelyConnectedKeys.add(key);
key.interestOps(0);
}
} catch (IOException | RuntimeException e) {
if (key != null)
immediatelyConnectedKeys.remove(key);
channels.remove(id);
socketChannel.close();
throw e;
}
}
// configure the SocketChannel
private void configureSocketChannel(SocketChannel socketChannel, int sendBufferSize, int receiveBufferSize)
throws IOException {
// non-blocking mode
socketChannel.configureBlocking(false);
Socket socket = socketChannel.socket();
socket.setKeepAlive(true);
if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setSendBufferSize(sendBufferSize);
if (receiveBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
socket.setReceiveBufferSize(receiveBufferSize);
socket.setTcpNoDelay(true);
}
// connect to the given address
protected boolean doConnect(SocketChannel channel, InetSocketAddress address) throws IOException {
try {
return channel.connect(address);
} catch (UnresolvedAddressException e) {
throw new IOException("Can't resolve address: " + address, e);
}
}
// register with the selector
protected SelectionKey registerChannel(String id, SocketChannel socketChannel, int interestedOps) throws IOException {
SelectionKey key = socketChannel.register(nioSelector, interestedOps);
KafkaChannel channel = buildAndAttachKafkaChannel(socketChannel, id, key);
this.channels.put(id, channel);
if (idleExpiryManager != null)
idleExpiryManager.update(channel.id(), time.nanoseconds());
return key;
}
}
Network wrappers
Kafka wraps the NIO `Selector` in `org.apache.kafka.common.network.Selector`:
public class Selector implements Selectable, AutoCloseable {
private final java.nio.channels.Selector nioSelector;
private final Map<String, KafkaChannel> channels;
}
and wraps `Channel` in `KafkaChannel`:
public class KafkaChannel implements AutoCloseable {
private final TransportLayer transportLayer;
}
The selector's `poll` is responsible for handling events.
public class Selector implements Selectable, AutoCloseable {
@Override
public void poll(long timeout) throws IOException {
if (timeout < 0)
throw new IllegalArgumentException("timeout should be >= 0");
boolean madeReadProgressLastCall = madeReadProgressLastPoll;
clear();
boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
if (!immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
timeout = 0;
if (!memoryPool.isOutOfMemory() && outOfMemory) {
log.trace("Broker no longer low on memory - unmuting incoming sockets");
for (KafkaChannel channel : channels.values()) {
if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
channel.maybeUnmute();
}
}
outOfMemory = false;
}
/* check ready keys */
long startSelect = time.nanoseconds();
int numReadyKeys = select(timeout);
long endSelect = time.nanoseconds();
this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
if (dataInBuffers) {
keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
Set<SelectionKey> toPoll = keysWithBufferedRead;
keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
pollSelectionKeys(toPoll, false, endSelect);
}
// handle the I/O events
pollSelectionKeys(readyKeys, false, endSelect);
readyKeys.clear();
// handle connections that completed immediately
pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
immediatelyConnectedKeys.clear();
} else {
madeReadProgressLastPoll = true; //no work is also "progress"
}
long endIo = time.nanoseconds();
this.sensors.ioTime.record(endIo - endSelect, time.milliseconds());
completeDelayedChannelClose(endIo);
maybeCloseOldestConnection(endSelect);
}
private int select(long timeoutMs) throws IOException {
if (timeoutMs < 0L)
throw new IllegalArgumentException("timeout should be >= 0");
if (timeoutMs == 0L)
return this.nioSelector.selectNow();
else
return this.nioSelector.select(timeoutMs);
}
void pollSelectionKeys(Set<SelectionKey> selectionKeys,
boolean isImmediatelyConnected,
long currentTimeNanos) {
for (SelectionKey key : determineHandlingOrder(selectionKeys)) {
KafkaChannel channel = channel(key);
long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;
boolean sendFailed = false;
String nodeId = channel.id();
sensors.maybeRegisterConnectionMetrics(nodeId);
if (idleExpiryManager != null)
idleExpiryManager.update(nodeId, currentTimeNanos);
try {
// handle connection establishment
if (isImmediatelyConnected || key.isConnectable()) {
if (channel.finishConnect()) {
this.connected.add(nodeId);
this.sensors.connectionCreated.record();
SocketChannel socketChannel = (SocketChannel) key.channel();
log.debug("Created socket with SO_RCVBUF = {}, SO_SNDBUF = {}, SO_TIMEOUT = {} to node {}",
socketChannel.socket().getReceiveBufferSize(),
socketChannel.socket().getSendBufferSize(),
socketChannel.socket().getSoTimeout(),
nodeId);
} else {
continue;
}
}
if (channel.isConnected() && !channel.ready()) {
channel.prepare();
if (channel.ready()) {
long readyTimeMs = time.milliseconds();
boolean isReauthentication = channel.successfulAuthentications() > 1;
if (isReauthentication) {
sensors.successfulReauthentication.record(1.0, readyTimeMs);
if (channel.reauthenticationLatencyMs() == null)
log.warn(
"Should never happen: re-authentication latency for a re-authenticated channel was null; continuing...");
else
sensors.reauthenticationLatency
.record(channel.reauthenticationLatencyMs().doubleValue(), readyTimeMs);
} else {
sensors.successfulAuthentication.record(1.0, readyTimeMs);
if (!channel.connectedClientSupportsReauthentication())
sensors.successfulAuthenticationNoReauth.record(1.0, readyTimeMs);
}
log.debug("Successfully {}authenticated with {}", isReauthentication ?
"re-" : "", channel.socketDescription());
}
}
if (channel.ready() && channel.state() == ChannelState.NOT_CONNECTED)
channel.state(ChannelState.READY);
Optional<NetworkReceive> responseReceivedDuringReauthentication = channel.pollResponseReceivedDuringReauthentication();
responseReceivedDuringReauthentication.ifPresent(receive -> {
long currentTimeMs = time.milliseconds();
addToCompletedReceives(channel, receive, currentTimeMs);
});
if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasCompletedReceive(channel)
&& !explicitlyMutedChannels.contains(channel)) {
attemptRead(channel);
}
if (channel.hasBytesBuffered()) {
keysWithBufferedRead.add(key);
}
long nowNanos = channelStartTimeNanos != 0 ? channelStartTimeNanos : currentTimeNanos;
try {
attemptWrite(key, channel, nowNanos);
} catch (Exception e) {
sendFailed = true;
throw e;
}
/* cancel any defunct sockets */
if (!key.isValid())
close(channel, CloseMode.GRACEFUL);
} catch (Exception e) {
String desc = channel.socketDescription();
if (e instanceof IOException) {
log.debug("Connection with {} disconnected", desc, e);
} else if (e instanceof AuthenticationException) {
boolean isReauthentication = channel.successfulAuthentications() > 0;
if (isReauthentication)
sensors.failedReauthentication.record();
else
sensors.failedAuthentication.record();
String exceptionMessage = e.getMessage();
if (e instanceof DelayedResponseAuthenticationException)
exceptionMessage = e.getCause().getMessage();
log.info("Failed {}authentication with {} ({})", isReauthentication ? "re-" : "",
desc, exceptionMessage);
} else {
log.warn("Unexpected error from {}; closing connection", desc, e);
}
if (e instanceof DelayedResponseAuthenticationException)
maybeDelayCloseOnAuthenticationFailure(channel);
else
close(channel, sendFailed ? CloseMode.NOTIFY_ONLY : CloseMode.GRACEFUL);
} finally {
maybeRecordTimePerConnection(channel, channelStartTimeNanos);
}
}
}
private void attemptWrite(SelectionKey key, KafkaChannel channel, long nowNanos) throws IOException {
if (channel.hasSend()
&& channel.ready()
&& key.isWritable()
&& !channel.maybeBeginClientReauthentication(() -> nowNanos)) {
write(channel);
}
}
void write(KafkaChannel channel) throws IOException {
String nodeId = channel.id();
long bytesSent = channel.write();
NetworkSend send = channel.maybeCompleteSend();
if (bytesSent > 0 || send != null) {
long currentTimeMs = time.milliseconds();
if (bytesSent > 0)
this.sensors.recordBytesSent(nodeId, bytesSent, currentTimeMs);
if (send != null) {
this.completedSends.add(send);
this.sensors.recordCompletedSend(nodeId, send.size(), currentTimeMs);
}
}
}
}
The write-side logic lives in `KafkaChannel`.
public class KafkaChannel implements AutoCloseable {
public long write() throws IOException {
if (send == null)
return 0;
midWrite = true;
return send.writeTo(transportLayer);
}
public NetworkSend maybeCompleteSend() {
if (send != null && send.completed()) {
// if the send has completed
midWrite = false;
transportLayer.removeInterestOps(SelectionKey.OP_WRITE);
NetworkSend result = send;
send = null;
return result;
}
return null;
}
}