If you are interested in the problem of Flink writing to only some of Kafka's partitions instead of all of them, this article is a good read: it covers that problem in detail, and also collects useful material on .IllegalStateException: Previously tracked partitions [kafka.topic-0] been revoked by Kafka, .kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition, .kafka.common.utils.AppInfoParser - Kafka commitId : unknown, and partition assignment when Flink reads Kafka data.
Contents:
- Flink writes to only some of Kafka's partitions, never all of them
- .IllegalStateException: Previously tracked partitions [kafka.topic-0] been revoked by Kafka
- .kafka.clients.consumer.NoOffsetForPartitionException: Undefined offset with no reset policy for partition
- .kafka.common.utils.AppInfoParser - Kafka commitId : unknown
- Partition assignment when Flink reads Kafka data
Flink writes to only some of Kafka's partitions, never all of them
####1. Background
When doing real-time computation with Flink, jobs typically read from Kafka and write results back to Kafka. With a multi-partition topic, and especially at very large (PB-scale) data volumes where the Kafka cluster has a dozen or more nodes for performance and a single topic has dozens of partitions or more, you may find that Flink writes to only some of the partitions rather than all of them. The problem is easy to miss, but it hurts write throughput, piles too much data onto individual partitions, and in the worst case fills up the disk hosting a single partition. To understand why this happens, let's walk through the source of Flink's Kafka sink, starting with the FlinkKafkaProducer09 class.
####2. The FlinkKafkaProducer09 source
public class FlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {
    private static final long serialVersionUID = 1L;

    public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
    }

    public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), producerConfig, (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
    }

    public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), producerConfig, (FlinkKafkaPartitioner)customPartitioner);
    }

    public FlinkKafkaProducer09(String brokerList, String topicId, KeyedSerializationSchema<IN> serializationSchema) {
        this(topicId, (KeyedSerializationSchema)serializationSchema, getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
    }

    public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig) {
        this(topicId, (KeyedSerializationSchema)serializationSchema, producerConfig, (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
    }

    public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
        super(topicId, serializationSchema, producerConfig, customPartitioner);
    }

    /** @deprecated */
    @Deprecated
    public FlinkKafkaProducer09(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), producerConfig, (KafkaPartitioner)customPartitioner);
    }

    /** @deprecated */
    @Deprecated
    public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, KafkaPartitioner<IN> customPartitioner) {
        super(topicId, serializationSchema, producerConfig, new FlinkKafkaDelegatePartitioner(customPartitioner));
    }

    protected void flush() {
        if (this.producer != null) {
            this.producer.flush();
        }
    }
}
Focus on just these two constructors:
public FlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
    this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), getPropertiesFromBrokerList(brokerList), (FlinkKafkaPartitioner)(new FlinkFixedPartitioner()));
}

public FlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
    super(topicId, serializationSchema, producerConfig, customPartitioner);
}
Look mainly at the first constructor: it shows that partition selection is handed to a new FlinkFixedPartitioner(), which we examine in the next section.
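Note in passing: the second constructor marks customPartitioner as @Nullable and hands it straight to the base class, which suggests that simply passing null should achieve the same end result as the custom subclass we build later in this article. A minimal sketch under that assumption (the broker address and topic name are placeholders, and the usual connector imports are assumed):

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
// The cast picks the FlinkKafkaPartitioner overload over the deprecated KafkaPartitioner one.
FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
        "my-topic",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        props,
        (FlinkKafkaPartitioner<String>) null);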
####3. The FlinkFixedPartitioner source
public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> {
    private static final long serialVersionUID = -3785320239953858777L;
    private int parallelInstanceId;

    public FlinkFixedPartitioner() {
    }

    public void open(int parallelInstanceId, int parallelInstances) {
        Preconditions.checkArgument(parallelInstanceId >= 0, "Id of this subtask cannot be negative.");
        Preconditions.checkArgument(parallelInstances > 0, "Number of subtasks must be larger than 0.");
        this.parallelInstanceId = parallelInstanceId;
    }

    public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
        Preconditions.checkArgument(partitions != null && partitions.length > 0, "Partitions of the target topic is empty.");
        return partitions[this.parallelInstanceId % partitions.length];
    }

    public boolean equals(Object o) {
        return this == o || o instanceof FlinkFixedPartitioner;
    }

    public int hashCode() {
        return FlinkFixedPartitioner.class.hashCode();
    }
}
From the code it is clear that return partitions[this.parallelInstanceId % partitions.length] pins each sink subtask to exactly one partition, so whenever the sink parallelism is smaller than the partition count, some partitions are never written. Let's now write our own MyFlinkKafkaProducer09, modeled on FlinkKafkaProducer09, to fix this.
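Before that, to make the effect concrete, here is a small standalone simulation of the modulo mapping above (the parallelism and partition count are made-up illustrative values): with a sink parallelism of 3 and a 40-partition topic, only partitions 0, 1 and 2 ever receive data.

// Simulates FlinkFixedPartitioner: subtask i always writes partitions[i % partitions.length].
public class FixedPartitionerSimulation {
    public static void main(String[] args) {
        int parallelism = 3;      // number of sink subtasks (assumed)
        int numPartitions = 40;   // partitions in the target topic (assumed)
        for (int subtask = 0; subtask < parallelism; subtask++) {
            // Every record from this subtask lands on this single partition.
            System.out.println("subtask " + subtask + " -> partition " + (subtask % numPartitions));
        }
        // Prints: subtask 0 -> partition 0, subtask 1 -> partition 1, subtask 2 -> partition 2.
        // Partitions 3..39 are never written.
    }
}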
####4. The logic behind the stock FlinkKafkaProducer09
1>. It must extend the FlinkKafkaProducerBase class:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package org.apache.flink.streaming.connectors.kafka;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Map.Entry;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.SerializableObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public abstract class FlinkKafkaProducerBase<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaProducerBase.class);
    private static final long serialVersionUID = 1L;
    public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
    protected final Properties producerConfig;
    protected final String defaultTopicId;
    protected final KeyedSerializationSchema<IN> schema;
    protected final FlinkKafkaPartitioner<IN> flinkKafkaPartitioner;
    protected final Map<String, int[]> topicPartitionsMap;
    protected boolean logFailuresOnly;
    protected boolean flushOnCheckpoint = true;
    protected transient KafkaProducer<byte[], byte[]> producer;
    protected transient Callback callback;
    protected transient volatile Exception asyncException;
    protected final SerializableObject pendingRecordsLock = new SerializableObject();
    protected long pendingRecords;

    public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
        Objects.requireNonNull(defaultTopicId, "TopicID not set");
        Objects.requireNonNull(serializationSchema, "serializationSchema not set");
        Objects.requireNonNull(producerConfig, "producerConfig not set");
        ClosureCleaner.clean(customPartitioner, true);
        ClosureCleaner.ensureSerializable(serializationSchema);
        this.defaultTopicId = defaultTopicId;
        this.schema = serializationSchema;
        this.producerConfig = producerConfig;
        this.flinkKafkaPartitioner = customPartitioner;
        if (!producerConfig.containsKey("key.serializer")) {
            this.producerConfig.put("key.serializer", ByteArraySerializer.class.getName());
        } else {
            LOG.warn("Overwriting the '{}' is not recommended", "key.serializer");
        }
        if (!producerConfig.containsKey("value.serializer")) {
            this.producerConfig.put("value.serializer", ByteArraySerializer.class.getName());
        } else {
            LOG.warn("Overwriting the '{}' is not recommended", "value.serializer");
        }
        if (!this.producerConfig.containsKey("bootstrap.servers")) {
            throw new IllegalArgumentException("bootstrap.servers must be supplied in the producer config properties.");
        } else {
            this.topicPartitionsMap = new HashMap();
        }
    }

    public void setLogFailuresOnly(boolean logFailuresOnly) {
        this.logFailuresOnly = logFailuresOnly;
    }

    public void setFlushOnCheckpoint(boolean flush) {
        this.flushOnCheckpoint = flush;
    }

    @VisibleForTesting
    protected <K, V> KafkaProducer<K, V> getKafkaProducer(Properties props) {
        return new KafkaProducer(props);
    }

    public void open(Configuration configuration) {
        this.producer = this.getKafkaProducer(this.producerConfig);
        RuntimeContext ctx = this.getRuntimeContext();
        if (null != this.flinkKafkaPartitioner) {
            if (this.flinkKafkaPartitioner instanceof FlinkKafkaDelegatePartitioner) {
                ((FlinkKafkaDelegatePartitioner)this.flinkKafkaPartitioner).setPartitions(getPartitionsByTopic(this.defaultTopicId, this.producer));
            }
            // The partitioner learns this subtask's index here; FlinkFixedPartitioner uses it to pick its single target partition.
            this.flinkKafkaPartitioner.open(ctx.getIndexOfThisSubtask(), ctx.getNumberOfParallelSubtasks());
        }
        LOG.info("Starting FlinkKafkaProducer ({}/{}) to produce into default topic {}", new Object[]{ctx.getIndexOfThisSubtask() + 1, ctx.getNumberOfParallelSubtasks(), this.defaultTopicId});
        if (!Boolean.parseBoolean(this.producerConfig.getProperty("flink.disable-metrics", "false"))) {
            Map<MetricName, ? extends Metric> metrics = this.producer.metrics();
            if (metrics == null) {
                LOG.info("Producer implementation does not support metrics");
            } else {
                MetricGroup kafkaMetricGroup = this.getRuntimeContext().getMetricGroup().addGroup("KafkaProducer");
                Iterator var5 = metrics.entrySet().iterator();
                while(var5.hasNext()) {
                    Entry<MetricName, ? extends Metric> metric = (Entry)var5.next();
                    kafkaMetricGroup.gauge(((MetricName)metric.getKey()).name(), new KafkaMetricWrapper((Metric)metric.getValue()));
                }
            }
        }
        if (this.flushOnCheckpoint && !((StreamingRuntimeContext)this.getRuntimeContext()).isCheckpointingEnabled()) {
            LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
            this.flushOnCheckpoint = false;
        }
        if (this.logFailuresOnly) {
            this.callback = new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e != null) {
                        FlinkKafkaProducerBase.LOG.error("Error while sending record to Kafka: " + e.getMessage(), e);
                    }
                    FlinkKafkaProducerBase.this.acknowledgeMessage();
                }
            };
        } else {
            this.callback = new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null && FlinkKafkaProducerBase.this.asyncException == null) {
                        FlinkKafkaProducerBase.this.asyncException = exception;
                    }
                    FlinkKafkaProducerBase.this.acknowledgeMessage();
                }
            };
        }
    }

    public void invoke(IN next, Context context) throws Exception {
        this.checkErroneous();
        byte[] serializedKey = this.schema.serializeKey(next);
        byte[] serializedValue = this.schema.serializeValue(next);
        String targetTopic = this.schema.getTargetTopic(next);
        if (targetTopic == null) {
            targetTopic = this.defaultTopicId;
        }
        int[] partitions = (int[])this.topicPartitionsMap.get(targetTopic);
        if (null == partitions) {
            partitions = getPartitionsByTopic(targetTopic, this.producer);
            this.topicPartitionsMap.put(targetTopic, partitions);
        }
        ProducerRecord record;
        // Key point: with no partitioner the record carries no partition and Kafka chooses one;
        // otherwise the configured partitioner decides.
        if (this.flinkKafkaPartitioner == null) {
            record = new ProducerRecord(targetTopic, serializedKey, serializedValue);
        } else {
            record = new ProducerRecord(targetTopic, this.flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), serializedKey, serializedValue);
        }
        if (this.flushOnCheckpoint) {
            synchronized(this.pendingRecordsLock) {
                ++this.pendingRecords;
            }
        }
        this.producer.send(record, this.callback);
    }

    public void close() throws Exception {
        if (this.producer != null) {
            this.producer.close();
        }
        this.checkErroneous();
    }

    private void acknowledgeMessage() {
        if (this.flushOnCheckpoint) {
            synchronized(this.pendingRecordsLock) {
                --this.pendingRecords;
                if (this.pendingRecords == 0L) {
                    this.pendingRecordsLock.notifyAll();
                }
            }
        }
    }

    protected abstract void flush();

    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    public void snapshotState(FunctionSnapshotContext ctx) throws Exception {
        this.checkErroneous();
        if (this.flushOnCheckpoint) {
            this.flush();
            synchronized(this.pendingRecordsLock) {
                if (this.pendingRecords != 0L) {
                    throw new IllegalStateException("Pending record count must be zero at this point: " + this.pendingRecords);
                }
                this.checkErroneous();
            }
        }
    }

    protected void checkErroneous() throws Exception {
        Exception e = this.asyncException;
        if (e != null) {
            this.asyncException = null;
            throw new Exception("Failed to send data to Kafka: " + e.getMessage(), e);
        }
    }

    public static Properties getPropertiesFromBrokerList(String brokerList) {
        String[] elements = brokerList.split(",");
        String[] var2 = elements;
        int var3 = elements.length;
        for(int var4 = 0; var4 < var3; ++var4) {
            String broker = var2[var4];
            NetUtils.getCorrectHostnamePort(broker);
        }
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", brokerList);
        return props;
    }

    protected static int[] getPartitionsByTopic(String topic, KafkaProducer<byte[], byte[]> producer) {
        List<PartitionInfo> partitionsList = new ArrayList(producer.partitionsFor(topic));
        Collections.sort(partitionsList, new Comparator<PartitionInfo>() {
            public int compare(PartitionInfo o1, PartitionInfo o2) {
                return Integer.compare(o1.partition(), o2.partition());
            }
        });
        int[] partitions = new int[partitionsList.size()];
        for(int i = 0; i < partitions.length; ++i) {
            partitions[i] = ((PartitionInfo)partitionsList.get(i)).partition();
        }
        return partitions;
    }

    @VisibleForTesting
    protected long numPendingRecords() {
        synchronized(this.pendingRecordsLock) {
            return this.pendingRecords;
        }
    }
}
2>. Pay attention to this constructor in FlinkKafkaProducerBase:
public FlinkKafkaProducerBase(String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, FlinkKafkaPartitioner<IN> customPartitioner) {
    Objects.requireNonNull(defaultTopicId, "TopicID not set");
    Objects.requireNonNull(serializationSchema, "serializationSchema not set");
    Objects.requireNonNull(producerConfig, "producerConfig not set");
    ClosureCleaner.clean(customPartitioner, true);
    ClosureCleaner.ensureSerializable(serializationSchema);
    this.defaultTopicId = defaultTopicId;
    this.schema = serializationSchema;
    this.producerConfig = producerConfig;
    this.flinkKafkaPartitioner = customPartitioner;
    if (!producerConfig.containsKey("key.serializer")) {
        this.producerConfig.put("key.serializer", ByteArraySerializer.class.getName());
    } else {
        LOG.warn("Overwriting the '{}' is not recommended", "key.serializer");
    }
    if (!producerConfig.containsKey("value.serializer")) {
        this.producerConfig.put("value.serializer", ByteArraySerializer.class.getName());
    } else {
        LOG.warn("Overwriting the '{}' is not recommended", "value.serializer");
    }
    if (!this.producerConfig.containsKey("bootstrap.servers")) {
        throw new IllegalArgumentException("bootstrap.servers must be supplied in the producer config properties.");
    } else {
        this.topicPartitionsMap = new HashMap();
    }
}
3>. Also note the following code in the invoke() method: when no partitioner is configured, the ProducerRecord is created without an explicit partition, leaving the choice to the Kafka producer itself.
if (this.flinkKafkaPartitioner == null) {
    record = new ProducerRecord(targetTopic, serializedKey, serializedValue);
}
4>. Next, look at the ProducerRecord class to see what a null partition means:
//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//
package org.apache.kafka.clients.producer;

public final class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final K key;
    private final V value;

    public ProducerRecord(String topic, Integer partition, K key, V value) {
        if (topic == null) {
            throw new IllegalArgumentException("Topic cannot be null");
        } else {
            this.topic = topic;
            this.partition = partition;
            this.key = key;
            this.value = value;
        }
    }

    public ProducerRecord(String topic, K key, V value) {
        this(topic, (Integer)null, key, value);
    }

    public ProducerRecord(String topic, V value) {
        this(topic, (Object)null, value);
    }

    public String topic() {
        return this.topic;
    }

    public K key() {
        return this.key;
    }

    public V value() {
        return this.value;
    }

    public Integer partition() {
        return this.partition;
    }

    public String toString() {
        String key = this.key == null ? "null" : this.key.toString();
        String value = this.value == null ? "null" : this.value.toString();
        return "ProducerRecord(topic=" + this.topic + ", partition=" + this.partition + ", key=" + key + ", value=" + value;
    }

    public boolean equals(Object o) {
        if (this == o) {
            return true;
        } else if (!(o instanceof ProducerRecord)) {
            return false;
        } else {
            ProducerRecord that;
            label56: {
                that = (ProducerRecord)o;
                if (this.key != null) {
                    if (this.key.equals(that.key)) {
                        break label56;
                    }
                } else if (that.key == null) {
                    break label56;
                }
                return false;
            }
            label49: {
                if (this.partition != null) {
                    if (this.partition.equals(that.partition)) {
                        break label49;
                    }
                } else if (that.partition == null) {
                    break label49;
                }
                return false;
            }
            if (this.topic != null) {
                if (!this.topic.equals(that.topic)) {
                    return false;
                }
            } else if (that.topic != null) {
                return false;
            }
            if (this.value != null) {
                if (!this.value.equals(that.value)) {
                    return false;
                }
            } else if (that.value != null) {
                return false;
            }
            return true;
        }
    }

    public int hashCode() {
        int result = this.topic != null ? this.topic.hashCode() : 0;
        result = 31 * result + (this.partition != null ? this.partition.hashCode() : 0);
        result = 31 * result + (this.key != null ? this.key.hashCode() : 0);
        result = 31 * result + (this.value != null ? this.value.hashCode() : 0);
        return result;
    }
}
5>. Focus on this constructor of the class:
public ProducerRecord(String topic, Integer partition, K key, V value) {
    if (topic == null) {
        throw new IllegalArgumentException("Topic cannot be null");
    } else {
        this.topic = topic;
        this.partition = partition;
        this.key = key;
        this.value = value;
    }
}
6>. The constructor simply stores whatever partition it is given; with a null partition the Kafka producer falls back to its default partitioning and can therefore spread records across all partitions. That makes our custom class very simple.
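For reference, this is roughly what the Kafka client does when a record carries no explicit partition. The following is a simplified sketch, not the real implementation (the actual 0.9-era DefaultPartitioner uses a murmur2 hash and considers only the currently available partitions, and the behavior varies by client version): keyed records are hashed to a stable partition, unkeyed records are spread round-robin.

import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of the Kafka client's default partitioning.
public class DefaultPartitioningSketch {
    private final AtomicInteger counter = new AtomicInteger(0);

    public int partition(byte[] keyBytes, int numPartitions) {
        if (keyBytes == null) {
            // No key: spread records round-robin over all partitions.
            return (counter.getAndIncrement() & Integer.MAX_VALUE) % numPartitions;
        }
        // Keyed record: a stable hash keeps the same key on the same partition.
        return (Arrays.hashCode(keyBytes) & Integer.MAX_VALUE) % numPartitions;
    }
}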
####5. Writing our own Flink-to-Kafka sink: MyFlinkKafkaProducer09
package com.run;

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

// Use javax.annotation.Nullable (as Flink itself does); the original
// org.codehaus.commons.nullanalysis.Nullable was an accidental auto-import from Janino.
import javax.annotation.Nullable;

import java.util.Properties;

public class MyFlinkKafkaProducer09<IN> extends FlinkKafkaProducerBase<IN> {

    public MyFlinkKafkaProducer09(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
        // Unlike FlinkKafkaProducer09, pass null instead of new FlinkFixedPartitioner().
        this(topicId, (KeyedSerializationSchema)(new KeyedSerializationSchemaWrapper(serializationSchema)), getPropertiesFromBrokerList(brokerList), null);
    }

    public MyFlinkKafkaProducer09(String topicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, @Nullable FlinkKafkaPartitioner<IN> customPartitioner) {
        super(topicId, serializationSchema, producerConfig, customPartitioner);
    }

    protected void flush() {
        if (this.producer != null) {
            this.producer.flush();
        }
    }
}
Instead of supplying a FlinkKafkaPartitioner (new FlinkFixedPartitioner()), we pass null, so FlinkKafkaProducerBase lets Kafka write across all partitions. Use it in your streaming job:

distributeDataStream.addSink(new MyFlinkKafkaProducer09<String>("localhost:9092", "my-topic", new SimpleStringSchema()));
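If you also need to hand tuning options to the Kafka producer, the four-argument constructor of the custom class takes a Properties object and accepts null for the partitioner directly; a sketch with placeholder values:

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("batch.size", "16384"); // illustrative tuning value

distributeDataStream.addSink(new MyFlinkKafkaProducer09<String>(
        "my-topic",
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        props,
        null)); // null partitioner: let Kafka distribute across all partitions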
.IllegalStateException: Previously tracked partitions [kafka.topic-0] been revoked by Kafka
.IllegalStateException: Previously tracked partitions [kafka.topic-0] been revoked by Kafka because of consumer rebalance. This is mostly due to another stream with same group id joined, please check if there're different streaming application misconfigure to use same group id. Fundamentally different stream should use different group id
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.latestOffsets(DirectKafkaInputDStream.scala:200)
at org.apache.spark.streaming.kafka010.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:228)
at org.apache.spark.streaming.dstream.DStream
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
at org.apache.spark.streaming.dstream.DStream
at org.apache.spark.streaming.dstream.DStream
at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:416)
at org.apache.spark.streaming.dstream.DStream
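The exception text already names the fix: two streaming applications sharing one group.id trigger consumer rebalances that revoke each other's partitions, so each application needs its own group.id. A minimal sketch of the Kafka consumer parameters (all values are placeholders):

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.StringDeserializer;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
// Every independent streaming application must use its own group id.
kafkaParams.put("group.id", "my-app-unique-group-id");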