win7自己电脑做网站,想学淘宝美工去哪里学,详情页在线设计网站推荐,做公司网站的企业一、简介
Kafka作为一个分布式的发布-订阅消息系统#xff0c;在日常项目中被频繁使用#xff0c;通常情况下无论是生产者还是消费者只要订阅Topic后#xff0c;即可进行消息的发送和接收。而kafka在0.9.0.0版本后添加了身份认证和权限控制两种安全服务#xff0c;本文主要…一、简介
Kafka作为一个分布式的发布-订阅消息系统在日常项目中被频繁使用通常情况下无论是生产者还是消费者只要订阅Topic后即可进行消息的发送和接收。而kafka在0.9.0.0版本后添加了身份认证和权限控制两种安全服务本文主要介绍在实际项目使用过程中遇到第三方kafka需身份认证时如何解决以及对可能会碰到的问题进行总结。
二、原理介绍
Kafka身份认证主要分为以下几种
1客户端与broker之间的连接认证
2broker与broker之间的连接认证
3broker与zookeeper之间的连接认证
日常项目中无论是生产者还是消费者我们都是作为客户端与kafka进行交互因此使用的最多的是客户端与broker之间的连接认证。图1是客户端与服务端broker之间的认证过程图客户端提交认证数据服务端会根据认证数据对当前客户端进行身份校验校验成功后的客户端即可成功登录kafka进行后续操作。 图1 客户端与broker之间认证过程图
目前Kafka提供了SASL、SSL、Delegation Tokem三种安全认证机制而SASL认证又分为了以下几种方式
1基于Kerberos的GSSAPI
SASL-GSSAPI提供了一种非常安全的身份验证方法但使用前提是企业中有Kerberos基础一般使用随机密码的keytab认证方式密码是加密的在0.9版本中引入目前是企业中使用最多的认证方式。
2SASL-PLAIN
SASL-PLAIN方式是一个经典的用户名/密码的认证方式其中用户名和密码是以明文形式保存在服务端的JAAS配置文件中的当客户端使用PLAIN模式进行认证时密码是明文传输的因此安全性较低但好处是足够简单方便我们对其进行二次开发在0.10版本引入。
3SASL-SCRAM
SASL-SCRAM是针对SASL-PLAIN方式的不足而提供的另一种认证方式它将用户名/密码存储在zookeeper中并且可以通过脚本动态增减用户当客户端使用SCRAM模式进行认证时密码会经过SHA-256或SHA-512哈希加密后传输到服务器因此安全性较高在0.10.2版本中引入。
对Kafka集群来说要想实现完整的安全模式首先为集群中的每台机器生成密钥和证书是第一步其次利用SASL对客户端进行身份验证是第二步最后对不同客户端进行读写操作的授权是第三步这些步骤即可以单独运作也可以同时运作从而提高kafka集群的安全性。
三、具体实现
本文主要介绍作为kafka生产者如何基于Kerberos进行身份认证给第三方kafka发送数据。
Kerberos主要由三个部分组成密钥分发中心Key Distribution Center即KDC、客户端Client、服务端Service大致关系图如下图2所示其中KDC是实现身份认证的核心组件其包含三个部分
Kerberos Database储存用户密码以及其他信息Authentication Service(AS)进行用户身份信息验证为客户端提供Ticket Granting Tickets(TGT)Ticket Granting Service(TGS)验证TGT为客户端提供Service Tickets 我们作为生产者向第三方kafka发送数据因此需要第三方提供以下安全认证文件
用户名principle标识客户端的用户身份也即用于登录的用户名指定用户名对应的秘钥文件xx.keytab存储了用户的加密密码指定安全认证的服务配置文件krb5.conf客户端根据该文件中的信息去访问KDC
获取以上安全认证文件后即可编写java代码连接第三方kafka步骤如下
1、将安全认证文件xx.keytab和krb5.conf放置于某一路径下确保后续java代码可进行读取
2、添加kafka配置文件开启安全模式认证其中kerberos.path是第一步中认证文件所在的目录 3、修改Kafka生产者配置开启安全连接 4、调用认证工具类进行登录认证 LoginUtil认证工具类的核心是根据第一步中提供的安全认证文件自动生成jaas配置文件该文件是kafka安全模式下认证的核心。代码如下
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;import java.io.File;
import java.io.FileWriter;
import java.io.IOException;/*** ProjectName: stdp-security-demo* Package: * ClassName: LoginUtil* Author: stdp* Description: ${description}*/
public class LoginUtil {public enum Module {KAFKA(KafkaClient), ZOOKEEPER(Client);private String name;Module(String name) {this.name name;}public String getName() {return name;}}private static final Logger LOGGER LoggerFactory.getLogger(LoginUtil.class);/*** line operator string*/private static final String LINE_SEPARATOR System.getProperty(line.separator);/*** jaas file postfix*/private static final String JAAS_POSTFIX .jaas.conf;private static final String JAVA_SECURITY_KRB5_CONF_KEY java.security.krb5.conf;public static final String JAVA_SECURITY_LOGIN_CONF_KEY java.security.auth.login.config;private static final String ZOOKEEPER_SERVER_PRINCIPAL_KEY zookeeper.server.principal;private static final boolean IS_IBM_JDK System.getProperty(java.vendor).contains(IBM);/*** oracle jdk login module*/private static final String SUN_LOGIN_MODULE com.sun.security.auth.module.Krb5LoginModule required;public synchronized static void login(String userPrincipal, String userKeytabPath, String krb5ConfPath)throws IOException{// 1.check input parametersif ((userPrincipal null) || (userPrincipal.length() 0)){LOGGER.error(input userPrincipal is invalid.);throw new IOException(input userPrincipal is invalid.);}if ((userKeytabPath null) || (userKeytabPath.length() 0)){LOGGER.error(input userKeytabPath is invalid.);throw new IOException(input userKeytabPath is invalid.);}if ((krb5ConfPath null) || (krb5ConfPath.length() 0)){LOGGER.error(input krb5ConfPath is invalid.);throw new IOException(input krb5ConfPath is invalid.);}// 2.check file exsitsFile userKeytabFile new File(userKeytabPath);if (!userKeytabFile.exists()){LOGGER.error(userKeytabFile( userKeytabFile.getAbsolutePath() ) does not exsit.);throw new IOException(userKeytabFile( userKeytabFile.getAbsolutePath() ) does not exsit.);}if (!userKeytabFile.isFile()){LOGGER.error(userKeytabFile( userKeytabFile.getAbsolutePath() ) is not a file.);throw new IOException(userKeytabFile( userKeytabFile.getAbsolutePath() ) is not a file.);}File krb5ConfFile new File(krb5ConfPath);if (!krb5ConfFile.exists()){LOGGER.error(krb5ConfFile( krb5ConfFile.getAbsolutePath() ) does not exsit.);throw new IOException(krb5ConfFile( krb5ConfFile.getAbsolutePath() ) does not exsit.);}if (!krb5ConfFile.isFile()){LOGGER.error(krb5ConfFile( krb5ConfFile.getAbsolutePath() ) is not a file.);throw new IOException(krb5ConfFile( krb5ConfFile.getAbsolutePath() ) is not a file.);}// 3.set and check krb5configsetKrb5Config(krb5ConfFile.getAbsolutePath());// LOGGER.info(check zookeeper server Principal );setZookeeperServerPrincipal(userPrincipal);
// LOGGER.info(check jaas.conf );setJaasFile(userPrincipal,userKeytabPath);LOGGER.info(Login success!!!!!!!!!!!!!!);}public static void setKrb5Config(String krb5ConfigFile) throws IOException {System.setProperty(JAVA_SECURITY_KRB5_CONF_KEY,krb5ConfigFile);String ret System.getProperty(JAVA_SECURITY_KRB5_CONF_KEY);if (ret null) {LOGGER.error(JAVA_SECURITY_KRB5_CONF_KEY is null.);throw new IOException(JAVA_SECURITY_KRB5_CONF_KEY is null.);}if (!ret.equals(krb5ConfigFile)){LOGGER.error(JAVA_SECURITY_KRB5_CONF_KEY is ret is not krb5ConfigFile .);throw new IOException(JAVA_SECURITY_KRB5_CONF_KEY is ret is not krb5ConfigFile .);}}public static void setJaasFile(String userPrincipal,String userKeytabPath) throws IOException {String jaasPath new File(System.getProperty(java.io.tmpdir)) File.separator System.getProperty(user.name) JAAS_POSTFIX;LOGGER.info(jaasPath {},jaasPath);//windows路径下分隔符替换jaasPath jaasPath.replace(\\,\\\\);userKeytabPath userKeytabPath.replace(\\,\\\\);//删除jaas文件deleteJaasFile(jaasPath);writeJaasFile(jaasPath,userPrincipal,userKeytabPath);System.setProperty(JAVA_SECURITY_LOGIN_CONF_KEY,jaasPath);}private static void deleteJaasFile(String jaasPath) throws IOException {File jaasFile new File(jaasPath);if (jaasFile.exists()){if (!jaasFile.delete()){throw new IOException(failed to delete exists jaas file.);}}}private static void writeJaasFile(String jaasPath,String userPrincipal,String userKeytabPath) throws IOException {FileWriter writer new FileWriter(new File(jaasPath));try{writer.write(getJaasConfContext(userPrincipal,userKeytabPath));writer.flush();}catch (IOException e){throw new IOException(Failed to create jaas.conf File.);}finally {writer.close();}}private static String getJaasConfContext(String userPrincipal,String userKeytabPath) throws IOException{Module[] allModule Module.values();StringBuffer builder new StringBuffer();for (Module module: allModule){String serviceName null;if (Client.equals(module.getName())){serviceName zookeeper;}else if (KafkaClient.equals(module.getName())){serviceName kafka;}builder.append(getModuleContext(userPrincipal,userKeytabPath,module,serviceName));}return builder.toString();}private static String getModuleContext(String userPrincipal,String userKeytabPath,Module module,String serviceName) throws IOException {StringBuffer builder new StringBuffer();if (IS_IBM_JDK){builder.append(module.getName()).append( {).append(LINE_SEPARATOR);builder.append(credsTypeboth).append(LINE_SEPARATOR);builder.append(principal\ userPrincipal.trim() \).append(LINE_SEPARATOR);builder.append(useKeytab\ userKeytabPath \).append(LINE_SEPARATOR);builder.append(serviceName\serviceName \).append(LINE_SEPARATOR);builder.append(debugtrue;).append(LINE_SEPARATOR);builder.append(};).append(LINE_SEPARATOR);}else {builder.append(module.getName()).append( {).append(LINE_SEPARATOR);builder.append(SUN_LOGIN_MODULE).append(LINE_SEPARATOR);builder.append(useKeyTabtrue).append(LINE_SEPARATOR);builder.append(keyTab\ userKeytabPath \).append(LINE_SEPARATOR);builder.append(principal\ userPrincipal.trim() \).append(LINE_SEPARATOR);builder.append(serviceName\serviceName \).append(LINE_SEPARATOR);builder.append(useTicketCachefalse).append(LINE_SEPARATOR);builder.append(storeKeytrue).append(LINE_SEPARATOR);builder.append(debugtrue;).append(LINE_SEPARATOR);builder.append(};).append(LINE_SEPARATOR);}return builder.toString();}public static void setZookeeperServerPrincipal(String zkServerPrincipal) throws IOException {System.setProperty(ZOOKEEPER_SERVER_PRINCIPAL_KEY,zkServerPrincipal);String ret System.getProperty(ZOOKEEPER_SERVER_PRINCIPAL_KEY);if (ret null) {LOGGER.error(ZOOKEEPER_SERVER_PRINCIPAL_KEY is null.);throw new IOException(ZOOKEEPER_SERVER_PRINCIPAL_KEY is null.);}if (!ret.equals(zkServerPrincipal)){LOGGER.error(ZOOKEEPER_SERVER_PRINCIPAL_KEY is ret is not zkServerPrincipal .);throw new IOException(ZOOKEEPER_SERVER_PRINCIPAL_KEY is ret is not zkServerPrincipal .);}}
}经过以上四步的配置启动项目后即可自动连接kafka进行身份校验若登录成功会输出如下提示信息Login success并且会将生成的jaas文件路径打印出来。 四、常见问题
1、认证文件找不到 这是因为步骤1中kerberos.path配置有问题检查path路径下是否存在认证文件keytab和krb5.conf。
2、 principal和keytab不匹配 不同的用户名对应不同的密码在身份校验时需保证用户名principle和密码keytab的一致性否则无法验证通过。而principal和keytab不匹配可能存在以下两种场景 配置文件中出现问题检查kerberos.principle和kerberos.keytab中的用户名即hkjj是否一致。 检查生成的jaas文件中用户名和配置的用户名是否相同
如果步骤1检查没用问题则可根据日志中输出的jaas文件路径查看自动生成的jaas文件中的principal和配置文件中的kerberos.principle是否一致。比如我的这个项目中就是由于现场技术配置kerberos.principle时后面多打了一个空格导致自动生成的jaas文件中的principle后多一个空格因此和keytab认证失败。 为了彻底解决这个误打空格的问题可以直接修改认证工具类LoginUtil在生成jaas文件的principle时去掉可能存在的空格。 3、用户密码keytab更新导致出现checksum failed 这是由于principal对应的密码修改了但是程序中使用的还是旧的密码就会出现这个问题。解决办法是找第三方提供principal对应的最新的密码文件keytab。
4、jaas文件找不到 该问题是由于找不到jaas.conf 这个文件导致的而基于kerberos认证时一般不会出现这是因为kerberos认证时jaas文件是由LoginUtil工具类根据安全认证文件自动生成并且存储在指定路径下的。
该问题通常出现在SASL-PLAIN方式的认证中因为该方式需要添加一个配置参数java.security.auth.login.config来标识jaas文件的路径如果文件路径出错则会报以上错误。
五、总结
在kafka身份认证的过程中需要的principalkeytabServiceName等信息均配置在jaas文件中因此保证认证的服务可以读取到正确的文件及正确的配置是kafka安全模式下认证的核心。
基于kerberos认证时可根据安全认证文件自动生成jaas配置文件从而保证了密码加密传输相比于SASL-PLAIN模式更具安全性并且认证实现过程也较为简单。