当前位置: 首页 > news >正文

网站建设费如何账务处理购物网站 建站服务

网站建设费如何账务处理,购物网站 建站服务,哈尔滨建站的系统,wordpress 新用户邮件关于kafka 作为开发人员kafka中最常关注的几个概念#xff0c;是topic,partition和group这几个概念。topic是主题的意思#xff0c;简单的说topic是数据主题#xff0c;这样解释好像显得很苍白#xff0c;只是做了个翻译。一图胜前言#xff0c;我们还是通过图解来说明。…关于kafka 作为开发人员kafka中最常关注的几个概念是topic,partition和group这几个概念。topic是主题的意思简单的说topic是数据主题这样解释好像显得很苍白只是做了个翻译。一图胜前言我们还是通过图解来说明。 生产者负责写数据一个topic可以有多个分区。如下图所示生产者写数据的示意。从这个图中我们可以得出一个很重要的信息分区有序即消息在某个分区上是有顺序的而全局是没有顺序的。这个意味着如果我们需要保证顺序我们在写消息时需要往同一个分区中写数据。比如我们有一个场景有一个订单。首先创建支付订单发送一个kafka消息然后实际支付发送一个kafka消息最后又想退款又发起了退款又发送了一个kafka消息。如果这三个消息在不同的分区上我们就无法保证我们按照创建支付订单——支付——退款这个顺序执行。依据分区有序的特点我们可以把跟这个订单相关的所有操作的消息都写到一个分区上比如可以通过根据订单id进行hash。 group只跟消费者有关系消费者通过group进行标识一个消费者实例表示一个消费者消费者实例可以在不同的线程中开启也可以在不同的进程中开启。这可以提升消费的并发能力。那么这是否意味着可以无限开启消费者实例以提升消费者消费消息的速度呢 这就涉及到消费的逻辑我先给出结论。接下来我们还是通过示意图的方式来做补充说明。group跟partition的数量有关系当消费者数量小于等于partition数量时每个消费者都能消费到消息。 如果一个topic有4个分区分别为p0,p1,p2,p3。现在有两个消费者组分别是groupA和groupB。一个消费者就是一个消费实例比如C0就是在一个进程中启的一个消费者实例。因为这个topic有4个分区groupA有4个消费者则每个消费者被分配到一个对应的分区上消费假如这个分组又启了一个消费者consumer1即消费者数量大于分区数量则这个消费者不会读到消息。 而groupB只有两个消费者则每一个消费者分别获取两个分区上的消息如果groupB只有一个消费者那么所有分区上的消息都只有这一个消费者获取。 开源项目kafka-go的使用 kafka-go是一个用go语言开发的kafka客户端。 生产者 我们看一个基于kafka-go实现的生产者的示例 package mainimport (contextfmtlogtimegithub.com/segmentio/kafka-go )func main() {// 创建Kafka写入器配置writerConfig : kafka.WriterConfig{Brokers: []string{10.10.37.100:30001},Topic: my-topic,Balancer: kafka.Hash{}, //消息分区的策略这个策略是通过hash算法根据kafka.Message的key值来选择分区的}// 创建写入器writer : kafka.NewWriter(writerConfig)// 发送消息messages : []string{message 1, message 2, message 3, message 4, message 5, message 6, message 7, message 8, message 9, message 10, message 11, message 12, message 13, message 14, message 15, message 16}for _, msg : range messages {time.Sleep(1 * time.Second)if err : writer.WriteMessages(context.Background(), kafka.Message{Key: []byte(fmt.Sprintf(key-%d, time.Now().UnixNano())),Value: []byte(msg),}); err ! nil {fmt.Printf(Failed to write message: %v\n, err)} else {fmt.Printf(Message written: %s\n, msg)}}// 关闭写入器if err : writer.Close(); err ! nil {fmt.Printf(Failed to close writer: %v\n, err)} }WriterConfig中有一个字段我们在开发过程中可能会需要关注到Balancer这个用于把消息分发到不同的分区上。这个策略可以自定义。当然kafka-go中提供几种常用的方法我以示例中的hash这个为例做简要说明我们直接看源码。生产者发送消息的逻辑主要在这个WriteMessages方法中我们直接定位到消息分发到对应分区的逻辑。 func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {//......//忽略与分区不相关的代码我们只关注和分区相关的逻辑balancer : w.balancer()for i, msg : range msgs {topic, err : w.chooseTopic(msg)if err ! nil {return err}numPartitions, err : w.partitions(ctx, topic)if err ! nil {return err}//根据msg计算把消息分发到哪个分区partition : balancer.Balance(msg, loadCachedPartitions(numPartitions)...)key : topicPartition{topic: topic,partition: int32(partition),}assignments[key] append(assignments[key], int32(i))}batches : w.batchMessages(msgs, assignments)if w.Async {return nil}.....//忽略一些细节return werr } 如果设置了分区策略则以设置的分区策略进行分发消息。  func (w *Writer) balancer() Balancer {if w.Balancer ! nil {return w.Balancer}return w.roundRobin } 接下来我们看一下我的示例中使用的Hash的分发消息的策略。我们看到这个方法的主要逻辑就是根据msg中key进行hash然后根据topic下总分区数来计算消息分发到对应的分区号。hasher.Write(msg.Key) func (h *Hash) Balance(msg Message, partitions ...int) int {if msg.Key nil {return h.rr.Balance(msg, partitions...)}hasher : h.Hasherif hasher ! nil {h.lock.Lock()defer h.lock.Unlock()} else {hasher fnv1aPool.Get().(hash.Hash32)defer fnv1aPool.Put(hasher)}hasher.Reset()if _, err : hasher.Write(msg.Key); err ! nil {panic(err)}// uses same algorithm that Saramas hashPartitioner uses// note the type conversions here. if the uint32 hash code is not cast to// an int32, we do not get the same result as sarama.partition : int32(hasher.Sum32()) % int32(len(partitions))if partition 0 {partition -partition}return int(partition) } 消费者 消费者没有太多的注意事项只是如果有多个分区想要提升并发能力可以启多个消费者。我们直接看示例。 package mainimport (contextfmtosos/signalsyscalltimegithub.com/segmentio/kafka-go )func main() {// 创建Kafka读取器配置readerConfig : kafka.ReaderConfig{Brokers: []string{10.10.37.100:30001},Topic: my-topic,GroupID: my-group,MinBytes: 10e3, // 10KBMaxBytes: 10e6, // 10MB}// 创建读取器reader : kafka.NewReader(readerConfig)// 处理信号以便优雅地关闭sigChan : make(chan os.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)// 读取消息for {select {case sig : -sigChan:fmt.Printf(Caught signal %v: terminating\n, sig)returndefault:msg, err : reader.ReadMessage(context.Background())if err ! nil {fmt.Printf(Failed to read message: %v\n, err)continue}fmt.Printf(当前时间:%v Message on topic: %s value: %s partion:%d\n, time.Now(), msg.Topic, string(msg.Value), msg.Partition)}} }创建topic 直接上示例。创建topic是个幂等操作。 package mainimport (netstrconvgithub.com/segmentio/kafka-go )func main() {// to create topics when auto.create.topics.enablefalsetopic : my-topicconn, err : kafka.Dial(tcp, 10.10.37.100:30001)if err ! nil {panic(err.Error())}defer conn.Close()controller, err : conn.Controller()if err ! nil {panic(err.Error())}var controllerConn *kafka.ConncontrollerConn, err kafka.Dial(tcp, net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))if err ! nil {panic(err.Error())}defer controllerConn.Close()topicConfigs : []kafka.TopicConfig{{Topic: topic,NumPartitions: 10,ReplicationFactor: 1,},}err controllerConn.CreateTopics(topicConfigs...)if err ! nil {panic(err.Error())} }
http://www.w-s-a.com/news/785047/

相关文章:

  • 衡水网站托管企业二级网站怎么做
  • 丹阳网站建设公司旅游类网站开发开题报告范文
  • 地方门户网站建设苏州网站优化建设
  • 谁用fun域名做网站了网络营销的三种方式
  • 织梦网站上传天津网站建设电话咨询
  • 论坛网站搭建深圳网
  • 天津建立网站营销设计window7用jsp做的网站要什么工具
  • 英文网站wordpress所有图片
  • 我做的网站怎么打开很慢网络营销典型企业
  • 新增备案网站python3网站开发
  • 诊断网站seo现状的方法与通信工程专业做项目的网站
  • 南京 微网站 建站alexa排名查询统计
  • 天津网站建设企业系统wordpress已发布不显示不出来
  • 大连网站前端制作公司局域网视频网站建设
  • 张家界建设局网站电话wordpress网站怎么建
  • 淄博网站建设有实力装修培训机构哪家最好
  • 彩票网站建设seo优化师是什么
  • 怎么做英文网站网站建设基本费用
  • dede网站名称不能保存wordpress运费设置
  • 出口网站制作好一点的网站建设
  • 在小说网站做编辑怎么找韶关市建设局网站
  • 网站策划怎么做内容旅游型网站建设
  • 东莞百度网站推广ppt模板免费下载的网站
  • 网站建设项目管理基本要求网站空间到期影响
  • 做奖杯的企业网站谁有推荐的网址
  • wordpress能做企业站吗wordpress收发邮件
  • 电子产品网站建设策划方案腾讯企业邮箱注册申请免费
  • 哪些网站可以免费做代码自己电脑做网站服务器广域网访问
  • 高端网站设计青海省教育厅门户网站学籍查询
  • 长春网站优化公司网站制作400哪家好