以下代码在《Java多线程设计模式》(结城浩著,博硕文化译,中国铁道出版社,2005)第8章"Worler Thread --等到工作来,来了就工作"代码基础上改进而得。 改动主要为: (1)提炼了IRequest接口 (2)添加了终结工作线程的方法。 参与者: Client--委托方。 Channel--生产线 Request--生产任务 WorkerThread--工作线程 委托方把Request放入Channel中。Worker从Channel中取出Request,进行加工。 代码: Channel.java public class Channel { private static final int MAX_REQUEST = 100 ; private final IRequest[] requestQueue; private int tail; // 下一个putRequest的地方 private int head; // 下一个takeRequest的地方 private int count; // Request的数量 private boolean stoped = true ; private final WorkerThread[] threadPool; public Channel( int threads) { this .requestQueue = new Request[MAX_REQUEST]; this .head = 0 ; this .tail = 0 ; this .count = 0 ; threadPool = new WorkerThread[threads]; for ( int i = 0 ; i < threadPool.length; i ++ ) { threadPool[i] = new WorkerThread( this ); } } public void startWorkers() { this .stoped = false ; for ( int i = 0 ; i < threadPool.length; i ++ ) { threadPool[i].start(); } } public synchronized int getCount() { return this .count; } public synchronized boolean isStoped() { return this .stoped; } public synchronized void stopWorkers() { this .stoped = true ; } public synchronized void putRequest(IRequest request) { while (count >= requestQueue.length) { try { wait(); } catch (InterruptedException e) { } } requestQueue[tail] = request; tail = (tail + 1 ) % requestQueue.length; count ++ ; notifyAll(); } public synchronized IRequest takeRequest() { while (count <= 0 ) { try { wait(); } catch (InterruptedException e) { } } IRequest request = requestQueue[head]; head = (head + 1 ) % requestQueue.length; count -- ; notifyAll(); return request; } } IRequest.java public interface IRequest { public abstract void execute() throws Exception; } WorkerThread.java public class WorkerThread extends Thread { private final Channel channel; public WorkerThread(Channel channel) { this .channel = channel; } public void run() { System.out.println( " [Thread]: WorkerThread " + this .getName() + " start! " ); while ( true ) { if (channel.getCount() <= 0 && channel.isStoped()) { System.out.println( " [Thread]: WorkerThread " + this .getName() + " stop! " ); stop(); } else { IRequest request = channel.takeRequest(); try { request.execute(); } catch (Exception e) { System.out.println(e.getMessage()); } } } } } Client方调用方法: (1) 首先初始化Channel,设定工作线程数。 int workerCount = ......; channel = new Channel(workerCount); (2)启动生产线 channel.startWorkers(); (3)放置Request channel.putRequest(aRequest); (4)下班了--停止生产线 channel.stopWorkers(); workers把目前生产线上的Request处理完后,自己Stop掉自己。
本文转自xiaotie博客园博客,原文链接http://www.cnblogs.com/xiaotie/archive/2005/11/02/267044.html如需转载请自行联系原作者
xiaotie 集异璧实验室(GEBLAB)