1)Thread & ThreadPoolExecutor

Thread例子如下:

for(int i=0; i<100; i++){
    Thread t = new Thread(){
        @Override
        public void run() {
            //job details
        }
    };
    t.start();
    try{ 
        t.join();
    }
    catch(InterruptedException ex){
        ex.printStackTrace();
    }
}

相应的,ThreadPoolExecutor例子如下:

ThreadPoolExecutor tp = new ThreadPoolExecutor(1, 1, 60, TimeUnit.SECONDS, 
           new LinkedBlockingQueue<Runnable>(100));
for(int i=0; i<100; i++){
    tp.execute(new Runnable() 
    {
        @Override
        public void run() {
            //job details
        }
    });
    tp.shutdown();
    try{ 
        tp.awaitTermination(1, TimeUnit.DAYS);
    }
    catch(InterruptedException ex){
        ex.printStackTrace();
    }
}

构造函数及说明:

java.util.concurrent.ThreadPoolExecutor.ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
         long keepAliveTime, TimeUnit unit, BlockingQueue workQueue)
java.util.concurrent.ThreadPoolExecutor.ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
         long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, RejectedExecutionHandler handler)
java.util.concurrent.ThreadPoolExecutor.ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
         long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory)
java.util.concurrent.ThreadPoolExecutor.ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, 
         long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, 
         RejectedExecutionHandler handler)
int corePoolSize, 线程池维持线程数
int maximumPoolSize, 线程池允许最大线程数
long keepAliveTime, 最大线程数回收至维持线程数时,被回收线程保持IDLE的时间
TimeUnit unit, keepAliveTime的时间单位
BlockingQueue workQueue, 有三种队列,同步(默认SynchronousQueue)、固定大小(ArrayBlockingQueue)、无限制大小(LinkedBlockingQueue)
ThreadFactory threadFactory, 线程数不够时创建线程的Factory

RejectedExecutionHandler handler, 线程池压力过大,maximumPoolSize满,workQueue满,无法执行execute提交的Runnable时,交给handler处理。有4种内置的handler:
  ThreadPoolExecutor.AbortPolicy,处理程序遭到拒绝将抛出运行时RejectedExecutionException;
  ThreadPoolExecutor.CallerRunsPolicy,线程调用运行该任务的 execute 本身。此策略提供简单的反馈控制机制,能够减缓新任务的提交速度;
  ThreadPoolExecutor.DiscardPolicy,不能执行的任务将被删除;
  ThreadPoolExecutor.DiscardOldestPolicy,如果执行程序尚未关闭,则位于工作队列头部的任务将被删除,然后重试执行程序(如果再次失败,则重复此过程)。

线程池终止:

/**
 * runState provides the main lifecyle control, taking on values:
 *
 *   RUNNING:  Accept new tasks and process queued tasks
 *   SHUTDOWN: Don't accept new tasks, but process queued tasks
 *   STOP:     Don't accept new tasks, don't process queued tasks,
 *             and interrupt in-progress tasks
 *   TERMINATED: Same as STOP, plus all threads have terminated
 *
 * The numerical order among these values matters, to allow
 * ordered comparisons. The runState monotonically increases over
 * time, but need not hit each state. The transitions are:
 *
 * RUNNING -> SHUTDOWN
 *    On invocation of shutdown(), perhaps implicitly in finalize()
 * (RUNNING or SHUTDOWN) -> STOP
 *    On invocation of shutdownNow()
 * SHUTDOWN -> TERMINATED
 *    When both queue and pool are empty
 * STOP -> TERMINATED
 *    When pool is empty
 */

API原型如下:

tp.shutdown(); 
tp.shutdownNow();
tp.awaitTermination(timeout, timeout_unit);

使用Executors创建线程池:

ExecutorService es1 = Executors.newCachedThreadPool();
ExecutorService es2 = Executors.newFixedThreadPool(10);
ExecutorService es3 = Executors.newSingleThreadExecutor();
ExecutorService es4 = Executors.newScheduledThreadPool(10);
ExecutorService es5 = Executors.newSingleThreadScheduledExecutor();

2) synchronized

synchronized是关键字,可以修饰方法或语句块,其语义是reentrant block lock。思考并明确如下代码synchronized的作用是?

public class TestSynchronized {

    public static synchronized void m1(){}

    public static synchronized void m2(){}

    public synchronized void m3(){}

    public synchronized void m4(){}

    public void m5() {
        synchronized(TestSynchronized.class){
            //stat...
        }
    }

    public void m6() {
        synchronized(this){
            //stat...
        }
    }
}

3)volatile

1、volatile主要解决java对象在多线程共享时的可见性及内存同步问题。以volatile修饰的变量只要在一个线程中写变更,则其他线程读立即可以获取最新的值,因为每次写后其他线程都会从主存进行读取。

2、volatile还用于同一线程中,控制编译、指令重排优化时的排序。具体来说,就是在同时具有volatile变量读、volatile变量写或其组合的流程中,编译优化、指令重排优化将符合volatile的主存同步语义。从而可以用来禁止部分代码块经JVM优化后乱序执行流程。

4)object.wait(), object.notify(), object.notifyAll(), Thread.sleep(), Thread.yield()

wait、notify、notifyAll都是Java Object对象的方法,需要注意的是对这3个方法的调用必须在该对象(obj)线程锁获取后执行。通常在

synchronized(obj) {

}

中执行obj.wait()即obj.notify()、obj.notifyAll()。

obj.wait()和Thread.sleep()具有相似的语义,即释放CPU控制权,使当前线程进入IDLE状态。但wait()还会释放obj的线程锁。与之功能相似的还有Thread.yield()。

obj.notify()则是给obj对象发送通知,JVM将随机选择一个在obj上wait的线程来唤醒,以进行继续执行。

obj.notifyAll()也是给obj对象发送通知,此时JVM将每个在obj上wait的线程都唤醒,以进行继续执行。

wait有obj.wait(), obj.wait(ms), obj.wait(ms, ns)三种设置超时方式。

sleep有Thread.sleep(ms), Thread.sleep(ms, ns)两种设置超时方式。

yield有Thread.yield()一种方式,其意义是交出CPU控制权让同等优先级的线程执行机会,俗称"退让"。但如果不存在同等优先级线程需要控制权时,CPU控制权将可能会又交回给当前调用yield的线程。

5)Locks

java.util.concurrent.locks
|-Lock
  |-ReentrantLock
  |-ReentrantReadWriteLock.ReadLock
  |-ReentrantReadWriteLock.WriteLock
|-Condition
  |-AbstractQueuedLongSynchronizer.ConditionObject
  |-AbstractQueuedSynchronizer.ConditionObject
|-ReadWriteLock
  |-ReentrantReadWriteLock

简单的Lock:

//Lock用法
Lock lock = new ReentrantLock();
lock.lock();
//stat
lock.unlock();

读写Lock:

//ReadWriteLock用法         
ReadWriteLock lock = new ReentrantReadWriteLock(); 
//写场景 
Lock writeLock = lock.writeLock();  
writeLock.lock();  
//write job
writeLock.unlock();

//读场景
Lock readLock = lock.readLock();
readLock.lock();
//read job
readLock.unlock();

条件信号Condition:

class BoundedBuffer {  
  final Lock lock = new ReentrantLock();          //锁对象  
  final Condition notFull  = lock.newCondition(); //写线程锁  
  final Condition notEmpty = lock.newCondition(); //读线程锁  
  
  final Object[] items = new Object[100];//缓存队列  
  int putptr;  //写索引  
  int takeptr; //读索引  
  int count;   //队列中数据数目  
  
  //写  
  public void put(Object x) throws InterruptedException {  
    lock.lock(); //锁定  
    try {  
      // 如果队列满,则阻塞<写线程>  
      while (count == items.length) {  
        notFull.await();   
      }  
      // 写入队列,并更新写索引  
      items[putptr] = x;   
      if (++putptr == items.length) putptr = 0;   
      ++count;  
  
      // 唤醒<读线程>  
      notEmpty.signal();   
    } finally {   
      lock.unlock();//解除锁定   
    }   
  }  
  
  //读   
  public Object take() throws InterruptedException {   
    lock.lock(); //锁定   
    try {  
      // 如果队列空,则阻塞<读线程>  
      while (count == 0) {  
         notEmpty.await();  
      }  
  
      //读取队列,并更新读索引  
      Object x = items[takeptr];   
      if (++takeptr == items.length) takeptr = 0;  
      --count;  
  
      // 唤醒<写线程>  
      notFull.signal();   
      return x;   
    } finally {   
      lock.unlock();//解除锁定   
    }   

6)Atomics

Java中的Atomics有AtomicInteger、AtomicReference、AtomicIntegerArray等。

其中AtomicInteger代表整数、AtomicReference代表可能是离散的enum的状态、AtomicIntegerArray代表整数数组。

AtomicXxx的几个常用的原子方法:

compareAndSet(test, newValue)
addAndGet(delta)
get()
incrementAndGet()
decrementAndGet()
getAndSet(newValue)

等。

7)Collections & Containers

ConcurrentHashMap

并发HashMap。需要注意的是,并发情况下优先使用putIfAbsent,谨慎使用put。

问题1: 有了ConcurrentHashMap,Hashtable是否无存在意义?

问题2: 非线程安全的HashMap,在哪些情况下使用?

ConcurrentHashMap map = new ConcurrentHashMap<String,String>();
map.putIfAbsent("keyA", "valueA");
map.put("keyA", "valueA");

CopyOnWriteArrayList、CopyOnWriteArraySet

CopyOnWrite集合。可以理解为内存的读写分离,主要用作自己实现内存cache的场景,并且写少读多的情况下。这时的写是先Copy一份集合副本,再在副本上做Write操作。由于Write操作需要增加一次Copy副本的动作,所以Write操作可能比非CopyOnWrite的集合性能更低。因此,CopyOnWrite集合常用于写操作次数远远小于读操作次数时的性能优化。

8)CountDownLatch

CountDownLatch用于多个下游数据处理线程等待一组上游处理线程生成中间数据,解决下游计算时的数据依赖问题。

主要应用模式如下图所示:

需要注意的是,上图中Thread#2 Thread#3在调用latch.countDown()之后,并不进行等待,线程继续执行后续指令。而Thread#1在latch.await()处阻塞,直到Thread#2 Thread#3调用latch.countDown()完成Thread#2 #3 Result生成,然后Thread#1继续,例如处理Thread#1 #2 #3的Result。

代码样例如下:

final CountDownLatch latch = new CountDownLatch(10);
ThreadPoolExecutor tp5 = new ThreadPoolExecutor(2, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10));
for(int i=0; i<10; i++)
{
    tp5.execute(new Runnable() 
    {
        @Override
        public void run() {
            // do thread job
            latch.countDown();
        }
    });
    try{ 
        latch.await();
        // do result handle job
    }
    catch(InterruptedException ex){
        ex.printStackTrace();
    }
}

9)CyclicBarrier

CyclicBarrier用于阻塞多个线程,都达到某个状态后再一起继续执行。主要应用模式如下图所示:

代码样例如下:

final CyclicBarrier barr = new CyclicBarrier(10+1);
ThreadPoolExecutor tp6 = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10));
for(int i=0; i<10; i++)
{
    tp6.execute(new Runnable() 
    {
        @Override
        public void run() {
            // do thread job
            try {
                barr.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
            
        }
    });
    try{ 
        barr.await();
        // do Barrier over job
    }
    catch(InterruptedException ex){
        ex.printStackTrace();
    } catch (BrokenBarrierException e) {
        e.printStackTrace();
    }
}

需要注意的是:

1、CyclicBarrier是可以被循环使用的。new CyclicBarrier(count+1)被wait到之后,所有的线程又可以开始新的一轮CyclicBarrier的await()。

2、CyclicBarrier的wait数如果比count+1小,那么所有调用await()的线程将会死锁。

3、CyclicBarrier的wait可以设置超时,await(timeout, TimeUnit)。

10)Semaphore

Semaphore就是传统的信号量。

和操作系统中的信号量一样,它的两个方法acquire(), release()将进行信号量的P操作和V操作。

信号量常用于管理有限个数的资源,如Rpc网络连接数、数据库连接池连接数等。

代码样例如下:

final Semaphore sema = new Semaphore(10);// 10个资源,如当前数据库连接池中有10个连接资源
ThreadPoolExecutor tp7 = new ThreadPoolExecutor(100, 100, 60, TimeUnit.SECONDS, 
          new LinkedBlockingQueue<Runnable>(100));
for(int i=0; i<50; i++)
{
    tp7.execute(new Runnable() 
    {
        @Override
        public void run() {
            try {
                sema.acquire();
                // do job use Shared resource, suchas DBConnectionPoolItem.
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            finally{
                sema.release();
            }
        }
    });
}

11)Exchanger

Exchanger比较特殊,它能实现恰好在两个线程对换某个指定类型的数据。exchange()调用将会使先执行到的线程阻塞,后执行到的线程执行exchange后完成数据对换,然后两个线程继续执行后续指令。

问题1: 哪里会用到Exchanger?

问题2: 多于2个线程调用exchange?

代码样例如下:

Main.java:

public class TestExchanger {
    public static void main(String[] args) {
        Exchanger<List <Integer>> exchanger = new Exchanger<>();
        Consumer c = new Consumer(exchanger);
        Producer p = new Producer(exchanger);
        c.start();
        p.start();
    }
}

Consumer.java:

public class Consumer extends Thread {
    List<Integer> list = new ArrayList<>();
    Exchanger<List <Integer>> exchanger = null;
    public Consumer(Exchanger<List <Integer>> exchanger) {
        super();
        this.exchanger = exchanger;
    }
    @Override
    public void run() {
        for(int i=0; i<10; i++) {
            try {
                list = exchanger.exchange(list);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.print(list.get(0)+", ");
            System.out.print(list.get(1)+", ");
            System.out.print(list.get(2)+", ");
            System.out.print(list.get(3)+", ");
            System.out.println(list.get(4)+", ");
        }
    }
}

Producer.java:

public class Producer extends Thread {
    List<Integer> list = new ArrayList<>();
    Exchanger<List <Integer>> exchanger = null;
    public Producer(Exchanger<List <Integer>> exchanger) {
        super();
        this.exchanger = exchanger;
    }
    
    @Override
    public void run() {
        Random rand = new Random();
        for(int i=0; i<10; i++) {
            list.clear();
            list.add(rand.nextInt(10000));
            list.add(rand.nextInt(10000));
            list.add(rand.nextInt(10000));
            list.add(rand.nextInt(10000));
            list.add(rand.nextInt(10000));
            try {
                list = exchanger.exchange(list);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

12)Future & FutureTask

CompletionService

Callable

Future

ExecutorService

Future惯用法1,需要使用Future,ThreadPool(ExecutorService来创建),Callable实现健壮的同步转Future。代码样例:

private Future<HashMap> getFutureResultFromMethod() {
    synchronized(this){
        if (this.threadPool == null) {
            this.threadPool = new ThreadPoolExecutor(10, 10, 60, TimeUnit.SECONDS,
                    new LinkedBlockingQueue<Runnable>(10));
        }
    }
    return this.threadPool.submit(new Callable<HashMap>() {
        @Override
        public HashMap call() throws Exception {
            return getResultFromSyncMethod();//这里是比较耗时的同步方法。
        }
    });
}

此时,getFutureResultFromMethod可以这样使用:

Future<HashMap> futureResult = getFutureResultFromMethod();
//do other job
HashMap result1 = futureResult.get();
HashMap result2 = futureResult.get(timeout, TimeUnit);

惯用法2,CompletionService配合上述逻辑,进行多个Future等待时的优化。

从本质上来讲,CompletionService维护一个保存Future节点的BlockingQueue,并在Future状态为完成时才将其加入该BlockingQueue。

其tack()方法在BlockingQueue非空时才返回,否则将阻塞住tack()调用。也就是take()调用的成功返回需要其中至少包含一个Future,即至少一个Future的状态为完成。

一个使用样例如下:

ExecutorService es = Executors.newFixedThreadPool(10);  
CompletionService<HashMap> comp = new ExecutorCompletionService<HashMap>(es); 
 
comp.submit(new Callable<HashMap>() {
    @Override
    public HashMap call() throws Exception {
        return getResultFromSyncMethod1();
    }
}); 

comp.submit(new Callable<HashMap>() {
    @Override
    public HashMap call() throws Exception {
        return getResultFromSyncMethod2();
    }
}); 

// 假如业务上允许 getResultFromSyncMethod1() 和 getResultFromSyncMethod2() 只要有一个返回就可以继续执行
Future<HashMap> future;
try {
    future = comp.take();
    System.out.println(future.get().size()); //执行业务
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}  

es.shutdown();

就总结到这里,基础的Java并发应用应该就够了。