GVKun编程网logo

聊聊jesque的WorkerImpl与WorkerPool

5

在本文中,我们将带你了解聊聊jesque的WorkerImpl与WorkerPool在这篇文章中,同时我们还将给您一些技巧,以帮助您实现更有效的Apache的worker和prefork模式比较、Ba

在本文中,我们将带你了解jesque的WorkerImpl与WorkerPool,同时还将给您一些技巧,以帮助您更好地了解Apache的worker和prefork模式比较、BackgroundWorker RunWorkerCompleted事件、c# – 如何使用BackgroundWorker事件RunWorkerCompleted、com.amazonaws.services.simpleworkflow.flow.ActivityWorker的实例源码。

本文目录一览:

聊聊jesque的WorkerImpl与WorkerPool

聊聊jesque的WorkerImpl与WorkerPool

本文主要讲一下jesque的WorkerImpl与WorkerPool。

resque

Resque是一个使用redis来创建后台任务的ruby组件。而jesque是其java版本。通常用来做延时队列。

WorkerImpl

		// Usage example: build one jesque worker and run it on a dedicated thread.
		// The worker is given a single queue name (delayedQueue).
		List<String> queues = Arrays.asList(delayedQueue);
		// The job factory maps the job name "DemoJob" to DemoJob.class.
		final Worker worker = new WorkerImpl(jesqueConfig,queues, new MapBasedJobFactory(map(entry("DemoJob", DemoJob.class))));
		// The worker is handed to a Thread; starting the thread invokes the worker's run().
		final Thread workerThread = new Thread(worker);
		workerThread.start();

这是worker实例 jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerImpl.java

/**
     * Runs this worker on the calling thread: registers it in Redis, loads the Lua scripts and
     * then polls the queues for jobs.<br>
     * Whatever happens inside the run-loop, the worker's Redis keys are removed and the
     * connection is closed before this method returns.<br>
     * Stop this worker by calling end() on any thread.
     *
     * @throws IllegalStateException if the worker cannot move from NEW to RUNNING, i.e. it is
     *         already running or is in any other non-NEW state
     */
    @Override
    public void run() {
        if (!this.state.compareAndSet(NEW, RUNNING)) {
            // Only the NEW -> RUNNING transition is allowed to start the worker.
            final String reason = RUNNING.equals(this.state.get())
                    ? "This WorkerImpl is already running"
                    : "This WorkerImpl is shutdown";
            throw new IllegalStateException(reason);
        }
        try {
            renameThread("RUNNING");
            this.threadRef.set(Thread.currentThread());

            // Announce the worker and record its start time in Redis.
            this.jedis.sadd(key(WORKERS), this.name);
            final String startedKey = key(WORKER, this.name, STARTED);
            final String startedAt = new SimpleDateFormat(DATE_FORMAT).format(new Date());
            this.jedis.set(startedKey, startedAt);
            this.listenerDelegate.fireEvent(WORKER_START, this, null, null, null, null, null);

            // Load the Lua scripts and remember the hashes returned by scriptLoad.
            final String popSha = this.jedis.scriptLoad(ScriptUtils.readScript(POP_LUA));
            this.popScriptHash.set(popSha);
            final String lpoplpushSha = this.jedis.scriptLoad(ScriptUtils.readScript(LPOPLPUSH_LUA));
            this.lpoplpushScriptHash.set(lpoplpushSha);
            final String multiPrioSha =
                    this.jedis.scriptLoad(ScriptUtils.readScript(POP_FROM_MULTIPLE_PRIO_QUEUES));
            this.multiPriorityQueuesScriptHash.set(multiPrioSha);

            poll();
        } catch (Exception failure) {
            LOG.error("Uncaught exception in worker run-loop!", failure);
            this.listenerDelegate.fireEvent(WORKER_ERROR, this, null, null, null, null, failure);
        } finally {
            renameThread("STOPPING");
            this.listenerDelegate.fireEvent(WORKER_STOP, this, null, null, null, null, null);
            // Unregister the worker and drop its per-worker keys.
            this.jedis.srem(key(WORKERS), this.name);
            this.jedis.del(key(WORKER, this.name), key(WORKER, this.name, STARTED), key(STAT, FAILED, this.name),
                    key(STAT, PROCESSED, this.name));
            this.jedis.quit();
            this.threadRef.set(null);
        }
    }

WorkerImpl实现了Runnable接口的run方法,其中调用的poll方法会在worker处于RUNNING状态期间持续循环

   /**
     * Polls the queues for jobs and executes them.
     * <p>
     * Loops for as long as the worker state is RUNNING. Each pass asks getNextQueue() for a
     * queue, honours a pause via checkPaused(), pops one payload from that queue and, if one
     * was found, deserializes it into a {@code Job} and hands it to process(). Exceptions are
     * handled inside the loop, so a failing pass does not by itself end the loop.
     */
    protected void poll() {
        // Number of consecutive pops that returned no payload.
        int missCount = 0;
        // Declared outside the try so the catch blocks can report which queue was in use.
        String curQueue = null;
        while (RUNNING.equals(this.state.get())) {
            try {
                if (threadNameChangingEnabled) {
                    renameThread("Waiting for " + JesqueUtils.join(",", this.queueNames));
                }
                curQueue = getNextQueue();
                if (curQueue != null) {
                    checkPaused();
                    // Might have been waiting in poll()/checkPaused() for a while
                    if (RUNNING.equals(this.state.get())) {
                        this.listenerDelegate.fireEvent(WORKER_POLL, this, curQueue, null, null, null, null);
                        final String payload = pop(curQueue);
                        if (payload != null) {
                            // Got a job: deserialize the JSON payload and run it on this thread.
                            process(ObjectMapperFactory.get().readValue(payload, Job.class), curQueue);
                            missCount = 0;
                        } else {
                            missCount++;
                            if (shouldSleep(missCount) && RUNNING.equals(this.state.get())) {
                                // Keeps worker from busy-spinning on empty queues
                                missCount = 0;
                                Thread.sleep(EMPTY_QUEUE_SLEEP_TIME);
                            }
                        }
                    }
                }
            } catch (InterruptedException ie) {
                // An interrupt is only passed to recovery when the worker is not shut down.
                if (!isShutdown()) {
                    recoverFromException(curQueue, ie);
                }
            } catch (JsonParseException | JsonMappingException e) {
                // If the job JSON is not deserializable, we never want to submit it again...
                removeInFlight(curQueue);
                recoverFromException(curQueue, e);
            } catch (Exception e) {
                // Any other failure goes through the recovery path.
                recoverFromException(curQueue, e);
            }
        }
    }

不断地pop和process

/**
     * Turns the given job into a runnable instance, executes it and reports the outcome.
     * <p>
     * A successful run is reported through success(); anything thrown along the way is
     * reported through failure(). In both cases the in-flight entry for the queue and the
     * worker's status key are removed afterwards.
     *
     * @param job the Job to process
     * @param curQueue the queue the payload came from
     */
    protected void process(final Job job, final String curQueue) {
        try {
            this.processingJob.set(true);
            if (threadNameChangingEnabled) {
                final long startedAt = System.currentTimeMillis();
                renameThread("Processing " + curQueue + " since " + startedAt);
            }
            this.listenerDelegate.fireEvent(JOB_PROCESS, this, curQueue, job, null, null, null);
            // Publish what this worker is currently working on.
            this.jedis.set(key(WORKER, this.name), statusMsg(curQueue, job));
            final Object materialized = this.jobFactory.materializeJob(job);
            final Object outcome = execute(job, curQueue, materialized);
            success(job, materialized, outcome, curQueue);
        } catch (Throwable cause) {
            failure(cause, job, curQueue);
        } finally {
            // Clean-up runs on both the success and the failure path.
            removeInFlight(curQueue);
            this.jedis.del(key(WORKER, this.name));
            this.processingJob.set(false);
        }
    }

而process这个方法,就是实例化目标job,然后execute

/**
     * Runs the materialized job on the calling thread.
     * <p>
     * If the instance is {@code WorkerAware} it is given a reference to this worker first. A
     * {@link Callable} is invoked through {@code call()} and its value returned; a
     * {@link Runnable} is invoked through {@code run()} and {@code null} is returned.
     *
     * @param job the job to execute
     * @param curQueue the queue the job came from
     * @param instance the materialized job
     * @throws Exception if the instance is a {@link Callable} and throws an exception
     * @throws ClassCastException if the instance is neither a Runnable nor a Callable
     * @return result of the execution
     */
    protected Object execute(final Job job, final String curQueue, final Object instance) throws Exception {
        if (instance instanceof WorkerAware) {
            final WorkerAware aware = (WorkerAware) instance;
            aware.setWorker(this);
        }
        this.listenerDelegate.fireEvent(JOB_EXECUTE, this, curQueue, job, instance, null, null);
        if (instance instanceof Callable) {
            // The job runs here and its return value is the result.
            return ((Callable<?>) instance).call();
        }
        if (instance instanceof Runnable) {
            // The job runs here; a Runnable produces no result.
            ((Runnable) instance).run();
            return null;
        }
        // Not expected to be reached: the original author notes the class is tested earlier.
        throw new ClassCastException(
                "Instance must be a Runnable or a Callable: " + instance.getClass().getName() + " - " + instance);
    }

而execute就是调用call或者run方法。 从这里可以看出是单线程阻塞的,如果一个job比较耗时,是会影响其他job的触发和执行。

WorkerPool

jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerPool.java

/**
     * Builds a pool of {@code numWorkers} workers, each paired with a thread obtained from the
     * given <code>ThreadFactory</code>. The threads are created here but not started.
     * <p>
     * A RuntimeException raised while creating a worker or thread propagates unchanged; any
     * other Exception is wrapped in a RuntimeException.
     *
     * @param workerFactory a Callable that returns an implementation of Worker
     * @param numWorkers the number of Workers to create
     * @param threadFactory the factory to create pre-configured Threads
     */
    public WorkerPool(final Callable<? extends Worker> workerFactory, final int numWorkers,
            final ThreadFactory threadFactory) {
        this.workers = new ArrayList<>(numWorkers);
        this.threads = new ArrayList<>(numWorkers);
        this.eventEmitter = new WorkerPoolEventEmitter(this.workers);
        try {
            for (int remaining = numWorkers; remaining > 0; remaining--) {
                final Worker created = workerFactory.call();
                this.workers.add(created);
                this.threads.add(threadFactory.newThread(created));
            }
        } catch (RuntimeException unchecked) {
            throw unchecked;
        } catch (Exception checked) {
            throw new RuntimeException(checked);
        }
    }
/**
     * {@inheritDoc}
     * <p>
     * Starts every thread held by this pool, in list order, and then yields the calling thread.
     */
    @Override
    public void run() {
        final int count = this.threads.size();
        for (int i = 0; i < count; i++) {
            this.threads.get(i).start();
        }
        Thread.yield();
    }

    /**
     * {@inheritDoc}
     * <p>
     * Forwards the request to every worker in the pool, in list order.
     */
    @Override
    public void end(final boolean now) {
        final int count = this.workers.size();
        for (int i = 0; i < count; i++) {
            this.workers.get(i).end(now);
        }
    }

workerpool维护了一组worker实例,起线程池的作用,尽可能提高job的并发度。

doc

  • resque
  • jesque

Apache的worker和prefork模式比较

Apache的worker和prefork模式比较

Apache worker & prefork
 
选择prefork还是worker可以在编译时使用--with-mpm=MPM参数指定,默认为prefork。
prefork
prefork采用预派生子进程方式,用单独的子进程来处理 不同的请求,进程之间彼此独立。在make编译和make install安装后,使用httpd -l来确定当前使用的MPM是prefork.c。查看httpd-mpm.conf配置文件,里面包含如下默认的配置段:
<IfModule prefork.c>
StartServers 5
MinSpareServers 5
MaxSpareServers 10
MaxClients 150
MaxRequestsPerChild 0
</IfModule>
prefork 控制进程在最初建立“StartServers”个子进程后,为了满足MinSpareServers设置的需要创建一个进程,等待一秒钟,继续创建两 个,再等待一秒钟,继续创建四个……如此按指数级增加创建的进程数,最多达到每秒32个,直到满足MinSpareServers设置的值为止。这种模式 可以不必在请求到来时再产生新的进程,从而减小了系统开销以增加性能。MaxSpareServers设置了最大的空闲进程数,如果空闲进程数大于这个 值,Apache会自动kill掉一些多余进程。这个值不要设得过大,但如果设的值比MinSpareServers小,Apache会自动把其调整为 MinSpareServers+1。如果站点负载较大,可考虑同时加大MinSpareServers和MaxSpareServers。 MaxRequestsPerChild设置的是每个子进程可处理的请求数。每个子进程在处理了“MaxRequestsPerChild”个请求后将自 动销毁。0意味着无限,即子进程永不销毁。虽然缺省设为0可以使每个子进程处理更多的请求,但如果设成非零值也有两点重要的好处:1、可防止意外的内存泄 漏。2、在服务器负载下降的时侯会自动减少子进程数。因此,可根据服务器的负载来调整这个值。MaxClients是这些指令中最为重要的一个,设定的是 Apache可以同时处理的请求,是对Apache性能影响最大的参数。其缺省值150是远远不够的,如果请求总数已达到这个值(可通过ps -ef|grep http|wc -l来确认),那么后面的请求就要排队,直到某个已处理请求完毕。这就是系统资源还剩下很多而HTTP访问却很慢的主要原因。虽然理论上这个值越大,可以 处理的请求就越多,但Apache默认的限制不能大于256。ServerLimit指令无须重编译Apache就可以加大MaxClients。
<IfModule prefork.c>
ServerLimit 10000
StartServers 5
MinSpareServers 5
MaxSpareServers 10
MaxClients 10000
MaxRequestsPerChild 0
</IfModule>
Worker
相对于prefork,worker是全新的支持多线程和多进程混合模型的MPM。由于使用线程来处理,所以可以处理相对海量的请求,而系统资源的开销要小于基于进程的服务器。但是,worker也使用了多进程,每个进程又生成多个线程,以获得基于进程服务器的稳定性。在configure --with-mpm=worker后,进行make编译、make install安装。在缺省生成的httpd-mpm.conf中有以下默认配置段:
<IfModule worker.c>
StartServers 2
MaxClients 150
MinSpareThreads 25
MaxSpareThreads 75
ThreadsPerChild 25
MaxRequestsPerChild 0
</IfModule>
Worker 由主控制进程生成“StartServers”个子进程,每个子进程中包含固定的ThreadsPerChild线程数,各个线程独立地处理请求。同样, 为了不在请求到来时再生成线程,MinSpareThreads和MaxSpareThreads设置了最少和最多的空闲线程数;而MaxClients 设置了同时连入的clients最大总数。如果现有子进程中的线程总数不能满足负载,控制进程将派生新的子进程。MinSpareThreads和 MaxSpareThreads的最大缺省值分别是75和250。这两个参数对Apache的性能影响并不大,可以按照实际情况相应调节。 ThreadsPerChild是worker MPM中与性能相关最密切的指令。ThreadsPerChild的最大缺省值是64,如果负载较大,64也是不够的。这时要显式使用 ThreadLimit指令,它的最大缺省值是20000。Worker模式下所能同时处理的请求总数是由子进程总数乘以ThreadsPerChild 值决定的,应该大于等于MaxClients。如果负载很大,现有的子进程数不能满足时,控制进程会派生新的子进程。默认最大的子进程总数是16,加大时 也需要显式声明ServerLimit(最大值是20000)。需要注意的是,如果显式声明了ServerLimit,那么它乘以 ThreadsPerChild的值必须大于等于MaxClients,而且MaxClients必须是ThreadsPerChild的整数倍,否则 Apache将会自动调节到一个相应值。
<IfModule worker.c>
ServerLimit 25
ThreadLimit 200
StartServers 3
MaxClients 2000
MinSpareThreads 50
MaxSpareThreads 200
ThreadsPerChild 100
MaxRequestsPerChild 0
</IfModule>
下面是利用Apache自带的测试工具ab对Server进行测试的情况(设定请求的index页面为6bytes),cpu%为cpu占用率,mem为内存使用量(M为单位),RequestsPerSecond为每秒处理的请求数。
1、Prefork方式
  (ServerLimit,StartServer,MinSpareServers,MaxSpareServers,MaxClients,MaxRequestPerChild)            

-n/-c(ab参数) cpu% Mem
Requestspersecond
(-,5,10,150,0)
100000/100 28.8 285 8434
100000/200 29.2 304 8032
100000/500 25.3 323 7348
100000/1000 24.4 330 5886
(10000,500,0)
100000/100 28.7 371 8345
100000/200 27.4 389 7929
100000/500 24.9 417 7229
100000/1000 23.4 437 6676
(10000,1000,0)
100000/100 28.8 408 8517
100000/200 27.0 422 8045
100000/500 24.2 455 7236
100000/1000 22.5 470 6570
(10000,1500,0)
100000/100 29.6 330 8407
100000/200 28.1 349 8014
100000/500 26.4 380 7290
100000/1000 24.0 400 6686
2、Worker方式
(ServerLimit,ThreadLimit,StartServers,MinSpareThreads,MaxSpareThreads,ThreadsPerChild,MaxRequestsPerChild)
                   

-n/-c(ab参数) cpu% mem RequestsperSecond
(50,10000,50,200,0)
100000/100 18.6 188 6020
100000/200 20.1 195 5892
100000/500 19.8 209 5708
100000/1000 22.2 218 6081
(100,100,0)
100000/100 24.5 240 6919
100000/200 23.6 247 6798
100000/500 24.6 254 6827
100000/1000 22.3 271 6114
(200,0)
100000/100 27.3 301 7781
100000/200 27.4 307 7789
100000/500 26.0 320 7141
100000/1000 21.8 344 6110
相对来说,prefork方式速度要稍高于worker,然而它需要的cpu和memory资源也稍多于worker。

BackgroundWorker RunWorkerCompleted事件

BackgroundWorker RunWorkerCompleted事件

我的C#应用程序有几个BackgroundWorker。有时,一个BackgroundWorker会启动另一个。当第一个BackgroundWorker完成并触发RunWorkerCompleted事件时,该事件会在哪个线程上触发:UI线程,还是调用RunWorkerAsync的那个线程?我正在使用Microsoft Visual C# 2008 Express Edition。如有任何想法或建议,不胜感激。谢谢。

答案1

小编典典

如果BackgroundWorker是从UI线程创建的,则RunWorkerCompleted事件还将在UI线程上引发。

如果是从后台线程创建的,则事件将在未定义的后台线程上引发(不一定是同一线程,除非您使用custom SynchronizationContext)。

有趣的是,这似乎并没有在MSDN上得到充分记录。我能够找到的最佳参考是在这里:

在您的应用程序中实现多线程的首选方法是使用BackgroundWorker组件。BackgroundWorker组件使用事件驱动模型来实现多线程。
后台线程运行您的 DoWork事件处理程序,而创建控件的线程运行您的ProgressChanged和RunWorkerCompleted事件处理程序。您可以从ProgressChanged和RunWorkerCompleted事件处理程序中调用控件。

c# – 如何使用BackgroundWorker事件RunWorkerCompleted

c# – 如何使用BackgroundWorker事件RunWorkerCompleted

总而言之,我已经知道BackgroundWorker在WinForm中处理多线程案例的基本用法.代码结构如下所示.

在应用程序的主线程中.刚刚启动BackgroundWork.

// Start the asynchronous operation only when the worker is not already busy.
if (!backgroundWorker1.IsBusy)
{
    backgroundWorker1.RunWorkerAsync();
}

然后它会触发DoWork事件.所以我们可以在那里做点什么.

// DoWork event handler: performs the background work and reports progress through the worker.
// The "......" / "...." lines are placeholders for work elided from the original snippet.
private void backgroundWorker1_DoWork(object sender,DoWorkEventArgs e)
{
    // The event sender is cast back to the BackgroundWorker so it can be used to report progress.
    BackgroundWorker worker = sender as BackgroundWorker;
    ......
    // report progress.
    worker.ReportProgress(iProgress);
    ....
}

然后我们只需要处理ProgressChanged事件以显示BackgroundWorker进度.

// ProgressChanged event handler: shows the reported progress as a percentage in resultLabel.
// The property is spelled ProgressPercentage; C# member names are case-sensitive.
private void backgroundWorker1_ProgressChanged(object sender,ProgressChangedEventArgs e)
{
    // Show progress, e.g. "42%".
    resultLabel.Text = (e.ProgressPercentage.ToString() + "%");
}

DoWork完成或发生异常后.将触发RunWorkerCompleted事件.

这是我对这个事件处理的问题.请帮助审查它们.谢谢.

我注意到RunWorkerCompletedEventArgs中有一个名为’Result’的属性,它用于什么?我怎么用呢?

是否有可能接收我的自定义异常消息而不是e.Error?如果可以,该如何实现?

// RunWorkerCompleted event handler: shows in resultLabel how the background operation ended.
// Cancellation is checked first, then an error, and only otherwise is the result read.
private void backgroundWorker1_RunWorkerCompleted(object sender,RunWorkerCompletedEventArgs e)
{
    if (e.Cancelled)
    {
        resultLabel.Text = "Canceled!";
        return;
    }

    if (e.Error != null)
    {
        resultLabel.Text = "Error: " + e.Error.Message;
        return;
    }

    resultLabel.Text = e.Result.ToString();
}

解决方法

RunWorkerCompletedEventArgs中的Result属性,就是您在DoWork()处理程序中赋给DoWorkEventArgs的Result属性的值.

您可以为此分配任何您喜欢的内容,因此您可以返回整数,字符串,对象/复合类型等.

如果在DoWork()中抛出异常,则可以在RunWorkerCompletedEventArgs的Error属性中访问该异常.在这种情况下,访问Result属性将导致抛出TargetInvocationException.

com.amazonaws.services.simpleworkflow.flow.ActivityWorker的实例源码

com.amazonaws.services.simpleworkflow.flow.ActivityWorker的实例源码

项目:swf-flow-gradle    文件:GreeterWorker.java   
/**
 * Starts a workflow worker and an activity worker that poll the same task list in the same
 * domain, then kicks off one workflow execution through the generated external client.
 *
 * @param args unused
 * @throws Exception if any of the worker or client calls fails
 */
public static void main(String[] args) throws Exception {
    AmazonSimpleWorkflow service = new AmazonSimpleWorkflowClient();
    service.setEndpoint("https://swf.us-east-1.amazonaws.com");

    String domain = "helloWorldWalkthrough";
    String taskListToPoll = "HelloWorldAsyncList";

    WorkflowWorker wfw = new WorkflowWorker(service, domain, taskListToPoll);
    wfw.setRegisterDomain(true);
    wfw.setDomainRetentionPeriodInDays(1);
    wfw.addWorkflowImplementationType(GreeterWorkflowImpl.class);
    wfw.start();

    // The activity worker takes the domain as well, matching the workflow worker above.
    ActivityWorker aw = new ActivityWorker(service, domain, taskListToPoll);
    aw.addActivitiesImplementation(new GreeterActivitiesImpl());
    aw.start();

    GreeterWorkflowClientExternalFactory clientFactory = new GreeterWorkflowClientExternalFactoryImpl(service, domain);
    GreeterWorkflowClientExternal client = clientFactory.getClient();
    client.greet();
}
项目:swf-horserace    文件:HorseActivitiesWorker.java   
private HorseActivitiesWorker(final int instance) throws Exception {

        /*
         * Configure an Flow Framework ACTIVITY worker with a domain and queue.
         */
        this.worker = new ActivityWorker(SWF,DOMAIN,TASKLIST);

        /*
         * Can add multiple activities implementation instances. Each is a
         * singleton that remains active for the duration of the worker. There
         * should be no shared mutable state in an activities implementation.
         */
        this.worker.addActivitiesImplementation(new HorseActivitiesImpl(
                instance));

    }
项目:swf-horserace    文件:AnnouncerActivitiesWorker.java   
private AnnouncerActivitiesWorker(final int instance) throws Exception {

    /*
     * Configure a Flow Framework ACTIVITY worker with a domain and queue. The domain is
     * passed between the service client and the task list, as in HorseActivitiesWorker.
     */
    this.worker = new ActivityWorker(SWF,DOMAIN,TASKLIST);

    /*
     * Can add multiple activities implementation instances. Each is a
     * singleton that remains active for the duration of the worker. There
     * should be no shared mutable state in an activities implementation.
     */
    this.worker.addActivitiesImplementation(new AnnouncerActivitiesImpl(
            instance));

}
项目:aws-flow-maven-eclipse-samples    文件:ActivityHost.java   
/**
 * Creates the booking activities implementation and starts an ActivityWorker that polls the
 * task list configured under BookingConfigKeys.ACTIVITY_WORKER_TASKLIST.
 *
 * @param configHelper provides the name of the activity worker task list
 * @throws Exception if reading the configuration or starting the worker fails
 */
private void startWorker(ConfigHelper configHelper) throws Exception {
    // Create activity implementations
    BookingActivities bookingActivitiesImpl = new BookingActivitiesImpl();

    // Start worker to poll the common task list
    String taskList = configHelper.getValueFromConfig(BookingConfigKeys.ACTIVITY_WORKER_TASKLIST);
    // NOTE(review): the Flow Framework ActivityWorker constructor is documented as
    // (service, domain, taskListToPoll); the domain argument looks to have been dropped from
    // this snippet. Confirm against the original ActivityHost.java and restore it.
    worker = new ActivityWorker(swfService,taskList);
    worker.setDomainRetentionPeriodInDays(domainRetentionPeriodindays);
    worker.setRegisterDomain(true);
    worker.addActivitiesImplementation(bookingActivitiesImpl);
    worker.start();
    System.out.println("Worker Started for Activity Task List: " + taskList);
}

今天关于聊聊jesque的WorkerImpl与WorkerPool的分享就到这里,希望大家有所收获,若想了解更多关于Apache的worker和prefork模式比较、BackgroundWorker RunWorkerCompleted事件、c# – 如何使用BackgroundWorker事件RunWorkerCompleted、com.amazonaws.services.simpleworkflow.flow.ActivityWorker的实例源码等相关知识,可以在本站进行查询。

本文标签: