当前位置: 首页 > news >正文

做网站好的wordpress几万条数据

做网站好的,wordpress几万条数据,公司做网站需要准备什么材料,两学一做 网站一、了解 众所周知#xff0c;redis是我们日常开发过程中使用最多的非关系型数据库#xff0c;也是消息中间件。实际上除了常用的rabbitmq、rocketmq、kafka消息队列#xff08;大家自己下去研究吧~模式都是通用的#xff09;#xff0c;我们也能使用redis实现消息队列。…一、了解 众所周知redis是我们日常开发过程中使用最多的非关系型数据库也是消息中间件。实际上除了常用的rabbitmq、rocketmq、kafka消息队列大家自己下去研究吧~模式都是通用的我们也能使用redis实现消息队列。因为其他中间件可能更适用于大型/企业级项目在咱们项目前期不需要这么多的数据redis跟我们也是高度集成的。这里就简化了技术栈。 二、常用的几种使用redis实现的消息队列方式 1、List数据结构 Redis列表是简单的字符串列表按照插入顺序排序。你可以添加一个元素到列表的头部左边或者尾部右边一个列表最多可以包含 232 - 1 个元素 (4294967295, 每个列表超过40亿个元素)。 这里的列表大家可以想想为一个横着的通道假设我现在往右边插入第一条数据这个元素就会被放在最左边接着再放入第二条数据它就会在左边第二条以此类推…插入了100条数据。 假设这个时候我要取出第一条我就从最左边取就好。 这就变相实现了有序消息队列。具体实现大家自己研究 优点操作方便可以有序的取出自己插入的数据 缺点不能进行实时消费没有消费者 2、pub/sub 订阅消费模式 这就是传统的生产者-队列-消费者的模式。生产者的消息所有订阅者都能收到。 优点实现了发布订阅模式可以实时进行消费 缺点没有消息持久化在系统崩溃、宕机的时候消息会丢失 3、sorted set有序集合Redis 有序集合和集合一样也是 string 类型元素的集合,且不允许重复的成员。不同的是每一个元素都会关联一个double分数redis就是通过分数为集合中的成员进行从大到小的排列。 有序集合的成员是唯一的但是score是可以重复的。 生成消息直接往s-set中插入数据将score设置为接收到数据的13位时间戳需要使用的时候再根据score大小有序取出来就行了。 看到这里是不是大家能想到既然每条消息都带有时间那我是不是可以顺手实现延迟队列。 这里只需要将score设置为 接受消息的时间戳延迟时间 。我在使用的时候获取当天时间戳的数据这样就实现了延迟消息队列。 优点操作方便可以实现延迟队列 缺点不能实时进行消费 4、stream流 redis5.0版本以上才有 重点讲 Redis Stream 提供了消息的持久化和主备复制功能可以让任何客户端访问任何时刻的数据并且能记住每一个客户端的访问位置还能保证消息不丢失。 Redis Stream 的结构如下所示它有一个消息链表将所有加入的消息都串起来每个消息都有一个唯一的 ID 和对应的内容 每个stream流都有自己的名称它是redis的key也可以理解为队列名称。 Consumer Group 消费组使用 XGROUP CREATE 命令创建一个消费组有多个消费者(Consumer)。 last_delivered_id 游标每个消费组会有个游标 last_delivered_id任意一个消费者读取了消息都会使游标 last_delivered_id 往前移动。 pending_ids 消费者(Consumer)的状态变量作用是维护消费者的未确认的 id。 pending_ids 记录了当前已经被客户端读取的消息但是还没有 ack (Acknowledge character确认字符。 stream常用命令 XADD 定义stream流写入消息体 XADD mystream * field1 A field2 B field3 C field4 D mystream自定义流名称 *由redis生成流的id也可以自定义但是得保证自增唯一 field1-A \field2-B\field3-C 保存的消息体key-value形式-- 举例 redis XADD mystream * name Sara surname OConnor 1601372323627-0XDEL 删除消息 XADD mystream * a 1 1538561698944-0XADD mystream * b 2 1538561700640-0XADD mystream * c 3 1538561701744-0XDEL mystream 1538561700640-0 (integer) 1 127.0.0.1:6379 XRANGE mystream - 1) 1) 1538561698944-02) 1) a2) 1 2) 1) 1538561701744-02) 1) c2) 3XRANGE 获取消息队列数据 XRANGE key start end [COUNT count]key:strem流名称 start开始值- 表示最小值 end结束值 表示最大值-- 举例 redis XRANGE mystream - 2 从mystrem全部数据中取出两条数据redis XRANGE mystream - 1 从mystream倒叙取一条数据XREVRANGE 自动过滤已删除的消息 redis XADD writers * name Virginia surname Woolf 1601372731458-0 redis XADD writers * name Jane surname Austen 1601372731459-0 redis XADD writers * name Toni surname Morrison 1601372731459-1 redis XADD writers * name Agatha surname Christie 1601372731459-2 redis XADD writers * name Ngozi surname Adichie 1601372731459-3 redis XLEN writers (integer) 5 redis XREVRANGE writers - COUNT 1 1) 1) 1601372731459-32) 1) name2) Ngozi3) surname4) Adichie redisXREAD 阻塞或者非阻塞获取消息 # 从 Stream 头部读取两条消息XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 1) 1) mystream2) 1) 1) 1526984818136-02) 1) duration2) 15323) event-id4) 55) user-id6) 77828132) 1) 1526999352406-02) 1) duration2) 8123) event-id4) 95) user-id6) 388234 2) 1) writers2) 1) 1) 1526985676425-02) 1) name2) Virginia3) surname4) Woolf2) 1) 1526985685298-02) 1) name2) Jane3) surname4) Austen count 数量 milliseconds 可选阻塞毫秒数没有设置就是非阻塞模式 key 队列名 id 消息 IDXGROUP CREATE 创建消费者组 XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]key 队列名称如果不存在就创建 groupname 组名。 $ 表示从尾部开始消费只接受新消息当前 Stream 消息会全部忽略从头开始消费: XGROUP CREATE mystream consumer-group-name 0-0 从尾部开始消费: XGROUP CREATE mystream consumer-group-name $以上就是常用的steam流的命令大家下来自己测试练习。 三、springboot整合redis stream流 java中提供了连接redis的客户端jedis和lettuce、redistemplateRedisTemplate 是 Spring Data Redis 提供的一个高级抽象层封装了 Jedis 或 Lettuce 等底层客户端。 它提供了丰富的功能如序列化、事务支持、键过期等。这里主要讲主流的redistemplate整合大家以后能直接使用。 实时消费 实时消费顾名思义生产者发送消息消费者立马进行消费逻辑处理。 RedisStreamUtils工具类方便后续进行stream操作没根据自己项目需求来定义 Configuration SuppressWarnings(all) public class RedisStreamUtils {Resourceprivate RedisTemplateString, Object redisTemplate;/*** 创建消费组** param streamKey 键名称* param group 组名称* return {link String}*/public String createGroup(String streamKey, String group) {return redisTemplate.opsForStream().createGroup(streamKey, group);}/*** 获取消费者信息** param streamKey 键名称* param group 组名称* return {link StreamInfo.XInfoConsumers}*/public StreamInfo.XInfoConsumers queryConsumers(String streamKey, String group) {return redisTemplate.opsForStream().consumers(streamKey, group);}/*** 查询组信息** param streamKey 键名称* return*/public StreamInfo.XInfoGroups queryGroups(String streamKey) {return redisTemplate.opsForStream().groups(streamKey);}// 添加Map消息public String addMap(String streamKey, MapString, Object value) {return Objects.requireNonNull(redisTemplate.opsForStream().add(streamKey, value)).getValue();}// 读取消息public ListMapRecordString, Object, Object read(String streamKey) {return redisTemplate.opsForStream().read(StreamOffset.fromStart(streamKey));}// 确认消费public Long ack(String streamKey, String group, String... recordIds) {return redisTemplate.opsForStream().acknowledge(streamKey, group, recordIds);}// 删除消息。当一个节点的所有消息都被删除那么该节点会自动销毁public Long del(String key, String... recordIds) {return redisTemplate.opsForStream().delete(key, recordIds);}// 判断是否存在keypublic boolean hasKey(String key) {Boolean aBoolean redisTemplate.hasKey(key);return aBoolean ! null aBoolean;}}RedisConfig配置文件 Configuration Slf4j RequiredArgsConstructor public class RedisConfig {private final RedisStreamUtils redisStreamUtil;private final Environment environment;//消费者处理消息配置Beanpublic Subscription subscription(RedisConnectionFactory factory) {AtomicInteger index new AtomicInteger(1);//获取系统处理器数量 创建线程池开启守护线程int processors Runtime.getRuntime().availableProcessors();ThreadPoolExecutor executor new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,new LinkedBlockingDeque(), r - {Thread thread new Thread(r);thread.setName(async-stream-consumer- index.getAndIncrement());thread.setDaemon(true);return thread;});//流消息监听容器参数设置 StreamMessageListenerContainer.StreamMessageListenerContainerOptionsString, MapRecordString, String, String options StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// 一次最多获取多少条消息.batchSize(5)//执行线程池.executor(executor)//阻塞消息读取延迟消息.pollTimeout(Duration.ofSeconds(1))//异常处理.errorHandler(throwable - {log.error([MQ handler exception], throwable);throwable.printStackTrace();}).build();//通过redis连接工厂创建流消息监听容器var listenerContainer StreamMessageListenerContainer.create(factory, options);//初始化流和消费者处理配置//初始化流和消费者处理配置Subscription subscription initStreamAndConsumer(listenerContainer);//开启监听容器listenerContainer.start();return subscription;}private Subscription initStreamAndConsumer(StreamMessageListenerContainerString, MapRecordString, String, String listenerContainer){//↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓//这一部分可以不用配置可以根据自己的实际情况配置//该key和group可根据需求自定义配置String streamName mystream;String groupname mygroup;initStream(streamName, groupname);// 手动ask消息//消费者处理完消息之后会进行确认这里有一个pending状态会变成已处理Subscription subscription listenerContainer.receive(Consumer.from(groupname, zhuyazhou),StreamOffset.create(streamName, ReadOffset.lastConsumed()), new RedisConsumer(redisStreamUtil));// 自动ask消息/* Subscription subscription listenerContainer.receiveAutoAck(Consumer.from(redisMqGroup.getName(), redisMqGroup.getConsumers()[0]),StreamOffset.create(streamName, ReadOffset.lastConsumed()), new ReportReadMqListener());*///这一部分可以不用配置可以根据自己的实际情况配置//↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑return subscription;}private void initStream(String key, String group) {boolean hasKey redisStreamUtil.hasKey(key);if (!hasKey) {MapString, Object map new HashMap(1);map.put(field, value);//创建主题String result redisStreamUtil.addMap(key, map);//创建消费组redisStreamUtil.createGroup(key, group);//将初始化的值删除掉redisStreamUtil.del(key, result);log.info(stream:{}-group:{} initialize success, key, group);}} }大家这里可以想一想这种写法是不是符合生产过程中的创建队列/消费者的逻辑是不是不方便。能不能在我需要的时候直接调用方法去创建假设现在我新增了一个业务需求需要用不同的业务逻辑去处理而且我希望定制不同的消费者应答模式这个时候就需要一个通用方法去实现这里我是这样做的。还是在工具类中 创建redis流消息监听容器 主要参数 定义线程池、一次最大获取消息数、超时重新获取、异常处理 Beanpublic StreamMessageListenerContainerString, MapRecordString, String, String streamMessageListenerContainer(RedisConnectionFactory factory) {log.info(redis ip:{},port:{},environment.getProperty(spring.data.redis.host),environment.getProperty(spring.data.redis.port));AtomicInteger index new AtomicInteger(1);int processors Runtime.getRuntime().availableProcessors();ThreadPoolExecutor executor new ThreadPoolExecutor(processors, processors, 0, TimeUnit.SECONDS,new LinkedBlockingDeque(), r - {Thread thread new Thread(r);thread.setName(async-stream-consumer- index.getAndIncrement());thread.setDaemon(true);return thread;});StreamMessageListenerContainer.StreamMessageListenerContainerOptionsString, MapRecordString, String, String options StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()// 一次最多获取多少条消息.batchSize(5).executor(executor).pollTimeout(Duration.ofSeconds(3)).errorHandler(throwable - {log.error([MQ handler exception], throwable);throwable.printStackTrace();}).build();return StreamMessageListenerContainer.create(factory, options);}//业务需求调用此方法即可 public void addNewStreamAndSubscribe(String streamName, String groupName, String consumerId, StreamListener listener) {initStream(streamName, groupName);subscribeToStream(streamName, groupName, Consumer.from(groupName, consumerId), listener);}public void addNewStreamAndSubscribe(String streamName, String groupName, String consumerId, RedisConsumer listener,MapString,Object recodMap) {initStream(streamName, groupName);subscribeToStream(streamName, groupName, Consumer.from(groupName, consumerId), listener);addMap(streamName, recodMap);}private void subscribeToStream(String streamName, String groupName, Consumer consumer, StreamListener listener) {StreamMessageListenerContainerString, MapRecordString, String, String container streamMessageListenerContainer(redisConnectionFactory);Subscription subscription container.receive(Consumer.from(groupName, consumer.getName()),StreamOffset.create(streamName, ReadOffset.lastConsumed()), listener);//开始消息容器监听container.start();log.info(Subscribed to stream: {} with group: {} and consumer: {}, streamName, groupName, consumer);}streamMessageListenerContainer中的 .batchSize(1) 设置需要着重说一下。意思是在消费者在监听到数据的时候一次从redis中取出的多少条数据假设我设置1就意味着我的监听器会redis中取出1条未消费的数据随后进入消费者逻辑处理完毕之后返回继续由监听器读取1条数据在进入消费者逻辑这个值设置得越小消息处理数据越快但是也会增加redis链接的资源。 较大的 batchSize 可以减少与 Redis 服务器的交互次数降低网络通信开销提高处理效率。 较小的 batchSize 适用于需要低延迟处理的场景但会增加网络通信开销和 CPU 使用率。 RedisConsumer消费者 Component(RedisConsumer) RequiredArgsConstructor Slf4j public class RedisConsumer implements StreamListenerString, MapRecordString,String,String {private final RedisStreamUtils redisStreamUtils;Overridepublic void onMessage(MapRecordString, String, String message) {try {log.info(RedisConsumer1获取到了消息{},message);String streamKey message.getStream();RecordId recordId message.getId();MapString, String value message.getValue();//获取这个流下 所有的消费者组StreamInfo.XInfoGroups xInfoGroups redisStreamUtils.queryGroups(streamKey);//处理逻辑//↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓log.info(【streamKey】 {}【recordId】 {},【msg】 {},streamKey,recordId, value);//手动确认ack消息并删除已处理的消息//我这里使用手动xInfoGroups.forEach(xInfoGroup - redisStreamUtils.ack(streamKey, xInfoGroup.groupName(), recordId.getValue()));//自动确认消息 ---------自己下来研究//↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑//根据业务场景来看是否需要删除消息 // redisStreamUtils.del(streamKey, recordId.getValue());} catch (Exception e) {throw new ServiceException(消费异常);}} }RedisConsumer2消费者 Component(RedisConsumer2) RequiredArgsConstructor Slf4j public class RedisConsumer2 implements StreamListenerString, MapRecordString,String,String {private final RedisStreamUtils redisStreamUtils;Overridepublic void onMessage(MapRecordString, String, String message) {try {log.info(RedisConsumer2获取到了消息{},message);String streamKey message.getStream();RecordId recordId message.getId();MapString, String value message.getValue();//获取这个流下 所有的消费者组StreamInfo.XInfoGroups xInfoGroups redisStreamUtils.queryGroups(streamKey);//处理逻辑//↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓log.info(【streamKey】 {}【recordId】 {},【msg】 {},streamKey,recordId, value);//↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑↑//手动确认ack消息并删除已处理的消息xInfoGroups.forEach(xInfoGroup - redisStreamUtils.ack(streamKey, xInfoGroup.groupName(), recordId.getValue())); // redisStreamUtils.del(streamKey, recordId.getValue());} catch (Exception e) {throw new ServiceException(消费异常);}} }RedisStreamcontroller模拟测试 RequestMapping(value /redisStream) RestController RequiredArgsConstructor Slf4j SuppressWarnings(all) public class RedisStreamController {private final RedisStreamUtils redisStreamUtils;private final RedisConsumer redisConsumer;private final RedisTemplate redisTemplate;private final ApplicationContext applicationContext;GetMapping(value /addNewStreamAndSubscribe)public ResultVO addNewStreamAndSubscribe(RequestParam(streamKey) String streamKey,RequestParam(groupName) String groupName,RequestParam(consumer)String consumer,RequestParam(consumerClass) String consumerClass){try {// 获取实现类的实例StreamListener consumerInstance (StreamListener) applicationContext.getBean(consumerClass);redisStreamUtils.addNewStreamAndSubscribe(streamKey, groupName, consumer,consumerInstance );} catch (Exception e) {throw new RuntimeException(e);}return ResultVO.success();}GetMapping(value /addMap)public ResultVO addMap(RequestParam(streamKey) String streamKey,RequestParam(key)String key,RequestParam(value)String value) {HashMapString, Object objectObjectHashMap new HashMap();objectObjectHashMap.put(key,value);redisStreamUtils.addMap(streamKey,objectObjectHashMap);return ResultVO.success();}GetMapping(value /getGroup)public ResultVO getGroup(RequestParam(streamKey) String streamKey,RequestParam(groupName) String groupName) {boolean b redisStreamUtils.hasKey(streamKey);if(b){StreamInfo.XInfoGroups xInfoGroups redisStreamUtils.queryGroups(streamKey);ListObject list new ArrayList();for (StreamInfo.XInfoGroup xInfoGroup : xInfoGroups) {StreamInfo.XInfoConsumers xInfoConsumers null;if(StrUtil.isNotEmpty(groupName)){xInfoConsumers redisStreamUtils.queryConsumers(streamKey, groupName);for (StreamInfo.XInfoConsumer xInfoConsumer : xInfoConsumers) {log.info(group:{},pending:{},consumerCount:{},consumerName:{},lastDeliveryId:{},xInfoGroup.groupName(),xInfoGroup.pendingCount(),xInfoGroup.consumerCount(),xInfoConsumer.consumerName(),xInfoGroup.lastDeliveredId());}}}}else{log.info(streamKey不存在{},streamKey);return ResultVO.error(streamKey不存在);}return ResultVO.success();}GetMapping(value /delStream)public ResultVO delStream(RequestParam(streamKey) String streamKey){redisTemplate.delete(streamKey);return ResultVO.success();}GetMapping(value /readMsg)public ResultVO readMsg(RequestParam(streamKey) String streamKey,RequestParam(groupName) String groupName,RequestParam(consumer) String consumer){// 读取消息每次读取最多 5 条List read redisTemplate.opsForStream().read(Consumer.from(groupName, consumer),StreamReadOptions.empty().count(10).block(Duration.ofSeconds(1)),StreamOffset.create(streamKey, ReadOffset.lastConsumed()));return ResultVO.success(JSON.toJSONString(read));}项目启动 调用/addNewStreamAndSubscribe接口 创建流、监听容器消费者绑定流消费者逻辑处理类接收生产者消息方式最新、偏移量开启消息容器监听 调用/addMap接口发送消息 如果只有一个消费者那么当消费者出现异常的时候直到服务恢复会从上一次消费的数据开始进行消费。 假设现在消费者组有两个消费者都绑定了同一个消息流这个时候发送消息就是轮询访问。 RedisConsumer1获取到了消息 RedisConsumer2获取到了消息 RedisConsumer1获取到了消息 RedisConsumer2获取到了消息 … 如果consumer1出现了异常这个时候consumer2会正常消费所有的数据。 stream本身就支持持久化数据也是dbs和aof两种。不用担心数据丢失。
http://www.w-s-a.com/news/990655/

相关文章:

  • 阿里云的网站建设方案织梦和wordpress哪个安全
  • 聊城网站建设公司电话wordpress怎么重新配置文件
  • 创业如何进行网站建设泰州公司注册
  • 免费网站建设培训学校手机百度高级搜索入口在哪里
  • 建站经验安徽六安发现一例新冠阳性检测者
  • 滨州内做网站系统的公司汕头网络营销公司
  • 苏州制作网站的公司哪家好wordpress google搜索
  • c语言做项目网站wordpress博客被书为什么还
  • 企业建站用什么系统网站建设补充协议模板
  • 常州网站关键字优化淘客网站怎么做排名
  • 全flash网站制作教程网站做进一步优化
  • 建设网站步骤是如何做自媒体和网站签约赚点击
  • 网站建设的闪光点网站 备案 拍照
  • 那些企业需要做网站九洲建设集团网站
  • 中山企业做网站昆明做网站价格
  • wordpress 新网站 代码网站可以做系统还原吗
  • 百度给做网站公司餐饮设计装饰公司
  • 专门卖医疗器械的网站网站建设方案一份
  • 吉林省建设安全监督站网站wordpress 4.7.5下载
  • 网页制作视频的网站建设营销策划公司
  • 玉雕网站建设八点品牌设计公司招聘
  • 服务器可以自己的网站吗flash 网站 源码
  • 湖南做网站 搜搜磐石网络网站注册收入
  • 北京软件网站开发装修设计培训机构
  • 哪个网站能帮助做路书网站建设的技巧
  • 上海网站备案在哪里在国外怎么做网站
  • 做网站得花多钱乡村振兴网站建设
  • 站设计培训课程wordpress自动回复
  • 上海闵行区 网站建设永久免费crm软件下载
  • 天津营销网站建设公司排名台州网站排名公司