在这篇文章中,我们将为您详细介绍JavaConcurrency/Threads的内容,并且讨论关于1——join详解的相关问题。此外,我们还会涉及一些关于hivejoin,outerjoin,semi
在这篇文章中,我们将为您详细介绍Java Concurrency/Threads的内容,并且讨论关于1——join详解的相关问题。此外,我们还会涉及一些关于hive join,outer join, semi join详解、Hive 中Join的专题---Join详解、Java 21 新特性:虚拟线程(Virtual Threads)、Java 8 Concurrency Tutorial--转的知识,以帮助您更全面地了解这个主题。
本文目录一览:- Java Concurrency/Threads(1)——join详解(java thread.join)
- hive join,outer join, semi join详解
- Hive 中Join的专题---Join详解
- Java 21 新特性:虚拟线程(Virtual Threads)
- Java 8 Concurrency Tutorial--转
Java Concurrency/Threads(1)——join详解(java thread.join)
参考自:https://www.cnblogs.com/enjiex/p/3661551.html
1. Java Thread join 方法作用:
将当前线程A变为wait,执行join操作的线程B直到B结束。如果该B线程在执行中被中断.
2. join方法的三个重载:
public final void join();//此方法会把当前线程变为wait,直到执行join操作的线程结束,如果该线程在执行中被中断,则会抛出InterruptedException public final synchronized void join(long milis); //此方法会把当前线程变为wait,直到执行join操作的线程结束或者在执行join后等待millis的时间。因为线程调度依赖于操作系统的实现,因为这并不能保证当前线程一定会在millis时间变为RUnnable。 public final synchronized void join(long milis,int nanos); //此方法会把当前线程变为wait,直到执行join操作的线程结束或者在join后等待millis+nanos的时间。
package com.guoqiang; import static com.guoqiang.ThreadColor.*; public class Main { public static void main(String[] args) { Thread t1 = new Thread(new MyRunnable(),"t1"); Thread t2 = new Thread(new MyRunnable(),"t2"); Thread t3 = new Thread(new MyRunnable(),"t3"); t1.start(); //start second thread after waiting for 2 seconds or if it's dead try { t1.join(2000); } catch (InterruptedException e) { e.printstacktrace(); } t2.start(); //start third thread only when first thread is dead try { t1.join(); } catch (InterruptedException e) { e.printstacktrace(); } t3.start(); //let all threads finish execution before finishing main thread try { t1.join(); t2.join(); t3.join(); } catch (InterruptedException e) { // Todo Auto-generated catch block e.printstacktrace(); } System.out.println("All threads are dead,exiting main thread"); } } class MyRunnable implements Runnable{ @Override public void run() { System.out.println("Thread started:::"+Thread.currentThread().getName()); try { Thread.sleep(4000); } catch (InterruptedException e) { e.printstacktrace(); } System.out.println("Thread ended:::"+Thread.currentThread().getName()); } }
OUT: Thread started:::t1 Thread started:::t2 Thread ended:::t1 Thread started:::t3 Thread ended:::t2 Thread ended:::t3 All threads are dead,exiting main thread
hive join,outer join, semi join详解
hive join,outer join, semi join详解
- join 最简单 两个表取交集
- left outer join是以左表驱动,右表不存在的key均赋值为null
- right outer join是以右表驱动,左表不存在的key均赋值为null
此外hive sql不支持 in函数 比如
-
SELECT a.key, a.value
FROM a
WHERE a.key in (SELECT b.key FROM B);
这个可以用 left outer join替换
-
SELECT a.key, a.value
FROM a LEFT OUTER JOIN b ON (a.key = b.key)
WHERE b.key <> NULL;
更高端的写法是 用 semi join
-
SELECT a.key, a.value
FROM a LEFT SEMI JOIN b on (a.key = b.key);
Hive 中Join的专题---Join详解
1.什么是等值连接?
2.hive转换多表join时,如果每个表在join字句中,使用的都是同一个列,该如何处理?
3.LEFT,RIGHT,FULL OUTER连接的作用是什么?
4.LEFT或RIGHT join是连接从左边还有右边?
Hive表连接的语法支持如下:
Sql代码 :
join_table:
table_reference JOIN table_factor [join_condition]
| table_reference {LEFT|RIGHT|FULL} [OUTER] JOIN table_reference join_condition
| table_reference LEFT SEMI JOIN table_reference join_condition
| table_reference CROSS JOIN table_reference [join_condition] (as of Hive 0.10)
table_reference:
table_factor
| join_table
table_factor:
tbl_name [alias]
| table_subquery alias
| ( table_references )
join_condition:
ON equality_expression ( AND equality_expression )*
hive只支持等连接,外连接,左半连接。hive不支持非相等的join条件(通过其他方式实现,如left outer join),因为它很难在map/reducejob实现这样的条件。而且,hive可以join两个以上的表。
例子
写join查询时,有几个典型的点要考虑,如下:
等连接
只有等连接才允许
Sql代码 :
SELECT a.* FROM a JOIN b ON (a.id = b.id)
SELECT a.* FROM a JOIN b ON (a.id = b.id AND a.department = b.department)
这两个是合法的连接
Sql代码 :
SELECT a.* FROM a JOIN b ON (a.id <> b.id)
这个是不允许的。
多表连接
同个查询,可以join两个以上的表
Sql代码 :
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
join的缓存和任务转换
hive转换多表join时,如果每个表在join字句中,使用的都是同一个列,只会转换为一个单独的map/reduce。
Sql代码 :
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
这个会转换为单独的map/reduce任务,只有b表的key1列在join被调用。
另一方面
Sql代码 :
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
被转换为两个map/reduce任务,因为b的key1列在第一个join条件使用,而b表的key2列在第二个join条件使用。第一个map/reduce任务join a和b。第二个任务是第一个任务的结果join c。
在join的每个map/reduce阶段,序列中的最后一个表,当其他被缓存时,它会流到reducers。所以,reducers需要缓存join关键字的特定值组成的行,通过组织最大的表出现在序列的最后,有助于减少reducers的内存。
Sql代码 :
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
三个表,在同一个独立的map/reduce任务做join。a和b的key对应的特定值组成的行,会缓存在reducers的内存。然后reducers接受c的每一行,和缓存的每一行做join计算。
类似
Sql代码 :
SELECT a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key2)
这里有两个map/reduce任务在join计算被调用。第一个是a和b做join,然后reducers缓存a的值,另一边,从流接收b的值。第二个阶段,reducers缓存第一个join的结果,另一边从流接收c的值。
在join的每个map/reduce阶段,通过关键字,可以指定哪个表从流接收。
Sql代码 :
SELECT /*+ STREAMTABLE(a) */ a.val, b.val, c.val FROM a JOIN b ON (a.key = b.key1) JOIN c ON (c.key = b.key1)
三个表的连接,会转换为一个map/reduce任务,reducer会把b和c的key的特定值缓存在内存里,然后从流接收a的每一行,和缓存的行做join。
join的结果
LEFT,RIGHT,FULL OUTER连接存在是为了提供ON语句在没有匹配时的更多控制。例如,这个查询:
Sql代码 :
SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key)
将会返回a的每一行。如果b.key等于a.key,输出将是a.val,b.val,如果a没有和 b.key匹配,输出的行将是a.val,NULL。如果b的行没有和a.key匹配上,将被抛弃。语法"FROM a LEFT OUTER JOIN b"必须写在一行,为了理解它如何工作——这个查询,a是b的左边,a的所有行会被保持;RIGHT OUTER JOIN将保持b的所有行, FULLOUTER JOIN将会保存a和b的所有行。OUTER JOIN语义应该符合标准的SQL规范。
join的过滤
Joins发生在where字句前,所以,如果要限制join的输出,需要写在where字句,否则写在JOIN字句。现在讨论的一个混乱的大点,就是分区表
Sql代码 :
SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key)
WHERE a.ds=''2009-07-07'' AND b.ds=''2009-07-07''
将会连接a和b,产生a.val和b.val的列表。WHERE字句,也可以引用join的输出列,然后过滤他们。
但是,无论何时JOIN的行找到a的key,但是找不到b的key时,b的所有列会置成NULL,包括ds列。这就是说,将过滤join输出的所有行,包括没有合法的b.key的行。然后你会在LEFT OUTER的要求扑空。
也就是说,如果你在WHERE字句引用b的任何列,LEFTOUTER的部分join结果是不相关的。所以,当外连接时,使用这个语句
Sql代码 :
SELECT a.val, b.val FROM a LEFT OUTER JOIN b ON (a.key=b.key AND b.ds=''2009-07-07'' AND a.ds=''2009-07-07''
join的输出会预先过滤,然后你不用对有a.key而没有b.key的行做过滤。RIGHT和FULL join也是一样的逻辑。
join的顺序
join是不可替换的,连接是从左到右,不管是LEFT或RIGHT join。
Sql代码 :
SELECT a.val1, a.val2, b.val, c.val
FROM a
JOIN b ON (a.key = b.key)
LEFT OUTER JOIN c ON (a.key = c.key)
首先,连接a和b,扔掉a和b中没有匹配的key的行。结果表再连接c。这提供了直观的结果,如果有一个键 都存在于A和C,但不是B:完整行(包括 a.val1,a.val2,a.key)会在"a jOINb"步骤,被丢弃,因为它不在b中。结果没有a.key,所以当它和c做LEFT OUTER JOIN,c.val也无法做到,因为没有c.key匹配a.key(因为a的行都被移除了)。类似的,RIGHTOUTER JOIN(替换为LEFT),我们最终会更怪的效果,NULL, NULL, NULL, c.val。因为尽管指定了join key是a.key=c.key,我们已经在第一个JOIN丢弃了不匹配的a的所有行。
为了达到更直观的效果,相反,我们应该从
Sql代码 :
FROM c LEFT OUTER JOIN a ON (c.key = a.key) LEFT OUTER JOIN b ON (c.key = b.key).
LEFT SEMI JOIN实现了相关的IN / EXISTS的子查询语义的有效途径。由于Hive目前不支持IN / EXISTS的子查询,所以你可以用 LEFT SEMI JOIN 重写你的子查询语句。LEFT SEMIJOIN 的限制是, JOIN 子句中右边的表只能在 ON 子句中设置过滤条件,在 WHERE 子句、SELECT 子句或其他地方过滤都不行。
Sql代码 :
SELECT a.key, a.value FROM a WHERE a.key in (SELECT b.key FROM B);
可以重写为
Sql代码 :
SELECT a.key, a.val FROM a LEFT SEMI JOIN b on (a.key = b.key)
mapjoin
但如果所有被连接的表是小表,join可以被转换为只有一个map任务。查询是
SELECT /*+ MAPJOIN(b) */ a.key, a.value
FROM a join b ona.key = b.key
不需要reducer。对于每一个mapper,A和B已经被完全读出。限制是a FULL/RIGHTOUTER JOIN b不能使用。
如果表在join的列已经分桶了,其中一张表的桶的数量,是另一个表的桶的数量的整倍,那么两者可以做桶的连接。如果A有4个桶,表B有4个桶,下面的连接:
Sql代码 :
SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM a join b on a.key = b.key
只能在mapper工作。为了为A的每个mapper完整抽取B。对于上面的查询,mapper处理A的桶1,只会抽取B的桶1,这不是默认行为,要使用以下参数:
Sql代码 :
set hive.optimize.bucketmapjoin = true;
如果表在join的列经过排序,分桶,而且他们有相同数量的桶,可以使用排序-合并 join。每个mapper,相关的桶会做连接。如果A和B有4个桶,
Sql代码 :
SELECT /*+ MAPJOIN(b) */ a.key, a.value FROM A a join B b on a.key = b.key
只能在mapper使用。使用A的桶的mapper,也会遍历B相关的桶。这个不是默认行为,需要配置以下参数:
Sql代码 :
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
Java 21 新特性:虚拟线程(Virtual Threads)
在 Java 21 中,引入了虚拟线程(Virtual Threads)来简化和增强并发性,这使得在 Java 中编程并发程序更容易、更高效。
虚拟线程,也称为 “用户模式线程(user-mode threads)” 或 “纤程(fibers)”。该功能旨在简化并发编程并提供更好的可扩展性。虚拟线程是轻量级的,这意味着它们可以比传统线程创建更多数量,并且开销要少得多。这使得在自己的线程中运行单独任务或请求变得更加实用,即使在高吞吐量的程序中也是如此。
创建和使用虚拟线程
在 Java 21 中创建和使用虚拟线程有多种方法:
1. 使用静态构建器方法
Thread.startVirtualThread
方法将可运行对象作为参数来创建,并立即启动虚拟线程,具体如下代码:
Runnable runnable = () -> {
System.out.println("Hello, www.didispace.com");
};
// 使用静态构建器方法
Thread virtualThread = Thread.startVirtualThread(runnable);
也可以使用 Thread.ofVirtual()
来创建,这里还可以设置一些属性,比如:线程名称。具体如下代码:
Thread.ofVirtual()
.name("didispace-virtual-thread")
.start(runnable);
2. 与 ExecutorService
结合使用
从 Java 5 开始,就推荐开发人员使用 ExecutorServices
而不是直接使用 Thread
类了。现在,Java 21 中引入了使用虚拟线程,所以也有了新的 ExecutorService
来适配,看看下面的例子:
Runnable runnable = () -> {
System.out.println("Hello, www.didispace.com");
};
try (ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor()) {
for (int i = 0; i < 100; i++) {
executorService.submit(runnable);
}
}
上述代码在 try 代码块中创建了一个 ExecutorServices
,用来为每个提交的任务创建虚拟线程。
3. 使用虚拟线程工厂
开发者还可以创建一个生成虚拟线程的工厂来管理,具体看下面的例子例子:
Runnable runnable = () -> {
System.out.println("Hello, www.didispace.com");
};
ThreadFactory virtualThreadFactory = Thread.ofVirtual()
.name("didispace", 0)
.factory();
Thread factoryThread = virtualThreadFactory.newThread(runnable);
factoryThread.start;
这段代码创建了一个虚拟线程工厂,每个虚拟线程都会以 didispace
为前缀、以数字结尾(从 0 开始累加)的名称。
小结
上面我们介绍了虚拟线程的创建和使用,而我们大多数 Java 开发者都基于 Spring 来开发具体业务应用,所以很多场景下可能都不太涉及手工创建的操作。所以,对于虚拟线程的概念,你只需要有一个基本的认识。所以,在文章的最后,做一个小结,以方便大家理解和记忆:
- 虚拟线程是由 JVM 管理的轻量级线程。
- 虚拟线程不需要任何显式分配或调度。
- 虚拟线程非常适合 I/O 密集型任务或需要大量并行性的任务。
- 虚拟线程也可以用来实现异步操作。
另外,值得注意的是,虽然虚拟线程可以在并发性和可扩展性方面提供显着的帮助,但它们并不总是适合所有场景。有些需要大量计算的任务,并不一定在虚拟线程中运行更好,因为虚拟线程也有上下文切换的开。具体情况还是需要通过测试评测,以找到最优解。
如果您学习过程中如遇困难?可以加入我们超高质量的技术交流群,参与交流与讨论,更好的学习与进步!另外,不要走开,关注我!持续更新 Java 新特性专栏!
参考资料
- Java 21:JEP444-Virtual-Threads
- Spring Boot 虚拟线程与 Webflux 在 JWT 验证和 MySQL 查询上的性能比较
欢迎关注我的公众号:程序猿 DD。第一时间了解前沿行业消息、分享深度技术干货、获取优质学习资源
Java 8 Concurrency Tutorial--转
<h1>Threads and Executors
Welcome to the first part of my Java 8 Concurrency tutorial. This guide teaches you in Java 8 with easily understood code examples. It's the first part out of a series of tutorials covering the Java Concurrency API. In the next 15 min you learn how to execute code in parallel via threads,tasks and executor services.
- Part 1: Threads and Executors
- Part 2:
- Part 3:
The was first introduced with the release of Java 5 and then progressively enhanced with every new Java release. The majority of concepts shown in this article also work in older versions of Java. However my code samples focus on Java 8 and make heavy use of lambda expressions and other new features. If you're not yet familiar with lambdas I recommend reading my first.
Threads and Runnables
All modern operating systems support concurrency both via and . Processes are instances of programs which typically run independent to each other,e.g. if you start a java program the operating system spawns a new process which runs in parallel to other programs. Inside those processes we can utilize threads to execute code concurrently,so we can make the most out of the available cores of the CPU.
Java supports since JDK 1.0. Before starting a new thread you have to specify the code to be executed by this thread,often called the task. This is done by implementing Runnable
- a functional interface defining a single void no-args method run()
as demonstrated in the following example:
stemtask<span>.<span>run<span>(<span>)<span>;Thread thread <span>= <span>new <span>Thread<span>(task<span>)<span>;
thread<span>.<span>start<span>(<span>)<span>;System<span>.out<span>.<span>println<span>(<span>"Done!"<span>)<span>;
Since Runnable
is a functional interface we can utilize Java 8 lambda expressions to print the current threads name to the console. First we execute the runnable directly on the main thread before starting a new thread.
The result on the console might look like this:
Or that:
Due to concurrent execution we cannot predict if the runnable will be invoked before or after printing 'done'. The order is non-deterministic,thus making concurrent programming a complex task in larger applications.
Threads can be put to sleep for a certain duration. This is quite handy to simulate long running tasks in the subsequent code samples of this article:
stemstemprintstacktraceThread thread <span>= <span>new <span>Thread<span>(runnable<span>)<span>;
thread<span>.<span>start<span>(<span>)<span>;
When you run the above code you'll notice the one second delay between the first and the second print statement. TimeUnit
is a useful enum for working with units of time. Alternatively you can achieve the same by calling Thread.sleep(1000)
.
Working with the Thread
class can be very tedious and error-prone. Due to that reason the Concurrency API has been introduced back in 2004 with the release of Java 5. The API is located in package java.util.concurrent
and contains many useful classes for handling concurrent programming. Since that time the Concurrency API has been enhanced with every new Java release and even Java 8 provides new classes and methods for dealing with concurrency.
Now let's take a deeper look at one of the most important parts of the Concurrency API - the executor services.
Executors
The Concurrency API introduces the concept of an ExecutorService
as a higher level replacement for working with threads directly. Executors are capable of running asynchronous tasks and typically manage a pool of threads,so we don't have to create new threads manually. All threads of the internal pool will be reused under the hood for revenant tasks,so we can run as many concurrent tasks as we want throughout the life-cycle of our application with a single executor service.
This is how the first thread-example looks like using executors:
stem<span>// => Hello pool-1-thread-1
The class Executors
provides convenient factory methods for creating different kinds of executor services. In this sample we use an executor with a thread pool of size one.
The result looks similar to the above sample but when running the code you'll notice an important difference: the java process never stops! Executors have to be stopped explicitly - otherwise they keep listening for new tasks.
An ExecutorService
provides two methods for that purpose: shutdown()
waits for currently running tasks to finish while shutdownNow()
interrupts all running tasks and shut the executor down immediately.
This is the preferred way how I typically shutdown executors:
stemstemstemNowstem
The executor shuts down softly by waiting a certain amount of time for termination of currently running tasks. After a maximum of five seconds the executor finally shuts down by interrupting all running tasks.
Callables and Futures@H_912_301@
In addition to Runnable
executors support another kind of task named Callable
. Callables are functional interfaces just like runnables but instead of being void
they return a value.
This lambda expression defines a callable returning an integer after sleeping for one second:
task galStateException
Callables can be submitted to executor services just like runnables. But what about the callables result? Since submit()
doesn't wait until the task completes,the executor service cannot return the result of the callable directly. Instead the executor returns a special result of type Future
which can be used to retrieve the actual result at a later point in time.
future System<span>.out<span>.<span>println<span>(<span>"future done? " <span>+ future<span>.<span>isDone<span>(<span>)<span>)<span>;
Integer result <span>= future<span>.<span>get<span>(<span>)<span>;
System<span>.out<span>.<span>println<span>(<span>"future done? " <span>+ future<span>.<span>isDone<span>(<span>)<span>)<span>;
System<span>.out<span>.<span>print<span>(<span>"result: " <span>+ result<span>)<span>;
After submitting the callable to the executor we first check if the future has already been finished execution via isDone()
. I'm pretty sure this isn't the case since the above callable sleeps for one second before returning the integer.
Calling the method get()
blocks the current thread and waits until the callable completes before returning the actual result 123
. Now the future is finally done and we see the following result on the console:
Futures are tightly coupled to the underlying executor service. Keep in mind that every non-terminated future will throw exceptions if you shutdown the executor:
Now
You might have noticed that the creation of the executor slightly differs from the prevIoUs example. We use newFixedThreadPool(1)
to create an executor service backed by a thread-pool of size one. This is equivalent to newSingleThreadExecutor()
but we Could later increase the pool size by simply passing a value larger than one.
Timeouts@H_912_301@
Any call to future.get()
will block and wait until the underlying callable has been terminated. In the worst case a callable runs forever - thus making your application unresponsive. You can simply counteract those scenarios by passing a timeout:
Future<span><Integer<span>> future <span>= executor<span>.<span>submit<span>(<span>(<span>) <span>-<span>> <span>{
<span>try <span>{
TimeUnit<span>.SECONDS<span>.<span>sleep<span>(<span>2<span>)<span>;
<span>return <span>123<span>;
<span>}
<span>catch <span>(<span>InterruptedException e<span>) <span>{
<span>throw <span>new <span>IllegalStateException<span>(<span>"task interrupted"<span>,e<span>)<span>;
<span>}
<span>}<span>)<span>;future<span>.<span>get<span>(<span>1<span>,TimeUnit<span>.SECONDS<span>)<span>;
Executing the above code results in a TimeoutException
:
You might already have guessed why this exception is thrown: We specified a maximum wait time of one second but the callable actually needs two seconds before returning the result.
InvokeAll@H_912_301@
Executors support batch submitting of multiple callables at once via invokeAll()
. This method accepts a collection of callables and returns a list of futures.
tealingPoolList<span><Callable<span><String<span>>> callables <span>= Arrays<span>.<span>asList<span>(
<span>(<span>) <span>-<span>> <span>"task1"<span>,<span>(<span>) <span>-<span>> <span>"task2"<span>,<span>(<span>) <span>-<span>> <span>"task3"<span>)<span>;
executor<span>.<span>invokeAll<span>(callables<span>)
<span>.<span>stream<span>(<span>)
<span>.<span>map<span>(future <span>-<span>> <span>{
<span>try <span>{
<span>return future<span>.<span>get<span>(<span>)<span>;
<span>}
<span>catch <span>(<span>Exception e<span>) <span>{
<span>throw <span>new <span>IllegalStateException<span>(e<span>)<span>;
<span>}
<span>}<span>)
<span>.<span>forEach<span>(System<span>.out<span>:<span>:println<span>)<span>;
In this example we utilize Java 8 functional streams in order to process all futures returned by the invocation of invokeAll
. We first map each future to its return value and then print each value to the console. If you're not yet familiar with streams read my .
InvokeAny
Another way of batch-submitting callables is the method invokeAny()
which works slightly different to invokeAll()
. Instead of returning future objects this method blocks until the first callable terminates and returns the result of that callable.
In order to test this behavior we use this helper method to simulate callables with different durations. The method returns a callable that sleeps for a certain amount of time until returning the given result:
We use this method to create a bunch of callables with different durations from one to three seconds. Submitting those callables to an executor via invokeAny()
returns the string result of the fastest callable - in that case task2:
List<span><Callable<span><String<span>>> callables <span>= Arrays<span>.<span>asList<span>(
<span>callable<span>(<span>"task1"<span>,<span>2<span>)<span>,<span>callable<span>(<span>"task2"<span>,<span>1<span>)<span>,<span>callable<span>(<span>"task3"<span>,<span>3<span>)<span>)<span>;String result <span>= executor<span>.<span>invokeAny<span>(callables<span>)<span>;
System<span>.out<span>.<span>println<span>(result<span>)<span>;<span>// => task2
The above example uses yet another type of executor created via newWorkStealingPool()
. This factory method is part of Java 8 and returns an executor of type ForkJoinPool
which works slightly different than normal executors. Instead of using a fixed size thread-pool are created for a given parallelism size which per default is the number of available cores of the hosts cpu.
ForkJoinPools exist since Java 7 and will be covered in detail in a later tutorial of this series. Let's finish this tutorial by taking a deeper look at scheduled executors.
Scheduled Executors
We've already learned how to submit and run tasks once on an executor. In order to periodically run common tasks multiple times,we can utilize scheduled thread pools.
A scheduledexecutorservice
is capable of scheduling tasks to run either periodically or once after a certain amount of time has elapsed.
This code sample schedules a task to run after an initial delay of three seconds has passed:
Runnable task <span>= <span>(<span>) <span>-<span>> System<span>.out<span>.<span>println<span>(<span>"Scheduling: " <span>+ System<span>.<span>nanoTime<span>(<span>)<span>)<span>;
ScheduledFuture<span><<span>?<span>> future <span>= executor<span>.<span>schedule<span>(task<span>,<span>3<span>,TimeUnit<span>.SECONDS<span>)<span>;TimeUnit<span>.MILLISECONDS<span>.<span>sleep<span>(<span>1337<span>)<span>;
<span>long remainingDelay <span>= future<span>.<span>getDelay<span>(TimeUnit<span>.MILLISECONDS<span>)<span>;
System<span>.out<span>.<span>printf<span>(<span>"Remaining Delay: %sms"<span>,remainingDelay<span>)<span>;
Scheduling a task produces a specialized future of type ScheduledFuture
which - in addition to Future
- provides the method getDelay()
to retrieve the remaining delay. After this delay has elapsed the task will be executed concurrently.
In order to schedule tasks to be executed periodically,executors provide the two methods scheduleAtFixedrate()
and scheduleWithFixedDelay()
. The first method is capable of executing tasks with a fixed time rate,e.g. once every second as demonstrated in this example:
Runnable task <span>= <span>(<span>) <span>-<span>> System<span>.out<span>.<span>println<span>(<span>"Scheduling: " <span>+ System<span>.<span>nanoTime<span>(<span>)<span>)<span>;<span>int initialDelay <span>= <span>0<span>;
<span>int period <span>= <span>1<span>;
executor<span>.<span>scheduleAtFixedrate<span>(task<span>,initialDelay<span>,period<span>,TimeUnit<span>.SECONDS<span>)<span>;
Additionally this method accepts an initial delay which describes the leading wait time before the task will be executed for the first time.
Please keep in mind that scheduleAtFixedrate()
doesn't take into account the actual duration of the task. So if you specify a period of one second but the task needs 2 seconds to be executed then the thread pool will working to capacity very soon.
In that case you should consider using scheduleWithFixedDelay()
instead. This method works just like the counterpart described above. The difference is that the wait time period applies between the end of a task and the start of the next task. For example:
scheduledexecutorservice executor Runnable task <span>= <span>(<span>) <span>-<span>> <span>{
<span>try <span>{
TimeUnit<span>.SECONDS<span>.<span>sleep<span>(<span>2<span>)<span>;
System<span>.out<span>.<span>println<span>(<span>"Scheduling: " <span>+ System<span>.<span>nanoTime<span>(<span>)<span>)<span>;
<span>}
<span>catch <span>(<span>InterruptedException e<span>) <span>{
System<span>.err<span>.<span>println<span>(<span>"task interrupted"<span>)<span>;
<span>}
<span>}<span>;
executor<span>.<span>scheduleWithFixedDelay<span>(task<span>,<span>0<span>,<span>1<span>,TimeUnit<span>.SECONDS<span>)<span>;
This example schedules a task with a fixed delay of one second between the end of an execution and the start of the next execution. The initial delay is zero and the tasks duration is two seconds. So we end up with an execution interval of 0s,3s,6s,9s and so on. As you can see scheduleWithFixedDelay()
is handy if you cannot predict the duration of the scheduled tasks.
This was the first part out of a series of concurrency tutorials. I recommend practicing the shown code samples by your own. You find all code samples from this article on ,so feel free to fork the repo and .
<h1>Synchronization and Locks
Welcome to the second part of my Java 8 Concurrency Tutorial out of a series of guides teaching multi-threaded programming in Java 8 with easily understood code examples. In the next 15 min you learn how to synchronize access to mutable shared variables via the synchronized keyword,locks and semaphores.
- Part 1:
- Part 2: Synchronization and Locks
- Part 3:
The majority of concepts shown in this article also work in older versions of Java. However the code samples focus on Java 8 and make heavy use of lambda expressions and new concurrency features. If you're not yet familiar with lambdas I recommend reading my first.
For simplicity the code samples of this tutorial make use of the two helper methods sleep(seconds)
and stop(executor)
as defined .
Synchronized
In the we've learned how to execute code in parallel via executor services. When writing such multi-threaded code you have to pay particular attention when accessing shared mutable variables concurrently from multiple threads. Let's just say we want to increment an integer which is accessible simultaneously from multiple threads.
We define a field count
with a method increment()
to increase count by one:
<span>void <span>increment<span>(<span>) <span>{
count <span>= count <span>+ <span>1<span>;
<span>}
When calling this method concurrently from multiple threads we're in serious trouble:
IntStream<span>.<span>range<span>(<span>0<span>,<span>10000<span>)
<span>.<span>forEach<span>(i <span>-<span>> executor<span>.<span>submit<span>(<span>this<span>:<span>:increment<span>)<span>)<span>;<span>stop<span>(executor<span>)<span>;
System<span>.out<span>.<span>println<span>(count<span>)<span>; <span>// 9965
Instead of seeing a constant result count of 10000 the actual result varies with every execution of the above code. The reason is that we share a mutable variable upon different threads without synchronizing the access to this variable which results in a .
Three steps have to be performed in order to increment the number: (i) read the current value,(ii) increase this value by one and (iii) write the new value to the variable. If two threads perform these steps in parallel it's possible that both threads perform step 1 simultaneously thus reading the same current value. This results in lost writes so the actual result is lower. In the above sample 35 increments got lost due to concurrent unsynchronized access to count but you may see different results when executing the code by yourself.
Luckily Java supports thread-synchronization since the early days via the synchronized
keyword. We can utilize synchronized
to fix the above race conditions when incrementing the count:
When using incrementSync()
concurrently we get the desired result count of 10000. No race conditions occur any longer and the result is stable with every execution of the code:
IntStream<span>.<span>range<span>(<span>0<span>,<span>10000<span>)
<span>.<span>forEach<span>(i <span>-<span>> executor<span>.<span>submit<span>(<span>this<span>:<span>:incrementSync<span>)<span>)<span>;<span>stop<span>(executor<span>)<span>;
System<span>.out<span>.<span>println<span>(count<span>)<span>; <span>// 10000
The synchronized
keyword is also available as a block statement.
Internally Java uses a so called monitor also kNown as in order to manage synchronization. This monitor is bound to an object,e.g. when using synchronized methods each method share the same monitor of the corresponding object.
All implicit monitors implement the reentrant characteristics. Reentrant means that locks are bound to the current thread. A thread can safely acquire the same lock multiple times without running into deadlocks (e.g. a synchronized method calls another synchronized method on the same object).
Locks
Instead of using implicit locking via the synchronized
keyword the Concurrency API supports various explicit locks specified by the Lock
interface. Locks support various methods for finer grained lock control thus are more expressive than implicit monitors.
Multiple lock implementations are available in the standard JDK which will be demonstrated in the following sections.
ReentrantLock
The class ReentrantLock
is a mutual exclusion lock with the same basic behavior as the implicit monitors accessed via the synchronized
keyword but with extended capabilities. As the name suggests this lock implements reentrant characteristics just as implicit monitors.
Let's see how the above sample looks like using ReentrantLock
:
<span>void <span>increment<span>(<span>) <span>{
lock<span>.<span>lock<span>(<span>)<span>;
<span>try <span>{
count<span>++<span>;
<span>} <span>finally <span>{
lock<span>.<span>unlock<span>(<span>)<span>;
<span>}
<span>}
A lock is acquired via lock()
and released via unlock()
. It's important to wrap your code into a try/finally
block to ensure unlocking in case of exceptions. This method is thread-safe just like the synchronized counterpart. If another thread has already acquired the lock subsequent calls to lock()
pause the current thread until the lock has been unlocked. Only one thread can hold the lock at any given time.
Locks support various methods for fine grained control as seen in the next sample:
executor<span>.<span>submit<span>(<span>(<span>) <span>-<span>> <span>{
lock<span>.<span>lock<span>(<span>)<span>;
<span>try <span>{
<span>sleep<span>(<span>1<span>)<span>;
<span>} <span>finally <span>{
lock<span>.<span>unlock<span>(<span>)<span>;
<span>}
<span>}<span>)<span>;executor<span>.<span>submit<span>(<span>(<span>) <span>-<span>> <span>{
System<span>.out<span>.<span>println<span>(<span>"Locked: " <span>+ lock<span>.<span>isLocked<span>(<span>)<span>)<span>;
System<span>.out<span>.<span>println<span>(<span>"Held by me: " <span>+ lock<span>.<span>isHeldByCurrentThread<span>(<span>)<span>)<span>;
<span>boolean locked <span>= lock<span>.<span>tryLock<span>(<span>)<span>;
System<span>.out<span>.<span>println<span>(<span>"Lock acquired: " <span>+ locked<span>)<span>;
<span>}<span>)<span>;<span>stop<span>(executor<span>)<span>;
While the first task holds the lock for one second the second task obtains different information about the current state of the lock:
The method tryLock()
as an alternative to lock()
tries to acquire the lock without pausing the current thread. The boolean result must be used to check if the lock has actually been acquired before accessing any shared mutable variables.
ReadWriteLock
The interface ReadWriteLock
specifies another type of lock maintaining a pair of locks for read and write access. The idea behind read-write locks is that it's usually safe to read mutable variables concurrently as long as nobody is writing to this variable. So the read-lock can be held simultaneously by multiple threads as long as no threads hold the write-lock. This can improve performance and throughput in case that reads are more frequent than writes.
map executor<span>.<span>submit<span>(<span>(<span>) <span>-<span>> <span>{
lock<span>.<span>writeLock<span>(<span>)<span>.<span>lock<span>(<span>)<span>;
<span>try <span>{
<span>sleep<span>(<span>1<span>)<span>;
map<span>.<span>put<span>(<span>"foo"<span>,<span>"bar"<span>)<span>;
<span>} <span>finally <span>{
lock<span>.<span>writeLock<span>(<span>)<span>.<span>unlock<span>(<span>)<span>;
<span>}
<span>}<span>)<span>;
The above example first acquires a write-lock in order to put a new value to the map after sleeping for one second. Before this task has finished two other tasks are being submitted trying to read the entry from the map and sleep for one second:
executor<span>.<span>submit<span>(readTask<span>)<span>;
executor<span>.<span>submit<span>(readTask<span>)<span>;
<span>stop<span>(executor<span>)<span>;
When you execute this code sample you'll notice that both read tasks have to wait the whole second until the write task has finished. After the write lock has been released both read tasks are executed in parallel and print the result simultaneously to the console. They don't have to wait for each other to finish because read-locks can safely be acquired concurrently as long as no write-lock is held by another thread.
StampedLock
Java 8 ships with a new kind of lock called StampedLock
which also support read and write locks just like in the example above. In contrast to ReadWriteLock
the locking methods of a StampedLock
return a stamp represented by a long
value. You can use these stamps to either release a lock or to check if the lock is still valid. Additionally stamped locks support another lock mode called optimistic locking.
Let's rewrite the last example code to use StampedLock
instead of ReadWriteLock
:
map executor<span>.<span>submit<span>(<span>(<span>) <span>-<span>> <span>{
<span>long stamp <span>= lock<span>.<span>writeLock<span>(<span>)<span>;
<span>try <span>{
<span>sleep<span>(<span>1<span>)<span>;
map<span>.<span>put<span>(<span>"foo"<span>,<span>"bar"<span>)<span>;
<span>} <span>finally <span>{
lock<span>.<span>unlockWrite<span>(stamp<span>)<span>;
<span>}
<span>}<span>)<span>;
Runnable readTask <span>= <span>(<span>) <span>-<span>> <span>{
<span>long stamp <span>= lock<span>.<span>readLock<span>(<span>)<span>;
<span>try <span>{
System<span>.out<span>.<span>println<span>(map<span>.<span>get<span>(<span>"foo"<span>)<span>)<span>;
<span>sleep<span>(<span>1<span>)<span>;
<span>} <span>finally <span>{
lock<span>.<span>unlockRead<span>(stamp<span>)<span>;
<span>}
<span>}<span>;
executor<span>.<span>submit<span>(readTask<span>)<span>;
executor<span>.<span>submit<span>(readTask<span>)<span>;
<span>stop<span>(executor<span>)<span>;
Obtaining a read or write lock via readLock()
or writeLock()
returns a stamp which is later used for unlocking within the finally block. Keep in mind that stamped locks don't implement reentrant characteristics. Each call to lock returns a new stamp and blocks if no lock is available even if the same thread already holds a lock. So you have to pay particular attention not to run into deadlocks.
Just like in the previous ReadWriteLock
example both read tasks have to wait until the write lock has been released. Then both read tasks print to the console simultaneously because multiple reads doesn't block each other as long as no write-lock is held.
The next example demonstrates optimistic locking:
executor<span>.<span>submit<span>(<span>(<span>) <span>-<span>> <span>{
<span>long stamp <span>= lock<span>.<span>tryOptimisticRead<span>(<span>)<span>;
<span>try <span>{
System<span>.out<span>.<span>println<span>(<span>"Optimistic Lock Valid: " <span>+ lock<span>.<span>validate<span>(stamp<span>)<span>)<span>;
<span>sleep<span>(<span>1<span>)<span>;
System<span>.out<span>.<span>println<span>(<span>"Optimistic Lock Valid: " <span>+ lock<span>.<span>validate<span>(stamp<span>)<span>)<span>;
<span>sleep<span>(<span>2<span>)<span>;
System<span>.out<span>.<span>println<span>(<span>"Optimistic Lock Valid: " <span>+ lock<span>.<span>validate<span>(stamp<span>)<span>)<span>;
<span>} <span>finally <span>{
lock<span>.<span>unlock<span>(stamp<span>)<span>;
<span>}
<span>}<span>)<span>;executor<span>.<span>submit<span>(<span>(<span>) <span>-<span>> <span>{
<span>long stamp <span>= lock<span>.<span>writeLock<span>(<span>)<span>;
<span>try <span>{
System<span>.out<span>.<span>println<span>(<span>"Write Lock acquired"<span>)<span>;
<span>sleep<span>(<span>2<span>)<span>;
<span>} <span>finally <span>{
lock<span>.<span>unlock<span>(stamp<span>)<span>;
System<span>.out<span>.<span>println<span>(<span>"Write done"<span>)<span>;
<span>}
<span>}<span>)<span>;<span>stop<span>(executor<span>)<span>;
An optimistic read lock is acquired by calling tryOptimisticRead()
which always returns a stamp without blocking the current thread,no matter if the lock is actually available. If there's already a write lock active the returned stamp equals zero. You can always check if a stamp is valid by calling lock.validate(stamp)
.
Executing the above code results in the following output:
The optimistic lock is valid right after acquiring the lock. In contrast to normal read locks an optimistic lock doesn't prevent other threads to obtain a write lock instantaneously. After sending the first thread to sleep for one second the second thread obtains a write lock without waiting for the optimistic read lock to be released. From this point the optimistic read lock is no longer valid. Even when the write lock is released the optimistic read locks stays invalid.
So when working with optimistic locks you have to validate the lock every time after accessing any shared mutable variable to make sure the read was still valid.
Sometimes it's useful to convert a read lock into a write lock without unlocking and locking again. StampedLock
provides the method tryConvertToWriteLock()
for that purpose as seen in the next sample:
executor<span>.<span>submit<span>(<span>(<span>) <span>-<span>> <span>{
<span>long stamp <span>= lock<span>.<span>readLock<span>(<span>)<span>;
<span>try <span>{
<span>if <span>(count <span>== <span>0<span>) <span>{
stamp <span>= lock<span>.<span>tryConvertToWriteLock<span>(stamp<span>)<span>;
<span>if <span>(stamp <span>== 0L<span>) <span>{
System<span>.out<span>.<span>println<span>(<span>"Could not convert to write lock"<span>)<span>;
stamp <span>= lock<span>.<span>writeLock<span>(<span>)<span>;
<span>}
count <span>= <span>23<span>;
<span>}
System<span>.out<span>.<span>println<span>(count<span>)<span>;
<span>} <span>finally <span>{
lock<span>.<span>unlock<span>(stamp<span>)<span>;
<span>}
<span>}<span>)<span>;<span>stop<span>(executor<span>)<span>;
The task first obtains a read lock and prints the current value of field count
to the console. But if the current value is zero we want to assign a new value of 23
. We first have to convert the read lock into a write lock to not break potential concurrent access by other threads. Calling tryConvertToWriteLock()
doesn't block but may return a zero stamp indicating that no write lock is currently available. In that case we call writeLock()
to block the current thread until a write lock is available.
Semaphores
In addition to locks the Concurrency API also supports counting semaphores. Whereas locks usually grant exclusive access to variables or resources,a semaphore is capable of maintaining whole sets of permits. This is useful in different scenarios where you have to limit the amount concurrent access to certain parts of your application.
Here's an example how to limit access to a long running task simulated by sleep(5)
:
Semaphore semaphore <span>= <span>new <span>Semaphore<span>(<span>5<span>)<span>;Runnable longRunningTask <span>= <span>(<span>) <span>-<span>> <span>{
<span>boolean permit <span>= <span>false<span>;
<span>try <span>{
permit <span>= semaphore<span>.<span>tryAcquire<span>(<span>1<span>,TimeUnit<span>.SECONDS<span>)<span>;
<span>if <span>(permit<span>) <span>{
System<span>.out<span>.<span>println<span>(<span>"Semaphore acquired"<span>)<span>;
<span>sleep<span>(<span>5<span>)<span>;
<span>} <span>else <span>{
System<span>.out<span>.<span>println<span>(<span>"Could not acquire semaphore"<span>)<span>;
<span>}
<span>} <span>catch <span>(<span>InterruptedException e<span>) <span>{
<span>throw <span>new <span>IllegalStateException<span>(e<span>)<span>;
<span>} <span>finally <span>{
<span>if <span>(permit<span>) <span>{
semaphore<span>.<span>release<span>(<span>)<span>;
<span>}
<span>}
<span>}IntStream<span>.<span>range<span>(<span>0<span>,<span>10<span>)
<span>.<span>forEach<span>(i <span>-<span>> executor<span>.<span>submit<span>(longRunningTask<span>)<span>)<span>;<span>stop<span>(executor<span>)<span>;
The executor can potentially run 10 tasks concurrently but we use a semaphore of size 5,thus limiting concurrent access to 5. It's important to use a try/finally
block to properly release the semaphore even in case of exceptions.
Executing the above code results in the following output:
The semaphores permits access to the actual long running operation simulated by sleep(5)
up to a maximum of 5. Every subsequent call to tryAcquire()
elapses the maximum wait timeout of one second,resulting in the appropriate console output that no semaphore could be acquired.
This was the second part out of a series of concurrency tutorials. More parts will be released in the near future,so stay tuned. As usual you find all code samples from this article on ,so feel free to fork the repo and try it by your own.
I hope you've enjoyed this article. If you have any further questions send me your Feedback in the comments below. You should also for more dev-related stuff!
- Part 1:
- Part 2: Synchronization and Locks
- Part 3:
Welcome to the third part of my tutorial series about multi-threaded programming in Java 8. This tutorial covers two important parts of the Concurrency API: Atomic Variables and Concurrent Maps. Both have been greatly improved with the introduction of lambda expressions and functional programming in the latest Java 8 release. All those new features are described with a bunch of easily understood code samples. Enjoy!
- Part 1:
- Part 2:
- Part 3: Atomic Variables and ConcurrentMap
For simplicity the code samples of this tutorial make use of the two helper methods sleep(seconds)
and stop(executor)
as defined .
AtomicInteger
The package java.concurrent.atomic
contains many useful classes to perform atomic operations. An operation is atomic when you can safely perform the operation in parallel on multiple threads without using the synchronized
keyword or locks as shown in my .
Internally,the atomic classes make heavy use of (CAS),an atomic instruction directly supported by most modern cpus. Those instructions usually are much faster than synchronizing via locks. So my advice is to prefer atomic classes over locks in case you just have to change a single mutable variable concurrently.
Now let's pick one of the atomic classes for a few examples: AtomicInteger
ExecutorService executor <span>= Executors<span>.<span>newFixedThreadPool<span>(<span>2<span>)<span>;IntStream<span>.<span>range<span>(<span>0<span>,<span>1000<span>)
<span>.<span>forEach<span>(i <span>-<span>> executor<span>.<span>submit<span>(atomicInt<span>:<span>:incrementAndGet<span>)<span>)<span>;<span>stop<span>(executor<span>)<span>;
System<span>.out<span>.<span>println<span>(atomicInt<span>.<span>get<span>(<span>)<span>)<span>; <span>// => 1000
By using AtomicInteger
as a replacement for Integer
we're able to increment the number concurrently in a thread-safe manor without synchronizing the access to the variable. The method incrementAndGet()
is an atomic operation so we can safely call this method from multiple threads.
AtomicInteger supports varIoUs kinds of atomic operations. The method updateAndGet()
accepts a lambda expression in order to perform arbitrary arithmetic operations upon the integer:
ExecutorService executor <span>= Executors<span>.<span>newFixedThreadPool<span>(<span>2<span>)<span>;IntStream<span>.<span>range<span>(<span>0<span>,<span>1000<span>)
<span>.<span>forEach<span>(i <span>-<span>> <span>{
Runnable task <span>= <span>(<span>) <span>-<span>>
atomicInt<span>.<span>updateAndGet<span>(n <span>-<span>> n <span>+ <span>2<span>)<span>;
executor<span>.<span>submit<span>(task<span>)<span>;
<span>}<span>)<span>;<span>stop<span>(executor<span>)<span>;
System<span>.out<span>.<span>println<span>(atomicInt<span>.<span>get<span>(<span>)<span>)<span>; <span>// => 2000
The method accumulateAndGet()
accepts another kind of lambda expression of type IntBinaryOperator
. We use this method to sum up all values from 0 to 1000 concurrently in the next sample:
ExecutorService executor <span>= Executors<span>.<span>newFixedThreadPool<span>(<span>2<span>)<span>;IntStream<span>.<span>range<span>(<span>0<span>,<span>1000<span>)
<span>.<span>forEach<span>(i <span>-<span>> <span>{
Runnable task <span>= <span>(<span>) <span>-<span>>
atomicInt<span>.<span>accumulateAndGet<span>(i<span>,<span>(n<span>,m<span>) <span>-<span>> n <span>+ m<span>)<span>;
executor<span>.<span>submit<span>(task<span>)<span>;
<span>}<span>)<span>;<span>stop<span>(executor<span>)<span>;
System<span>.out<span>.<span>println<span>(atomicInt<span>.<span>get<span>(<span>)<span>)<span>; <span>// => 499500
Other useful atomic classes are , and .
LongAdder
The class LongAdder
as an alternative to AtomicLong
can be used to consecutively add values to a number.
IntStream<span>.<span>range<span>(<span>0<span>,<span>1000<span>)
<span>.<span>forEach<span>(i <span>-<span>> executor<span>.<span>submit<span>(adder<span>:<span>:increment<span>)<span>)<span>;<span>stop<span>(executor<span>)<span>;
System<span>.out<span>.<span>println<span>(adder<span>.<span>sumThenReset<span>(<span>)<span>)<span>; <span>// => 1000
LongAdder provides methods add()
and increment()
just like the atomic number classes and is also thread-safe. But instead of summing up a single result this class maintains a set of variables internally to reduce contention over threads. The actual result can be retrieved by calling sum()
or sumThenReset()
.
This class is usually preferable over atomic numbers when updates from multiple threads are more common than reads. This is often the case when capturing statistical data,e.g. you want to count the number of requests served on a web server. The drawback of LongAdder
is higher memory consumption because a set of variables is held in-memory.
LongAccumulator
LongAccumulator is a more generalized version of LongAdder. Instead of performing simple add operations the class LongAccumulator
builds around a lambda expression of type LongBinaryOperator
as demonstrated in this code sample:
ExecutorService executor <span>= Executors<span>.<span>newFixedThreadPool<span>(<span>2<span>)<span>;
IntStream<span>.<span>range<span>(<span>0<span>,<span>10<span>)
<span>.<span>forEach<span>(i <span>-<span>> executor<span>.<span>submit<span>(<span>(<span>) <span>-<span>> accumulator<span>.<span>accumulate<span>(i<span>)<span>)<span>)<span>;
<span>stop<span>(executor<span>)<span>;
System<span>.out<span>.<span>println<span>(accumulator<span>.<span>getThenReset<span>(<span>)<span>)<span>; <span>// => 2539
We create a LongAccumulator with the function 2 * x + y
and an initial value of one. With every call to accumulate(i)
both the current result and the value i
are passed as parameters to the lambda expression.
A LongAccumulator
just like LongAdder
maintains a set of variables internally to reduce contention over threads.
ConcurrentMap
The interface ConcurrentMap
extends the map interface and defines one of the most useful concurrent collection types. Java 8 introduces functional programming by adding new methods to this interface.
In the next code snippets we use the following sample map to demonstrates those new methods:
map
The method forEach()
accepts a lambda expression of type BiConsumer
with both the key and value of the map passed as parameters. It can be used as a replacement to for-each loops to iterate over the entries of the concurrent map. The iteration is performed sequentially on the current thread.
System
The method putIfAbsent()
puts a new value into the map only if no value exists for the given key. At least for the ConcurrentHashMap
implementation of this method is thread-safe just like put()
so you don't have to synchronize when accessing the map concurrently from different threads:
The method getOrDefault()
returns the value for the given key. In case no entry exists for this key the passed default value is returned:
The method replaceAll()
accepts a lambda expression of type BiFunction
. BiFunctions take two parameters and return a single value. In this case the function is called with the key and the value of each map entry and returns a new value to be assigned for the current key:
Instead of replacing all values of the map compute()
let's us transform a single entry. The method accepts both the key to be computed and a bi-function to specify the transformation of the value.
value
In addition to compute()
two variants exist: computeIfAbsent()
and computeIfPresent()
. The functional parameters of these methods only get called if the key is absent or present respectively.
Finally,the method merge()
can be utilized to unify a new value with an existing value in the map. Merge accepts a key,the new value to be merged into the existing entry and a bi-function to specify the merging behavior of both values:
newVal
ConcurrentHashMap
All those methods above are part of the ConcurrentMap
interface,thereby available to all implementations of that interface. In addition the most important implementationConcurrentHashMap
has been further enhanced with a couple of new methods to perform parallel operations upon the map.
Just like parallel streams those methods use a special ForkJoinPool
available via ForkJoinPool.commonPool()
in Java 8. This pool uses a preset parallelism which depends on the number of available cores. Four CPU cores are available on my machine which results in a parallelism of three:
This value can be decreased or increased by setting the following JVM parameter:
We use the same example map for demonstrating purposes but this time we work upon the concrete implementation ConcurrentHashMap
instead of the interface ConcurrentMap
,so we can access all public methods from this class:
Java 8 introduces three kinds of parallel operations: forEach
, search
and reduce
. Each of those operations are available in four forms accepting functions with keys,values,entries and key-value pair arguments.
All of those methods use a common first argument called parallelismThreshold
. This threshold indicates the minimum collection size when the operation should be executed in parallel. E.g. if you pass a threshold of 500 and the actual size of the map is 499 the operation will be performed sequentially on a single thread. In the next examples we use a threshold of one to always force parallel execution for demonstrating purposes.
ForEach
The method forEach()
is capable of iterating over the key-value pairs of the map in parallel. The lambda expression of type BiConsumer
is called with the key and value of the current iteration step. In order to visualize parallel execution we print the current threads name to the console. Keep in mind that in my case the underlying ForkJoinPool
uses up to a maximum of three threads.
System<span>// key: r2; value: d2; thread: main
<span>// key: foo; value: bar; thread: ForkJoinPool.commonPool-worker-1
<span>// key: han; value: solo; thread: ForkJoinPool.commonPool-worker-2
<span>// key: c3; value: p0; thread: main
Search
The method search()
accepts a BiFunction
returning a non-null search result for the current key-value pair or null
if the current iteration doesn't match the desired search criteria. As soon as a non-null result is returned further processing is suppressed. Keep in mind that ConcurrentHashMap
is unordered. The search function should not depend on the actual processing order of the map. If multiple entries of the map match the given search function the result may be non-deterministic.
<span>// ForkJoinPool.commonPool-worker-2
<span>// main
<span>// ForkJoinPool.commonPool-worker-3
<span>// Result: bar
Here's another example searching solely on the values of the map:
System<span>.out<span>.<span>println<span>(<span>"Result: " <span>+ result<span>)<span>;
<span>// ForkJoinPool.commonPool-worker-2
<span>// main
<span>// main
<span>// ForkJoinPool.commonPool-worker-1
<span>// Result: solo
Reduce
The method reduce()
already known from Java 8 Streams accepts two lambda expressions of type BiFunction
. The first function transforms each key-value pair into a single value of any type. The second function combines all those transformed values into a single result,ignoring any possible null
values.
System<span>.out<span>.<span>println<span>(<span>"Result: " <span>+ result<span>)<span>;
<span>// Transform: ForkJoinPool.commonPool-worker-2
<span>// Transform: main
<span>// Transform: ForkJoinPool.commonPool-worker-3
<span>// Reduce: ForkJoinPool.commonPool-worker-3
<span>// Transform: main
<span>// Reduce: main
<span>// Reduce: main
<span>// Result: r2=d2,c3=p0,han=solo,foo=bar
I hope you've enjoyed reading the third part of my tutorial series about Java 8 Concurrency. The code samples from this tutorial are along with many other Java 8 code snippets. You're welcome to fork the repo and try it by your own.
If you want to support my work,please share this tutorial with your friends. You should also as I constantly tweet about Java and programming related stuff.
- Part 1:
- Part 2:
- Part 3: Atomic Variables and ConcurrentMap
今天关于Java Concurrency/Threads和1——join详解的介绍到此结束,谢谢您的阅读,有关hive join,outer join, semi join详解、Hive 中Join的专题---Join详解、Java 21 新特性:虚拟线程(Virtual Threads)、Java 8 Concurrency Tutorial--转等更多相关知识的信息可以在本站进行查询。
本文标签: