
Spring Kafka: How Does kafkaTemplate executeInTransaction Relate to the Consumer's read_committed Isolation Level? (My Specific Scenario, My Confusion)



In this article we introduce Spring Kafka: how the kafkaTemplate executeInTransaction method relates to the consumer's read_committed isolation level, together with my specific scenario and my confusion. We also cover: Apache Kafka (9) - Kafka Consumer Consumption Behavior; Apache Kafka (8) - Kafka Delivery Semantics for Consumers; an org.apache.kafka.clients.consumer.KafkaConsumer.assign problem when building kylin_streaming_cube in Apache Kylin; and a CDH-Kafka-SparkStreaming exception, org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/uti.


Spring Kafka: How Does kafkaTemplate executeInTransaction Relate to the Consumer's read_committed Isolation Level?

How do we resolve this question about kafkaTemplate executeInTransaction and the consumer's read_committed isolation level?

I see that the isolation.level=read_committed consumer property ensures the consumer will only read committed messages. I am trying to understand what exactly a committed message means in this context: at what point can we say a producer's message has been committed to the topic?

My specific scenario

I am using Spring Kafka's kafkaTemplate.executeInTransaction method to send messages to Kafka asynchronously.

I see that executeInTransaction internally calls producer.commitTransaction(), which throws an exception if it cannot complete within max.block.ms.

My confusion

If producer.commitTransaction() completes within max.block.ms, does that mean the message has been successfully stored in the topic, ready to be consumed by a consumer with isolation.level=read_committed?

I ask because I see there is another property, delivery.timeout.ms, which covers the delivery process that starts after send() returns / max.block.ms elapses.

So... does this mean that even after producer.commitTransaction() returns, we still need to wait up to delivery.timeout.ms to be sure the message has been written to the topic?

Solution

No; once the commit succeeds, the records are safely in the log. (By the time commitTransaction() returns, every record in the transaction has already been acknowledged by the brokers; delivery.timeout.ms applies to the individual sends, all of which must complete before the commit can succeed.)
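To make the moving parts concrete, here is a minimal sketch (not from the original answer; the broker address, topic name, and transaction id prefix are assumptions). executeInTransaction commits the transaction before returning, and a read_committed consumer only sees records from committed transactions:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public class TransactionalSendSketch {
    public static void main(String[] args) {
        Map<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // The transaction id prefix is what makes the factory transactional;
        // without it, executeInTransaction throws IllegalStateException.
        DefaultKafkaProducerFactory<String, String> pf = new DefaultKafkaProducerFactory<>(config);
        pf.setTransactionIdPrefix("tx-");

        KafkaTemplate<String, String> template = new KafkaTemplate<>(pf);

        // Begins a transaction, runs the callback, then commits before returning;
        // if the commit cannot complete within max.block.ms, an exception is thrown.
        template.executeInTransaction(ops -> ops.send("my-topic", "key", "value"));

        // Consumer side (for contrast): this single property hides records that
        // belong to open or aborted transactions.
        //   properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
    }
}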

Apache Kafka (9) - Kafka Consumer Consumption Behavior

1. Poll Messages

A Kafka consumer reads messages with a poll model: the consumer actively pulls data from Kafka. Some other messaging systems use a push model instead, where the server pushes data to the consumer and the consumer simply waits.

The poll model lets the consumer control which offset in the log it consumes from and how fast it consumes, and gives it the ability to replay events.

The Kafka consumer poll model works as follows (diagram omitted):

  • The consumer calls .poll(Duration timeout) to request data from the broker
  • If the broker has data it returns immediately; otherwise it returns an empty response after the timeout

 

We can control consumer behavior through several parameters, the main ones being:

  • fetch.min.bytes (default: 1)

    o Controls the minimum amount of data returned per fetch request.

    o Increasing it improves throughput and reduces the number of requests, at the cost of higher latency.

  • max.poll.records (default: 500)

    o Controls how many records are returned per poll.

    o If messages are generally small and the consumer has ample memory, consider increasing it.

    o It is best to monitor how many records each poll actually returns.

  • max.partition.fetch.bytes (default: 1 MB)

    o The maximum number of bytes the broker returns per partition.

    o Reading from 100+ partitions therefore requires substantial memory.

  • fetch.max.bytes (default: 50 MB)

    o The maximum amount of data returned per fetch request (one fetch request can cover multiple partitions).

    o The consumer performs multiple fetches in parallel.

By default it is not recommended to adjust these parameters, unless the consumer has already reached the maximum throughput of the default configuration and needs more. For reference, the corresponding properties are sketched below.
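As an illustration only, these knobs are ordinary consumer properties; the values below simply restate the defaults discussed above:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "1048576"); // 1 MB
properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, "52428800");          // 50 MB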

 

2. Consumer Offset Commit Strategies

In a consumer application there are two common strategies for committing offsets:

  • (simpler) enable.auto.commit = true: offsets are committed automatically, but data must be processed synchronously
  • (advanced) enable.auto.commit = false: offsets are committed manually

 

With enable.auto.commit = true, consider the following code:

while (true) {
    ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
    doSomethingSynchronous(batch);
}

 

The consumer polls for messages every 100 ms and processes each batch synchronously. Offsets are then committed automatically at regular intervals controlled by auto.commit.interval.ms (default 5000): whenever .poll() is called and at least 5 seconds have passed since the last commit, offsets are committed.
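A minimal sketch of this auto-commit configuration (both values are the defaults):

properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");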

If the data is instead processed asynchronously, this produces at-most-once behavior: offsets may be committed before the data has actually been processed.

So for newcomers, enable.auto.commit = true can be risky, and it is not recommended as a starting point.

 

With enable.auto.commit = false, consider the following code:

while (true) {
    ConsumerRecords<String, String> batch = consumer.poll(Duration.ofMillis(100));
    if (isReady(batch)) {
        doSomethingSynchronous(batch);
        consumer.commitSync();
    }
}

  

This example explicitly commits offsets only after the data has been processed synchronously, which lets us control exactly under what conditions offsets are committed. A typical scenario: read the received data into a buffer, flush the buffer to a database, and only then commit offsets, as in the sketch below.
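A minimal sketch of that buffer-then-flush pattern; flushToDatabase() and the batch size of 1000 are assumptions for illustration:

List<String> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record.value());
    }
    if (buffer.size() >= 1000) {
        flushToDatabase(buffer);   // one transactional write for the whole buffer
        consumer.commitSync();     // commit offsets only after the flush succeeds
        buffer.clear();
    }
}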

 

3. Manual Offset Commit Example

First, disable automatic offset commits:

// disable auto commit of offsets
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

Limit each request to at most 10 records, to make testing easier:

properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
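The code below also calls two helpers, createConsumer() and createClient(), which the original post does not show. A minimal sketch, assuming a local broker, a plain unauthenticated ElasticSearch node on localhost:9200, and a group id of kafka-demo-elasticsearch (all assumptions):

import java.util.Collections;
import java.util.Properties;
import org.apache.http.HttpHost;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

public static KafkaConsumer<String, String> createConsumer(String topic) {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kafka-demo-elasticsearch");
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
    consumer.subscribe(Collections.singletonList(topic));
    return consumer;
}

public static RestHighLevelClient createClient() {
    // assumes a plain HTTP ElasticSearch node on localhost:9200
    return new RestHighLevelClient(RestClient.builder(new HttpHost("localhost", 9200, "http")));
}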

With those in place, add the main logic:

public static void main(String[] args) throws IOException {
    Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
    RestHighLevelClient client = createClient();

    // create Kafka consumer
    KafkaConsumer<String, String> consumer = createConsumer("kafka_demo");

    // poll for new data
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        logger.info("received " + records.count() + " records");
        for (ConsumerRecord<String, String> record : records) {

            // construct a Kafka generic ID
            String kafka_generic_id = record.topic() + "_" + record.partition() + "_" + record.offset();

            // insert data into ElasticSearch
            IndexRequest indexRequest = new IndexRequest("kafkademo")
                    .id(kafka_generic_id)
                    .source(record.value(), XContentType.JSON);

            IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
            logger.info(indexResponse.getId());

            try {
                Thread.sleep(10); // introduce a small delay
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        logger.info("Committing offsets...");
        consumer.commitSync(); // commit offsets manually
        logger.info("Offsets have been committed");
    }
}

Here we commit offsets manually after processing each batch of 10 records (that is, after one complete pass through the for loop). The log output (a screenshot in the original post) shows the "Committing offsets..." and "Offsets have been committed" lines after every batch.

After stopping the consumer manually, the last committed offset can be seen to be 165 (screenshot omitted).

The kafka-consumer-groups CLI can also be used to verify that the current committed offset is 165 (output omitted).

4. Performance Improvement using Batching

In the example above, the consumer is limited to 10 records per poll and processes them one at a time (one ElasticSearch insert each). This is inefficient; we can increase throughput with batching, implemented here with the BulkRequest API provided by the ElasticSearch client. Starting from the previous code, the changes are:

public static void main(String[] args) throws IOException {
    Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
    RestHighLevelClient client = createClient();

    // create Kafka consumer
    KafkaConsumer<String, String> consumer = createConsumer("kafka_demo");

    // poll for new data
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

        // bulk request
        BulkRequest bulkRequest = new BulkRequest();

        logger.info("received " + records.count() + " records");
        for (ConsumerRecord<String, String> record : records) {

            // construct a Kafka generic ID
            String kafka_generic_id = record.topic() + "_" + record.partition() + "_" + record.offset();

            // the insert we want to send to ElasticSearch
            IndexRequest indexRequest = new IndexRequest("kafkademo")
                    .id(kafka_generic_id)
                    .source(record.value(), XContentType.JSON);

            // add to our bulk request (takes no time, nothing is sent yet)
            bulkRequest.add(indexRequest);
        }

        if (bulkRequest.numberOfActions() > 0) {
            // send the whole batch in one round trip
            BulkResponse bulkItemResponses = client.bulk(bulkRequest, RequestOptions.DEFAULT);

            logger.info("Committing offsets...");
            consumer.commitSync(); // commit offsets manually
            logger.info("Offsets have been committed");
        }
    }
}

   

Notice that after polling, the consumer no longer sends records to ElasticSearch one by one: it adds them to a BulkRequest and sends the whole batch after the for loop completes. Only once the bulk request has been sent do we commit offsets manually.
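One hardening step worth considering (not in the original post): inspect the bulk response before committing, so offsets are committed only when every insert succeeded:

BulkResponse bulkItemResponses = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (bulkItemResponses.hasFailures()) {
    // skip the commit: the uncommitted records will be polled and retried again
    logger.warn("Bulk insert had failures: " + bulkItemResponses.buildFailureMessage());
} else {
    consumer.commitSync();
}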

 

Execution output omitted (it was a screenshot in the original post).

Apache Kafka (8) - Kafka Delivery Semantics for Consumers

Kafka Delivery Semantics

A Kafka consumer can have one of three delivery semantics: at most once, at least once, and exactly once. We introduce each below.

 

1. At Most Once

Offsets are committed as soon as a message batch is received by the consumer. If an exception occurs in the processing logic, the unprocessed messages are lost (they will not be read again).

An example of this scenario (diagram omitted):

The flow in this example:

  1. The consumer reads a batch of messages
  2. As soon as the messages are received, the consumer commits offsets
  3. The consumer processes the data, for example sending emails, but the last two messages of the batch go unprocessed because the consumer crashes
  4. The consumer restarts and resumes reading. Since offsets were already committed, it reads the next batch from the latest offset, and the unprocessed messages from the previous batch are lost

So at most once carries a risk of data loss; use it only if the application can tolerate losing data.

 

2. At Least Once

Offsets are committed only after messages have been received and processed by the consumer. If an exception occurs during processing, the messages are consumed again, meaning messages can be processed more than once.

At least once is the default semantics. It requires the application to be idempotent (processing duplicate messages must not affect the application).

An example of this scenario (diagram omitted):

The flow in this example:

  1. The consumer reads a batch of messages
  2. It receives and processes the messages normally
  3. After processing completes, it commits offsets
  4. It continues reading and processing the next batch. If something goes wrong along the way (for example the consumer restarts), it re-reads a batch from the most recent committed offset, so some messages end up processed twice (4263, 4264, and 4265 in this example)

 

3. Exactly Once

This semantics is the hardest to achieve. In Kafka it is only possible for Kafka => Kafka workflows, via the Kafka Streams API; for Kafka => sink workflows, use an idempotent consumer instead. A sketch of the Streams configuration follows.
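A hedged sketch of how a Kafka => Kafka Streams application opts in; the application id and broker address are assumptions, and newer Kafka versions name the constant EXACTLY_ONCE_V2:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "exactly-once-demo");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// this one property makes the Streams app produce transactionally and
// consume with read_committed
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);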

For most applications, use at-least-once processing and ensure the consumer-side transformations/processing are idempotent.

 

4. Building an Idempotent Consumer

An idempotent consumer can process duplicate messages without affecting the application's logic. In ElasticSearch, a document is uniquely identified by its _id field, so in this scenario an idempotent consumer must perform the same operation for messages carrying the same _id.

In the earlier ElasticSearch consumer example, each message's _id was generated randomly by default. That means a reprocessed duplicate message gets a fresh random _id, which is not idempotent behavior. Instead, we can define our own _id scheme, modifying the code as follows:

// poll for new data
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));

    for (ConsumerRecord<String, String> record : records) {

        // construct a Kafka generic ID
        String kafka_generic_id = record.topic() + "_" + record.partition() + "_" + record.offset();

        // insert data into ElasticSearch, using our deterministic id
        IndexRequest indexRequest = new IndexRequest("kafkademo")
                .id(kafka_generic_id)
                .source(record.value(), XContentType.JSON);

        IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
        logger.info(indexResponse.getId());

        try {
            Thread.sleep(1000); // introduce a small delay
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

 

The logged ids (screenshot omitted) have the form topic_partition_offset.

The new id is composed of the Kafka topic, partition, and offset, which uniquely identifies a record. Even if a record is processed twice, the id it sends to ElasticSearch is the same (that is, the same operation is performed), so in this scenario the consumer is idempotent.

 

An org.apache.kafka.clients.consumer.KafkaConsumer.assign problem when building kylin_streaming_cube in Apache Kylin

While building kylin_streaming_cube, an org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;) problem occurs.

Environment: CDH 5.7.6, kylin-2.1.0-bin-cdh57

Kafka version: the Kylin site requires 0.10.0+ (note that the log below reports the client as 0.9.0-kafka-2.0.1).

The error is as follows:

2018-10-26 14:30:59,065 DEBUG [http-bio-7070-exec-9] kafka.KafkaSource:83 : Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's earliest offset.
2018-10-26 14:30:59,068 INFO  [http-bio-7070-exec-9] consumer.ConsumerConfig:171 : ConsumerConfig values:
    metric.reporters = []
    metadata.max.age.ms = 300000
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    group.id = kylin_streaming_cube
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    max.partition.fetch.bytes = 1048576
    bootstrap.servers = [master:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    enable.auto.commit = false
    ssl.key.password = null
    fetch.max.wait.ms = 500
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    session.timeout.ms = 30000
    metrics.num.samples = 2
    client.id =
    ssl.endpoint.identification.algorithm = null
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    ssl.protocol = TLS
    check.crcs = true
    request.timeout.ms = 40000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.keystore.location = null
    heartbeat.interval.ms = 3000
    auto.commit.interval.ms = 5000
    receive.buffer.bytes = 65536
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    fetch.min.bytes = 1
    send.buffer.bytes = 131072
    auto.offset.reset = latest

2018-10-26 14:30:59,075 INFO  [http-bio-7070-exec-9] utils.AppInfoParser:83 : Kafka version : 0.9.0-kafka-2.0.1
2018-10-26 14:30:59,075 INFO  [http-bio-7070-exec-9] utils.AppInfoParser:84 : Kafka commitId : unknown
2018-10-26 14:30:59,193 ERROR [http-bio-7070-exec-9] controller.CubeController:305 : org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
    at org.apache.kylin.source.kafka.util.KafkaClient.getEarliestOffset(KafkaClient.java:84)
    at org.apache.kylin.source.kafka.util.KafkaClient.getEarliestOffsets(KafkaClient.java:127)
    at org.apache.kylin.source.kafka.KafkaSource.enrichSourcePartitionBeforeBuild(KafkaSource.java:84)
    at org.apache.kylin.rest.service.JobService.submitJobInternal(JobService.java:236)
    at org.apache.kylin.rest.service.JobService.submitJob(JobService.java:208)
    at org.apache.kylin.rest.service.JobService$$FastClassBySpringCGLIB$$83a44b2a.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:720)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.security.access.intercept.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:68)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:655)
    at org.apache.kylin.rest.service.JobService$$EnhancerBySpringCGLIB$$9b79b224.submitJob(<generated>)
    at org.apache.kylin.rest.controller.CubeController.buildInternal(CubeController.java:302)
    at org.apache.kylin.rest.controller.CubeController.rebuild2(CubeController.java:290)
    at org.apache.kylin.rest.controller.CubeController.build2(CubeController.java:283)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:110)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:832)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:743)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:961)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:895)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:967)
    at org.springframework.web.servlet.FrameworkServlet.doPut(FrameworkServlet.java:880)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:653)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:843)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:731)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:316)
    at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
    at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:114)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:122)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:169)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:48)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.authentication.www.BasicAuthenticationFilter.doFilterInternal(BasicAuthenticationFilter.java:213)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:205)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:120)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:64)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:53)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:91)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:213)
    at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:176)
    at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346)
    at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:209)
    at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:244)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:505)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:169)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
    at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:956)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:436)
    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1078)
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:625)
    at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:318)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:748)
2018-10-26 14:30:59,196 ERROR [http-bio-7070-exec-9] controller.BasicController:57 :
org.apache.kylin.rest.exception.InternalErrorException: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
    at org.apache.kylin.rest.controller.CubeController.buildInternal(CubeController.java:306)
    at org.apache.kylin.rest.controller.CubeController.rebuild2(CubeController.java:290)
    at org.apache.kylin.rest.controller.CubeController.build2(CubeController.java:283)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:110)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:832)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:743)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:961)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:895)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:967)
    at org.springframework.web.servlet.FrameworkServlet.doPut(FrameworkServlet.java:880)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:653)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:843)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:731)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:316)
    at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
    at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:114)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:122)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:169)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:48)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.authentication.www.BasicAuthenticationFilter.doFilterInternal(BasicAuthenticationFilter.java:213)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:205)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:120)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:64)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:53)
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:91)
    at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
    at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:213)
    at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:176)
    at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346)
    at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:209)
    at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:244)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:505)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:169)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
    at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:956)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:436)
    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1078)
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:625)
    at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:318)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
    at org.apache.kylin.source.kafka.util.KafkaClient.getEarliestOffset(KafkaClient.java:84)
    at org.apache.kylin.source.kafka.util.KafkaClient.getEarliestOffsets(KafkaClient.java:127)
    at org.apache.kylin.source.kafka.KafkaSource.enrichSourcePartitionBeforeBuild(KafkaSource.java:84)
    at org.apache.kylin.rest.service.JobService.submitJobInternal(JobService.java:236)
    at org.apache.kylin.rest.service.JobService.submitJob(JobService.java:208)
    at org.apache.kylin.rest.service.JobService$$FastClassBySpringCGLIB$$83a44b2a.invoke(<generated>)
    at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
    at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:720)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
    at org.springframework.security.access.intercept.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:68)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:655)
    at org.apache.kylin.rest.service.JobService$$EnhancerBySpringCGLIB$$9b79b224.submitJob(<generated>)
    at org.apache.kylin.rest.controller.CubeController.buildInternal(CubeController.java:302)
    ... 78 more

 

Does anyone know how to resolve this problem?

CDH-Kafka-SparkStreaming exception: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/uti

Reference:

Cluster error after integrating Flume, Kafka, and Spark Streaming: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/ut

https://blog.csdn.net/u010936936/article/details/77247075?locationNum=2&fps=1

 

I recently hit some problems while submitting a Kafka-Spark Streaming job in a CDH environment; this post records them.

The main error message:
 

Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;

Before the error, the log shows the kafka-client is version 0.9.0:

 

18/11/01 16:48:04 INFO AppInfoParser: Kafka version : 0.9.0.0
18/11/01 16:48:04 INFO AppInfoParser: Kafka commitId : cb8625948210849f
 

However, the jar we packaged was built against a higher version than this.

 

Cause analysis

This is actually covered in the official documentation:
https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html#running_jobs
In short, integrating Kafka with Spark 2 on CDH requires explicit configuration. The docs describe two methods: the first sets the SPARK_KAFKA_VERSION environment variable in the shell at submit time (e.g. export SPARK_KAFKA_VERSION=0.10 before calling spark2-submit); here I used the second, changing the configuration in Cloudera Manager.

The steps:

1. Open the Spark 2 configuration page in Cloudera Manager and search for SPARK_KAFKA_VERSION (screenshot omitted).

2. Select the matching version. Here the right choice is None, i.e. use the kafka-client version uploaded with the job.

3. Save the configuration and restart for it to take effect.

4. Re-run the Spark Streaming job; problem solved.
 

 

 

 

This concludes today's introduction to Spring Kafka: how kafkaTemplate executeInTransaction relates to the consumer's read_committed isolation level. Thank you for reading. More on Apache Kafka (9) - Kafka Consumer Consumption Behavior, Apache Kafka (8) - Kafka Delivery Semantics for Consumers, the Apache Kylin kylin_streaming_cube KafkaConsumer.assign problem, and the CDH-Kafka-SparkStreaming KafkaConsumer.subscribe(Ljava/uti exception can be found by searching this site.
