大佬教程收集整理的这篇文章主要介绍了Kafka私房菜————|消息队列|Kafka特性|Kafka系统架构|Zookeeper+Kafka集群部署|超详细图解|,大佬教程大佬觉得挺不错的,现在分享给大家,也给大家做个参考。
消息(message)是指在应用之间传送的数据c;消息可以非常简单c;比如只包含文本字符串c;也可以更复杂c;可能包含嵌入对象。 消息队列(message Queue)是一种应用间的通信方式c;消息发送后可以立即返回c;有消息系统来确保信息的可靠专递c;消息发布者只管把消息发布到MQ中而不管谁来取c;消息使用者只管从MQ中取消息而不管谁发布的c;这样发布者和使用者都不用知道对方的存在。
主要原因是由于在高并发环境下c;同步请求来不及处理c;请求往往会发生阻塞。比如大量的请求并发访问数据库c;导致行锁表锁c;最后请求线程会堆积过多c;从而触发 too many connection 错误c;引发雪崩效应。 我们使用消息队列c;通过异步处理请求c;从而缓解系统的压力。消息队列常应用于异步处理c;流量削峰c;应用解耦c;消息通讯等场景。
当前比较常见的 MQ 中间件 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。
允许你独立的扩展或修改两边的处理过程c;只要确保它们遵守同样的接口约束。
系统的一部分组件失效时c;不会影响到整个系统。消息队列降低了进程间的耦合度c;所以即使一个处理消息的进程挂掉c;加入队列中的消息仍然可以在系统恢复后被处理。
有助于控制和优化数据流经过系统的速度c;解决生产消息和消费消息的处理速度不一致的情况。
在访问量剧增的情况下c;应用仍然需要继续发挥作用c;但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力c;而不会因为突发的超负荷的请求而完全崩溃。
很多时候c;用户不想也不需要立即处理消息。消息队列提供了异步处理机制c;允许用户把一个消息放入队列c;但并不立即处理它。想向队列中放入多少消息就放多少c;然后在需要的时候再去处理它们。
(1)点对点模式(一对一c;消费者主动拉取数据c;消息收到后消息清除) 消息生产者生产消息发送到消息队列中c;然后消息消费者从消息队列中取出并且消费消息。消息被消费以后c;消息队列中不再有存储c;所以消息消费者不可能消费到已经被消费的消息。消息队列支持存在多个消费者c;但是对一个消息而言c;只会有一个消费者可以消费。
(2)发布/订阅模式(一对多c;又叫观察者模式c;消费者消费数据之后不会清除消息) 消息生产者(发布)将消息发布到 topic 中c;同时有多个消息消费者(订阅)消费该消息。和点对点方式不同c;发布到 topic 的消息会被所有订阅者消费。 发布/订阅模式是定义对象间一种一对多的依赖关系c;使得每当一个对象(目标对象)的状态发生改变c;则所有依赖于它的对象(观察者对象)都会得到通知并自动更新。
Kafka 是一个分布式的基于发布/订阅模式的消息队列(MQc;message Queue)c;主要应用于大数据实时处理领域。
Kafka 是最初由 Linkedin 公司开发c;是一个分布式、支持分区的(partition)、多副本的(replica)c;基于 Zookeeper 协调的分布式消息中间件系统c;它的最大的特性就是可以实时的处理大量数据以满足各种需求场景c;比如基于 hadoop 的批处理系统、低延迟的实时系统、Spark/Flink 流式处理引擎c;nginx 访问日志c;消息服务等等c;用 scala 语言编写c; Linkedin 于 2010 年贡献给了 Apache 基金会并成为顶级开源项目。
Kafka 每秒可以处理几十万条消息c;它的延迟最低只有几毫秒。每个 topic 可以分多个 Partitionc;Consumer Group 对 Partition 进行消费操作c;提高负载均衡能力和消费能力。
kafka 集群支持热扩展
消息被持久化到本地磁盘c;并且支持数据备份防止数据丢失
允许集群中节点失败(多副本情况下c;若副本数量为 nc;则允许 n-1 个节点失败)
支持数千个客户端同时读写
一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker 可以容纳多个 topic。
为了实现扩展性c;一个非常大的 topic 可以分布到多个 broker(即服务器)上c;一个 topic 可以分割为一个或多个 partitionc;每个 partition 是一个有序的队列。Kafka 只保证 partition 内的记录是有序的c;而不保证 topic 中不同 partition 的顺序。
每个 topic 至少有一个 partitionc;当生产者产生数据的时候c;会根据分配策略选择分区c;然后将消息追加到指定的分区的队列末尾。
每条消息都会有一个自增的编号c;用于标识消息的偏移量c;标识顺序从 0 开始。 每个 partition 中的数据使用多个 segment 文件存储。 如果 topic 有多个 partitionc;消费数据时就不能保证数据的顺序。严格保证消息的消费顺序的场景下(例如商品秒杀、 抢红包)c;需要将 partition 数目设为 1。
每个 partition 有多个副本c;其中有且仅有一个作为 Leaderc;Leader 是当前负责数据的读写的 partition。
Follower 跟随 Leaderc;所有写请求都通过 Leader 路由c;数据变更会广播给所有 Followerc;Follower 与 Leader 保持数据同步。Follower 只负责备份c;不负责数据的读写。 如果 Leader 故障c;则从 Follower 中选举出一个新的 Leader。 当 Follower 挂掉、卡住或者同步太慢c;Leader 会把这个 Follower 从 ISR(Leader 维护一个和 Leader 保持同步的 Follower 集合) 列表中删除c;重新创建一个 Follower。
副本c;为保证集群中的某个节点发生故障时c;该节点上的 partition 数据不丢失c;且 kafka 仍然能够继续工作c;kafka 提供了副本机制c;一个 topic 的每个分区都有若干个副本c;一个 leader 和若干个 follower。
生产者即数据的发布者c;该角色将消息发布到 Kafka 的 topic 中。 broker 接收到生产者发送的消息后c;broker 将该消息追加到当前用于追加数据的 segment 文件中。 生产者发送的消息c;存储到一个 partition 中c;生产者也可以指定数据存储的 partition。
消费者可以从 broker 中读取数据。消费者可以消费多个 topic 中的数据。
消费者组c;由多个 consumer 组成。 所有的消费者都属于某个消费者组c;即消费者组是逻辑上的一个订阅者。可为每个消费者指定组名c;若不指定组名则属于默认的组。 将多个消费者集中到一起去处理某一个 Topic 的数据c;可以更快的提高数据的消费能力。 消费者组内每个消费者负责消费不同分区的数据c;一个分区只能由一个组内消费者消费c;防止数据被重复读取。 消费者组之间互不影响。
可以唯一的标识一条消息。 偏移量决定读取数据的位置c;不会有线程安全的问题c;消费者通过偏移量来决定下次读取的消息(即消费位置)。 消息被消费之后c;并不被马上删除c;这样多个业务就可以重复使用 Kafka 的消息。 某一个业务也可以通过修改偏移量达到重新读取消息的目的c;偏移量由用户控制。 消息最终还是会被删除的c;默认生命周期为 1 周(7*24小时)。
Kafka 通过 Zookeeper 来存储集群的 meta 信息。
由于 consumer 在消费过程中可能会出现断电宕机等故障c;consumer 恢复后c;需要从故障前的位置的继续消费c;所以 consumer 需要实时记录自己消费到了哪个 offsetc;以便故障恢复后继续消费。 Kafka 0.9 版本之前c;consumer 默认将 offset 保存在 Zookeeper 中;从 0.9 版本开始c;consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中c;该 topic 为 __consumer_offsets。
准备3台服务器做zookeeper集群
192.168.148.136 192.168.148.132 192.168.148.133
//关闭防火墙
systemctl stop firewalld
systemctl disable firewalld
setenforce o
yum install -y java-1.8.0-openjdk java-1.8.0-openjdk-devel
java -version
cd /opt
tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz
mv apache-zookeeper-3.5.7-bin /usr/local/zookeeper-3.5.7
cd /usr/local/zookeeper-3.5.7/conf/
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
tickTime=2000 #通信心跳时间c;Zookeeper服务器与客户端心跳时间c;单位毫秒
initLimit=10 #Leader和Follower初始连接时能容忍的最多心跳数(tickTime的数量)c;这里表示为10*2s
syncLimit=5 #Leader和Follower之间同步通信的超时时间c;这里表示如果超过5*2sc;Leader认为Follwer死掉c;并从服务器列表中删除Follwer
dataDir=/usr/local/zookeeper-3.5.7/data #修改c;指定保存Zookeeper中的数据的目录c;目录需要单独创建
dataLogDir=/usr/local/zookeeper-3.5.7/logs #添加c;指定存放日志的目录c;目录需要单独创建
clientPort=2181 #客户端连接端口
#添加集群信息
server.1=192.168.148.136:3188:3288
server.2=192.168.148.132:3188:3288
server.3=192.168.148.133:3188:3288
server.A=B:C:D A是一个数字c;表示这个是第几号服务器。集群模式下需要在zoo.cfg中dataDir指定的目录下创建一个文件myidc;这个文件里面有一个数据就是a的值c;Zookeeper启动时读取此文件c;拿到里面的数据与zoo.cfg里面的配置信息比较从而判断到底是哪个server。 B是这个服务器的地址。 C是这个服务器Follower与集群中的Leader服务器交换信息的端口。 D是万一集群中的Leader服务器挂了c;需要一个端口来重新进行选举c;选出一个新的l.eaderc;而这个端口就是用来执行选举时服务器相互通信的端口。
scp /usr/local/zookeeper-3.5.7/conf/zoo.cfg 192.168.148.132:/usr/local/zookeeper-3.5.7/conf/
scp /usr/local/zookeeper-3.5.7/conf/zoo.cfg 192.168.148.133:/usr/local/zookeeper-3.5.7/conf/
@H_997_374@mkdir /usr/local/zookeeper-3.5.7/data mkdir /usr/local/zookeeper-3.5.7/logs
echo 1 > /usr/local/zookeeper-3.5.7/data/@H_319_376@myid
echo 2 > /usr/local/zookeeper-3.5.7/data/@H_319_376@myid
echo 3 > /usr/local/zookeeper-3.5.7/data/@H_319_376@myid
vim /etc/init.d/zookeeper
# !/bin/bash
#chkconfig:2345 20 90
#description:Zookeeper @R_673_9260@ce Control Script
ZK_HOME=@H_155_874@'/usr/local/zookeeper-3.5.7'
case $1 in
start)
echo @H_155_874@"---------- zookeeper启动------------"
$ZK_HOME/bin/zkServer.sh start
; ;
stop)
echo @H_155_874@"---------- zookeeper 停止-----------"
$ZK_HOME/bin/zkServer.sh stop
;;
restart)
echo @H_155_874@"---------- zookeeper 重启-----------"
$ZK_HOME/bin/zkServer.sh restart
;;
status)
echo @H_155_874@"---------- zookeeper状态------------"
$ZK_HOME/bin/zkServer.sh status
;;
*)
echo @H_155_874@"usage: $0 {start| stop|restart|status }"
esac
设置开机自启
chmod +x /etc/init.d/zookeeper
chkconfig --add zookeeper
分别启动Zookeeper
scp /etc/init.d/zookeeper 192.168.148.132:/etc/init.d/
scp /etc/init.d/zookeeper 192.168.148.133:/etc/init.d/
@R_673_9260@ce zookeeper start
查看当前状态
@R_673_9260@ce zookeeper status
@H_548_1058@
cd /opt/
tar zxvf kafka_2.13-2.7.1.tgz
mv kafka_2.13-2.7.1 /usr/local/kafka
cd /usr/local/kafka/config/
cp server.properties{,.bak}
vim server.properties
broker.id=0 #21行c;broker的全局唯一编号c;每个broker不能重复c;因此要在其他机器上配置 broker.id=1、broker.id=2
listeners=PLAIntexT://192.168.148.136:9092 #31行c;指定监听的IP和端口c;如果修改每个broker的IP需区分开来c;也可保持默认配置不用修改
num.network.threads=3 #42行c;broker 处理网络请求的线程数量c;一般情况下不需要去修改
num.io.threads=8 #45行c;用来处理磁盘IO的线程数量c;数值应该大于硬盘数
socket.send.buffer.bytes=102400 #48行c;发送套接字的缓冲区大小
socket.receive.buffer.bytes=102400 #51行c;接收套接字的缓冲区大小
socket.request.@H_319_376@max.bytes=104857600 #54行c;请求套接字的缓冲区大小
log.dirs=/usr/local/kafka/logs #60行c;kafka运行日志存放的路径c;也是数据存放的路径
num.partitions=1 #65行c;topic在当前broker上的默认分区个数c;会被topic创建时的指定参数覆盖
num.recovery.threads.per.data.dir=1 #69行c;用来恢复和清理data下数据的线程数量
log.retention.hours=168 #103行c;segment文件(数据文件)保留的最长时间c;单位为小时c;默认为7天c;超时将被删除
log.segment.bytes=1073741824 #110行c;一个segment文件最大的大小c;默认为 1Gc;超出将新建一个新的segment文件
zookeeper.connect=192.168.148.136:2181,192.168.148.132:2181,192.168.148.133:2181 #123行c;配置连接Zookeeper集群地址
vim /etc/profile
export KAFKA_HOME=/usr/local/kafka
export PATH=$PATH:$KAFKA_HOME/bin
source /etc/profile
vim /etc/init.d/kafka
#!/bin/bash
#chkconfig:2345 22 88
#description:Kafka @R_673_9260@ce Control Script
KAFKA_HOME=@H_155_874@'/usr/local/kafka'
case $1 in
start)
echo @H_155_874@"---------- Kafka 启动 ------------"
${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
;;
stop)
echo @H_155_874@"---------- Kafka 停止 ------------"
${KAFKA_HOME}/bin/kafka-server-stop.sh
;;
restart)
$0 stop
$0 start
;;
status)
echo @H_155_874@"---------- Kafka 状态 ------------"
count=$(ps -ef | grep kafka | egrep -cv @H_155_874@"grep|$$")
if [ @H_155_874@"$count" -eq 0 ];then
echo @H_155_874@"kafka is not running"
else
echo @H_155_874@"kafka is running"
fi
;;
*)
echo @H_155_874@"usage: $0 {start|stop|restart|status}"
esac
chmod +x /etc/init.d/kafka
chkconfig --add kafka
#分别启动 Kafka
@R_673_9260@ce kafka start
#创建topic
kafka-topics.sh --create --zookeeper 192.168.148.136:2181,192.168.148.132:2181,192.168.148.133:2181 --Replication-factor 2 --partitions 3 --topic test
–zookeeper:定义 zookeeper 集群服务器地址c;如果有多个 IP 地址使用逗号分割c;一般使用一个 IP 即可 –Replication-factor:定义分区副本数c;1 代表单副本c;建议为 2 –partitions:定义分区数 –topic:定义 topic 名称
kafka-topics.sh --list --zookeeper 192.168.148.136:2181,192.168.148.132:2181,192.168.148.133:2181
kafka-topics.sh --describe --zookeeper 192.168.148.136:2181,192.168.148.132:2181,192.168.148.133:2181
kafka-console-producer.sh --broker-list 192.168.148.136:9092,192.168.148.134:9092,192.168.148.132:9092 --topic test
@H_607_1590@
kafka-console-consumer.sh --bootstrap-server 192.168.148.136:9092,192.168.148.134:9092,192.168.148.132:9092 --topic test --from-beginning
--from-beginning:会把主题中以往所有的数据都读取出来
kafka-topics.sh --zookeeper 192.168.148.136:2181,192.168.148.132:2181,192.168.148.133:2181 --alter --topic test --partitions 6
kafka-topics.sh --delete --zookeeper 192.168.148.136:2181,192.168.148.132:2181,192.168.148.133:2181--topic test
以上是大佬教程为你收集整理的Kafka私房菜————|消息队列|Kafka特性|Kafka系统架构|Zookeeper+Kafka集群部署|超详细图解|全部内容,希望文章能够帮你解决Kafka私房菜————|消息队列|Kafka特性|Kafka系统架构|Zookeeper+Kafka集群部署|超详细图解|所遇到的程序开发问题。
如果觉得大佬教程网站内容还不错,欢迎将大佬教程推荐给程序员好友。
本图文内容来源于网友网络收集整理提供,作为学习参考使用,版权属于原作者。
如您有任何意见或建议可联系处理。小编QQ:384754419,请注明来意。