当前位置: 首页 > news >正文

彩票网站平台专业的购物网站建设

彩票网站平台,专业的购物网站建设,Pelican wordpress,事件营销怎么做今日已办 Watermill Handler 将 4 个阶段的逻辑处理定义为 Handler 测试发现#xff0c;添加的 handler 会被覆盖掉#xff0c;故考虑添加为 middleware 且 4 个阶段的处理逻辑针对不同 topic 是相同的。 参考https://watermill.io/docs/messages-router/实现不同topic添加的 handler 会被覆盖掉故考虑添加为 middleware 且 4 个阶段的处理逻辑针对不同 topic 是相同的。 参考https://watermill.io/docs/messages-router/实现不同topic不同事件走不同的Handler处理逻辑相同处理逻辑则可以使用MiddleWare(https://watermill.io/docs/middlewares/) Middleware ProfileCtx实现 context.Context 接口 // Package consumer // Author xzx 2023/8/11 18:53:00 package consumerimport (contextgithub.com/Shopify/saramagithub.com/ThreeDotsLabs/watermillgithub.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafkagithub.com/ThreeDotsLabs/watermill/messagegithub.com/ThreeDotsLabs/watermill/message/router/middlewaregithub.com/ThreeDotsLabs/watermill/message/router/plugingo.uber.org/zapprofile/internal/configprofile/internal/logprofile/internal/schematime )// ProfileContext // Description: // Author xzx 2023-08-11 22:21:41 type ProfileContext struct {// Properties that can be called by inherited subclassesStatus intCtx context.ContextRouter *message.RouterEvent schema.EventAppID string // API 上报FetchScenario string // API 上报 }// NewProfileContext // Description // Author xzx 2023-08-11 22:49:00 // Return *ProfileContext func NewProfileContext() *ProfileContext {profileCtx : ProfileContext{Ctx: context.Background(),}profileCtx.init()return profileCtx }// init // Description 初始化 // Author xzx 2023-08-11 22:22:01 func (profileCtx *ProfileContext) init() {logger : watermill.NewStdLogger(false, false)saramaSubscriberConfig : kafka.DefaultSaramaSubscriberConfig()saramaSubscriberConfig.Consumer.Offsets.Initial sarama.OffsetOldestsubscriber, err : kafka.NewSubscriber(kafka.SubscriberConfig{Brokers: []string{config.Profile.GetString(kafka.bootstrap)},Unmarshaler: kafka.DefaultMarshaler{},OverwriteSaramaConfig: saramaSubscriberConfig,ConsumerGroup: config.Profile.GetString(kafka.group),},logger,)if err ! nil {log.Logger.Error(creates a new Kafka Subscriber error, zap.Error(err))}router, err : message.NewRouter(message.RouterConfig{}, logger)if err ! nil {log.Logger.Error(creates a new Router with given configuration error, zap.Error(err))}router.AddPlugin(plugin.SignalsHandler)router.AddMiddleware(middleware.CorrelationID,middleware.Retry{MaxRetries: 3,InitialInterval: time.Millisecond * 100,Logger: logger,}.Middleware,middleware.Recoverer,)topic : to_analyzer__0.PERF_CRASHrouter.AddNoPublisherHandler(WriteKafka, topic, subscriber, profileCtx.WriteKafka).AddMiddleware(profileCtx.UnpackKafkaMessage,profileCtx.InitPerformanceEvent,profileCtx.AnalyzeEvent,)profileCtx.Router router }// Run // Description // Author xzx 2023-08-12 13:52:53 func (profileCtx *ProfileContext) Run() {// router.Run contains defer cancel()if err : profileCtx.Router.Run(profileCtx.Ctx); err ! nil {log.Logger.Error(runs all plugins and handlers and starts subscribing to provided topics error, zap.Error(err))} }func (profileCtx *ProfileContext) Done() -chan struct{} {return profileCtx.Ctx.Done() }func (profileCtx *ProfileContext) Err() error {return profileCtx.Ctx.Err() }func (profileCtx *ProfileContext) Deadline() (deadline time.Time, ok bool) {return profileCtx.Ctx.Deadline() }func (profileCtx *ProfileContext) Value(key any) any {return profileCtx.Ctx.Value(key) }【测试】前 3 个阶段处理为 Middleware最后一个阶段为 Handler // Package consumer // Author xzx 2023/8/12 10:01:00 package consumerimport (encoding/jsongithub.com/ThreeDotsLabs/watermill/messagegithub.com/segmentio/kafka-gogo.uber.org/zapprofile/internal/connectorprofile/internal/logprofile/internal/schema/performanceprofile/internal/state )// UnpackKafkaMessage // Description // Author xzx 2023-08-12 12:27:30 // Param h // Return message.HandlerFunc func (profileCtx *ProfileContext) UnpackKafkaMessage(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) {// 反序列化存入通用结构体if contextErr : json.Unmarshal(message.Payload, profileCtx.Event); contextErr ! nil {profileCtx.Status state.StatusUnmarshalErrorreturn h(message)}log.Logger.Info([UnpackKafkaItem] unpack kafka item success, zap.Any(event, profileCtx.Event))message.SetContext(profileCtx)return h(message)} }// InitPerformanceEvent // Description // Author xzx 2023-08-12 12:27:35 // Param h // Return message.HandlerFunc func (profileCtx *ProfileContext) InitPerformanceEvent(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) {profileCtx message.Context().(*ProfileContext)event, contextErr : performance.EventFactory(profileCtx.Event.Category, profileCtx.Event.Dimensions, profileCtx.Event.Values)if contextErr ! nil {profileCtx.Status state.StatusEventFactoryErrorreturn h(message)}log.Logger.Info([InitPerformanceEvent] init performance event success, zap.Any(event, profileCtx.Event))profileCtx.Event.ProfileData eventmessage.SetContext(profileCtx)return h(message)} }// AnalyzeEvent // Description // Author xzx 2023-08-12 12:27:38 // Param h // Return message.HandlerFunc func (profileCtx *ProfileContext) AnalyzeEvent(h message.HandlerFunc) message.HandlerFunc {return func(message *message.Message) ([]*message.Message, error) {profileCtx message.Context().(*ProfileContext)contextErr : profileCtx.Event.ProfileData.Analyze()if contextErr ! nil {profileCtx.Status state.StatusAnalyzeErrorreturn h(message)}log.Logger.Info([AnalyzeEvent] analyze event success, zap.Any(event, profileCtx.Event))// clear dimensions and valuesprofileCtx.Event.Dimensions nilprofileCtx.Event.Values nilmessage.SetContext(profileCtx)return h(message)} }// WriteKafka // Description // Author xzx 2023-08-11 22:30:47 // Param msg // Return contextErr func (profileCtx *ProfileContext) WriteKafka(msg *message.Message) (contextErr error) {profileCtx msg.Context().(*ProfileContext)toWriteBytes, contextErr : json.Marshal(profileCtx.Event)if contextErr ! nil {profileCtx.Status state.StatusUnmarshalErrorreturn}topic : connector.GetTopic(profileCtx.Event.Category)contextErr connector.GetProducer().WriteMessages(profileCtx.Ctx, kafka.Message{Topic: topic,Key: []byte(profileCtx.Event.ID),Value: toWriteBytes,})if contextErr ! nil {profileCtx.Status state.StatusWriteKafkaErrorreturn}log.Logger.Info([WriteKafka] write kafka success, zap.String(topic, topic), zap.String(id, profileCtx.Event.ID), zap.String(msg, string(toWriteBytes)))return }可以实现正常的效果 Router 目前的 topic 是固定写死的要考虑正则表达式将不同 topic 的共有逻辑抽出为 Middleware特定逻辑抽出为 Handler消息处理似乎不是并发的 pub/sub kafka-go custom pub/sub Kafka Pub/Sub for the Watermill project, based on Shopify’s Sarama qiulin/watermill-kafkago: Kafka Pub/Sub for the Watermill project, based on segmentio/kafka-go (github.com) 明日待办 组内开会继续开发需求
http://www.w-s-a.com/news/265745/

相关文章:

  • 凡科网免费建站步骤及视频网页设计基础教程第二版课后答案
  • 建设一个旅游网站毕业设计企业网站要更新文章吗
  • 做网站需要简介中山网站设计公司
  • 网站怎么做导航栏微信公众号官网登录
  • 1_ 掌握网站开发的基本流程 要求:熟悉网站开发与设计的基本流程.电子商城网站开发
  • 百度网站怎么建设河北省工程造价信息网官网
  • 阿里云网站模板网页设计的合适尺寸是多少
  • 做小程序和做网站哪个好让别人做网站推广需要多少钱
  • 做外贸的几个网站查询网域名解析
  • 酒泉如何做百度的网站seo研究中心好客站
  • 网站设计建设平台户县做网站
  • 一元云购网站开发wordpress博客空间
  • 深圳高端网站建设公司排名如何搭建局域网服务器
  • 照片管理网站模板高端网站开发哪家好
  • 黄冈网站制作wordpress为什么不能显示域名
  • 做网站设计怎么进企业电子商务网站建设与管理教材
  • 设计广告公司网站建设网站开发技术选择
  • 个人网站教程个人网站有必要备案吗
  • 网站建设推广好做吗黄浦企业网站制作
  • 怎样做28网站代理中山网站建设方案外包
  • vs2010做网站前台搭建小网站
  • 做视频必须知道的一些网站wordpress 标签鼠标滑过_弹出的title 代码美化
  • 怎么做室内设计公司网站电商运营培训视频课程
  • 昆明网站策划天津市建筑信息平台
  • 三亚放心游app官方网站wordpress 个人主题
  • 做简单的网站备案平台新增网站
  • 中国建设网站银行网络营销推广方案整合
  • 网站域名列表dede网站白屏
  • 站长工具一区品牌建设卓有成效
  • 电子商务网站建设案例wordpress批量编辑