彩票网站里的统计怎么做,网站建设在实际工作中的意义,网站呼叫中心 建设工期,wordpress二次开发手册chm本文介绍Redis pipeline相关的知识点及代码示例#xff0c;包括Redis客户端-服务端的一次完整的网络请求、pipeline与client执行多命令的区别、pipeline与Redis事务、pipeline的使用代码示例#xff1b;
pipeline与client执行多命令的区别
Redis是一种基于客户…本文介绍Redis pipeline相关的知识点及代码示例包括Redis客户端-服务端的一次完整的网络请求、pipeline与client执行多命令的区别、pipeline与Redis事务、pipeline的使用代码示例
pipeline与client执行多命令的区别
Redis是一种基于客户端-服务端模型以及请求/响应的TCP服务Redis客户端-服务端的一次完整的网络请求来回如下图 简化一下一次Redis请求和响应会经历如下的步骤
客户端发起一个查询/插入请求并监听socket返回通常情况都是阻塞模式等待Redis服务器的响应服务端处理命令并且返回处理结果给客户端客户端接收到服务的返回结果程序从阻塞代码处返回
Redis客户端和服务端之间通过网络连接进行数据传输这个连接可以很快loopback接口或很慢建立了一个多次跳转的网络连接但无论网络延如何延时数据包总是能从客户端到达服务器并从服务器返回数据回复客户端这个时间被称之为RTT(Round Trip Time - 往返时间)
我们可以很容易就意识到Redis在连续请求服务端时即使Redis每秒能处理100k请求但也会因为网络传输花费大量时间导致整体性能的下降
因此如果遇到大量的批处理我们可以考虑使用Redis的pipeline管道
对于pipeline技术而言对于N个命令就相当于将N个上图中的步骤合并成1个其他多余的时间开销仅作用于命令的执行这样服务请求响应的总体时间将会大大的减少
关于pipeline与client单命令的压测结果可参考Redis精通系列——Pipeline管道
值得注意的是管道技术并不是Redis特有的技术管道技术往往需要客户端-服务器的共同配合大部分工作任务其实是在客户端完成Redis在较早的版本就已经支持管道技术
如下图多个连续的incr指令使用pipeline管道后多个连续的incr指令只会花费一次网络来回开销这个开销会随着N数值的增大大幅减少网络IO开销从而提升整体服务的性能 Redis pipeline的使用注意事项
1. pipeline一次执行的命令不宜过多
结合上面redis命令完整执行流程图有个值得注意的点——可能出现我们经常说到的IO阻塞
当write操作发生并且发送缓冲区send buffer满时就会导致write操作阻塞当read操作发生并且接收缓冲区recv buffer满时就会导致read操作阻塞
上述的这两个阻塞如果出现将会导致整个请求时间变长
因此我们操作大批量指令的时候比如10k个指令我们可以合理的对指令分多次批量发送这样可以减少出现阻塞的情况也可以避免服务器响应一个过大的response包导致客户端内存负载过重
即使不发生IO阻塞pipeline每批打包的命令不能过多还有一个原因因为 pipeline 方式打包命令再发送那么 redis server 必须在处理完所有命令前先缓存起所有命令的处理结果这样就有一个缓存结果的内存的消耗
2. pipeline不保证命令执行的原子性
官方文档的一句话—— Redis::PIPELINE block is simply transmitted faster to the server, but without any guarantee of atomicity. 其实Redis的高性能设计本就不支持包含多命令的严格事务哪怕是multi/exec操作还是Lua脚本Redis只是提供了一些命令来一定程度实现事务
multi/exec操作针对命令语法错误和执行时参数错误处理是不一样的详情见我之前的文章《Redis——“事务“/Lua脚本》
Lua脚本也只能一定程度保证逻辑处理和Redis命令打包的原子性例如库存扣减 pipeline的使用代码示例
代码示例的背景是根据appId批量查询本地缓存的App信息未命中本地缓存的需要从redis中拿这多个key的value然后刷入本地缓存在从redis中拿这多个key的value在key数量较多时做个优化使用Redis的pipeline private ListAppSimpleInfoDTO querySimpleAppWithCache(ListString payAppIds) {if (CollectionUtils.isEmpty(payAppIds)) {return Lists.newArrayList();}ListAppSimpleInfoDTO appList Lists.newArrayList();try {// 从本地缓存读取APPfinal ListString tobeQry Lists.newArrayList();for (String payAppId : payAppIds) {AppSimpleInfoDTO appSimpleInfoDTO null;appSimpleInfoDTO appSimpleInfoCache.getIfPresent(payAppId);if (appSimpleInfoDTO ! null) {log.info(get_appSimpleInfoDTO_fr_appSimpleInfoCache_suc. [appSimpleInfoDTO{}], JSON.toJSONString(appSimpleInfoDTO));appList.add(appSimpleInfoDTO);} else {tobeQry.add(payAppId);}}// 未命中本地缓存则去redis查询if (CollectionUtils.isNotEmpty(tobeQry)) {final ListAppSimpleInfoDTO cacheAppSimpleInfoDTOs getAndCacheAppSimpleInfoDTOs(tobeQry);if (CollectionUtils.isNotEmpty(cacheAppSimpleInfoDTOs)) {appList.addAll(cacheAppSimpleInfoDTOs);}}return appList;} catch (Exception e) {log.error(querySimpleAppWithCache error., e);// 异常时刷全量缓存return getAndCacheAppSimpleInfoDTOs(payAppIds);}}
使用pipeline做多个key的get命令 private ListAppSimpleInfoDTO getAndCacheAppSimpleInfoDTOs(ListString payAppIds) {final SetString payAppIds2Qry new HashSet(payAppIds);final ListAppSimpleInfoDTO result Lists.newArrayList();// 先尝试从redis获取 pipeline模式JedisClusterPipeLine pipeline null;try {pipeline jedisCluster.pipelined();for (String payAppId : payAppIds) {final String appSimpleInfoKey CacheKeyUtils.getAppSimpleInfoKey(payAppId);// pipeline添加get命令pipeline.get(appSimpleInfoKey);}// pipeline执行并获取结果final ListObject allVal pipeline.syncAndReturnAll();if (CollectionUtils.isNotEmpty(allVal)) {allVal.forEach(val - {if (val ! null) {final String jsonStr String.valueOf(val);if (StringUtils.isNotBlank(jsonStr)) {Optional.ofNullable(JSON.parseObject(jsonStr, AppSimpleInfoDTO.class)).ifPresent(appSimpleInfoDTO - {result.add(appSimpleInfoDTO);appSimpleInfoCache.put(appSimpleInfoDTO.getPayAppId(), appSimpleInfoDTO);log.info(localCache_AppSimpleInfoDTO_fr_redis_suc. [simpleApp{}], JSON.toJSONString(appSimpleInfoDTO));payAppIds2Qry.remove(appSimpleInfoDTO.getPayAppId());});}}});}} catch (Exception e) {log.error(localCache_AppSimpleInfoDTOs_fr_redis_error. [payAppIds{}], JSON.toJSONString(payAppIds), e);} finally {if (pipeline ! null) {pipeline.close();}}// redis未查到的数据走RPC查询 并异步加载到redis和localCacheif (CollectionUtils.isNotEmpty(payAppIds2Qry)) {final ListString payAppIds2QryList new ArrayList(payAppIds2Qry);ListApp apps Lists.newArrayList();int index 0;while (index payAppIds2QryList.size()) {int toIndex Math.min(index max_batch, payAppIds2QryList.size());ListString payAppIds2QryTemp payAppIds2QryList.subList(index, toIndex);// 实时查APP信息接口ListApp appsTemp queryAppByPayAppIds(new ArrayList(payAppIds2QryTemp));apps.addAll(appsTemp);index max_batch;}// 异步加载到redis和localCacheif (CollectionUtils.isNotEmpty(apps)) {for (App app : apps) {AppSimpleInfoDTO simpleApp new AppSimpleInfoDTO(app.getName(), app.getPackname(), app.getCode(), app.getBigType());CompletableFuture.runAsync(() - {// redis缓存1小时jedisClusterTemplate.setex(CacheKeyUtils.getAppSimpleInfoKey(simpleApp.getPayAppId()), VivoConfigManager.getInteger(JointOperateConfigConstants.APP_SIMPLEINFO_REDIS_CACHE_TTL, JointOperateConfigConstants.APP_SIMPLEINFO_REDIS_CACHE_TTL_DEFAULT), JSON.toJSONString(simpleApp));log.info(redisCache_AppSimpleInfoDTO_suc. [simpleApp{}], JSON.toJSONString(simpleApp));// local cache 缓存5分钟appSimpleInfoCache.put(simpleApp.getPayAppId(), simpleApp);log.info(localCache_AppSimpleInfoDTO_suc. [simpleApp{}], JSON.toJSONString(simpleApp));});result.add(simpleApp);}}}// 重新排序final MapString, AppSimpleInfoDTO payAppIdMap result.stream().collect(Collectors.toMap(AppSimpleInfoDTO::getPayAppId, Function.identity(), (old, newly) - newly));result.clear();payAppIds.forEach(payAppId - Optional.ofNullable(payAppIdMap.get(payAppId)).ifPresent(result::add));return result;} 本文参考
Redis精通系列——Pipeline管道