当前位置:首页 > 开发 > 编程语言 > Java > 正文

Java Concurrency: Latches & Barriers

发表于: 2014-09-03   作者:DavyJones2010   来源:转载   浏览次数:
摘要: Latches:     A latch is a synchronizer that can delay the process of threads until it reaches its terminal state.     A latch acts as a gate: until the latch reaches the terminal

Latches:

    A latch is a synchronizer that can delay the process of threads until it reaches its terminal state.

    A latch acts as a gate: until the latch reaches the terminal state the gate is closed and no thread can pass, and in the terminal state the gate opens, allowing all threads to pass.

    Once the latch reaches the terminal state, it cannot change state again, so it remains open forever.

    Latch can be used to ensure that certain activities do not proceed until other on-time activities complete, such as:

    1> Ensuring that a computation does not proceed until resources it needs have been initialized. A simple binary(two-state) latch could be used to indicate "Resource R has been initialized", and any activity that requires R would wait first on this latch.

public class CountDownLatchTest {

    @Test
    public void countDownTest1() throws InterruptedException {
	CountDownLatch resourcePrepareLatch = new CountDownLatch(1);

	ResourcePrepareThread resourcePrepareThread = new ResourcePrepareThread(
		resourcePrepareLatch);
	ResourceBlockingThread resourceBlockingThread = new ResourceBlockingThread(
		resourcePrepareLatch);
	ResourceBlockingThread resourceBlockingThread2 = new ResourceBlockingThread(
		resourcePrepareLatch);
	ResourceBlockingThread resourceBlockingThread3 = new ResourceBlockingThread(
		resourcePrepareLatch);

	ExecutorService executorService = Executors.newFixedThreadPool(4);
	executorService.submit(resourcePrepareThread);
	executorService.submit(resourceBlockingThread);
	executorService.submit(resourceBlockingThread2);
	executorService.submit(resourceBlockingThread3);

	executorService.shutdown();
	executorService.awaitTermination(10, TimeUnit.SECONDS);
    }

    private static class ResourcePrepareThread implements Runnable {
	CountDownLatch resourcePrepareLatch;

	public ResourcePrepareThread(CountDownLatch resourcePrepareLatch) {
	    super();
	    this.resourcePrepareLatch = resourcePrepareLatch;
	}

	@Override
	public void run() {
	    try {
		System.out.println(String.format(
			"[%s] start to prepare resource",
			Thread.currentThread()));

		Thread.sleep((long) (3000 * Math.random()));

		System.out.println(String.format(
			"[%s] finished prepare resource",
			Thread.currentThread()));

		resourcePrepareLatch.countDown();
	    } catch (InterruptedException e) {
		e.printStackTrace();
	    }
	}
    }

    private static class ResourceBlockingThread implements Runnable {
	CountDownLatch resourcePrepareLatch;

	public ResourceBlockingThread(CountDownLatch resourcePrepareLatch) {
	    super();
	    this.resourcePrepareLatch = resourcePrepareLatch;
	}

	@Override
	public void run() {
	    try {
		resourcePrepareLatch.await();
		System.out.println(String.format("[%s] start to use resource",
			Thread.currentThread()));
	    } catch (InterruptedException e) {
		e.printStackTrace();
	    }
	}
    }
}

    2> Ensuring that a service does not start until other services on which it depends have started. Each service would have an associated binary latch; starting service S would involve first waiting on the latches for other services on which S depends, and then releasing the S latch after startup completes so any services that depend on S can then proceed.

    3> Waiting until all the parties involved in an activity, for instance the players in a multi-player game, are ready to proceed. In this case, the latch reaches the terminal state after all the players are ready.

public class BasketTest {

    public static void main(String[] args) throws InterruptedException {
	CountDownLatch countDownLatch = new CountDownLatch(10);
	BasketBallPlayer player1 = new BasketBallPlayer(countDownLatch, 1);
	BasketBallPlayer player2 = new BasketBallPlayer(countDownLatch, 2);
	BasketBallPlayer player3 = new BasketBallPlayer(countDownLatch, 3);
	BasketBallPlayer player4 = new BasketBallPlayer(countDownLatch, 4);
	BasketBallPlayer player5 = new BasketBallPlayer(countDownLatch, 5);
	BasketBallPlayer player6 = new BasketBallPlayer(countDownLatch, 6);
	BasketBallPlayer player7 = new BasketBallPlayer(countDownLatch, 7);
	BasketBallPlayer player8 = new BasketBallPlayer(countDownLatch, 8);
	BasketBallPlayer player9 = new BasketBallPlayer(countDownLatch, 9);
	BasketBallPlayer player10 = new BasketBallPlayer(countDownLatch, 10);

	ExecutorService executorService = Executors.newFixedThreadPool(10);
	executorService.submit(player1);
	executorService.submit(player2);
	executorService.submit(player3);
	executorService.submit(player4);
	executorService.submit(player5);
	executorService.submit(player6);
	executorService.submit(player7);
	executorService.submit(player8);
	executorService.submit(player9);
	executorService.submit(player10);

	countDownLatch.await();

	System.out.println("All player is ready. Game is starting...");
	System.exit(0);
    }

    private static class BasketBallPlayer implements Runnable {
	CountDownLatch countDownLatch;
	int playerNo;

	public BasketBallPlayer(CountDownLatch countDownLatch, int playerNo) {
	    super();
	    this.countDownLatch = countDownLatch;
	    this.playerNo = playerNo;
	}

	@Override
	public void run() {
	    try {
		System.out.println(String.format(
			"Player: [%d] start to prepare for game", playerNo));
		Thread.sleep((long) (1000 * Math.random()));
		System.out.println(String.format(
			"Player: [%d] finished prepare for game", playerNo));
		countDownLatch.countDown();
	    } catch (InterruptedException e) {
		e.printStackTrace();
	    }
	}
    }
}

 

 

Barriers:

    We have seen how latches can facilitate starting up a group of related activities or waiting for a group of related activities to complete. Latches are single use objects, once a latches enters the terminal state, it cannot be reset.

    Barriers are similars to latches in that they block a group of threads until some event has occurred. The key difference is that with a barrier, all the threads must come together at a barrier point at the same time in order to proceed. Latches are for waiting for events, barriers are for waiting for other threads.

    CyclicBarrier allows a fixed number of parties to rendezvous repeatedly at a barrier point and is useful in parallel interactive algorithms that break down a problem into a fixed number of independent subproblems. Thread call await when they reach the barrier point, and await blocks until all the threads have reached the barrier point. If all threads meet at the barrier point, the barrier has been successfully passed, in which case all threads are released and the barrier is reset so it can be used again. CyclicBarrier also lets you pass a barrier action to the constructor, this is a Runnable that is executed(in one of the subtask threads) when the barrier is successfully passed but before the blocked threads are released.

public class BarrierTest {

    public static void main(String[] args) {
	CyclicBarrier cyclicBarrier = new CyclicBarrier(3, new Runnable() {
	    @Override
	    public void run() {
		System.out
			.println("All attendees are ready. Meeting is starting...");
	    }
	});
	Runnable attendee1 = new Attendee(cyclicBarrier);
	Runnable attendee2 = new Attendee(cyclicBarrier);
	Runnable attendee3 = new Attendee(cyclicBarrier);

	Thread t1 = new Thread(attendee1);
	Thread t2 = new Thread(attendee2);
	Thread t3 = new Thread(attendee3);

	t1.start();
	t2.start();
	t3.start();
    }

    private static class Attendee implements Runnable {
	private final CyclicBarrier cyclicBarrier;

	public Attendee(CyclicBarrier cyclicBarrier) {
	    super();
	    this.cyclicBarrier = cyclicBarrier;
	}

	@Override
	public void run() {
	    try {
		while (true) {
		    System.out.println(String.format(
			    "[%s] start to prepare for metting",
			    Thread.currentThread()));
		    Thread.sleep((long) (1000 * Math.random()));
		    System.out.println(String.format(
			    "[%s] finished prepare for metting",
			    Thread.currentThread()));
		    cyclicBarrier.await();
		}
	    } catch (InterruptedException e) {
		e.printStackTrace();
	    } catch (BrokenBarrierException e) {
		e.printStackTrace();
	    }
	}
    }
}

 

 

 

Reference Links:

1> "Java Concurrency in Practice"

Java Concurrency: Latches & Barriers

  • 0

    开心

    开心

  • 0

    板砖

    板砖

  • 0

    感动

    感动

  • 0

    有用

    有用

  • 0

    疑问

    疑问

  • 0

    难过

    难过

  • 0

    无聊

    无聊

  • 0

    震惊

    震惊

编辑推荐
1. 简介 1.1 多线程的好处 提高性能,提高吞吐量,开发多核CPU的性能; 使UI应答更顺畅 1.2 多线程的
【转】http://www.blogjava.net/vincent/archive/2009/07/16/287055.html 关于 Java Concurrency 自
对数据同步访问封装的策略 我们经常操作一些本身不是线程安全的对象时,在多线程的环境下就会考虑到
引言 以前有几次碰到过一个有意思的多线程问题,当时的场景看起来比较简单。有两个线程,他们都需要
1. Risks of Threads: 1> Safety Hazards We can add annotation: @NotThreadSafe, @ThreadSafe,
现在在CPU上,摩尔定律已经失效,大家都不再追求高频率,而是越来越追求多核,阿姆达尔定律似乎更重
粗略看完《Java Concurrency in Practice》这部书,确实是多线程/并发编程的一本好书。里面对各种并
Chapter 2. Thread Safety Whether an object needs to be thread-safe depends on whether it will
前面的章节主要谈谈原子操作,至于与原子操作一些相关的问题或者说陷阱就放到最后的总结篇来整体说
原文地址:http://coderbee.net/index.php/concurrent/20131211/624 翻译自:Martin Thompson – Me
版权所有 IT知识库 CopyRight © 2009-2015 IT知识库 IT610.com , All Rights Reserved. 京ICP备09083238号