在本文中,我们将带你了解聊聊jesque的WorkerImpl与WorkerPool在这篇文章中,同时我们还将给您一些技巧,以帮助您实现更有效的Apache的worker和prefork模式比较、Ba
在本文中,我们将带你了解聊聊jesque的WorkerImpl与WorkerPool在这篇文章中,同时我们还将给您一些技巧,以帮助您实现更有效的Apache的worker和prefork模式比较、BackgroundWorker RunWorkerCompleted事件、c# – 如何使用BackgroundWorker事件RunWorkerCompleted、com.amazonaws.services.simpleworkflow.flow.ActivityWorker的实例源码。
本文目录一览:- 聊聊jesque的WorkerImpl与WorkerPool
- Apache的worker和prefork模式比较
- BackgroundWorker RunWorkerCompleted事件
- c# – 如何使用BackgroundWorker事件RunWorkerCompleted
- com.amazonaws.services.simpleworkflow.flow.ActivityWorker的实例源码
聊聊jesque的WorkerImpl与WorkerPool
序
本文主要讲一下jesque的WorkerImpl与WorkerPool。
resque
Resque是一个使用redis来创建后台任务的ruby组件。而jesque是其java版本。通常用来做延时队列。
WorkerImpl
List<String> queues = Arrays.asList(delayedQueue);
final Worker worker = new WorkerImpl(jesqueConfig,queues, new MapBasedJobFactory(map(entry("DemoJob", DemoJob.class))));
final Thread workerThread = new Thread(worker);
workerThread.start();
这是worker实例 jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerImpl.java
/**
* Starts this worker. Registers the worker in Redis and begins polling the queues for jobs.<br>
* Stop this worker by calling end() on any thread.
*/
@Override
public void run() {
if (this.state.compareAndSet(NEW, RUNNING)) {
try {
renameThread("RUNNING");
this.threadRef.set(Thread.currentThread());
this.jedis.sadd(key(WORKERS), this.name);
this.jedis.set(key(WORKER, this.name, STARTED), new SimpleDateFormat(DATE_FORMAT).format(new Date()));
this.listenerDelegate.fireEvent(WORKER_START, this, null, null, null, null, null);
this.popScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_LUA)));
this.lpoplpushScriptHash.set(this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA)));
this.multiPriorityQueuesScriptHash
.set(this.jedis.scriptLoad(ScriptUtils.readScript(POP_FROM_MULTIPLE_PRIO_QUEUES)));
poll();
} catch (Exception ex) {
LOG.error("Uncaught exception in worker run-loop!", ex);
this.listenerDelegate.fireEvent(WORKER_ERROR, this, null, null, null, null, ex);
} finally {
renameThread("STOPPING");
this.listenerDelegate.fireEvent(WORKER_STOP, this, null, null, null, null, null);
this.jedis.srem(key(WORKERS), this.name);
this.jedis.del(key(WORKER, this.name), key(WORKER, this.name, STARTED), key(STAT, FAILED, this.name),
key(STAT, PROCESSED, this.name));
this.jedis.quit();
this.threadRef.set(null);
}
} else if (RUNNING.equals(this.state.get())) {
throw new IllegalStateException("This WorkerImpl is already running");
} else {
throw new IllegalStateException("This WorkerImpl is shutdown");
}
}
实现了runnable方法,里头poll方法无限循环
/**
* Polls the queues for jobs and executes them.
*/
protected void poll() {
int missCount = 0;
String curQueue = null;
while (RUNNING.equals(this.state.get())) {
try {
if (threadNameChangingEnabled) {
renameThread("Waiting for " + JesqueUtils.join(",", this.queueNames));
}
curQueue = getNextQueue();
if (curQueue != null) {
checkPaused();
// Might have been waiting in poll()/checkPaused() for a while
if (RUNNING.equals(this.state.get())) {
this.listenerDelegate.fireEvent(WORKER_POLL, this, curQueue, null, null, null, null);
final String payload = pop(curQueue);
if (payload != null) {
process(ObjectMapperFactory.get().readValue(payload, Job.class), curQueue);
missCount = 0;
} else {
missCount++;
if (shouldSleep(missCount) && RUNNING.equals(this.state.get())) {
// Keeps worker from busy-spinning on empty queues
missCount = 0;
Thread.sleep(EMPTY_QUEUE_SLEEP_TIME);
}
}
}
}
} catch (InterruptedException ie) {
if (!isShutdown()) {
recoverFromException(curQueue, ie);
}
} catch (JsonParseException | JsonMappingException e) {
// If the job JSON is not deserializable, we never want to submit it again...
removeInFlight(curQueue);
recoverFromException(curQueue, e);
} catch (Exception e) {
recoverFromException(curQueue, e);
}
}
}
不断地pop和process
/**
* Materializes and executes the given job.
*
* @param job the Job to process
* @param curQueue the queue the payload came from
*/
protected void process(final Job job, final String curQueue) {
try {
this.processingJob.set(true);
if (threadNameChangingEnabled) {
renameThread("Processing " + curQueue + " since " + System.currentTimeMillis());
}
this.listenerDelegate.fireEvent(JOB_PROCESS, this, curQueue, job, null, null, null);
this.jedis.set(key(WORKER, this.name), statusMsg(curQueue, job));
final Object instance = this.jobFactory.materializeJob(job);
final Object result = execute(job, curQueue, instance);
success(job, instance, result, curQueue);
} catch (Throwable thrwbl) {
failure(thrwbl, job, curQueue);
} finally {
removeInFlight(curQueue);
this.jedis.del(key(WORKER, this.name));
this.processingJob.set(false);
}
}
而process这个方法,就是实例化目标job,然后execute
/**
* Executes the given job.
*
* @param job the job to execute
* @param curQueue the queue the job came from
* @param instance the materialized job
* @throws Exception if the instance is a {@link Callable} and throws an exception
* @return result of the execution
*/
protected Object execute(final Job job, final String curQueue, final Object instance) throws Exception {
if (instance instanceof WorkerAware) {
((WorkerAware) instance).setWorker(this);
}
this.listenerDelegate.fireEvent(JOB_EXECUTE, this, curQueue, job, instance, null, null);
final Object result;
if (instance instanceof Callable) {
result = ((Callable<?>) instance).call(); // The job is executing!
} else if (instance instanceof Runnable) {
((Runnable) instance).run(); // The job is executing!
result = null;
} else { // Should never happen since we''re testing the class earlier
throw new ClassCastException(
"Instance must be a Runnable or a Callable: " + instance.getClass().getName() + " - " + instance);
}
return result;
}
而execute就是调用call或者run方法。 从这里可以看出是单线程阻塞的,如果一个job比较耗时,是会影响其他job的触发和执行。
WorkerPool
jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerPool.java
/**
* Create a WorkerPool with the given number of Workers and the given <code>ThreadFactory</code>.
* @param workerFactory a Callable that returns an implementation of Worker
* @param numWorkers the number of Workers to create
* @param threadFactory the factory to create pre-configured Threads
*/
public WorkerPool(final Callable<? extends Worker> workerFactory, final int numWorkers,
final ThreadFactory threadFactory) {
this.workers = new ArrayList<>(numWorkers);
this.threads = new ArrayList<>(numWorkers);
this.eventEmitter = new WorkerPoolEventEmitter(this.workers);
for (int i = 0; i < numWorkers; i++) {
try {
final Worker worker = workerFactory.call();
this.workers.add(worker);
this.threads.add(threadFactory.newThread(worker));
} catch (RuntimeException re) {
throw re;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
/**
* {@inheritDoc}
*/
@Override
public void run() {
for (final Thread thread : this.threads) {
thread.start();
}
Thread.yield();
}
/**
* {@inheritDoc}
*/
@Override
public void end(final boolean now) {
for (final Worker worker : this.workers) {
worker.end(now);
}
}
workerpool维护了一组worker实例,起线程池的作用,尽可能提高job的并发度。
doc
- resque
- jesque
Apache的worker和prefork模式比较
StartServers 5
MinSpareServers 5
MaxSpareServers 10
MaxClients 150
MaxRequestsPerChild 0
</IfModule>
prefork 控制进程在最初建立“StartServers”个子进程后,为了满足MinSpareServers设置的需要创建一个进程,等待一秒钟,继续创建两 个,再等待一秒钟,继续创建四个……如此按指数级增加创建的进程数,最多达到每秒32个,直到满足MinSpareServers设置的值为止。这种模式 可以不必在请求到来时再产生新的进程,从而减小了系统开销以增加性能。MaxSpareServers设置了最大的空闲进程数,如果空闲进程数大于这个 值,Apache会自动kill掉一些多余进程。这个值不要设得过大,但如果设的值比MinSpareServers小,Apache会自动把其调整为 MinSpareServers+1。如果站点负载较大,可考虑同时加大MinSpareServers和MaxSpareServers。 MaxRequestsPerChild设置的是每个子进程可处理的请求数。每个子进程在处理了“MaxRequestsPerChild”个请求后将自 动销毁。0意味着无限,即子进程永不销毁。虽然缺省设为0可以使每个子进程处理更多的请求,但如果设成非零值也有两点重要的好处:1、可防止意外的内存泄 漏。2、在服务器负载下降的时侯会自动减少子进程数。因此,可根据服务器的负载来调整这个值。MaxClients是这些指令中最为重要的一个,设定的是 Apache可以同时处理的请求,是对Apache性能影响最大的参数。其缺省值150是远远不够的,如果请求总数已达到这个值(可通过ps -ef|grep http|wc -l来确认),那么后面的请求就要排队,直到某个已处理请求完毕。这就是系统资源还剩下很多而HTTP访问却很慢的主要原因。虽然理论上这个值越大,可以 处理的请求就越多,但Apache默认的限制不能大于256。ServerLimit指令无须重编译Apache就可以加大MaxClients。
StartServers 5
MinSpareServers 5
MaxSpareServers 10
MaxClients 10000
MaxRequestsPerChild 0
</IfModule>
相对于prefork,worker全新的支持多线程和多进程混合模型的MPM。由于 使用线程来处理,所以可以处理相对海量的请求,而系统资源的开销要小于基于进程的服务器。但是,worker也使用了多进程,每个进程又生成多个线程,以 获得基于进程服务器的稳定性。在configure –with-mpm=worker后,进行make编译、make install安装。在缺省生成的httpd-mpm.conf中有以下默认配置段:
<IfModule worker.c>
StartServers 2
MaxClients 150
MinSpareThreads 25
MaxSpareThreads 75
ThreadsPerChild 25
MaxRequestsPerChild 0
</IfModule>
Worker 由主控制进程生成“StartServers”个子进程,每个子进程中包含固定的ThreadsPerChild线程数,各个线程独立地处理请求。同样, 为了不在请求到来时再生成线程,MinSpareThreads和MaxSpareThreads设置了最少和最多的空闲线程数;而MaxClients 设置了同时连入的clients最大总数。如果现有子进程中的线程总数不能满足负载,控制进程将派生新的子进程。MinSpareThreads和 MaxSpareThreads的最大缺省值分别是75和250。这两个参数对Apache的性能影响并不大,可以按照实际情况相应调节。 ThreadsPerChild是worker MPM中与性能相关最密切的指令。ThreadsPerChild的最大缺省值是64,如果负载较大,64也是不够的。这时要显式使用 ThreadLimit指令,它的最大缺省值是20000。Worker模式下所能同时处理的请求总数是由子进程总数乘以ThreadsPerChild 值决定的,应该大于等于MaxClients。如果负载很大,现有的子进程数不能满足时,控制进程会派生新的子进程。默认最大的子进程总数是16,加大时 也需要显式声明ServerLimit(最大值是20000)。需要注意的是,如果显式声明了ServerLimit,那么它乘以 ThreadsPerChild的值必须大于等于MaxClients,而且MaxClients必须是ThreadsPerChild的整数倍,否则 Apache将会自动调节到一个相应值。
<IfModule worker.c>
ServerLimit 25
ThreadLimit 200
StartServers 3
MaxClients 2000
MinSpareThreads 50
MaxSpareThreads 200
ThreadsPerChild 100
MaxRequestsPerChild 0
</IfModule>
1、Prefor方式
(ServerLimit,StartServer,MinSpareServers,MaxSpareServers,MaxClients,MaxRequestPerChild)
-n/-c(ab参数) |
cpu% |
Mem |
|
Requestspersecond
(-,5,10,150,0) | 100000/100 |
28.8 |
285 |
8434 |
100000/200 |
29.2 |
304 |
8032 |
100000/500 |
25.3 |
323 |
7348 |
100000/1000 |
24.4 |
330 |
5886 |
(10000,500,0) | 100000/100 |
28.7 |
371 |
8345 |
100000/200 |
27.4 |
389 |
7929 |
100000/500 |
24.9 |
417 |
7229 |
100000/1000 |
23.4 |
437 |
6676 |
(10000,1000,0) | 100000/100 |
28.8 |
408 |
8517 |
100000/200 |
27.0 |
422 |
8045 |
100000/500 |
24.2 |
455 |
7236 |
100000/1000 |
22.5 |
470 |
6570 |
(10000,1500,0) | 100000/100 |
29.6 |
330 |
8407 |
100000/200 |
28.1 |
349 |
8014 |
100000/500 |
26.4 |
380 |
7290 |
100000/1000 |
24.0 |
400 |
6686 |
|
(ServerLimt,Threadlimt,Startservers,MinspareThread,MaxspareThread,ThreadperChild,MaxRequestPerChild)
BackgroundWorker RunWorkerCompleted事件
我的C#应用程序有几个后台工作人员。有时,一名后台工作者会解雇另一名。当第一个后台工作人员完成并且RunWorkerCompleted
事件被触发时,该事件将在哪个线程上触发,UI或从哪个线程RunWorkerAsync
调用?我正在使用Microsoft
Visual C#2008 Express Edition。您可能有任何想法或建议。谢谢。
答案1
小编典典如果BackgroundWorker
是从UI线程创建的,则RunWorkerCompleted
事件还将在UI线程上引发。
如果是从后台线程创建的,则事件将在未定义的后台线程上引发(不一定是同一线程,除非您使用custom SynchronizationContext
)。
有趣的是,这似乎并没有在MSDN上得到充分记录。我能够找到的最佳参考是在这里:
在您的应用程序中实现多线程的首选方法是使用BackgroundWorker组件。的BackgroundWorker的组件使用一个事件驱动的模型为多线程。
后台线程运行您的 DoWork事件处理程序,而创建控件的线程运行您的ProgressChanged和RunWorkerCompleted事件处理程序。您可以从ProgressChanged和RunWorkerCompleted事件处理程序中调用控件。
c# – 如何使用BackgroundWorker事件RunWorkerCompleted
在应用程序的主线程中.刚刚启动BackgroundWork.
if (backgroundWorker1.IsBusy != true) { // Start the asynchronous operation. backgroundWorker1.RunWorkerAsync(); }
然后它会触发DoWork事件.所以我们可以在那里做点什么.
private void backgroundWorker1_DoWork(object sender,DoWorkEventArgs e) { BackgroundWorker worker = sender as BackgroundWorker; ...... // report progress. worker.ReportProgress(iProgress); .... }
然后我们只需要处理ProgressChanged事件以显示BackgroundWorker进度.
private void backgroundWorker1_ProgressChanged(object sender,ProgressChangedEventArgs e) { //show progress. resultLabel.Text = (e.Progresspercentage.ToString() + "%"); }
DoWork完成或发生异常后.将触发RunWorkerCompleted事件.
这是我对这个事件处理的问题.请帮助审查它们.谢谢.
我注意到RunWorkerCompletedEventArgs中有一个名为’Result’的属性,它用于什么?我怎么用呢?
是否有可能接收我的自定义异常消息而不是e.error?如果有,如何制作?
private void backgroundWorker1_RunWorkerCompleted(object sender,RunWorkerCompletedEventArgs e) { if (e.Cancelled == true) { resultLabel.Text = "Canceled!"; } else if (e.Error != null) { resultLabel.Text = "Error: " + e.Error.Message; } else { resultLabel.Text = e.Result.ToString(); } }
解决方法
您可以为此分配任何您喜欢的内容,因此您可以返回整数,字符串,对象/复合类型等.
如果在DoWork()中抛出异常,则可以在RunWorkerCompletedEventArgs的Error属性中访问该异常.在这种情况下,访问Result属性将导致抛出TargetInvocationException.
com.amazonaws.services.simpleworkflow.flow.ActivityWorker的实例源码
public static void main(String[] args) throws Exception { AmazonSimpleWorkflow service = new AmazonSimpleWorkflowClient(); service.setEndpoint("https://swf.us-east-1.amazonaws.com"); String domain = "helloWorldWalkthrough"; String taskListToPoll = "HelloWorldAsyncList"; WorkflowWorker wfw = new WorkflowWorker(service,domain,taskListToPoll); wfw.setRegisterDomain(true); wfw.setDomainRetentionPeriodindays(1); wfw.addWorkflowImplementationType(GreeterWorkflowImpl.class); wfw.start(); ActivityWorker aw = new ActivityWorker(service,taskListToPoll); aw.addActivitiesImplementation(new GreeteractivitiesImpl()); aw.start(); GreeterWorkflowClientExternalFactory clientFactory = new GreeterWorkflowClientExternalFactoryImpl(service,domain); GreeterWorkflowClientExternal client = clientFactory.getClient(); client.greet(); }
private HorseActivitiesWorker(final int instance) throws Exception { /* * Configure an Flow Framework ACTIVITY worker with a domain and queue. */ this.worker = new ActivityWorker(SWF,DOMAIN,TASKLIST); /* * Can add multiple activities implementation instances. Each is a * singleton that remains active for the duration of the worker. There * should be no shared mutable state in an activities implementation. */ this.worker.addActivitiesImplementation(new HorseActivitiesImpl( instance)); }
private AnnouncerActivitiesWorker(final int instance) throws Exception { /* * Configure an Flow Framework ACTIVITY worker with a domain and queue. */ this.worker = new ActivityWorker(SWF,TASKLIST); /* * Can add multiple activities implementation instances. Each is a * singleton that remains active for the duration of the worker. There * should be no shared mutable state in an activities implementation. */ this.worker.addActivitiesImplementation(new AnnouncerActivitiesImpl( instance)); }
private void startWorker(ConfigHelper configHelper) throws Exception { // Create activity implementations BookingActivities bookingActivitiesImpl = new BookingActivitiesImpl(); // Start worker to poll the common task list String taskList = configHelper.getValueFromConfig(BookingConfigKeys.ACTIVITY_WORKER_TASKLIST); worker = new ActivityWorker(swfService,taskList); worker.setDomainRetentionPeriodindays(domainRetentionPeriodindays); worker.setRegisterDomain(true); worker.addActivitiesImplementation(bookingActivitiesImpl); worker.start(); System.out.println("Worker Started for Activity Task List: " + taskList); }
今天关于聊聊jesque的WorkerImpl与WorkerPool的分享就到这里,希望大家有所收获,若想了解更多关于Apache的worker和prefork模式比较、BackgroundWorker RunWorkerCompleted事件、c# – 如何使用BackgroundWorker事件RunWorkerCompleted、com.amazonaws.services.simpleworkflow.flow.ActivityWorker的实例源码等相关知识,可以在本站进行查询。
本文标签: