当你使用多线程来同时运行多个任务时,可以通过使用锁来同步两个任务的行为,从而使的一个任务不会干涉另一个任务的资源。也就是说,如果两个任务交替的步入某项共享资源,你可以使用互斥来保证任何时刻只有一个任务可以访问这项资源。
线程之间的协作
上面的问题已经解决了,下一步是如何使得任务彼此之间可以协作,使得多个任务可以一起工作去解决某个问题。现在的问题不是彼此之间的干涉,而是彼此之间的协作。解决这类问题的关键是某些部分必须在其他部分被解决之前解决。
当任务协作时,关键问题是这些任务之间的握手。为了实现握手,我们使用了相同的基础特性:互斥。在这种情况下,互斥能够确保只有一个任务可以响应某个信号,这样就能根除任何可能的竞争条件。在互斥上,我们为任务添加了一种途径,可以将自身挂起,直至某些外部条件发生变化,表示是时候让这个任务开始为止。
wait() 与 notifyAll()
wait() 可以使你等待某个条件发生变化,而改变这个条件通常是由另一个任务来改变。你肯定不想在你的任务测试这个条件的同时,不断的进行空循环,这被称为忙等待,是一种不良的 cpu 使用方式。因此 wait() 会在外部条件发生变化的时候将任务挂起,并且只有在 notif() 或 notifAll() 发生时,这个任务才会被唤醒并去检查所发生的变化。因此,wait() 提供了一种在任务之间对活动同步的方式。
调用 sleep() 时候锁并没有被释放,调用 yield() 也是一样。当一个任务在方法里遇到对 wait() 调用时,线程执行被挂起,对象的锁被释放。这就意味着另一个任务可以获得锁,因此在改对象中的其他 synchronized 方法可以在 wait() 期间被调用。因此,当你在调用 wait() 时,就是在声明:“我已经做完了所有的事情,但是我希望其他的 synchronized 操作在条件何时的情况下能够被执行”。
有两种形式的 wait():
- 第一种接受毫秒作为参数:指再次暂停的时间。
- 在 wait() 期间对象锁是被释放的。
- 可以通过 notif() 或 notifAll(),或者指令到期,从 wait() 中恢复执行。
- 第二种不接受参数的 wait().
- 这种 wait() 将无线等待下去,直到线程接收到 notif() 或 notifAll()。
wait()、notif()以及 notifAll() 有一个比较特殊的方面,那就是这些方法是基类 Object 的一部分,而不是属于 Thread 类。仅仅作为线程的功能却成为了通用基类的一部分。原因是这些方法操作的锁,也是所有对象的一部分。所以你可以将 wait() 放进任何同步控制方法里,而不用考虑这个类是继承自 Thread 还是 Runnable。实际上,只能在同步方法或者同步代码块里调用 wait()、notif() 或者 notifAll()。如果在非同步代码块里操作这些方法,程序可以通过编译,但是在运行时会得到 IllegalMonitorStateException 异常。意思是,在调用 wait()、notif() 或者 notifAll() 之前必须拥有获取对象的锁。
比如,如果向对象 x 发送 notifAll(),那就必须在能够得到 x 的锁的同步控制块中这么做:
synchronized(x){ x.notifAll();}复制代码
我们看一个示例:一个是将蜡涂到 Car 上,一个是抛光它。抛光任务在涂蜡任务完成之前,是不能执行其工作的,而涂蜡任务在涂另一层蜡之前必须等待抛光任务完成。
public class Car { //涂蜡和抛光的状态 private boolean waxOn = false; //打蜡 public synchronized void waxed() { waxOn = true; notifyAll(); } //抛光 public synchronized void buffed() { waxOn = false; notifyAll(); } //抛光结束被挂起即将开始打蜡任务 public synchronized void waitForWaxing() throws InterruptedException{ while (waxOn == false) { wait(); } } //打蜡结束被挂起即将开始抛任务 public synchronized void waitForBuffing() throws InterruptedException{ while (waxOn == true) { wait(); } }}复制代码
开始打蜡的任务:
public class WaxOn implements Runnable{ private Car car; protected WaxOn(Car car) { super(); this.car = car; } @Override public void run() { try { while (!Thread.interrupted()) { System.out.println("Wax one"); TimeUnit.MICROSECONDS.sleep(200); //开始打蜡 car.waxed(); //当前任务被挂起 car.waitForBuffing(); } } catch (InterruptedException e) { // TODO Auto-generated catch block System.out.println(" Exiting via interrupt"); } System.out.println("Ending wax on task"); }}复制代码
开始抛光的任务:
public class WaxOff implements Runnable{ private Car car; protected WaxOff(Car car) { super(); this.car = car; } @Override public void run() { // TODO Auto-generated method stub try { while (!Thread.interrupted()) { //如果还是在打蜡就挂起 car.waitForWaxing(); System.out.println("Wax off"); TimeUnit.MICROSECONDS.sleep(200); //开始抛光 car.buffed(); } } catch (InterruptedException e) { // TODO Auto-generated catch block System.out.println("Wxtiing via interrupt"); } System.out.println("Ending wax off task"); }}复制代码
测试类:
public class WaxOmatic { public static void main(String[] args) throws Exception{ // TODO Auto-generated method stub Car car = new Car(); ExecutorService service = Executors.newCachedThreadPool(); service.execute(new WaxOff(car)); service.execute(new WaxOn(car)); //暂停2秒钟 TimeUnit.SECONDS.sleep(1); //关闭所有的任务 service.shutdownNow(); }}复制代码
执行结果:
/.....Wax oneWax offWax oneWax offWax oneWax off Exiting via interruptWxtiing via interruptEnding wax on taskEnding wax off task复制代码
在 waitForWaxing() 中检查 WaxOn 标志,如果它是 false,那么这个调用任务将会被挂起。这个行为发生在 synchronized 方法中这一点很重要。因为在这个方法中任务已经获得了锁。当你调用 wait() 时,线程被挂起,而锁被释放。释放锁是本质所在,因为为了安全的改变对象的状态,其他某个任务就必须能够获得这个锁。
WaxOn.run() 表示给汽车打蜡的第一个步骤,它执行他的操作:调用 sleep() 模拟打蜡的时间,然后告知汽车打蜡结束,并且调用 waitForWaxing(),这个方法会调用 wait() 挂起当前打蜡的任务。直到 WaxOff 任务调用这两车的 buffed(),从而改变状态并且调用 notfiAll() 重新唤醒为止。翻过来也是一样的,在运行程序时,你可以看到控制权在两个任务之间来回的传递,这两个步骤过程在不断的重复。
错失的信号
当两个线程使用 notif()/wait() 或者 notifAll()/wait() 进行协作时,有可能会错过某个信号。假设线程 T1 是通知 T2 的线程,而这两个线程都使用下面的方式实现:
T1:synchronized(X){ //设置 T2 的一个条件x.notif();}T2:while(someCondition){ //Potit synchronized(x){ x.wait(); }}复制代码
以上的例子假设 T2 对 someCondition 发现其为 true()。在执行 Potit 其中线程调度器可能切换到了 T1。而 T1 将会执行重新设置 condition,并且调用唤醒。当 T2 继续执行时,以至于不能意识到条件已经发生变化,因此会盲目的进入 wait()。此时唤醒在之前已经调用过了,而 T2 将无限的等待下去唤醒的信号。
解决该问题的方案是防止 someCondition 变量上产生竞争条件:
synchronized(x){ while(someCondition){ x.wait(); }}复制代码
notif() 与 notifAll()
可能有多个任务在单个 Car 对象上被挂起处于 wait() 状态,因此调用 notifyAll() 比调用 notify() 更安全。使用 notify() 而不是 notifyAll() 是一种优化。使用 notify() 时,在众多等待同一个锁的任务中只有一个被唤醒,因此如果你希望使用 notify(),就必须保证被唤醒的是恰当的任务。另外使用 notify() ,所有任务都必须等待相同的条件,因为如果你有多个任务在等待不同的条件,那你就不会知道是否唤醒了恰当的任务。如果使用 notfiy(),当条件发生变化时,必须只有一个任务能从中收益。最后,这些限制对所有可能存在的子类都必须总起作用。如果这些规则任何一条不满足都必须使用 notifyAll()。
在 Java 的线程机制中,有一个描述是这样的:notifyAll() 将唤醒所有正在等待的任务。这是否意味着在程序中任何地方,任何处于 wait() 状态中的任务都将被任何对 notifyAll() 的调用唤醒呢?在下面的实例中说明了情况并非如此,当 notifyAll() 因某个特定锁被调用时,只有等待这个锁的任务才会被唤醒:
public class Blocker { synchronized void waitingCall() { try { while(!Thread.interrupted()) { wait(); System.out.print(Thread.currentThread() + " "); } } catch(InterruptedException e) { // OK to exit this way } } synchronized void prod() { notify(); } synchronized void prodAll() { notifyAll(); }}复制代码
创建任务 Task:
public class Task implements Runnable { static Blocker blocker = new Blocker(); public void run() { blocker.waitingCall(); }}复制代码
创建任务 Task2:
public class Task2 implements Runnable { // A separate Blocker object: static Blocker blocker = new Blocker(); public void run() { blocker.waitingCall(); }}复制代码
测试类:
public class NotifyVsNotifyAll { public static void main(String[] args) throws Exception{ ExecutorService service = Executors.newCachedThreadPool(); for (int i = 0; i < 3; i++) { service.execute(new Task()); } service.execute(new Task2()); Timer timer = new Timer(); timer.scheduleAtFixedRate(new TimerTask() { boolean prod = true; @Override public void run() { // TODO Auto-generated method stub if (prod) { System.out.println("notify"); Task.blocker.prod(); prod = false; }else { System.out.println("notifyAll"); Task.blocker.prodAll(); prod = true; } } }, 400, 400); TimeUnit.SECONDS.sleep(5); timer.cancel(); System.out.println("Time cancle"); TimeUnit.MILLISECONDS.sleep(500); System.out.println("Task2.blocker.prodAll() "); Task2.blocker.prodAll(); TimeUnit.MILLISECONDS.sleep(500); System.out.println("\nShutting down"); service.shutdownNow(); // Interrupt all tasks }}复制代码
测试结果:
notifyThread[pool-1-thread-2,5,main] notifyAllThread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-1,5,main] notifyThread[pool-1-thread-2,5,main] notifyAllThread[pool-1-thread-2,5,main] Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-3,5,main] notifyThread[pool-1-thread-2,5,main] notifyAllThread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-1,5,main] notifyThread[pool-1-thread-2,5,main] notifyAllThread[pool-1-thread-2,5,main] Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-3,5,main] notifyThread[pool-1-thread-2,5,main] notifyAllThread[pool-1-thread-2,5,main] Thread[pool-1-thread-3,5,main] Thread[pool-1-thread-1,5,main] notifyThread[pool-1-thread-2,5,main] notifyAllThread[pool-1-thread-2,5,main] Thread[pool-1-thread-1,5,main] Thread[pool-1-thread-3,5,main] Time cancleTask2.blocker.prodAll()Thread[pool-1-thread-4,5,main]Shutting down复制代码
从上面输出的结果可以看出,我们启动了三个 Task 任务线程,一个 Task2 线程。使用 timer 做了一个定时器,每间隔 4 毫秒就轮换启动 Task.blocker 的 notify() 和 notifyAll()方法。我们看到 Task 和 Task2 都有 Blocker 对象,他们调用 Blocker 对象的时候都会被阻塞。我们看到当调用 Task.prod() 的时候只有一个在等待锁的任务被唤醒,其余两个继续挂起。当调用 Task.prodAll() 的时候等待的三个线程都会被唤醒。当调用 Task2。prodAll() 的时候 只有 Task2 的线程任务被唤醒。其余的三个 Task 任务继续挂起。
生产者与消费者
请考虑这样一种情况,在饭店有一个厨师和一个服务员。这个服务员必须等待厨师做好膳食。当厨师准备好时会通知服务员,之后服务员上菜,然后返回继续等待。这是一个任务协作示例:厨师代表生产者,而服务员代表消费者。两个任务必须在膳食被生产和消费时进行握手,而系统必须是以有序的方式关闭。
膳食类:
public class Meal { private final int orderNum; public Meal(int orderNum) { this.orderNum = orderNum; } public String toString() { return "Meal " + orderNum; }}复制代码
服务生类:
public class WaitPerson implements Runnable { private Restaurant restaurant; public WaitPerson(Restaurant r) { restaurant = r; } public void run() { try { while(!Thread.interrupted()) { synchronized(this) { while(restaurant.meal == null) wait(); // ... for the chef to produce a meal } Print.print("Waitperson got " + restaurant.meal); synchronized(restaurant.chef) { restaurant.meal = null; restaurant.chef.notifyAll(); // Ready for another } } } catch(InterruptedException e) { Print.print("WaitPerson interrupted"); } }}复制代码
厨师类:
public class Chef implements Runnable { private Restaurant restaurant; private int count = 0; public Chef(Restaurant r) { restaurant = r; } public void run() { try { while(!Thread.interrupted()) { synchronized(this) { while(restaurant.meal != null) wait(); // ... for the meal to be taken } if(++count == 10) { Print.print("Out of food, closing"); restaurant.exec.shutdownNow(); } Print.printnb("Order up! "); synchronized(restaurant.waitPerson) { restaurant.meal = new Meal(count); restaurant.waitPerson.notifyAll(); } TimeUnit.MILLISECONDS.sleep(100); } } catch(InterruptedException e) { Print.print("Chef interrupted"); } }}复制代码
测试类:
public class Restaurant { Meal meal; ExecutorService exec = Executors.newCachedThreadPool(); WaitPerson waitPerson = new WaitPerson(this); Chef chef = new Chef(this); public Restaurant() { exec.execute(chef); exec.execute(waitPerson); } public static void main(String[] args) { new Restaurant(); }}复制代码
执行结果:
Order up! Waitperson got Meal 1Order up! Waitperson got Meal 2Order up! Waitperson got Meal 3Order up! Waitperson got Meal 4Order up! Waitperson got Meal 5Order up! Waitperson got Meal 6Order up! Waitperson got Meal 7Order up! Waitperson got Meal 8Order up! Waitperson got Meal 9Out of food, closingOrder up! WaitPerson interruptedChef interrupted复制代码
**使用显示的 Lock 和 Condition 对象
在 java SE5 的类库中还有额外的显示工具。我们来重写我们的打蜡和抛光类。使用互斥并允许任务挂起的基本类是 Condition,你可以通过在 Condition 上调用 await() 来挂起一个任务。当外部条件发生变化时,意味着某个任务应该继续执行,你可以通过调用 signal() 来通知这个任务,从而唤醒一个任务,或者调用 signalAll() 来唤醒所有在这个 Condition 上被挂起的任务。(signalAll() 比 notifAll() 是更安全的方式)
下面是重写版本:
class Car { private Lock lock = new ReentrantLock(); private Condition condition = lock.newCondition(); private boolean waxOn = false; public void waxed() { lock.lock(); try { waxOn = true; // Ready to buff condition.signalAll(); } finally { lock.unlock(); } } public void buffed() { lock.lock(); try { waxOn = false; // Ready for another coat of wax condition.signalAll(); } finally { lock.unlock(); } } public void waitForWaxing() throws InterruptedException { lock.lock(); try { while(waxOn == false) condition.await(); } finally { lock.unlock(); } } public void waitForBuffing() throws InterruptedException{ lock.lock(); try { while(waxOn == true) condition.await(); } finally { lock.unlock(); } }}class WaxOn implements Runnable { private Car car; public WaxOn(Car c) { car = c; } public void run() { try { while(!Thread.interrupted()) { printnb("Wax On! "); TimeUnit.MILLISECONDS.sleep(200); car.waxed(); car.waitForBuffing(); } } catch(InterruptedException e) { print("Exiting via interrupt"); } print("Ending Wax On task"); }}class WaxOff implements Runnable { private Car car; public WaxOff(Car c) { car = c; } public void run() { try { while(!Thread.interrupted()) { car.waitForWaxing(); printnb("Wax Off! "); TimeUnit.MILLISECONDS.sleep(200); car.buffed(); } } catch(InterruptedException e) { print("Exiting via interrupt"); } print("Ending Wax Off task"); }}public class WaxOMatic2 { public static void main(String[] args) throws Exception { Car car = new Car(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new WaxOff(car)); exec.execute(new WaxOn(car)); TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); }}复制代码
在 Car 的构造器中单个的 Lock 将产生一个 Condition 对象,这个对象被用来管理任务之间的通信。但是这个 Condition 不包含任何有关处理状态的信息,因此你需要额外的表示处理状态的信息,即 Boolean waxOn。
生产者消费者与队列
wait() 和 notifAll() 方法以一种非常低级的方式解决了任务的互操作的问题,即每次交互时都握手。许多时候我们可以使用同步队列来解决协作的问题,同步队列在任何时刻只允许一个任务插入或移除元素。在 Java.util.concurrent.BlockingQueue 接口中提供了这个队列,这个接口有大量的标准实现。可以使用 LinkedBlockingQueue 他是一个无界队列,还可以使用 ArrayBlockingQueue,它具有固定的尺寸,可以在它被阻塞之前向其中放置有限数量的元素。
如果消费者任务试图从队列中获取对象,而该队列为空时,那么这些队列就可以挂起这些任务,并且当有更多的元素可用时恢复这些消费任务。阻塞队列可以解决非常大的问题,而其方式与 wait() 和 notifyAll() 相比,则简单切可靠。
下面是一个简单的测试,它将多个 LiftOff 对象执行串行化。消费者 LiftOffRunner 将每个 LiftOff 对象从 BlockIngQueue 中推出并直接运行。它通过显示的调用 run() 而是用自己的线程来运行,而不是为每个任务启动一个线程。
首先把之前写过的 LiftOff 类贴出来:
public class LiftOff implements Runnable{ protected int countDown = 10; // Default private static int taskCount = 0; private final int id = taskCount++; public LiftOff() {} public LiftOff(int countDown) { this.countDown = countDown; } public String status() { return "#" + id + "(" + (countDown > 0 ? countDown : "Liftoff!") + "), "; } public void run() { while(countDown-- > 0) { System.out.print(status()); Thread.yield(); } }}复制代码
LiftOffRunner 类:
public class LiftOffRunner implements Runnable{ private BlockingQueuerockets; protected LiftOffRunner(BlockingQueue rockets) { super(); this.rockets = rockets; } public void add(LiftOff lo) { try { rockets.put(lo); } catch (InterruptedException e) { // TODO Auto-generated catch block System.out.println("添加失败"); } } @Override public void run() { // TODO Auto-generated method stub try { while (!Thread.interrupted()) { LiftOff rocket = rockets.take(); rocket.run(); } } catch (InterruptedException e) { // TODO Auto-generated catch block System.out.println("运行中断"); } System.out.println("退出运行"); }}复制代码
最后是测试类:
public class TestBlockingQueues { static void getkey(){ try { new BufferedReader(new InputStreamReader(System.in)).readLine(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } } static void getkey(String message) { Print.print(message); getkey(); } static void test(String msg,BlockingQueuequeue){ LiftOffRunner runner = new LiftOffRunner(queue); Thread thread = new Thread(runner); thread.start(); //启动了,但是内容是空的,就一直挂起,等待有新的内容进去 for (int i = 0; i < 5; i++) { runner.add(new LiftOff(5)); } getkey("Press Enter "+ msg); thread.interrupt(); } public static void main(String[] args) { test("LinkedBlockingQueue", new LinkedBlockingQueue ()); test("ArrayBlockingQueue", new ArrayBlockingQueue<>(3)); test("SynchronousQueue", new SynchronousQueue<>()); }}复制代码
吐司 BlockingQueue
下面是一个示例,每一台机器都有三个任务:一个只做吐司、一个给吐司抹黄油、另一个在涂抹黄油的吐司上抹果酱。我们来示例如果使用 BlockIngQueue 来运行这个示例:
class Toast { public enum Status { DRY, BUTTERED, JAMMED } private Status status = Status.DRY; private final int id; public Toast(int idn) { id = idn; } public void butter() { status = Status.BUTTERED; } public void jam() { status = Status.JAMMED; } public Status getStatus() { return status; } public int getId() { return id; } public String toString() { return "Toast " + id + ": " + status; }}class ToastQueue extends LinkedBlockingQueue{}class Toaster implements Runnable { private ToastQueue toastQueue; private int count = 0; private Random rand = new Random(47); public Toaster(ToastQueue tq) { toastQueue = tq; } public void run() { try { while(!Thread.interrupted()) { TimeUnit.MILLISECONDS.sleep( 100 + rand.nextInt(500)); // Make toast Toast t = new Toast(count++); print(t); // Insert into queue toastQueue.put(t); } } catch(InterruptedException e) { print("Toaster interrupted"); } print("Toaster off"); }}// Apply butter to toast:class Butterer implements Runnable { private ToastQueue dryQueue, butteredQueue; public Butterer(ToastQueue dry, ToastQueue buttered) { dryQueue = dry; butteredQueue = buttered; } public void run() { try { while(!Thread.interrupted()) { // Blocks until next piece of toast is available: Toast t = dryQueue.take(); t.butter(); print(t); butteredQueue.put(t); } } catch(InterruptedException e) { print("Butterer interrupted"); } print("Butterer off"); }}// Apply jam to buttered toast:class Jammer implements Runnable { private ToastQueue butteredQueue, finishedQueue; public Jammer(ToastQueue buttered, ToastQueue finished) { butteredQueue = buttered; finishedQueue = finished; } public void run() { try { while(!Thread.interrupted()) { // Blocks until next piece of toast is available: Toast t = butteredQueue.take(); t.jam(); print(t); finishedQueue.put(t); } } catch(InterruptedException e) { print("Jammer interrupted"); } print("Jammer off"); }}// Consume the toast:class Eater implements Runnable { private ToastQueue finishedQueue; private int counter = 0; public Eater(ToastQueue finished) { finishedQueue = finished; } public void run() { try { while(!Thread.interrupted()) { // Blocks until next piece of toast is available: Toast t = finishedQueue.take(); // Verify that the toast is coming in order, // and that all pieces are getting jammed: if(t.getId() != counter++ || t.getStatus() != Toast.Status.JAMMED) { print(">>>> Error: " + t); System.exit(1); } else print("Chomp! " + t); } } catch(InterruptedException e) { print("Eater interrupted"); } print("Eater off"); }}public class ToastOMatic { public static void main(String[] args) throws Exception { ToastQueue dryQueue = new ToastQueue(), butteredQueue = new ToastQueue(), finishedQueue = new ToastQueue(); ExecutorService exec = Executors.newCachedThreadPool(); exec.execute(new Toaster(dryQueue)); exec.execute(new Butterer(dryQueue, butteredQueue)); exec.execute(new Jammer(butteredQueue, finishedQueue)); exec.execute(new Eater(finishedQueue)); TimeUnit.SECONDS.sleep(5); exec.shutdownNow(); }}复制代码
这个示例中没有任何显示的同步,因为同步队列和系统的设计隐式的管理了每片 Toast 在任何时刻都只有一个任务在操作。因为队列的阻塞,使得处理过程将被自动挂起和恢复。
我的微信公号:Android开发吹牛皮
扫码关注可免费获得全套的 Java 编程思想笔记