`
qqdwll
  • 浏览: 131629 次
  • 性别: Icon_minigender_1
  • 来自: 上海
社区版块
存档分类
最新评论

ThreadPoolExecutor

阅读更多
转载: http://www.javamex.com/tutorials/threads/ThreadPoolExecutor.shtml

  /**
     * Creates a new <tt>ThreadPoolExecutor</tt> with the given initial
     * parameters and default rejected execution handler.
     *
     * @param corePoolSize the number of threads to keep in the
     * pool, even if they are idle.
     * @param maximumPoolSize the maximum number of threads to allow in the
     * pool.
     * @param keepAliveTime when the number of threads is greater than
     * the core, this is the maximum time that excess idle threads
     * will wait for new tasks before terminating.
     * @param unit the time unit for the keepAliveTime
     * argument.
     * @param workQueue the queue to use for holding tasks before they
     * are executed. This queue will hold only the <tt>Runnable</tt>
     * tasks submitted by the <tt>execute</tt> method.
     * @param threadFactory the factory to use when the executor
     * creates a new thread.
     * @throws IllegalArgumentException if corePoolSize or
     * keepAliveTime less than zero, or if maximumPoolSize less than or
     * equal to zero, or if corePoolSize greater than maximumPoolSize.
     * @throws NullPointerException if <tt>workQueue</tt>
     * or <tt>threadFactory</tt> are null.
     */


ThreadPoolExecutor
Introduced in Java 5, the ThreadPoolExecutor class provides a fairly flexible implementation of a Java thread pool as outlined in our introduction. In the simplest case, the class is used as follows:

we construct an instance of ThreadPoolExecutor somewhere fairly "early on" in our program's life cycle (e.g. when our server starts up), passing in parameters such as the numbe of threads;
when we need a job executing in another thread, we call the ThreadPoolExecutor's execute() method, passing in the Runnable which is the task to be executed.
For example, the pattern of a simple server that used one threaded job to handle each incoming connection would look as follows. Note that there are some details here that for simplicity we have omitted— such as how the thread pool is shut down or what to do if it is "overloaded"— but we'll come to these issues in a moment:

import java.util.concurrent.*;
...
ExecutorService exec = Executors.newFixedThreadPool(4);

private void runServer() {
  ServerSocket sock = new ServerSocket(portNo);
  while (!stopRequested) {
    Socket s = sock.accept();
    exec.execute(new ConnectionRunnable(s));
  }
}

private static class ConnectionRunnable implements Runnable {
  private final Socket s;
  ConnectionRunnable(Socket s) {
    this.s = s;
  }
  public void run() {
    // handle connection
  }
}


Constructing a ThreadPoolExecutor: the Executors helper class
In the first line, exec will in practice be a ThreadPoolExecutor or some subclass thereof. We could have called the ThreadPoolExecutor constructor directly, but it has a number of parameters which can be a bit unwieldy in the simplest case. The Executors class is a placeholder for a number of static utility methods to facilitate construction and use of thread pools (and in principle, other types of executors— see below). Utility methods such as newFixedThreadPool() in fact declare that they return an implementation of ExecutorService: an interface that ThreadPoolExecutor, and potentially other classes in the future, implements. So in practice, that is how we will refer to our thread pool. As you see, here we construct a thread pool that will always have exactly four threads, but we'll see that ThreadPoolExecutor and the Executors utility class are actually more versatile.

Our simple server then sits in a loop in some "main" thread (which calls runServer(), continually waiting for connections. Each time a connection comes in, it is passed to the thread pool to be executed in another thread when one is free to take on the job. Meanwhile, the main thread can immediately get back to accepting further incoming connections. At some point in the near future, and in one of the threads managed by the ThreadPoolExecutor, the run() method of the passed-in ConnectionRunnable will be executed.

Next: ThreadPoolExecutor options
This example shows a very simple case of using a ThreadPoolExecutor with default options. In some cases, we will need to set a few more options to make the thread pool behave in the desired way:

we may need to specify the type of job queue used by the thread pool (e.g. so that we can put a limit on its capacity), and specify a rejected execution handler which will be called when a job is rejected because the queue is full.

Controlling the queue with ThreadPoolExecutor
The previous page showed a skeleton of a simple server with ThreadPoolExecutor. We used a common paradigm, in which one thread continually sits waiting to accept connections; these connections are then each farmed off to be executed by the next available thread. Now, one problem that can occur is if we get a large volume of incoming connections so that the available threads can't proess them fast enough. In this case, the connections waiting to be processed will be queued. But we haven't put any bounds on the queue, so that in the worst case, they will just continue to "pile up". If connections aren't being processed fast enough because the server is overloaded or "has a problem", then we're not going to help matters by piling up an endless number of connections that the server doesn't have a realistic chance of processing. At some point, we need to accept that "the server is busy" and drop further connections until things have calmed down.

To achieve this goal, we need to:

use a queue with a some maximum capacity;
handle rejected execution: add a piece of code to deal with what happens when an incoming job won't fit in the queue.
Specifying a queue with maximum capacity
In our initial example, for convenience, we just used the Executors helper class to construct a thread pool with default options. However, if we constract a ThreadPoolExecutor object directly via its constructor, we can specify various additional parameters including the implementation of BlockingQueue that we wish to use as the job queue. In this case, we can use an ArrayBlockingQueue or LinkedBlockingQueue with a maximum capacity. The queue is declared to take objects of type Runnable, since this is what the thread pool deals with:

BlockingQueue q = new ArrayBlockingQueue(20);
ThreadPoolExecutor ex = new ThreadPoolExecutor(4, 10, 20, TimeUnit.SECONDS, q);


Note a side effect of specifying our own queue is that we must specify the maximum number of threads (10 in this case) and the time-to-live of idle threads (20 seconds in this case). As the number of simultaneous connections grows, the thread pool will automatically expand the number of threads up to this maximum. When the number of connections (and hence threads needed) decreases, the thread pool will "kill" each spare thread after it has been sitting idle for 20 seconds, until we're down to our "core" size of 4 threads (the first parameter).

If you specify your own job queue, be careful not to post jobs "manually" to the queue (using the regular queue methods). If you do so, the job will not be picked up by the thread pool. Always use ThreadPoolExecutor.execute() even though it's "your own queue".
Rejected execution handlers and RejectedExecutionException
With an upper bound on our queue size, the other issue we need to deal with is what happens if a job isn't executed because the queue is full. In this case, we'll be left with a "dangling" socket that we should close as soon as possible. By default, we can handle the full queue situation by catching RejectedExecutionException:

while (!shutDownRequested()) {
  Socket s = null;
  try {
    s = ss.accept();
    exec.execute(new ConnectionRunnable(s));
  } catch (RejectedExecutionException rej) {
    try { s.close(); } catch (Exception ignore) {}
  } catch (Exception e) {
    // ... log
  }
}


Another way to handle closing the socket is to pass a RejectedExecutionHandler into the constructor of our ThreadPoolExecutor. RejectedExecutionHandler is an interface that specifies one method that we must define:

public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
  ...
}


Then, instead of throwing an exception, if the connection job won't fit on the queue, the ThreadPoolExecutor will call our rejectedExecution() method instead. Whether you catch the exception or define a separate handler essentially depends on which makes your design easier— for example, you could have a single rejection handler shared by multiple executors.
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics