博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Parallel Programming-多消费者,多生产者同时运行并行
阅读量:5243 次
发布时间:2019-06-14

本文共 3576 字,大约阅读时间需要 11 分钟。

在文章演示了并行的流水线操作(生产者和消费者并行同时执行),C#是通过BlockingCollection这个线程安全的对象作为Buffer,并且结合Task来实现的。但是上一篇文章有个缺陷,在整个流水线上,生产者和消费者是唯一的。本文将演示多个消费者多个生产者同时并行执行。

一、多消费者、多生产者示意图

 与前一篇文章演示的流水线思想类似,不同之处就是本文的topic:消费者和生产者有多个,以buffer1为例,起生产者有两个,消费者有两个,现在有三个纬度的并行:

  1. Action1和Action2并行(消费者和生产者并行)
  2. 消费者并行(Action2.1和Action2.2并行)
  3. 生产者并行(Action1.1和Action1.2并行)

二、实现

2.1 代码

class PiplelineDemo    {        private int seed;        public PiplelineDemo()        {            seed = 10;        }        public void Action11(BlockingCollection
output) { for (var i = 0; i < seed; i++) { output.Add(i.ToString());//initialize data to buffer1 } } public void Action12(BlockingCollection
output) { for (var i = 0; i < seed; i++) { output.Add(i.ToString());//initialize data to buffer1 } } public void Action21(BlockingCollection
input, BlockingCollection
output) { foreach (var item in input.GetConsumingEnumerable()) { var itemToInt = int.Parse(item); output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2 } } public void Action22(BlockingCollection
input, BlockingCollection
output) { foreach (var item in input.GetConsumingEnumerable()) { var itemToInt = int.Parse(item); output.Add((itemToInt * itemToInt).ToString());// add new data to buffer2 } } public void Action31(BlockingCollection
input, BlockingCollection
output) { foreach (var item in input.GetConsumingEnumerable()) { output.Add((item));// add new data to buffer3 } } public void Action32(BlockingCollection
input, BlockingCollection
output) { foreach (var item in input.GetConsumingEnumerable()) { output.Add((item));// add new data to buffer3 } } public void Pipeline() { var buffer1 = new BlockingCollection
(seed * 2); var buffer2 = new BlockingCollection
(seed * 2); var buffer3 = new BlockingCollection
(seed * 2); var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None); var stage11 = taskFactory.StartNew(() => Action11(buffer1)); var stage12 = taskFactory.StartNew(() => Action12(buffer1)); Task.Factory.ContinueWhenAll(new Task[] { stage11, stage12 }, (tasks) => { buffer1.CompleteAdding(); }); var stage21 = taskFactory.StartNew(() => Action21(buffer1, buffer2)); var stage22 = taskFactory.StartNew(() => Action22(buffer1, buffer2)); Task.Factory.ContinueWhenAll(new Task[] { stage21, stage22 }, (tasks) => { buffer2.CompleteAdding(); }); var stage31 = taskFactory.StartNew(() => Action31(buffer2, buffer3)); var stage32 = taskFactory.StartNew(() => Action32(buffer2, buffer3)); Task.Factory.ContinueWhenAll(new Task[] { stage31, stage32 }, (tasks) => { buffer3.CompleteAdding(); }); Task.WaitAll(stage11, stage12, stage21, stage22, stage31, stage32); foreach (var item in buffer3.GetConsumingEnumerable())//print data in buffer3 { Console.WriteLine(item); } } }

2.2 运行结果

2.3 代码解释

  1. Action11和Action12相对比较好理解。初始化数据到buffer1。
  2. Action2.1和Action2.2相对比较费解,他们同时接受buffer1作为输入,为什么最终的结果Buffer2没有产生重复? 最后由Action21,action22同时产生的buffer3为什么也没有重复?这就是GetConsumingEnumerable这个方法的功劳。这个方法会将buffer的数据分成多份给多个消费者,如果一个value已经被一个消费者获取,那么其他消费者将不会再拿到这个值。这就回答了为什么没有重复这个问题。
  3. 上面方法同时使用了对buffer的调用CompleteAdding方法:该方法非常重要,如果没有调用这个方法,程序会进入死锁,因为消费者(consumer)会处于一直的等待状态。

 

转载于:https://www.cnblogs.com/Brake/p/Parallel_Programming_Multiple_Producer_And_Consumer_In_Parallel.html

你可能感兴趣的文章
历时八年,HTML5 标准终于完工了
查看>>
17.树的子结构
查看>>
D - Mike and strings
查看>>
C++:多维数组的动态分配(new)和释放(delete)
查看>>
c#基础学习(0806)之抽象类实现多态
查看>>
S5PV210根文件系统的制作(一)
查看>>
51NOD 1244 莫比乌斯函数之和
查看>>
[bzoj1923]外星千足虫[高斯消元]
查看>>
centos下同时启动多个tomcat
查看>>
slab分配器
查看>>
分析 PHP大马-php_mof SHELL
查看>>
TCP/IP
查看>>
[推荐] 协同滤波 —— Collaborative Filtering (CF)
查看>>
python中使用中文
查看>>
习题4.14
查看>>
linux 增加用户 useradd 用法小结及配置文件说明
查看>>
Java将Excel中科学计数法解析成数字
查看>>
使用YUM安装MySQL 5.5(适用于CentOS6.2/5.8及Fedora 17/16平台)
查看>>
用jsp写的网页 怎么在传递参数时包含中文?
查看>>
记WinCE下调试SIM900 GSM module
查看>>