当前位置: 首页 > news >正文

成都手机网站手机网站 代码格式

成都手机网站,手机网站 代码格式,装修客户资源在哪里找,南昌自助建站模板目录 一. 前言 二. KafkaAdminClient API 2.1. API 总览 2.2. Topic 操作 2.2.1. 创建 Topic 2.2.2. Topic 列表 2.2.3. 删除 Topic 2.2.4. 描述 Topic 详细信息 2.3. 分区 Partition 操作 2.3.1. 增加分区 2.3.2. 分区副本重新分配 2.3.3. 查询分区副本列表 2.4.…目录 一. 前言 二. KafkaAdminClient API 2.1. API 总览 2.2. Topic 操作 2.2.1. 创建 Topic 2.2.2. Topic 列表 2.2.3. 删除 Topic 2.2.4. 描述 Topic 详细信息 2.3. 分区 Partition 操作 2.3.1. 增加分区 2.3.2. 分区副本重新分配 2.3.3. 查询分区副本列表 2.4. Config 配置信息 2.4.1. 描述配置详细信息 2.4.2. 修改配置信息 2.4.3. incrementalAlterConfigs 2.5. Scram 账户操作 2.5.1. 创建 Scram 账户 2.5.2. 删除 Scram 账户 2.5.3. 查询 Scram 账户信息 2.6. Acl 操作 2.6.1. 创建 Acl 2.6.2. 删除 Acl 一. 前言 自0.11.0.0版本起Kafka 社区推出了 AdminClient 和 KafkaAdminClient意在统一所有的集群管理 API。使用 0.11.0.0 及以后版本的用户应该始终使用这个类来管理集群。虽然和原先服务器端的 AdminClient 类同名但这个工具是属于客户端的因此只需要在管理程序项目中添加 kafka-clients 依赖即可 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion3.6.1/version /dependency Kafka 的管理 Java 客户端支持管理和检查 Topic、Broker、配置和 ACL。所需的最小 Broker版本是 0.10.0.0。有更严格要求的方法将指定所需的最小 Broker 版本。这个客户端是在 0.11.0.0中引入的API 还在不断发展。我们将尝试以兼容的方式演进 API但我们保留在必要时在次要版本中进行破坏性更改的权利。一旦 API 被认为是稳定的我们将更新 InterfaceStability 注解和本通知。  二. KafkaAdminClient API 2.1. API 总览 方法名称方法说明 createTopics 创建一批新主题。此操作不是事务性的因此它对于某些主题可能成功而对于其他主题则可能失败。此方法返回成功后可能需要几秒钟的时间所有代理才能知道主题已创建。在这段时间内listTopics和describeTopicsCollection可能不会返回有关新主题的信息。版本0.10.1.0或更高版本的代理支持此操作。版本0.10.2.0支持validateOnly选项。 deleteTopics 删除一批主题。此操作不是事务性的因此它对于某些主题可能成功而对于其他主题则可能失败。AdminClientdeleteTopics返回成功后可能需要几秒钟所有代理才能意识到主题已消失。在此期间AdminClientlistTopics和AdminClientdescribeTopics可能会继续返回有关已删除主题的信息。如果代理上的delete.topic.enable为false则deleteTopics将标记主题为删除但实际上不会删除它们。在这种情况下期货将成功返回。版本0.10.1.0或更高版本的代理支持此操作。 listTopics 查询Topic列表 describeTopics 描述Topic的详情信息 describeCluster 描述Cluster的详情信息 describeAcls 根据提供的过滤器列出访问控制列表ACL。注意createAcls或deleteAcls所做的更改可能需要一些时间才能反映到describeAcls的输出中。版本0.11.0.0或更高版本的代理支持此操作。 createAcls 创建绑定到特定资源的访问控制列表ACL。此操作不是事务性的因此对于某些ACL可能成功而对于其他ACL则可能失败。如果您尝试添加与现有ACL复制的ACL则不会引发任何错误但不会进行任何更改。版本0.11.0.0或更高版本的代理支持此操作。 deleteAcls 根据提供的过滤器删除访问控制列表ACL。此操作不是事务性的因此对于某些ACL可能成功而对于其他ACL则可能失败。版本0.11.0.0或更高版本的代理支持此操作。 describeConfigs 获取指定资源的配置。返回的配置包括默认值isDefault方法可用于将其与用户提供的值区分开。isSensitive为true的配置条目的值始终为null因此不会公开敏感信息。isReadOnly为true的配置条目无法更新。版本0.11.0.0或更高版本的代理支持此操作。 alterConfigs 使用默认选项更新指定资源的配置。更新不是事务性的因此更新可能会为某些资源成功而为其他资源失败。特定资源的配置会自动更新。版本0.11.0.0或更高版本的代理支持此操作。 incrementalAlterConfigs 新版本中是使用incrementalAlterConfigs方法来修改Topic的配置项该方法使用起来相对于alterConfigs要略微复杂一些但因此功能更多、更灵活。 alterReplicaLogDirs 更改指定副本的日志目录。当前仅当在代理上创建副本之前使用此API时此API才有用。它将支持在完全实施KIP-113之后已经创建的移动副本。此操作不是事务性的因此它可能对某些副本成功而对其他副本失败。版本1.0.0或更高版本的代理支持此操作。 describeLogDirs 查询给定代理集上所有日志目录的信息版本1.0.0或更高版本的代理支持此操作。 describeReplicaLogDirs 查询副本日志目录信息中的指定副本。版本1.0.0或更高版本的代理支持此操作。 createPartitions 根据相应的值增加作为newPartitions关键字指定的主题的分区数。如果为具有关键字的主题增加了分区则分区逻辑或消息的顺序将受到影响。 deleteRecords 删除消息 createDelegationToken 创建DelegationToken renewDelegationToken 更新DelegationToken expireDelegationToken 过期DelegationToken describeDelegationToken 描述DelegationToken的详情信息 describeConsumerGroups 描述消费组的详情信息 listConsumerGroups 查询消费组列表 listConsumerGroupOffsets 查询消费组的offset列表 deleteConsumerGroups 删除消费组只能是闲置的消费组 deleteConsumerGroupOffsets 删除消费组中某些分区的已提交偏移量。只有当组没有主动订阅相应的Topic时才会成功。 electLeaders 选举Leader alterPartitionReassignments 修改分区副本 listPartitionReassignments 查询分区副本列表 removeMembersFromConsumerGroup 从消费组中移除消费者成员 alterConsumerGroupOffsets 修改消费组的offset listOffsets 查询Topic分区的offset列表 describeClientQuotas 描述客户端配额配置详细信息 alterClientQuotas 修改客户端配额 describeUserScramCredentials 描述Scram账户详细信息 alterUserScramCredentials 修改Scram账户 describeFeatures 描述已完成的功能以及支持的功能 updateFeatures 将指定的更新应用于最终确定的功能 describeMetadataQuorum 描述元数据数量的状态 unregisterBroker 注销Broker describeProducers 描述生产者详细信息 describeTransactions 描述事务的详细信息 abortTransaction 终止事务 listTransactions 查询事务列表 fenceProducers 隔离使用任何提供的事务ID的所有活动生产者 metrics 获取adminClient保存的度量值 2.2. Topic 操作 2.2.1. 创建 Topic // bootstrapServers 如 localhost:9092 private void createTopics(String bootstrapServers) {Properties properties new Properties();properties.put(bootstrap.servers, bootstrapServers);properties.put(connections.max.idle.ms, 10000);properties.put(request.timeout.ms, 5000);try (AdminClient client AdminClient.create(properties)) {CreateTopicsResult result client.createTopics(Arrays.asList(new NewTopic(topic1, 1, (short) 1),new NewTopic(topic2, 1, (short) 1),new NewTopic(topic3, 1, (short) 1)));try {result.all().get();} catch (InterruptedException | ExecutionException e) {throw new IllegalStateException(e);}} } 2.2.2. Topic 列表 private void listTopics(String bootstrapServers) {Properties properties new Properties();properties.put(bootstrap.servers, bootstrapServers);properties.put(connections.max.idle.ms, 10000);properties.put(request.timeout.ms, 5000);try (AdminClient client AdminClient.create(properties)) {ListTopicsResult result client.listTopics();try {result.listings().get().forEach(topic - {System.out.println(topic);});} catch (InterruptedException | ExecutionException e) {throw new IllegalStateException(e);}} }// 运行结果 (nametopic1, internalfalse) (nametopic2, internalfalse) (nametopic3, internalfalse) ... 2.2.3. 删除 Topic public static void deleteTopics(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {DeleteTopicsResult ret adminClient.deleteTopics(Arrays.asList(topicName));ret.all().get(); } 2.2.4. 描述 Topic 详细信息 一个 Topic 会有自身的描述信息例如partition 的数量副本集的数量是否为 internal等等。AdminClient 中提供了 describeTopics 方法来查询这些描述信息。代码示例 public static void describeTopics(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {DescribeTopicsResult ret adminClient.describeTopics(Arrays.asList(topicName, __consumer_offsets));//等待返回结果完成MapString, TopicDescription topics ret.all().get();for (Map.EntryString, TopicDescription entry : topics.entrySet()) {System.out.println(entry.getKey() ---- entry.getValue());}}// 运行结果 testTopic----(nametestTopic, internalfalse, partitions(partition0, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)), authorizedOperationsnull) __consumer_offsets----(name__consumer_offsets, internaltrue, partitions(partition0, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition1, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition2, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition3, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition4, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition5, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition6, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition7, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition8, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition9, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition10, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition11, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition12, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition13, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition14, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition15, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition16, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition17, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition18, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition19, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition20, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition21, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition22, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition23, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition24, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition25, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition26, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition27, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition28, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition29, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition30, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition31, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition32, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition33, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition34, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition35, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition36, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition37, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition38, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition39, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition40, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition41, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition42, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition43, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition44, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition45, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition46, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition47, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition48, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)),(partition49, leaderlocalhost:9092 (id: 0 rack: null), replicaslocalhost:9092 (id: 0 rack: null), isrlocalhost:9092 (id: 0 rack: null)), authorizedOperationsnull) 2.3. 分区 Partition 操作 2.3.1. 增加分区 在创建 Topic 时我们需要设定 Partition 的数量但如果觉得初始设置的 Partition 数量太少了那么就可以使用 createPartitions 方法来调整 Topic 的 Partition 数量但是需要注意在 Kafka 中Partition 只能增加不能减少。代码示例 public static void incrPartitions(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {MapString, NewPartitions newPartitions new HashMap();//将Partition数量调整为2newPartitions.put(topicName, NewPartitions.increaseTo(2));CreatePartitionsResult ret adminClient.createPartitions(newPartitions);ret.all().get(); } 2.3.2. 分区副本重新分配 public void alterPartitionReassignments() throws ExecutionException, InterruptedException {// 构造要修改的topic和对应的partition idTopicPartition topicPartition new TopicPartition(java_kafka_tst1, 1);//构造目标副本的partition id其实就是broker idListInteger targetReplicas new ArrayList();//把partition id添加进去targetReplicas.add(1);//使用targetReplicas构造newPartitionReassignmentNewPartitionReassignment newPartitionReassignmentnew NewPartitionReassignment(targetReplicas);//构造重新分配的MapMapTopicPartition, OptionalNewPartitionReassignment reassignments new HashMap();//传入构造好的参数topicPartition和newPartitionReassignmentreassignments.put(topicPartition, Optional.of(newPartitionReassignment));//调用重新分配方法AlterPartitionReassignmentsResult result adminClient.alterPartitionReassignments(reassignments);// 执行方法result.all().get();// 等待结果if (result.all().isDone()) {System.out.println(done);} else {System.out.println(not done);} } 2.3.3. 查询分区副本列表 public static void listPartitionReassignments(AdminClient adminClient, String topicName) throws InterruptedException, ExecutionException {SetTopicPartition tpSet new HashSet();tpSet.add(new TopicPartition(t-test, 0));ListPartitionReassignmentsResult ret adminClient.listPartitionReassignments(tpSet);ret.reassignments().get(); } 2.4. Config 配置信息 2.4.1. 描述配置详细信息 public static void describeConfig(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {DescribeConfigsResult ret adminClient.describeConfigs(Collections.singleton(new ConfigResource(ConfigResource.Type.TOPIC, topicName)));MapConfigResource, Config configMap ret.all().get();for (Map.EntryConfigResource, Config entry : configMap.entrySet()) {ConfigResource key entry.getKey();Config value entry.getValue();System.out.println(String.format(Resource type: %s, resource name: %s, key.type(), key.name()));CollectionConfigEntry configEntries value.entries();for (ConfigEntry each : configEntries) {System.out.println(each.name() each.value());}} }// 运行结果 Resource type: TOPIC, resource name: testTopic compression.type producer leader.replication.throttled.replicas message.downconversion.enable true min.insync.replicas 1 segment.jitter.ms 0 cleanup.policy delete flush.ms 9223372036854775807 follower.replication.throttled.replicas segment.bytes 1073741824 retention.ms 604800000 flush.messages 9223372036854775807 message.format.version 3.0-IV1 file.delete.delay.ms 60000 max.compaction.lag.ms 9223372036854775807 max.message.bytes 1048588 min.compaction.lag.ms 0 message.timestamp.type CreateTime preallocate false min.cleanable.dirty.ratio 0.5 index.interval.bytes 4096 unclean.leader.election.enable false retention.bytes -1 delete.retention.ms 86400000 segment.ms 604800000 message.timestamp.difference.max.ms 9223372036854775807 segment.index.bytes 10485760 2.4.2. 修改配置信息 在早期版本中使用 alterConfigs 方法来修改配置项。代码示例 public static void alterConfigs(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {// 指定ConfigResource的类型及名称ConfigResource configResource new ConfigResource(ConfigResource.Type.TOPIC, topicName);// 配置项以ConfigEntry形式存在Config config new Config(Arrays.asList(new ConfigEntry(cleanup.policy, compact)));//构建MapConfigResource, Config configMap new HashMap();configMap.put(configResource, config);//执行AlterConfigsResult ret adminClient.alterConfigs(configMap);ret.all().get(); } 2.4.3. incrementalAlterConfigs 在新版本中则是使用 incrementalAlterConfigs 方法来修改配置项该方法使用起来相对于 alterConfigs 要略微复杂一些但因此功能更多、更灵活。代码示例 public static void incrementalAlterConfigs(AdminClient adminClient, String topicName) throws ExecutionException, InterruptedException {// 指定ConfigResource的类型及名称ConfigResource configResource new ConfigResource(ConfigResource.Type.TOPIC, topicName);// 配置项同样以ConfigEntry形式存在只不过增加了操作类型// 以及能够支持操作多个配置项相对来说功能更多、更灵活CollectionAlterConfigOp configs Arrays.asList(new AlterConfigOp(new ConfigEntry(preallocate, false),AlterConfigOp.OpType.SET));MapConfigResource, CollectionAlterConfigOp configMaps new HashMap();configMaps.put(configResource, configs);AlterConfigsResult result adminClient.incrementalAlterConfigs(configMaps);result.all().get(); } 2.5. Scram 账户操作 2.5.1. 创建 Scram 账户 public void createAccount(String name, String pwd, String salt) throws ExecutionException, InterruptedException {//创建User列表ListUserScramCredentialAlteration alterations new ArrayList();//构造Scram认证机制信息这里笔者选择了SCRAM_SHA_512大家也可以选择 ScramMechanism.SCRAM_SHA_256//alterations.size()此时为0或许会报错可以试下传入数字构造比如下面添加了一个认证信息那么这里传入数字1。// ScramCredentialInfo infonew ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, alterations.size()); //这里时间久远忘记当时写例子的场景了ScramCredentialInfo infonew ScramCredentialInfo(ScramMechanism.SCRAM_SHA_512, 10000);//三个UserScramCredentialAlteration构造方法三选一笔者选了一个最简单的//UserScramCredentialAlteration userScramCredentialAddnew UserScramCredentialUpsertion(name,info,pwd.getBytes());//UserScramCredentialAlteration userScramCredentialAddnew UserScramCredentialUpsertion(name,info,pwd.getBytes(),salt.getBytes());UserScramCredentialAlteration userScramCredentialAddnew UserScramCredentialUpsertion(name,info,pwd);//添加认证信息到列表alterations.add(userScramCredentialAdd);//执行方法并拿到返回结果AlterUserScramCredentialsResult result adminClient.alterUserScramCredentials(alterations);//阻塞等待结果完成result.all().get(); } 2.5.2. 删除 Scram 账户 public void deleteAccount(String name) throws ExecutionException, InterruptedException {//创建删除列表ListUserScramCredentialAlteration alterations new ArrayList();//构建删除用的UserScramCredentialAlterationUserScramCredentialAlteration userScramCredentialDelnew UserScramCredentialDeletion(name,ScramMechanism.SCRAM_SHA_512);//添加认证信息到列表alterations.add(userScramCredentialDel);//执行方法并拿到返回结果AlterUserScramCredentialsResult result adminClient.alterUserScramCredentials(alterations);//阻塞等待结果完成result.all().get(); } 2.5.3. 查询 Scram 账户信息 public void describeAccount() throws ExecutionException, InterruptedException {//***************************************查询所有用户信息*****************************************************//查询所有的账户这也是默认方法DescribeUserScramCredentialsResult result adminClient.describeUserScramCredentials();//执行方法并拿到返回结果MapString, UserScramCredentialsDescription future result.all().get();//输出future.forEach((name,info)- System.out.println([ScramUserName:name]:[ScramUserInfo:info.toString()]));//***************************************这里是分割线*****************************************************//***************************************查询指定的用户信息*****************************************************//构造指定的用户列表ListString userScramListnew ArrayList();//添加两个用户userScramList.add(user1);userScramList.add(user2);//传入特定用户列表执行方法并拿到返回结果DescribeUserScramCredentialsResult targetResult adminClient.describeUserScramCredentials(userScramList);//执行方法并拿到返回结果MapString, UserScramCredentialsDescription targetFuture targetResult.all().get();//输出targetFuture.forEach((name,info)- System.out.println([ScramUserName:name]:[ScramUserInfo:info.toString()])); } 2.6. Acl 操作 2.6.1. 创建 Acl public void createACL() {//创建ResourcePatternResourcePattern resourcePattern new ResourcePattern(ResourceType.TOPIC, topicName, PatternType.LITERAL);//创建AccessControlEntryAccessControlEntry accessControlEntrynew AccessControlEntry(User:accountName,*, AclOperation.READ, AclPermissionType.ALLOW);//绑定到AclBinding上AclBinding aclBindingnew AclBinding(resourcePattern,accessControlEntry);CollectionAclBinding aclBindingCollection new ArrayList();aclBindingCollection.add(aclBinding); //添加到集合CreateAclsResult aclResult adminClient.createAcls(aclBindingCollection);KafkaFutureVoid result aclResult.all();try {result.get(); //执行if (result.isDone()){ //验证是否成功System.out.println(result.toString());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();} } 2.6.2. 删除 Acl public void deleteAcls() {ResourcePatternFilter resourcePatternFilter new ResourcePatternFilter(ResourceType.TOPIC, topicName, PatternType.LITERAL);AccessControlEntryFilter accessControlEntryFilternew AccessControlEntryFilter(User:accountName,*, AclOperation.ALL, AclPermissionType.ALLOW);AclBindingFilter aclBindingnew AclBindingFilter(resourcePatternFilter,accessControlEntryFilter);CollectionAclBindingFilter aclBindingCollection new ArrayList();aclBindingCollection.add(aclBinding);DeleteAclsResult aclResult adminClient.deleteAcls(aclBindingCollection);KafkaFutureCollectionAclBinding result aclResult.all();try {result.get();if (result.isDone()){System.out.println(result.toString());}} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();} }
http://www.w-s-a.com/news/763788/

相关文章:

  • 无锡网站推广公司排名中国十大网站建设
  • 网站建设报价怎么差别那么大深圳开发公司网站建设
  • 京东商城网站建设方案书建设网站平台
  • 如何查询网站建设时间赤峰建网站的电话
  • 域名购买网站有哪些公司企业邮箱管理制度
  • 阿里云服务起做网站抖音seo推荐算法
  • 免费建站工具机械网站建设公司推荐
  • 怎么用自己主机做网站_如何做简单的网站
  • 阿里巴巴国际站跨境电商平台为什么有点网站打不开
  • 甘肃做网站哪家好网站开发 都包含什么语言
  • 合肥哪里有做网站的广告型网站怎么做的
  • 用dede做的网站国外免费空间哪个好
  • dede个人网站模板企点
  • 韩雪个人网站wordpress 怎么添加网站备案信息
  • 个人网站可以做地方技能培训班
  • 品牌营销策略研究无锡 网站 seo 优化
  • 在线推广网站的方法有哪些织梦网站首页目录在哪
  • 做爰全过程免费网站的视频做网站的几个步骤
  • cpa建站教程青海西宁制作网站企业
  • 简易的在线数据库网站模板网站多服务器建设
  • 成都seo网站建设花店网页模板html
  • 义乌市网站制作网络营销策略名词解释
  • 高端品牌网站建设图片wordpress 资源站主题
  • 上海建设工程监督总站网站电商网站wordpress
  • 网站建设 医院菏泽网站建设熊掌号
  • 成都网站建设企业预约网免费建站流程
  • 网站建设胶州中国政务网站建设绩效评估
  • 合肥知名网站推广胶东国际机场建设有限公司网站
  • asp.ney旅游信息网站下载 简洁濮阳微信网站开发
  • 建设网站专业怎么上传网站程序到空间