在本文中,我们将给您介绍关于SpringKafka:kafkaTemplateexecuteInTransaction方法如何发挥Consumer的read_committed隔离级别我的具体场景我的
在本文中,我们将给您介绍关于Spring Kafka:kafkaTemplate executeInTransaction 方法如何发挥 Consumer 的 read_committed 隔离级别 我的具体场景我的困惑是的详细内容,此外,我们还将为您提供关于Apache Kafka(九)- Kafka Consumer 消费行为、Apache Kafka(八)- Kafka Delivery Semantics for Consumers、apache kylin 在 build kylin_streaming_cube 时发生 org.apache.kafka.clients.consumer.KafkaConsumer.assign 的问题、CDH-Kafka-SparkStreaming 异常:org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/uti的知识。
本文目录一览:- Spring Kafka:kafkaTemplate executeInTransaction 方法如何发挥 Consumer 的 read_committed 隔离级别 我的具体场景我的困惑是
- Apache Kafka(九)- Kafka Consumer 消费行为
- Apache Kafka(八)- Kafka Delivery Semantics for Consumers
- apache kylin 在 build kylin_streaming_cube 时发生 org.apache.kafka.clients.consumer.KafkaConsumer.assign 的问题
- CDH-Kafka-SparkStreaming 异常:org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/uti
Spring Kafka:kafkaTemplate executeInTransaction 方法如何发挥 Consumer 的 read_committed 隔离级别 我的具体场景我的困惑是
如何解决Spring Kafka:kafkaTemplate executeInTransaction 方法如何发挥 Consumer 的 read_committed 隔离级别 我的具体场景我的困惑是?
我看到 isolation.level=read_committed
消费者属性确保消费者只会读取已提交的消息。我试图理解在这种情况下 committed message
究竟是什么意思?我们什么时候可以说生产者的消息已提交给主题?
我的具体场景
我正在使用 Spring-kafka,kafkaTemplate.executeInTransaction
方法异步向 kafka 发送消息。
我看到 executeInTransaction
方法在内部调用 producer.commitTransaction()
,如果它无法在 max.block.ms
内完成,则会抛出异常
我的困惑是
如果 producer.commitTransaction()
在 max.block.ms
内完成,是否表示消息已成功存储在主题中,准备好让具有 isolation.level=read_committed
的消费者消费?
我问这个是因为我看到还有另一个属性 delivery.timeout.ms
对应于在 send()
/max.block.ms
完成后启动的进程。
所以..这是否意味着即使在 producer.commitTransaction()
返回之后,我们仍然需要等待最多 delivery.timeout.ms
才能确定消息已写入主题?
解决方法
没有;当提交成功时,记录在日志中是安全的。
Apache Kafka(九)- Kafka Consumer 消费行为
1. Poll Messages
在Kafka Consumer 中消费messages时,使用的是poll模型,也就是主动去Kafka端取数据。其他消息管道也有的是push模型,也就是服务端向consumer推送数据,consumer仅需等待即可。
Kafka Consumer的poll模型使得consumer可以控制从log的指定offset去消费数据、消费数据的速度、以及replay events的能力。
Kafka Consumer 的poll模型工作如下图:
- · Consumer 调用.poll(Duration timeout) 方法,向broker请求数据
- · 若是broker端有数据则立即返回;否则在timeout时间后返回empty
我们可以通过参数控制 Kafka Consumer 行为,主要有:
- · Fetch.min.bytes(默认值是1)
o 控制在每个请求中,至少拉取多少数据
o 增加此参数可以提高吞吐并降低请求的数目,但是代价是增加延时
- · Max.poll.records(默认是500)
o 控制在每个请求中,接收多少条records
o 如果消息普遍都比较小而consumer端又有较大的内存,则可以考虑增大此参数
o 最好是监控在每个请求中poll了多少条消息
- · Max.partitions.fetch.bytes(默认为1MB)
o Broker中每个partition可返回的最多字节
o 如果目标端有100多个partitions,则需要较多内存
- · Fetch.max.bytes(默认50MB)
o 对每个fetch 请求,可以返回的最大数据量(一个fetch请求可以覆盖多个partitions)
o Consumer并行执行多个fetch操作
默认情况下,一般不建议手动调整以上参数,除非我们的consumer已经达到了默认配置下的最高的吞吐,且需要达到更高的吞吐。
2. Consumer Offset Commit 策略
在一个consumer 应用中,有两种常见的committing offsets的策略,分别为:
- · (较为简单)enable.auto.commit = true:自动commit offsets,但必须使用同步的方式处理数据
- · (进阶)enable.auto.commit = false:手动commit offsets
在设置enable.auto.commit = true时,考虑以下代码:
while(true) {
List<Records> batch = consumer.poll(Duration.ofMillis(100));
doSomethingSynchronous(batch);
}
一个Consumer 每隔100ms poll一次消息,然后以同步地方式处理这个batch的数据。此时offsets 会定期自动被commit,此定期时间由 auto.commit.interval.ms 决定,默认为 5000,也就是在每次调用 .poll() 方法 5 秒后,会自动commit offsets。
但是如果在处理数据时用的是异步的方式,则会导致“at-most-once”的行为。因为offsets可能会在数据被处理前就被commit。
所以对于新手来说,使用 enable.auto.commit = true 可能是有风险的,所以不建议一开始就使用这种方式 。
若设置 enable.auto.commit = false,考虑以下代码:
while(true) {
List<Records> batch = consumer.poll(Duration.ofMillis(100));
if isReady(batch){
doSomethingSynchronous(batch);
consumer.commitSync();
}
}
此例子明确指示了在同步地处理了数据后,再主动commit offsets。这样我们可以控制在什么条件下,去commit offsets。一个比较典型的场景为:将接收的数据读入缓存,然后flush 缓存到一个数据库中,最后再commit offsets。
3. 手动Commit Offset 示例
首先我们关闭自动commit offsets :
// disable auto commit of offsets
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
指定每个请求最多接收10条records,便于测试:
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10");
添加以下代码逻辑:
public static void main(String[] args) throws IOException {
Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
RestHighLevelClient client = createClient();
// create Kafka consumer
KafkaConsumer<String, String> consumer = createConsumer("kafka_demo");
// poll for new data
while(true){
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMinutes(100));
logger.info("received " + records.count() + "records");
for(ConsumerRecord record : records) {
// construct a kafka generic ID
String kafka_generic_id = record.topic() + "_" + record.partition() + "_" + record.offset();
// where we insert data into ElasticSearch
IndexRequest indexRequest = new IndexRequest(
"kafkademo"
).id(kafka_generic_id).source(record.value(), XContentType.JSON);
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
String id = indexResponse.getId();
logger.info(id);
try {
Thread.sleep(10); // introduce a small delay
} catch (InterruptedException e) {
e.printStackTrace();
}
}
logger.info("Committing offsets...");
consumer.commitSync(); // commit offsets manually
logger.info("Offsets have been committed");
}
}
这里我们在处理每次获取的10条records后(也就是for 循环完整执行一次),手动执行一次offsets commit。打印日志记录为:
手动停止consumer 程序后,可以看到最后的committed offsets为165:
使用consumer-group cli 也可以验证当前committed offsets为165:
4. Performance Improvement using Batching
在这个例子中,consumer 限制每次poll 10条数据,然后每条依次处理(插入elastic search)。此方法效率较低,我们可以通过使用 batching 的方式增加吞吐。这里实现的方式是使用 elastic search API 提供的BulkRequest,基于之前的代码,修改如下:
public static void main(String[] args) throws IOException {
Logger logger = LoggerFactory.getLogger(ElasticSearchConsumer.class.getName());
RestHighLevelClient client = createClient();
// create Kafka consumer
KafkaConsumer<String, String> consumer = createConsumer("kafka_demo");
// poll for new data
while(true){
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMinutes(100));
// bulk request
BulkRequest bulkRequest = new BulkRequest();
logger.info("received " + records.count() + "records");
for(ConsumerRecord record : records) {
// construct a kafka generic ID
String kafka_generic_id = record.topic() + "_" + record.partition() + "_" + record.offset();
// where we insert data into ElasticSearch
IndexRequest indexRequest = new IndexRequest(
"kafkademo"
).id(kafka_generic_id).source(record.value(), XContentType.JSON);
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
// add to our bulk request (takes no time)
bulkRequest.add(indexRequest);
//String id = indexResponse.getId();
//logger.info(id);
try {
Thread.sleep(10); // introduce a small delay
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// bulk response
BulkResponse bulkItemResponses = client.bulk(bulkRequest, RequestOptions.DEFAULT);
logger.info("Committing offsets...");
consumer.commitSync(); // commit offsets manually
logger.info("Offsets have been committed");
}
}
可以看到,consumer在poll到记录后,并不会一条条的向elastic search 发送,而是将它们放入一个BulkRequest,并在for循环结束后发送。在发送完毕后,再手动commit offsets。
执行结果为:
Apache Kafka(八)- Kafka Delivery Semantics for Consumers
Kafka Delivery Semantics
在Kafka Consumer中,有3种delivery semantics,分别为:至多一次(at most once)、至少一次(at least once)、以及准确一次(exactly once),下面我们分别介绍这3种Delivery 语义。
1. At Most Once
在message batch在被consumer接收后,立即commit offsets。此时若是在消息处理逻辑中出现异常,则未被处理的消息会丢失(不会再次被读取)。
此场景一个例子如下图:
此例流程如下:
- Consumer读一个batch的消息
- 在接收到消息后,Consumer commits offsets
- Consumer 处理数据,例如发送邮件,但是此时一个batch中的最后两条消息由于consumer异常宕机而未被正常处理
- Consumer 重启并重新开始读数据。但是此时由于已经committed offset,所以consumer会在最新的offset处读一个batch的消息,之前上一个batch中由于异常而未被处理的消息会丢失
所以at most once 会有丢失数据的风险,但若是应用可以承受丢失数据的风险,则可以使用此方式。
2. At Least Once
在消息被consumer接收并处理后,offsets才被 commit。若是在消息处理时发生异常,则消息会被重新消费。也就是说,会导致消息被重复处理。
At Least Once 是默认使用的语义,在这种情况下,需要保证应用是idempotent 类型(处理重复的消息不会对应用产生影响)。
此场景一个例子如下:
此示例流程如下:
- Consumer 读一个batch的消息
- 在接收到消息并正常处理
- 在consumer 正常处理消息完毕后,commits offset
- 继续读并处理下一个batch 的消息。若是在此过程中发生异常(例如consumer 重启),则consumer会从最近的 offset 开始读一个batch的消息并处理。所以此时会导致有重复消息被处理(此例中为4263、4264、4265)
3. Exactly once
此语义较难实现,在kafka中仅能在Kafka => Kafka的工作流中,通过使用Kafka Stream API 实现。对于Kafka => Sink 的工作流,请使用 idempotent consumer。
对于大部分应用程序,我们应使用at least once processing,并确保consumer端的transformation/processing 是idempotent类型。
4. 构建 idempotent consumer
一个idempotent consumer可以在处理重复消息时,不影响整个应用的逻辑。在ElasticSearch 中,通过一个_id 字段唯一识别一条消息。所以在这个场景下,为了实现idempotent consumer,我们需要对同样_id字段的消息做同样的处理。
之前给出的Elastic Search Consumer的例子中,每条消息的 _id 都是默认随机生成的,也就是说:若是处理之前重复的消息,生成的id也是一条新的随机_id,此行为不符合一个idempotent consumer。对此,我们可以自定义一个_id 模式,修改代码如下:
// poll for new data
while(true){
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofMinutes(100));
for(ConsumerRecord record : records) {
// construct a kafka generic ID
String kafka_generic_id = record.topic() + "_" + record.partition() + "_" + record.offset();
// where we insert data into ElasticSearch
IndexRequest indexRequest = new IndexRequest(
"kafkademo"
).id(kafka_generic_id).source(record.value(), XContentType.JSON);
IndexResponse indexResponse = client.index(indexRequest, RequestOptions.DEFAULT);
String id = indexResponse.getId();
logger.info(id);
try {
Thread.sleep(1000); // introduce a small delay
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
打印出id结果为:
可以看到新的 id 由 kafka topic + partition + offset 这3 部分组成,可以唯一定位一个 record。所以即使重复处理一条record,它发往 ElasticSearch 的 id 也是一样的(即处理逻辑一样)。在这个场景下,即为一个imdepotent consumer。
apache kylin 在 build kylin_streaming_cube 时发生 org.apache.kafka.clients.consumer.KafkaConsumer.assign 的问题
build kylin_streaming_cube 时发生 org.apache.kafka.clients.consumer.KafkaConsumer.assign (Ljava/util/Collection;) 的问题。
环境 CDH5.7.6,kylin2.1.0-bin-cdh57
kafka 版本:(官网要求是 0.10.0+)
报错如下:
2018-10-26 14:30:59,065 DEBUG [http-bio-7070-exec-9] kafka.KafkaSource:83 : Last segment doesn''t exist, and didn''t initiate the start offset, will seek from topic''s earliest offset.
2018-10-26 14:30:59,068 INFO [http-bio-7070-exec-9] consumer.ConsumerConfig:171 : ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
group.id = kylin_streaming_cube
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [master:9092]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = false
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXT
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1
send.buffer.bytes = 131072
auto.offset.reset = latest
2018-10-26 14:30:59,075 INFO [http-bio-7070-exec-9] utils.AppInfoParser:83 : Kafka version : 0.9.0-kafka-2.0.1
2018-10-26 14:30:59,075 INFO [http-bio-7070-exec-9] utils.AppInfoParser:84 : Kafka commitId : unknown
2018-10-26 14:30:59,193 ERROR [http-bio-7070-exec-9] controller.CubeController:305 : org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
at org.apache.kylin.source.kafka.util.KafkaClient.getEarliestOffset(KafkaClient.java:84)
at org.apache.kylin.source.kafka.util.KafkaClient.getEarliestOffsets(KafkaClient.java:127)
at org.apache.kylin.source.kafka.KafkaSource.enrichSourcePartitionBeforeBuild(KafkaSource.java:84)
at org.apache.kylin.rest.service.JobService.submitJobInternal(JobService.java:236)
at org.apache.kylin.rest.service.JobService.submitJob(JobService.java:208)
at org.apache.kylin.rest.service.JobService
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:720)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.security.access.intercept.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:68)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:655)
at org.apache.kylin.rest.service.JobService
at org.apache.kylin.rest.controller.CubeController.buildInternal(CubeController.java:302)
at org.apache.kylin.rest.controller.CubeController.rebuild2(CubeController.java:290)
at org.apache.kylin.rest.controller.CubeController.build2(CubeController.java:283)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:110)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:832)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:743)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:961)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:895)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:967)
at org.springframework.web.servlet.FrameworkServlet.doPut(FrameworkServlet.java:880)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:653)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:843)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:731)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:316)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:114)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:122)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:169)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:48)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.authentication.www.BasicAuthenticationFilter.doFilterInternal(BasicAuthenticationFilter.java:213)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:205)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:120)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:64)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:53)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:91)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:213)
at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:176)
at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346)
at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:209)
at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:244)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:505)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:169)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:956)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:436)
at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1078)
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:625)
at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:318)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
2018-10-26 14:30:59,196 ERROR [http-bio-7070-exec-9] controller.BasicController:57 :
org.apache.kylin.rest.exception.InternalErrorException: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
at org.apache.kylin.rest.controller.CubeController.buildInternal(CubeController.java:306)
at org.apache.kylin.rest.controller.CubeController.rebuild2(CubeController.java:290)
at org.apache.kylin.rest.controller.CubeController.build2(CubeController.java:283)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:110)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:832)
at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:743)
at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:961)
at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:895)
at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:967)
at org.springframework.web.servlet.FrameworkServlet.doPut(FrameworkServlet.java:880)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:653)
at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:843)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:731)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:303)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:316)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.invoke(FilterSecurityInterceptor.java:126)
at org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter(FilterSecurityInterceptor.java:90)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.access.ExceptionTranslationFilter.doFilter(ExceptionTranslationFilter.java:114)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.session.SessionManagementFilter.doFilter(SessionManagementFilter.java:122)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.authentication.AnonymousAuthenticationFilter.doFilter(AnonymousAuthenticationFilter.java:111)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.servletapi.SecurityContextHolderAwareRequestFilter.doFilter(SecurityContextHolderAwareRequestFilter.java:169)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.savedrequest.RequestCacheAwareFilter.doFilter(RequestCacheAwareFilter.java:48)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.authentication.www.BasicAuthenticationFilter.doFilterInternal(BasicAuthenticationFilter.java:213)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.authentication.AbstractAuthenticationProcessingFilter.doFilter(AbstractAuthenticationProcessingFilter.java:205)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.authentication.logout.LogoutFilter.doFilter(LogoutFilter.java:120)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.header.HeaderWriterFilter.doFilterInternal(HeaderWriterFilter.java:64)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal(WebAsyncManagerIntegrationFilter.java:53)
at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter(SecurityContextPersistenceFilter.java:91)
at org.springframework.security.web.FilterChainProxy$VirtualFilterChain.doFilter(FilterChainProxy.java:330)
at org.springframework.security.web.FilterChainProxy.doFilterInternal(FilterChainProxy.java:213)
at org.springframework.security.web.FilterChainProxy.doFilter(FilterChainProxy.java:176)
at org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate(DelegatingFilterProxy.java:346)
at org.springframework.web.filter.DelegatingFilterProxy.doFilter(DelegatingFilterProxy.java:262)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:209)
at com.thetransactioncompany.cors.CORSFilter.doFilter(CORSFilter.java:244)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:241)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:208)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:220)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:122)
at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:505)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:169)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:103)
at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:956)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:116)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:436)
at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1078)
at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:625)
at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:318)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.assign(Ljava/util/Collection;)V
at org.apache.kylin.source.kafka.util.KafkaClient.getEarliestOffset(KafkaClient.java:84)
at org.apache.kylin.source.kafka.util.KafkaClient.getEarliestOffsets(KafkaClient.java:127)
at org.apache.kylin.source.kafka.KafkaSource.enrichSourcePartitionBeforeBuild(KafkaSource.java:84)
at org.apache.kylin.rest.service.JobService.submitJobInternal(JobService.java:236)
at org.apache.kylin.rest.service.JobService.submitJob(JobService.java:208)
at org.apache.kylin.rest.service.JobService
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:720)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.security.access.intercept.aopalliance.MethodSecurityInterceptor.invoke(MethodSecurityInterceptor.java:68)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:655)
at org.apache.kylin.rest.service.JobService
at org.apache.kylin.rest.controller.CubeController.buildInternal(CubeController.java:302)
... 78 more
请问各位,该如何解决这个问题呢?
CDH-Kafka-SparkStreaming 异常:org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/uti
参考文章:
flume kafka sparkstreaming整合后集群报错org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/ut
https://blog.csdn.net/u010936936/article/details/77247075?locationNum=2&fps=1
最近在使用CDH 环境 提交 Kafka-Spark Streaming 作业的时候遇到了一些问题,特此记录如下:
主要的报错信息为:
Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;
Exception in thread "streaming-start" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;
并且在报错之前,可以看到 kafka-client 为 0.9.0 版本
18/11/01 16:48:04 INFO AppInfoParser: Kafka version : 0.9.0.0
18/11/01 16:48:04 INFO AppInfoParser: Kafka commitId : cb8625948210849f
但是,我们查看打的包高于此版本。
原因分析
其实这个在官方文档中有介绍。地址如下:
https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html#running_jobs
简单说,就是kafka集成spark2,需要在CDH中进行设置。官网介绍了2中方法。这里我采用了第二种,在CDH中进行修改配置的方法。
步骤如下:
1.进入CDH的spark2配置界面,在搜索框中输入SPARK_KAFKA_VERSION,出现如下图,
2.然后选择对应版本,这里我应该选择的是None,
即 : 选用集群上传的Kafka-client 版本 !!
3.然后保存配置,重启生效。
4.重新跑sparkstreaming任务,问题解决。
今天关于Spring Kafka:kafkaTemplate executeInTransaction 方法如何发挥 Consumer 的 read_committed 隔离级别 我的具体场景我的困惑是的介绍到此结束,谢谢您的阅读,有关Apache Kafka(九)- Kafka Consumer 消费行为、Apache Kafka(八)- Kafka Delivery Semantics for Consumers、apache kylin 在 build kylin_streaming_cube 时发生 org.apache.kafka.clients.consumer.KafkaConsumer.assign 的问题、CDH-Kafka-SparkStreaming 异常:org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/uti等更多相关知识的信息可以在本站进行查询。
本文标签: