在这篇文章中,我们将带领您了解如果使用thenApply,则不会调用CompletableFuture#whenComplete的全貌,包括如果要调用a.py内所有的模块应使用语句的相关情况。同时,我
在这篇文章中,我们将带领您了解如果使用thenApply,则不会调用CompletableFuture#whenComplete的全貌,包括如果要调用a.py内所有的模块应使用语句的相关情况。同时,我们还将为您介绍有关005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture、006-优化web请求二-应用缓存、异步调用【Future、ListenableFuture、CompletableFuture】、ETag、WebSocket【SockJS、Stomp】、CompletableFuture allof(..)join()与CompletableFuture.join()、CompletableFuture allof(..)。join()与CompletableFuture.join()的知识,以帮助您更好地理解这个主题。
本文目录一览:- 如果使用thenApply,则不会调用CompletableFuture#whenComplete(如果要调用a.py内所有的模块应使用语句)
- 005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture
- 006-优化web请求二-应用缓存、异步调用【Future、ListenableFuture、CompletableFuture】、ETag、WebSocket【SockJS、Stomp】
- CompletableFuture allof(..)join()与CompletableFuture.join()
- CompletableFuture allof(..)。join()与CompletableFuture.join()
如果使用thenApply,则不会调用CompletableFuture#whenComplete(如果要调用a.py内所有的模块应使用语句)
我有以下代码,该代码在远程服务器上安排任务,然后使用轮询完成ScheduledExecutorService#scheduleAtFixedRate
。任务完成后,它将下载结果。我想将a返回Future
给调用方,以便他们可以确定阻止时间和阻止时间,并为他们提供取消任务的选项。
我的问题是,如果客户端取消Future
该download
方法返回的值,则whenComplete
块不会执行。如果我删除thenApply
它。很明显,我对Future
构图有些误解…我应该改变什么?
public Future<Object> download(Something something) { String jobId = schedule(something); CompletableFuture<String> job = pollForCompletion(jobId); return job.thenApply(this::downloadResult);}private CompletableFuture<String> pollForCompletion(String jobId) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); CompletableFuture<String> completionFuture = new CompletableFuture<>(); ScheduledFuture<?> checkFuture = executor.scheduleAtFixedRate(() -> { if (pollRemoteServer(jobId).equals("COMPLETE")) { completionFuture.complete(jobId); } }, 0, 10, TimeUnit.SECONDS); completionFuture .whenComplete((result, thrown) -> { System.out.println("XXXXXXXXXXX"); //Never happens unless thenApply is removed checkFuture.cancel(true); executor.shutdown(); }); return completionFuture;}
同样,如果我这样做:
return completionFuture.whenComplete(...)
代替
completionFuture.whenComplete(...);return completionFuture;
whenComplete
也永远不会执行。这对我来说似乎很违反直觉。从逻辑上说,Future
返回应该不是whenComplete
我应该坚持的那个吗?
编辑:
我更改了代码以明确向后传播取消。这是令人讨厌和难以理解的,但是它可以正常工作,我找不到更好的方法:
public Future<Object> download(Something something) throws ChartDataGenException, Exception { String jobId = schedule(something); CompletableFuture<String> job = pollForCompletion(jobId); CompletableFuture<Object> resulting = job.thenApply(this::download); resulting.whenComplete((result, thrown) -> { if (resulting.isCancelled()) { //the check is not necessary, but communicates the intent better job.cancel(true); } }); return resulting;}
编辑2:
我发现了tascalate-concurrent,这是一个出色的库,提供sane的明智实现CompletionStage
,并支持可以通过DependentPromise
透明方式反向传播取消的相关承诺(通过类)。似乎非常适合此用例。
这应该足够了:
DependentPromise .from(pollForCompletion(jobId)) .thenApply(this::download, true); //true means the cancellation should back-propagate
请注意,没有测试这种方法。
答案1
小编典典您的结构如下:
┌──────────────────┐ │ completionFuture | └──────────────────┘ ↓ ↓ ┌──────────────┐ ┌───────────┐ │ whenComplete | │ thenApply | └──────────────┘ └───────────┘
因此,当您取消thenApply
未来时,原始completionFuture
对象不会受到影响,因为它与thenApply
舞台无关。但是,如果您不链接该thenApply
阶段,那么您将返回原始completionFuture
实例,并且取消该阶段会导致所有相关阶段的取消,从而使whenComplete
操作立即执行。
但是,如果thenApply
取消该阶段,则满足条件的completionFuture
静止图像可能会完成pollRemoteServer(jobId).equals("COMPLETE")
,因为轮询不会停止。但是,我们不知道的关系`jobId
schedule(something)和
pollRemoteServer(jobId)`。如果您的应用程序状态以某种方式更改,使得取消下载后再也无法满足此条件,那么这种未来将永远无法完成…
关于您的最后一个问题,哪个期货是“我应该坚持的那个期货?”,实际上,并不需要线性的期货链,而方便的方法CompletableFuture
可以轻松地创建这样的链,而不仅仅是通常,这是最没有用的事情,因为如果您具有线性依赖性,则只需编写一段代码即可。您将两个独立阶段链接在一起的模型是正确的,但是取消不能通过它起作用,但是它也不可以通过线性链来起作用。
如果希望能够取消源阶段,则需要对其进行引用,但是,如果想要获得从属阶段的结果,则也需要对该阶段进行引用。
005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture
一、概述
创建线程的两种方式,一种是直接继承 Thread,另外一种就是实现 Runnable 接口。这两种方式都有一个缺陷就是:在执行完任务之后无法获取执行结果。如果需要获取执行结果,就必须通过共享变量或者使用线程通信的方式来达到效果,这样使用起来就比较麻烦。而自从 Java 1.5 开始,就提供了 Callable 和 Future,通过它们可以在任务执行完毕之后得到任务执行结果。
详述:https://www.cnblogs.com/bjlhx/p/7588971.html
1.1、Runnable 接口
它是一个接口,里面只声明了一个 run () 方法:
public interface Runnable {
public abstract void run();
}
由于 run () 方法返回值为 void 类型,所以在执行完任务之后无法返回任何结果。
1.2、Callable 接口
Callable 接口位于 java.util.concurrent 包下,在它里面也只声明了一个方法,只不过这个方法叫做 call ()。
public interface Callable<V> {
V call() throws Exception;
}
是一个泛型接口,call () 函数返回的类型就是传递进来的 V 类型。Callable 接口可以看作是 Runnable 接口的补充,call 方法带有返回值,并且可以抛出异常。
1.3、Future 接口
Future 的核心思想是:
一个方法,计算过程可能非常耗时,等待方法返回,显然不明智。可以在调用方法的时候,立马返回一个 Future,可以通过 Future 这个数据结构去控制方法 f 的计算过程。
Future 类位于 java.util.concurrent 包下,它是一个接口:这里的控制包括:
get 方法:获取计算结果(如果还没计算完,也是必须等待的)这个方法会产生阻塞,会一直等到任务执行完毕才返回;
get (long timeout, TimeUnit unit) 用来获取执行结果,如果在指定时间内,还没获取到结果,就直接返回 null。
cancel 方法:还没计算完,可以取消计算过程,如果取消任务成功则返回 true,如果取消任务失败则返回 false。参数 mayInterruptIfRunning 表示是否允许取消正在执行却没有执行完毕的任务,如果设置 true,则表示可以取消正在执行过程中的任务。如果任务已经完成,则无论 mayInterruptIfRunning 为 true 还是 false,此方法肯定返回 false,即如果取消已经完成的任务会返回 false;如果任务正在执行,若 mayInterruptIfRunning 设置为 true,则返回 true,若 mayInterruptIfRunning 设置为 false,则返回 false;如果任务还没有执行,则无论 mayInterruptIfRunning 为 true 还是 false,肯定返回 true。
isDone 方法:判断是否计算完
isCancelled 方法:判断计算是否被取消,方法表示任务是否被取消成功,如果在任务正常完成前被取消成功,则返回 true。
也就是说 Future 提供了三种功能:
1)判断任务是否完成;
2)能够中断任务;
3)能够获取任务执行结果。
Future 就是对于具体的 Runnable 或者 Callable 任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过 get 方法获取执行结果,该方法会阻塞直到任务返回结果。
因为 Future 只是一个接口,所以是无法直接用来创建对象使用的,因此就有了下面的 FutureTask。
使用 Callable+Future 获取执行结果:


public class Test {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
Future<Integer> result = executor.submit(task);
executor.shutdown();
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("主线程在执行任务");
try {
System.out.println("task运行结果"+result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("所有任务执行完毕");
}
}
class Task implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("子线程在进行计算");
Thread.sleep(3000);
int sum = 0;
for(int i=0;i<100;i++)
sum += i;
return sum;
}
}
1.4、FutureTask 类
FutureTask 继承体系中的核心接口是 Future。事实上,FutureTask 是 Future 接口的一个唯一实现类。
如何获取 Callable 的返回结果:一般是通过 FutureTask 这个中间媒介来实现的。整体的流程是这样的:
把 Callable 实例当作参数,生成一个 FutureTask 的对象,然后把这个对象当作一个 Runnable,作为参数另起线程。
1.4.1、FutureTask 结构
1.4.2、FutureTask 使用
方式一、使用 thread 方式
FutureTask 实现了 Runnable,因此它既可以通过 Thread 包装来直接执行,也可以提交给 ExecuteService 来执行。以下使用 Thread 包装线程方式启动
public static void main(String[] args) throws Exception {
Callable<Integer> call = () -> {
System.out.println("计算线程正在计算结果...");
Thread.sleep(3000);
return 1;
};
FutureTask<Integer> task = new FutureTask<>(call);
Thread thread = new Thread(task);
thread.start();
System.out.println("main线程干点别的...");
Integer result = task.get();
System.out.println("从计算线程拿到的结果为:" + result);
}
方式二、使用 ExecutorService
ExecutorService executor = Executors.newFixedThreadPool (2); 线程池方式


public static void main(String[] args) {
Callable<String> callable1=()->{
Thread.sleep(2000);
return Thread.currentThread().getName();
};
Callable<String> callable2=()->{
Thread.sleep(3000);
return Thread.currentThread().getName();
};
FutureTask<String> futureTask1 = new FutureTask<>(callable1);// 将Callable写的任务封装到一个由执行者调度的FutureTask对象
FutureTask<String> futureTask2 = new FutureTask<>(callable2);
ExecutorService executor = Executors.newFixedThreadPool(2); // 创建线程池并返回ExecutorService实例
executor.execute(futureTask1); // 执行任务
executor.execute(futureTask2);
//同时开启了两个任务
long startTime = System.currentTimeMillis();
while (true) {
try {
if(futureTask1.isDone() && futureTask2.isDone()){// 两个任务都完成
System.out.println("Done");
executor.shutdown(); // 关闭线程池和服务
return;
}
if(!futureTask1.isDone()){ // 任务1没有完成,会等待,直到任务完成
System.out.println("FutureTask1 output="+futureTask1.get());
}
System.out.println("Waiting for FutureTask2 to complete");
String s = futureTask2.get(200L, TimeUnit.MILLISECONDS);
if(s !=null){
System.out.println("FutureTask2 output="+s);
}
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}catch(TimeoutException e){
//do nothing
}
System.out.println((System.currentTimeMillis()-startTime));
}
}
使用 Callable+FutureTask 获取执行结果


public class Test {
public static void main(String[] args) {
//第一种方式
ExecutorService executor = Executors.newCachedThreadPool();
Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
executor.submit(futureTask);
executor.shutdown();
//第二种方式,注意这种方式和第一种方式效果是类似的,只不过一个使用的是ExecutorService,一个使用的是Thread
/*Task task = new Task();
FutureTask<Integer> futureTask = new FutureTask<Integer>(task);
Thread thread = new Thread(futureTask);
thread.start();*/
try {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
System.out.println("主线程在执行任务");
try {
System.out.println("task运行结果"+futureTask.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println("所有任务执行完毕");
}
}
class Task implements Callable<Integer>{
@Override
public Integer call() throws Exception {
System.out.println("子线程在进行计算");
Thread.sleep(3000);
int sum = 0;
for(int i=0;i<100;i++)
sum += i;
return sum;
}
}
1.5、CompletionService
原理:内部通过阻塞队列 + FutureTask,实现了任务先完成可优先获取到,即结果按照完成先后顺序排序。


package com.lhx.cloud.futruetask;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
public class CompletionServiceDemo {
public static void main(String[] args) {
Long start = System.currentTimeMillis();
//开启5个线程
ExecutorService exs = Executors.newFixedThreadPool(5);
try {
int taskCount = 10;
//结果集
List<Integer> list = new ArrayList<>();
//1.定义CompletionService
CompletionService<Integer> completionService = new ExecutorCompletionService<>(exs);
List<Future<Integer>> futureList = new ArrayList<>();
//2.添加任务
for(int i=0;i<taskCount;i++){
futureList.add(completionService.submit(new Task(i+1)));
}
//==================结果归集===================
//方法1:future是提交时返回的,遍历queue则按照任务提交顺序,获取结果
// for (Future<Integer> future : futureList) {
// System.out.println("====================");
// Integer result = future.get();//线程在这里阻塞等待该任务执行完毕,按照
// System.out.println("任务result="+result+"获取到结果!"+new Date());
// list.add(result);
// }
// //方法2.使用内部阻塞队列的take()
for(int i=0;i<taskCount;i++){
Integer result = completionService.take().get();//采用completionService.take(),内部维护阻塞队列,任务先完成的先获取到
System.out.println(LocalDateTime.now()+"---任务i=="+result+"完成!");
list.add(result);
}
System.out.println("list="+list);
System.out.println("总耗时="+(System.currentTimeMillis()-start));
} catch (Exception e) {
e.printStackTrace();
} finally {
exs.shutdown();//关闭线程池
}
}
static class Task implements Callable<Integer>{
Integer i;
public Task(Integer i) {
super();
this.i=i;
}
@Override
public Integer call() throws Exception {
if(i==5){
Thread.sleep(5000);
}else{
Thread.sleep(1000);
}
System.out.println("线程:"+Thread.currentThread().getName()+"任务i="+i+",执行完成!");
return i;
}
}
}
建议:使用率也挺高,而且能按照完成先后排序,建议如果有排序需求的优先使用。只是多线程并发执行任务结果归集,也可以使用。
二、CompletableFuture
2.1、对标 Futrue
Future 接口,用于描述一个异步计算的结果。虽然 Future 以及相关使用方法提供了异步执行任务的能力,但是对于结果的获取却是很不方便,只能通过阻塞或者轮询的方式得到任务的结果。
阻塞的方式显然和我们的异步编程的初衷相违背,轮询的方式又会耗费无谓的 CPU 资源,而且也不能及时地得到计算结果,为什么不能用观察者设计模式呢?即当计算结果完成及时通知监听者。
Future 局限性,它很难直接表述多个 Future 结果之间的依赖性。
2.2、类图
2.2.1、CompletionStage
-
CompletionStage 代表异步计算过程中的某一个阶段,一个阶段完成以后可能会触发另外一个阶段
-
一个阶段的计算执行可以是一个 Function,Consumer 或者 Runnable。比如:stage.thenApply (x -> square (x)).thenAccept (x -> System.out.print (x)).thenRun (() -> System.out.println ())
-
一个阶段的执行可能是被单个阶段的完成触发,也可能是由多个阶段一起触发
2.2.2、Future
2.3、创建 CompletableFuture 对象
CompletableFuture.compleatedFuture 是一个静态辅助方法,用来返回一个已经计算好的 CompletableFuture.
以下四个静态方法用来为一段异步执行的代码创建 CompletableFuture 对象:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
以 Async 结尾并且没有指定 Executor 的方法会使用 ForkJoinPool.commonPool () 作为它的线程池执行异步代码。
runAsync 方法:它以 Runnabel 函数式接口类型为参数,所以 CompletableFuture 的计算结果为空。
supplyAsync 方法以 Supplier<U> 函数式接口类型为参数,CompletableFuture 的计算结果类型为 U。
注意:这些线程都是 Daemon 线程,主线程结束 Daemon 线程不结束,只有 JVM 关闭时,生命周期终止。
示例:简单同步用法


public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
//长时间的计算任务
try {
System.out.println("计算型任务开始");
Thread.sleep(2000);
return "计算型任务结束";
} catch (InterruptedException e) {
e.printStackTrace();
}
return "·00";
});
System.out.println(future.get());
}
2.4、计算结果完成时的处理
当 CompletableFuture 的计算结果完成,或者抛出异常的时候,可以执行特定的 Action。主要是下面的方法:
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
可以看到 Action 的类型是 BiConsumer<? super T,? super Throwable> 它可以处理正常的计算结果,或者异常情况。
方法不以 Async 结尾,意味着 Action 使用相同的线程执行,而 Async 可能会使用其他线程执行(如果是使用相同的线程池,也可能会被同一个线程选中执行)
示例:


package com.lhx.cloud.futruetask;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
public class BasicFuture {
private static Random rand = new Random();
private static long t = System.currentTimeMillis();
static int getMoreData() {
System.out.println("begin to start compute");
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("end to compute,passed " + (System.currentTimeMillis()-t));
return rand.nextInt(1000);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(BasicFuture::getMoreData);
Future<Integer> f = future.whenComplete((v, e) -> {
System.out.println(v);
System.out.println(e);
});
System.out.println(f.get());
}}
2.5、转换
CompletableFuture 可以作为 monad (单子) 和 functor. 由于回调风格的实现,我们不必因为等待一个计算完成而阻塞着调用线程,而是告诉 CompletableFuture 当计算完成的时候请执行某个 Function. 还可以串联起来。
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
2.6、异常处理 completeExceptionally
为了能获取任务线程内发生的异常,需要使用 CompletableFuture 的 completeExceptionally
方法将导致 CompletableFuture 内发生问题的异常抛出。
这样,当执行任务发生异常时,调用 get()
方法的线程将会收到一个 ExecutionException
异常,该异常接收了一个包含失败原因的 Exception 参数。


/**
* 任务没有异常 正常执行,然后结束
*/
@Test
public void test1() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(() -> {
// 模拟执行耗时任务
System.out.println("task doing...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 告诉completableFuture任务已经完成
completableFuture.complete("ok");
}).start();
// 获取任务结果,如果没有完成会一直阻塞等待
String result = completableFuture.get();
System.out.println("计算结果:" + result);
}
/**
* 线程有异常 正常执行,然后无法结束,主线程会一直等待
*/
@Test
public void test2() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(() -> {
// 模拟执行耗时任务
System.out.println("task doing...");
try {
Thread.sleep(3000);
int i=1/0;
} catch (InterruptedException e) {
e.printStackTrace();
}
// 告诉completableFuture任务已经完成
completableFuture.complete("ok");
}).start();
// 获取任务结果,如果没有完成会一直阻塞等待
String result = completableFuture.get();
System.out.println("计算结果:" + result);
}
/**
* 线程有异常 正常执行,然后通过completableFuture.completeExceptionally(e);告诉completableFuture任务发生异常了
* 主线程接收到 程序继续处理,至结束
*/
@Test
public void test3() throws ExecutionException, InterruptedException {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
new Thread(() -> {
// 模拟执行耗时任务
System.out.println("task doing...");
try {
Thread.sleep(3000);
int i = 1/0;
} catch (Exception e) {
// 告诉completableFuture任务发生异常了
completableFuture.completeExceptionally(e);
}
// 告诉completableFuture任务已经完成
completableFuture.complete("ok");
}).start();
// 获取任务结果,如果没有完成会一直阻塞等待
String result = completableFuture.get();
System.out.println("计算结果:" + result);
}
2.7、多任务组合方法 allOf 和 anyOf
allOf
是等待所有任务完成,构造后 CompletableFuture 完成
anyOf
是只要有一个任务完成,构造后 CompletableFuture 就完成


package com.lhx.cloud.futruetask;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class CompletableFutureDemo {
public static void main(String[] args) {
Long start = System.currentTimeMillis();
// 结果集
List<String> list = new ArrayList<>();
ExecutorService executorService = Executors.newFixedThreadPool(10);
List<Integer> taskList = Arrays.asList(2, 1, 3, 4, 5, 6, 7, 8, 9, 10);
// 全流式处理转换成CompletableFuture[]+组装成一个无返回值CompletableFuture,join等待执行完毕。返回结果whenComplete获取
CompletableFuture[] cfs = taskList.stream()
.map(integer -> CompletableFuture.supplyAsync(() -> calc(integer), executorService)
.thenApply(h -> Integer.toString(h))
.whenComplete((s, e) -> {
System.out.println(LocalDateTime.now()+"---任务" + s + "完成!result=" + s + ",异常 e=" + e);
list.add(s);
})
).toArray(CompletableFuture[]::new);
// 封装后无返回值,必须自己whenComplete()获取
CompletableFuture.allOf(cfs).join();
System.out.println("list=" + list + ",耗时=" + (System.currentTimeMillis() - start));
}
public static Integer calc(Integer i) {
try {
if (i == 1) {
Thread.sleep(3000);//任务1耗时3秒
} else if (i == 5) {
Thread.sleep(5000);//任务5耗时5秒
} else {
Thread.sleep(1000);//其它任务耗时1秒
}
System.out.println(LocalDateTime.now()+"---task线程:" + Thread.currentThread().getName()
+ "任务i=" + i + ",完成!" );
} catch (InterruptedException e) {
e.printStackTrace();
}
return i;
}
}
2.8、常用多线程并发,取结果归集的几种实现方案
描述 | Future | FutureTask | CompletionService | CompletableFuture |
---|---|---|---|---|
原理 | Future 接口 | 接口 RunnableFuture 的唯一实现类,RunnableFuture 接口继承自 Future+Runnable | 内部通过阻塞队列 + FutureTask 接口 | JDK8 实现了 Future, CompletionStage 两个接口 |
多任务并发执行 | 支持 | 支持 | 支持 | 支持 |
获取任务结果的顺序 | 按照提交顺序获取结果 | 未知 | 支持任务完成的先后顺序 | 支持任务完成的先后顺序 |
异常捕捉 | 自己捕捉 | 自己捕捉 | 自己捕捉 | 原生 API 支持,返回每个任务的异常 |
建议 | CPU 高速轮询,耗资源,或者阻塞,可以使用,但不推荐 | 功能不对口,并发任务这一块多套一层,不推荐使用 | 推荐使用,没有 JDK8CompletableFuture 之前最好的方案 | API 极端丰富,配合流式编程,推荐使用! |
上表来源:https://www.cnblogs.com/dennyzhangdd/p/7010972.html
006-优化web请求二-应用缓存、异步调用【Future、ListenableFuture、CompletableFuture】、ETag、WebSocket【SockJS、Stomp】
四、应用缓存
使用spring应用缓存。使用方式:使用@EnableCache注解激活Spring的缓存功能,需要创建一个CacheManager来处理缓存。如使用一个内存缓存示例
package com.github.bjlhx15.gradle.demotest;
import org.springframework.cache.CacheManager;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.cache.concurrent.ConcurrentMapCache;
import org.springframework.cache.support.SimpleCacheManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Arrays;
@Configuration
@EnableCaching
public class CacheConfiguration {
@Bean
public CacheManager cacheManager(){
SimpleCacheManager simpleCacheManager=new SimpleCacheManager();
simpleCacheManager.setCaches(Arrays.asList(new ConcurrentMapCache("searches")));
return simpleCacheManager;
}
}
其他实现如:EhCacheManager、GuavaCacheManager等
主要标记:
@CacheEvict:将会从缓存中移除一个条目
@CachePut:会将方法结果放到缓存中,而不会影响到方法调用本身。
@Caching:将缓存注解重新分组
@CacheConfig:指向不同的缓存配置
更多spring 应用缓存:https://www.cnblogs.com/bjlhx/category/1233985.html
五、分布式缓存
推荐使用redis,系列文章:https://www.cnblogs.com/bjlhx/category/1066467.html
spring使用也比较方便:https://www.cnblogs.com/bjlhx/category/1233985.html
六、异步方法-EnableAsync
在程序执行时候还有一个瓶颈,串行执行,可以通过使用不同线程类快速提升应用的速度。
要启用Spring的异步功能,必须要使用@EnableAsync注解。这样将会透明地使用java.util.concurrent.Executor来执行所有带有@Async注解的方法。
@Async所修饰的函数不要定义为static类型,这样异步调用不会生效
针对调用的Async,如果不做Future特殊处理,执行完调用方法会立即返回结果,如异步邮件发送,不会真的等邮件发送完毕才响应客户,如需等待可以使用Future阻塞处理。
6.1、原始使用
1、main方法增加@EnableAsync注解
@SpringBootApplication
@EnableAsync
public class DemoApplication {
public static void main(String[] args) {
SpringApplication.run(DemoApplication.class, args);
System.out.println("ThreadId:"+Thread.currentThread().getId());
}
}
2、在所需方法增加@Async注解
@Component
public class Task {
@Async
public void doTaskOne() throws Exception {
for (int i = 0; i < 3; i++) {
Thread.sleep(200);
System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskOne");
}
}
@Async
public void doTaskTwo() throws Exception {
for (int i = 0; i < 3; i++) {
Thread.sleep(200);
System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskTwo");
}
}
@Async
public void doTaskThree() throws Exception {
for (int i = 0; i < 3; i++) {
Thread.sleep(200);
System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskThree");
}
}
}
3、查看调用
@RestController
public class TestAsyns {
@Autowired
private Task task;
@RequestMapping("/testAsync")
public ResponseEntity testAsync() throws Exception {
task.doTaskOne();
task.doTaskTwo();
task.doTaskThree();
return ResponseEntity.ok("ok");
}
}
上述方法依次调用三个方法。
如果去除@EnableAsync注解,输出如下:【可见是串行执行】
ThreadId:33:doTaskOne
ThreadId:33:doTaskOne
ThreadId:33:doTaskOne
ThreadId:33:doTaskTwo
ThreadId:33:doTaskTwo
ThreadId:33:doTaskTwo
ThreadId:33:doTaskThree
ThreadId:33:doTaskThree
ThreadId:33:doTaskThree
如果增加@EnableAsync注解,输出如下:【可见是并行执行】
ThreadId:56:doTaskThree
ThreadId:55:doTaskTwo
ThreadId:54:doTaskOne
ThreadId:54:doTaskOne
ThreadId:55:doTaskTwo
ThreadId:56:doTaskThree
ThreadId:54:doTaskOne
ThreadId:56:doTaskThree
ThreadId:55:doTaskTwo
6.2、自定义执行器使用异步
1、配置类
方式一、注入Bean方式
import java.util.concurrent.Executor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class ThreadConfig {
// 执行需要依赖线程池,这里就来配置一个线程池
@Bean
public Executor getExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
executor.initialize();
return executor;
}
}
方式二、通过实现AsyncConfigurer接口,可以自定义默认的执行(executor)。新增如下配置类:
@Configuration
public class AsyncConfiguration implements AsyncConfigurer {
protected final Logger logger = LoggerFactory.getLogger(AsyncConfiguration.class);
@Override
public Executor getAsyncExecutor() {
//做好不超过10个,这里写两个方便测试
return Executors.newFixedThreadPool(2);
}
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return (ex,method,params)->logger.error("Uncaught async error",ex);
}
}
Executor的初始化配置,还有很多种,可以参看https://www.cnblogs.com/bjlhx/category/1086008.html
Spring 已经实现的异步线程池:
1. SimpleAsyncTaskExecutor:不是真的线程池,这个类不重用线程,每次调用都会创建一个新的线程。
2. SyncTaskExecutor:这个类没有实现异步调用,只是一个同步操作。只适用于不需要多线程的地方
3. ConcurrentTaskExecutor:Executor的适配类,不推荐使用。如果ThreadPoolTaskExecutor不满足要求时,才用考虑使用这个类
4. SimpleThreadPoolTaskExecutor:是Quartz的SimpleThreadPool的类。线程池同时被quartz和非quartz使用,才需要使用此类
5. ThreadPoolTaskExecutor :最常使用,推荐。 其实质是对java.util.concurrent.ThreadPoolExecutor的包装
使用上述配置,能够确保在应用中,用来处理异步任务的线程不会超过10个。这对于web应用很重要,因为每个客户端都会有一个专用的线程。你所使用的线程越多,阻塞时间越长那么能够处理的客户端就会越少。
如果设置成两个,程序中有3个异步线程,也会只有两个运行,如下
ThreadId:55:doTaskTwo
ThreadId:54:doTaskOne
ThreadId:55:doTaskTwo
ThreadId:54:doTaskOne
ThreadId:55:doTaskTwo
ThreadId:54:doTaskOne
ThreadId:55:doTaskThree
ThreadId:55:doTaskThree
ThreadId:55:doTaskThree
2、使用
同上述一致。
6.3、异步返回处理
方式一、使用Future【FutureTask是默认实现】处理+轮询处理【jdk1.5产物,没有提供Callback机制,只能主动轮询,通过get去获取结果】【不推荐】
修改异步执行的方法


@Component
public class TaskFutureDemo {
@Async
public Future<String> doTaskOne() throws Exception {
for (int i = 0; i < 3; i++) {
Thread.sleep(1000);
System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskOne");
}
return new AsyncResult<>("doTaskOne");
}
@Async
public Future<String> doTaskTwo() throws Exception {
for (int i = 0; i < 3; i++) {
Thread.sleep(1000);
System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskTwo");
}
return new AsyncResult<>("doTaskTwo");
}
@Async
public Future<String> doTaskThree() throws Exception {
for (int i = 0; i < 3; i++) {
Thread.sleep(1000);
System.out.println("ThreadId:"+Thread.currentThread().getId()+":doTaskThree");
}
return new AsyncResult<>("doTaskThree");
}
}
修改调用的方法


@RestController
public class TestAsynsFutureController {
@Autowired
private TaskFutureDemo task;
@RequestMapping("/testAsyncFuture")
public ResponseEntity testAsyncFuture() throws Exception {
Future<String> taskOne = task.doTaskOne();
Future<String> taskTwo = task.doTaskTwo();
Future<String> taskThree = task.doTaskThree();
while (true) {
if (taskOne.isDone() && taskTwo.isDone() && taskThree.isDone()) {
break;
}
}
return ResponseEntity.ok("ok");
}
}
方式二、Spring的ListenableFuture和CountDownLatch处理
Service实现类


@Service
public class TaskListenableFutureService {
private AsyncSearch asyncSearch;
@Autowired
public TaskListenableFutureService(AsyncSearch asyncSearch) {
this.asyncSearch=asyncSearch;
}
public List<String> search(List<String> keywords){
CountDownLatch latch=new CountDownLatch(keywords.size());
List<String> allResult=Collections.synchronizedList(new ArrayList<>());
keywords.stream()
.forEach(keyword->asyncFetch(latch,allResult,keyword));
await(latch);
return allResult;
}
private void asyncFetch(CountDownLatch latch, List<String> result, String keyword){
asyncSearch.asyncFetch(keyword)
.addCallback(
key->onSuccess(result,latch,key),
ex -> onError(latch,ex)
);
}
private void await(CountDownLatch latch){
try {
latch.await();
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
}
private static void onSuccess(List<String> result,CountDownLatch latch,String keyword){
result.add(keyword);
latch.countDown();
}
private static void onError(CountDownLatch latch,Throwable ex){
ex.printStackTrace();
latch.countDown();
}
@Component
private static class AsyncSearch{
@Autowired
public AsyncSearch() {
}
protected final Logger logger = LoggerFactory.getLogger(AsyncSearch.class);
@Async
public ListenableFuture<String> asyncFetch(String keyword){
logger.info(Thread.currentThread().getName()+"-"+keyword);
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new AsyncResult<>("keyword="+keyword);
}
}
}
调用方法


@RestController
public class TestAsynsListenableFutureController {
protected final Logger logger = LoggerFactory.getLogger(TestAsynsListenableFutureController.class);
@Autowired
private TaskListenableFutureService task;
@RequestMapping("/testAsyncListenableFuture")
public ResponseEntity testAsyncListenableFuture() throws Exception {
List<String> list = task.search(Arrays.asList("java", "html", "spring"));
list.stream().forEach(p-> logger.info(p));
return ResponseEntity.ok("ok");
}
}
方式三、使用CompletableFuture【推荐】
User实体类


import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@JsonIgnoreProperties(ignoreUnknown=true)
public class User {
private String name;
private String blog;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getBlog() {
return blog;
}
public void setBlog(String blog) {
this.blog = blog;
}
@Override
public String toString() {
return "User [name=" + name + ", blog=" + blog + "]";
}
}
CompletableFuture的服务


@Service
public class GitHubLookupService {
private static final Logger logger = LoggerFactory.getLogger(GitHubLookupService.class);
private final RestTemplate restTemplate;
public GitHubLookupService(RestTemplateBuilder restTemplateBuilder) {
this.restTemplate = restTemplateBuilder.build();
}
@Async
public CompletableFuture<User> findUser(String user) throws InterruptedException {
logger.info("Looking up " + user);
String url = String.format("https://api.github.com/users/%s", user);
User results = restTemplate.getForObject(url, User.class);
// Artificial delay of 1s for demonstration purposes
Thread.sleep(1000L);
return CompletableFuture.completedFuture(results);
}
}
调用方法


@RestController
public class CompletableFutureController {
private static final Logger logger = LoggerFactory.getLogger(CompletableFutureController.class);
@Autowired
private GitHubLookupService gitHubLookupService;
@RequestMapping("/testCompletableFuture")
public ResponseEntity testCompletableFuture() throws Exception {
// Start the clock
long start = System.currentTimeMillis();
// Kick of multiple, asynchronous lookups
CompletableFuture<User> page1 = gitHubLookupService.findUser("PivotalSoftware");
CompletableFuture<User> page2 = gitHubLookupService.findUser("CloudFoundry");
CompletableFuture<User> page3 = gitHubLookupService.findUser("Spring-Projects");
// Wait until they are all done
CompletableFuture.allOf(page1,page2,page3).join();
// Print results, including elapsed time
logger.info("Elapsed time: " + (System.currentTimeMillis() - start));
logger.info("--> " + page1.get());
logger.info("--> " + page2.get());
logger.info("--> " + page3.get());
return ResponseEntity.ok("ok");
}
}
七、ETag
7.1、什么是ETag?
ETag:是实体标签(Entity Tag)的缩写。ETag一般不以明文形式相应给客户端。在资源的各个生命周期中,它都具有不同的值,用于标识出资源的状态。当资源发生变更时,如果其头信息中一个或者多个发生变化,或者消息实体发生变化,那么ETag也随之发生变化。
ETag值的变更说明资源状态已经被修改。往往可以通过时间戳就可以便宜的得到ETag头信息。在服务端中如果发回给消费者的相应从一开始起就由ETag控制,那么可以确保更细粒度的ETag升级完全由服务来进行控制。服务计算ETag值,并在相应客户端请求时将它返回给客户端。
7.2、计算ETag值
在HTTP1.1协议中并没有规范如何计算ETag。ETag值可以是唯一标识资源的任何东西,如持久化存储中的某个资源关联的版本、一个或者多个文件属性,实体头信息和校验值、(CheckSum),也可以计算实体信息的散列值。有时候,为了计算一个ETag值可能有比较大的代价,此时可以采用生成唯一值等方式(如常见的GUID)。无论怎样,服务都应该尽可能的将ETag值返回给客户端。客户端不用关心ETag值如何产生,只要服务在资源状态发生变更的情况下将ETag值发送给它就行。
ETag值可以通过uuid、整数、长整形、字符串等四种类型。
计算ETag值时,需要考虑两个问题:计算与存储。如果一个ETag值只需要很小的代价以及占用很低的存储空间,那么我们可以在每次需要发送给客户端ETag值值的时候计算一遍就行行了。相反的,我们需要将之前就已经计算并存储好的ETag值发送给客户端。之前说:将时间戳作为字符串作为一种廉价的方式来获取ETag值。对于不是经常变化的消息,它是一种足够好的方案。注意:如果将时间戳做为ETag值,通常不应该用Last-Modified的值。由于HTTP机制中,所以当我们在通过服务校验资源状态时,客户端不需要进行相应的改动。计算ETag值开销最大的一般是计算采用哈希算法获取资源的表述值。可以只计算资源的哈希值,也可以将头信息和头信息的值也包含进去。如果包含头信息,那么注意不要包含计算机标识的头信息。同样也应该避免包含Expires、Cache-Control和Vary头信息。注意:在通过哈希算法。
7.3、ETag的类型以及他们之间的区别
ETag有两种类型:强ETag(strong ETag)与弱ETag(weak ETag)。
强ETag表示形式:"22FAA065-2664-4197-9C5E-C92EA03D0A16"。
弱ETag表现形式:w/"22FAA065-2664-4197-9C5E-C92EA03D0A16"。
强、弱ETag类型的出现与Apache服务器计算ETag的方式有关。Apache默认通过FileEtag中FileEtag INode Mtime Size的配置自动生成ETag(当然也可以通过用户自定义的方式)。假设服务端的资源频繁被修改(如1秒内修改了N次),此时如果有用户将Apache的配置改为MTime,由于MTime只能精确到秒,那么就可以避免强ETag在1秒内的ETag总是不同而频繁刷新Cache(如果资源在秒级经常被修改,也可以通过Last-Modified来解决)。
7.4、Etag - 作用
Etag 主要为了解决 Last-Modified 无法解决的一些问题。
1、 一些文件也许会周期性的更改,但是他的内容并不改变(仅仅改变的修改时间),这个时候我们并不希望客户端认为这个文件被修改了,而重新GET;
2、某些文件修改非常频繁,比如在秒以下的时间内进行修改,(比方说1s内修改了N次),If-Modified-Since能检查到的粒度是s级的,这种修改无法判断(或者说UNIX记录MTIME只能精确到秒
3、某些服务器不能精确的得到文件的最后修改时间;
为此,HTTP/1.1 引入了 Etag(Entity Tags).Etag仅仅是一个和文件相关的标记,可以是一个版本标记,比如说v1.0.0或者说"2e681a-6-5d044840"这么一串看起来很神秘的编码。但是HTTP/1.1标准并没有规定Etag的内容是什么或者说要怎么实现,唯一规定的是Etag需要放在""内。
7.5、Etag - 工作原理
Etag由服务器端生成,客户端通过If-Match或者说If-None-Match这个条件判断请求来验证资源是否修改。常见的是使用If-None-Match.请求一个文件的流程可能如下:
====第一次请求===
1.客户端发起 HTTP GET 请求一个文件;
2.服务器处理请求,返回文件内容和一堆Header,当然包括Etag(例如"2e681a-6-5d044840")(假设服务器支持Etag生成和已经开启了Etag).状态码200
====第二次请求===
1.客户端发起 HTTP GET 请求一个文件,注意这个时候客户端同时发送一个If-None-Match头,这个头的内容就是第一次请求时服务器返回的Etag:2e681a-6-5d044840
2.服务器判断发送过来的Etag和计算出来的Etag匹配,因此If-None-Match为False,不返回200,返回304,客户端继续使用本地缓存;
流程很简单,问题是,如果服务器又设置了Cache-Control:max-age和Expires呢,怎么办?
答案是同时使用,也就是说在完全匹配If-Modified-Since和If-None-Match即检查完修改时间和Etag之后,服务器才能返回304.
7.6、在spring中实践
虽然对请求已经做了应用缓存等处理,但是持续请求一个restful接口请求还是会发送到服务端去读取缓存,即使结果没有发生改变,但结果本身还是会多次发送给用户,造成浪费带宽。
ETag是Web响应数据的一个散列(Hash),并且会在头信息中进行发送。客户端可以记住资源的ETag,并且通过If-None-Match头信息将最新的已知版本发送给服务器。如果在这段时间内请求没有发生变化的话,服务器就会返回304 Not Modified。
在Spring中有一个特殊的Servlet过滤器来处理ETag,名为ShallowEtagHeaderFilter。只需将此类注入即可:
@Bean
public Filter etagFilter(){
return new ShallowEtagHeaderFilter();
}
只要响应头没有缓存控制头信息的话,系统就会为你的响应生成ETag。
示例
@GetMapping("/testNoChangeContent")
public ResponseEntity testNoChangeContent(){
return ResponseEntity.ok("OK");
}
@GetMapping("/testChangeContent")
public ResponseEntity testChangeContent(){
return ResponseEntity.ok("OK:"+LocalDateTime.now());
}
接口一、testNoChangeContent,是测试内容没有改变的,第一次请求是200,以后请求是304
接口二、testChangeContent,是测试内容有改变的,第一次请求是200,以后请求均是200
八、WebSocket
在优化web请求时,这是一种优化方案,在服务器端有可用数据时,就立即将其发送到客户端。通过多线程方式获取搜索结果,所以数据会分为多个块。这时可以一点点地进行发送,而不必等待所有结果。
8.1、概述
1、WebSocket
Http连接为一次请求(request)一次响应(response),必须为同步调用方式。WebSocket 协议提供了通过一个套接字实现全双工通信的功能。一次连接以后,会建立tcp连接,后续客户端与服务器交互为全双工方式的交互方式,客户端可以发送消息到服务端,服务端也可将消息发送给客户端。
WebSocket 是发送和接收消息的底层API,WebSocket 协议提供了通过一个套接字实现全双工通信的功能。也能够实现 web 浏览器和 server 间的异步通信,全双工意味着 server 与浏览器间可以发送和接收消息。需要注意的是必须考虑浏览器是否支持。
2、SockJS
SockJS 是 WebSocket 技术的一种模拟。为了应对许多浏览器不支持WebSocket协议的问题,设计了备选SockJs。开启并使用SockJS后,它会优先选用Websocket协议作为传输协议,如果浏览器不支持Websocket协议,则会在其他方案中,选择一个较好的协议进行通讯。原来在不支持WebSocket的情况下,也可以很简单地实现WebSocket的功能的,方法就是使用 SockJS。它会优先选择WebSocket进行连接,但是当服务器或客户端不支持WebSocket时,会自动在 XHR流、XDR流、iFrame事件源、iFrame HTML文件、XHR轮询、XDR轮询、iFrame XHR轮询、JSONP轮询 这几个方案中择优进行连接。
3、Stomp
STOMP 中文为: 面向消息的简单文本协议。websocket定义了两种传输信息类型: 文本信息和二进制信息。类型虽然被确定,但是他们的传输体是没有规定的。所以,需要用一种简单的文本传输类型来规定传输内容,它可以作为通讯中的文本传输协议,即交互中的高级协议来定义交互信息。
STOMP本身可以支持流类型的网络传输协议: websocket协议和tcp协议。
Stomp还提供了一个stomp.js,用于浏览器客户端使用STOMP消息协议传输的js库。
STOMP的优点如下:
(1)不需要自建一套自定义的消息格式
(2)现有stomp.js客户端(浏览器中使用)可以直接使用
(3)能路由信息到指定消息地点
(4)可以直接使用成熟的STOMP代理进行广播 如:RabbitMQ, ActiveMQ
4、WebSocket、SockJs、STOMP三者关系
简而言之,WebSocket 是底层协议,SockJS 是WebSocket 的备选方案,也是 底层协议,而 STOMP 是基于 WebSocket(SockJS) 的上层协议
- 假设HTTP协议并不存在,只能使用TCP套接字来编写web应用,你可能认为这是一件疯狂的事情。
- 不过幸好,我们有HTTP协议,它解决了 web 浏览器发起请求以及 web 服务器响应请求的细节。
- 直接使用 WebSocket(SockJS) 就很类似于 使用 TCP 套接字来编写 web 应用;因为没有高层协议,因此就需要我们定义应用间所发送消息的语义,还需要确保 连接的两端都能遵循这些语义。
- 同HTTP在TCP套接字上添加请求-响应模型层一样,STOMP在 WebSocket之上提供了一个基于帧的线路格式层,用来定义消息语义。
8.2、不支持WebSocket的场景有:
浏览器不支持
Web容器不支持,如tomcat7以前的版本不支持WebSocket
防火墙不允许
Nginx没有开启WebSocket支持
当遇到不支持WebSocket的情况时,SockJS会尝试使用其他的方案来连接,刚开始打开的时候因为需要尝试各种方案,所以会阻塞一会儿,之后可以看到连接有异常,那就是尝试失败的情况。
为了测试,使用Nginx做反向代理,把www.test.com指到项目启动的端口上,然后本地配HOST来达到模拟真实场景的效果。因为Nginx默认是不支持WebSocket的,所以这里模拟出了服务器不支持WebSocket的场景。、
8.3、spring下的WebSocket使用【WebSocket→sockJs→stomp】
项目中使用的pom
compile ''org.springframework.boot:spring-boot-starter-websocket''
compile ''org.springframework.boot:spring-messaging''
compile group: ''org.webjars'', name: ''sockjs-client'', version: ''1.1.2''
compile group: ''org.webjars'', name: ''stomp-websocket'', version: ''2.3.3''
compile group: ''org.webjars'', name: ''jquery'', version: ''3.3.1-1''
客户端JS
<script src="/webjars/jquery/3.3.1-1/jquery.js"></script>
<script src="/webjars/sockjs-client/1.1.2/sockjs.min.js"></script>
<script src="/webjars/stomp-websocket/2.3.3/stomp.min.js"></script>
<script src="test.js"></script>
8.3.1、使用Spring的底层级WebSocket API
Spring为WebSocket提供了良好支持,WebSocket协议允许客户端维持与服务器的长连接。数据可以通过WebSocket在这两个端点之间进行双向传输,因此消费数据的一方能够实时获取数据。
按照其最简单的形式,WebSocket只是两个应用之间通信的通道。位于WebSocket一端的应用发送消息,另外一端处理消息。因为它是全双工的,所以每一端都可以发送和处理消息。如图18.1所示。
WebSocket通信可以应用于任何类型的应用中,但是WebSocket最常见的应用场景是实现服务器和基于浏览器的应用之间的通信。
实现步骤:
1、编写Handler消息处理器类
方法一:实现 WebSocketHandler 接口,WebSocketHandler 接口如下
public interface WebSocketHandler {
void afterConnectionEstablished(WebSocketSession session) throws Exception;
void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception;
void handleTransportError(WebSocketSession session, Throwable exception) throws Exception;
void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception;
boolean supportsPartialMessages();
}
方法二:扩展 AbstractWebSocketHandler
@Service
public class ChatHandler extends AbstractWebSocketHandler {
@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
session.sendMessage(new TextMessage("hello world."));
}
}
为了在Spring使用较低层级的API来处理消息,必须编写一个实现WebSocketHandler的类.WebSocketHandler需要我们实现五个方法。相比直接实现WebSocketHandler,更为简单的方法是扩展AbstractWebSocketHandler,这是WebSocketHandler的一个抽象实现。
除了重载WebSocketHandler中所定义的五个方法以外,我们还可以重载AbstractWebSocketHandler中所定义的三个方法:
- handleBinaryMessage()
- handlePongMessage()
- handleTextMessage()
这三个方法只是handleMessage()方法的具体化,每个方法对应于某一种特定类型的消息。
方案三、扩展TextWebSocketHandler或BinaryWebSocketHandler。
TextWebSocketHandler是AbstractWebSocketHandler的子类,它会拒绝处理二进制消息。它重载了handleBinaryMessage()方法,如果收到二进制消息的时候,将会关闭WebSocket连接。与之类似,BinaryWebSocketHandler也是AbstractWeb-SocketHandler的子类,它重载了handleTextMessage()方法,如果接收到文本消息的话,将会关闭连接。
2、增加websocket拦截器,管理用户
@Component
public class WebSocketHandshakeInterceptor implements HandshakeInterceptor {
@Override
public boolean beforeHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Map<String, Object> attributes) throws Exception {
if (request instanceof ServletServerHttpRequest) {
attributes.put("username","lhx");
}
return true;
}
@Override
public void afterHandshake(ServerHttpRequest request, ServerHttpResponse response, WebSocketHandler wsHandler, Exception exception) {
}
}
- beforeHandshake()方法,在调用 handler 前调用。常用来注册用户信息,绑定 WebSocketSession,在 handler 里根据用户信息获取WebSocketSession发送消息
3、WebSocketConfig配置
方式一、注解配置
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private ChatHandler chatHandler;
@Autowired
private WebSocketHandshakeInterceptor webSocketHandshakeInterceptor;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
registry.addHandler(chatHandler,"/chat")
.addInterceptors(webSocketHandshakeInterceptor);
}
@Bean
public ServletServerContainerFactoryBean createWebSocketContainer() {
ServletServerContainerFactoryBean container = new ServletServerContainerFactoryBean();
container.setMaxTextMessageBufferSize(8192*4);
container.setMaxBinaryMessageBufferSize(8192*4);
return container;
}
}
- 实现 WebSocketConfigurer 接口,重写 registerWebSocketHandlers 方法,这是一个核心实现方法,配置 websocket 入口,允许访问的域、注册 Handler、SockJs 支持和拦截器。
- registry.addHandler()注册和路由的功能,当客户端发起 websocket 连接,把 /path 交给对应的 handler 处理,而不实现具体的业务逻辑,可以理解为收集和任务分发中心。
- addInterceptors,顾名思义就是为 handler 添加拦截器,可以在调用 handler 前后加入我们自己的逻辑代码。
- ServletServerContainerFactoryBean可以添加对WebSocket的一些配置
方式二、xml配置
4、客户端配置
function contect() {
var wsServer = ''ws://''+window.location.host+''/chat'';
var websocket = new WebSocket(wsServer);
websocket.onopen = function (evt) { onOpen(evt) };
websocket.onclose = function (evt) { onClose(evt) };
websocket.onmessage = function (evt) { onMessage(evt) };
websocket.onerror = function (evt) { onError(evt) };
function onOpen(evt) {
console.log("Connected to WebSocket server.");
websocket.send("test");//客户端向服务器发送消息
}
function onClose(evt) {
console.log("Disconnected");
}
function onMessage(evt) {
console.log(''Retrieved data from server: '' + evt.data);
}
function onError(evt) {
console.log(''Error occured: '' + evt.data);
}
}
contect();
8.3.2、SockJs针对WebSocket支持稍差的场景
为了应对许多浏览器不支持WebSocket协议的问题,设计了备选SockJs
。
SockJS 是 WebSocket 技术的一种模拟。SockJS 会 尽可能对应 WebSocket API,但如果 WebSocket 技术不可用的话,就会选择另外的通信方式协议。
1、服务端只需增加:.withSockJS()即可
@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {
@Autowired
private ChatHandler chatHandler;
@Autowired
private WebSocketHandshakeInterceptor webSocketHandshakeInterceptor;
@Override
public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
// withSockJS() 方法声明我们想要使用 SockJS 功能,如果WebSocket不可用的话,会使用 SockJS;
registry.addHandler(chatHandler,"/chat")
.addInterceptors(webSocketHandshakeInterceptor).withSockJS();
}
}
或者xml配置
<websocket:sockjs />
2、客户端
只需对请求
// var server = ''ws://''+window.location.host+''/chat'';
var server = ''http://''+window.location.host+''/chatsockjs'';
var websocket = new SockJS(server);
- SockJS 所处理的 URL 是 “http://“ 或 “https://“ 模式,而不是 “ws://“ or “wss://“;
- 其他的函数如 onopen, onmessage, and onclose ,SockJS 客户端与 WebSocket 一样,
8.3.3、Stomp方式
STOMP帧由命令,一个或多个头信息以及负载所组成。
直接使用WebSocket(或SockJS)就很类似于使用TCP套接字来编写Web应用。因为没有高层级的线路协议(wire protocol),因此就需要我们定义应用之间所发送消息的语义,还需要确保连接的两端都能遵循这些语义。
不过,好消息是我们并非必须要使用原生的WebSocket连接。就像HTTP在TCP套接字之上添加了请求-响应模型层一样,STOMP在WebSocket之上提供了一个基于帧的线路格式(frame-based wire format)层,用来定义消息的语义。
乍看上去,STOMP的消息格式非常类似于HTTP请求的结构。与HTTP请求和响应类似,STOMP帧由命令、一个或多个头信息以及负载所组成。例如,如下就是发送数据的一个STOMP帧:
SEND
destination:/app/room-message
content-length:20
{\"message\":\"Hello!\"}
对以上代码分析:
- SEND:STOMP命令,表明会发送一些内容;
- destination:头信息,用来表示消息发送到哪里;
- content-length:头信息,用来表示 负载内容的 大小;
- 空行;
- 帧内容(负载)内容
8.3.3.1、基本用法
1、服务端Configuration配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
//添加一个/socket-server-point 连接端点,客户端就可以通过这个端点来进行连接;withSockJS作用是添加SockJS支持
registry.addEndpoint("/socket-server-point").withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
//定义了一个客户端订阅地址的前缀信息,也就是客户端接收服务端发送消息的前缀信息
registry.enableSimpleBroker("/topic");
//定义了服务端接收地址的前缀,也即客户端给服务端发消息的地址前缀
registry.setApplicationDestinationPrefixes("/ws");
}
}
对以上代码分析:
- EnableWebSocketMessageBroker 注解表明: 这个配置类不仅配置了 WebSocket,还配置了基于代理的 STOMP 消息;
- 它复写了 registerStompEndpoints() 方法:添加一个服务端点,来接收客户端的连接。将 “/socket-server-point” 路径注册为 STOMP 端点。这个路径与之前发送和接收消息的目的路径有所不同, 这是一个端点,客户端在订阅或发布消息到目的地址前,要连接该端点,即用户发送请求 :url=’/127.0.0.1:8080/socket-server-point’ 与 STOMP server 进行连接,之后再转发到订阅url;
- 它复写了 configureMessageBroker() 方法:配置了一个 简单的消息代理,通俗一点讲就是设置消息连接请求的各种规范信息。
- 发送应用程序的消息将会带有 “/ws” 前缀。
Service服务处理
@Service
public class HandlerService {
private static final Logger logger = LoggerFactory.getLogger(HandlerService.class);
@Async
public CompletableFuture<String> handle(String key) throws Exception {
Thread.sleep(new Random().nextInt(3000));
logger.info("Looking up " + key);
key=key+":"+LocalDateTime.now();
return CompletableFuture.completedFuture(key);
}
}
Controller控制开发
@MessageMapping("/searchBase")
public ResponseEntity searchBase() throws Exception {
Consumer<List<String>> callback = p -> websocket.convertAndSend("/topic/searchResults", p);
List<String> list = Arrays.asList("bba", "aaa", "ccc");
localSearch(list, callback);
Map map = new HashMap();
map.put("list", list);
map.put("date", LocalDateTime.now());
return ResponseEntity.ok(map);
}
public void localSearch(List<String> keys, Consumer<List<String>> callback) throws Exception {
Thread.sleep(2000);
List<String> list = new ArrayList<>();
for (String key : keys) {
CompletableFuture<String> completableFuture = handlerService.handle(key);
completableFuture.thenAcceptAsync(p -> {
list.clear();
list.add(p);
callback.accept(list);
});
}
}
8.3.3.2、消息流
8.3.3.3、启用STOMP代理中继
对于生产环境下的应用来说,你可能会希望使用真正支持STOMP的代理来支撑WebSocket消息,如RabbitMQ或ActiveMQ。这样的代理提供了可扩展性和健壮性更好的消息功能,当然它们也会完整支持STOMP命令。我们需要根据相关的文档来为STOMP搭建代理。搭建就绪之后,就可以使用STOMP代理来替换内存代理了,只需按照如下方式重载configureMessageBroker()方法即可:
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/queue", "/topic");
//定义了一个客户端订阅地址的前缀信息,也就是客户端接收服务端发送消息的前缀信息
// registry.enableSimpleBroker("/topic");
//定义了服务端接收地址的前缀,也即客户端给服务端发消息的地址前缀
registry.setApplicationDestinationPrefixes("/ws");
}
-
上述configureMessageBroker()方法的第一行代码启用了STOMP代理中继(broker relay)功能,并将其目的地前缀设置为“/topic”和“/queue”。这样的话,Spring就能知道所有目的地前缀为“/topic”或“/queue”的消息都会发送到STOMP代理中。
-
在第二行的configureMessageBroker()方法中将应用的前缀设置为“/ws”。所有目的地以“/ws”打头的消息都将会路由到带有@MessageMapping注解的方法中,而不会发布到代理队列或主题中。
默认情况下,STOMP代理中继会假设代理监听localhost的61613端口,并且客户端的username和password均为“guest”。如果你的STOMP代理位于其他的服务器上,或者配置成了不同的客户端凭证,那么我们可以在启用STOMP代理中继的时候,需要配置这些细节信息:
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableStompBrokerRelay("/queue", "/topic")
.setRelayHost("rabbit.someotherserver")
.setRelayPort(62623)
.setClientLogin("marcopolo")
.setClientPasscode("letmein01")
registry.setApplicationDestinationPrefixes("/app");
}
8.3.3.4、处理来自客户端的STOMP消息
1、应用消息MessageMapping
Spring 4.0引入了@MessageMapping注解,它用于STOMP消息的处理,类似于Spring MVC的@RequestMapping注解。当消息抵达某个特定的目的地时,带有@MessageMapping注解的方法能够处理这些消息。
/**
* 处理来自客户端的STOMP消息
* @param incoming
* @return
*/
@MessageMapping("/incoming")
public Shout handleShout(Shout incoming) {
logger.info("Received message: " + incoming.getMessage());
try { Thread.sleep(2000); } catch (InterruptedException e) {}
Shout outgoing = new Shout();
outgoing.setMessage("incoming!");
return outgoing;
}
消息接受类
public class Shout {
private String message;
public String getMessage() {
return message;
}
public void setMessage(String message) {
this.message = message;
}
}
客户端
function contect() {
var socket=new SockJS(''/socket-server-point'');
stompCliet=Stomp.over(socket);
stompCliet.connect({},function (frame) {
console.log(''Connected :''+frame);
stompCliet.send("/ws/incoming",{},"{\"message\":\"Hello!\"}");
});
}
contect();
2、订阅模式SubscribeMapping
@SubscribeMapping的主要应用场景是实现请求-回应模式。在请求-回应模式中,客户端订阅某一个目的地,然后预期在这个目的地上获得一个一次性的响应。
例如,考虑如下@SubscribeMapping注解标注的方法:
@SubscribeMapping("/sub")
public Shout handleSubscription(){
logger.info("Received message: " +"subscription");
Shout outgoing = new Shout();
outgoing.setMessage("subscription!");
return outgoing;
}
当处理这个订阅时,handleSubscription()方法会产生一个输出的Shout对象并将其返回。然后,Shout对象会转换成一条消息,并且会按照客户端订阅时相同的目的地发送回客户端。
客户端
function contect() {
var socket=new SockJS(''/socket-server-point'');
stompCliet=Stomp.over(socket);
stompCliet.connect({},function (frame) {
console.log(''Connected :''+frame);
stompCliet.subscribe(''/ws/sub'',function (result) {
console.log("aaaa",JSON.parse(result.body));
});
stompCliet.send("/ws/sub",{},"{\"message\":\"Hello!\"}");
});
}
contect();
这种请求-回应模式与HTTP GET的请求-响应模式并没有太大差别。但是,这里的关键区别在于HTTP GET请求是同步的,而订阅的请求-回应模式则是异步的,这样客户端能够在回应可用时再去处理,而不必等待。
8.3.3.5、发送消息到客户端
Spring提供了两种发送数据给客户端的方法:
- 作为处理消息或处理订阅的附带结果;
- 使用消息模板。
方式一、作为处理消息或处理订阅的附带结果、在处理消息之后,发送消息
@MessageMapping("/incoming")
public Shout handleShout(Shout incoming) {
logger.info("Received message: " + incoming.getMessage());
try { Thread.sleep(2000); } catch (InterruptedException e) {}
Shout outgoing = new Shout();
outgoing.setMessage("incoming!");
return outgoing;
}
当@MessageMapping注解标示的方法有返回值的时候,返回的对象将会进行转换(通过消息转换器)并放到STOMP帧的负载中,然后发送给消息代理。
默认情况下,帧所发往的目的地会与触发处理器方法的目的地相同,只不过会添加上“/topic”前缀。就本例而言,这意味着handleShout()方法所返回的Shout对象会写入到STOMP帧的负载中,并发布到“/topic/incoming”目的地。不过,我们可以通过为方法添加@SendTo注解,重载目的地:
@MessageMapping("/incoming")
@SendTo("/topic/shout")
public Shout handleShout(Shout incoming) {
logger.info("Received message: " + incoming.getMessage());
try { Thread.sleep(2000); } catch (InterruptedException e) {}
Shout outgoing = new Shout();
outgoing.setMessage("incoming!");
return outgoing;
}
按照这个@SendTo注解,消息将会发布到“/topic/shout”。所有订阅这个主题的应用(如客户端)都会收到这条消息。
按照类似的方式,@SubscribeMapping注解标注的方式也能发送一条消息,作为订阅的回应。
@SubscribeMapping("/sub")
public Shout handleSubscription(){
logger.info("Received message: " +"subscription");
Shout outgoing = new Shout();
outgoing.setMessage("subscription!");
return outgoing;
}
@SubscribeMapping的区别在于这里的Shout消息将会直接发送给客户端,而不必经过消息代理。如果你为方法添加@SendTo注解的话,那么消息将会发送到指定的目的地,这样会经过代理。
对应客户端需要增加订阅即可
stompCliet.subscribe(''/ws/sub'',function (result) {
console.log("aaaa",JSON.parse(result.body));
});
正如前面看到的那样,使用 @MessageMapping 或者 @SubscribeMapping 注解可以处理客户端发送过来的消息,并选择方法是否有返回值。
如果 @MessageMapping 注解的控制器方法有返回值的话,返回值会被发送到消息代理,只不过会添加上"/topic"前缀。可以使用@SendTo 重写消息目的地;
如果 @SubscribeMapping 注解的控制器方法有返回值的话,返回值会直接发送到客户端,不经过代理。如果加上@SendTo 注解的话,则要经过消息代理。
方式二、使用消息模板【在任意地方发送消息】
@MessageMapping和@SubscribeMapping提供了一种很简单的方式来发送消息,这是接收消息或处理订阅的附带结果。不过,Spring的SimpMessagingTemplate能够在应用的任何地方发送消息,甚至不必以首先接收一条消息作为前提。
我们不必要求用户刷新页面,而是让首页订阅一个STOMP主题.
function contect() {
var socket=new SockJS(''/socket-server-point'');
stompCliet=Stomp.over(socket);
stompCliet.connect({},function (frame) {
console.log(''Connected :''+frame);
stompCliet.subscribe(''/topic/sendDataToClient'',function (result) {
console.log("aaaa",JSON.parse(result.body));
});
// stompCliet.send("/ws/sub",{},"{\"message\":\"Hello!\"}");
});
}
contect();
使用SimpMessagingTemplate能够在应用的任何地方发布消息
@Autowired(required = false)
private SimpMessagingTemplate websocket;
@GetMapping("/sendDataToClient")
public ResponseEntity sendDataToClient() throws Exception {
Map map=new HashMap();
map.put("aa","aaa");
map.put("bb","bbb");
websocket.convertAndSend("/topic/sendDataToClient",map);
return ResponseEntity.ok("ok");
}
当然此处的SimpMessagingTemplate也可以使用父接口SimpMessageSendingOperations注入
在这个场景下,我们希望所有的客户端都能及时看到实时的/topic/sendDataToClient,这种做法是很好的。但有的时候,我们希望发送消息给指定的用户,而不是所有的客户端。
8.3.3.6、为目标用户发送消息
以上说明了如何广播消息,订阅目的地的所有用户都能收到消息。如果消息只想发送给特定的用户呢?spring-websocket 介绍了以下
在使用Spring和STOMP消息功能的时候,我们有两种方式利用认证用户:
1、@MessageMapping和@SubscribeMapping标注的方法基于@SendToUser注解和Principal参数来获取认证用户;@MessageMapping、@SubscribeMapping和@MessageException方法返回的值能够以消息的形式发送给认证用户;
2、SimpMessageSendingOperations接口或SimpMessagingTemplate的convertAndSendToUser方法能够发送消息给特定用户。
1、在控制器中处理用户的消息
在控制器的@MessageMapping或@SubscribeMapping方法中,处理消息时有两种方式了解用户信息。在处理器方法中,通过简单地添加一个Principal参数,这个方法就能知道用户是谁并利用该信息关注此用户相关的数据。除此之外,处理器方法还可以使用@SendToUser注解,表明它的返回值要以消息的形式发送给某个认证用户的客户端(只发送给该客户端)。
@SendToUser 表示要将消息发送给指定的用户,会自动在消息目的地前补上"/user"前缀。如下,最后消息会被发布在 /user/queue/notifications-username。但是问题来了,这个username是怎么来的呢?就是通过 principal 参数来获得的。那么,principal 参数又是怎么来的呢?需要在spring-websocket 的配置类中重写 configureClientInboundChannel 方法,添加上用户的认证。
服务端增加configuration


@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfiguration implements WebSocketMessageBrokerConfigurer {
/**
* 1、设置拦截器
* 2、首次连接的时候,获取其Header信息,利用Header里面的信息进行权限认证
* 3、通过认证的用户,使用 accessor.setUser(user); 方法,将登陆信息绑定在该 StompHeaderAccessor 上,在Controller方法上可以获取 StompHeaderAccessor 的相关信息
* @param registration
*/
@Override
public void configureClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptorAdapter() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
StompHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
//1、判断是否首次连接
if (StompCommand.CONNECT.equals(accessor.getCommand())){
//2、判断用户名和密码
String username = accessor.getNativeHeader("username").get(0);
String password = accessor.getNativeHeader("password").get(0);
if ("admin".equals(username) && "admin".equals(password)){
Principal principal = new Principal() {
@Override
public String getName() {
return username;
}
};
accessor.setUser(principal);
return message;
}else {
return null;
}
}
//不是首次连接,已经登陆成功
return message;
}
});
}
}
服务端处理逻辑
@MessageMapping("/shout")
@SendToUser("/queue/notifications")
public Shout userStomp(Principal principal, Shout shout) {
String name = principal.getName();
String message = shout.getMessage();
logger.info("认证的名字是:{},收到的消息是:{}", name, message);
return shout;
}
前端js代码
var headers={
username:''admin'',
password:''admin''
};
function contect() {
var socket=new SockJS(''/socket-server-point'');
stompCliet=Stomp.over(socket);
stompCliet.connect(headers,function (frame) {
console.log(''Connected :''+frame);
stompCliet.subscribe(''/user/queue/notifications'',function (result) {
console.log("aaaa",JSON.parse(result.body));
});
stompCliet.send("/ws/shout",{},"{\"message\":\"Hello!\"}");
});
}
contect();
2、convertAndSendToUser方法
除了convertAndSend()以外,SimpMessageSendingOperations 还提供了convertAndSendToUser()方法。按照名字就可以判断出来,convertAndSendToUser()方法能够让我们给特定用户发送消息。
@MessageMapping("/singleShout")
public void singleUser(Shout shout, StompHeaderAccessor stompHeaderAccessor) {
String message = shout.getMessage();
LOGGER.info("接收到消息:" + message);
Principal user = stompHeaderAccessor.getUser();
simpMessageSendingOperations.convertAndSendToUser(user.getName(), "/queue/shouts", shout);
}
如上,这里虽然我还是用了认证的信息得到用户名。但是,其实大可不必这样,因为 convertAndSendToUser 方法可以指定要发送给哪个用户。也就是说,完全可以把用户名的当作一个参数传递给控制器方法,从而绕过身份认证!convertAndSendToUser 方法最终会把消息发送到 /user/sername/queue/shouts 目的地上。
8.3.3.7、处理消息异常
在处理消息的时候,有可能会出错并抛出异常。因为STOMP消息异步的特点,发送者可能永远也不会知道出现了错误。@MessageExceptionHandler标注的方法能够处理消息方法中所抛出的异常。我们可以把错误发送给用户特定的目的地上,然后用户从该目的地上订阅消息,从而用户就能知道自己出现了什么错误
@MessageExceptionHandler(Exception.class)
@SendToUser("/queue/errors")
public Exception handleExceptions(Exception t){
t.printStackTrace();
return t;
}
8.3.3.8、更多stomp配置
1、发起连接
其中headers表示客户端的认证信息:
若无需认证,直接使用空对象 “{}” 即可;
(1)connectCallback 表示连接成功时(服务器响应 CONNECTED 帧)的回调方法;
(2)errorCallback 表示连接失败时(服务器响应 ERROR 帧)的回调方法,非必须;
默认链接端点
//默认的和STOMP端点连接
/*stomp.connect("guest", "guest", function (franme) {
});*/
有用户认证的
var headers={
username:''admin'',
password:''admin''
};
stomp.connect(headers, function (frame) {
示例
// 建立连接对象(还未发起连接)
var socket=new SockJS("/endpointChat");
// 获取 STOMP 子协议的客户端对象
var stompClient = Stomp.over(socket);
// 向服务器发起websocket连接并发送CONNECT帧
stompClient.connect( {},
function connectCallback (frame) {
// 连接成功时(服务器响应 CONNECTED 帧)的回调方法
console.log(''已连接【'' + frame + ''】'');
//订阅一个消息
stompClient.subscribe(''/topic/getResponse'',
function (response) {
showResponse(response.body);
});
},
function errorCallBack (error) {
// 连接失败时(服务器响应 ERROR 帧)的回调方法
console.log(''连接失败【'' + error + ''】'');
} );
2、断开连接
若要从客户端主动断开连接,可调用 disconnect() 方法:
client.disconnect(
function () {
alert("断开连接");
});
3、发送消息
连接成功后,客户端可使用 send() 方法向服务器发送信息:
client.send(destination url, headers, body);
其中:
(1)destination url 为服务器 controller中 @MessageMapping 中匹配的URL,字符串,必须参数;
(2)headers 为发送信息的header,JavaScript 对象,可选参数;
(3)body 为发送信息的 body,字符串,可选参数;
示例
client.send("/queue/test", {priority: 9}, "Hello, STOMP");
client.send("/queue/test", {}, "Hello, STOMP");
4、订阅、接收消息
STOMP 客户端要想接收来自服务器推送的消息,必须先订阅相应的URL,即发送一个 SUBSCRIBE 帧,然后才能不断接收来自服务器的推送消息。
订阅和接收消息通过 subscribe() 方法实现:
subscribe(destination url, callback, headers)
其中
(1)destination url 为服务器 @SendTo 匹配的 URL,字符串;
(2)callback 为每次收到服务器推送的消息时的回调方法,该方法包含参数 message;
(3)headers 为附加的headers,JavaScript 对象;该方法返回一个包含了id属性的 JavaScript 对象,可作为 unsubscribe() 方法的参数;默认情况下,如果没有在headers额外添加,这个库会默认构建一个独一无二的ID。在传递headers这个参数时,可以使用你自己id。
示例
var headers = {
ack: ''client'',
//这个客户端指定了它会确认接收的信息,只接收符合这个selector : location = ''Europe''的消息。
''selector'': "location = ''Europe''",
//id:’myid’
};
var callback = function(message) {
if (message.body) {
alert("got message with body " +JSON.parse( message.body)) }
else{
alert("got empty message");
} });
var subscription = client.subscribe("/queue/test", callback, headers);
如果想让客户端订阅多个目的地,你可以在接收所有信息的时候调用相同的回调函数:
onmessage = function(message) {
// called every time the client receives a message
}
var sub1 = client.subscribe("queue/test", onmessage);
var sub2 = client.subscribe("queue/another", onmessage)
5、取消订阅
var subscription = client.subscribe(...);
subscription.unsubscribe();
6、事务支持
可以在将消息的发送和确认接收放在一个事务中。
客户端调用自身的begin()方法就可以开始启动事务了,begin()有一个可选的参数transaction,一个唯一的可标识事务的字符串。如果没有传递这个参数,那么库会自动构建一个。
这个方法会返回一个object。这个对象有一个id属性对应这个事务的ID,还有两个方法:
commit()提交事务
abort()中止事务
在一个事务中,客户端可以在发送/接受消息时指定transaction id来设置transaction。
// start the transaction
var tx = client.begin();
// send the message in a transaction
client.send("/queue/test", {transaction: tx.id}, "message in a transaction");
// commit the transaction to effectively send the message
tx.commit();
如果你在调用send()方法发送消息的时候忘记添加transction header,那么这不会称为事务的一部分,这个消息会直接发送,不会等到事务完成后才发送。
var txid = "unique_transaction_identifier";
// start the transaction
var tx = client.begin();
// oops! send the message outside the transaction
client.send("/queue/test", {}, "I thought I was in a transaction!");
tx.abort(); // Too late! the message has been sent
7、消息确认
默认情况,在消息发送给客户端之前,服务端会自动确认(acknowledged)。
客户端可以选择通过订阅一个目的地时设置一个ack header为client或client-individual来处理消息确认。
在下面这个例子,客户端必须调用message.ack()来通知客户端它已经接收了消息。
var subscription = client.subscribe("/queue/test",
function(message) {
// do something with the message
...
// and acknowledge it
message.ack();
},
{ack: ''client''}
);
ack()接受headers参数用来附加确认消息。例如,将消息作为事务(transaction)的一部分,当要求接收消息时其实代理(broker)已经将ACK STOMP frame处理了。
var tx = client.begin();
message.ack({ transaction: tx.id, receipt: ''my-receipt'' });
tx.commit();
ack()也可以用来通知STOMP 1.1.brokers(代理):客户端不能消费这个消息。与ack()方法的参数相同。
8、debug调试
有一些测试代码能有助于你知道库发送或接收的是什么,从而来调试程序。
客户端可以将其debug属性设置为一个函数,传递一个字符串参数去观察库所有的debug语句。
client.debug = function(str) {
// append the debug log to a #debug div somewhere in the page using JQuery:
$("#debug").append(str + "\n");
};
默认情况,debug消息会被记录在在浏览器的控制台。
9、心跳机制
如果STOMP broker(代理)接收STOMP 1.1版本的帧,heart-beating是默认启用的。heart-beating也就是频率,incoming是接收频率,outgoing是发送频率。
通过改变incoming和outgoing可以更改客户端的heart-beating(默认为10000ms):
client.heartbeat.outgoing = 20000;
// client will send heartbeats every 20000ms
client.heartbeat.incoming = 0;
// client does not want to receive heartbeats
// from the server
heart-beating是利用window.setInterval()去规律地发送heart-beats或者检查服务端的heart-beats。
更多


stomp.connect(headers, function (frame) {
//发送消息
//第二个参数是一个头信息的Map,它会包含在STOMP的帧中
//事务支持
var tx = stomp.begin();
stomp.send("/app/marco", {transaction: tx.id}, strJson);
tx.commit();
//订阅服务端消息 subscribe(destination url, callback[, headers])
stomp.subscribe("/topic/marco", function (message) {
var content = message.body;
var obj = JSON.parse(content);
console.log("订阅的服务端消息:" + obj.message);
}, {});
stomp.subscribe("/app/getShout", function (message) {
var content = message.body;
var obj = JSON.parse(content);
console.log("订阅的服务端直接返回的消息:" + obj.message);
}, {});
/*以下是针对特定用户的订阅*/
var adminJSON = JSON.stringify({''message'': ''ADMIN''});
/*第一种*/
stomp.send("/app/singleShout", {}, adminJSON);
stomp.subscribe("/user/queue/shouts",function (message) {
var content = message.body;
var obj = JSON.parse(content);
console.log("admin用户特定的消息1:" + obj.message);
});
/*第二种*/
stomp.send("/app/shout", {}, adminJSON);
stomp.subscribe("/user/queue/notifications",function (message) {
var content = message.body;
var obj = JSON.parse(content);
console.log("admin用户特定的消息2:" + obj.message);
});
/*订阅异常消息*/
stomp.subscribe("/user/queue/errors", function (message) {
console.log(message.body);
});
//若使用STOMP 1.1 版本,默认开启了心跳检测机制(默认值都是10000ms)
stomp.heartbeat.outgoing = 20000;
stomp.heartbeat.incoming = 0; //客户端不从服务端接收心跳包
});
参看代码:https://github.com/JMCuixy/SpringWebSocket
CompletableFuture allof(..)join()与CompletableFuture.join()
我目前正在使用CompletableFuture supplyAsync()方法向公共线程池提交一些任务。以下是代码片段的样子:
final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
.map(resolver -> supplyAsync(() -> task.doWork()))
.collect(toList());
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();
final List<Test> tests = new ArrayList<>();
completableFutures.stream()
.map(completableFuture -> completableFuture.getNow())
.forEach(tests::addAll);
我想知道下面与上面的代码有何不同。我从下面的代码中删除了父completableFuture,并为每个completableFuture添加了join()而不是getNow():
final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
.map(resolver -> supplyAsync(() -> task.doWork()))
.collect(toList());
final List<Test> tests = new ArrayList<>();
completableFutures.stream()
.map(completableFuture -> completableFuture.join())
.forEach(tests::addAll);
我在spring服务中使用了它,线程池耗尽有问题。任何指针深表赞赏。
CompletableFuture allof(..)。join()与CompletableFuture.join()
我目前正在使用CompletableFuture supplyAsync()方法向公共线程池提交一些任务。以下是代码片段的样子:
final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
.map(resolver -> supplyAsync(() -> task.doWork()))
.collect(toList());
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();
final List<Test> tests = new ArrayList<>();
completableFutures.stream()
.map(completableFuture -> completableFuture.getNow())
.forEach(tests::addAll);
我想知道下面与上面的代码有何不同。我从下面的代码中删除了父completableFuture,并为每个completableFuture添加了join()而不是getNow():
final List<CompletableFuture<List<Test>>> completableFutures = resolvers.stream()
.map(resolver -> supplyAsync(() -> task.doWork()))
.collect(toList());
final List<Test> tests = new ArrayList<>();
completableFutures.stream()
.map(completableFuture -> completableFuture.join())
.forEach(tests::addAll);
我在spring服务中使用了它,线程池耗尽有问题。任何指针深表赞赏。
关于如果使用thenApply,则不会调用CompletableFuture#whenComplete和如果要调用a.py内所有的模块应使用语句的问题我们已经讲解完毕,感谢您的阅读,如果还想了解更多关于005 - 多线程 - JUC 线程池 - Future、FutureTask、CompletionService 、CompletableFuture、006-优化web请求二-应用缓存、异步调用【Future、ListenableFuture、CompletableFuture】、ETag、WebSocket【SockJS、Stomp】、CompletableFuture allof(..)join()与CompletableFuture.join()、CompletableFuture allof(..)。join()与CompletableFuture.join()等相关内容,可以在本站寻找。
本文标签: