网站结构布局,昆明seo关键词排名,大连云购物app下载安装到手机,无锡企业网站制作目录 前言思路实现功能代码实现 测试先引测试版包测试代码结果与分析思考 尾语 前言 使用通信来共享内存#xff0c;而不是通过共享内存来通信 上面这句话#xff0c;是每个go开发者在 处理多线程通信时 的座右铭#xff0c;go甚至把实现这个理念的channel直接焊在编译器里而不是通过共享内存来通信 上面这句话是每个go开发者在 处理多线程通信时 的座右铭go甚至把实现这个理念的channel直接焊在编译器里几乎所有的go程序里都有channel的身影。 rust的异步和go的goroutine有异曲同工之妙甚至可以把 tokio::spawn 理解为go关键字。但在rust中好像并没有异步channel的实现。本着求人不如求己的原则决定diy一个类似go的channel。
思路
先看一下发送流程 #mermaid-svg-y1pHP32l1IIfUmLh {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-y1pHP32l1IIfUmLh .error-icon{fill:#552222;}#mermaid-svg-y1pHP32l1IIfUmLh .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-y1pHP32l1IIfUmLh .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-y1pHP32l1IIfUmLh .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-y1pHP32l1IIfUmLh .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-y1pHP32l1IIfUmLh .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-y1pHP32l1IIfUmLh .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-y1pHP32l1IIfUmLh .marker{fill:#333333;stroke:#333333;}#mermaid-svg-y1pHP32l1IIfUmLh .marker.cross{stroke:#333333;}#mermaid-svg-y1pHP32l1IIfUmLh svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-y1pHP32l1IIfUmLh .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-y1pHP32l1IIfUmLh .cluster-label text{fill:#333;}#mermaid-svg-y1pHP32l1IIfUmLh .cluster-label span{color:#333;}#mermaid-svg-y1pHP32l1IIfUmLh .label text,#mermaid-svg-y1pHP32l1IIfUmLh span{fill:#333;color:#333;}#mermaid-svg-y1pHP32l1IIfUmLh .node rect,#mermaid-svg-y1pHP32l1IIfUmLh .node circle,#mermaid-svg-y1pHP32l1IIfUmLh .node ellipse,#mermaid-svg-y1pHP32l1IIfUmLh .node polygon,#mermaid-svg-y1pHP32l1IIfUmLh .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-y1pHP32l1IIfUmLh .node .label{text-align:center;}#mermaid-svg-y1pHP32l1IIfUmLh .node.clickable{cursor:pointer;}#mermaid-svg-y1pHP32l1IIfUmLh .arrowheadPath{fill:#333333;}#mermaid-svg-y1pHP32l1IIfUmLh .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-y1pHP32l1IIfUmLh .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-y1pHP32l1IIfUmLh .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-y1pHP32l1IIfUmLh .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-y1pHP32l1IIfUmLh .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-y1pHP32l1IIfUmLh .cluster text{fill:#333;}#mermaid-svg-y1pHP32l1IIfUmLh .cluster span{color:#333;}#mermaid-svg-y1pHP32l1IIfUmLh div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-y1pHP32l1IIfUmLh :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 唤醒发送任务 缓存已满,添加任务到 插入成功 重新发送 发送消息 尝试添加到缓存中 缓存空出 发送队列 唤起接收任务 End 再看一下接收流程 #mermaid-svg-RU2jq0YvlLlzRJrh {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-RU2jq0YvlLlzRJrh .error-icon{fill:#552222;}#mermaid-svg-RU2jq0YvlLlzRJrh .error-text{fill:#552222;stroke:#552222;}#mermaid-svg-RU2jq0YvlLlzRJrh .edge-thickness-normal{stroke-width:2px;}#mermaid-svg-RU2jq0YvlLlzRJrh .edge-thickness-thick{stroke-width:3.5px;}#mermaid-svg-RU2jq0YvlLlzRJrh .edge-pattern-solid{stroke-dasharray:0;}#mermaid-svg-RU2jq0YvlLlzRJrh .edge-pattern-dashed{stroke-dasharray:3;}#mermaid-svg-RU2jq0YvlLlzRJrh .edge-pattern-dotted{stroke-dasharray:2;}#mermaid-svg-RU2jq0YvlLlzRJrh .marker{fill:#333333;stroke:#333333;}#mermaid-svg-RU2jq0YvlLlzRJrh .marker.cross{stroke:#333333;}#mermaid-svg-RU2jq0YvlLlzRJrh svg{font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;}#mermaid-svg-RU2jq0YvlLlzRJrh .label{font-family:"trebuchet ms",verdana,arial,sans-serif;color:#333;}#mermaid-svg-RU2jq0YvlLlzRJrh .cluster-label text{fill:#333;}#mermaid-svg-RU2jq0YvlLlzRJrh .cluster-label span{color:#333;}#mermaid-svg-RU2jq0YvlLlzRJrh .label text,#mermaid-svg-RU2jq0YvlLlzRJrh span{fill:#333;color:#333;}#mermaid-svg-RU2jq0YvlLlzRJrh .node rect,#mermaid-svg-RU2jq0YvlLlzRJrh .node circle,#mermaid-svg-RU2jq0YvlLlzRJrh .node ellipse,#mermaid-svg-RU2jq0YvlLlzRJrh .node polygon,#mermaid-svg-RU2jq0YvlLlzRJrh .node path{fill:#ECECFF;stroke:#9370DB;stroke-width:1px;}#mermaid-svg-RU2jq0YvlLlzRJrh .node .label{text-align:center;}#mermaid-svg-RU2jq0YvlLlzRJrh .node.clickable{cursor:pointer;}#mermaid-svg-RU2jq0YvlLlzRJrh .arrowheadPath{fill:#333333;}#mermaid-svg-RU2jq0YvlLlzRJrh .edgePath .path{stroke:#333333;stroke-width:2.0px;}#mermaid-svg-RU2jq0YvlLlzRJrh .flowchart-link{stroke:#333333;fill:none;}#mermaid-svg-RU2jq0YvlLlzRJrh .edgeLabel{background-color:#e8e8e8;text-align:center;}#mermaid-svg-RU2jq0YvlLlzRJrh .edgeLabel rect{opacity:0.5;background-color:#e8e8e8;fill:#e8e8e8;}#mermaid-svg-RU2jq0YvlLlzRJrh .cluster rect{fill:#ffffde;stroke:#aaaa33;stroke-width:1px;}#mermaid-svg-RU2jq0YvlLlzRJrh .cluster text{fill:#333;}#mermaid-svg-RU2jq0YvlLlzRJrh .cluster span{color:#333;}#mermaid-svg-RU2jq0YvlLlzRJrh div.mermaidTooltip{position:absolute;text-align:center;max-width:200px;padding:2px;font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:12px;background:hsl(80, 100%, 96.2745098039%);border:1px solid #aaaa33;border-radius:2px;pointer-events:none;z-index:100;}#mermaid-svg-RU2jq0YvlLlzRJrh :root{--mermaid-font-family:"trebuchet ms",verdana,arial,sans-serif;} 唤醒接受任务 缓存为空,添加任务至 取值成功 重新取值 接收消息 尝试从缓存中取消息 缓存入值 接收队列 唤起发送任务 End 总体来说流程清晰易懂不管接收还是发送都是先尝试从缓存队列中操作值不成功则加入到对应队列等待再次执行。反之则唤起相关任务结束操作。
实现功能
首先需要实现一个存放值的环形缓冲区并且每个单元应该是单独加锁的从而避免全局锁。需要两个任务队列用来存放在饥饿模式(从缓存操作失败)下的 发送任务和接受任务。按照rust习惯将发送者和接受者拆开并各自实现future因为唤醒不是同步的需要通过一个唤醒器来唤醒沉默的任务。使用原子操作替代锁
代码实现
具体的就不写了放在github上了 github地址https://github.com/woshihaoren4/wd_tools/tree/main/src/channel
测试
这里主要和async-channel测试一下
async-channel 是最常见的异步channel在crateio上有两千万的下载。
先引测试版包
cargo.toml
[dependencies]
tokio {version 1.22.0,features[full]}
wd_tools {version 0.8.3,features [sync,chan]}
async-channel 1.8.0wd_tools 是我们的channel这里引用的sync chan两个feature前者用于测试后者是chan实现。
测试代码
测试场景设置缓存长度为10发100万数据接100万数据。在1发送者1接受者1发送者10接受者10发送者1接受者10发送者10接受者四种情况下的收发性能。
use std::fmt::Debug;
use wd_tools::channel as wd;
use async_channel as ac;#[tokio::main]
async fn main(){let ts TestService::new(10);println!(test start ------------ wd_tools);ts.send_to_recv(1-v-1,true,100_0000,1,100_0000,1,|x|x).await;ts.send_to_recv(1-v-10,true,100_0000,1,10_0000,10,|x|x).await;ts.send_to_recv(10-v-1,true,10_0000,10,100_0000,1,|x|x).await;ts.send_to_recv(10-v-10,true,10_0000,10,10_0000,10,|x|x).await;println!(wd_tools ------------- test over);println!(test start ------------ async-channel);ts.send_to_recv(1-v-1,false,100_0000,1,100_0000,1,|x|x).await;ts.send_to_recv(1-v-10,false,100_0000,1,10_0000,10,|x|x).await;ts.send_to_recv(10-v-1,false,10_0000,10,100_0000,1,|x|x).await;ts.send_to_recv(10-v-10,false,10_0000,10,10_0000,10,|x|x).await;println!(async-channel ------------ test over);
}struct TestServiceT{wd_sender : wd::SenderT,wd_receiver : wd::ReceiverT,ac_sender : ac::SenderT,ac_receiver : ac::ReceiverT
}implT:UnpinSendSyncDebugstatic TestServiceT{pub fn new(cap:usize)-TestServiceT{let (wd_sender,wd_receiver) wd::Channel::new(cap);let (ac_sender,ac_receiver) ac::bounded(cap);TestService{wd_sender,wd_receiver,ac_sender,ac_receiver}}pub fn sendG:Fn(usize)-TSendSyncstatic(self,wg:wd_tools::sync::WaitGroup,is_wd:bool,max:usize,generater:G){let wd_sender self.wd_sender.clone();let ac_sender self.ac_sender.clone();wg.defer_args1(|is_wd|async move{for i in 0..max {let t generater(i);if is_wd {wd_sender.send(t).await.expect( 发送失败);}else{ac_sender.send(t).await.expect( 发送失败);}}},is_wd);}pub fn recv(self,wg:wd_tools::sync::WaitGroup,is_wd:bool,max:usize){let wd_receiver self.wd_receiver.clone();let ac_receiver self.ac_receiver.clone();wg.defer_args1(|is_wd|async move{for _i in 0..max {if is_wd {wd_receiver.recv().await.expect( 接收失败);}else{ac_receiver.recv().await.expect( 接收失败);}}},is_wd);}pub async fn send_to_recvG:Fn(usize)-TSendSyncClonestatic(self,info:static str, is_wd:bool, sbase:usize, sgroup:usize, rbase:usize, rgroup:usize, generater:G){let now std::time::Instant::now();let wg wd_tools::sync::WaitGroup::default();let wg_send wd_tools::sync::WaitGroup::default();let wg_recv wd_tools::sync::WaitGroup::default();for _ in 0..sgroup{self.send(wg_send.clone(),is_wd,sbase,generater.clone());}for _ in 0..rgroup{self.recv(wg_recv.clone(),is_wd,rbase);}wg.defer(move ||async move{let now std::time::Instant::now();wg_send.wait().await;println!(test[{}] --- send use time:{}ms,info,now.elapsed().as_millis());});wg.defer(move ||async move{let now std::time::Instant::now();wg_recv.wait().await;println!(test[{}] --- recv use time:{}ms,info,now.elapsed().as_millis());});wg.wait().await;println!(test[{}] --- all use time:{}ms,info,now.elapsed().as_millis());}
}结果与分析
测试10次取平均值做表如下 如上图得结论
在1发收者和10发收者的情况下两种channel效率相差不多。在发送者和接受者数量不等时wd_tools::channel的性能明显优于async-channel
思考
分析结论之前先看一下async-channel的实现。虽然async-channel也是异步但它并不依赖某个异步运行时来进行任务的上线文切换而是使用concurrent-queue和event-listener进行消息调度底层依赖于std::thread::park_timeout。
相比event-listener的调度方式直接管理tokio的Context则更适用于异步环境。尤其是存在大量等待的场景。如上面测试接受者和发送者数量不等需要长时间等待的情况。实际开发中接受者或者发送者可能长时间处于饥饿的情况下wd_tools::channel不会产生多余的资源开销毕竟上下文被挂起了也就不会被cpu执行。
当然实际是复杂的因情而异使用的CPU数量(线程数)缓存长度异步任务数同样会影响消息队列的性能尤其是不需要等待的场景下async-channel性能更优。
而wd_tools::channel则更适合tokio异步环境。并且不会引起线程park而产生其他影响。
尾语
wd_tools::channel 目前只是一个初级版本还有很多地方待优化比如过多的状态判断对缓存区直接轮训加锁而没有采用优化算法 唤醒器完全可以通过一定优化策略替换带。 但这个思路是没错的欢迎有想法的同志加入进来。