博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
计算机程序的思维逻辑 (68) - 线程的基本协作机制 (下)
阅读量:6940 次
发布时间:2019-06-27

本文共 8910 字,大约阅读时间需要 29 分钟。

本系列文章经补充和完善,已修订整理成书《Java编程的逻辑》(马俊昌著),由机械工业出版社华章分社出版,于2018年1月上市热销,读者好评如潮!各大网店和书店有售,欢迎购买:

本节继续的内容,探讨如何使用wait/notify实现更多的协作场景。

同时开始

同时开始,类似于运动员比赛,在听到比赛开始枪响后同时开始,下面,我们模拟下这个过程,这里,有一个主线程和N个子线程,每个子线程模拟一个运动员,主线程模拟裁判,它们协作的共享变量是一个开始信号。我们用一个类FireFlag来表示这个协作对象,代码如下所示:

static class FireFlag {    private volatile boolean fired = false;    public synchronized void waitForFire() throws InterruptedException {        while (!fired) {            wait();        }    }    public synchronized void fire() {        this.fired = true;        notifyAll();    }}复制代码

子线程应该调用waitForFire()等待枪响,而主线程应该调用fire()发射比赛开始信号。

表示比赛运动员的类如下:

static class Racer extends Thread {    FireFlag fireFlag;    public Racer(FireFlag fireFlag) {        this.fireFlag = fireFlag;    }    @Override    public void run() {        try {            this.fireFlag.waitForFire();            System.out.println("start run "                    + Thread.currentThread().getName());        } catch (InterruptedException e) {        }    }}复制代码

主程序代码如下所示:

public static void main(String[] args) throws InterruptedException {    int num = 10;    FireFlag fireFlag = new FireFlag();    Thread[] racers = new Thread[num];    for (int i = 0; i < num; i++) {        racers[i] = new Racer(fireFlag);        racers[i].start();    }    Thread.sleep(1000);    fireFlag.fire();}复制代码

这里,启动了10个子线程,每个子线程启动后等待fire信号,主线程调用fire()后各个子线程才开始执行后续操作。

等待结束

理解join

在中我们使用join方法让主线程等待子线程结束,join实际上就是调用了wait,其主要代码是:

while (isAlive()) {    wait(0);}复制代码

只要线程是活着的,isAlive()返回true,join就一直等待。谁来通知它呢?当线程运行结束的时候,Java系统调用notifyAll来通知。

使用协作对象

使用join有时比较麻烦,需要主线程逐一等待每个子线程。这里,我们演示一种新的写法。主线程与各个子线程协作的共享变量是一个数,这个数表示未完成的线程个数,初始值为子线程个数,主线程等待该值变为0,而每个子线程结束后都将该值减一,当减为0时调用notifyAll,我们用MyLatch来表示这个协作对象,示例代码如下:

public class MyLatch {    private int count;    public MyLatch(int count) {        this.count = count;    }    public synchronized void await() throws InterruptedException {        while (count > 0) {            wait();        }    }    public synchronized void countDown() {        count--;        if (count <= 0) {            notifyAll();        }    }}复制代码

这里,MyLatch构造方法的参数count应初始化为子线程的个数,主线程应该调用await(),而子线程在执行完后应该调用countDown()。

工作子线程的示例代码如下:

static class Worker extends Thread {    MyLatch latch;    public Worker(MyLatch latch) {        this.latch = latch;    }    @Override    public void run() {        try {            // simulate working on task            Thread.sleep((int) (Math.random() * 1000));            this.latch.countDown();        } catch (InterruptedException e) {        }    }}复制代码

主线程的示例代码如下:

public static void main(String[] args) throws InterruptedException {    int workerNum = 100;    MyLatch latch = new MyLatch(workerNum);    Worker[] workers = new Worker[workerNum];    for (int i = 0; i < workerNum; i++) {        workers[i] = new Worker(latch);        workers[i].start();    }    latch.await();    System.out.println("collect worker results");}复制代码

MyLatch是一个用于同步协作的工具类,主要用于演示基本原理,在Java中有一个专门的同步类CountDownLatch,在实际开发中应该使用它,关于CountDownLatch,我们会在后续章节介绍。

MyLatch的功能是比较通用的,它也可以应用于上面"同时开始"的场景,初始值设为1,Racer类调用await(),主线程调用countDown()即可,如下所示:

public class RacerWithLatchDemo {    static class Racer extends Thread {        MyLatch latch;        public Racer(MyLatch latch) {            this.latch = latch;        }        @Override        public void run() {            try {                this.latch.await();                System.out.println("start run "                        + Thread.currentThread().getName());            } catch (InterruptedException e) {            }        }    }    public static void main(String[] args) throws InterruptedException {        int num = 10;        MyLatch latch = new MyLatch(1);        Thread[] racers = new Thread[num];        for (int i = 0; i < num; i++) {            racers[i] = new Racer(latch);            racers[i].start();        }        Thread.sleep(1000);        latch.countDown();    }}复制代码

异步结果

在主从模式中,手工创建线程往往比较麻烦,一种常见的模式是异步调用,异步调用返回一个一般称为Promise或Future的对象,通过它可以获得最终的结果。在Java中,表示子任务的接口是Callable,声明为:

public interface Callable
{ V call() throws Exception;}复制代码

为表示异步调用的结果,我们定义一个接口MyFuture,如下所示:

public interface MyFuture 
{ V get() throws Exception ;}复制代码

这个接口的get方法返回真正的结果,如果结果还没有计算完成,get会阻塞直到计算完成,如果调用过程发生异常,则get方法抛出调用过程中的异常。

为方便主线程调用子任务,我们定义一个类MyExecutor,其中定义一个public方法execute,表示执行子任务并返回异步结果,声明如下:

public 
MyFuture
execute(final Callable
task)复制代码

利用该方法,对于主线程,它就不需要创建并管理子线程了,并且可以方便地获取异步调用的结果,比如,在主线程中,可以类似这样启动异步调用并获取结果:

public static void main(String[] args) {    MyExecutor executor = new MyExecutor();    // 子任务    Callable
subTask = new Callable
() { @Override public Integer call() throws Exception { // ... 执行异步任务 int millis = (int) (Math.random() * 1000); Thread.sleep(millis); return millis; } }; // 异步调用,返回一个MyFuture对象 MyFuture
future = executor.execute(subTask); // ... 执行其他操作 try { // 获取异步调用的结果 Integer result = future.get(); System.out.println(result); } catch (Exception e) { e.printStackTrace(); }}复制代码

MyExecutor的execute方法是怎么实现的呢?它封装了创建子线程,同步获取结果的过程,它会创建一个执行子线程,该子线程的代码如下所示:

static class ExecuteThread
extends Thread { private V result = null; private Exception exception = null; private boolean done = false; private Callable
task; private Object lock; public ExecuteThread(Callable
task, Object lock) { this.task = task; this.lock = lock; } @Override public void run() { try { result = task.call(); } catch (Exception e) { exception = e; } finally { synchronized (lock) { done = true; lock.notifyAll(); } } } public V getResult() { return result; } public boolean isDone() { return done; } public Exception getException() { return exception; }}复制代码

这个子线程执行实际的子任务,记录执行结果到result变量、异常到exception变量,执行结束后设置共享状态变量done为true并调用notifyAll以唤醒可能在等待结果的主线程。

MyExecutor的execute的方法的代码为:

public 
MyFuture
execute(final Callable
task) { final Object lock = new Object(); final ExecuteThread
thread = new ExecuteThread<>(task, lock); thread.start(); MyFuture
future = new MyFuture
() { @Override public V get() throws Exception { synchronized (lock) { while (!thread.isDone()) { try { lock.wait(); } catch (InterruptedException e) { } } if (thread.getException() != null) { throw thread.getException(); } return thread.getResult(); } } }; return future;}复制代码

execute启动一个线程,并返回MyFuture对象,MyFuture的get方法会阻塞等待直到线程运行结束。

以上的MyExecutore和MyFuture主要用于演示基本原理,实际上,Java中已经包含了一套完善的框架Executors,相关的部分接口和类有:

  • 表示异步结果的接口Future和实现类FutureTask
  • 用于执行异步任务的接口Executor、以及有更多功能的子接口ExecutorService
  • 用于创建Executor和ExecutorService的工厂方法类Executors

后续章节,我们会详细介绍这套框架。

集合点

各个线程先是分头行动,然后各自到达一个集合点,在集合点需要集齐所有线程,交换数据,然后再进行下一步动作。怎么表示这种协作呢?协作的共享变量依然是一个数,这个数表示未到集合点的线程个数,初始值为子线程个数,每个线程到达集合点后将该值减一,如果不为0,表示还有别的线程未到,进行等待,如果变为0,表示自己是最后一个到的,调用notifyAll唤醒所有线程。我们用AssemblePoint类来表示这个协作对象,示例代码如下:

public class AssemblePoint {    private int n;    public AssemblePoint(int n) {        this.n = n;    }    public synchronized void await() throws InterruptedException {        if (n > 0) {            n--;            if (n == 0) {                notifyAll();            } else {                while (n != 0) {                    wait();                }            }        }    }}复制代码

多个游客线程,各自先独立运行,然后使用该协作对象到达集合点进行同步的示例代码如下:

public class AssemblePointDemo {    static class Tourist extends Thread {        AssemblePoint ap;        public Tourist(AssemblePoint ap) {            this.ap = ap;        }        @Override        public void run() {            try {                // 模拟先各自独立运行                Thread.sleep((int) (Math.random() * 1000));                // 集合                ap.await();                System.out.println("arrived");                // ... 集合后执行其他操作            } catch (InterruptedException e) {            }        }    }    public static void main(String[] args) {        int num = 10;        Tourist[] threads = new Tourist[num];        AssemblePoint ap = new AssemblePoint(num);        for (int i = 0; i < num; i++) {            threads[i] = new Tourist(ap);            threads[i].start();        }    }}复制代码

这里实现的是AssemblePoint主要用于演示基本原理,Java中有一个专门的同步工具类CyclicBarrier可以替代它,关于该类,我们后续章节介绍。

小结

上节和本节介绍了Java中线程间协作的基本机制wait/notify,协作关键要想清楚协作的共享变量和条件是什么,为进一步理解,针对多种协作场景,我们演示了wait/notify的用法及基本协作原理,Java中有专门为协作而建的阻塞队列、同步工具类、以及Executors框架,我们会在后续章节介绍,在实际开发中,应该尽量使用这些现成的类,而非重新发明轮子。

之前,我们多次碰到了InterruptedException并选择了忽略,现在是时候进一步了解它了。

(与其他章节一样,本节所有代码位于 )


未完待续,查看最新文章,敬请关注微信公众号“老马说编程”(扫描下方二维码),深入浅出,老马和你一起探索Java编程及计算机技术的本质。用心原创,保留所有版权。

转载地址:http://kginl.baihongyu.com/

你可能感兴趣的文章
Win7(64位旗舰SP1)系统安装Oracle10g
查看>>
设计模式概述
查看>>
C#多个控件有同一个事件,优化
查看>>
DbContext 和ObjectContext两者的区别
查看>>
CH6201走廊泼水节
查看>>
linux下/boot目录丢失的恢复
查看>>
进来看看吧 多学点知识不亏.......
查看>>
新手学习arm的建议
查看>>
记一次被中间人攻击的经历
查看>>
原来你是这样的Websocket--抓包分析
查看>>
mysql Navicat通过代理链接数据库
查看>>
把网站发布到远程服务器上
查看>>
解析特殊locale的日期格式
查看>>
发布Acey.ExcelX3.2
查看>>
微信接口出现“调用支付jsapi缺少参数appid”
查看>>
利用ResultFilter实现asp.net mvc3 页面静态化
查看>>
PHP 图片加水印的方法
查看>>
javascript报错集锦
查看>>
koa2实现拦截器进行登录前session校验
查看>>
[java]窗口中的菜单项
查看>>