GVKun编程网logo

JAVA多线程限流解决并发问题(java多线程限流解决并发问题的方法)

22

对于JAVA多线程限流解决并发问题感兴趣的读者,本文将提供您所需要的所有信息,我们将详细讲解java多线程限流解决并发问题的方法,并且为您提供关于Java中有哪些无锁技术来解决并发问题?如何使用?、j

对于JAVA多线程限流解决并发问题感兴趣的读者,本文将提供您所需要的所有信息,我们将详细讲解java多线程限流解决并发问题的方法,并且为您提供关于Java 中有哪些无锁技术来解决并发问题?如何使用?、java 多线程并发问题、java 并发问题,java 并发容器解决全局变量的并发问题、java1.5多线程_【Java多线程】JDK1.5并发包API杂谈的宝贵知识。

本文目录一览:

JAVA多线程限流解决并发问题(java多线程限流解决并发问题的方法)

JAVA多线程限流解决并发问题(java多线程限流解决并发问题的方法)

package concurrent; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; /** * Auth: zhouhongliang * Date:2019/8/1 * 并发限流功能 * Semaphore */ public class SemaphoreDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); Semaphore semaphore = new Semaphore(3); for (int i = 0; i < 10; i++) { executorService.execute(() -> { try { //semaphore.acquire();//一直等待 if (semaphore.tryAcquire(3,TimeUnit.SECONDS)) {//等待3秒 play(); semaphore.release(); } else { System.out.println(Thread.currentThread().getName() + " 进入超时"); } } catch (Exception e) { e.printstacktrace(); } finally { } }); } executorService.shutdown(); } /** * 模拟任务 */ public static void play() { SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println(simpleDateFormat.format(new Date()) + " " + Thread.currentThread().getName() + " 进入"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printstacktrace(); } System.out.println(simpleDateFormat.format(new Date()) + " " + Thread.currentThread().getName() + " 退出"); } }

输出结果:
2019-08-01 11:09:50 pool-1-thread-1 进入
2019-08-01 11:09:50 pool-1-thread-3 进入
2019-08-01 11:09:50 pool-1-thread-2 进入
2019-08-01 11:09:52 pool-1-thread-3 退出
2019-08-01 11:09:52 pool-1-thread-1 退出
2019-08-01 11:09:52 pool-1-thread-2 退出
2019-08-01 11:09:52 pool-1-thread-4 进入
2019-08-01 11:09:52 pool-1-thread-5 进入
2019-08-01 11:09:52 pool-1-thread-6 进入
pool-1-thread-7 进入超时
pool-1-thread-8 进入超时
pool-1-thread-9 进入超时
pool-1-thread-10 进入超时
2019-08-01 11:09:54 pool-1-thread-6 退出
2019-08-01 11:09:54 pool-1-thread-5 退出
2019-08-01 11:09:54 pool-1-thread-4 退出

Process finished with exit code 0

Java 中有哪些无锁技术来解决并发问题?如何使用?

Java 中有哪些无锁技术来解决并发问题?如何使用?

除了使用 synchronized、Lock 加锁之外,Java 中还有很多不需要加锁就可以解决并发问题的工具类

 

1、原子工具类

JDK 1.8 中,java.util.concurrent.atomic 包下类都是原子类,原子类都是基于 sun.misc.Unsafe 实现的。

  • CPU 为了解决并发问题,提供了 CAS 指令,全称 Compare And Swap,即比较并交互
  • CAS 指令需要 3 个参数,变量、比较值、新值。当变量的当前值与比较值相等时,才把变量更新为新值
  • CAS 是一条 CPU 指令,由 CPU 硬件级别上保证原子性
  • java.util.concurrent.atomic 包中的原子分为:原子性基本数据类型、原子性对象引用类型、原子性数组、原子性对象属性更新器和原子性累加器

原子性基本数据类型:AtomicBoolean、AtomicInteger、AtomicLong

原子性对象引用类型:AtomicReference、AtomicStampedReference、AtomicMarkableReference

原子性数组:AtomicIntegerArray、AtomicLongArray、AtomicReferenceArray

原子性对象属性更新:AtomicIntegerFieldUpdater、AtomicLongFieldUpdater、AtomicReferenceFieldUpdater

原子性累加器:DoubleAccumulator、DoubleAdder、LongAccumulator、LongAdder

 

修改我们之前测试原子性问题的类,使用 AtomicInteger 的简单例子

package constxiong.concurrency.a026;

import java.util.concurrent.atomic.AtomicInteger;

/**
 * 测试 原子类 AtomicInteger
 * 
 * @author ConstXiong
 */
public class TestAtomicInteger {

	// 计数变量
	static volatile AtomicInteger count = new AtomicInteger(0);

	public static void main(String[] args) throws InterruptedException {
		// 线程 1 给 count 加 10000
		Thread t1 = new Thread(() -> {
			for (int j = 0; j < 10000; j++) {
				count.incrementAndGet();
			}
			System.out.println("thread t1 count 加 10000 结束");
		});

		// 线程 2 给 count 加 10000
		Thread t2 = new Thread(() -> {
			for (int j = 0; j < 10000; j++) {
				count.incrementAndGet();
			}
			System.out.println("thread t2 count 加 10000 结束");
		});

		// 启动线程 1
		t1.start();
		// 启动线程 2
		t2.start();

		// 等待线程 1 执行完成
		t1.join();
		// 等待线程 2 执行完成
		t2.join();

		// 打印 count 变量
		System.out.println(count.get());
	}

}

 

打印结果如预期

thread t2 count 加 10000 结束
thread t1 count 加 10000 结束
20000

 

2、线程本地存储

  • java.lang.ThreadLocal 类用于线程本地化存储。
  • 线程本地化存储,就是为每一个线程创建一个变量,只有本线程可以在该变量中查看和修改值。
  • 典型的使用例子就是,spring 在处理数据库事务问题的时候,就用了 ThreadLocal 为每个线程存储了各自的数据库连接 Connection。
  • 使用 ThreadLocal 要注意,在不使用该变量的时候,一定要调用 remove() 方法移除变量,否则可能造成内存泄漏的问题。

 

示例

package constxiong.concurrency.a026;

/**
 * 测试 原子类 AtomicInteger
 * 
 * @author ConstXiong
 */
public class TestThreadLocal {

	// 线程本地存储变量
	private static final ThreadLocal<Integer> THREAD_LOCAL_NUM = new ThreadLocal<Integer>() {
		@Override
		protected Integer initialValue() {//初始值
			return 0;
		}
	};

	public static void main(String[] args) {
		for (int i = 0; i < 3; i++) {// 启动三个线程
			Thread t = new Thread() {
				@Override
				public void run() {
					add10ByThreadLocal();
				}
			};
			t.start();
		}
	}

	/**
	 * 线程本地存储变量加 5
	 */
	private static void add10ByThreadLocal() {
		try {
			for (int i = 0; i < 5; i++) {
				Integer n = THREAD_LOCAL_NUM.get();
				n += 1;
				THREAD_LOCAL_NUM.set(n);
				System.out.println(Thread.currentThread().getName() + " : ThreadLocal num=" + n);
			}
		} finally {
			THREAD_LOCAL_NUM.remove();// 将变量移除
		}
	}
}

 

每个线程最后一个值都打印到了 5

Thread-0 : ThreadLocal num=1
Thread-2 : ThreadLocal num=1
Thread-1 : ThreadLocal num=1
Thread-2 : ThreadLocal num=2
Thread-0 : ThreadLocal num=2
Thread-2 : ThreadLocal num=3
Thread-0 : ThreadLocal num=3
Thread-1 : ThreadLocal num=2
Thread-0 : ThreadLocal num=4
Thread-2 : ThreadLocal num=4
Thread-0 : ThreadLocal num=5
Thread-1 : ThreadLocal num=3
Thread-2 : ThreadLocal num=5
Thread-1 : ThreadLocal num=4
Thread-1 : ThreadLocal num=5

 

3、copy-on-write

  • 根据英文名称可以看出,需要写时复制,体现的是一种延时策略。
  • Java 中的 copy-on-write 容器包括:CopyOnWriteArrayList、CopyOnWriteArraySet。
  • 涉及到数组的全量复制,所以也比较耗内存,在写少的情况下使用比较适合。

 

简单的 CopyOnWriteArrayList 的示例,这里只是说明 CopyOnWriteArrayList 怎么用,并且是线程安全的。这个场景并不适合使用 CopyOnWriteArrayList,因为写多读少。

package constxiong.concurrency.a026;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;

/**
 * 测试 copy-on-write
 * @author ConstXiong
 */
public class TestCopyOnWrite {

	private static final Random R = new Random();
	
	private static CopyOnWriteArrayList<Integer> cowList = new CopyOnWriteArrayList<Integer>();
//	private static ArrayList<Integer> cowList = new ArrayList<Integer>();
	
	public static void main(String[] args) throws InterruptedException {
		List<Thread> threadList = new ArrayList<Thread>();
		//启动 1000 个线程,向 cowList 添加 5 个随机整数
		for (int i = 0; i < 1000; i++) {
			Thread t = new Thread(() -> {
				for (int j = 0; j < 5; j++) {
					//休眠 10 毫秒,让线程同时向 cowList 添加整数,引出并发问题
					try {
						Thread.sleep(10);
					} catch (InterruptedException e) {
						e.printStackTrace();
					}
					cowList.add(R.nextInt(100));
				}
			}) ;
			t.start();
			threadList.add(t);
		}
		
		for (Thread t : threadList) {
			t.join();
		}
		System.out.println(cowList.size());
	}
}

 

打印结果

5000

 

如果把

private static CopyOnWriteArrayList<Integer> cowList = new CopyOnWriteArrayList<Integer>();

改为

private static ArrayList<Integer> cowList = new ArrayList<Integer>();

 

打印结果就是小于 5000 的整数了


来一道刷了进BAT的面试题?
 

java 多线程并发问题

java 多线程并发问题

问题:50个线程,先查询数据库的一个记录 t,然后对这个记录+1,最后更新到数据库

更新的时候,不允许使用 update  test_concurrent set sum =sum -1 where id=1,如果这个做就看不出来效果了,必须使用update  test_concurrent set sum =? where id=1)。

1.创建表

CREATE TABLE `test_concurrent` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`sum` bigint(20) DEFAULT NULL COMMENT ''并发的和'',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

2.表里只有一条记录

INSERT INTO `test_concurrent` VALUES (1, 0);

对于这种并发问题有三种解决方法

1.使用悲观锁  select for update

2.使用乐观锁 新增一个字段,查询时设置这个字段,更新时根据这个字段更新

3.update  test_concurrent set sum =? where id=1 and sum =#{t}

4.update  test_concurrent set sum =sum-? where id=1 and sum >=?   

 

总和来说:4是最好的写法。

 

并发线程代码

@Test
    public void testAdd1() {
        log.info("开始执行----------------");
        CountDownLatch latch = new CountDownLatch(50);
        
        ExecutorService executor=new ThreadPoolExecutor(10, 50, 2000, TimeUnit.SECONDS, new ArrayBlockingQueue(50));
        AtomicInteger integer=new AtomicInteger(0);
        for(int i=0;i<50;i++){
            Integer s=i;
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    //log.info("开始执行----------------{}",s);
                    service.add1(1);
                    log.info("开始执行----------------{}",s);
                    integer.incrementAndGet();
                    latch.countDown();
                }
            });
            
            
        }
        try {
            latch.await();
        } catch (InterruptedException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
        log.info("开始执行----------------{}",integer.intValue());

 注意:线程池的用法

 

java 并发问题,java 并发容器解决全局变量的并发问题

java 并发问题,java 并发容器解决全局变量的并发问题

 为了方便编写出线程安全的程序,Java 里面提供了一些线程安全类和并发工具,比如:同步容器、并发容器、阻塞队列、Synchronizer(比如 CountDownLatch)。今天我们就来讨论下同步容器。

  一。为什么会出现同步容器?

      在 Java 的集合容器框架中,主要有四大类别:List、Set、Queue、Map。

  List、Set、Queue 接口分别继承了 Collection 接口,Map 本身是一个接口。

  注意 Collection 和 Map 是一个顶层接口,而 List、Set、Queue 则继承了 Collection 接口,分别代表数组、集合和队列这三大类容器。 

  像 ArrayList、LinkedList 都是实现了 List 接口,HashSet 实现了 Set 接口,而 Deque(双向队列,允许在队首、队尾进行入队和出队操作)继承了 Queue 接口,PriorityQueue 实现了 Queue 接口。另外 LinkedList(实际上是双向链表)实现了了 Deque 接口。

  像 ArrayList、LinkedList、HashMap 这些容器都是非线程安全的。

  如果有多个线程并发地访问这些容器时,就会出现问题。

  因此,在编写程序时,必须要求程序员手动地在任何访问到这些容器的地方进行同步处理,这样导致在使用这些容器的时候非常地不方便。

  所以,Java 提供了同步容器供用户使用。

  二.Java 中的同步容器类

 在 Java 中,同步容器主要包括 2 类:

  1)Vector、Stack、HashTable、ConcurrentHashMap

  2)Collections 类中提供的静态工厂方法创建的类

  Vector 实现了 List 接口,Vector 实际上就是一个数组,和 ArrayList 类似,但是 Vector 中的方法都是 synchronized 方法,即进行了同步措施。

  Stack 也是一个同步容器,它的方法也用 synchronized 进行了同步,它实际上是继承于 Vector 类。

  HashTable 实现了 Map 接口,它和 HashMap 很相似,但是 HashTable 进行了同步处理,而 HashMap 没有。

       Hashtable 和 ConcurrentHashMap 有什么分别呢?它们都可以用于多线程的环境,但是当 Hashtable 的大小增加到一定的时候,性能会急剧下降,因为迭代时需要被锁定很长的时间。因为 ConcurrentHashMap 引入了分割 (segmentation),不论它变得多么大,仅仅需要锁定 map 的某个部分,而其它的线程不需要等到迭代完成才能访问 map。简而言之,在迭代的过程中,ConcurrentHashMap 仅仅锁定 map 的某个部分,而 Hashtable 则会锁定整个 map。

  Collections 类是一个工具提供类,注意,它和 Collection 不同,Collection 是一个顶层的接口。在 Collections 类中提供了大量的方法,比如对集合或者容器进行排序、查找等操作。最重要的是,在它里面提供了几个静态工厂方法来创建同步容器类,如下图所示:

  三。同步容器的缺陷

 从同步容器的具体实现源码可知,同步容器中的方法采用了 synchronized 进行了同步,那么很显然,这必然会影响到执行性能,另外,同步容器就一定是真正地完全线程安全吗?不一定,这个在下面会讲到。

  我们首先来看一下传统的非同步容器和同步容器的性能差异,我们以 ArrayList 和 Vector 为例:

1. 性能问题

  我们先通过一个例子看一下 Vector 和 ArrayList 在插入数据时性能上的差异:

public class Test {
    public static void main(String[] args) throws InterruptedException {
        ArrayList<Integer> list = new ArrayList<Integer>();
        Vector<Integer> vector = new Vector<Integer>();
        long start = System.currentTimeMillis();
        for(int i=0;i<100000;i++)
            list.add(i);
        long end = System.currentTimeMillis();
        System.out.println("ArrayList进行100000次插入操作耗时:"+(end-start)+"ms");
        start = System.currentTimeMillis();
        for(int i=0;i<100000;i++)
            vector.add(i);
        end = System.currentTimeMillis();
        System.out.println("Vector进行100000次插入操作耗时:"+(end-start)+"ms");
    }
}

这段代码在我机器上跑出来的结果是:

进行同样多的插入操作,Vector 的耗时是 ArrayList 的两倍。

  这只是其中的一方面性能问题上的反映。

  另外,由于 Vector 中的 add 方法和 get 方法都进行了同步,因此,在有多个线程进行访问时,如果多个线程都只是进行读取操作,那么每个时刻就只能有一个线程进行读取,其他线程便只能等待,这些线程必须竞争同一把锁。

  因此为了解决同步容器的性能问题,在 Java 1.5 中提供了并发容器,位于 java.util.concurrent 目录下,并发容器的相关知识将在下一篇文章中讲述。

2. 同步容器真的是安全的吗?

  也有有人认为 Vector 中的方法都进行了同步处理,那么一定就是线程安全的,事实上这可不一定。看下面这段代码:

public class Test {
    static Vector<Integer> vector = new Vector<Integer>();
    public static void main(String[] args) throws InterruptedException {
        while(true) {
            for(int i=0;i<10;i++)
                vector.add(i);
            Thread thread1 = new Thread(){
                public void run() {
                    for(int i=0;i<vector.size();i++)
                        vector.remove(i);
                };
            };
            Thread thread2 = new Thread(){
                public void run() {
                    for(int i=0;i<vector.size();i++)
                        vector.get(i);
                };
            };
            thread1.start();
            thread2.start();
            while(Thread.activeCount()>10)   {
                 
            }
        }
    }
}

在我机器上运行的结果:

正如大家所看到的,这段代码报错了:数组下标越界。

  也许有朋友会问:Vector 是线程安全的,为什么还会报这个错?很简单,对于 Vector,虽然能保证每一个时刻只能有一个线程访问它,但是不排除这种可能:

  当某个线程在某个时刻执行这句时:

for(int i=0;i<vector.size();i++)
    vector.get(i);

假若此时 vector 的 size 方法返回的是 10,i 的值为 9,然后另外一个线程执行了这句:

for(int i=0;i<vector.size();i++)
    vector.remove(i);

       将下标为 9 的元素删除了。

  那么通过 get 方法访问下标为 9 的元素肯定就会出问题了。

  因此为了保证线程安全,必须在方法调用端做额外的同步措施,如下面所示:

public class Test {
    static Vector<Integer> vector = new Vector<Integer>();
    public static void main(String[] args) throws InterruptedException {
        while(true) {
            for(int i=0;i<10;i++)
                vector.add(i);
            Thread thread1 = new Thread(){
                public void run() {
                    synchronized (Test.class{   //进行额外的同步
                        for(int i=0;i<vector.size();i++)
                            vector.remove(i);
                    }
                };
            };
            Thread thread2 = new Thread(){
                public void run() {
                    synchronized (Test.class{
                        for(int i=0;i<vector.size();i++)
                            vector.get(i);
                    }
                };
            };
            thread1.start();
            thread2.start();
            while(Thread.activeCount()>10)   {
                 
            }
        }
    }
}

 3. ConcurrentModificationException 异常

  在对 Vector 等容器并发地进行迭代修改时,会报 ConcurrentModificationException 异常,关于这个异常将会在后续文章中讲述。

  但是在并发容器中不会出现这个问题。

最后贴一个 ArrayList 转 Vector 的代码

//示例
Vector v = new Vector();
ArrayList ar = new ArrayList();
Collections.copy(ar, v);

  资料:

  《深入理解 Java 虚拟机》

  《Java 并发编程实战》

  http://thinkgeek.diandian.com/post/2012-03-24/17905694

  http://blog.csdn.net/cutesource/article/details/5780740


java1.5多线程_【Java多线程】JDK1.5并发包API杂谈

java1.5多线程_【Java多线程】JDK1.5并发包API杂谈

并发与并行

并发

一个或多个处理器执行更多的任务(通过划分时间片来执行更多的任务),从逻辑上实现同时运行:

cccb928f38b47d8a6b6fe7f4ee80db7f.png

如,N个并发请求在一个两核cpu上:

b8918cd48f05d467e278503940d8e86c.png

并行

N个处理器分别同时执行N个任务,从物理上实现同时运行:

321c47294054d3ec7c83e65e7c4e5cba.png

线程互斥

阻塞地加锁,通过reentrantlock.lock()阻塞地加锁

阻塞地加锁的意义,在于在多线程环境下,同一时刻只有一个线程执行加锁代码,其他线程阻塞在加锁代码之前。

97008310dda16f306c17a0e3db44a664.png

reentrantlock继承Lock,Lock接口提供了这些方法:

6d6cfc45a6c7c9ea58fb6c5ca5682d3f.png

reentrantlock与synchronized既相似,又有所不同,比如:

reentrantlock支持公平和非公平加锁,synchronized只支持非公平加锁

reentrantlock支持非阻塞地尝试获取锁,synchronized并不支持

reentrantlock阻塞获取锁支持响应中断,而synchronized获取锁阻塞时不响应中断

package com.nicchagil.exercies.reentrantlock.lock;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.reentrantlock;

import java.util.logging.Logger;

public class LockExercise {

private static Logger logger = Logger.getLogger(LockExercise.class.getName());

private static Lock lock = new reentrantlock();

public static void main(String[] args) {

new Thread(new Runnable() {

@Override

public void run() {

lock.lock();

try {

logger.info(Thread.currentThread().getName() + " run.");

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

} finally {

lock.unlock();

}

}

}).start();

new Thread(new Runnable() {

@Override

public void run() {

lock.lock();

try {

logger.info(Thread.currentThread().getName() + " run.");

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

} finally {

lock.unlock();

}

}

}).start();

}

}

阻塞地加锁,通过synchronized阻塞地加锁

97008310dda16f306c17a0e3db44a664.png

package com.nicchagil.exercies.reentrantlock.lock;

import java.util.concurrent.TimeUnit;

import java.util.logging.Logger;

public class SynchronizedExercise {

private static Logger logger = Logger.getLogger(SynchronizedExercise.class.getName());

private static Object obj = new Object();

public static void main(String[] args) {

new Thread(new Runnable() {

@Override

public void run() {

synchronized (obj) {

logger.info(Thread.currentThread().getName() + " run.");

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

}

}

}).start();

new Thread(new Runnable() {

@Override

public void run() {

synchronized (obj) {

logger.info(Thread.currentThread().getName() + " run.");

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

}

}

}).start();

}

}

获取锁阻塞时能响应中断

reentrantlock使用lockInterruptibly()阻塞获取锁时,能响应中断:

package com.nicchagil.exercies.reentrantlock.interruptibly;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.reentrantlock;

import java.util.logging.Logger;

public class LockInterruptiblyExercise {

private static Logger logger = Logger.getLogger(LockInterruptiblyExercise.class.getName());

public static void main(String[] args) {

Lock lock = new reentrantlock(); // 声明可重入锁

lock.lock(); // 阻塞获取锁

logger.info("阻塞获取锁");

try {

Thread t1 = new Thread(new Runnable() {

@Override

public void run() {

try {

lock.lockInterruptibly(); // 尝试获取锁

} catch (InterruptedException e) {

logger.info(Thread.currentThread().getName() + "获取锁被打断");

}

}

});

t1.start();

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

t1.interrupt(); // 打断线程

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

} finally {

lock.unlock(); // 释放锁

logger.info("释放锁");

}

}

}

结果:

八月 01, 2017 1:59:33 下午 com.nicchagil.exercies.reentrantlock.interruptibly.LockInterruptiblyExercise main

信息: 阻塞获取锁

八月 01, 2017 1:59:36 下午 com.nicchagil.exercies.reentrantlock.interruptibly.LockInterruptiblyExercise$1 run

信息: Thread-1获取锁被打断

八月 01, 2017 1:59:39 下午 com.nicchagil.exercies.reentrantlock.interruptibly.LockInterruptiblyExercise main

信息: 释放锁

而synchronized阻塞获取锁时不响应中断:

package com.nicchagil.exercies.reentrantlock.interruptibly;

import java.util.concurrent.TimeUnit;

import java.util.logging.Logger;

public class SyncInterruptiblyExercise {

private static Logger logger = Logger.getLogger(SyncInterruptiblyExercise.class.getName());

private static Object obj = new Object();

/**

* 测试synchronized获取锁时被打断是否抛出InterruptedException

* 结果:

* 七月 12, 2017 9:30:42 下午 com.nicchagil.exercies.reentrantlock.interruptibly.SyncInterruptiblyExercise main

* 信息: 阻塞获取锁

* 七月 12, 2017 9:30:48 下午 com.nicchagil.exercies.reentrantlock.interruptibly.SyncInterruptiblyExercise main

* 信息: 释放锁

*/

public static void main(String[] args) {

synchronized (obj) {

logger.info("阻塞获取锁");

Thread t1 = new Thread(new Runnable() {

@Override

public void run() {

try {

synchronized (obj) {

}

} catch (Exception e) {

logger.info(Thread.currentThread().getName() + "获取锁被打断");

}

}

});

t1.start();

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

t1.interrupt(); // 打断线程

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

logger.info("释放锁");

}

}

}

结果:

八月 01, 2017 2:01:11 下午 com.nicchagil.exercies.reentrantlock.interruptibly.SyncInterruptiblyExercise main

信息: 阻塞获取锁

八月 01, 2017 2:01:17 下午 com.nicchagil.exercies.reentrantlock.interruptibly.SyncInterruptiblyExercise main

信息: 释放锁

读写锁,reentrantreadwritelock

加上写锁后,无论读锁还是写锁均堵塞:

package com.nicchagil.exercies.reentrantreadwritelock;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.reentrantreadwritelock;

import java.util.logging.Logger;

public class reentrantreadwritelockWriteLockExercise {

private static Logger logger = Logger.getLogger(reentrantreadwritelockWriteLockExercise.class.getName());

private static reentrantreadwritelock reentrantreadwritelock = new reentrantreadwritelock();

public static void main(String[] args) {

/* 先加写锁 */

new Thread(new Runnable() {

@Override

public void run() {

reentrantreadwritelock.writeLock().lock();

logger.info(Thread.currentThread().getName() + "加写锁");

try {

TimeUnit.SECONDS.sleep(10);

} catch (InterruptedException e) {

e.printstacktrace();

} finally {

reentrantreadwritelock.writeLock().unlock();

logger.info(Thread.currentThread().getName() + "解写锁");

}

}

}).start();

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e1) {

e1.printstacktrace();

}

/* 然后加写锁 */

new Thread(new Runnable() {

@Override

public void run() {

reentrantreadwritelock.writeLock().lock();

logger.info(Thread.currentThread().getName() + "加写锁");

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

e.printstacktrace();

} finally {

reentrantreadwritelock.writeLock().unlock();

logger.info(Thread.currentThread().getName() + "解写锁");

}

}

}).start();

/* 然后加读锁 */

new Thread(new Runnable() {

@Override

public void run() {

reentrantreadwritelock.readLock().lock();

logger.info(Thread.currentThread().getName() + "加读锁");

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

e.printstacktrace();

} finally {

reentrantreadwritelock.readLock().unlock();

logger.info(Thread.currentThread().getName() + "解读锁");

}

}

}).start();

}

}

结果:

八月 01, 2017 1:42:44 下午 com.nicchagil.exercies.reentrantreadwritelock.reentrantreadwritelockWriteLockExercise$1 run

信息: Thread-1加写锁

八月 01, 2017 1:42:54 下午 com.nicchagil.exercies.reentrantreadwritelock.reentrantreadwritelockWriteLockExercise$1 run

信息: Thread-1解写锁

八月 01, 2017 1:42:54 下午 com.nicchagil.exercies.reentrantreadwritelock.reentrantreadwritelockWriteLockExercise$2 run

信息: Thread-2加写锁

八月 01, 2017 1:42:57 下午 com.nicchagil.exercies.reentrantreadwritelock.reentrantreadwritelockWriteLockExercise$2 run

信息: Thread-2解写锁

八月 01, 2017 1:42:57 下午 com.nicchagil.exercies.reentrantreadwritelock.reentrantreadwritelockWriteLockExercise$3 run

信息: Thread-3加读锁

八月 01, 2017 1:43:00 下午 com.nicchagil.exercies.reentrantreadwritelock.reentrantreadwritelockWriteLockExercise$3 run

信息: Thread-3解读锁

获取读锁后,再获取读锁不堵塞,但获取写锁堵塞:

package com.nicchagil.exercies.reentrantreadwritelock;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.reentrantreadwritelock;

import java.util.logging.Logger;

public class reentrantreadwritelockReadLockExercise {

private static Logger logger = Logger.getLogger(reentrantreadwritelockReadLockExercise.class.getName());

private static reentrantreadwritelock reentrantreadwritelock = new reentrantreadwritelock();

public static void main(String[] args) {

/* 先加读锁 */

new Thread(new Runnable() {

@Override

public void run() {

reentrantreadwritelock.readLock().lock();

logger.info(Thread.currentThread().getName() + "加读锁");

try {

TimeUnit.SECONDS.sleep(10);

} catch (InterruptedException e) {

e.printstacktrace();

} finally {

reentrantreadwritelock.readLock().unlock();

logger.info(Thread.currentThread().getName() + "解读锁");

}

}

}).start();

try {

TimeUnit.SECONDS.sleep(1);

} catch (InterruptedException e1) {

e1.printstacktrace();

}

/* 然后加读锁 */

new Thread(new Runnable() {

@Override

public void run() {

reentrantreadwritelock.readLock().lock();

logger.info(Thread.currentThread().getName() + "加读锁");

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

} finally {

reentrantreadwritelock.readLock().unlock();

logger.info(Thread.currentThread().getName() + "解读锁");

}

}

}).start();

/* 然后加写锁 */

new Thread(new Runnable() {

@Override

public void run() {

reentrantreadwritelock.writeLock().lock();

logger.info(Thread.currentThread().getName() + "加写锁");

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

e.printstacktrace();

} finally {

reentrantreadwritelock.writeLock().unlock();

logger.info(Thread.currentThread().getName() + "解写锁");

}

}

}).start();

/* 然后加读锁 */

new Thread(new Runnable() {

@Override

public void run() {

reentrantreadwritelock.readLock().lock();

logger.info(Thread.currentThread().getName() + "加读锁");

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

e.printstacktrace();

} finally {

reentrantreadwritelock.readLock().unlock();

logger.info(Thread.currentThread().getName() + "解读锁");

}

}

}).start();

}

}

结果:

八月 01, 2017 1:44:06 下午 com.nicchagil.exercies.reentrantreadwritelock.reentrantreadwritelockReadLockExercise$1 run

信息: Thread-1加读锁

八月 01, 2017 1:44:07 下午 com.nicchagil.exercies.reentrantreadwritelock.reentrantreadwritelockReadLockExercise$2 run

信息: Thread-2加读锁

八月 01, 2017 1:44:10 下午 com.nicchagil.exercies.reentrantreadwritelock.reentrantreadwritelockReadLockExercise$2 run

信息: Thread-2解读锁

八月 01, 2017 1:44:16 下午 com.nicchagil.exercies.reentrantreadwritelock.reentrantreadwritelockReadLockExercise$1 run

信息: Thread-1解读锁

八月 01, 2017 1:44:16 下午 com.nicchagil.exercies.reentrantreadwritelock.reentrantreadwritelockReadLockExercise$3 run

信息: Thread-3加写锁

八月 01, 2017 1:44:19 下午 com.nicchagil.exercies.reentrantreadwritelock.reentrantreadwritelockReadLockExercise$3 run

信息: Thread-3解写锁

八月 01, 2017 1:44:19 下午 com.nicchagil.exercies.reentrantreadwritelock.reentrantreadwritelockReadLockExercise$4 run

信息: Thread-4加读锁

八月 01, 2017 1:44:22 下午 com.nicchagil.exercies.reentrantreadwritelock.reentrantreadwritelockReadLockExercise$4 run

信息: Thread-4解读锁

阻塞与唤醒(线程间交互)

指定线程的阻塞与唤醒,LockSupport.park(Object blocker)

2eeeaf98424dfd1f6efaddae91d865ba.png

使用LockSupport.park():

package com.nicchagil.exercies.locksupportpart;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.LockSupport;

import java.util.logging.Logger;

public class PartUnpartExercise {

private static Logger logger = Logger.getLogger(PartUnpartExercise.class.getName());

public static void main(String[] args) {

Thread mainThread = Thread.currentThread();

/* 其他线程在30S后唤醒主线程 */

new Thread(new Runnable() {

@Override

public void run() {

try {

TimeUnit.SECONDS.sleep(30);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

LockSupport.unpark(mainThread); // 唤醒

logger.info(Thread.currentThread().getName() + "唤醒" + mainThread.getName());

}

}).start();

logger.info(Thread.currentThread().getName() + "准备被阻塞");

LockSupport.park(); // 阻塞

logger.info(Thread.currentThread().getName() + "被唤醒,开始执行");

}

}

使用LockSupport.park(Object blocker):

package com.nicchagil.exercies.locksupportpart;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.LockSupport;

import java.util.logging.Logger;

public class MyPartUnpartExercise {

private static Logger logger = Logger.getLogger(MyPartUnpartExercise.class.getName());

private static Object object = new Object();

public static void main(String[] args) {

Thread mainThread = Thread.currentThread();

/* 其他线程在30S后唤醒主线程 */

new Thread(new Runnable() {

@Override

public void run() {

try {

TimeUnit.SECONDS.sleep(30);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

LockSupport.unpark(mainThread); // 唤醒

logger.info(Thread.currentThread().getName() + "唤醒" + mainThread.getName());

}

}).start();

logger.info(Thread.currentThread().getName() + "准备被阻塞");

LockSupport.park(object); // 阻塞

logger.info(Thread.currentThread().getName() + "被唤醒,开始执行");

}

}

LockSupport.park()与LockSupport.park(Object blocker)区别在于阻塞时是否有标识等待的对象,后者是JDK6添加的,可传入等待的对象。用jstack工具生成的线程快照的对比可见下图:

ddc91b7006f0aee7d6381a6b2a135c03.png

获得锁的线程阻塞和唤醒,Condition.await()、Condition.signal()或Object.wait()、Object.notify()

在获取锁的情况下,线程阻塞和唤醒可分别使用Condition.await()、Condition.signal(),如果在没获得前下调用,会报异常java.lang.IllegalMonitorStateException。

00af957ef9bfc76f75a27b99bdcf05e2.png

package com.nicchagil.exercies.condition;

import java.util.concurrent.TimeUnit;

import java.util.concurrent.locks.Condition;

import java.util.concurrent.locks.Lock;

import java.util.concurrent.locks.reentrantlock;

import java.util.logging.Logger;

public class reentrantlockConditionExercise {

private static Logger logger = Logger.getLogger(reentrantlockConditionExercise.class.getName());

private static volatile boolean flag = false;

public static void main(String[] args) {

Lock lock = new reentrantlock();

Condition condition = lock.newCondition();

new Thread(new Runnable() {

@Override

public void run() {

lock.lock();

try {

while (!flag) {

logger.info(Thread.currentThread().getName() + "继续等待(条件还不成熟)");

condition.await(); // 等待其他线程改变当前线程需要的条件(会释放锁)

}

logger.info(Thread.currentThread().getName() + "继续业务(条件已成熟)");

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

} finally {

lock.unlock();

}

}

}).start();

new Thread(new Runnable() {

@Override

public void run() {

lock.lock();

try {

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

logger.info(Thread.currentThread().getName() + "开始改变数据");

flag = true;

condition.signal(); // 唤醒其他线程(释放锁)

logger.info(Thread.currentThread().getName() + "改变数据完毕,并通知其它线");

} finally {

lock.unlock();

}

}

}).start();

}

}

当然,也可使用Object.wait()、Object.notify()实现此功能。

d4bdc8f34e877a104b8be5d9b68a9237.png

package com.nicchagil.exercies.condition.waitnotify;

import java.util.concurrent.TimeUnit;

import java.util.logging.Logger;

import com.nicchagil.exercies.condition.reentrantlockConditionExercise;

public class WaitNotifyExercise {

/*

* 内部类,封装boolean(不直接用Boolean,因为唤醒前改变数值时使用“flag = true”会修改flag的对象,导致用没加锁的对象调用“notify()”从而报异常)

*/

static class MyFlag {

private Boolean flag = false;

public Boolean getFlag() {

return flag;

}

public void setFlag(Boolean flag) {

this.flag = flag;

}

}

private static Logger logger = Logger.getLogger(reentrantlockConditionExercise.class.getName());

private static volatile MyFlag myFlag = new MyFlag();

public static void main(String[] args) {

new Thread(new Runnable() {

@Override

public void run() {

synchronized (myFlag) {

try {

while (!myFlag.getFlag()) {

logger.info(Thread.currentThread().getName() + "继续等待(条件还不成熟)");

myFlag.wait(); // 等待其他线程改变当前线程需要的条件(会释放锁)

}

logger.info(Thread.currentThread().getName() + "继续业务(条件已成熟)");

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

}

}

}).start();

new Thread(new Runnable() {

@Override

public void run() {

synchronized (myFlag) {

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

logger.info(Thread.currentThread().getName() + "开始改变数据");

myFlag.setFlag(true);

myFlag.notify(); // 唤醒其他线程(释放锁)

logger.info(Thread.currentThread().getName() + "改变数据完毕,并通知其它线");

}

}

}).start();

}

}

等待其它线程结束,CountDownLatch.countDown()、CountDownLatch.await()

常见场景,比如A、B、C三个业务逻辑,3个业务之间没有依赖,可以并行运行,3个业务都执行完毕后向前端反馈结果。

一个线程等待其他线程结束才继续运行,可以用CountDownLatch.countDown()、CountDownLatch.await()或CyclicBarrier.await()或Thread.join()。

当一个线程的业务执行完,使用CountDownLatch.countDown()减1个任务,在一个线程中使用CountDownLatch.await()等待任务数减至0:

a4ad018fdc8668fdb26d3518bbf9d146.png

package com.nicchagil.exercies.countdownlatch;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

import java.util.logging.Logger;

public class CountDownLatchExercise {

private static Logger logger = Logger.getLogger(CountDownLatchExercise.class.getName());

private static CountDownLatch countDownLatch = new CountDownLatch(2);

public static void main(String[] args) throws InterruptedException {

logger.info(Thread.currentThread().getName() + " start..."); // 主任务开始

ExecutorService executorService = Executors.newCachedThreadPool();

executorService.execute(new Runnable() {

@Override

public void run() {

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

logger.info(Thread.currentThread().getName() + " complete..."); // 子任务一完成

countDownLatch.countDown();

}

});

executorService.execute(new Runnable() {

@Override

public void run() {

try {

TimeUnit.SECONDS.sleep(5);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

logger.info(Thread.currentThread().getName() + " complete..."); // 子任务二完成

countDownLatch.countDown();

}

});

countDownLatch.await();

logger.info(Thread.currentThread().getName() + " complete..."); // 主任务完成

}

}

等待其它线程结束,CyclicBarrier.await()

各线程执行完毕都使用CyclicBarrier.await(),表示到达Barrier(屏障)。另外CyclicBarrier与CountDownLatch的区别还有,前者可通过cyclicBarrier.reset()重置数值,可通过构造方式CyclicBarrier(int parties, Runnable barrierAction)声明当屏障要被越过时由最后到达屏障的线程执行barrierAction任务:

2ce772d9ad49fbf9d819573bb438ba89.png

package com.nicchagil.exercies.cyclicbarrier;

import java.util.concurrent.brokenBarrierException;

import java.util.concurrent.CyclicBarrier;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

import java.util.logging.Logger;

import com.nicchagil.exercies.countdownlatch.CountDownLatchExercise;

public class CyclicBarrierExercise {

private static Logger logger = Logger.getLogger(CountDownLatchExercise.class.getName());

private static CyclicBarrier cyclicBarrier = new CyclicBarrier(3);

public static void main(String[] args) {

logger.info(Thread.currentThread().getName() + " start..."); // 主任务开始

ExecutorService executorService = Executors.newCachedThreadPool();

executorService.execute(new Runnable() {

@Override

public void run() {

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

logger.info(Thread.currentThread().getName() + " complete..."); // 子任务一完成

try {

cyclicBarrier.await();

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

} catch (brokenBarrierException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

}

});

executorService.execute(new Runnable() {

@Override

public void run() {

try {

TimeUnit.SECONDS.sleep(5);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

logger.info(Thread.currentThread().getName() + " complete..."); // 子任务二完成

try {

cyclicBarrier.await();

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

} catch (brokenBarrierException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

}

});

try {

cyclicBarrier.await();

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

} catch (brokenBarrierException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

logger.info(Thread.currentThread().getName() + " complete..."); // 主任务完成

}

}

等待join()的线程完成,Thread.join()

使用Thread.join():

f44e5bcd115f4d580cea631c0ccb1746.png

package com.nicchagil.exercies.countdownlatch.joinimplement;

import java.util.concurrent.TimeUnit;

import java.util.logging.Logger;

public class JoinExercise {

private static Logger logger = Logger.getLogger(JoinExercise.class.getName());

public static void main(String[] args) {

logger.info(Thread.currentThread().getName() + " start..."); // 主任务开始

Thread t1 = new Thread(new Runnable() {

@Override

public void run() {

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

logger.info(Thread.currentThread().getName() + " complete..."); // 子任务一完成

}

});

Thread t2 = new Thread(new Runnable() {

@Override

public void run() {

try {

TimeUnit.SECONDS.sleep(5);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

logger.info(Thread.currentThread().getName() + " complete..."); // 子任务二完成

}

});

t1.start();

t2.start();

/* 插入主线程,让主线程等待其完成 */

try {

t1.join();

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

try {

t2.join();

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

logger.info(Thread.currentThread().getName() + " complete..."); // 主任务完成

}

}

线程睡眠,Thread.sleep(long millis)或TimeUnit.sleep(long timeout)

常用此俩方法可使线程睡眠,但不会释放锁。

使用Thread.sleep(long millis):

package com.nicchagil.exercies.threadsleep;

import java.util.logging.Logger;

public class ThreadSleep {

private static Logger logger = Logger.getLogger(ThreadSleep.class.getName());

public static void main(String[] args) {

logger.info("开始睡眠");

try {

Thread.sleep(3000);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

logger.info("结束睡眠");

}

}

使用TimeUnit.sleep(long timeout):

package com.nicchagil.exercies.threadsleep;

import java.util.concurrent.TimeUnit;

import java.util.logging.Logger;

public class TimeUnitThreadSleep {

private static Logger logger = Logger.getLogger(TimeUnitThreadSleep.class.getName());

public static void main(String[] args) {

logger.info("开始睡眠");

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

logger.info("结束睡眠");

}

}

狭路相逢勇者胜,同一时间限制指定数量的线程访问,Semaphore

在多线程环境,某些资源是有限的,比如文件IO、数据库连接,我们需要作流量控制,可以使用Semaphore.acquire()获取一个许可,Semaphore.release()释放一个许可:

package com.nicchagil.exercies.semaphore;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.Semaphore;

import java.util.concurrent.TimeUnit;

import java.util.logging.Logger;

public class SemaphoreExercise {

private static Logger logger = Logger.getLogger(SemaphoreExercise.class.getName());

private static Semaphore semaphore = new Semaphore(3); // 最多同时通过3个信号的信号量

public static void main(String[] args) {

ExecutorService executorService = Executors.newCachedThreadPool();

for (int i = 0; i <= 10; i++) {

executorService.execute(new Runnable() {

@Override

public void run() {

try {

semaphore.acquire(); // 获取一个信号

} catch (InterruptedException e1) {

// Todo Auto-generated catch block

e1.printstacktrace();

}

/* 睡眠3S */

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

logger.info(Thread.currentThread().getName() + " run...");

semaphore.release(); // 释放一个信号

}

});

}

}

}

用数据库连接作为受限资源,同时最多只放行3个线程:

1e10758508f24ee6e01b522490eaa2c2.png

也许你会说,我一开始声明受限的线程数量就可以了,比如启动3个线程数(如下图)。但是,并非所有情况均如你所愿,比如线程不是由你启动的,由Servlet容器启动的呢;再比如,在数据库访问前有部分业务操作,这些操作比访问数据库耗时些,多启动些线程能增大吞吐量。

9c114341b662acf720bc482803681871.png

缓存线程,线程池,ExecutorService、Executors、ThreadPoolExecutor

将线程缓存起来重复利用,可以减低线程创建、销毁的成本,还可以对其进行管理。比如系统中线程的数量是有限的,不能无止境的创建。

线程池执行器,ThreadPoolExecutor

我们常用的Executors.newFixedThreadPool(int)、Executors.newCachedThreadPool()都是基于ThreadPoolExecutor,所以,先讲后者。

构造方法ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue)的参数分别为:

corePoolSize,核心线程池线程的数量

maximumPoolSize,总线程池线程的最大数量

keepAliveTime,当总线程池中除了核心线程池的线程空闲时保持等待时间,超过此时间就回收此线程

unit,keepAliveTime时间的单位

workQueue,当提交的线程数超过核心线程池线程数量,线程在此队列中排队

提交线程,优先在核心线程池中创建线程执行

如果核心线程池已满,则在队列中排队待执行

如果队列已满,则在总线程池创建线程执行

如果总线程池也满了,则调用RejectedExecutionHandler(拒绝执行处理器)

package com.nicchagil.exercies.threadpool;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.LinkedBlockingQueue;

import java.util.concurrent.ThreadPoolExecutor;

import java.util.concurrent.TimeUnit;

import java.util.logging.Logger;

public class ThreadPoolExecutorExercise {

private static Logger logger = Logger.getLogger(ThreadPoolExecutorExercise.class.getName());

public static void main(String[] args) {

/* 核心线程池为3,最大线程池位6,链式堵塞队列长度为2 */

ExecutorService executorService = new ThreadPoolExecutor(3, 6, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(2));

for (int i = 0; i <= 10; i++) {

try {

executorService.execute(new Runnable() {

@Override

public void run() {

logger.info(Thread.currentThread().getName() + "开始运行");

try {

TimeUnit.SECONDS.sleep(5);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

}

});

} catch (Exception e) {

logger.info("第几个线程提交失败:" + i);

}

}

}

}

结果如下:

七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise$1 run

信息: pool-1-thread-1开始运行

七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise$1 run

信息: pool-1-thread-5开始运行

七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise$1 run

信息: pool-1-thread-2开始运行

七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise$1 run

信息: pool-1-thread-4开始运行

七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise$1 run

信息: pool-1-thread-3开始运行

七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise$1 run

信息: pool-1-thread-6开始运行

七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise main

信息: 第几个线程提交失败:8

七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise main

信息: 第几个线程提交失败:9

七月 16, 2017 1:42:54 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise main

信息: 第几个线程提交失败:10

七月 16, 2017 1:42:59 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise$1 run

信息: pool-1-thread-5开始运行

七月 16, 2017 1:42:59 下午 com.nicchagil.exercies.threadpool.ThreadPoolExecutorExercise$1 run

信息: pool-1-thread-2开始运行

用指定数量的线程执行任务,Executors.newFixedThreadPool(int)

Executors.newFixedThreadPool(int),实际上是new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()),可知:

核心线程池、总线程池大小为nThreads

总线程池空闲线程不等待(实际上因核心线程池、总线程池大小相等,总线程池也没有额外的线程了)

使用链式堵塞队列,其最大容量为Integer.MAX_VALUE,可以视为无限吧(你提交2的31次方-1个任务试试?)

package com.nicchagil.exercies.threadpool;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

import java.util.logging.Logger;

public class NewFixedThreadPoolExercise {

private static Logger logger = Logger.getLogger(NewFixedThreadPoolExercise.class.getName());

public static void main(String[] args) {

// = new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())

ExecutorService executorService = Executors.newFixedThreadPool(3);

// = new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()))

// Executors.newSingleThreadExecutor();

for (int i = 0; i <= 10; i++) {

executorService.execute(new Runnable() {

@Override

public void run() {

logger.info(Thread.currentThread().getName() + "开始运行");

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

}

});

}

}

}

用动态缓存的线程执行任务,Executors.newCachedThreadPool()

Executors.newCachedThreadPool(),实际上是new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue()),可知:

核心线程数为0,总线程数为Integer.MAX_VALUE,可视为无限吧

总线程池空闲线程等待新任务60秒,超时回收线程

使用同步队列。此队列特点为,无容量;总线程池空闲线程调用SynchronousQueue.poll(long timeout, TimeUnit unit)在指定时间内等待新任务,如果总线程池没有空闲线程,则在总线程池中创建新线程,而总线程池的容量又可视为无限的,所以提交任务的速度大于执行任务的速度,会创建大量线程,导致cpu耗尽,内存溢出。

package com.nicchagil.exercies.threadpool;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

import java.util.logging.Logger;

public class NewCachedThreadPoolExercise {

private static Logger logger = Logger.getLogger(NewCachedThreadPoolExercise.class.getName());

public static void main(String[] args) {

// = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue())

ExecutorService executorService = Executors.newCachedThreadPool();

for (int i = 0; i <= 10; i++) {

executorService.execute(new Runnable() {

@Override

public void run() {

logger.info(Thread.currentThread().getName() + "开始运行");

try {

TimeUnit.SECONDS.sleep(3);

} catch (InterruptedException e) {

// Todo Auto-generated catch block

e.printstacktrace();

}

}

});

}

}

}

关于JAVA多线程限流解决并发问题java多线程限流解决并发问题的方法的介绍现已完结,谢谢您的耐心阅读,如果想了解更多关于Java 中有哪些无锁技术来解决并发问题?如何使用?、java 多线程并发问题、java 并发问题,java 并发容器解决全局变量的并发问题、java1.5多线程_【Java多线程】JDK1.5并发包API杂谈的相关知识,请在本站寻找。

本文标签: