针对ApacheCamel将异常传播到父路由而不使用noErrorHandler和java异常传递这两个问题,本篇文章进行了详细的解答,同时本文还将给你拓展android–使用自定义ErrorHand
针对Apache Camel 将异常传播到父路由而不使用 noErrorHandler和java异常传递这两个问题,本篇文章进行了详细的解答,同时本文还将给你拓展android – 使用自定义ErrorHandler时,Retrofit不会触发onError、Apache Camel抛出java.lang.NoSuchMethodError:org.apache.camel.util.ObjectHelper.notNull、Apache Camel框架之Error handling、beam 的异常处理 Error Handling Elements in Apache Beam Pipelines等相关知识,希望可以帮助到你。
本文目录一览:- Apache Camel 将异常传播到父路由而不使用 noErrorHandler(java异常传递)
- android – 使用自定义ErrorHandler时,Retrofit不会触发onError
- Apache Camel抛出java.lang.NoSuchMethodError:org.apache.camel.util.ObjectHelper.notNull
- Apache Camel框架之Error handling
- beam 的异常处理 Error Handling Elements in Apache Beam Pipelines
Apache Camel 将异常传播到父路由而不使用 noErrorHandler(java异常传递)
如何解决Apache Camel 将异常传播到父路由而不使用 noErrorHandler?
我想为一个路由实现错误处理,它只处理特定的异常并将其他异常传播到父路由而不是原始调用者。 这就像结合了 noErrorHandler 和 onException 子句的功能。
onException(HttpOperationFailedException.class)
.handle(true)
.process(bean,"generatetoken");
from(direct:helloWorld)
.errorHandler(noErrorHandler())
.to(http://localhost/fetch/data)
我知道 noErrorHandler 禁用错误处理并且 onException 在这种情况下不起作用,如果 defaultErrorHandler 与 onException 一起使用,未处理的异常将传播回原始调用者而不是父路由。
有什么方法可以实现所需的功能吗?
解决方法
无法在您的路由中处理某些异常,而在调用父路由中处理其他异常,因为正如您所说,noErrorHandler 禁用了错误处理。
对此的解决方案是从路由中的调用路由复制异常处理的公共部分。
android – 使用自定义ErrorHandler时,Retrofit不会触发onError
我附上了源代码.
初始化:
HttpClient的:
OkHttpClient httpClient = new OkHttpClient(); httpClient.setHostnameVerifier(...); // allow all try { httpClient.setSslSocketFactory(...); } catch (NoSuchAlgorithmException | KeyManagementException e) { e.printstacktrace(); }
RestAdapter:
Gson gson = new GsonBuilder().setDateFormat("yyyy-MM-dd''T''HH:mm:ssZ").create(); RestAdapter restAdapter = new RestAdapter.Builder() .setEndpoint(BASE_API_URL) .setLogLevel(RestAdapter.LogLevel.FULL) .setClient(httpClient) .setRequestInterceptor(new RequestInterceptor() { @Override public void intercept(RequestFacade request) { request.addHeader("Accept-Language",Locale.getDefault().getLanguage()); } }) .setConverter(new GsonConverter(gson)) .setErrorHandler(new CustomErrorHandler(this)) .build(); Api api = restAdapter.create(Api.class);
自定义ErrorHandler:
public class CustomErrorHandler implements ErrorHandler { private final Context mContext; public UniversalErrorHandler(Context context) { mContext = context; } @Override public Throwable handleError(RetrofitError cause) { String errorMessage; if (cause.getKind() == RetrofitError.Kind.NETWORK) { errorMessage = mContext.getString(R.string.no_network_connection); } else { if (cause.getResponse() == null || cause.getResponse().getStatus() == 500) { errorMessage = mContext.getString(R.string.error_contacting_server); } else { errorMessage = mContext.getString(R.string.unkNown_error); } } return new Exception(errorMessage,cause); } }
用法:
Observable<List<Object>> observable = api.getobjects(); observable .subscribeOn(Schedulers.newThread()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Subscriber<List<Object>>() { @Override public void onCompleted() { Timber.i("onCompleted"); } @Override public void onError(Throwable e) { Timber.e(e,"onError"); } @Override public void onNext(List<Object> objects) { Timber.i("onNext %d",objects.size()); } });
解决方法
Timber.e("Error: %s,%d",cause.getMessage(),cause.getResponse().getStatus());
getStatus()导致NullPointerException,因为响应为null …我忽略了它,因为logcat中没有任何日志.
如果从handleError()方法抛出运行时异常,则不会调用onError().
Apache Camel抛出java.lang.NoSuchMethodError:org.apache.camel.util.ObjectHelper.notNull
我有一个时髦的剧本,我正试着去;它的工作是读取一个消息队列,然后将消息重新排队到我们正在运行的一组新MQ中.
这是代码:
@Grab(group='org.apache.camel', module='camel-core', version='2.11.0')
@Grab(group='org.apache.activemq', module='activemq-core', version='5.7.0')
@Grab(group='org.apache.activemq', module='activemq-camel', version='5.8.0')
@Grab(group='ch.qos.logback', module='logback-classic', version='1.0.13')
import org.apache.activemq.camel.component.ActiveMQComponent
import org.apache.activemq.ActiveMQConnectionFactory
import org.apache.camel.CamelContext
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.impl.DefaultCamelContext
class copyFromOneserverToAnother extends RouteBuilder{
public static void main(String[] args) {
final CamelContext camelContext = new DefaultCamelContext();
camelContext.addComponent("jms-01", ActiveMQComponent.activeMQComponent("tcp://mq01:61616"));
try {
// Add the routes defined below to the camel context
camelContext.addRoutes(new copyFromOneserverToAnother());
camelContext.start();
Thread.sleep(10000000);
}
catch (final Exception e) {
e.printstacktrace();
}
finally {
try {
camelContext.stop();
}
catch (final Exception e) {
e.printstacktrace();
}
}
}
@Override
public void configure() throws Exception {
/// The name of the AMQ instance and queue where message will be taken from
from("jms-01:TEST-1")
.log("Processed")
// Final resting point for each message.
.to("jms-01:TEST-3")
.end();
}
}
因此,这是使用相同的代理来源和目标,但不同的队列.
这会读取消息,但在尝试编写消息时会抛出异常:
15:15:31.548 [Camel (camel-1) thread #0 - JmsConsumer[TEST-1]] INFO route1 - Processed
15:15:31.548 [Camel (camel-1) thread #0 - JmsConsumer[TEST-1]] DEBUG o.a.camel.processor.SendProcessor - >>>> Endpoint[jms-01://TEST-3] Exchange[JmsMessage[JmsMessageID: ID:mq01.dummycorp.corp-60083-1370438794752-5:131:1:1:1]]
15:15:31.594 [Camel (camel-1) thread #0 - JmsConsumer[TEST-1]] DEBUG o.a.c.processor.DefaultErrorHandler - Failed delivery for (MessageId: ID:mq01.dummycorp.corp-60083-1370438794752-5:131:1:1:1 on ExchangeId: ID-ukm038662-local-53821-1370441730008-0-10). On delivery attempt: 0 caught: org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[JmsMessage[JmsMessageID: ID:mq01.dummycorp.corp-60083-1370438794752-5:131:1:1:1]]
15:15:31.595 [Camel (camel-1) thread #0 - JmsConsumer[TEST-1]] ERROR o.a.c.processor.DefaultErrorHandler - Failed delivery for (MessageId: ID:mq01.dummycorp.corp-60083-1370438794752-5:131:1:1:1 on ExchangeId: ID-ukm038662-local-53821-1370441730008-0-10). Exhausted after delivery attempt: 1 caught: org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[JmsMessage[JmsMessageID: ID:mq01.dummycorp.corp-60083-1370438794752-5:131:1:1:1]]
org.apache.camel.CamelExecutionException: Exception occurred during execution on the exchange: Exchange[JmsMessage[JmsMessageID: ID:mq01.dummycorp.corp-60083-1370438794752-5:131:1:1:1]]
at org.apache.camel.util.ObjectHelper.wrapCamelExecutionException(ObjectHelper.java:1354) ~[camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.impl.DefaultExchange.setException(DefaultExchange.java:272) ~[camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:137) ~[camel-jms-2.10.3.jar:2.10.3]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122) ~[camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298) ~[camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117) ~[camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84) ~[camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91) ~[camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:390) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:117) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86) [camel-core-2.11.0.jar:2.11.0]
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:104) [camel-jms-2.10.3.jar:2.10.3]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:562) [spring-jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:500) [spring-jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:468) [spring-jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:326) [spring-jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:264) [spring-jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncmessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1071) [spring-jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncmessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1063) [spring-jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncmessageListenerInvoker.run(DefaultMessageListenerContainer.java:960) [spring-jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) [na:1.7.0_21]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) [na:1.7.0_21]
at java.lang.Thread.run(Thread.java:722) [na:1.7.0_21]
Caused by: java.lang.NoSuchMethodError: org.apache.camel.util.ObjectHelper.notNull(Ljava/lang/Object;Ljava/lang/String;)V
at org.apache.camel.component.jms.JmsBinding.makeJmsMessage(JmsBinding.java:285) ~[camel-jms-2.10.3.jar:2.10.3]
at org.apache.camel.component.jms.JmsProducer$2.createMessage(JmsProducer.java:270) ~[camel-jms-2.10.3.jar:2.10.3]
at org.apache.camel.component.jms.JmsConfiguration$Cameljmstemplate.doSendToDestination(JmsConfiguration.java:216) ~[camel-jms-2.10.3.jar:2.10.3]
at org.apache.camel.component.jms.JmsConfiguration$Cameljmstemplate.access$100(JmsConfiguration.java:159) ~[camel-jms-2.10.3.jar:2.10.3]
at org.apache.camel.component.jms.JmsConfiguration$Cameljmstemplate$1.doInJms(JmsConfiguration.java:173) ~[camel-jms-2.10.3.jar:2.10.3]
at org.springframework.jms.core.jmstemplate.execute(jmstemplate.java:466) ~[spring-jms-3.1.3.RELEASE.jar:3.1.3.RELEASE]
at org.apache.camel.component.jms.JmsConfiguration$Cameljmstemplate.send(JmsConfiguration.java:170) ~[camel-jms-2.10.3.jar:2.10.3]
at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:402) ~[camel-jms-2.10.3.jar:2.10.3]
at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:356) ~[camel-jms-2.10.3.jar:2.10.3]
at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:132) ~[camel-jms-2.10.3.jar:2.10.3]
... 49 common frames omitted
解决方法:
我只是偶然发现了同样的问题.添加此依赖项:
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-jms</artifactId>
<version>${camel.version}</version>
</dependency>
我的pom修好了.没有它,使用了一个较旧的(比我的其他骆驼库)版本的库 – 正如克劳斯在另一个答案中所建议的那样
Apache Camel框架之Error handling
Apache Camel提供了许多处理路由过程出现的Exception的机制,本文简单介绍一下其异常处理的方法.
1,默认的情况是在路由过程中没有处理的异常会被被抛出到路由的发起者,对发生异常的路由停止进行后续步骤的处理.
比如下面的路由在process(p1)出错,那么当前路由停止,文件不会到达"d:/temp/outbox",同时d:/temp/inbox里造成异常的那个文件仍然留在d:/temp/inbox文件夹中,由于Camel会轮询这个文件夹,所以下次轮询时,对这个文件处理的时候会继续异常.
from("file:d:/temp/inbox?delay=30000").process(p0).process(p1).to("file:d:/temp/outbox"); public class TProcessor0 implements Processor{ public void process(Exchange exchange) throws Exception { System.out.println("what if here has a transaction,later processing failed?"); } } public class TProcessor0 implements Processor{ public void process(Exchange exchange) throws Exception { String nullStr=null; nullStr.length(); } }
如果路由改成如下:因为异常是在process(p1)里发生,所以文件会会到达"d:/temp/outbox",但是d:/temp/inbox造成异常的文件仍然在d:/temp/inbox文件夹中,下次轮询时对这个文件处理的时候会继续异常.
from("file:d:/temp/inbox?delay=30000").to("file:d:/temp/outbox").process(p0).process(p1);
默认情况基本上就是已经做过的步骤没有rollback的操作,如果需要事务控制就更不行了.[会另外写一篇关于Camel如何做事务控制TransactionErrorHandler]
2,利用Camel提供的DeadLetterChannel将出错的消息路由到"死队列"里,然后停止当前的路由,其示例图如下:(图片来源于Camel in Action)
errorHandler(deadLetterChannel("file:d:/temp/error"));
from("file:d:/temp/inbox?delay=30000").process(p0).process(p1).to("file:d:/temp/outbox");
在process(p1)出错,处理会停止处理process(p1)之后的步骤,d:/temp/inbox里造成异常的文件会被放到d:/temp/error文件夹,d:/temp/inbox里的文件会被移走.
errorHandler(deadLetterChannel("file:d:/temp/error"));写在某一个routebuilder里的时候,是这个routebuilder所有路由的error handler.如果要为其中某一个路由指定error handler,示例如下:from("file:d:/temp/inbox?delay=30000").errorHandler(deadLetterChannel("file:d:/temp/error")).process(p0).process(p1).to("file:d:/temp/outbox");
3,利用Camel提供的onException功能,当有异常发生的时候,会根据不同的异常类型,跳到和onException里指定异常匹配的的步骤进行处理.
onException(NullPointerException.class).process(p2).handled(true).to("file:d:/temp/nullerror").end();
onException(TestException.class).process(p2).continued(true).to("file:d:/temp/TEerror").end();
from("file:d:/temp/inbox?delay=30000").process(p0).process(p1).to("file:d:/temp/outbox");//route xx
如上面在route builder里定义的话,当有NullPointerException发生的时候,会转到第一个onException,这时候会停止进行route xx的后续步骤处理(handled(true)设置的作用),将错误消息保存到:d:/temp/nullerror.当有TestException发生的时候,会转到第二个onException,这时候会将错误消息保存到:d:/temp/TEerror,忽略异常继续routexx的后续步骤(continued(true设置的作用).
Camel还支持类似如java try catch的语法,如下:TestException由process(p2)处理.
from("file:d:/temp/inbox?delay=30000").doTry().process(p0).process(p1).to("file:d:/temp/outbox")
.doCatch(TestException.class).process(p2)
.doCatch(NullPointerException.class).process(p2).end();
Camel的onException还可以和onWhen,onRedeliver,retryWhile结合使用.
onException(XXXException.class)
.onWhen(bean(XXX.class, "isIllegalData"))
.handled(true)
.to("file:/acme/files/illegal");
XXXException为异常的类型的类,XXX.class有一个isIllegalData方法返回true或者false,对异常进行更细致的区别处理.
errorHandler(defaultErrorHandler()
.maximumRedeliveries(3)
.onRedeliver(new MyOnRedeliveryProcessor());
onException(IOException.class)
.maximumRedeliveries(5)
.onRedeliver(new MyOtherOnRedeliveryProcessor());
在重新处理之前对exchange里的内容做一些改动的时候用onRedeliver.
public class MyRetryRuleset {
public boolean shouldRetry(@Header(Exchange.REDELIVERY_COUNTER) Integer counter,Exception causedBy) {
...
}
onException(IOException.class).retryWhile(bean(MyRetryRuletset.class));
在异常发生后捕捉到异常的点循环重试,直到MyRetryRuletset的shouldRetry方法返回false为止.
原文链接: http://blog.csdn.net/kkdelta/article/details/7246226
beam 的异常处理 Error Handling Elements in Apache Beam Pipelines
Error Handling Elements in Apache Beam Pipelines

I have noticed a deficit of documentation or examples outside of the official Beam docs, as data pipelines are often intimately linked with business logic. While working with streaming pipelines, I developed a simple error handling technique, to reduce the disruption that errors cause to streaming or long-running jobs. Here I have an explanation of that technique, and a simple demo pipeline.
Apache Beam is a high level model for programming data processing pipelines. It provides language interfaces in both Java and Python, though Java support is more feature-complete.
Beam supports running in two modes: batch, and streaming. In batch mode, a finite data set is read in, processed, then output in one huge chunk. Streaming mode allows for data to be continuously read in from a streaming source (such as a message queue), processed in small chunks, and output as processing occurs. Streaming allows for analytics to be performed in “real time” as events occurs. This is extremely valuable for telemetry and logging, where engineers or other systems need feedback as events happen.
Beam pipelines are composed of a series of typed data sets (PCollections), and transforms. Transforms take a PCollection, perform a programmer-defined operation on the collection elements, then output zero or more new PCollections as a result.
The problem with these transforms is that they need to eventually operate on data. As anyone familiar with handling user input or data from large systems can attest, that data can be malformed, or just unexpected. If a bad piece of data enters the system, it may cause the entire pipeline to crash. This is a waste of time and compute resources at best, but can also result in losing in-memory streaming data, or disrupting downstream systems relying on the Beam output.
In order to stop a catastrophic failure, you need graceful error handling in your pipeline. The easiest way to do this is to add try-catch blocks within each transform, which prevents shutdown and allows all other elements to be processed.

A basic try/catch around a string conversion.
This is a start, but it’s not enough on its own. You’ll want to record failures — what data failed what transform, and why. To do this, you’ll want to create a data structure to store these errors, and an output channel for them.
The data structure for a failure should contain:
- Source data in some form (data ID, the raw data fed into the transform, or the raw data precursor that was fed into the pipeline).
- The reason for the failure.
- The transform that failed.

Example constructor of a Failure object.
We can instantiate a Failure if an exception or error is thrown during a transform.

Parsing some fields out of auditd log strings. In this example, we use an inappropriately small number type. If the number is too large for an Integer, the transform outputs a Failure object, and continues processing elements.
Next, we need to be able to record the failure for developers to reference.
Beam transforms by default only have one output PCollection, but they can output multiple PCollections. A transform can return a PCollectionTuple, which uses TupleTag objects to reference which PCollection to put an element into, and which PCollection to fetch from the TupleTag. This has many uses, and we can use it here to separately output a PCollection of successful results, and a PCollection of Failure objects.

Accessing the PCollections stored in a PCollectionTuple.
In the demo repo, successes and failures are simply written to files. In a real pipeline, they would likely be sent to a database, or a message queue for additional processing or reporting.
You may also want to extend coverage beyond just handling thrown exceptions. For example, we could validate that all data falls within expected parameters (EG all user ids are ≥ 0) and is present, to prevent logical errors, missing records, or DB insertion failures further along. That validation could be extended into the Failure class, or it could be a new Invalid class and PCollection.
This covers the handling of elements themselves, but there are many design decisions beyond that, such as: what next? Data scientists or developers must review the errors, and discard data that is outright bad. If data is merely in an unexpected format, or exposed a now-fixed bug in the pipeline, then that data should be re-processed. It’s common (moreso in batch pipelines) to retry a whole dataset after any bugs in the pipeline are addressed. This is time consuming to process, but easy to support, and allows for grouped data (sums, aggregates, etc) to be corrected by adding the missing data. Some pipelines may only retry individual elements, if the pipeline is a 1-in-1-out process.
There is a GitHub repo at https://github.com/vllry/beam-errorhandle-example which shows the full proof of concept using auditd log files.
final TupleTag<Output> successTag = new TupleTag<>() {};
final TupleTag<Input> deadLetterTag = new TupleTag<>() {};
PCollection<Input> input = /* … */;
PCollectionTuple outputTuple = input.apply(ParDo.of(new DoFn<Input, Output>() {
@Override
void processElement(ProcessContext c) {
try {
c.output(process(c.element());
} catch (Exception e) {
LOG.severe("Failed to process input {} -- adding to dead letter file",
c.element(), e);
c.sideOutput(deadLetterTag, c.element());
}
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
// Write the dead letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
.apply(BigQueryIO.write(...));
// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing as desired ...
今天关于Apache Camel 将异常传播到父路由而不使用 noErrorHandler和java异常传递的介绍到此结束,谢谢您的阅读,有关android – 使用自定义ErrorHandler时,Retrofit不会触发onError、Apache Camel抛出java.lang.NoSuchMethodError:org.apache.camel.util.ObjectHelper.notNull、Apache Camel框架之Error handling、beam 的异常处理 Error Handling Elements in Apache Beam Pipelines等更多相关知识的信息可以在本站进行查询。
本文标签: