// Biblioteca Java - Rev 28
//
// Subversion Repositories:
// Rev:
package exemple.exchanger;

import java.util.*;
import java.util.concurrent.*;  

/*
 * Exchanger offers a simplified way of communicating between threads, that is, by passing
 * a specific object between two threads. That's why there's the <V> after the class
 * name. Instead of using Piped streams for stream-based, inter-thread communication
 * (where one side writes and the other reads), the Exchanger relies on a single
 * exchange method for the transfer of one-off data between threads. The Exchanger is
 * not a general replacement for the piped model, but their usages are similar.
 *
 * To exchange objects, you call the exchange method. The method transfers objects in both
 * directions, not just one.
 *  
 * An Exchanger is often used when you have two threads, one consuming a resource,
 * and the other producing it. When the buffer used by the producer is full, the
 * producer waits for the consumer. When the buffer used by the consumer is empty,
 * the consumer waits for the producer. After both waits happen, the two threads swap
 * buffers. This works well when you know more items will be produced. Otherwise,
 * items will sit waiting for a full buffer before swapping buffers.
 *
 * Here's an example that uses the Exchanger. The FillingLoop class is the producer
 * type. The EmptyingLoop class is the consumer. When the producer's data structure
 * is full, it tries to exchange with the consumer. When the consumer's data structure
 * is empty, it tries to exchange data structures with the producer. After both the
 * producer and consumer are waiting, the exchange happens.
 */


/**
 * Demonstrates two threads swapping buffers through an {@link Exchanger}: a
 * producer fills a list, a consumer drains one, and they trade lists whenever
 * the producer's list is full and the consumer's list is empty.
 *
 * <p>Every call to {@link #main(String[])} creates its own buffers, exchanger
 * and latch, so the demo can be run more than once in the same JVM without one
 * run seeing state left behind by an earlier one.
 *
 * <p>Both loops run for {@code COUNT} iterations. The consumer's first
 * {@code FULL} items come from the pre-filled buffer, so the last buffer handed
 * over by the producer is received but never drained; the reported sum covers
 * the items the consumer actually removed.
 */
public class ExchangerTest {

  /** Number of items a buffer holds when the producer considers it full. */
  private static final int FULL = 10;

  /** Number of items each loop processes in one run (20 buffers' worth). */
  private static final int COUNT = FULL * 20;

  /** Generates the sample items; used by the main thread and the producer. */
  private static final Random random = new Random();

  /**
   * Producer: adds random items to its buffer and, each time the buffer reaches
   * {@code FULL} items, exchanges it for whatever buffer the partner offers.
   */
  private static class FillingLoop implements Runnable {

    private final Exchanger<List<Integer>> exchanger;
    private final CountDownLatch stopLatch;
    private List<Integer> currentBuffer;

    /**
     * @param initiallyEmptyBuffer buffer the producer starts filling
     * @param exchanger            meeting point shared with the consumer
     * @param stopLatch            counted down once when this loop ends
     */
    FillingLoop(List<Integer> initiallyEmptyBuffer,
                Exchanger<List<Integer>> exchanger,
                CountDownLatch stopLatch) {
      this.currentBuffer = initiallyEmptyBuffer;
      this.exchanger = exchanger;
      this.stopLatch = stopLatch;
    }

    @Override
    public void run() {
      try {
        for (int i = 0; i < COUNT; i++) {
          if (currentBuffer == null) {
            break; // stop on null
          }
          Integer item = random.nextInt(100);
          System.out.println("Added: " + item);
          currentBuffer.add(item);
          if (currentBuffer.size() == FULL) {
            currentBuffer = exchanger.exchange(currentBuffer);
          }
        }
      } catch (InterruptedException ex) {
        System.out.println("Bad exchange on filling side");
        // Keep the interruption visible to whoever owns this thread.
        Thread.currentThread().interrupt();
      } finally {
        // Always release the waiter, even if the loop ended with an exception.
        stopLatch.countDown();
      }
    }
  }

  /**
   * Consumer: removes items from the front of its buffer, adds them to a
   * running total and, each time the buffer becomes empty, exchanges it for
   * whatever buffer the partner offers.
   */
  private static class EmptyingLoop implements Runnable {

    private final Exchanger<List<Integer>> exchanger;
    private final CountDownLatch stopLatch;
    private List<Integer> currentBuffer;

    // Written only by the thread running this loop. Other threads must read it
    // only after awaiting stopLatch, which this loop counts down when it ends.
    private int sum;

    /**
     * @param initiallyFullBuffer buffer the consumer starts draining
     * @param exchanger           meeting point shared with the producer
     * @param stopLatch           counted down once when this loop ends
     */
    EmptyingLoop(List<Integer> initiallyFullBuffer,
                 Exchanger<List<Integer>> exchanger,
                 CountDownLatch stopLatch) {
      this.currentBuffer = initiallyFullBuffer;
      this.exchanger = exchanger;
      this.stopLatch = stopLatch;
    }

    /**
     * Returns the total of all items removed so far. Call it only after the
     * latch passed to the constructor has been awaited.
     */
    int getSum() {
      return sum;
    }

    @Override
    public void run() {
      try {
        for (int i = 0; i < COUNT; i++) {
          if (currentBuffer == null) {
            break; // stop on null
          }
          Integer item = currentBuffer.remove(0);
          System.out.println("Got: " + item);
          sum += item.intValue();
          if (currentBuffer.isEmpty()) {
            currentBuffer = exchanger.exchange(currentBuffer);
          }
        }
      } catch (InterruptedException ex) {
        System.out.println("Bad exchange on emptying side");
        // Keep the interruption visible to whoever owns this thread.
        Thread.currentThread().interrupt();
      } finally {
        // Always release the waiter, even if the loop ended with an exception.
        stopLatch.countDown();
      }
    }
  }

  /**
   * Runs the demo: pre-fills one buffer, starts the producer and the consumer
   * on their own threads, waits for both to finish and prints the consumer's
   * total.
   *
   * <p>If the calling thread is interrupted while waiting, its interrupt status
   * is restored, a message is written to standard error and no sum is printed,
   * because the total would not be complete.
   *
   * @param args ignored
   */
  public static void main(String[] args) {
    List<Integer> initiallyEmptyBuffer = new ArrayList<Integer>();
    List<Integer> initiallyFullBuffer = new ArrayList<Integer>(FULL);
    for (int i = 0; i < FULL; i++) {
      initiallyFullBuffer.add(random.nextInt(100));
    }

    // Fresh synchronizers per run so repeated invocations do not interfere.
    Exchanger<List<Integer>> exchanger = new Exchanger<List<Integer>>();
    CountDownLatch stopLatch = new CountDownLatch(2);

    FillingLoop fillingLoop =
        new FillingLoop(initiallyEmptyBuffer, exchanger, stopLatch);
    EmptyingLoop emptyingLoop =
        new EmptyingLoop(initiallyFullBuffer, exchanger, stopLatch);

    new Thread(fillingLoop).start();
    new Thread(emptyingLoop).start();

    try {
      stopLatch.await();
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      System.err.println("Interrupted while waiting for the loops to finish");
      return;
    }

    System.out.println("Sum of all items is.... " + emptyingLoop.getSum());
  }
}