网站建设费如何账务处理,购物网站 建站服务,哈尔滨建站的系统,wordpress 新用户邮件关于kafka
作为开发人员kafka中最常关注的几个概念#xff0c;是topic,partition和group这几个概念。topic是主题的意思#xff0c;简单的说topic是数据主题#xff0c;这样解释好像显得很苍白#xff0c;只是做了个翻译。一图胜前言#xff0c;我们还是通过图解来说明。…关于kafka
作为开发人员kafka中最常关注的几个概念是topic,partition和group这几个概念。topic是主题的意思简单的说topic是数据主题这样解释好像显得很苍白只是做了个翻译。一图胜前言我们还是通过图解来说明。
生产者负责写数据一个topic可以有多个分区。如下图所示生产者写数据的示意。从这个图中我们可以得出一个很重要的信息分区有序即消息在某个分区上是有顺序的而全局是没有顺序的。这个意味着如果我们需要保证顺序我们在写消息时需要往同一个分区中写数据。比如我们有一个场景有一个订单。首先创建支付订单发送一个kafka消息然后实际支付发送一个kafka消息最后又想退款又发起了退款又发送了一个kafka消息。如果这三个消息在不同的分区上我们就无法保证我们按照创建支付订单——支付——退款这个顺序执行。依据分区有序的特点我们可以把跟这个订单相关的所有操作的消息都写到一个分区上比如可以通过根据订单id进行hash。 group只跟消费者有关系消费者通过group进行标识一个消费者实例表示一个消费者消费者实例可以在不同的线程中开启也可以在不同的进程中开启。这可以提升消费的并发能力。那么这是否意味着可以无限开启消费者实例以提升消费者消费消息的速度呢
这就涉及到消费的逻辑我先给出结论。接下来我们还是通过示意图的方式来做补充说明。group跟partition的数量有关系当消费者数量小于等于partition数量时每个消费者都能消费到消息。
如果一个topic有4个分区分别为p0,p1,p2,p3。现在有两个消费者组分别是groupA和groupB。一个消费者就是一个消费实例比如C0就是在一个进程中启的一个消费者实例。因为这个topic有4个分区groupA有4个消费者则每个消费者被分配到一个对应的分区上消费假如这个分组又启了一个消费者consumer1即消费者数量大于分区数量则这个消费者不会读到消息。
而groupB只有两个消费者则每一个消费者分别获取两个分区上的消息如果groupB只有一个消费者那么所有分区上的消息都只有这一个消费者获取。 开源项目kafka-go的使用
kafka-go是一个用go语言开发的kafka客户端。
生产者
我们看一个基于kafka-go实现的生产者的示例
package mainimport (contextfmtlogtimegithub.com/segmentio/kafka-go
)func main() {// 创建Kafka写入器配置writerConfig : kafka.WriterConfig{Brokers: []string{10.10.37.100:30001},Topic: my-topic,Balancer: kafka.Hash{}, //消息分区的策略这个策略是通过hash算法根据kafka.Message的key值来选择分区的}// 创建写入器writer : kafka.NewWriter(writerConfig)// 发送消息messages : []string{message 1, message 2, message 3, message 4, message 5, message 6, message 7, message 8, message 9, message 10, message 11, message 12, message 13, message 14, message 15, message 16}for _, msg : range messages {time.Sleep(1 * time.Second)if err : writer.WriteMessages(context.Background(), kafka.Message{Key: []byte(fmt.Sprintf(key-%d, time.Now().UnixNano())),Value: []byte(msg),}); err ! nil {fmt.Printf(Failed to write message: %v\n, err)} else {fmt.Printf(Message written: %s\n, msg)}}// 关闭写入器if err : writer.Close(); err ! nil {fmt.Printf(Failed to close writer: %v\n, err)}
}WriterConfig中有一个字段我们在开发过程中可能会需要关注到Balancer这个用于把消息分发到不同的分区上。这个策略可以自定义。当然kafka-go中提供几种常用的方法我以示例中的hash这个为例做简要说明我们直接看源码。生产者发送消息的逻辑主要在这个WriteMessages方法中我们直接定位到消息分发到对应分区的逻辑。
func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {//......//忽略与分区不相关的代码我们只关注和分区相关的逻辑balancer : w.balancer()for i, msg : range msgs {topic, err : w.chooseTopic(msg)if err ! nil {return err}numPartitions, err : w.partitions(ctx, topic)if err ! nil {return err}//根据msg计算把消息分发到哪个分区partition : balancer.Balance(msg, loadCachedPartitions(numPartitions)...)key : topicPartition{topic: topic,partition: int32(partition),}assignments[key] append(assignments[key], int32(i))}batches : w.batchMessages(msgs, assignments)if w.Async {return nil}.....//忽略一些细节return werr
}
如果设置了分区策略则以设置的分区策略进行分发消息。
func (w *Writer) balancer() Balancer {if w.Balancer ! nil {return w.Balancer}return w.roundRobin
}
接下来我们看一下我的示例中使用的Hash的分发消息的策略。我们看到这个方法的主要逻辑就是根据msg中key进行hash然后根据topic下总分区数来计算消息分发到对应的分区号。hasher.Write(msg.Key)
func (h *Hash) Balance(msg Message, partitions ...int) int {if msg.Key nil {return h.rr.Balance(msg, partitions...)}hasher : h.Hasherif hasher ! nil {h.lock.Lock()defer h.lock.Unlock()} else {hasher fnv1aPool.Get().(hash.Hash32)defer fnv1aPool.Put(hasher)}hasher.Reset()if _, err : hasher.Write(msg.Key); err ! nil {panic(err)}// uses same algorithm that Saramas hashPartitioner uses// note the type conversions here. if the uint32 hash code is not cast to// an int32, we do not get the same result as sarama.partition : int32(hasher.Sum32()) % int32(len(partitions))if partition 0 {partition -partition}return int(partition)
}
消费者
消费者没有太多的注意事项只是如果有多个分区想要提升并发能力可以启多个消费者。我们直接看示例。
package mainimport (contextfmtosos/signalsyscalltimegithub.com/segmentio/kafka-go
)func main() {// 创建Kafka读取器配置readerConfig : kafka.ReaderConfig{Brokers: []string{10.10.37.100:30001},Topic: my-topic,GroupID: my-group,MinBytes: 10e3, // 10KBMaxBytes: 10e6, // 10MB}// 创建读取器reader : kafka.NewReader(readerConfig)// 处理信号以便优雅地关闭sigChan : make(chan os.Signal, 1)signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)// 读取消息for {select {case sig : -sigChan:fmt.Printf(Caught signal %v: terminating\n, sig)returndefault:msg, err : reader.ReadMessage(context.Background())if err ! nil {fmt.Printf(Failed to read message: %v\n, err)continue}fmt.Printf(当前时间:%v Message on topic: %s value: %s partion:%d\n, time.Now(), msg.Topic, string(msg.Value), msg.Partition)}}
}创建topic
直接上示例。创建topic是个幂等操作。
package mainimport (netstrconvgithub.com/segmentio/kafka-go
)func main() {// to create topics when auto.create.topics.enablefalsetopic : my-topicconn, err : kafka.Dial(tcp, 10.10.37.100:30001)if err ! nil {panic(err.Error())}defer conn.Close()controller, err : conn.Controller()if err ! nil {panic(err.Error())}var controllerConn *kafka.ConncontrollerConn, err kafka.Dial(tcp, net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))if err ! nil {panic(err.Error())}defer controllerConn.Close()topicConfigs : []kafka.TopicConfig{{Topic: topic,NumPartitions: 10,ReplicationFactor: 1,},}err controllerConn.CreateTopics(topicConfigs...)if err ! nil {panic(err.Error())}
}