大佬教程收集整理的这篇文章主要介绍了关于配置,你必须要知道这一点....,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
@H_616_7@ kafka管控平台推荐使用 滴滴开源 的 Kafka运维管控平台(戳我呀) 更符合国人的操作习惯 、更强大的管控能力 、更高效的问题定位能力 、更便捷的集群运维能力 、更专业的资源治理 、更友好的运维生态 、
BliBli视频: 石臻臻的杂货铺
kafka的动态配置
今天这篇文章,给大家分享一下最近看kafka中的动态配置,不需要重启Broker,即时生效的配置 欢迎留言一起探讨!
kafka中的配置
- Broker静态配置
.properties
文件- ZK中的动态配置
全局 default
配置- ZK中动态配置 指定配置
优先级从底到高
KafkaServer.startup
1. 动态配置初始化
config.dynamicConfig.initialize(zkClient)
currentConfig
c; 然后从zk中获取节点 /config/brokers/<default>
信息,然后更新配置updateDefaultConfig
; (动态默认配置覆盖静态配置)/config/brokers/{当前BrokerID}
获取配置, 如果配置中有ConfigType=passworD
的配置(例如ssl.keystore.password
)存在,接着判断 是否存在password.encoder.old.secret
配置,(这个配置是用来加解密ConfigType=passworD
的旧的秘钥),尝试用旧秘钥解密秘钥; 然后将这些配置重新加密回写入/config/brokers/{当前BrokerID}
; 然后返回配置 (这里主要是动态配置里面有密码类型配置的时候需要做一次解密加密处理)2. 注册可变更配置监听器
config.dynamicConfig.addReconfigurables(this)
DynamicBrokerConfig.addReconfigurables
// .........
def addReconfigurables(kafkaServer: KafkaServer): Unit = {
kafkaServer.authorizer @H_755_209@match {
case Some(authz: Reconfigurable) => addReconfigurable(authz)
case _ =>
}
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer))
addReconfigurable(new DynamicClientQuotaCallBACk(kafkaConfig.brokerId, kafkaServer))
addBrokerReconfigurable(new DynamicThreadPool(kafkaServer))
if (kafkaServer.logManager.cleaner != null)
addBrokerReconfigurable(kafkaServer.logManager.cleaner)
addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
addBrokerReconfigurable(kafkaServer.socketServer)
}
3. 动态配置启动监听
// Create the config manager. start listening to notifications
dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
dynamicConfigManager.startup()
change-notification-/config/changes
= stateChangeHandler/config/changes
= zNodeChildChangeHandler/config/changes
所有子节点看看有哪些变更lastExecutedChange(启动的时候是-1)
lastExecutedChange
15 * 60 * 1000(15分钟)
就是删除/config/changes /
下面的过期节点TopicConfigHandler.processConfigChanges
data
数据, 如果获取到了则执行通知流程notificationHandler.processnotification(d)
,处理器是ConfigChangedNotificationHandler
; 它先解析节点的json数据,根据版本信息不同调用不同的处理方法; 下面是version=2的处理方式;entityType
和entityName
; 那么久可以去对应的zk数据里面getData获取数据; 并且将获取到的数据Decode成Properties
对象entityConfig
;TopicConfigHandler.processConfigChanges
来进行处理,方法里面再看看流程 ->
entityConfig
里面获取message.format.version
配置消息格式版本号; 如果当前Broker的版本inter.broker.protocol.version
小于message.format.version
配置; 则将message.format.version
配置 排除掉TopicConfigHandler.updateLogConfig
来更新指定Topic的所有TopicPartition的配置,其实是将TP正在加载或初始化的状态标记为没有完成初始化,这将会在后续过程中促成TP重新加载并初始化initializeLeaderEpochCache
; (需要注意的是:这里的动态配置不是支持所有的配置参数,请看【kafka运维】Kafka全网最全最详细运维命令合集(精品强烈建议收藏!!!)的附件部分)leader.Replication.throttled.replicas
,follower.Replication.throttled.replicas
这两个限流相关;解析配置之后,然后通过quotaManager.markThrottled/quotaManager.removeThrottle
更新/移除对应的限流分区集合unclean.leader.election.enable=true(允许非同步副本选主 )
;那么就会执行TopicUncleanLeaderElectionEnable
方法来让它改变选举策略(前提是当前Broker是Controller角色)BrokerConfigHandler.processConfigChanges
假设我们配置了默认配置; zk里面的节点是<default>
sh bin/kafka-configs.sh --bootstrap-server xxxxx:9090 --alter --entity-type brokers
--entity-default
--add-config log.segment.bytes=88888888
/config/changes
里面获取变更节点的json数据.然后去对应的 /config/{entityTypE}/{entituNamE}获取对应的数据<default>
节点,说明有配置动态默认配置; 则按照 静态配置<动态默认配置<动态指定配置 的顺序重新加载覆盖一下; 如果 新旧配置有变更(有可能执行了一次命令但是参数并没有变化的情况,修改了个寂寞)的情况下 才会做更新的; 并且 通知到所有的 BrokerReconfigurable
; 这个就是上面启动时候 1.1 启动加载动态配置总流程的第2步骤 (注册可变更配置监听器) 注册的;leader.Replication.throttled.rate
、follower.Replication.throttled.rate
、replica.alter.log.dirs.io.max.bytes.per.second
都会被更新一下quotaManagers.leader/leader/alterLogDirs.updateQuota
;如果这些配置没有配置的话,则用 Long.MaxValue(相当于是不限流)
来更新--describe
@H_696_46@
简单检验
根据类型查询entities
; type是topics
就获取所有topic; type是broker|broker-loggers
则查询所有Broker节点
遍历entities
获取配置 ;做些简单校验;然后想Broker发起describeConfigs
请求; 节点策略是leastLoadedNodeProvider
节点调用方法 KafkaApis.handleDescribeConfigsrequest
如果有broker|broker-loggers
节点, 则在 获取到数据之后 然后指定nodEID节点发起 describeBrokerConfigs
请求
brokers
broker-loggers
1. 发起请求
--describe
流程是一样的delete-config
配置, 需要校验一下当前配置有没有;如果没有抛出异常;incrementalAlterConfigs
;如果请求类型是 brokers/broker-loggers
则发起请求的接收方是 指定的Broker 节点; 否则就是leastLoadedNodeProvider
(当前负载最少的节点)2. incrementalAlterConfigs 增量修改配置
KafkaApis.handleIncrementalAlterConfigsrequest
configs
;alter.config.policy.class.name=
配置(默认null)的话,则会实例化指定的类(需要继承 AlterConfigPolicy
类);并调用他的 validate
方法来校验;/config/topics/{topicNamE}
中;/config/changes/config_change_序列号
中; 这个节点主要是让Broker们来监听这个节点的来了解到哪个配置有变更的;省略
在 1. Broker启动加载动态配置 中我们了解到有对节点/config/change
注册一个子节点变更的监听处理器
@H_834_696@
那么对动态配置做出修改之后, 这个节点就会新增一条数据,那么所有的Broker都会收到这个通知;
所以我们就要来看一看收到通知之后又做了哪些事情
这个流程是又回到了上面的 1. 2 加载Topics/Brokers动态配置 的流程中了;
原理部分讲解比较详细的可以看 : Kafka动态配置实现原理解析 - 李志涛 - 博客园
/config/change
节点的变化不可以,因为Broker是监听
/config/changes/
里面的Broker节点c;来实时得知有数据变更;
/config/
下面的配置?@H_696_46@
没有必要,这样监听的数据数据太多了,而且 你不知道具体是改了哪个配置,所以每次都要全部更新一遍,无缘无故的加重负担了, 用
/config/change
节点来得知哪个类型的数据变更, 只变更这个相关数据就可以了
以上是大佬教程为你收集整理的关于配置,你必须要知道这一点....全部内容,希望文章能够帮你解决关于配置,你必须要知道这一点....所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。