
Example source code for org.apache.kafka.clients.consumer.ConsumerRebalanceListener (Apache Kafka source code analysis)



If you want to learn from example source code for org.apache.kafka.clients.consumer.ConsumerRebalanceListener, this article is for you. We will walk through the examples as part of an Apache Kafka source code analysis, and also provide valuable information on: Apache Kafka (8) - Kafka Delivery Semantics for Consumers; the org.apache.kafka.clients.consumer.KafkaConsumer.assign problem Apache Kylin hits when building kylin_streaming_cube; the CDH-Kafka-SparkStreaming exception org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/uti; and the difference between FlinkKafkaConsumer and the versioned consumers FlinkKafkaConsumer09 / FlinkKafkaConsumer010 / FlinkKafkaConsumer011.

Contents of this article:

Example source code for org.apache.kafka.clients.consumer.ConsumerRebalanceListener (Apache Kafka source code analysis)
Apache Kafka (8) - Kafka Delivery Semantics for Consumers
Apache Kylin hits org.apache.kafka.clients.consumer.KafkaConsumer.assign when building kylin_streaming_cube
CDH-Kafka-SparkStreaming exception: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/uti
The difference between FlinkKafkaConsumer and the versioned consumers FlinkKafkaConsumer09 / FlinkKafkaConsumer010 / FlinkKafkaConsumer011

Example source code for org.apache.kafka.clients.consumer.ConsumerRebalanceListener (Apache Kafka source code analysis)

Project: Lagerta    File: ProxyMockConsumer.java
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    super.subscribe(topics, listener);
    Map<TopicPartition, Long> offsets = topics
            .stream()
            .flatMap(
                    topic -> IntStream
                            .range(0, KafkaMockFactory.NUMBER_OF_PARTITIONS)
                            .mapToObj(i -> new TopicPartition(topic, i)))
            .collect(Collectors.toMap(Function.identity(), topicPartition -> 0L));
    rebalance(offsets.keySet());
    updateBeginningOffsets(offsets);
    updateEndOffsets(offsets);
}
Project: Lagerta    File: ConsumerProxyRetry.java
@Override
public void subscribe(final Collection<String> topics, final ConsumerRebalanceListener callback) {
    Retries.tryMe(new Runnable() {
        @Override
        public void run() {
            inner.subscribe(topics, callback);
        }
    }, strategy());
}
Project: Lagerta    File: ConsumerProxyRetry.java
@Override
public void subscribe(final Pattern pattern, final ConsumerRebalanceListener callback) {
    Retries.tryMe(new Runnable() {
        @Override
        public void run() {
            inner.subscribe(pattern, callback);
        }
    }, strategy());
}
Project: Lagerta    File: ProxyMockConsumer.java
@Override public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    super.subscribe(topics, listener);
    Map<TopicPartition, Long> offsets = new HashMap<>();
    for (String topic : topics) {
        TopicPartition partition1 = new TopicPartition(topic, 0);
        offsets.put(partition1, 0L);

        TopicPartition partition2 = new TopicPartition(topic, 1);
        offsets.put(partition2, 0L);
    }
    rebalance(offsets.keySet());
    updateBeginningOffsets(offsets);
    updateEndOffsets(offsets);
}
Project: kafka-0.11.0.0-src-with-comment    File: SubscriptionState.java
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
    if (listener == null)
        throw new IllegalArgumentException("RebalanceListener cannot be null");

    setSubscriptionType(SubscriptionType.AUTO_PATTERN);

    this.listener = listener;
    this.subscribedPattern = pattern;
}
Project: likafka-clients    File: LiKafkaConsumerImpl.java
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
  Set<String> newSubscription = new HashSet<>(topics);
  // TODO: This is a hot fix for KAFKA-3664 and should be removed after the issue is fixed.
  commitSync();
  for (TopicPartition tp : _kafkaConsumer.assignment()) {
    if (!newSubscription.contains(tp.topic())) {
      _consumerRecordsProcessor.clear(tp);
    }
  }
  _consumerRebalanceListener.setUserListener(callback);
  _kafkaConsumer.subscribe(new ArrayList<>(topics), _consumerRebalanceListener);
}
Project: likafka-clients    File: LiKafkaConsumerImpl.java
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
  if (callback != null) {
    _consumerRebalanceListener.setUserListener(callback);
  }
  _kafkaConsumer.subscribe(pattern, _consumerRebalanceListener);
}
Project: kafka    File: SubscriptionState.java
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
    if (listener == null)
        throw new IllegalArgumentException("RebalanceListener cannot be null");

    if (!this.userAssignment.isEmpty() || this.subscribedPattern != null)
        throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);

    this.listener = listener;

    changeSubscription(topics);
}
Project: kafka    File: SubscriptionState.java
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
    if (listener == null)
        throw new IllegalArgumentException("RebalanceListener cannot be null");

    if (!this.subscription.isEmpty() || !this.userAssignment.isEmpty())
        throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE);

    this.listener = listener;
    this.subscribedPattern = pattern;
}
Project: li-apache-kafka-clients    File: LiKafkaConsumerImpl.java
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
  _consumerRebalanceListener.setUserListener(callback);
  _kafkaConsumer.subscribe(new ArrayList<>(topics), _consumerRebalanceListener);
}
Project: li-apache-kafka-clients    File: LiKafkaConsumerImpl.java
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
  _consumerRebalanceListener.setUserListener(callback);
  _kafkaConsumer.subscribe(pattern, _consumerRebalanceListener);
}
Project: java-kafka-client    File: TracingKafkaConsumer.java
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
  consumer.subscribe(topics, listener);
}
Project: java-kafka-client    File: TracingKafkaConsumer.java
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
  consumer.subscribe(pattern, listener);
}
Project: Lagerta    File: ConsumerForTests.java
@Override public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
    activeConsumer().subscribe(topics, callback);
}
Project: Lagerta    File: ConsumerForTests.java
@Override public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
    activeConsumer().subscribe(pattern, callback);
}
Project: Lagerta    File: ConsumerAdapter.java
@Override public void subscribe(Collection<String> topics, ConsumerRebalanceListener callback) {
}
Project: Lagerta    File: ConsumerAdapter.java
@Override public void subscribe(Pattern pattern, ConsumerRebalanceListener callback) {
}
Project: kafka-0.11.0.0-src-with-comment    File: SubscriptionState.java
public ConsumerRebalanceListener listener() {
    return listener;
}
Project: rmap    File: IndexingConsumer.java
public ConsumerRebalanceListener getRebalanceListener() {
    return rebalanceListener;
}
Project: likafka-clients    File: LiKafkaConsumerRebalanceListener.java
public void setUserListener(ConsumerRebalanceListener userListener) {
  _userListener = userListener;
}
Project: kafka    File: SubscriptionState.java
public ConsumerRebalanceListener listener() {
    return listener;
}
Project: li-apache-kafka-clients    File: LiKafkaConsumerRebalanceListener.java
public void setUserListener(ConsumerRebalanceListener userListener) {
  _userListener = userListener;
}
Project: kafka-0.11.0.0-src-with-comment    File: SubscriptionState.java
public void subscribe(Set<String> topics, ConsumerRebalanceListener listener) {
    if (listener == null)
        throw new IllegalArgumentException("RebalanceListener cannot be null");

    setSubscriptionType(SubscriptionType.AUTO_TOPICS);

    this.listener = listener;

    changeSubscription(topics);
}
Project: likafka-clients    File: LiKafkaConsumer.java
/**
 * Subscribe to the given list of topics to get dynamically
 * assigned partitions. <b>Topic subscriptions are not incremental. This list will replace the current
 * assignment (if there is one).</b> Note that it is not possible to combine topic subscription with group management
 * with manual partition assignment through {@link #assign(Collection)}.
 * <p>
 * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
 * <p>
 * As part of group management, the consumer will keep track of the list of consumers that belong to a particular
 * group and will trigger a rebalance operation if one of the following events trigger -
 * <ul>
 * <li>Number of partitions change for any of the subscribed list of topics
 * <li>Topic is created or deleted
 * <li>An existing member of the consumer group dies
 * <li>A new member is added to an existing consumer group via the join API
 * </ul>
 * <p>
 * When any of these events are triggered, the provided listener will be invoked first to indicate that
 * the consumer's assignment has been revoked, and then again when the new assignment has been received.
 * Note that this listener will immediately override any listener set in a previous call to subscribe.
 * It is guaranteed, however, that the partitions revoked/assigned through this interface are from topics
 * subscribed in this call. See {@link ConsumerRebalanceListener} for more details.
 * <p>
 * In order to support large messages, the consumer tracks all the consumed messages for each partition. When the
 * user no longer subscribes to a topic, the consumer will discard all the tracked messages of the
 * partitions of that topic.
 *
 * @param topics   The list of topics to subscribe to
 * @param callback Non-null listener instance to get notifications on partition assignment/revocation for the
 *                 subscribed topics
 */
@InterfaceOrigin.ApacheKafka
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
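To make the listener contract described in this javadoc concrete, here is a minimal sketch of a ConsumerRebalanceListener implementation. The class name and the logging bodies are placeholders; a real listener would typically commit offsets in onPartitionsRevoked and initialize per-partition state in onPartitionsAssigned.

import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class LoggingRebalanceListener implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Called before the rebalance takes effect: the last chance to flush
        // in-flight work and commit offsets for the partitions being taken away.
        System.out.println("Revoked: " + partitions);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Called after the rebalance: initialize state or seek() for the new partitions.
        System.out.println("Assigned: " + partitions);
    }
}

Such a listener is passed as the callback argument of subscribe(topics, listener), exactly as in the snippets above.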
Project: likafka-clients    File: LiKafkaConsumer.java
/**
 * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. The pattern matching will
 * be done periodically against topics existing at the time of check.
 * <p>
 * As part of group management, the consumer will keep track of the list of consumers that
 * belong to a particular group and will trigger a rebalance operation if one of the
 * following events trigger -
 * <ul>
 * <li>Number of partitions change for any of the subscribed list of topics
 * <li>Topic is created or deleted
 * <li>An existing member of the consumer group dies
 * <li>A new member is added to an existing consumer group via the join API
 * </ul>
 * <p>
 * In order to support large messages, the consumer tracks all the consumed messages for each partition. When the
 * user no longer subscribes to a topic, the consumer will discard all the tracked messages of the
 * partitions of that topic.
 *
 * @param pattern Pattern to subscribe to
 */
@InterfaceOrigin.ApacheKafka
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);
Project: li-apache-kafka-clients    File: LiKafkaConsumer.java
/**
 * Subscribe to the given list of topics to get dynamically
 * assigned partitions. <b>Topic subscriptions are not incremental. This list will replace the current
 * assignment (if there is one).</b> Note that it is not possible to combine topic subscription with group management
 * with manual partition assignment through {@link #assign(Collection)}.
 * <p>
 * If the given list of topics is empty, it is treated the same as {@link #unsubscribe()}.
 */
@InterfaceOrigin.ApacheKafka
void subscribe(Collection<String> topics, ConsumerRebalanceListener callback);
Project: li-apache-kafka-clients    File: LiKafkaConsumer.java
/**
 * Subscribe to all topics matching specified pattern to get dynamically assigned partitions. The pattern matching will
 * be done periodically against topics existing at the time of check.
 * <p>
 * As part of group management, the consumer will keep track of the list of consumers that belong to a particular
 * group and will trigger a rebalance operation if one of the events listed above occurs.
 */
@InterfaceOrigin.ApacheKafka
void subscribe(Pattern pattern, ConsumerRebalanceListener callback);

Apache Kafka (8) - Kafka Delivery Semantics for Consumers

Kafka Delivery Semantics

A Kafka consumer can follow one of three delivery semantics: at most once, at least once, and exactly once. Each of the three is introduced below.

 

1. At Most Once

Offsets are committed as soon as a message batch is received. If an exception then occurs in the message-processing logic, the unprocessed messages are lost (they will not be read again).

An example of this scenario is shown in the figure below:

[Figure: at-most-once flow — offsets are committed immediately after a batch is received, before processing]

The flow in this example:

  1. The consumer reads a batch of messages
  2. As soon as the batch is received, the consumer commits the offsets
  3. The consumer processes the data, e.g. sends emails, but the last two messages of the batch are never processed because the consumer crashes
  4. The consumer restarts and resumes reading. Since the offsets were already committed, it reads a new batch from the latest offset; the messages of the previous batch that were never processed are lost

So at most once carries a risk of data loss; use it only if the application can tolerate losing data.
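A minimal sketch of this commit-first pattern, assuming an already-configured KafkaConsumer<String, String> named consumer and a hypothetical process() helper (imports: java.time.Duration, org.apache.kafka.clients.consumer.ConsumerRecord and ConsumerRecords):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    consumer.commitSync(); // commit before processing: a crash below loses these records
    for (ConsumerRecord<String, String> record : records) {
        process(record);   // hypothetical processing step, e.g. sending an email
    }
}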

 

2. At Least Once

Offsets are committed only after the messages have been received and processed. If an exception occurs during processing, the messages are consumed again — in other words, messages may be processed more than once.

At least once is the default semantics. In this case the application must be idempotent (processing a duplicate message must not affect the application); a sketch of the pattern follows the flow description below.

An example of this scenario is shown in the figure below:

[Figure: at-least-once flow — the consumer processes the batch first, then commits the offsets]

The flow in this example:

  1. The consumer reads a batch of messages
  2. The messages are received and processed normally
  3. After the consumer finishes processing the batch, it commits the offsets
  4. The consumer reads and processes the next batch. If an exception occurs along the way (e.g. the consumer restarts), it reads a batch from the last committed offset and processes it again, so some messages are processed twice (in this example, 4263, 4264 and 4265)
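A minimal sketch of the commit-last pattern, under the same assumptions as the earlier sketch (configured consumer, hypothetical process() helper) plus enable.auto.commit=false:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        process(record); // must be idempotent: a crash before commitSync() replays the batch
    }
    consumer.commitSync(); // commit only after the whole batch is processed: at least once
}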

 

3. Exactly once

This semantics is hard to achieve. In Kafka it can only be achieved for Kafka => Kafka workflows, using the Kafka Streams API. For Kafka => Sink workflows, use an idempotent consumer instead.

For most applications we should use at-least-once processing and make sure the transformations/processing on the consumer side are idempotent.
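For reference, enabling exactly-once in a Kafka => Kafka workflow with Kafka Streams is essentially one configuration line. A minimal sketch — the application id, broker address, and topic names are placeholders:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ExactlyOnceStream {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "eos-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        // The key line: ask Streams for transactional, exactly-once processing.
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic"); // trivial Kafka => Kafka pipeline
        new KafkaStreams(builder.build(), props).start();
    }
}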

 

4. Building an idempotent consumer

An idempotent consumer can process duplicate messages without affecting the application's logic. In Elasticsearch, a document is uniquely identified by its _id field. So in this scenario, to build an idempotent consumer we must make sure that messages with the same _id receive the same treatment.

In the Elasticsearch consumer example given earlier, each message's _id was randomly generated by default. That means reprocessing a duplicate message produces a new random _id — not the behavior of an idempotent consumer. To fix this, we can define our own _id scheme and modify the code as follows:

// Assumed imports: java.time.Duration,
// org.apache.kafka.clients.consumer.ConsumerRecord / ConsumerRecords,
// org.elasticsearch.action.index.IndexRequest / IndexResponse,
// org.elasticsearch.client.RequestOptions, org.elasticsearch.common.xcontent.XContentType

// poll for new data
while (true) {
    ConsumerRecords<String, String> records =
            consumer.poll(Duration.ofMillis(100)); // short poll timeout

    for (ConsumerRecord<String, String> record : records) {

        // construct a Kafka generic ID: topic + partition + offset uniquely identify a record
        String kafkaGenericId = record.topic() + "_" + record.partition() + "_" + record.offset();

        // insert the data into Elasticsearch, supplying the generic ID as the document _id
        IndexRequest indexRequest = new IndexRequest(
                "kafkademo"
        ).id(kafkaGenericId).source(record.value(), XContentType.JSON);

        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        String id = indexResponse.getId();

        logger.info(id);

        try {
            Thread.sleep(1000); // introduce a small delay
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

The printed ids look like this:

[Figure: console log of the generated ids, each of the form <topic>_<partition>_<offset>]

As you can see, the new id is composed of three parts — Kafka topic + partition + offset — which uniquely identify a record. So even if a record is processed twice, the id it sends to Elasticsearch stays the same (and the processing logic is identical). In this scenario, we have an idempotent consumer.

 

Apache Kylin hits org.apache.kafka.clients.consumer.KafkaConsumer.assign when building kylin_streaming_cube

An org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;) problem occurs while building kylin_streaming_cube.

Environment: CDH 5.7.6, kylin-2.1.0-bin-cdh57

Kafka version: 0.9.0-kafka-2.0.1 according to the AppInfoParser line in the log below (the official site requires 0.10.0+)

The error is as follows:

2018-10-26 14:30:59,065 DEBUG [http-bio-7070-exec-9] kafka.KafkaSource:83 : Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.
2018-10-26 14:30:59,068 INFO  [http-bio-7070-exec-9] consumer.ConsumerConfig:171 : ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    group.id = kylin_streaming_cube
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [master:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    enable.auto.commit = false
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    session.timeout.ms = 30000
    metrics.num.samples = 2
    client.id =
    ssl.endpoint.identification.algorithm = null
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    check.crcs = true
    request.timeout.ms = 40000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 65536
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    auto.offset.reset = latest

2018-10-26 14:30:59,075 INFO  [http-bio-7070-exec-9] utils.AppInfoParser:83 : Kafka version : 0.9.0-kafka-2.0.1
2018-10-26 14:30:59,075 INFO  [http-bio-7070-exec-9] utils.AppInfoParser:84 : Kafka commitId : unknown
2018-10-26 14:30:59,193 ERROR [http-bio-7070-exec-9] controller.CubeController:305 : org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
    at org.apache.kylin.source.kafka.util.KafkaClient.getEarliestOffset(KafkaClient.java:84)
    at org.apache.kylin.source.kafka.util.KafkaClient.getEarliestOffsets(KafkaClient.java:127)
    at org.apache.kylin.source.kafka.KafkaSource.enrichSourcePartitionBeforeBuild(KafkaSource.java:84)
    at org.apache.kylin.rest.service.JobService.submitJobInternal(JobService.java:236)
    at org.apache.kylin.rest.service.JobService.submitJob(JobService.java:208)
    at org.apache.kylin.rest.service.JobService$$FastClassBySpringCGLIB$$83a44b2a.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:720)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.security.access.intercept.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:68)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:655)
    at org.apache.kylin.rest.service.JobService$$EnhancerBySpringCGLIB$$9b79b224.submitJob(<generated>)
    at org.apache.kylin.rest.controller.CubeController.buildInternal(CubeController.java:302)
    at org.apache.kylin.rest.controller.CubeController.rebuild2(CubeController.java:290)
    at org.apache.kylin.rest.controller.CubeController.build2(CubeController.java:283)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:110)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:832)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:743)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:961)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:895)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:967)
    at org.springframework.web.servlet.FrameworkServlet.doPut(FrameworkServlet.java:880)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:653)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:843)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:731)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:316)
    at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
    at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:114)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:122)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:169)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:48)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.authentication.www.BasicAuthenticationFilter.doFilterInternal(BasicAuthenticationFilter.java:213)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:205)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:120)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:64)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:53)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:91)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:213)
    at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:176)
    at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346)
    at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:209)
    at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:244)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:505)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:169)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
    at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:956)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:436)
    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1078)
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:625)
    at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:318)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:748)
2018-10-26 14:30:59,196 ERROR [http-bio-7070-exec-9] controller.BasicController:57 :
org.apache.kylin.rest.exception.InternalErrorException: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
    at org.apache.kylin.rest.controller.CubeController.buildInternal(CubeController.java:306)
    at org.apache.kylin.rest.controller.CubeController.rebuild2(CubeController.java:290)
    at org.apache.kylin.rest.controller.CubeController.build2(CubeController.java:283)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:110)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:832)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:743)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:961)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:895)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:967)
    at org.springframework.web.servlet.FrameworkServlet.doPut(FrameworkServlet.java:880)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:653)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:843)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:731)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:316)
    at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
    at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:114)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:122)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:169)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:48)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.authentication.www.BasicAuthenticationFilter.doFilterInternal(BasicAuthenticationFilter.java:213)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:205)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:120)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:64)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:53)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:91)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:213)
    at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:176)
    at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346)
    at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:209)
    at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:244)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:505)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:169)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
    at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:956)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:436)
    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1078)
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:625)
    at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:318)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
    at org.apache.kylin.source.kafka.util.KafkaClient.getEarliestOffset(KafkaClient.java:84)
    at org.apache.kylin.source.kafka.util.KafkaClient.getEarliestOffsets(KafkaClient.java:127)
    at org.apache.kylin.source.kafka.KafkaSource.enrichSourcePartitionBeforeBuild(KafkaSource.java:84)
    at org.apache.kylin.rest.service.JobService.submitJobInternal(JobService.java:236)
    at org.apache.kylin.rest.service.JobService.submitJob(JobService.java:208)
    at org.apache.kylin.rest.service.JobService$$FastClassBySpringCGLIB$$83a44b2a.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:720)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.security.access.intercept.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:68)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:655)
    at org.apache.kylin.rest.service.JobService$$EnhancerBySpringCGLIB$$9b79b224.submitJob(<generated>)
    at org.apache.kylin.rest.controller.CubeController.buildInternal(CubeController.java:302)
    ... 78 more

 

Does anyone know how to solve this problem? From the log, the runtime is loading kafka-clients 0.9.0-kafka-2.0.1, while KafkaConsumer.assign(Ljava/util/Collection;)V only exists from client 0.10 on, so the NoSuchMethodError points to a client-version mismatch — the same kind of issue handled in the next section.

CDH-Kafka-SparkStreaming exception: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/uti

Reference:

flume + kafka + sparkstreaming integration causes the cluster error org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/ut

https://blog.csdn.net/u010936936/article/details/77247075?locationNum=2&fps=1

 

Recently I ran into some problems when submitting Kafka-Spark Streaming jobs in a CDH environment; they are recorded here.

The main error message is:

Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;

Before the error is raised, you can see that the kafka-client version is 0.9.0:

 

18/11/01 16:48:04 INFO AppInfoParser: Kafka version : 0.9.0.0
18/11/01 16:48:04 INFO AppInfoParser: Kafka commitId : cb8625948210849f
 

However, the jar we packaged contains a higher client version than this.
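To confirm which kafka-clients actually wins on the classpath at runtime, a quick diagnostic like the following can help — a sketch only; the class name is illustrative, and AppInfoParser simply reports the version string baked into whichever jar the JVM loaded:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.utils.AppInfoParser;

public class KafkaClientVersionCheck {
    public static void main(String[] args) {
        // Version string compiled into the loaded kafka-clients jar
        System.out.println("kafka-clients version: " + AppInfoParser.getVersion());
        // Location of the jar KafkaConsumer was loaded from (reveals classpath shadowing)
        System.out.println("loaded from: "
                + KafkaConsumer.class.getProtectionDomain().getCodeSource().getLocation());
    }
}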

 

Root cause analysis

This is actually covered in the official documentation, at:
https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html#running_jobs
In short, integrating Kafka with Spark 2 requires extra configuration in CDH. The official page describes two methods; here I used the second one, changing the configuration in CDH.

The steps are as follows:

1. Open the Spark2 configuration page in CDH and search for SPARK_KAFKA_VERSION; the setting appears as in the figure below.

2. Select the matching version. Here the correct choice is None,

    that is: use the Kafka-client version uploaded with the job!!

3. Save the configuration and restart for it to take effect.

4. Re-run the Spark Streaming job; the problem is solved.

The difference between FlinkKafkaConsumer and the versioned consumers FlinkKafkaConsumer09 / FlinkKafkaConsumer010 / FlinkKafkaConsumer011

The versioned Kafka consumers (and producers) are built against those specific versions of the Kafka client and are meant to be used with that particular version of Kafka. The unversioned connectors FlinkKafkaConsumer and FlinkKafkaProducer are built on the universal client library and are compatible with all Kafka versions since 0.10.

Note that the versioned consumers and producers for Kafka 0.8 and 0.9 were removed in Flink 1.11, and those for 0.10 and 0.11 were removed in Flink 1.12 (https://issues.apache.org/jira/browse/FLINK-19152).
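A minimal sketch of the unversioned (universal) connector in use — the topic name, group id, and broker address are placeholders:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class UniversalKafkaSourceJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "flink-demo");

        // FlinkKafkaConsumer (no version suffix) comes from flink-connector-kafka,
        // the universal connector compatible with Kafka 0.10+ brokers.
        DataStream<String> stream = env.addSource(
                new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("universal-kafka-source");
    }
}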

Edit:

In some cases it is enough to let backpressure from the sink throttle the source. But in other cases (for example, with multiple sources) that may not work well enough.

You will find a discussion of this problem, along with code for doing your own rate limiting, at http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Kafka-Rate-Limit-in-FlinkConsumer-td36417.html.
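One approach along the lines discussed in that thread — not an official Flink API — is to wrap the DeserializationSchema and block in deserialize() with a Guava RateLimiter, throttling each consumer subtask to a fixed records-per-second budget. A sketch:

import com.google.common.util.concurrent.RateLimiter;
import java.io.IOException;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

public class RateLimitedDeserializationSchema<T> implements DeserializationSchema<T> {
    private final DeserializationSchema<T> inner;
    private final double permitsPerSecond;
    private transient RateLimiter limiter; // not serializable, so created lazily per subtask

    public RateLimitedDeserializationSchema(DeserializationSchema<T> inner, double permitsPerSecond) {
        this.inner = inner;
        this.permitsPerSecond = permitsPerSecond;
    }

    @Override
    public T deserialize(byte[] message) throws IOException {
        if (limiter == null) {
            limiter = RateLimiter.create(permitsPerSecond);
        }
        limiter.acquire(); // block until a permit is available, throttling the source
        return inner.deserialize(message);
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return inner.isEndOfStream(nextElement);
    }

    @Override
    public TypeInformation<T> getProducedType() {
        return inner.getProducedType();
    }
}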

That is all for today's share of example source code for org.apache.kafka.clients.consumer.ConsumerRebalanceListener and the Apache Kafka source code analysis. Thank you for reading. For more on Apache Kafka (8) - Kafka Delivery Semantics for Consumers, the org.apache.kafka.clients.consumer.KafkaConsumer.assign problem Apache Kylin hits when building kylin_streaming_cube, the CDH-Kafka-SparkStreaming exception org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/uti, or the difference between FlinkKafkaConsumer and the versioned consumers FlinkKafkaConsumer09 / FlinkKafkaConsumer010 / FlinkKafkaConsumer011, you can search this site.
