山东农业大学学风建设专题网站,如何做kindle电子书下载网站,深圳网站设计服,软文范例100例软件效果前瞻 ~ 鉴于并没有在网上找到比较好的linux平台的kafka可视化工具#xff0c;今天为大家介绍一下自己开发的在 Linux 平台上使用的可视化工具KafkaQ
虽然简陋#xff0c;主要可以实现下面的这些功能#xff1a;
1#xff09;查看当前topic的分片数量和副本数量
…软件效果前瞻 ~ 鉴于并没有在网上找到比较好的linux平台的kafka可视化工具今天为大家介绍一下自己开发的在 Linux 平台上使用的可视化工具KafkaQ
虽然简陋主要可以实现下面的这些功能
1查看当前topic的分片数量和副本数量
2查看当前topic下面每个分片的最大offset
3查看当前topic某个分片下面指定offset范围的数据
4搜索当前topic指定关键词的message KafkaQ分为普通版本和搜索版本
* 普通版本支持上述3种查询
* 搜索版本支持上述3种查询之外增加关键词搜索即在分片中搜索指定关键词的message
一、普通版 KafkaQ.sh 使用方法
Usage: KafkaQ.sh --topictopic [--partitionpartition] [--offsetoffset] [--limitlimit]--topic 话题名称
--partition 分片索引可选
--offset 从第k个offset开始检索可选
--limit 从第k个offset开始检索X条结果可选
显示的效果如下十分简洁分片数据里面左边一列是消息入库的时间右边是message内容 KafkaQ 源码如下
#!/bin/bash# 默认值
PARTITION${2:-0}
OFFSET${3:-0}
LIMIT${4:-0}# 检查参数
if [ -z $1 ]; thenecho Usage: $0 --topictopic [--partitionpartition] [--offsetoffset] [--limitlimit]exit 1
fiTOPIC$1# 检查Kafka命令是否存在
if ! command -v /usr/local/kafka/bin/kafka-topics.sh /dev/null 21; thenecho Kafka not found at /usr/local/kafka/bin/exit 1
fi# 获取Topic信息
echo -e \033[0;31m* 话题: $TOPIC\033[0m# 获取分区数和副本数
PARTITION_INFO$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic $TOPIC)
PARTITION_COUNT$(echo $PARTITION_INFO | awk /Partition:/ {print $2} | wc -l)
REPLICA_COUNT$(echo $PARTITION_INFO | grep -oP ReplicationFactor: \K\d)echo * 分片: $PARTITION_COUNT, 副本: $REPLICA_COUNT# 获取分片a和分片b的最大偏移量
MAX_OFFSET$(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic $TOPIC | awk -F: { printf 分片: %sMaxOffset: %s\n, $2, $3 })
echo $MAX_OFFSET# 获取分片数据
if [ $LIMIT -gt 0 ]; thenecho -e \033[0;33mFetching messages from partition $PARTITION with offset $OFFSET and limit $LIMIT ...\033[0mMESSAGES$(/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic $TOPIC --partition $PARTITION --offset $OFFSET --max-messages $LIMIT --property print.keytrue --property print.valuetrue --property print.timestamptrue --property key.deserializerorg.apache.kafka.common.serialization.StringDeserializer --property value.deserializerorg.apache.kafka.common.serialization.StringDeserializer)# 格式化输出消息echo $MESSAGES | awk -F\t BEGIN {print * 分片数据}{if ($3 ! null) {timestamp substr($1, 12) / 1000 # 从第10个字符开始提取时间戳并除以1000以转换为秒级时间戳value $3printf \033[0;33m%s\033[0m %s\n, strftime(%Y-%m-%d %H:%M:%S, timestamp), value}}
fi二、搜索版 KafkaQ-Search.sh
使用方法
Usage: KafkaQ-Search.sh --topictopic [--partitionpartition] [--offsetoffset] [--limitlimit] [--searchkeyword]--topic 话题名称
--partition 分片索引可选
--offset 从第k个offset开始检索可选
--limit 从第k个offset开始检索X条结果可选
--search 搜索字符串
示例所有参数是必选的哦
sh KafkaQ-Search.sh --topic log --partition 0 --offset 0 --limit 18480 --search 9fea9c52-c0fe-4429-81e1-d045f35f9be9
显示效果如下 KafkaQ-Search.sh 源码如下
#!/bin/bash# 默认值
PARTITION${2:-0}
OFFSET${3:-0}
LIMIT${4:-0}
SEARCH${5:-}# 检查参数
if [ -z $1 ]; thenecho Usage: $0 --topictopic [--partitionpartition] [--offsetoffset] [--limitlimit] [--searchkeyword]exit 1
fiwhile [[ $# -gt 0 ]]; docase $1 in--topic)TOPIC$2shift 2;;--partition)PARTITION$2shift 2;;--offset)OFFSET$2shift 2;;--limit)LIMIT$2shift 2;;--search)SEARCH$2shift 2;;*)echo Unknown parameter: $1exit 1;;esac
done# 检查Kafka命令是否存在
if ! command -v /usr/local/kafka/bin/kafka-topics.sh /dev/null 21; thenecho Kafka not found at /usr/local/kafka/bin/exit 1
fi# 获取Topic信息
echo -e \033[0;31m* 话题: $TOPIC\033[0m# 获取分区数和副本数
PARTITION_INFO$(/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic $TOPIC)
PARTITION_COUNT$(echo $PARTITION_INFO | awk /Partition:/ {print $2} | wc -l)
REPLICA_COUNT$(echo $PARTITION_INFO | grep -oP ReplicationFactor: \K\d)echo * 分片: $PARTITION_COUNT, 副本: $REPLICA_COUNT# 获取分片a和分片b的最大偏移量
MAX_OFFSET$(/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic $TOPIC | awk -F: { printf 分片: %sMaxOffset: %s\n, $2, $3 })
echo $MAX_OFFSET# 获取分片数据
if [ $LIMIT -gt 0 ]; thenecho -e \033[0;33mFetching messages from partition $PARTITION with offset $OFFSET and limit $LIMIT ...\033[0mMESSAGES$(/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic $TOPIC --partition $PARTITION --offset $OFFSET --max-messages $LIMIT --property print.keytrue --property print.valuetrue --property print.timestamptrue --property key.deserializerorg.apache.kafka.common.serialization.StringDeserializer --property value.deserializerorg.apache.kafka.common.serialization.StringDeserializer)# 搜索关键词并输出结果if [[ ! -z $SEARCH ]]; thenecho -e \033[0;32m* 搜索条件$SEARCH\033[0mecho 搜索结果echo $MESSAGES | grep --colornever $SEARCH | awk -F\t {timestamp substr($1, 12) / 1000 # 从第12个字符开始提取时间戳并除以1000以转换为秒级时间戳value $3printf \033[0;33m%s\033[0m %s\n, strftime(%Y-%m-%d %H:%M:%S, timestamp), value}fi
fi* 附注参考的shell如下
1、获取kafka的topic 分区数量
/usr/local/kafka/bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic topic
2、获取kafka每个分片最大的offset
/usr/local/kafka/bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic topic
3、获取kafka分片指定offset范围的具体信息
/usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic --partition partition --offset offset --max-messages max-message --property print.keytrue --property print.valuetrue --property print.timestamptrue --property key.deserializerorg.apache.kafka.common.serialization.StringDeserializer --property value.deserializerorg.apache.kafka.common.serialization.StringDeserializer