www.91084.com

GVKun编程网logo

如果使用thenApply,则不会调用CompletableFuture#whenComplete(如果要调用a.py内所有的模块应使用语句)

16

在这篇文章中,我们将带领您了解如果使用thenApply,则不会调用CompletableFuture#whenComplete的全貌,包括如果要调用a.py内所有的模块应使用语句的相关情况。同时,我

在这篇文章中,我们将带领您了解如果使用thenApply,则不会调用CompletableFuture#whenComplete的全貌,包括如果要调用a.py内所有的模块应使用语句的相关情况。同时,我们还将为您介绍有关005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture、006-优化web请求二-应用缓存、异步调用【Future、ListenableFuture、CompletableFuture】、ETag、WebSocket【SockJS、Stomp】、CompletableFuture allof(..)join()与CompletableFuture.join()、CompletableFuture allof(..)。join()与CompletableFuture.join()的知识,以帮助您更好地理解这个主题。

本文目录一览:

如果使用thenApply,则不会调用CompletableFuture#whenComplete(如果要调用a.py内所有的模块应使用语句)

如果使用thenApply,则不会调用CompletableFuture#whenComplete(如果要调用a.py内所有的模块应使用语句)

我有以下代码,该代码在远程服务器上安排任务,然后使用轮询完成ScheduledExecutorService#scheduleAtFixedRate。任务完成后,它将下载结果。我想将a返回Future给调用方,以便他们可以确定阻止时间和阻止时间,并为他们提供取消任务的选项。

我的问题是,如果客户端取消Futuredownload方法返回的值,则whenComplete块不会执行。如果我删除thenApply它。很明显,我对Future构图有些误解…我应该改变什么?

public Future<Object> download(Something something) {    String jobId = schedule(something);    CompletableFuture<String> job = pollForCompletion(jobId);    return job.thenApply(this::downloadResult);}private CompletableFuture<String> pollForCompletion(String jobId) {    ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();    CompletableFuture<String> completionFuture = new CompletableFuture<>();    ScheduledFuture<?> checkFuture = executor.scheduleAtFixedRate(() -> {                       if (pollRemoteServer(jobId).equals("COMPLETE")) {                completionFuture.complete(jobId);            }    }, 0, 10, TimeUnit.SECONDS);    completionFuture                            .whenComplete((result, thrown) -> {                System.out.println("XXXXXXXXXXX"); //Never happens unless thenApply is removed                checkFuture.cancel(true);                executor.shutdown();            });    return completionFuture;}

同样,如果我这样做:

return completionFuture.whenComplete(...)

代替

completionFuture.whenComplete(...);return completionFuture;

whenComplete也永远不会执行。这对我来说似乎很违反直觉。从逻辑上说,Future返回应该不是whenComplete我应该坚持的那个吗?

编辑:

我更改了代码以明确向后传播取消。这是令人讨厌和难以理解的,但是它可以正常工作,我找不到更好的方法:

public Future<Object> download(Something something) throws ChartDataGenException, Exception {        String jobId = schedule(something);        CompletableFuture<String> job = pollForCompletion(jobId);        CompletableFuture<Object> resulting = job.thenApply(this::download);        resulting.whenComplete((result, thrown) -> {            if (resulting.isCancelled()) { //the check is not necessary, but communicates the intent better                job.cancel(true);            }        });        return resulting;}

编辑2:

我发现了tascalate-concurrent,这是一个出色的库,提供sane的明智实现CompletionStage,并支持可以通过DependentPromise透明方式反向传播取消的相关承诺(通过类)。似乎非常适合此用例。

这应该足够了:

DependentPromise          .from(pollForCompletion(jobId))          .thenApply(this::download, true); //true means the cancellation should back-propagate

请注意,没有测试这种方法。

答案1

小编典典

您的结构如下:

           ┌──────────────────┐           │ completionFuture |           └──────────────────┘             ↓              ↓  ┌──────────────┐      ┌───────────┐  │ whenComplete |      │ thenApply |  └──────────────┘      └───────────┘

因此,当您取消thenApply未来时,原始completionFuture对象不会受到影响,因为它与thenApply舞台无关。但是,如果您不链接该thenApply阶段,那么您将返回原始completionFuture实例,并且取消该阶段会导致所有相关阶段的取消,从而使whenComplete操作立即执行。

但是,如果thenApply取消该阶段,则满足条件的completionFuture静止图像可能会完成pollRemoteServer(jobId).equals("COMPLETE"),因为轮询不会停止。但是,我们不知道的关系`jobId

schedule(something)pollRemoteServer(jobId)`。如果您的应用程序状态以某种方式更改,使得取消下载后再也无法满足此条件,那么这种未来将永远无法完成…


关于您的最后一个问题,哪个期货是“我应该坚持的那个期货?”,实际上,并不需要线性的期货链,而方便的方法CompletableFuture可以轻松地创建这样的链,而不仅仅是通常,这是最没有用的事情,因为如果您具有线性依赖性,则只需编写一段代码即可。您将两个独立阶段链接在一起的模型是正确的,但是取消不能通过它起作用,但是它也不可以通过线性链来起作用。

如果希望能够取消源阶段,则需要对其进行引用,但是,如果想要获得从属阶段的结果,则也需要对该阶段进行引用。

005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture

005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture

一、概述

  创建线程的两种方式,一种是直接继承 Thread,另外一种就是实现 Runnable 接口。这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。而自从 Java 1.5 开始,就提供了 Callable 和 Future,通过它们可以在任务执行完毕之后得到任务执行结果。

  详述:https://www.cnblogs.com/bjlhx/p/7588971.html

1.1、Runnable 接口

它是一个接口,里面只声明了一个 run () 方法:

public interface Runnable {
    public abstract void run();
}

由于 run () 方法返回值为 void 类型,所以在执行完任务之后无法返回任何结果。

1.2、Callable 接口

Callable 接口位于 java.util.concurrent 包下,在它里面也只声明了一个方法,只不过这个方法叫做 call ()。

public interface Callable<V> {   
    V call() throws Exception;
}

是一个泛型接口,call () 函数返回的类型就是传递进来的 V 类型。Callable 接口可以看作是 Runnable 接口的补充,call 方法带有返回值,并且可以抛出异常。

1.3、Future 接口

  Future 的核心思想是:

    一个方法,计算过程可能非常耗时,等待方法返回,显然不明智。可以在调用方法的时候,立马返回一个 Future,可以通过 Future 这个数据结构去控制方法 f 的计算过程。

  Future 类位于 java.util.concurrent 包下,它是一个接口:这里的控制包括:

    get 方法:获取计算结果(如果还没计算完,也是必须等待的)这个方法会产生阻塞,会一直等到任务执行完毕才返回;

    get (long timeout, TimeUnit unit) 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回 null。

    cancel 方法:还没计算完,可以取消计算过程,如果取消任务成功则返回 true,如果取消任务失败则返回 false。参数 mayInterruptIfRunning 表示是否允许取消正在执行却没有执行完毕的任务,如果设置 true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论 mayInterruptIfRunning 为 true 还是 false,此方法肯定返回 false,即如果取消已经完成的任务会返回 false;如果任务正在执行,若 mayInterruptIfRunning 设置为 true,则返回 true,若 mayInterruptIfRunning 设置为 false,则返回 false;如果任务还没有执行,则无论 mayInterruptIfRunning 为 true 还是 false,肯定返回 true。

    isDone 方法:判断是否计算完

    isCancelled 方法:判断计算是否被取消,方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。

  也就是说 Future 提供了三种功能:

    1)判断任务是否完成;

    2)能够中断任务;

    3)能够获取任务执行结果。

  Future 就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。

  因为 Future 只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的 FutureTask。

使用 Callable+Future 获取执行结果:

public class Test {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        Future<Integer> result = executor.submit(task);
        executor.shutdown();
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
View Code

1.4、FutureTask 类

FutureTask 继承体系中的核心接口是 Future。事实上,FutureTask 是 Future 接口的一个唯一实现类。

如何获取 Callable 的返回结果:一般是通过 FutureTask 这个中间媒介来实现的。整体的流程是这样的:

把 Callable 实例当作参数,生成一个 FutureTask 的对象,然后把这个对象当作一个 Runnable,作为参数另起线程。

1.4.1、FutureTask 结构

  

1.4.2、FutureTask 使用

方式一、使用 thread 方式

  FutureTask 实现了 Runnable,因此它既可以通过 Thread 包装来直接执行,也可以提交给 ExecuteService 来执行。以下使用 Thread 包装线程方式启动

public static void main(String[] args) throws Exception {
        Callable<Integer> call = () -> {
            System.out.println("计算线程正在计算结果...");
            Thread.sleep(3000);
            return 1;
        };
        FutureTask<Integer> task = new FutureTask<>(call);
        Thread thread = new Thread(task);
        thread.start();

        System.out.println("main线程干点别的...");

        Integer result = task.get();
        System.out.println("从计算线程拿到的结果为:" + result);
    }

方式二、使用 ExecutorService

   ExecutorService executor = Executors.newFixedThreadPool (2); 线程池方式

public static void main(String[] args) {
        Callable<String> callable1=()->{
            Thread.sleep(2000);
            return Thread.currentThread().getName();
        };
        Callable<String> callable2=()->{
            Thread.sleep(3000);
            return Thread.currentThread().getName();
        };
        FutureTask<String> futureTask1 = new FutureTask<>(callable1);// 将Callable写的任务封装到一个由执行者调度的FutureTask对象
        FutureTask<String> futureTask2 = new FutureTask<>(callable2);

        ExecutorService executor = Executors.newFixedThreadPool(2);        // 创建线程池并返回ExecutorService实例
        executor.execute(futureTask1);  // 执行任务
        executor.execute(futureTask2);
        //同时开启了两个任务
        long startTime = System.currentTimeMillis();
        while (true) {
            try {
                if(futureTask1.isDone() && futureTask2.isDone()){//  两个任务都完成
                    System.out.println("Done");
                    executor.shutdown();                          // 关闭线程池和服务
                    return;
                }

                if(!futureTask1.isDone()){ // 任务1没有完成,会等待,直到任务完成
                    System.out.println("FutureTask1 output="+futureTask1.get());
                }

                System.out.println("Waiting for FutureTask2 to complete");
                String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);
                if(s !=null){
                    System.out.println("FutureTask2 output="+s);
                }
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }catch(TimeoutException e){
                //do nothing
            }
            System.out.println((System.currentTimeMillis()-startTime));
        }
    }
View Code

使用 Callable+FutureTask 获取执行结果

public class Test {
    public static void main(String[] args) {
        //第一种方式
        ExecutorService executor = Executors.newCachedThreadPool();
        Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        executor.submit(futureTask);
        executor.shutdown();
         
        //第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
        /*Task task = new Task();
        FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
        Thread thread = new Thread(futureTask);
        thread.start();*/
         
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e1) {
            e1.printStackTrace();
        }
         
        System.out.println("主线程在执行任务");
         
        try {
            System.out.println("task运行结果"+futureTask.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
         
        System.out.println("所有任务执行完毕");
    }
}
class Task implements Callable<Integer>{
    @Override
    public Integer call() throws Exception {
        System.out.println("子线程在进行计算");
        Thread.sleep(3000);
        int sum = 0;
        for(int i=0;i<100;i++)
            sum += i;
        return sum;
    }
}
View Code

1.5、CompletionService

原理:内部通过阻塞队列 + FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序。

package com.lhx.cloud.futruetask;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

public class CompletionServiceDemo {
    public static void main(String[] args)  {
        Long start = System.currentTimeMillis();
        //开启5个线程
        ExecutorService exs = Executors.newFixedThreadPool(5);
        try {
            int taskCount = 10;
            //结果集
            List<Integer> list = new ArrayList<>();
            //1.定义CompletionService
            CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs);
            List<Future<Integer>> futureList = new ArrayList<>();
            //2.添加任务
            for(int i=0;i<taskCount;i++){
                futureList.add(completionService.submit(new Task(i+1)));
            }
            //==================结果归集===================
            //方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果
//            for (Future<Integer> future : futureList) {
//                System.out.println("====================");
//                Integer result = future.get();//线程在这里阻塞等待该任务执行完毕,按照
//                System.out.println("任务result="+result+"获取到结果!"+new Date());
//                list.add(result);
//            }

//            //方法2.使用内部阻塞队列的take()
            for(int i=0;i<taskCount;i++){
                Integer result = completionService.take().get();//采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
                System.out.println(LocalDateTime.now()+"---任务i=="+result+"完成!");
                list.add(result);
            }
            System.out.println("list="+list);
            System.out.println("总耗时="+(System.currentTimeMillis()-start));
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            exs.shutdown();//关闭线程池
        }

    }


    static class Task implements Callable<Integer>{
        Integer i;

        public Task(Integer i) {
            super();
            this.i=i;
        }

        @Override
        public Integer call() throws Exception {
            if(i==5){
                Thread.sleep(5000);
            }else{
                Thread.sleep(1000);
            }
            System.out.println("线程:"+Thread.currentThread().getName()+"任务i="+i+",执行完成!");
            return i;
        }

    }
}
View Code

建议:使用率也挺高,而且能按照完成先后排序,建议如果有排序需求的优先使用。只是多线程并发执行任务结果归集,也可以使用。

二、CompletableFuture

2.1、对标 Futrue

  Future 接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。

  阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式呢?即当计算结果完成及时通知监听者。

    Future 局限性,它很难直接表述多个 Future 结果之间的依赖性。

2.2、类图

  

2.2.1、CompletionStage

  • CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段

  • 一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable。比如:stage.thenApply (x -> square (x)).thenAccept (x -> System.out.print (x)).thenRun (() -> System.out.println ())

  • 一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发

2.2.2、Future

2.3、创建 CompletableFuture 对象

  CompletableFuture.compleatedFuture 是一个静态辅助方法,用来返回一个已经计算好的 CompletableFuture.

  以下四个静态方法用来为一段异步执行的代码创建 CompletableFuture 对象:

public static CompletableFuture<Void>     runAsync(Runnable runnable)
public static CompletableFuture<Void>     runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U>     supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U>     supplyAsync(Supplier<U> supplier, Executor executor)

  以 Async 结尾并且没有指定 Executor 的方法会使用 ForkJoinPool.commonPool () 作为它的线程池执行异步代码。

  runAsync 方法:它以 Runnabel 函数式接口类型为参数,所以 CompletableFuture 的计算结果为空。

  supplyAsync 方法以 Supplier<U> 函数式接口类型为参数,CompletableFuture 的计算结果类型为 U。

  注意:这些线程都是 Daemon 线程,主线程结束 Daemon 线程不结束,只有 JVM 关闭时,生命周期终止。

示例:简单同步用法

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            //长时间的计算任务
            try {
                System.out.println("计算型任务开始");
                Thread.sleep(2000);
                return "计算型任务结束";
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "·00";
        });
        System.out.println(future.get());
    }
View Code

2.4、计算结果完成时的处理

当 CompletableFuture 的计算结果完成,或者抛出异常的时候,可以执行特定的 Action。主要是下面的方法:

public CompletableFuture<T>     whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T>     whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T>     exceptionally(Function<Throwable,? extends T> fn)  

  可以看到 Action 的类型是 BiConsumer<? super T,? super Throwable> 它可以处理正常的计算结果,或者异常情况。

  方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)

示例:

package com.lhx.cloud.futruetask;

import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class BasicFuture {

    private static Random rand = new Random();
    private static long t = System.currentTimeMillis();

    static int getMoreData()  {
        System.out.println("begin to start compute");
        try {
            TimeUnit.SECONDS.sleep(3);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("end to compute,passed " + (System.currentTimeMillis()-t));
        return rand.nextInt(1000);
    }


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(BasicFuture::getMoreData);
        Future<Integer> f = future.whenComplete((v, e) -> {
            System.out.println(v);
            System.out.println(e);
        });
        System.out.println(f.get());
    }}
View Code

2.5、转换

CompletableFuture 可以作为 monad (单子) 和 functor. 由于回调风格的实现,我们不必因为等待一个计算完成而阻塞着调用线程,而是告诉 CompletableFuture 当计算完成的时候请执行某个 Function. 还可以串联起来。

public <U> CompletableFuture<U>     thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U>     thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

2.6、异常处理 completeExceptionally

  为了能获取任务线程内发生的异常,需要使用 CompletableFuture 的 completeExceptionally 方法将导致 CompletableFuture 内发生问题的异常抛出。

  这样,当执行任务发生异常时,调用 get() 方法的线程将会收到一个 ExecutionException 异常,该异常接收了一个包含失败原因的 Exception 参数。

/**
     * 任务没有异常 正常执行,然后结束
     */
    @Test
    public void test1() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }

    /**
     * 线程有异常  正常执行,然后无法结束,主线程会一直等待
     */
    @Test
    public void test2() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                int i=1/0;
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 告诉completableFuture任务已经完成
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }
View Code
/**
     * 线程有异常  正常执行,然后通过completableFuture.completeExceptionally(e);告诉completableFuture任务发生异常了
     * 主线程接收到 程序继续处理,至结束
     */
    @Test
    public void test3() throws ExecutionException, InterruptedException {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();
        new Thread(() -> {
            // 模拟执行耗时任务
            System.out.println("task doing...");
            try {
                Thread.sleep(3000);
                int i = 1/0;
            } catch (Exception e) {
                // 告诉completableFuture任务发生异常了
                completableFuture.completeExceptionally(e);
            }
            // 告诉completableFuture任务已经完成
            completableFuture.complete("ok");
        }).start();
        // 获取任务结果,如果没有完成会一直阻塞等待
        String result = completableFuture.get();
        System.out.println("计算结果:" + result);
    }

2.7、多任务组合方法 allOf 和 anyOf

allOf 是等待所有任务完成,构造后 CompletableFuture 完成

anyOf 是只要有一个任务完成,构造后 CompletableFuture 就完成

package com.lhx.cloud.futruetask;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletableFutureDemo {
    public static void main(String[] args) {
        Long start = System.currentTimeMillis();
        // 结果集
        List<String> list = new ArrayList<>();

        ExecutorService executorService = Executors.newFixedThreadPool(10);

        List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);
        // 全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
        CompletableFuture[] cfs = taskList.stream()
                .map(integer -> CompletableFuture.supplyAsync(() -> calc(integer), executorService)
                        .thenApply(h -> Integer.toString(h))
                        .whenComplete((s, e) -> {
                            System.out.println(LocalDateTime.now()+"---任务" + s + "完成!result=" + s + ",异常 e=" + e);
                            list.add(s);
                        })
                ).toArray(CompletableFuture[]::new);
        // 封装后无返回值,必须自己whenComplete()获取
        CompletableFuture.allOf(cfs).join();
        System.out.println("list=" + list + ",耗时=" + (System.currentTimeMillis() - start));
    }

    public static Integer calc(Integer i) {
        try {
            if (i == 1) {
                Thread.sleep(3000);//任务1耗时3秒
            } else if (i == 5) {
                Thread.sleep(5000);//任务5耗时5秒
            } else {
                Thread.sleep(1000);//其它任务耗时1秒
            }
            System.out.println(LocalDateTime.now()+"---task线程:" + Thread.currentThread().getName()
                    + "任务i=" + i + ",完成!" );
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return i;
    }
}
View Code

2.8、常用多线程并发,取结果归集的几种实现方案

描述 Future FutureTask CompletionService CompletableFuture
原理 Future 接口 接口 RunnableFuture 的唯一实现类,RunnableFuture 接口继承自 Future+Runnable 内部通过阻塞队列 + FutureTask 接口 JDK8 实现了 Future, CompletionStage 两个接口
多任务并发执行 支持 支持 支持 支持
获取任务结果的顺序 按照提交顺序获取结果 未知 支持任务完成的先后顺序 支持任务完成的先后顺序
异常捕捉 自己捕捉 自己捕捉 自己捕捉 原生 API 支持,返回每个任务的异常
建议 CPU 高速轮询,耗资源,或者阻塞,可以使用,但不推荐 功能不对口,并发任务这一块多套一层,不推荐使用 推荐使用,没有 JDK8CompletableFuture 之前最好的方案 API 极端丰富,配合流式编程,推荐使用!

上表来源:https://www.cnblogs.com/dennyzhangdd/p/7010972.html

 

 

   

006-优化web请求二-应用缓存、异步调用【Future、ListenableFuture、CompletableFuture】、ETag、WebSocket【SockJS、Stomp】

006-优化web请求二-应用缓存、异步调用【Future、ListenableFuture、CompletableFuture】、ETag、WebSocket【SockJS、Stomp】

四、应用缓存

  使用spring应用缓存。使用方式:使用@EnableCache注解激活Spring的缓存功能,需要创建一个CacheManager来处理缓存。如使用一个内存缓存示例

package com.github.bjlhx15.gradle.demotest;

import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.concurrent.ConcurrentMapCache;
import org.springframework.cache.support.SimpleCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Arrays;

@Configuration
@EnableCaching
public class CacheConfiguration {
    
    @Bean
    public CacheManager cacheManager(){
        SimpleCacheManager simpleCacheManager=new SimpleCacheManager();
        simpleCacheManager.setCaches(Arrays.asList(new ConcurrentMapCache("searches")));
        return simpleCacheManager;
    }
}

  其他实现如:EhCacheManager、GuavaCacheManager等

  主要标记:

    @CacheEvict:将会从缓存中移除一个条目

    @CachePut:会将方法结果放到缓存中,而不会影响到方法调用本身。

    @Caching:将缓存注解重新分组

    @CacheConfig:指向不同的缓存配置

  更多spring 应用缓存:https://www.cnblogs.com/bjlhx/category/1233985.html

五、分布式缓存

  推荐使用redis,系列文章:https://www.cnblogs.com/bjlhx/category/1066467.html

  spring使用也比较方便:https://www.cnblogs.com/bjlhx/category/1233985.html

六、异步方法-EnableAsync

  在程序执行时候还有一个瓶颈,串行执行,可以通过使用不同线程类快速提升应用的速度。

  要启用Spring的异步功能,必须要使用@EnableAsync注解。这样将会透明地使用java.util.concurrent.Executor来执行所有带有@Async注解的方法。

  @Async所修饰的函数不要定义为static类型,这样异步调用不会生效

  针对调用的Async,如果不做Future特殊处理,执行完调用方法会立即返回结果,如异步邮件发送,不会真的等邮件发送完毕才响应客户,如需等待可以使用Future阻塞处理。

6.1、原始使用

1、main方法增加@EnableAsync注解 

@SpringBootApplication
@EnableAsync
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
        System.out.println("ThreadId:"+Thread.currentThread().getId());
    }
}

2、在所需方法增加@Async注解

@Component
public class Task {
    @Async
    public void doTaskOne() throws Exception {
        for (int i = 0; i < 3; i++) {
            Thread.sleep(200);
            System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskOne");
        }
    }

    @Async
    public void doTaskTwo() throws Exception {
        for (int i = 0; i < 3; i++) {
            Thread.sleep(200);
            System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskTwo");
        }
    }

    @Async
    public void doTaskThree() throws Exception {
        for (int i = 0; i < 3; i++) {
            Thread.sleep(200);
            System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskThree");
        }
    }
}

3、查看调用

@RestController
public class TestAsyns {
    @Autowired
    private Task task;
    @RequestMapping("/testAsync")
    public ResponseEntity testAsync() throws Exception {
        task.doTaskOne();
        task.doTaskTwo();
        task.doTaskThree();
        return ResponseEntity.ok("ok");
    }
}

上述方法依次调用三个方法。

如果去除@EnableAsync注解,输出如下:【可见是串行执行】

ThreadId:33:doTaskOne
ThreadId:33:doTaskOne
ThreadId:33:doTaskOne
ThreadId:33:doTaskTwo
ThreadId:33:doTaskTwo
ThreadId:33:doTaskTwo
ThreadId:33:doTaskThree
ThreadId:33:doTaskThree
ThreadId:33:doTaskThree

如果增加@EnableAsync注解,输出如下:【可见是并行执行】

ThreadId:56:doTaskThree
ThreadId:55:doTaskTwo
ThreadId:54:doTaskOne
ThreadId:54:doTaskOne
ThreadId:55:doTaskTwo
ThreadId:56:doTaskThree
ThreadId:54:doTaskOne
ThreadId:56:doTaskThree
ThreadId:55:doTaskTwo

6.2、自定义执行器使用异步

1、配置类

  方式一、注入Bean方式

import java.util.concurrent.Executor;  
  
import org.springframework.context.annotation.Bean;  
import org.springframework.context.annotation.ComponentScan;  
import org.springframework.context.annotation.Configuration;  
import org.springframework.scheduling.annotation.EnableAsync;  
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;  
  
@Configuration  
public class ThreadConfig  {  
  
     // 执行需要依赖线程池,这里就来配置一个线程池  
     @Bean  
     public Executor getExecutor() {  
          ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
          executor.setCorePoolSize(5);  
          executor.setMaxPoolSize(10);  
          executor.setQueueCapacity(25);  
          executor.initialize();  
          return executor;  
     }  
}  

  方式二、通过实现AsyncConfigurer接口,可以自定义默认的执行(executor)。新增如下配置类:

@Configuration
public class AsyncConfiguration implements AsyncConfigurer {
    protected final Logger logger = LoggerFactory.getLogger(AsyncConfiguration.class);

    @Override
    public Executor getAsyncExecutor() {
        //做好不超过10个,这里写两个方便测试
        return Executors.newFixedThreadPool(2);
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return (ex,method,params)->logger.error("Uncaught async error",ex);
    }
}

  Executor的初始化配置,还有很多种,可以参看https://www.cnblogs.com/bjlhx/category/1086008.html

  Spring 已经实现的异步线程池:
    1. SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。
    2. SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方
    3. ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类
    4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类
    5. ThreadPoolTaskExecutor :最常使用,推荐。 其实质是对java.util.concurrent.ThreadPoolExecutor的包装

  使用上述配置,能够确保在应用中,用来处理异步任务的线程不会超过10个。这对于web应用很重要,因为每个客户端都会有一个专用的线程。你所使用的线程越多,阻塞时间越长那么能够处理的客户端就会越少。

  如果设置成两个,程序中有3个异步线程,也会只有两个运行,如下

ThreadId:55:doTaskTwo
ThreadId:54:doTaskOne
ThreadId:55:doTaskTwo
ThreadId:54:doTaskOne
ThreadId:55:doTaskTwo
ThreadId:54:doTaskOne
ThreadId:55:doTaskThree
ThreadId:55:doTaskThree
ThreadId:55:doTaskThree

2、使用

  同上述一致。

6.3、异步返回处理

方式一、使用Future【FutureTask是默认实现】处理+轮询处理【jdk1.5产物,没有提供Callback机制,只能主动轮询,通过get去获取结果】【不推荐】

修改异步执行的方法

@Component
public class TaskFutureDemo {
    @Async
    public Future<String> doTaskOne() throws Exception {
        for (int i = 0; i < 3; i++) {
            Thread.sleep(1000);
            System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskOne");
        }
        return new AsyncResult<>("doTaskOne");
    }

    @Async
    public Future<String> doTaskTwo() throws Exception {
        for (int i = 0; i < 3; i++) {
            Thread.sleep(1000);
            System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskTwo");
        }
        return new AsyncResult<>("doTaskTwo");
    }

    @Async
    public Future<String> doTaskThree() throws Exception {
        for (int i = 0; i < 3; i++) {
            Thread.sleep(1000);
            System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskThree");
        }
        return new AsyncResult<>("doTaskThree");
    }
}
View Code

修改调用的方法

@RestController
public class TestAsynsFutureController {
    @Autowired
    private TaskFutureDemo task;

    @RequestMapping("/testAsyncFuture")
    public ResponseEntity testAsyncFuture() throws Exception {
        Future<String> taskOne = task.doTaskOne();
        Future<String> taskTwo = task.doTaskTwo();
        Future<String> taskThree = task.doTaskThree();
        while (true) {
            if (taskOne.isDone() && taskTwo.isDone() && taskThree.isDone()) {
                break;
            }
        }
        return ResponseEntity.ok("ok");
    }
}
View Code

方式二、Spring的ListenableFuture和CountDownLatch处理

Service实现类

@Service
public class TaskListenableFutureService {
    private AsyncSearch asyncSearch;

    @Autowired
    public TaskListenableFutureService(AsyncSearch asyncSearch) {
        this.asyncSearch=asyncSearch;
    }

    public List<String> search(List<String> keywords){
        CountDownLatch latch=new CountDownLatch(keywords.size());
        List<String> allResult=Collections.synchronizedList(new ArrayList<>());
        keywords.stream()
                .forEach(keyword->asyncFetch(latch,allResult,keyword));
        await(latch);
        return allResult;
    }

    private void asyncFetch(CountDownLatch latch, List<String> result, String keyword){
        asyncSearch.asyncFetch(keyword)
                .addCallback(
                        key->onSuccess(result,latch,key),
                        ex -> onError(latch,ex)
                );
    }

    private void await(CountDownLatch latch){
        try {
            latch.await();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
    private static void onSuccess(List<String> result,CountDownLatch latch,String keyword){
        result.add(keyword);
        latch.countDown();
    }
    private static void onError(CountDownLatch latch,Throwable ex){
        ex.printStackTrace();
        latch.countDown();
    }
    @Component
    private static class AsyncSearch{
        @Autowired
        public AsyncSearch() {
        }

        protected final Logger logger = LoggerFactory.getLogger(AsyncSearch.class);
        @Async
        public ListenableFuture<String> asyncFetch(String keyword){
            logger.info(Thread.currentThread().getName()+"-"+keyword);
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return new AsyncResult<>("keyword="+keyword);
        }
    }
}
View Code

调用方法

@RestController
public class TestAsynsListenableFutureController {
    protected final Logger logger = LoggerFactory.getLogger(TestAsynsListenableFutureController.class);
    @Autowired
    private TaskListenableFutureService task;

    @RequestMapping("/testAsyncListenableFuture")
    public ResponseEntity testAsyncListenableFuture() throws Exception {
        List<String> list = task.search(Arrays.asList("java", "html", "spring"));
        list.stream().forEach(p-> logger.info(p));

        return ResponseEntity.ok("ok");
    }
}
View Code

方式三、使用CompletableFuture【推荐】

User实体类

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;

@JsonIgnoreProperties(ignoreUnknown=true)
public class User {

    private String name;
    private String blog;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getBlog() {
        return blog;
    }

    public void setBlog(String blog) {
        this.blog = blog;
    }

    @Override
    public String toString() {
        return "User [name=" + name + ", blog=" + blog + "]";
    }

}
View Code

CompletableFuture的服务

@Service
public class GitHubLookupService {

    private static final Logger logger = LoggerFactory.getLogger(GitHubLookupService.class);

    private final RestTemplate restTemplate;

    public GitHubLookupService(RestTemplateBuilder restTemplateBuilder) {
        this.restTemplate = restTemplateBuilder.build();
    }

    @Async
    public CompletableFuture<User> findUser(String user) throws InterruptedException {
        logger.info("Looking up " + user);
        String url = String.format("https://api.github.com/users/%s", user);
        User results = restTemplate.getForObject(url, User.class);
        // Artificial delay of 1s for demonstration purposes
        Thread.sleep(1000L);
        return CompletableFuture.completedFuture(results);
    }

}
View Code

调用方法

@RestController
public class CompletableFutureController {


    private static final Logger logger = LoggerFactory.getLogger(CompletableFutureController.class);
    @Autowired
    private  GitHubLookupService gitHubLookupService;

    @RequestMapping("/testCompletableFuture")
    public ResponseEntity testCompletableFuture() throws Exception {
        // Start the clock
        long start = System.currentTimeMillis();

        // Kick of multiple, asynchronous lookups
        CompletableFuture<User> page1 = gitHubLookupService.findUser("PivotalSoftware");
        CompletableFuture<User> page2 = gitHubLookupService.findUser("CloudFoundry");
        CompletableFuture<User> page3 = gitHubLookupService.findUser("Spring-Projects");

        // Wait until they are all done
        CompletableFuture.allOf(page1,page2,page3).join();

        // Print results, including elapsed time
        logger.info("Elapsed time: " + (System.currentTimeMillis() - start));
        logger.info("--> " + page1.get());
        logger.info("--> " + page2.get());
        logger.info("--> " + page3.get());

        return ResponseEntity.ok("ok");
    }
}
View Code

七、ETag

7.1、什么是ETag? 

  ETag:是实体标签(Entity Tag)的缩写。ETag一般不以明文形式相应给客户端。在资源的各个生命周期中,它都具有不同的值,用于标识出资源的状态。当资源发生变更时,如果其头信息中一个或者多个发生变化,或者消息实体发生变化,那么ETag也随之发生变化。

  ETag值的变更说明资源状态已经被修改。往往可以通过时间戳就可以便宜的得到ETag头信息。在服务端中如果发回给消费者的相应从一开始起就由ETag控制,那么可以确保更细粒度的ETag升级完全由服务来进行控制。服务计算ETag值,并在相应客户端请求时将它返回给客户端。

7.2、计算ETag值

  在HTTP1.1协议中并没有规范如何计算ETag。ETag值可以是唯一标识资源的任何东西,如持久化存储中的某个资源关联的版本、一个或者多个文件属性,实体头信息和校验值、(CheckSum),也可以计算实体信息的散列值。有时候,为了计算一个ETag值可能有比较大的代价,此时可以采用生成唯一值等方式(如常见的GUID)。无论怎样,服务都应该尽可能的将ETag值返回给客户端。客户端不用关心ETag值如何产生,只要服务在资源状态发生变更的情况下将ETag值发送给它就行。

  ETag值可以通过uuid、整数、长整形、字符串等四种类型。

  计算ETag值时,需要考虑两个问题:计算与存储。如果一个ETag值只需要很小的代价以及占用很低的存储空间,那么我们可以在每次需要发送给客户端ETag值值的时候计算一遍就行行了。相反的,我们需要将之前就已经计算并存储好的ETag值发送给客户端。之前说:将时间戳作为字符串作为一种廉价的方式来获取ETag值。对于不是经常变化的消息,它是一种足够好的方案。注意:如果将时间戳做为ETag值,通常不应该用Last-Modified的值。由于HTTP机制中,所以当我们在通过服务校验资源状态时,客户端不需要进行相应的改动。计算ETag值开销最大的一般是计算采用哈希算法获取资源的表述值。可以只计算资源的哈希值,也可以将头信息和头信息的值也包含进去。如果包含头信息,那么注意不要包含计算机标识的头信息。同样也应该避免包含Expires、Cache-Control和Vary头信息。注意:在通过哈希算法。

7.3、ETag的类型以及他们之间的区别

  ETag有两种类型:强ETag(strong ETag)与弱ETag(weak ETag)。

    强ETag表示形式:"22FAA065-2664-4197-9C5E-C92EA03D0A16"。

    弱ETag表现形式:w/"22FAA065-2664-4197-9C5E-C92EA03D0A16"。

  强、弱ETag类型的出现与Apache服务器计算ETag的方式有关。Apache默认通过FileEtag中FileEtag INode Mtime Size的配置自动生成ETag(当然也可以通过用户自定义的方式)。假设服务端的资源频繁被修改(如1秒内修改了N次),此时如果有用户将Apache的配置改为MTime,由于MTime只能精确到秒,那么就可以避免强ETag在1秒内的ETag总是不同而频繁刷新Cache(如果资源在秒级经常被修改,也可以通过Last-Modified来解决)。

7.4、Etag - 作用

Etag 主要为了解决 Last-Modified 无法解决的一些问题。

1、 一些文件也许会周期性的更改,但是他的内容并不改变(仅仅改变的修改时间),这个时候我们并不希望客户端认为这个文件被修改了,而重新GET;

2、某些文件修改非常频繁,比如在秒以下的时间内进行修改,(比方说1s内修改了N次),If-Modified-Since能检查到的粒度是s级的,这种修改无法判断(或者说UNIX记录MTIME只能精确到秒 

3、某些服务器不能精确的得到文件的最后修改时间;

  为此,HTTP/1.1 引入了 Etag(Entity Tags).Etag仅仅是一个和文件相关的标记,可以是一个版本标记,比如说v1.0.0或者说"2e681a-6-5d044840"这么一串看起来很神秘的编码。但是HTTP/1.1标准并没有规定Etag的内容是什么或者说要怎么实现,唯一规定的是Etag需要放在""内。

7.5、Etag - 工作原理

Etag由服务器端生成,客户端通过If-Match或者说If-None-Match这个条件判断请求来验证资源是否修改。常见的是使用If-None-Match.请求一个文件的流程可能如下:

====第一次请求===
1.客户端发起 HTTP GET 请求一个文件;

2.服务器处理请求,返回文件内容和一堆Header,当然包括Etag(例如"2e681a-6-5d044840")(假设服务器支持Etag生成和已经开启了Etag).状态码200

====第二次请求===
1.客户端发起 HTTP GET 请求一个文件,注意这个时候客户端同时发送一个If-None-Match头,这个头的内容就是第一次请求时服务器返回的Etag:2e681a-6-5d044840

2.服务器判断发送过来的Etag和计算出来的Etag匹配,因此If-None-Match为False,不返回200,返回304,客户端继续使用本地缓存;

  流程很简单,问题是,如果服务器又设置了Cache-Control:max-age和Expires呢,怎么办?
  答案是同时使用,也就是说在完全匹配If-Modified-Since和If-None-Match即检查完修改时间和Etag之后,服务器才能返回304.

 7.6、在spring中实践

  虽然对请求已经做了应用缓存等处理,但是持续请求一个restful接口请求还是会发送到服务端去读取缓存,即使结果没有发生改变,但结果本身还是会多次发送给用户,造成浪费带宽。

  ETag是Web响应数据的一个散列(Hash),并且会在头信息中进行发送。客户端可以记住资源的ETag,并且通过If-None-Match头信息将最新的已知版本发送给服务器。如果在这段时间内请求没有发生变化的话,服务器就会返回304 Not Modified。

  在Spring中有一个特殊的Servlet过滤器来处理ETag,名为ShallowEtagHeaderFilter。只需将此类注入即可:

    @Bean
    public Filter etagFilter(){
        return new ShallowEtagHeaderFilter();
    }

  只要响应头没有缓存控制头信息的话,系统就会为你的响应生成ETag。

示例

@GetMapping("/testNoChangeContent")
    public ResponseEntity testNoChangeContent(){
        return ResponseEntity.ok("OK");
    }
    @GetMapping("/testChangeContent")
    public ResponseEntity testChangeContent(){
        return ResponseEntity.ok("OK:"+LocalDateTime.now());
    }

接口一、testNoChangeContent,是测试内容没有改变的,第一次请求是200,以后请求是304

接口二、testChangeContent,是测试内容有改变的,第一次请求是200,以后请求均是200

八、WebSocket

在优化web请求时,这是一种优化方案,在服务器端有可用数据时,就立即将其发送到客户端。通过多线程方式获取搜索结果,所以数据会分为多个块。这时可以一点点地进行发送,而不必等待所有结果。

8.1、概述

1、WebSocket

  Http连接为一次请求(request)一次响应(response),必须为同步调用方式。WebSocket 协议提供了通过一个套接字实现全双工通信的功能。一次连接以后,会建立tcp连接,后续客户端与服务器交互为全双工方式的交互方式,客户端可以发送消息到服务端,服务端也可将消息发送给客户端。

  WebSocket 是发送和接收消息的底层API,WebSocket 协议提供了通过一个套接字实现全双工通信的功能。也能够实现 web 浏览器和 server 间的异步通信,全双工意味着 server 与浏览器间可以发送和接收消息。需要注意的是必须考虑浏览器是否支持。

2、SockJS

  SockJS 是 WebSocket 技术的一种模拟。为了应对许多浏览器不支持WebSocket协议的问题,设计了备选SockJs。开启并使用SockJS后,它会优先选用Websocket协议作为传输协议,如果浏览器不支持Websocket协议,则会在其他方案中,选择一个较好的协议进行通讯。原来在不支持WebSocket的情况下,也可以很简单地实现WebSocket的功能的,方法就是使用 SockJS。它会优先选择WebSocket进行连接,但是当服务器或客户端不支持WebSocket时,会自动在 XHR流、XDR流、iFrame事件源、iFrame HTML文件、XHR轮询、XDR轮询、iFrame XHR轮询、JSONP轮询 这几个方案中择优进行连接。

3、Stomp

        STOMP 中文为: 面向消息的简单文本协议。websocket定义了两种传输信息类型: 文本信息和二进制信息。类型虽然被确定,但是他们的传输体是没有规定的。所以,需要用一种简单的文本传输类型来规定传输内容,它可以作为通讯中的文本传输协议,即交互中的高级协议来定义交互信息。

  STOMP本身可以支持流类型的网络传输协议: websocket协议和tcp协议。

  Stomp还提供了一个stomp.js,用于浏览器客户端使用STOMP消息协议传输的js库。

  STOMP的优点如下:

  (1)不需要自建一套自定义的消息格式

  (2)现有stomp.js客户端(浏览器中使用)可以直接使用

  (3)能路由信息到指定消息地点

  (4)可以直接使用成熟的STOMP代理进行广播 如:RabbitMQ, ActiveMQ

4、WebSocket、SockJs、STOMP三者关系

  简而言之,WebSocket 是底层协议,SockJS 是WebSocket 的备选方案,也是 底层协议,而 STOMP 是基于 WebSocket(SockJS) 的上层协议

  1. 假设HTTP协议并不存在,只能使用TCP套接字来编写web应用,你可能认为这是一件疯狂的事情。
  2. 不过幸好,我们有HTTP协议,它解决了 web 浏览器发起请求以及 web 服务器响应请求的细节。
  3. 直接使用 WebSocket(SockJS) 就很类似于 使用 TCP 套接字来编写 web 应用;因为没有高层协议,因此就需要我们定义应用间所发送消息的语义,还需要确保 连接的两端都能遵循这些语义。
  4. 同HTTP在TCP套接字上添加请求-响应模型层一样,STOMP在 WebSocket之上提供了一个基于帧的线路格式层,用来定义消息语义。

8.2、不支持WebSocket的场景有:

  浏览器不支持
  Web容器不支持,如tomcat7以前的版本不支持WebSocket
  防火墙不允许
  Nginx没有开启WebSocket支持

  当遇到不支持WebSocket的情况时,SockJS会尝试使用其他的方案来连接,刚开始打开的时候因为需要尝试各种方案,所以会阻塞一会儿,之后可以看到连接有异常,那就是尝试失败的情况。

  为了测试,使用Nginx做反向代理,把www.test.com指到项目启动的端口上,然后本地配HOST来达到模拟真实场景的效果。因为Nginx默认是不支持WebSocket的,所以这里模拟出了服务器不支持WebSocket的场景。、

8.3、spring下的WebSocket使用【WebSocket→sockJs→stomp】

项目中使用的pom

compile ''org.springframework.boot:spring-boot-starter-websocket''
    compile ''org.springframework.boot:spring-messaging''
    compile group: ''org.webjars'', name: ''sockjs-client'', version: ''1.1.2''
    compile group: ''org.webjars'', name: ''stomp-websocket'', version: ''2.3.3''
    compile group: ''org.webjars'', name: ''jquery'', version: ''3.3.1-1''

客户端JS

<script src="/webjars/jquery/3.3.1-1/jquery.js"></script>
    <script src="/webjars/sockjs-client/1.1.2/sockjs.min.js"></script>
    <script src="/webjars/stomp-websocket/2.3.3/stomp.min.js"></script>
    <script src="test.js"></script>

8.3.1、使用Spring的底层级WebSocket API

  Spring为WebSocket提供了良好支持,WebSocket协议允许客户端维持与服务器的长连接。数据可以通过WebSocket在这两个端点之间进行双向传输,因此消费数据的一方能够实时获取数据。

  按照其最简单的形式,WebSocket只是两个应用之间通信的通道。位于WebSocket一端的应用发送消息,另外一端处理消息。因为它是全双工的,所以每一端都可以发送和处理消息。如图18.1所示。

    
  WebSocket通信可以应用于任何类型的应用中,但是WebSocket最常见的应用场景是实现服务器和基于浏览器的应用之间的通信。

实现步骤:

1、编写Handler消息处理器类

方法一:实现 WebSocketHandler 接口,WebSocketHandler 接口如下 

public interface WebSocketHandler {
    void afterConnectionEstablished(WebSocketSession session) throws Exception;
    void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception;
    void handleTransportError(WebSocketSession session, Throwable exception) throws Exception; 
    void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception; 
    boolean supportsPartialMessages();
}

方法二:扩展 AbstractWebSocketHandler

@Service
public class ChatHandler extends AbstractWebSocketHandler {
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        session.sendMessage(new TextMessage("hello world."));
    }
}

  为了在Spring使用较低层级的API来处理消息,必须编写一个实现WebSocketHandler的类.WebSocketHandler需要我们实现五个方法。相比直接实现WebSocketHandler,更为简单的方法是扩展AbstractWebSocketHandler,这是WebSocketHandler的一个抽象实现。

  除了重载WebSocketHandler中所定义的五个方法以外,我们还可以重载AbstractWebSocketHandler中所定义的三个方法:

    • handleBinaryMessage()
    • handlePongMessage()
    • handleTextMessage() 
      这三个方法只是handleMessage()方法的具体化,每个方法对应于某一种特定类型的消息。

方案三、扩展TextWebSocketHandler或BinaryWebSocketHandler。

  TextWebSocketHandler是AbstractWebSocketHandler的子类,它会拒绝处理二进制消息。它重载了handleBinaryMessage()方法,如果收到二进制消息的时候,将会关闭WebSocket连接。与之类似,BinaryWebSocketHandler也是AbstractWeb-SocketHandler的子类,它重载了handleTextMessage()方法,如果接收到文本消息的话,将会关闭连接。

2、增加websocket拦截器,管理用户

@Component
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
    @Override
    public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
        if (request instanceof ServletServerHttpRequest) {
            attributes.put("username","lhx");
        }
        return true;
    }

    @Override
    public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
    }
}
  • beforeHandshake()方法,在调用 handler 前调用。常用来注册用户信息,绑定 WebSocketSession,在 handler 里根据用户信息获取WebSocketSession发送消息

3、WebSocketConfig配置

方式一、注解配置

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    @Autowired
    private ChatHandler chatHandler;
    @Autowired
    private WebSocketHandshakeInterceptor webSocketHandshakeInterceptor;
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(chatHandler,"/chat")
                .addInterceptors(webSocketHandshakeInterceptor);
    }
    @Bean
    public ServletServerContainerFactoryBean createWebSocketContainer() {
        ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
        container.setMaxTextMessageBufferSize(8192*4);
        container.setMaxBinaryMessageBufferSize(8192*4);
        return container;
    }
}
  • 实现 WebSocketConfigurer 接口,重写 registerWebSocketHandlers 方法,这是一个核心实现方法,配置 websocket 入口,允许访问的域、注册 Handler、SockJs 支持和拦截器。
  • registry.addHandler()注册和路由的功能,当客户端发起 websocket 连接,把 /path 交给对应的 handler 处理,而不实现具体的业务逻辑,可以理解为收集和任务分发中心。
  • addInterceptors,顾名思义就是为 handler 添加拦截器,可以在调用 handler 前后加入我们自己的逻辑代码。
  • ServletServerContainerFactoryBean可以添加对WebSocket的一些配置

方式二、xml配置

4、客户端配置

function contect() {
    var  wsServer = ''ws://''+window.location.host+''/chat'';
    var  websocket = new WebSocket(wsServer);
    websocket.onopen = function (evt) { onOpen(evt) };
    websocket.onclose = function (evt) { onClose(evt) };
    websocket.onmessage = function (evt) { onMessage(evt) };
    websocket.onerror = function (evt) { onError(evt) };
    function onOpen(evt) {
        console.log("Connected to WebSocket server.");
        websocket.send("test");//客户端向服务器发送消息
    }
    function onClose(evt) {
        console.log("Disconnected");
    }
    function onMessage(evt) {
        console.log(''Retrieved data from server: '' + evt.data);
    }
    function onError(evt) {
        console.log(''Error occured: '' + evt.data);
    }
}

contect();

8.3.2、SockJs针对WebSocket支持稍差的场景

  为了应对许多浏览器不支持WebSocket协议的问题,设计了备选SockJs

  SockJS 是 WebSocket 技术的一种模拟。SockJS 会 尽可能对应 WebSocket API,但如果 WebSocket 技术不可用的话,就会选择另外的通信方式协议。

1、服务端只需增加:.withSockJS()即可

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
    @Autowired
    private ChatHandler chatHandler;
    @Autowired
    private WebSocketHandshakeInterceptor webSocketHandshakeInterceptor;
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // withSockJS() 方法声明我们想要使用 SockJS 功能,如果WebSocket不可用的话,会使用 SockJS;
        registry.addHandler(chatHandler,"/chat")
                .addInterceptors(webSocketHandshakeInterceptor).withSockJS();
    }
}

或者xml配置

<websocket:sockjs />

2、客户端

只需对请求

// var  server = ''ws://''+window.location.host+''/chat'';
    var  server = ''http://''+window.location.host+''/chatsockjs'';
    var  websocket = new SockJS(server);
  • SockJS 所处理的 URL 是 “http://“ 或 “https://“ 模式,而不是 “ws://“ or “wss://“;
  • 其他的函数如 onopen, onmessage, and onclose ,SockJS 客户端与 WebSocket 一样,

8.3.3、Stomp方式

  STOMP帧由命令,一个或多个头信息以及负载所组成。

  直接使用WebSocket(或SockJS)就很类似于使用TCP套接字来编写Web应用。因为没有高层级的线路协议(wire protocol),因此就需要我们定义应用之间所发送消息的语义,还需要确保连接的两端都能遵循这些语义。 
  不过,好消息是我们并非必须要使用原生的WebSocket连接。就像HTTP在TCP套接字之上添加了请求-响应模型层一样,STOMP在WebSocket之上提供了一个基于帧的线路格式(frame-based wire format)层,用来定义消息的语义。

  乍看上去,STOMP的消息格式非常类似于HTTP请求的结构。与HTTP请求和响应类似,STOMP帧由命令、一个或多个头信息以及负载所组成。例如,如下就是发送数据的一个STOMP帧:

SEND
destination:/app/room-message
content-length:20

{\"message\":\"Hello!\"}

对以上代码分析:

  1. SEND:STOMP命令,表明会发送一些内容;
  2. destination:头信息,用来表示消息发送到哪里;
  3. content-length:头信息,用来表示 负载内容的 大小;
  4. 空行;
  5. 帧内容(负载)内容

8.3.3.1、基本用法

1、服务端Configuration配置

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration  implements WebSocketMessageBrokerConfigurer {
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //添加一个/socket-server-point 连接端点,客户端就可以通过这个端点来进行连接;withSockJS作用是添加SockJS支持
        registry.addEndpoint("/socket-server-point").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        //定义了一个客户端订阅地址的前缀信息,也就是客户端接收服务端发送消息的前缀信息
        registry.enableSimpleBroker("/topic");
        //定义了服务端接收地址的前缀,也即客户端给服务端发消息的地址前缀
        registry.setApplicationDestinationPrefixes("/ws");
    }
}

对以上代码分析:

  1. EnableWebSocketMessageBroker 注解表明: 这个配置类不仅配置了 WebSocket,还配置了基于代理的 STOMP 消息;
  2. 它复写了 registerStompEndpoints() 方法:添加一个服务端点,来接收客户端的连接。将 “/socket-server-point” 路径注册为 STOMP 端点。这个路径与之前发送和接收消息的目的路径有所不同, 这是一个端点,客户端在订阅或发布消息到目的地址前,要连接该端点,即用户发送请求 :url=’/127.0.0.1:8080/socket-server-point’ 与 STOMP server 进行连接,之后再转发到订阅url;
  3. 它复写了 configureMessageBroker() 方法:配置了一个 简单的消息代理,通俗一点讲就是设置消息连接请求的各种规范信息。
  4. 发送应用程序的消息将会带有 “/ws” 前缀。
2、控制器以及逻辑开发
Service服务处理
@Service
public class HandlerService {
    private static final Logger logger = LoggerFactory.getLogger(HandlerService.class);
    @Async
    public CompletableFuture<String> handle(String key) throws Exception {
        Thread.sleep(new Random().nextInt(3000));
        logger.info("Looking up " + key);
        key=key+":"+LocalDateTime.now();
        return CompletableFuture.completedFuture(key);
    }
}

Controller控制开发

@MessageMapping("/searchBase")
    public ResponseEntity searchBase() throws Exception {
        Consumer<List<String>> callback = p -> websocket.convertAndSend("/topic/searchResults", p);
        List<String> list = Arrays.asList("bba", "aaa", "ccc");
        localSearch(list, callback);
        Map map = new HashMap();
        map.put("list", list);
        map.put("date", LocalDateTime.now());
        return ResponseEntity.ok(map);
    }

    public void localSearch(List<String> keys, Consumer<List<String>> callback) throws Exception {
        Thread.sleep(2000);
        List<String> list = new ArrayList<>();
        for (String key : keys) {
            CompletableFuture<String> completableFuture = handlerService.handle(key);
            completableFuture.thenAcceptAsync(p -> {
                list.clear();
                list.add(p);
                callback.accept(list);
            });
        }
    }

8.3.3.2、消息流

  

8.3.3.3、启用STOMP代理中继

  对于生产环境下的应用来说,你可能会希望使用真正支持STOMP的代理来支撑WebSocket消息,如RabbitMQ或ActiveMQ。这样的代理提供了可扩展性和健壮性更好的消息功能,当然它们也会完整支持STOMP命令。我们需要根据相关的文档来为STOMP搭建代理。搭建就绪之后,就可以使用STOMP代理来替换内存代理了,只需按照如下方式重载configureMessageBroker()方法即可:

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/queue", "/topic");
        //定义了一个客户端订阅地址的前缀信息,也就是客户端接收服务端发送消息的前缀信息
//        registry.enableSimpleBroker("/topic");
        //定义了服务端接收地址的前缀,也即客户端给服务端发消息的地址前缀
        registry.setApplicationDestinationPrefixes("/ws");
    }
  • 上述configureMessageBroker()方法的第一行代码启用了STOMP代理中继(broker relay)功能,并将其目的地前缀设置为“/topic”和“/queue”。这样的话,Spring就能知道所有目的地前缀为“/topic”或“/queue”的消息都会发送到STOMP代理中。

  • 在第二行的configureMessageBroker()方法中将应用的前缀设置为“/ws”。所有目的地以“/ws”打头的消息都将会路由到带有@MessageMapping注解的方法中,而不会发布到代理队列或主题中。

默认情况下,STOMP代理中继会假设代理监听localhost的61613端口,并且客户端的username和password均为“guest”。如果你的STOMP代理位于其他的服务器上,或者配置成了不同的客户端凭证,那么我们可以在启用STOMP代理中继的时候,需要配置这些细节信息:

  @Override
  public void configureMessageBroker(MessageBrokerRegistry registry) {
    registry.enableStompBrokerRelay("/queue", "/topic")
            .setRelayHost("rabbit.someotherserver")
            .setRelayPort(62623)
            .setClientLogin("marcopolo")
            .setClientPasscode("letmein01")
    registry.setApplicationDestinationPrefixes("/app");
  }

8.3.3.4、处理来自客户端的STOMP消息

1、应用消息MessageMapping

Spring 4.0引入了@MessageMapping注解,它用于STOMP消息的处理,类似于Spring MVC的@RequestMapping注解。当消息抵达某个特定的目的地时,带有@MessageMapping注解的方法能够处理这些消息。

/**
     * 处理来自客户端的STOMP消息
     * @param incoming
     * @return
     */
    @MessageMapping("/incoming")
    public Shout handleShout(Shout incoming) {
        logger.info("Received message: " + incoming.getMessage());
        try { Thread.sleep(2000); } catch (InterruptedException e) {}
        Shout outgoing = new Shout();
        outgoing.setMessage("incoming!");
        return outgoing;
    }

消息接受类

public class Shout {
    private String message;

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }
}

客户端

function contect() {
    var socket=new SockJS(''/socket-server-point'');
    stompCliet=Stomp.over(socket);
    stompCliet.connect({},function (frame) {
        console.log(''Connected :''+frame);
        stompCliet.send("/ws/incoming",{},"{\"message\":\"Hello!\"}");
    });
}
contect();

2、订阅模式SubscribeMapping

@SubscribeMapping的主要应用场景是实现请求-回应模式。在请求-回应模式中,客户端订阅某一个目的地,然后预期在这个目的地上获得一个一次性的响应。 
例如,考虑如下@SubscribeMapping注解标注的方法:

@SubscribeMapping("/sub")
    public Shout handleSubscription(){
        logger.info("Received message: " +"subscription");
        Shout outgoing = new Shout();
        outgoing.setMessage("subscription!");
        return outgoing;
    }

当处理这个订阅时,handleSubscription()方法会产生一个输出的Shout对象并将其返回。然后,Shout对象会转换成一条消息,并且会按照客户端订阅时相同的目的地发送回客户端。

客户端

function contect() {
    var socket=new SockJS(''/socket-server-point'');
    stompCliet=Stomp.over(socket);
    stompCliet.connect({},function (frame) {
        console.log(''Connected :''+frame);
        stompCliet.subscribe(''/ws/sub'',function (result) {
            console.log("aaaa",JSON.parse(result.body));
        });
        stompCliet.send("/ws/sub",{},"{\"message\":\"Hello!\"}");
    });
}
contect();

这种请求-回应模式与HTTP GET的请求-响应模式并没有太大差别。但是,这里的关键区别在于HTTP GET请求是同步的,而订阅的请求-回应模式则是异步的,这样客户端能够在回应可用时再去处理,而不必等待。

8.3.3.5、发送消息到客户端

 Spring提供了两种发送数据给客户端的方法:

  • 作为处理消息或处理订阅的附带结果;
  • 使用消息模板。

方式一、作为处理消息或处理订阅的附带结果、在处理消息之后,发送消息

@MessageMapping("/incoming")
    public Shout handleShout(Shout incoming) {
        logger.info("Received message: " + incoming.getMessage());
        try { Thread.sleep(2000); } catch (InterruptedException e) {}
        Shout outgoing = new Shout();
        outgoing.setMessage("incoming!");
        return outgoing;
    }

当@MessageMapping注解标示的方法有返回值的时候,返回的对象将会进行转换(通过消息转换器)并放到STOMP帧的负载中,然后发送给消息代理。

默认情况下,帧所发往的目的地会与触发处理器方法的目的地相同,只不过会添加上“/topic”前缀。就本例而言,这意味着handleShout()方法所返回的Shout对象会写入到STOMP帧的负载中,并发布到“/topic/incoming”目的地。不过,我们可以通过为方法添加@SendTo注解,重载目的地:

@MessageMapping("/incoming")
    @SendTo("/topic/shout")
    public Shout handleShout(Shout incoming) {
        logger.info("Received message: " + incoming.getMessage());
        try { Thread.sleep(2000); } catch (InterruptedException e) {}
        Shout outgoing = new Shout();
        outgoing.setMessage("incoming!");
        return outgoing;
    }

按照这个@SendTo注解,消息将会发布到“/topic/shout”。所有订阅这个主题的应用(如客户端)都会收到这条消息。 

按照类似的方式,@SubscribeMapping注解标注的方式也能发送一条消息,作为订阅的回应。

@SubscribeMapping("/sub")
    public Shout handleSubscription(){
        logger.info("Received message: " +"subscription");
        Shout outgoing = new Shout();
        outgoing.setMessage("subscription!");
        return outgoing;
    }

@SubscribeMapping的区别在于这里的Shout消息将会直接发送给客户端,而不必经过消息代理。如果你为方法添加@SendTo注解的话,那么消息将会发送到指定的目的地,这样会经过代理。

对应客户端需要增加订阅即可

stompCliet.subscribe(''/ws/sub'',function (result) {
            console.log("aaaa",JSON.parse(result.body));
        });

正如前面看到的那样,使用 @MessageMapping 或者 @SubscribeMapping 注解可以处理客户端发送过来的消息,并选择方法是否有返回值。

    如果 @MessageMapping 注解的控制器方法有返回值的话,返回值会被发送到消息代理,只不过会添加上"/topic"前缀。可以使用@SendTo 重写消息目的地;

    如果 @SubscribeMapping 注解的控制器方法有返回值的话,返回值会直接发送到客户端,不经过代理。如果加上@SendTo 注解的话,则要经过消息代理。

方式二、使用消息模板【在任意地方发送消息】

  @MessageMapping和@SubscribeMapping提供了一种很简单的方式来发送消息,这是接收消息或处理订阅的附带结果。不过,Spring的SimpMessagingTemplate能够在应用的任何地方发送消息,甚至不必以首先接收一条消息作为前提。

  我们不必要求用户刷新页面,而是让首页订阅一个STOMP主题.

function contect() {
    var socket=new SockJS(''/socket-server-point'');
    stompCliet=Stomp.over(socket);
    stompCliet.connect({},function (frame) {
        console.log(''Connected :''+frame);
        stompCliet.subscribe(''/topic/sendDataToClient'',function (result) {
            console.log("aaaa",JSON.parse(result.body));
        });
        // stompCliet.send("/ws/sub",{},"{\"message\":\"Hello!\"}");
    });
}
contect();

使用SimpMessagingTemplate能够在应用的任何地方发布消息

@Autowired(required = false)
    private SimpMessagingTemplate websocket;

    @GetMapping("/sendDataToClient")
    public ResponseEntity sendDataToClient() throws Exception {
        Map map=new HashMap();
        map.put("aa","aaa");
        map.put("bb","bbb");
        websocket.convertAndSend("/topic/sendDataToClient",map);
        return ResponseEntity.ok("ok");
    }

当然此处的SimpMessagingTemplate也可以使用父接口SimpMessageSendingOperations注入

在这个场景下,我们希望所有的客户端都能及时看到实时的/topic/sendDataToClient,这种做法是很好的。但有的时候,我们希望发送消息给指定的用户,而不是所有的客户端。 

8.3.3.6、为目标用户发送消息

  以上说明了如何广播消息,订阅目的地的所有用户都能收到消息。如果消息只想发送给特定的用户呢?spring-websocket 介绍了以下

在使用Spring和STOMP消息功能的时候,我们有两种方式利用认证用户:

  1、@MessageMapping和@SubscribeMapping标注的方法基于@SendToUser注解和Principal参数来获取认证用户;@MessageMapping、@SubscribeMapping和@MessageException方法返回的值能够以消息的形式发送给认证用户;

  2、SimpMessageSendingOperations接口或SimpMessagingTemplate的convertAndSendToUser方法能够发送消息给特定用户。

1、在控制器中处理用户的消息

  在控制器的@MessageMapping或@SubscribeMapping方法中,处理消息时有两种方式了解用户信息。在处理器方法中,通过简单地添加一个Principal参数,这个方法就能知道用户是谁并利用该信息关注此用户相关的数据。除此之外,处理器方法还可以使用@SendToUser注解,表明它的返回值要以消息的形式发送给某个认证用户的客户端(只发送给该客户端)。

  @SendToUser 表示要将消息发送给指定的用户,会自动在消息目的地前补上"/user"前缀。如下,最后消息会被发布在  /user/queue/notifications-username。但是问题来了,这个username是怎么来的呢?就是通过 principal 参数来获得的。那么,principal 参数又是怎么来的呢?需要在spring-websocket 的配置类中重写 configureClientInboundChannel 方法,添加上用户的认证。

服务端增加configuration

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
    /**
     * 1、设置拦截器
     * 2、首次连接的时候,获取其Header信息,利用Header里面的信息进行权限认证
     * 3、通过认证的用户,使用 accessor.setUser(user); 方法,将登陆信息绑定在该 StompHeaderAccessor 上,在Controller方法上可以获取 StompHeaderAccessor 的相关信息
     * @param registration
     */
    @Override
    public void configureClientInboundChannel(ChannelRegistration registration) {
        registration.interceptors(new ChannelInterceptorAdapter() {
            @Override
            public Message<?> preSend(Message<?> message, MessageChannel channel) {
                StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
                //1、判断是否首次连接
                if (StompCommand.CONNECT.equals(accessor.getCommand())){
                    //2、判断用户名和密码
                    String username = accessor.getNativeHeader("username").get(0);
                    String password = accessor.getNativeHeader("password").get(0);

                    if ("admin".equals(username) && "admin".equals(password)){
                        Principal principal = new Principal() {
                            @Override
                            public String getName() {
                                return username;
                            }
                        };
                        accessor.setUser(principal);
                        return message;
                    }else {
                        return null;
                    }
                }
                //不是首次连接,已经登陆成功
                return message;
            }

        });
    }
}
View Code

服务端处理逻辑

@MessageMapping("/shout")
    @SendToUser("/queue/notifications")
    public Shout userStomp(Principal principal, Shout shout) {
        String name = principal.getName();
        String message = shout.getMessage();
        logger.info("认证的名字是:{},收到的消息是:{}", name, message);
        return shout;
    }

前端js代码

var headers={
    username:''admin'',
    password:''admin''
};
function contect() {
    var socket=new SockJS(''/socket-server-point'');
    stompCliet=Stomp.over(socket);
    stompCliet.connect(headers,function (frame) {
        console.log(''Connected :''+frame);
        stompCliet.subscribe(''/user/queue/notifications'',function (result) {
            console.log("aaaa",JSON.parse(result.body));
        });
        stompCliet.send("/ws/shout",{},"{\"message\":\"Hello!\"}");
    });
}
contect();

2、convertAndSendToUser方法

   除了convertAndSend()以外,SimpMessageSendingOperations 还提供了convertAndSendToUser()方法。按照名字就可以判断出来,convertAndSendToUser()方法能够让我们给特定用户发送消息。

@MessageMapping("/singleShout")
    public void singleUser(Shout shout, StompHeaderAccessor stompHeaderAccessor) {
        String message = shout.getMessage();
        LOGGER.info("接收到消息:" + message);
        Principal user = stompHeaderAccessor.getUser();
        simpMessageSendingOperations.convertAndSendToUser(user.getName(), "/queue/shouts", shout);
    }

  如上,这里虽然我还是用了认证的信息得到用户名。但是,其实大可不必这样,因为 convertAndSendToUser 方法可以指定要发送给哪个用户。也就是说,完全可以把用户名的当作一个参数传递给控制器方法,从而绕过身份认证!convertAndSendToUser 方法最终会把消息发送到 /user/sername/queue/shouts 目的地上。

8.3.3.7、处理消息异常

  在处理消息的时候,有可能会出错并抛出异常。因为STOMP消息异步的特点,发送者可能永远也不会知道出现了错误。@MessageExceptionHandler标注的方法能够处理消息方法中所抛出的异常。我们可以把错误发送给用户特定的目的地上,然后用户从该目的地上订阅消息,从而用户就能知道自己出现了什么错误

@MessageExceptionHandler(Exception.class)
     @SendToUser("/queue/errors")
     public Exception handleExceptions(Exception t){
         t.printStackTrace();
         return t;
     }

8.3.3.8、更多stomp配置

1、发起连接

其中headers表示客户端的认证信息:

若无需认证,直接使用空对象 “{}” 即可;

 (1)connectCallback 表示连接成功时(服务器响应 CONNECTED 帧)的回调方法; 

 (2)errorCallback 表示连接失败时(服务器响应 ERROR 帧)的回调方法,非必须;

默认链接端点

//默认的和STOMP端点连接
/*stomp.connect("guest", "guest", function (franme) {
});*/

有用户认证的

var headers={
    username:''admin'',
    password:''admin''
};

stomp.connect(headers, function (frame) {

示例

// 建立连接对象(还未发起连接)
 var socket=new SockJS("/endpointChat"); 
 // 获取 STOMP 子协议的客户端对象 
 var stompClient = Stomp.over(socket); 
 // 向服务器发起websocket连接并发送CONNECT帧 
 stompClient.connect( {}, 
function connectCallback (frame) { 
     // 连接成功时(服务器响应 CONNECTED 帧)的回调方法 
console.log(''已连接【'' + frame + ''】''); 
//订阅一个消息
     stompClient.subscribe(''/topic/getResponse'',
function (response) { 
showResponse(response.body);
});
 },
     function errorCallBack (error) { 
     // 连接失败时(服务器响应 ERROR 帧)的回调方法 
console.log(''连接失败【'' + error + ''】''); 
} );

2、断开连接

若要从客户端主动断开连接,可调用 disconnect() 方法:

client.disconnect(
function () { 
    alert("断开连接");
});

3、发送消息

连接成功后,客户端可使用 send() 方法向服务器发送信息:

client.send(destination url, headers, body);

其中: 

(1)destination url 为服务器 controller中 @MessageMapping 中匹配的URL,字符串,必须参数; 

(2)headers 为发送信息的header,JavaScript 对象,可选参数; 

(3)body 为发送信息的 body,字符串,可选参数;
示例

client.send("/queue/test", {priority: 9}, "Hello, STOMP");
client.send("/queue/test", {}, "Hello, STOMP");

4、订阅、接收消息

STOMP 客户端要想接收来自服务器推送的消息,必须先订阅相应的URL,即发送一个 SUBSCRIBE 帧,然后才能不断接收来自服务器的推送消息。

订阅和接收消息通过 subscribe() 方法实现:

subscribe(destination url, callback, headers)

其中 

(1)destination url 为服务器 @SendTo 匹配的 URL,字符串; 

(2)callback 为每次收到服务器推送的消息时的回调方法,该方法包含参数 message; 

(3)headers 为附加的headers,JavaScript 对象;该方法返回一个包含了id属性的 JavaScript 对象,可作为 unsubscribe() 方法的参数;默认情况下,如果没有在headers额外添加,这个库会默认构建一个独一无二的ID。在传递headers这个参数时,可以使用你自己id。
示例

var headers = {
ack: ''client'',
//这个客户端指定了它会确认接收的信息,只接收符合这个selector : location = ''Europe''的消息。
 ''selector'': "location = ''Europe''",
//id:’myid’
}; 
var callback = function(message) {
if (message.body) {
 alert("got message with body " +JSON.parse( message.body)) }
 else{
alert("got empty message"); 
} }); 
var subscription = client.subscribe("/queue/test", callback, headers);
 
如果想让客户端订阅多个目的地,你可以在接收所有信息的时候调用相同的回调函数:
onmessage = function(message) {
    // called every time the client receives a message
}
var sub1 = client.subscribe("queue/test", onmessage);
var sub2 = client.subscribe("queue/another", onmessage)

5、取消订阅

var subscription = client.subscribe(...);
 
subscription.unsubscribe();

6、事务支持

可以在将消息的发送和确认接收放在一个事务中。

客户端调用自身的begin()方法就可以开始启动事务了,begin()有一个可选的参数transaction,一个唯一的可标识事务的字符串。如果没有传递这个参数,那么库会自动构建一个。

这个方法会返回一个object。这个对象有一个id属性对应这个事务的ID,还有两个方法:

commit()提交事务
 
abort()中止事务

在一个事务中,客户端可以在发送/接受消息时指定transaction id来设置transaction。

// start the transaction
 
var tx = client.begin();
 
// send the message in a transaction
 
client.send("/queue/test", {transaction: tx.id}, "message in a transaction");
 
// commit the transaction to effectively send the message
 
tx.commit();

如果你在调用send()方法发送消息的时候忘记添加transction header,那么这不会称为事务的一部分,这个消息会直接发送,不会等到事务完成后才发送。

var txid = "unique_transaction_identifier";
 
// start the transaction
 
var tx = client.begin();
 
// oops! send the message outside the transaction
 
client.send("/queue/test", {}, "I thought I was in a transaction!");
 
tx.abort(); // Too late! the message has been sent

7、消息确认

默认情况,在消息发送给客户端之前,服务端会自动确认(acknowledged)。

客户端可以选择通过订阅一个目的地时设置一个ack header为client或client-individual来处理消息确认。

在下面这个例子,客户端必须调用message.ack()来通知客户端它已经接收了消息。

var subscription = client.subscribe("/queue/test",
    function(message) {
        // do something with the message
        ...
        // and acknowledge it
        message.ack();
    },
    {ack: ''client''}
);

ack()接受headers参数用来附加确认消息。例如,将消息作为事务(transaction)的一部分,当要求接收消息时其实代理(broker)已经将ACK STOMP frame处理了。

var tx = client.begin();
message.ack({ transaction: tx.id, receipt: ''my-receipt'' });
tx.commit();

ack()也可以用来通知STOMP 1.1.brokers(代理):客户端不能消费这个消息。与ack()方法的参数相同。

8、debug调试

有一些测试代码能有助于你知道库发送或接收的是什么,从而来调试程序。

客户端可以将其debug属性设置为一个函数,传递一个字符串参数去观察库所有的debug语句。

client.debug = function(str) {
 
    // append the debug log to a #debug div somewhere in the page using JQuery:
 
    $("#debug").append(str + "\n");
};

默认情况,debug消息会被记录在在浏览器的控制台。

9、心跳机制

如果STOMP broker(代理)接收STOMP 1.1版本的帧,heart-beating是默认启用的。heart-beating也就是频率,incoming是接收频率,outgoing是发送频率。

通过改变incoming和outgoing可以更改客户端的heart-beating(默认为10000ms):

client.heartbeat.outgoing = 20000;
 
// client will send heartbeats every 20000ms
 
client.heartbeat.incoming = 0;
 
// client does not want to receive heartbeats
 
// from the server

heart-beating是利用window.setInterval()去规律地发送heart-beats或者检查服务端的heart-beats。

 

更多

stomp.connect(headers, function (frame) {

    //发送消息
    //第二个参数是一个头信息的Map,它会包含在STOMP的帧中
    //事务支持
    var tx = stomp.begin();
    stomp.send("/app/marco", {transaction: tx.id}, strJson);
    tx.commit();


    //订阅服务端消息 subscribe(destination url, callback[, headers])
    stomp.subscribe("/topic/marco", function (message) {
        var content = message.body;
        var obj = JSON.parse(content);
        console.log("订阅的服务端消息:" + obj.message);
    }, {});


    stomp.subscribe("/app/getShout", function (message) {
        var content = message.body;
        var obj = JSON.parse(content);
        console.log("订阅的服务端直接返回的消息:" + obj.message);
    }, {});


    /*以下是针对特定用户的订阅*/
    var adminJSON = JSON.stringify({''message'': ''ADMIN''});
    /*第一种*/
    stomp.send("/app/singleShout", {}, adminJSON);
    stomp.subscribe("/user/queue/shouts",function (message) {
        var content = message.body;
        var obj = JSON.parse(content);
        console.log("admin用户特定的消息1:" + obj.message);
    });
    /*第二种*/
    stomp.send("/app/shout", {}, adminJSON);
    stomp.subscribe("/user/queue/notifications",function (message) {
        var content = message.body;
        var obj = JSON.parse(content);
        console.log("admin用户特定的消息2:" + obj.message);
    });

    /*订阅异常消息*/
    stomp.subscribe("/user/queue/errors", function (message) {
        console.log(message.body);
    });

    //若使用STOMP 1.1 版本,默认开启了心跳检测机制(默认值都是10000ms)
    stomp.heartbeat.outgoing = 20000;

    stomp.heartbeat.incoming = 0; //客户端不从服务端接收心跳包
});
View Code

 

参看代码:https://github.com/JMCuixy/SpringWebSocket 

 

CompletableFuture allof(..)join()与CompletableFuture.join()

CompletableFuture allof(..)join()与CompletableFuture.join()

我目前正在使用CompletableFuture supplyAsync()方法向公共线程池提交一些任务。以下是代码片段的样子:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.getNow())
        .forEach(tests::addAll);

我想知道下面与上面的代码有何不同。我从下面的代码中删除了父completableFuture,并为每个completableFuture添加了join()而不是getNow():

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.join())
        .forEach(tests::addAll);

我在spring服务中使用了它,线程池耗尽有问题。任何指针深表赞赏。

CompletableFuture allof(..)。join()与CompletableFuture.join()

CompletableFuture allof(..)。join()与CompletableFuture.join()

我目前正在使用CompletableFuture supplyAsync()方法向公共线程池提交一些任务。以下是代码片段的样子:

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.getNow())
        .forEach(tests::addAll);

我想知道下面与上面的代码有何不同。我从下面的代码中删除了父completableFuture,并为每个completableFuture添加了join()而不是getNow():

final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
        .map(resolver -> supplyAsync(() -> task.doWork()))
        .collect(toList());

final List<Test> tests = new ArrayList<>();
completableFutures.stream()
        .map(completableFuture -> completableFuture.join())
        .forEach(tests::addAll);

我在spring服务中使用了它,线程池耗尽有问题。任何指针深表赞赏。

关于如果使用thenApply,则不会调用CompletableFuture#whenComplete如果要调用a.py内所有的模块应使用语句的问题我们已经讲解完毕,感谢您的阅读,如果还想了解更多关于005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture、006-优化web请求二-应用缓存、异步调用【Future、ListenableFuture、CompletableFuture】、ETag、WebSocket【SockJS、Stomp】、CompletableFuture allof(..)join()与CompletableFuture.join()、CompletableFuture allof(..)。join()与CompletableFuture.join()等相关内容,可以在本站寻找。

本文标签: