Java多线程与并发基础
1)Thread & ThreadPoolExecutor
Thread例子如下:
for(int i=0; i<100; i++){
Thread t = new Thread(){
@Override
public void run() {
//job details
}
};
t.start();
try{
t.join();
}
catch(InterruptedException ex){
ex.printStackTrace();
}
}
相应的,ThreadPoolExecutor例子如下:
ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(100));
for(int i=0; i<100; i++){
tp.execute(new Runnable()
{
@Override
public void run() {
//job details
}
});
tp.shutdown();
try{
tp.awaitTermination(1, TimeUnit.DAYS);
}
catch(InterruptedException ex){
ex.printStackTrace();
}
}
构造函数及说明:
java.util.concurrent.ThreadPoolExecutor.ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue workQueue)
java.util.concurrent.ThreadPoolExecutor.ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler)
java.util.concurrent.ThreadPoolExecutor.ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory)
java.util.concurrent.ThreadPoolExecutor.ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory,
RejectedExecutionHandler handler)
int corePoolSize, 线程池维持线程数
int maximumPoolSize, 线程池允许最大线程数
long keepAliveTime, 最大线程数回收至维持线程数时,被回收线程保持IDLE的时间
TimeUnit unit, keepAliveTime的时间单位
BlockingQueue workQueue, 有三种队列,同步(默认SynchronousQueue)、固定大小(ArrayBlockingQueue)、无限制大小(LinkedBlockingQueue)
ThreadFactory threadFactory, 线程数不够时创建线程的Factory
RejectedExecutionHandler handler, 线程池压力过大,maximumPoolSize满,workQueue满,无法执行execute提交的Runnable时,交给handler处理。有4种内置的handler:
ThreadPoolExecutor.AbortPolicy,处理程序遭到拒绝将抛出运行时RejectedExecutionException;
ThreadPoolExecutor.CallerRunsPolicy,线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度;
ThreadPoolExecutor.DiscardPolicy,不能执行的任务将被删除;
ThreadPoolExecutor.DiscardOldestPolicy,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。
线程池终止:
/**
* runState provides the main lifecyle control, taking on values:
*
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TERMINATED: Same as STOP, plus all threads have terminated
*
* The numerical order among these values matters, to allow
* ordered comparisons. The runState monotonically increases over
* time, but need not hit each state. The transitions are:
*
* RUNNING -> SHUTDOWN
* On invocation of shutdown(), perhaps implicitly in finalize()
* (RUNNING or SHUTDOWN) -> STOP
* On invocation of shutdownNow()
* SHUTDOWN -> TERMINATED
* When both queue and pool are empty
* STOP -> TERMINATED
* When pool is empty
*/
API原型如下:
tp.shutdown();
tp.shutdownNow();
tp.awaitTermination(timeout, timeout_unit);
使用Executors创建线程池:
ExecutorService es1 = Executors.newCachedThreadPool();
ExecutorService es2 = Executors.newFixedThreadPool(10);
ExecutorService es3 = Executors.newSingleThreadExecutor();
ExecutorService es4 = Executors.newScheduledThreadPool(10);
ExecutorService es5 = Executors.newSingleThreadScheduledExecutor();
2) synchronized
synchronized是关键字,可以修饰方法或语句块,其语义是reentrant block lock。思考并明确如下代码synchronized的作用是?
public class TestSynchronized {
public static synchronized void m1(){}
public static synchronized void m2(){}
public synchronized void m3(){}
public synchronized void m4(){}
public void m5() {
synchronized(TestSynchronized.class){
//stat...
}
}
public void m6() {
synchronized(this){
//stat...
}
}
}
3)volatile
1、volatile主要解决java对象在多线程共享时的可见性及内存同步问题。以volatile修饰的变量只要在一个线程中写变更,则其他线程读立即可以获取最新的值,因为每次写后其他线程都会从主存进行读取。
2、volatile还用于同一线程中,控制编译、指令重排优化时的排序。具体来说,就是在同时具有volatile变量读、volatile变量写或其组合的流程中,编译优化、指令重排优化将符合volatile的主存同步语义。从而可以用来禁止部分代码块经JVM优化后乱序执行流程。
4)object.wait(), object.notify(), object.notifyAll(), Thread.sleep(), Thread.yield()
wait、notify、notifyAll都是Java Object对象的方法,需要注意的是对这3个方法的调用必须在该对象(obj)线程锁获取后执行。通常在
synchronized(obj) {
}
中执行obj.wait()即obj.notify()、obj.notifyAll()。
obj.wait()和Thread.sleep()具有相似的语义,即释放CPU控制权,使当前线程进入IDLE状态。但wait()还会释放obj的线程锁。与之功能相似的还有Thread.yield()。
obj.notify()则是给obj对象发送通知,JVM将随机选择一个在obj上wait的线程来唤醒,以进行继续执行。
obj.notifyAll()也是给obj对象发送通知,此时JVM将每个在obj上wait的线程都唤醒,以进行继续执行。
wait有obj.wait(), obj.wait(ms), obj.wait(ms, ns)三种设置超时方式。
sleep有Thread.sleep(ms), Thread.sleep(ms, ns)两种设置超时方式。
yield有Thread.yield()一种方式,其意义是交出CPU控制权让同等优先级的线程执行机会,俗称"退让"。但如果不存在同等优先级线程需要控制权时,CPU控制权将可能会又交回给当前调用yield的线程。
5)Locks
java.util.concurrent.locks
|-Lock
|-ReentrantLock
|-ReentrantReadWriteLock.ReadLock
|-ReentrantReadWriteLock.WriteLock
|-Condition
|-AbstractQueuedLongSynchronizer.ConditionObject
|-AbstractQueuedSynchronizer.ConditionObject
|-ReadWriteLock
|-ReentrantReadWriteLock
简单的Lock:
//Lock用法
Lock lock = new ReentrantLock();
lock.lock();
//stat
lock.unlock();
读写Lock:
//ReadWriteLock用法
ReadWriteLock lock = new ReentrantReadWriteLock();
//写场景
Lock writeLock = lock.writeLock();
writeLock.lock();
//write job
writeLock.unlock();
//读场景
Lock readLock = lock.readLock();
readLock.lock();
//read job
readLock.unlock();
条件信号Condition:
class BoundedBuffer {
final Lock lock = new ReentrantLock(); //锁对象
final Condition notFull = lock.newCondition(); //写线程锁
final Condition notEmpty = lock.newCondition(); //读线程锁
final Object[] items = new Object[100];//缓存队列
int putptr; //写索引
int takeptr; //读索引
int count; //队列中数据数目
//写
public void put(Object x) throws InterruptedException {
lock.lock(); //锁定
try {
// 如果队列满,则阻塞<写线程>
while (count == items.length) {
notFull.await();
}
// 写入队列,并更新写索引
items[putptr] = x;
if (++putptr == items.length) putptr = 0;
++count;
// 唤醒<读线程>
notEmpty.signal();
} finally {
lock.unlock();//解除锁定
}
}
//读
public Object take() throws InterruptedException {
lock.lock(); //锁定
try {
// 如果队列空,则阻塞<读线程>
while (count == 0) {
notEmpty.await();
}
//读取队列,并更新读索引
Object x = items[takeptr];
if (++takeptr == items.length) takeptr = 0;
--count;
// 唤醒<写线程>
notFull.signal();
return x;
} finally {
lock.unlock();//解除锁定
}
6)Atomics
Java中的Atomics有AtomicInteger、AtomicReference、AtomicIntegerArray等。
其中AtomicInteger代表整数、AtomicReference
AtomicXxx的几个常用的原子方法:
compareAndSet(test, newValue)
addAndGet(delta)
get()
incrementAndGet()
decrementAndGet()
getAndSet(newValue)
等。
7)Collections & Containers
ConcurrentHashMap
并发HashMap。需要注意的是,并发情况下优先使用putIfAbsent,谨慎使用put。
问题1: 有了ConcurrentHashMap,Hashtable是否无存在意义?
问题2: 非线程安全的HashMap,在哪些情况下使用?
ConcurrentHashMap map = new ConcurrentHashMap<String,String>();
map.putIfAbsent("keyA", "valueA");
map.put("keyA", "valueA");
CopyOnWriteArrayList、CopyOnWriteArraySet
CopyOnWrite集合。可以理解为内存的读写分离,主要用作自己实现内存cache的场景,并且写少读多的情况下。这时的写是先Copy一份集合副本,再在副本上做Write操作。由于Write操作需要增加一次Copy副本的动作,所以Write操作可能比非CopyOnWrite的集合性能更低。因此,CopyOnWrite集合常用于写操作次数远远小于读操作次数时的性能优化。
8)CountDownLatch
CountDownLatch用于多个下游数据处理线程等待一组上游处理线程生成中间数据,解决下游计算时的数据依赖问题。
主要应用模式如下图所示:
需要注意的是,上图中Thread#2 Thread#3在调用latch.countDown()之后,并不进行等待,线程继续执行后续指令。而Thread#1在latch.await()处阻塞,直到Thread#2 Thread#3调用latch.countDown()完成Thread#2 #3 Result生成,然后Thread#1继续,例如处理Thread#1 #2 #3的Result。
代码样例如下:
final CountDownLatch latch = new CountDownLatch(10);
ThreadPoolExecutor tp5 = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10));
for(int i=0; i<10; i++)
{
tp5.execute(new Runnable()
{
@Override
public void run() {
// do thread job
latch.countDown();
}
});
try{
latch.await();
// do result handle job
}
catch(InterruptedException ex){
ex.printStackTrace();
}
}
9)CyclicBarrier
CyclicBarrier用于阻塞多个线程,都达到某个状态后再一起继续执行。主要应用模式如下图所示:
代码样例如下:
final CyclicBarrier barr = new CyclicBarrier(10+1);
ThreadPoolExecutor tp6 = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10));
for(int i=0; i<10; i++)
{
tp6.execute(new Runnable()
{
@Override
public void run() {
// do thread job
try {
barr.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
});
try{
barr.await();
// do Barrier over job
}
catch(InterruptedException ex){
ex.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
需要注意的是:
1、CyclicBarrier是可以被循环使用的。new CyclicBarrier(count+1)被wait到之后,所有的线程又可以开始新的一轮CyclicBarrier的await()。
2、CyclicBarrier的wait数如果比count+1小,那么所有调用await()的线程将会死锁。
3、CyclicBarrier的wait可以设置超时,await(timeout, TimeUnit)。
10)Semaphore
Semaphore就是传统的信号量。
和操作系统中的信号量一样,它的两个方法acquire(), release()将进行信号量的P操作和V操作。
信号量常用于管理有限个数的资源,如Rpc网络连接数、数据库连接池连接数等。
代码样例如下:
final Semaphore sema = new Semaphore(10);// 10个资源,如当前数据库连接池中有10个连接资源
ThreadPoolExecutor tp7 = new ThreadPoolExecutor(100, 100, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(100));
for(int i=0; i<50; i++)
{
tp7.execute(new Runnable()
{
@Override
public void run() {
try {
sema.acquire();
// do job use Shared resource, suchas DBConnectionPoolItem.
} catch (InterruptedException e) {
e.printStackTrace();
}
finally{
sema.release();
}
}
});
}
11)Exchanger
Exchanger比较特殊,它能实现恰好在两个线程对换某个指定类型的数据。exchange()调用将会使先执行到的线程阻塞,后执行到的线程执行exchange后完成数据对换,然后两个线程继续执行后续指令。
问题1: 哪里会用到Exchanger?
问题2: 多于2个线程调用exchange?
代码样例如下:
Main.java:
public class TestExchanger {
public static void main(String[] args) {
Exchanger<List <Integer>> exchanger = new Exchanger<>();
Consumer c = new Consumer(exchanger);
Producer p = new Producer(exchanger);
c.start();
p.start();
}
}
Consumer.java:
public class Consumer extends Thread {
List<Integer> list = new ArrayList<>();
Exchanger<List <Integer>> exchanger = null;
public Consumer(Exchanger<List <Integer>> exchanger) {
super();
this.exchanger = exchanger;
}
@Override
public void run() {
for(int i=0; i<10; i++) {
try {
list = exchanger.exchange(list);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print(list.get(0)+", ");
System.out.print(list.get(1)+", ");
System.out.print(list.get(2)+", ");
System.out.print(list.get(3)+", ");
System.out.println(list.get(4)+", ");
}
}
}
Producer.java:
public class Producer extends Thread {
List<Integer> list = new ArrayList<>();
Exchanger<List <Integer>> exchanger = null;
public Producer(Exchanger<List <Integer>> exchanger) {
super();
this.exchanger = exchanger;
}
@Override
public void run() {
Random rand = new Random();
for(int i=0; i<10; i++) {
list.clear();
list.add(rand.nextInt(10000));
list.add(rand.nextInt(10000));
list.add(rand.nextInt(10000));
list.add(rand.nextInt(10000));
list.add(rand.nextInt(10000));
try {
list = exchanger.exchange(list);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
12)Future & FutureTask
CompletionService
Callable
Future
ExecutorService
Future惯用法1,需要使用Future,ThreadPool(ExecutorService来创建),Callable实现健壮的同步转Future。代码样例:
private Future<HashMap> getFutureResultFromMethod() {
synchronized(this){
if (this.threadPool == null) {
this.threadPool = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(10));
}
}
return this.threadPool.submit(new Callable<HashMap>() {
@Override
public HashMap call() throws Exception {
return getResultFromSyncMethod();//这里是比较耗时的同步方法。
}
});
}
此时,getFutureResultFromMethod可以这样使用:
Future<HashMap> futureResult = getFutureResultFromMethod();
//do other job
HashMap result1 = futureResult.get();
HashMap result2 = futureResult.get(timeout, TimeUnit);
惯用法2,CompletionService配合上述逻辑,进行多个Future等待时的优化。
从本质上来讲,CompletionService维护一个保存Future节点的BlockingQueue,并在Future状态为完成时才将其加入该BlockingQueue。
其tack()方法在BlockingQueue非空时才返回,否则将阻塞住tack()调用。也就是take()调用的成功返回需要其中至少包含一个Future,即至少一个Future的状态为完成。
一个使用样例如下:
ExecutorService es = Executors.newFixedThreadPool(10);
CompletionService<HashMap> comp = new ExecutorCompletionService<HashMap>(es);
comp.submit(new Callable<HashMap>() {
@Override
public HashMap call() throws Exception {
return getResultFromSyncMethod1();
}
});
comp.submit(new Callable<HashMap>() {
@Override
public HashMap call() throws Exception {
return getResultFromSyncMethod2();
}
});
// 假如业务上允许 getResultFromSyncMethod1() 和 getResultFromSyncMethod2() 只要有一个返回就可以继续执行
Future<HashMap> future;
try {
future = comp.take();
System.out.println(future.get().size()); //执行业务
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
es.shutdown();
就总结到这里,基础的Java并发应用应该就够了。