java实现生产者和消费者问题的几种方式
发布时间:2025-05-22 09:13:24 发布人:远客网络
一、java实现生产者和消费者问题的几种方式
1、生产者消费者问题是多线程的一个经典问题,它描述是有一块缓冲区作为仓库,生产者可以将产品放入仓库,消费者则可以从仓库中取走产品。
2、解决生产者/消费者问题的方法可分为两类:
3、采用某种机制保护生产者和消费者之间的同步;
4、在生产者和消费者之间建立一个管道。
5、第一种方式有较高的效率,并且易于实现,代码的可控制性较好,属于常用的模式。第二种管道缓冲区不易控制,被传输数据对象不易于封装等,实用性不强。
6、在Java中有四种方法支持同步,其中前三个是同步方法,一个是管道方法。
7、PipedInputStream/ PipedOutputStream
8、通过 wait()/ notify()方法实现:
9、wait()/ nofity()方法是基类Object的两个方法:
10、wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,放弃锁,使自己处于等等状态,让其他线程执行。
11、notify()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
12、通过await()/ signal()方法实现:
13、await()和signal()的功能基本上和wait()/ nofity()相同,完全可以取代它们,但是它们和新引入的锁定机制Lock直接挂钩,具有更大的灵活性。通过在Lock对象上调用newCondition()方法,将条件变量和一个锁对象进行绑定,进而控制并发程序访问竞争资源的安全。
14、它是一个已经在内部实现了同步的队列,实现方式采用的是我们第2种await()/ signal()方法。它可以在生成对象时指定容量大小。它用于阻塞操作的是put()和take()方法:
15、put()方法:类似于我们上面的生产者线程,容量达到最大时,自动阻塞。
16、take()方法:类似于我们上面的消费者线程,容量为0时,自动阻塞。
二、由生产者/消费者问题看JAVA多线程
1、生产者消费者问题是研究多线程程序时绕不开的问题它的描述是有一块生产者和消费者共享的有界缓冲区生产者往缓冲区放入产品消费者从缓冲区取走产品这个过程可以无休止的执行不能因缓冲区满生产者放不进产品而终止也不能因缓冲区空消费者无产品可取而终止
2、解决生产者消费者问题的方法有两种一种是采用某种机制保持生产者和消费者之间的同步一种是在生产者和消费者之间建立一个管道前一种有较高的效率并且可控制性较好比较常用后一种由于管道缓冲区不易控制及被传输数据对象不易封装等原因比较少用
3、同步问题的核心在于 CPU是按时间片轮询的方式执行程序我们无法知道某一个线程是否被执行是否被抢占是否结束等因此生产者完全可能当缓冲区已满的时候还在放入产品消费者也完全可能当缓冲区为空时还在取出产品
4、现在同步问题的解决方法一般是采用信号或者加锁机制即生产者线程当缓冲区已满时放弃自己的执行权进入等待状态并通知消费者线程执行消费者线程当缓冲区已空时放弃自己的执行权进入等待状态并通知生产者线程执行这样一来就保持了线程的同步并避免了线程间互相等待而进入死锁状态
5、 JAVA语言提供了独立于平台的线程机制保持了 write once run anywhere的特色同时也提供了对同步机制的良好支持
6、在JAVA中一共有四种方法支持同步其中三个是同步方法一个是管道方法
7、管道方法PipedInputStream/PipedOutputStream
8、 wait()和notify()是根类Object的两个方法也就意味着所有的JAVA类都会具有这个两个方法为什么会被这样设计呢?我们可以认为所有的对象默认都具有一个锁虽然我们看不到也没有办法直接操作但它是存在的
9、 wait()方法表示当缓冲区已满或空时生产者或消费者线程停止自己的执行放弃锁使自己处于等待状态让另一个线程开始执行
10、 notify()方法表示当生产者或消费者对缓冲区放入或取出一个产品时向另一个线程发出可执行通知同时放弃锁使自己处于等待状态
11、 private LinkedList<Object> myList=new LinkedList<Object>();
12、 public static void main(String[] args) throws Exception{
13、 class Producer extends Thread{
14、 System out println( warning: it s full!);
15、 System out println( Producer:+ o);
16、}catch(InterruptedException ie){
17、 System out println( producer is interrupted!);
18、 class Consumer extends Thread{
19、 System out println( warning: it s empty!);
20、 Object o= myList removeLast();
21、 System out println( Consumer:+ o);
22、}catch(InterruptedException ie){
23、 System out println( consumer is interrupted!);
24、在JDK以后 JAVA提供了新的更加健壮的线程处理机制包括了同步锁定线程池等等它们可以实现更小粒度上的控制 await()和signal()就是其中用来做同步的两种方法它们的功能基本上和wait()/notify()相同完全可以取代它们但是它们和新引入的锁定机制Lock直接挂钩具有更大的灵活性
25、 private LinkedList<Object> myList= new LinkedList<Object>();
26、 private final Lock lock= new ReentrantLock();
27、 private final Condition full= lock newCondition();
28、 private final Condition empty= lock newCondition();
29、 public static void main(String[] args) throws Exception{
30、 class Producer extends Thread{
31、 System out println( warning: it s full!);
32、 System out println( Producer:+ o);
33、}catch(InterruptedException ie){
34、 System out println( producer is interrupted!);
35、 class Consumer extends Thread{
36、 System out println( warning: it s empty!);
37、 Object o= myList removeLast();
38、 System out println( Consumer:+ o);
39、}catch(InterruptedException ie){
40、 System out println( consumer is interrupted!);
41、 BlockingQueue也是JDK的一部分它是一个已经在内部实现了同步的队列实现方式采用的是我们的第种await()/signal()方法它可以在生成对象时指定容量大小
42、它用于阻塞操作的是put()和take()方法
43、 put()方法类似于我们上面的生产者线程容量最大时自动阻塞
44、 take()方法类似于我们上面的消费者线程容量为时自动阻塞
45、 private LinkedBlockingQueue<Object> queue= new LinkedBlockingQueue<Object>();
46、 public static void main(String[] args) throws Exception{
47、 class Producer extends Thread{
48、 System out println( warning: it s full!);
49、 System out println( Producer:+ o);
50、}catch(InterruptedException e){
51、 System out println( producer is interrupted!);
52、 class Consumer extends Thread{
53、 System out println( warning: it s empty!);
54、 System out println( Consumer:+ o);
55、}catch(InterruptedException e){
56、 System out println( producer is interrupted!);
57、如果没有我建议你运行一下这段代码仔细观察它的输出是不是有下面这个样子的?为什么会这样呢?
58、 Producer: java lang object@ e a
59、你可能会说这是因为put()和System out println()之间没有同步造成的我也这样认为我也这样认为但是你把run()中的synchronized前面的注释去掉重新编译运行有改观吗?没有为什么?
60、这是因为当缓冲区已满生产者在put()操作时 put()内部调用了await()方法放弃了线程的执行然后消费者线程执行调用take()方法 take()内部调用了signal()方法通知生产者线程可以执行致使在消费者的println()还没运行的情况下生产者的println()先被执行所以有了上面的输出 run()中的synchronized其实并没有起什么作用
61、对于BlockingQueue大家可以放心使用这可不是它的问题只是在它和别的对象之间的同步有问题
62、对于这种多重嵌套同步的问题以后再谈吧欢迎大家讨论啊!
63、管道方法PipedInputStream/PipedOutputStream
64、这个类位于java io包中是解决同步问题的最简单的办法一个线程将数据写入管道另一个线程从管道读取数据这样便构成了一种生产者/消费者的缓冲区编程模式
65、下面是一个例子代码在这个代码我没有使用Object对象而是简单的读写字节值这是因为PipedInputStream/PipedOutputStream不允许传输对象这是JAVA本身的一个bug具体的大家可以看sun的解释 _bug do?bug_id=
66、 private PipedOutputStream pos;
67、 private PipedInputStream pis;
68、//private ObjectOutputStream oos;
69、//private ObjectInputStream ois;
70、 pos= new PipedOutputStream();
71、 pis= new PipedInputStream(pos);
72、//oos= new ObjectOutputStream(pos);
73、//ois= new ObjectInputStream(pis);
74、 public static void main(String[] args) throws Exception{
75、 class Producer extends Thread{
76、 System out println( Producer: a byte the value is+ b);
77、//System out println( Producer:+ o);
78、 class Consumer extends Thread{
79、 System out println( Consumer: a byte the value is+ String valueOf(b));
80、//System out println( Consumer:+ o);
81、//class MyObject implements Serializable{
三、生产者消费者问题--进程
1、int head,tail=0;//Buffer数组下标
2、int count;//被使用的缓冲区数量
3、HANDLE hNotFullEvent, hNotEmptyEvent;//用来同步生产者和消费者线程
4、 cout<<"缓冲区存储情况为:"<<endl;
5、 cout<<"\t|----"<<a<<"----|"<<endl;
6、char p1[]={'a','A','b','B','c','C','D','d','E','e'};
7、WaitForSingleObject(hMutex,INFINITE);
8、if(count==BufferSize){//缓冲区满
9、WaitForSingleObject(hNotFullEvent,INFINITE);
10、//得到互斥锁且缓冲区非满,跳出while循环
11、 cout<<"缓冲区已满,不能再存入数据!"<<endl;
12、 ReleaseMutex(hMutex);//结束临界区
13、 PulseEvent(hNotEmptyEvent);//唤醒消费者线程
14、//得到互斥锁且缓冲区非满,开始产生新数据
15、cout<<"Producer p1:\t"<<p1<<endl;
16、//tail=(tail+1)%BufferSize;///存放于缓冲区的位置
17、cout<<"按ENTER继续...."<<endl;
18、ReleaseMutex(hMutex);//结束临界区
19、PulseEvent(hNotEmptyEvent);//唤醒消费者线程
20、//////////////////////////////////////////////////////////////////
21、char p2[]={'0','1','2','3','4','5','6','7','8','9'};
22、WaitForSingleObject(hMutex,INFINITE);
23、if(count==BufferSize){//缓冲区满
24、WaitForSingleObject(hNotFullEvent,INFINITE);
25、//得到互斥锁且缓冲区非满,跳出while循环
26、 cout<<"缓冲区已满,不能再存入数据!程序结束!"<<endl;
27、 ReleaseMutex(hMutex);//结束临界区
28、 PulseEvent(hNotEmptyEvent);//唤醒消费者线程
29、//得到互斥锁且缓冲区非满,开始产生新数据
30、cout<<"Producer p2:\t"<<p2<<endl;
31、cout<<"按ENTER继续...."<<endl;
32、ReleaseMutex(hMutex);//结束临界区
33、PulseEvent(hNotEmptyEvent);//唤醒消费者线程
34、//////////////////////////////////////////////////////////////////
35、char p3[]={'!','#','$','%','&','*','+','-','.','/'};
36、WaitForSingleObject(hMutex,INFINITE);
37、if(count==BufferSize){//缓冲区满
38、WaitForSingleObject(hNotFullEvent,INFINITE);
39、//得到互斥锁且缓冲区非满,跳出while循环
40、 cout<<"缓冲区已满,不能再存入数据!程序结束!"<<endl;
41、 ReleaseMutex(hMutex);//结束临界区
42、 PulseEvent(hNotEmptyEvent);//唤醒消费者线程
43、//得到互斥锁且缓冲区非满,开始产生新数据
44、cout<<"Producer p3:\t"<<p3<<endl;
45、cout<<"按ENTER继续...."<<endl;
46、ReleaseMutex(hMutex);//结束临界区
47、PulseEvent(hNotEmptyEvent);//唤醒消费者线程
48、//////////////////////////////////////////////////////////////////
49、WaitForSingleObject(hMutex,INFINITE);
50、if(count==0){//没有可以处理的数据
51、ReleaseMutex(hMutex);//释放互斥锁且等待
52、WaitForSingleObject(hNotEmptyEvent,INFINITE);
53、cout<<"Consumer 0:缓冲区的数据已全消费过一次,消费完毕!"<<endl;
54、ReleaseMutex(hMutex);//结束临界区
55、else{//获得互斥锁且缓冲区有数据,开始处理
56、if(result>64&&result<70){
57、 cout<<"Consumer c1:(大写->小写)\t"<<result<<endl;
58、 Buffer[head]='^';//'^'表示数据已被消费
59、 cout<<"'^'表示数据已被消费"<<endl;
60、if(result>96&&result<102){
61、 cout<<"Consumer c1:(小写->大写)\t"<<result<<endl;
62、 cout<<"'^'表示数据已被消费"<<endl;
63、if(result>47&&result<58){
64、 cout<<"Consumer c1:(显示字符)\t"<<result<<endl;
65、 cout<<"'^'表示数据已被消费"<<endl;
66、if(result>32&&result<48){
67、 cout<<"Consumer c1:(用符号打印出菱形)"<<endl;
68、 for(j=1;j<=40-(9+1)/2+i;j++)
69、 cout<<"'^'表示数据已被消费"<<endl;
70、cout<<"按ENTER继续...."<<endl;
71、ReleaseMutex(hMutex);//结束临界区
72、PulseEvent(hNotFullEvent);//唤醒生产者线程
73、//////////////////////////////////////////////////////////////////
74、WaitForSingleObject(hMutex,INFINITE);
75、if(count==0){//没有可以处理的数据
76、ReleaseMutex(hMutex);//释放互斥锁且等待
77、WaitForSingleObject(hNotEmptyEvent,INFINITE);
78、cout<<"Consumer 0:缓冲区的数据已全消费过一次,消费完毕!"<<endl;
79、ReleaseMutex(hMutex);//结束临界区
80、else{//获得互斥锁且缓冲区有数据,开始处理
81、if(result>64&&result<90){
82、 cout<<"Consumer c2:(大写->小写)\t"<<result<<endl;
83、 cout<<"'^'表示数据已被消费"<<endl;
84、if(result>96&&result<102){
85、 cout<<"Consumer c2:(小写->大写)\t"<<result<<endl;
86、 cout<<"'^'表示数据已被消费"<<endl;
87、if(result>47&&result<58){
88、 cout<<"Consumed c2:(显示字符)\t"<<result<<endl;
89、 cout<<"'^'表示数据已被消费"<<endl;
90、if(result>32&&result<48){
91、 cout<<"Consumer c2:(用符号打印出菱形)"<<endl;
92、 for(j=1;j<=40-(9+1)/2+i;j++)
93、 cout<<"'^'表示数据已被消费"<<endl;
94、cout<<"按ENTER继续...."<<endl;
95、ReleaseMutex(hMutex);//结束临界区
96、PulseEvent(hNotFullEvent);//唤醒生产者线程
97、//////////////////////////////////////////////////////////////////
98、WaitForSingleObject(hMutex,INFINITE);
99、if(count==0){//没有可以处理的数据
100、ReleaseMutex(hMutex);//释放互斥锁且等待
101、WaitForSingleObject(hNotEmptyEvent,INFINITE);
102、cout<<"Consumer 0:缓冲区的数据已全消费过一次,消费完毕!"<<endl;
103、ReleaseMutex(hMutex);//结束临界区
104、else{//获得互斥锁且缓冲区有数据,开始处理
105、if(result>64&&result<70){
106、 cout<<"Consumer c3:(大写->小写)\t"<<result<<endl;
107、 cout<<"'^'表示数据已被消费"<<endl;
108、if(result>96&&result<102){
109、 cout<<"Consumer c3:(小写->大写)\t"<<result<<endl;
110、 cout<<"'^'表示数据已被消费"<<endl;
111、if(result>47&&result<58){
112、 cout<<"Consumer c1:(显示字符)\t"<<result<<endl;
113、 cout<<"'^'表示数据已被消费"<<endl;
114、if(result>32&&result<48){
115、 cout<<"Consumer c3:(用符号打印出菱形)"<<endl;
116、 for(j=1;j<=40-(7+1)/2+i;j++)
117、 cout<<"'^'表示数据已被消费"<<endl;
118、cout<<"按ENTER继续...."<<endl;
119、ReleaseMutex(hMutex);//结束临界区
120、PulseEvent(hNotFullEvent);//唤醒生产者线程
121、//////////////////////////////////////////////////////////////////
122、hMutex=CreateMutex(NULL,FALSE,NULL);
123、hNotFullEvent=CreateEvent(NULL,TRUE,FALSE,NULL);
124、hNotEmptyEvent=CreateEvent(NULL,TRUE,FALSE,NULL);
125、hThreadVector[0]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) p1_Producer,NULL, 0,(LPDWORD)&ThreadID);
126、hThreadVector[1]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) c1_Consumer,NULL, 0,(LPDWORD)&ThreadID);
127、hThreadVector[3]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) p2_Producer,NULL, 0,(LPDWORD)&ThreadID);
128、hThreadVector[4]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) c2_Consumer,NULL, 0,(LPDWORD)&ThreadID);
129、hThreadVector[5]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) p3_Producer,NULL, 0,(LPDWORD)&ThreadID);
130、hThreadVector[5]=CreateThread(NULL, 0,(LPTHREAD_START_ROUTINE) c3_Consumer,NULL, 0,(LPDWORD)&ThreadID);
131、WaitForMultipleObjects(2,hThreadVector,TRUE,INFINITE);
132、//cout<<"**********************Finish*************************"<<endl;
133、我最近也在学操作系统,PV好麻烦的