杭州网站建设费用,seo优化设计,保网微商城官网,包装设计公司 山东使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息)
gRpc协议类定义
service方法定义 service MQDataService{ rpc sendFacebookAndroidMsg(google.protobuf.StringValue)returns (ResultProto); rpc receiveFacebookAndroidMsg(empty)returns (stream g…使用背景: 从redis队列中发送和消费消息.(使用gRpc的流式实现的消费消息)
gRpc协议类定义
service方法定义 service MQDataService{ rpc sendFacebookAndroidMsg(google.protobuf.StringValue)returns (ResultProto); rpc receiveFacebookAndroidMsg(empty)returns (stream google.protobuf.StringValue); }
服务端写法
Overridepublic void sendFacebookAndroidMsg(StringValue request, StreamObserverResultProto responseObserver) {CacheKey cacheKey AppKey.appReport;String keycacheKey.get_keyName().replace({PLATFORM}, MqTopic.FB_TOPIC).replace({APPTYPE}, 0);RedissonFactory.pushMsg(key, request.getValue(), cacheKey.get_dbIndex(),cacheKey.get_expireSecondTime());ResultProto.Builder builder ResultProto.newBuilder();builder.setCode(ResultType.SUCCESS);responseObserver.onNext(builder.build());responseObserver.onCompleted();}Overridepublic void receiveFacebookAndroidMsg(empty request, StreamObserverStringValue responseObserver) {MQListener mqListenernew MQListener(responseObserver);try {CacheKey cacheKey AppKey.appReport;String keycacheKey.get_keyName().replace({PLATFORM}, MqTopic.FB_TOPIC).replace({APPTYPE},0);RedissonFactory.getRedis().subscribe(mqListener,key);} catch (Exception e) {}finally {responseObserver.onCompleted();}}// 消息监听响应
public class MQListener extends JedisPubSub {public MQListener(StreamObserverStringValue responseObserver){_responseObserverresponseObserver;}private StreamObserverStringValue _responseObserver;// 取得订阅的消息后的处理public void onMessage(String channel, String message) {if(!StringUtil.isNullOrEmpty(message)){StringValue.Builder builder StringValue.newBuilder();builder.setValue(message);_responseObserver.onNext(builder.build());}}// 初始化订阅时候的处理public void onSubscribe(String channel, int subscribedChannels) {...}// 取消订阅时候的处理public void onUnsubscribe(String channel, int subscribedChannels) {...}// 初始化按表达式的方式订阅时候的处理public void onPSubscribe(String pattern, int subscribedChannels) {...}// 取消按表达式的方式订阅时候的处理public void onPUnsubscribe(String pattern, int subscribedChannels) {...}// 取得按表达式的方式订阅的消息后的处理public void onPMessage(String pattern, String channel, String message) {...}
}客户端写法
public static void receiveFacebookAndroidMsg() {try {log.info(facebook android msg);// 接收消息StreamObserverStringValue responseObserver new StreamObserverStringValue() {Overridepublic void onNext(StringValue msgProto) {try {log.info(facebook android msg 接收到消息: {}, msgProto.getValue());JSONObject jsonObject JSONObject.parseObject(msgProto.getValue());...} catch (Exception e) {log.error(facebook ios msg 消费失败{}, e.getMessage());// 发给mq重新消费...}}Overridepublic void onError(Throwable throwable) {System.err.println(Error occurred: throwable.getMessage());log.info(facebook android Error occurred: {}, throwable.getMessage());}Overridepublic void onCompleted() {System.out.println(Stream completed.);log.info(facebook android Stream completed.);}};log.info(接收fb android msg 开始);ClientManager.getMqDataServiceStub().receiveFacebookAndroidMsg(empty.newBuilder().build(), responseObserver);log.info(接收fb android msg 成功);} catch (Exception e) {log.info(出错了);}}源码下载