博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
调度模式·Worker-Channel-Request
阅读量:5924 次
发布时间:2019-06-19

本文共 2650 字,大约阅读时间需要 8 分钟。

以下代码在《Java多线程设计模式》(结城浩著,博硕文化译,中国铁道出版社,2005)第8章"Worler Thread --等到工作来,来了就工作"代码基础上改进而得。


改动主要为:

(1)提炼了IRequest接口

(2)添加了终结工作线程的方法。


参与者:

Client--委托方。

Channel--生产线

Request--生产任务

WorkerThread--工作线程


委托方把Request放入Channel中。Worker从Channel中取出Request,进行加工。


代码:


Channel.java


public
 
class
 Channel
{
    
private
 
static
 
final
 
int
 MAX_REQUEST 
=
 
100
;
    
private
 
final
 IRequest[] requestQueue;
    
private
 
int
 tail; 
//
 下一个putRequest的地方
    
private
 
int
 head; 
//
 下一个takeRequest的地方
    
private
 
int
 count; 
//
 Request的数量
    
    
private
 
boolean
 stoped 
=
 
true
;
    
private
 
final
 WorkerThread[] threadPool;
    
public
 Channel(
int
 threads)
    {
        
this
.requestQueue 
=
 
new
 Request[MAX_REQUEST];
        
this
.head 
=
 
0
;
        
this
.tail 
=
 
0
;
        
this
.count 
=
 
0
;
        threadPool 
=
 
new
 WorkerThread[threads];
        
for
 (
int
 i 
=
 
0
; i 
<
 threadPool.length; i
++
)
        {
            threadPool[i] 
=
 
new
 WorkerThread(
this
);
        }
    }
    
public
 
void
 startWorkers()
    {
        
this
.stoped 
=
 
false
;
        
for
 (
int
 i 
=
 
0
; i 
<
 threadPool.length; i
++
)
        {
            threadPool[i].start();
        }
    }
    
    
public
 
synchronized
 
int
 getCount()
    {
        
return
 
this
.count;
    }
    
    
public
 
synchronized
 
boolean
 isStoped()
    {
        
return
 
this
.stoped;
    }
    
    
public
 
synchronized
 
void
 stopWorkers()
    {
        
this
.stoped 
=
 
true
;
    }
    
public
 
synchronized
 
void
 putRequest(IRequest request)
    {
        
while
 (count 
>=
 requestQueue.length)
        {
            
try
            {
                wait();
            }
            
catch
 (InterruptedException e)
            {
            }
        }
        requestQueue[tail] 
=
 request;
        tail 
=
 (tail 
+
 
1
%
 requestQueue.length;
        count
++
;
        notifyAll();
    }
    
public
 
synchronized
 IRequest takeRequest()
    {
        
while
 (count 
<=
 
0
)
        {
            
try
            {
                wait();
            }
            
catch
 (InterruptedException e)
            {
            }
        }
        IRequest request 
=
 requestQueue[head];
        head 
=
 (head 
+
 
1
%
 requestQueue.length;
        count
--
;
        notifyAll();
        
return
 request;
    }
}

IRequest.java


public
 
interface
 IRequest
{
    
public
 
abstract
 
void
 execute() 
throws
 Exception;
}

WorkerThread.java


public
 
class
 WorkerThread 
extends
 Thread
{
    
private
 
final
 Channel channel;
    
public
 WorkerThread(Channel channel)
    {
        
this
.channel 
=
 channel;
    }
    
public
 
void
 run()
    {
        System.out.println(
"
[Thread]: WorkerThread 
"
 
+
 
this
.getName()
                
+
 
"
 start!
"
);
        
while
 (
true
)
        {
            
if
 (channel.getCount() 
<=
 
0
 
&&
 channel.isStoped())
            {
                System.out.println(
"
[Thread]: WorkerThread 
"
 
+
 
this
.getName()
                        
+
 
"
 stop!
"
);
                stop();
            }
            
else
            {
                IRequest request 
=
 channel.takeRequest();
                
try
                {
                    request.execute();
                }
                
catch
 (Exception e)
                {
                    System.out.println(e.getMessage());
                }
            }
        }
    }
}

Client方调用方法:


(1) 首先初始化Channel,设定工作线程数。

int workerCount = ......;

channel = new Channel(workerCount);

(2)启动生产线

channel.startWorkers();

(3)放置Request

channel.putRequest(aRequest);

(4)下班了--停止生产线

channel.stopWorkers();

workers把目前生产线上的Request处理完后,自己Stop掉自己。

本文转自xiaotie博客园博客,原文链接http://www.cnblogs.com/xiaotie/archive/2005/11/02/267044.html如需转载请自行联系原作者

xiaotie 集异璧实验室(GEBLAB)

你可能感兴趣的文章
爱乐馆-无损古典之刘汉盛榜单100牒
查看>>
Groovy入门之二:Map
查看>>
Nginx 无缝升级(Linux +Windows)
查看>>
$resource in AngularJS
查看>>
Xtreme Property Grid
查看>>
kubernetes1.5.1集群安装部署指南之基础组件安装篇
查看>>
描述论的发展
查看>>
Linux的防火墙管理命令:iptables
查看>>
初探django-使用uwsgi+supervisor+nginx来部署服务
查看>>
安装Tomcat
查看>>
Shell的字符串表达式介绍-实践及企业案例脚本剖析
查看>>
Android Fragment实践(一)
查看>>
虚拟机中克隆操作系统后网络的设置
查看>>
CentOS -简单shell
查看>>
我的友情链接
查看>>
KVM+ceph-RBD 快照创建问题
查看>>
javamail使用小记
查看>>
老徐FrankXuLei受邀为 @IGT中国研发中心 讲授《WCF分布式开发与SOA架构设计》课程...
查看>>
interceptor + spring mvc + token 防止表单重复提交
查看>>
ADB的官方概要
查看>>