对于想了解org.apache.kafka.clients.consumer.ConsumerRebalanceListener的实例源码的读者,本文将是一篇不可错过的文章,我们将详细介绍apache
对于想了解org.apache.kafka.clients.consumer.ConsumerRebalanceListener的实例源码的读者,本文将是一篇不可错过的文章,我们将详细介绍apachekafka源码剖析,并且为您提供关于Apache Kafka(八)- Kafka Delivery Semantics for Consumers、apache kylin 在 build kylin_streaming_cube 时发生 org.apache.kafka.clients.consumer.KafkaConsumer.assign 的问题、CDH-Kafka-SparkStreaming 异常:org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/uti、FlinkKafkaConsumer和版本控制的消费者FlinkKafkaConsumer09 / FlinkKafkaConsumer010 / FlinkKafkaConsumer011之间的区别的有价值信息。
本文目录一览:- org.apache.kafka.clients.consumer.ConsumerRebalanceListener的实例源码(apachekafka源码剖析)
- Apache Kafka(八)- Kafka Delivery Semantics for Consumers
- apache kylin 在 build kylin_streaming_cube 时发生 org.apache.kafka.clients.consumer.KafkaConsumer.assign 的问题
- CDH-Kafka-SparkStreaming 异常:org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/uti
- FlinkKafkaConsumer和版本控制的消费者FlinkKafkaConsumer09 / FlinkKafkaConsumer010 / FlinkKafkaConsumer011之间的区别
org.apache.kafka.clients.consumer.ConsumerRebalanceListener的实例源码(apachekafka源码剖析)
@Override public void subscribe(Collection<String> topics,ConsumerRebalanceListener listener) { super.subscribe(topics,listener); Map<TopicPartition,Long> offsets = topics .stream() .flatMap( topic -> IntStream .range(0,KafkaMockFactory.NUMBER_OF_PARTITIONS) .mapToObj(i -> new TopicPartition(topic,i))) .collect(Collectors.toMap(Function.identity(),topicPartition -> 0L)); rebalance(offsets.keySet()); updateBeginningOffsets(offsets); updateEndOffsets(offsets); }
@Override public void subscribe(final Collection<String> topics,final ConsumerRebalanceListener callback) { Retries.tryMe(new Runnable() { @Override public void run() { inner.subscribe(topics,callback); } },strategy()); }
@Override public void subscribe(final Pattern pattern,final ConsumerRebalanceListener callback) { Retries.tryMe(new Runnable() { @Override public void run() { inner.subscribe(pattern,strategy()); }
@Override public void subscribe(Collection<String> topics,Long> offsets = new HashMap<>(); for (String topic : topics) { TopicPartition partition1 = new TopicPartition(topic,0); offsets.put(partition1,0L); TopicPartition partition2 = new TopicPartition(topic,1); offsets.put(partition2,0L); } rebalance(offsets.keySet()); updateBeginningOffsets(offsets); updateEndOffsets(offsets); }
public void subscribe(Pattern pattern,ConsumerRebalanceListener listener) { if (listener == null) throw new IllegalArgumentException("RebalanceListener cannot be null"); setSubscriptionType(SubscriptionType.AUTO_PATTERN); this.listener = listener; this.subscribedPattern = pattern; }
@Override public void subscribe(Collection<String> topics,ConsumerRebalanceListener callback) { Set<String> newSubscription = new HashSet<>(topics); // Todo: This is a hot fix for KAFKA-3664 and should be removed after the issue is fixed. commitSync(); for (TopicPartition tp : _kafkaConsumer.assignment()) { if (!newSubscription.contains(tp.topic())) { _consumerRecordsProcessor.clear(tp); } } _consumerRebalanceListener.setUserListener(callback); _kafkaConsumer.subscribe(new ArrayList<>(topics),_consumerRebalanceListener); }
@Override public void subscribe(Pattern pattern,ConsumerRebalanceListener callback) { if (callback != null) { _consumerRebalanceListener.setUserListener(callback); } _kafkaConsumer.subscribe(pattern,_consumerRebalanceListener); }
public void subscribe(Collection<String> topics,ConsumerRebalanceListener listener) { if (listener == null) throw new IllegalArgumentException("RebalanceListener cannot be null"); if (!this.userAssignment.isEmpty() || this.subscribedPattern != null) throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE); this.listener = listener; changeSubscription(topics); }
public void subscribe(Pattern pattern,ConsumerRebalanceListener listener) { if (listener == null) throw new IllegalArgumentException("RebalanceListener cannot be null"); if (!this.subscription.isEmpty() || !this.userAssignment.isEmpty()) throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE); this.listener = listener; this.subscribedPattern = pattern; }
@Override public void subscribe(Collection<String> topics,_consumerRebalanceListener); }
@Override public void subscribe(Pattern pattern,_consumerRebalanceListener); }
@Override public void subscribe(Collection<String> topics,ConsumerRebalanceListener listener) { consumer.subscribe(topics,listener); }
@Override public void subscribe(Pattern pattern,ConsumerRebalanceListener listener) { consumer.subscribe(pattern,listener); }
@Override public void subscribe(Collection<String> topics,ConsumerRebalanceListener callback) { activeConsumer().subscribe(topics,callback); }
@Override public void subscribe(Pattern pattern,ConsumerRebalanceListener callback) { activeConsumer().subscribe(pattern,callback); }
@Override public void subscribe(Collection<String> topics,ConsumerRebalanceListener callback) { }
@Override public void subscribe(Pattern pattern,ConsumerRebalanceListener callback) { }
public ConsumerRebalanceListener listener() { return listener; }
public ConsumerRebalanceListener getRebalanceListener() { return rebalanceListener; }
public void setUserListener(ConsumerRebalanceListener userListener) { _userListener = userListener; }
public ConsumerRebalanceListener listener() { return listener; }
public void setUserListener(ConsumerRebalanceListener userListener) { _userListener = userListener; }
public void subscribe(Set<String> topics,ConsumerRebalanceListener listener) { if (listener == null) throw new IllegalArgumentException("RebalanceListener cannot be null"); setSubscriptionType(SubscriptionType.AUTO_TOPICS); this.listener = listener; changeSubscription(topics); }
/** * Subscribe to the given list of topics to get dynamically * assigned partitions. <b>Topic subscriptions are not incremental. This list will replace the current * assignment (if there is one).</b> Note that it is not possible to combine topic subscription with group management * with manual partition assignment through {@link #assign(Collection)}. * <p> * If the given list of topics is empty,it is treated the same as {@link #unsubscribe()}. * <p> * As part of group management,the consumer will keep track of the list of consumers that belong to a particular * group and will trigger a rebalance operation if one of the following events trigger - * <ul> * <li>Number of partitions change for any of the subscribed list of topics * <li>Topic is created or deleted * <li>An existing member of the consumer group dies * <li>A new member is added to an existing consumer group via the join API * </ul> * <p> * When any of these events are triggered,the provided listener will be invoked first to indicate that * the consumer's assignment has been revoked,and then again when the new assignment has been received. * Note that this listener will immediately override any listener set in a prevIoUs call to subscribe. * It is guaranteed,however,that the partitions revoked/assigned through this interface are from topics * subscribed in this call. See {@link ConsumerRebalanceListener} for more details. * <p> * In order to support large message,the consumer tracks all the consumed messages for each partition. When the * user no longer subscribes to a new set of topics,the consumer will discard all the tracked messages of the * partitions of that topic. * * @param topics The list of topics to subscribe to * @param callback Non-null listener instance to get notifications on partition assignment/revocation for the * subscribed topics */ @InterfaceOrigin.ApacheKafka void subscribe(Collection<String> topics,ConsumerRebalanceListener callback);
/** * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. The pattern matching will * be done periodically against topics existing at the time of check. * <p> * As part of group management,the consumer will keep track of the list of consumers that * belong to a particular group and will trigger a rebalance operation if one of the * following events trigger - * <ul> * <li>Number of partitions change for any of the subscribed list of topics * <li>Topic is created or deleted * <li>An existing member of the consumer group dies * <li>A new member is added to an existing consumer group via the join API * </ul> * <p> * In order to support large message,the consumer will discard all the tracked messages of the * partitions of that topic. * * @param pattern Pattern to subscribe to */ @InterfaceOrigin.ApacheKafka void subscribe(Pattern pattern,ConsumerRebalanceListener callback);
/** * Subscribe to the given list of topics to get dynamically * assigned partitions. <b>Topic subscriptions are not incremental. This list will replace the current * assignment (if there is one).</b> Note that it is not possible to combine topic subscription with group management * with manual partition assignment through {@link #assign(Collection)}. * <p> * If the given list of topics is empty,ConsumerRebalanceListener callback);
/** * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. The pattern matching will * be done periodically against topics existing at the time of check. * <p> * As part of group management,ConsumerRebalanceListener callback);
Apache Kafka(八)- Kafka Delivery Semantics for Consumers
Kafka Delivery Semantics
在Kafka Consumer中,有3种delivery semantics,分别为:至多一次(at most once)、至少一次(at least once)、以及准确一次(exactly once),下面我们分别介绍这3种Delivery 语义。
1. At Most Once
在message batch在被consumer接收后,立即commit offsets。此时若是在消息处理逻辑中出现异常,则未被处理的消息会丢失(不会再次被读取)。
此场景一个例子如下图:
此例流程如下:
- Consumer读一个batch的消息
- 在接收到消息后,Consumer commits offsets
- Consumer 处理数据,例如发送邮件,但是此时一个batch中的最后两条消息由于consumer异常宕机而未被正常处理
- Consumer 重启并重新开始读数据。但是此时由于已经committed offset,所以consumer会在最新的offset处读一个batch的消息,之前上一个batch中由于异常而未被处理的消息会丢失
所以at most once 会有丢失数据的风险,但若是应用可以承受丢失数据的风险,则可以使用此方式。
2. At Least Once
在消息被consumer接收并处理后,offsets才被 commit。若是在消息处理时发生异常,则消息会被重新消费。也就是说,会导致消息被重复处理。
At Least Once 是默认使用的语义,在这种情况下,需要保证应用是idempotent 类型(处理重复的消息不会对应用产生影响)。
此场景一个例子如下:
此示例流程如下:
- Consumer 读一个batch的消息
- 在接收到消息并正常处理
- 在consumer 正常处理消息完毕后,commits offset
- 继续读并处理下一个batch 的消息。若是在此过程中发生异常(例如consumer 重启),则consumer会从最近的 offset 开始读一个batch的消息并处理。所以此时会导致有重复消息被处理(此例中为4263、4264、4265)
3. Exactly once
此语义较难实现,在kafka中仅能在Kafka => Kafka的工作流中,通过使用Kafka Stream API 实现。对于Kafka => Sink 的工作流,请使用 idempotent consumer。
对于大部分应用程序,我们应使用at least once processing,并确保consumer端的transformation/processing 是idempotent类型。
4. 构建 idempotent consumer
一个idempotent consumer可以在处理重复消息时,不影响整个应用的逻辑。在ElasticSearch 中,通过一个_id 字段唯一识别一条消息。所以在这个场景下,为了实现idempotent consumer,我们需要对同样_id字段的消息做同样的处理。
之前给出的Elastic Search Consumer的例子中,每条消息的 _id 都是默认随机生成的,也就是说:若是处理之前重复的消息,生成的id也是一条新的随机_id,此行为不符合一个idempotent consumer。对此,我们可以自定义一个_id 模式,修改代码如下:
// poll for new data
while(true){
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMinutes(100));
for(ConsumerRecord record : records) {
// construct a kafka generic ID
String kafka_generic_id = record.topic() + "_" + record.partition() + "_" + record.offset();
// where we insert data into ElasticSearch
IndexRequest indexRequest = new IndexRequest(
"kafkademo"
).id(kafka_generic_id).source(record.value(), XContentType.JSON);
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
String id = indexResponse.getId();
logger.info(id);
try {
Thread.sleep(1000); // introduce a small delay
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
打印出id结果为:
可以看到新的 id 由 kafka topic + partition + offset 这3 部分组成,可以唯一定位一个 record。所以即使重复处理一条record,它发往 ElasticSearch 的 id 也是一样的(即处理逻辑一样)。在这个场景下,即为一个imdepotent consumer。
apache kylin 在 build kylin_streaming_cube 时发生 org.apache.kafka.clients.consumer.KafkaConsumer.assign 的问题
build kylin_streaming_cube 时发生 org.apache.kafka.clients.consumer.KafkaConsumer.assign (Ljava/util/Collection;) 的问题。
环境 CDH5.7.6,kylin2.1.0-bin-cdh57
kafka 版本:(官网要求是 0.10.0+)
报错如下:
2018-10-26 14:30:59,065 DEBUG [http-bio-7070-exec-9] kafka.KafkaSource:83 : Last segment doesn''t exist, and didn''t initiate the start offset, will seek from topic''s earliest offset.
2018-10-26 14:30:59,068 INFO [http-bio-7070-exec-9] consumer.ConsumerConfig:171 : ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = kylin_streaming_cube
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [master:9092]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = false
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1
send.buffer.bytes = 131072
auto.offset.reset = latest
2018-10-26 14:30:59,075 INFO [http-bio-7070-exec-9] utils.AppInfoParser:83 : Kafka version : 0.9.0-kafka-2.0.1
2018-10-26 14:30:59,075 INFO [http-bio-7070-exec-9] utils.AppInfoParser:84 : Kafka commitId : unknown
2018-10-26 14:30:59,193 ERROR [http-bio-7070-exec-9] controller.CubeController:305 : org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
at org.apache.kylin.source.kafka.util.KafkaClient.getEarliestOffset(KafkaClient.java:84)
at org.apache.kylin.source.kafka.util.KafkaClient.getEarliestOffsets(KafkaClient.java:127)
at org.apache.kylin.source.kafka.KafkaSource.enrichSourcePartitionBeforeBuild(KafkaSource.java:84)
at org.apache.kylin.rest.service.JobService.submitJobInternal(JobService.java:236)
at org.apache.kylin.rest.service.JobService.submitJob(JobService.java:208)
at org.apache.kylin.rest.service.JobService
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:720)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.security.access.intercept.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:68)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:655)
at org.apache.kylin.rest.service.JobService
at org.apache.kylin.rest.controller.CubeController.buildInternal(CubeController.java:302)
at org.apache.kylin.rest.controller.CubeController.rebuild2(CubeController.java:290)
at org.apache.kylin.rest.controller.CubeController.build2(CubeController.java:283)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:110)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:832)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:743)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:961)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:895)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:967)
at org.springframework.web.servlet.FrameworkServlet.doPut(FrameworkServlet.java:880)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:653)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:843)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:731)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:316)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:114)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:122)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:169)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:48)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.authentication.www.BasicAuthenticationFilter.doFilterInternal(BasicAuthenticationFilter.java:213)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:205)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:120)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:64)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:53)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:91)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:213)
at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:176)
at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346)
at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:209)
at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:244)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:505)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:169)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:956)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:436)
at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1078)
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:625)
at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:318)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
2018-10-26 14:30:59,196 ERROR [http-bio-7070-exec-9] controller.BasicController:57 :
org.apache.kylin.rest.exception.InternalErrorException: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
at org.apache.kylin.rest.controller.CubeController.buildInternal(CubeController.java:306)
at org.apache.kylin.rest.controller.CubeController.rebuild2(CubeController.java:290)
at org.apache.kylin.rest.controller.CubeController.build2(CubeController.java:283)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:110)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:832)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:743)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:961)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:895)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:967)
at org.springframework.web.servlet.FrameworkServlet.doPut(FrameworkServlet.java:880)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:653)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:843)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:731)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:316)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:114)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:122)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:169)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:48)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.authentication.www.BasicAuthenticationFilter.doFilterInternal(BasicAuthenticationFilter.java:213)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:205)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:120)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:64)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:53)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:91)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:213)
at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:176)
at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346)
at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:209)
at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:244)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:505)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:169)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:956)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:436)
at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1078)
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:625)
at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:318)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
at org.apache.kylin.source.kafka.util.KafkaClient.getEarliestOffset(KafkaClient.java:84)
at org.apache.kylin.source.kafka.util.KafkaClient.getEarliestOffsets(KafkaClient.java:127)
at org.apache.kylin.source.kafka.KafkaSource.enrichSourcePartitionBeforeBuild(KafkaSource.java:84)
at org.apache.kylin.rest.service.JobService.submitJobInternal(JobService.java:236)
at org.apache.kylin.rest.service.JobService.submitJob(JobService.java:208)
at org.apache.kylin.rest.service.JobService
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:720)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.security.access.intercept.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:68)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:655)
at org.apache.kylin.rest.service.JobService
at org.apache.kylin.rest.controller.CubeController.buildInternal(CubeController.java:302)
... 78 more
请问各位,该如何解决这个问题呢?
CDH-Kafka-SparkStreaming 异常:org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/uti
参考文章:
flume kafka sparkstreaming整合后集群报错org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/ut
https://blog.csdn.net/u010936936/article/details/77247075?locationNum=2&fps=1
最近在使用CDH 环境 提交 Kafka-Spark Streaming 作业的时候遇到了一些问题,特此记录如下:
主要的报错信息为:
Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;
Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;
并且在报错之前,可以看到 kafka-client 为 0.9.0 版本
18/11/01 16:48:04 INFO AppInfoParser: Kafka version : 0.9.0.0
18/11/01 16:48:04 INFO AppInfoParser: Kafka commitId : cb8625948210849f
但是,我们查看打的包高于此版本。
原因分析
其实这个在官方文档中有介绍。地址如下:
https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html#running_jobs
简单说,就是kafka集成spark2,需要在CDH中进行设置。官网介绍了2中方法。这里我采用了第二种,在CDH中进行修改配置的方法。
步骤如下:
1.进入CDH的spark2配置界面,在搜索框中输入SPARK_KAFKA_VERSION,出现如下图,
2.然后选择对应版本,这里我应该选择的是None,
即 : 选用集群上传的Kafka-client 版本 !!
3.然后保存配置,重启生效。
4.重新跑sparkstreaming任务,问题解决。
FlinkKafkaConsumer和版本控制的消费者FlinkKafkaConsumer09 / FlinkKafkaConsumer010 / FlinkKafkaConsumer011之间的区别
版本化的Kafka使用者(和生产者)是针对那些版本的Kafka客户端构建的,旨在与每个特定版本的Kafka一起使用。未版本化的连接器FlinkKafkaConsumer和FlinkKafkaProducer使用通用客户端库构建,并且与自0.10开始的所有版本的Kafka兼容。
请注意,在Flink 1.11中删除了Kafka 0.8和0.9的版本化使用者和生产者,而在Flink 1.12(https://issues.apache.org/jira/browse/FLINK-19152)中则删除了0.10和0.11版本。
编辑:
在某些情况下,仅允许来自水槽的背压来节流水源就足够了。但是在其他情况下(例如多个来源),效果可能不够好。
您将在http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Rate-Limit-in-FlinkConsumer-td36417.html上找到有关此问题的讨论以及进行自己的速率限制的代码。
我们今天的关于org.apache.kafka.clients.consumer.ConsumerRebalanceListener的实例源码和apachekafka源码剖析的分享就到这里,谢谢您的阅读,如果想了解更多关于Apache Kafka(八)- Kafka Delivery Semantics for Consumers、apache kylin 在 build kylin_streaming_cube 时发生 org.apache.kafka.clients.consumer.KafkaConsumer.assign 的问题、CDH-Kafka-SparkStreaming 异常:org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/uti、FlinkKafkaConsumer和版本控制的消费者FlinkKafkaConsumer09 / FlinkKafkaConsumer010 / FlinkKafkaConsumer011之间的区别的相关信息,可以在本站进行搜索。
本文标签: