1. Create the certificate and key
You need an openssl environment; on Windows, download OpenSSL from Win32/Win64 OpenSSL Installer for Windows - Shining Light Productions. You also need keytool, which ships with the JDK.
The credentials used throughout this guide are kafka : kafka2024.

2. Generate the CA certificate
# Create the CA
openssl req -new -x509 -keyout ca-key.pem -out ca-cert.pem -days 3650 -nodes -passin pass:kafka2024 -passout pass:kafka2024 -subj "/CN=kafka CA"

# Create the Kafka server keystore with a new key pair
keytool -keystore kafka.keystore.jks -validity 3650 -genkey -keyalg RSA -storepass kafka2024 -keypass kafka2024 -dname "CN=Unknown, OU=Unknown, O=Unknown, L=Unknown, ST=Unknown, C=Unknown"

# Import the CA certificate into the Kafka server keystore
keytool -keystore kafka.keystore.jks -alias CARoot -import -file ca-cert.pem -storepass kafka2024 -noprompt

# Create a certificate signing request (CSR)
keytool -keystore kafka.keystore.jks -certreq -file cert.csr -storepass kafka2024 -keypass kafka2024

# Sign the certificate with the CA
openssl x509 -req -CA ca-cert.pem -CAkey ca-key.pem -in cert.csr -out cert.pem -days 3650 -CAcreateserial -passin pass:kafka2024

# Import the signed certificate into the keystore
keytool -keystore kafka.keystore.jks -import -file cert.pem -storepass kafka2024 -keypass kafka2024 -noprompt

# Create the truststore and import the CA certificate
keytool -keystore kafka.truststore.jks -alias CARoot -import -file ca-cert.pem -storepass kafka2024 -noprompt
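Note that the certificate signed above carries no Subject Alternative Name, which is the root cause of the hostname-verification failure seen later in step 8. As an alternative to disabling verification on the client, the CSR can be signed with a SAN. The sketch below is a self-contained, openssl-only illustration of that signing step (the file names and the example IP 1.14.165.18 mirror this guide; in the keytool flow you would pass the same -extfile to the openssl x509 command above):

```shell
# Throwaway CA and server key/CSR, created here only so the sketch is
# self-contained (the guide's keytool flow produces the CSR instead)
openssl req -new -x509 -keyout ca-key.pem -out ca-cert.pem -days 3650 -nodes \
  -subj "/CN=kafka CA"
openssl req -new -newkey rsa:2048 -nodes -keyout server-key.pem -out cert.csr \
  -subj "/CN=kafka"

# Sign with a subjectAltName covering the broker's IP and hostname
printf 'subjectAltName = IP:1.14.165.18, DNS:kafka\n' > san.cnf
openssl x509 -req -CA ca-cert.pem -CAkey ca-key.pem -in cert.csr -out cert.pem \
  -days 3650 -CAcreateserial -extfile san.cnf

# Show the SAN on the signed certificate; it should list the IP and DNS name
openssl x509 -in cert.pem -noout -ext subjectAltName
```

With a matching SAN in place, clients can keep ssl.endpoint.identification.algorithm at its default instead of disabling it.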
3. This yields kafka.keystore.jks and kafka.truststore.jks.

4. Prepare the image
bitnami/kafka:3.9.0 image package

5. Configuration file docker-compose.yml
Note: change the IP address to your host's IP (1.14.165.18 in this example).
version: "3.8"
services:
  kafka:
    image: bitnami/kafka:3.9.0
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      # KRaft
      - KAFKA_CFG_NODE_ID=0
      - KAFKA_CFG_PROCESS_ROLES=controller,broker
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
      # Listeners
      - KAFKA_CFG_LISTENERS=SASL_SSL://:9092,CONTROLLER://:9093
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
      - KAFKA_CFG_ADVERTISED_LISTENERS=SASL_SSL://1.14.165.18:9092
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=SASL_SSL
      - KAFKA_CLIENT_LISTENER_NAME=SASL_SSL
      # SASL
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      - KAFKA_CONTROLLER_USER=kafka
      - KAFKA_CONTROLLER_PASSWORD=kafka2024
      - KAFKA_INTER_BROKER_USER=kafka
      - KAFKA_INTER_BROKER_PASSWORD=kafka2024
      - KAFKA_CLIENT_USERS=kafka
      - KAFKA_CLIENT_PASSWORDS=kafka2024
      # SSL
      - KAFKA_TLS_TYPE=JKS
      - KAFKA_CERTIFICATE_PASSWORD=kafka2024
    volumes:
      - ./kafka.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
      - ./kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
6. Place the following three files in the current directory: docker-compose.yml, kafka.keystore.jks, and kafka.truststore.jks.

7. Run the following command to start the Kafka service:
sudo docker-compose up -d

8. Test and verify
Test sending a message:
sudo docker exec -it kafka kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test --producer.config /opt/bitnami/kafka/config/producer.properties
This fails with the following error:
[2024-11-14 08:41:04,117] ERROR [Producer clientId=console-producer] Connection to node -1 (localhost/127.0.0.1:9092) failed authentication due to: SSL handshake failed (org.apache.kafka.clients.NetworkClient)
[2024-11-14 08:41:04,118] WARN [Producer clientId=console-producer] Bootstrap broker 127.0.0.1:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
[2024-11-14 08:41:04,120] ERROR Error when sending message to topic test with key: null, value: 0 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake failed
Caused by: javax.net.ssl.SSLHandshakeException: No subject alternative names present
	at java.base/sun.security.ssl.Alert.createSSLException(Alert.java:131)
	at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:383)
	at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:326)
	at java.base/sun.security.ssl.TransportContext.fatal(TransportContext.java:321)
	at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1351)
	at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.onConsumeCertificate(CertificateMessage.java:1226)
	at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.consume(CertificateMessage.java:1169)
	at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:396)
	at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:480)
	at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1277)
	at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask$DelegatedAction.run(SSLEngineImpl.java:1264)
	at java.base/java.security.AccessController.doPrivileged(AccessController.java:712)
	at java.base/sun.security.ssl.SSLEngineImpl$DelegatedTask.run(SSLEngineImpl.java:1209)
	at org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:444)
	at org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:533)
	at org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:382)
	at org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:302)
	at org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:178)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:563)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:501)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:596)
	at org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:74)
	at org.apache.kafka.clients.producer.internals.Sender.awaitNodeReady(Sender.java:569)
	at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:490)
	at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:337)
	at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:251)
	at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.security.cert.CertificateException: No subject alternative names present
	at java.base/sun.security.util.HostnameChecker.matchIP(HostnameChecker.java:142)
	at java.base/sun.security.util.HostnameChecker.match(HostnameChecker.java:101)
	at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:458)
	at java.base/sun.security.ssl.X509TrustManagerImpl.checkIdentity(X509TrustManagerImpl.java:432)
	at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:292)
	at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:144)
	at java.base/sun.security.ssl.CertificateMessage$T13CertificateConsumer.checkServerCerts(CertificateMessage.java:1329)

Solution
The JVM rejects the connection because the broker certificate has no Subject Alternative Name matching the host. The quick fix is to disable hostname verification: in the two config files inside the container, /opt/bitnami/kafka/config/producer.properties and /opt/bitnami/kafka/config/consumer.properties, set ssl.endpoint.identification.algorithm to an empty value:

ssl.endpoint.identification.algorithm=
producer.ssl.endpoint.identification.algorithm=
consumer.ssl.endpoint.identification.algorithm=

If the files cannot be edited inside the container, copy them out, modify them, then copy them back:
sudo docker cp kafka:/opt/bitnami/kafka/config/consumer.properties consumer.properties
sudo docker cp kafka:/opt/bitnami/kafka/config/producer.properties producer.properties
sudo vi consumer.properties
sudo vi producer.properties
sudo docker cp consumer.properties kafka:/opt/bitnami/kafka/config/consumer.properties
sudo docker cp producer.properties kafka:/opt/bitnami/kafka/config/producer.properties
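For reference, after the edit each file should contain the empty algorithm entry alongside the SASL/SSL settings the Bitnami image already generated. A minimal sketch follows; only the ssl.endpoint.identification.algorithm= line is the required change, and the other entries are assumptions about what the image writes, so keep whatever is already in your files:

```properties
# Relevant lines in producer.properties / consumer.properties (sketch)
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka2024";
ssl.truststore.location=/opt/bitnami/kafka/config/certs/kafka.truststore.jks
ssl.truststore.password=kafka2024
# Empty value disables hostname verification (works around the missing SAN)
ssl.endpoint.identification.algorithm=
```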
Send a message again:
sudo docker exec -it kafka kafka-console-producer.sh --bootstrap-server 127.0.0.1:9092 --topic test --producer.config /opt/bitnami/kafka/config/producer.properties

In another terminal, test receiving messages:
sudo docker exec -it kafka kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --consumer.config /opt/bitnami/kafka/config/consumer.properties

10. Integrate Kafka with Spring Boot
Configure the Kafka client in the Spring Boot application to use SASL_SSL.

Add the Kafka client dependency to pom.xml:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

Configure application.yml (change the IP address accordingly):
spring:
  application:
    name: ncc
  kafka:
    bootstrap-servers: 1.14.165.11:9092
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: SCRAM-SHA-512
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka2024";
      ssl.truststore.location: kafka.truststore.jks
      ssl.truststore.password: kafka2024
      ssl.keystore.location: kafka.keystore.jks
      ssl.keystore.password: kafka2024
      ssl.key.password: kafka2024
      ssl.endpoint.identification.algorithm: ""
      producer.ssl.endpoint.identification.algorithm: ""
      consumer.ssl.endpoint.identification.algorithm: ""
Place kafka.keystore.jks and kafka.truststore.jks in the project root.

11. Create a KafkaTest test class
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootTest(classes = NccApplication.class)
public class KafkaTest {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void send() {
        kafkaTemplate.send("test", "hhh");
    }
}
The test passes.