网站建设设计价格,深圳今天最新通知,网站开发招聘要求,地铁网站建设特点要将 Kafka 的消息生产和消费转换为 API 接口#xff0c;我们可以使用 Python 的 Web 框架。其中 Flask 是一个轻量级且易于使用的选择。下面是一个简单的例子#xff0c;使用 Flask 创建 API 来生成和消费 Kafka 消息。
1. 安装所需的库#xff1a;
pip install kafka-py…要将 Kafka 的消息生产和消费转换为 API 接口我们可以使用 Python 的 Web 框架。其中 Flask 是一个轻量级且易于使用的选择。下面是一个简单的例子使用 Flask 创建 API 来生成和消费 Kafka 消息。
1. 安装所需的库
pip install kafka-python flask2. 创建 Flask API
from flask import Flask, request, jsonify
from kafka import KafkaProducer, KafkaConsumerapp Flask(__name__)# 配置 Kafka
KAFKA_BROKER_URL localhost:9092
TOPIC test_topic
producer KafkaProducer(bootstrap_serversKAFKA_BROKER_URL)app.route(/send, methods[POST])
def send_message():message request.json.get(message)if message:producer.send(TOPIC, valuemessage.encode(utf-8))return jsonify({status: success, message: Message sent!}), 200else:return jsonify({status: error, message: Message cannot be empty!}), 400app.route(/receive, methods[GET])
def receive_message():consumer KafkaConsumer(TOPIC, bootstrap_serversKAFKA_BROKER_URL, auto_offset_resetearliest)messages []for message in consumer:messages.append(message.value.decode(utf-8))if len(messages) 5: # 只收集最近的5条消息可以根据需要调整breakreturn jsonify(messages)if __name__ __main__:app.run(debugTrue, port5000)这个 Flask 应用程序定义了两个端点
/send: 它接受 POST 请求并发送消息到 Kafka。/receive: 它返回 Kafka 主题中的最近消息。
3. 使用 API
发送消息
curl -X POST http://localhost:5000/send -H Content-Type: application/json -d {message: Hello, Kafka!}接收消息
curl http://localhost:5000/receive这只是一个简单的示例您可能需要添加错误处理、日志记录、认证、消息序列化和反序列化等功能以满足更复杂的需求。