Kafka学习记录

Apache Kafka 是一个分布式消息发布和订阅系统,具有以下特点:

  1. 被设计为生产者(发布)和消费者(订阅)模式,解耦组件之间的依赖,异步处理消息,增强业务扩展能力。
  2. 作为分布式系统,具备高伸缩能力,为发布和订阅提供高吞吐量。
  3. 持久化消息到磁盘,直到消息过期,兼备批处理和实时处理消息的能力。
  4. 冗余消息数据,具备高可用能力。
  5. 本质上是队列,为消息订阅提供顺序保证。

基本概念

kafka-001

  • Broker(中介)

Kafka集群中的每一个server都是一个broker,具备唯一的id,消息的中转和暂存媒介,为消息提供持久化。消息被发布到这里时,首先滞留在内存中,等待异步线程刷入磁盘;订阅消息时首先会从内存取,如果没有则通过映射的index找到消息位置,发生磁盘IO。

  • Topic(话题)

每条发布到Kafka集群的消息都有一个类别,这个类别被称为topic。通常要预先创建topic然后使用,物理上不同topic的消息分开存储。topic在创建时可以声明副本(replication-factor)数量和分区(partitions)数量,如分区数量3、副本数量2,则产生3个Leader分区和3个与之对应的Follower分区,分布在不同的Broker上。被设置为该topic的消息将被冗余为相应数量的副本,并按照分区策略落入对应的分区。

  • Partition(分区)

partition是包含在topic中的概念,每个topic包含一个或多个partition,创建topic时可指定partition数量。每个partition对应一个文件夹,该文件夹下存储该partition的数据和索引文件。partition命名规则为topic名称+有序序号。分区越多并行消费能力越好,但同时也带来消息同步延迟越大的损失。

  • Offset(偏移量)

任何发布到某个partition的消息都会被直接追加到数据文件的尾部,构成一个有序的、不可变的队列。每条消息在文件中的位置称为offset。offset是一个连续的long型数字,用于partition唯一标识一条消息。消费者可以将“指针”指向任意offset来读取消息,通常由分区起始位置或消费者记录的上次读到到的位置开始线性逐条读取。

  • Segment(片段)

partition中的数据文件和索引文件其实是由一组连续的segments组成,每个逻辑上的segment包括data文件(*.log)和index文件(*.index)。segment文件的大小和生命周期由服务端配置参数决定。

  • Message(消息)

发布和订阅的数据主体,组成segment数据文件的基本单元。

  • Producer(生产者)

Producer是发布消息到Kafka集群(Brokers)的组件。可以制定策略来决定将消息发布到topic的哪个分区上。

  • Consumer(消费者)

Consumer是从Kafka集群订阅并处理消息的组件。每个Consumer可以被指定属于一个特定的Consumer Group,每个group中可以有一个或多个consumer。发送到Topic的消息只会被订阅此Topic的每个group中的一个consumer消费。所谓发布订阅模式是指当某个topic被多个组订阅时,同一条消息可以被不同组分别消费;所谓队列模式是指当某个topic仅被一个组订阅时,一条消息仅被消费一次。一个有5个Partition的Topic被一个有6个Consumer的Group消费时,只会有5个Consumer工作,即一个分区同时只能被组内的一个消费者使用,如果其中一个工作的消费者坏掉了,则另一个没事做的顶替。

集群环境搭建

三台主机:
172.16.0.3 node1
172.16.0.4 node2
172.16.0.5 node3
操作系统:CentOS7
已搭建好zookeeper集群环境

  • 三台主机上分别下载解压kafka
cd /opt
wget http://www-us.apache.org/dist/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz
tar zxvf kafka_2.11-0.9.0.1.tgz
  • Broker配置config/server.properties,参数broker.id、host.name彼此不同。更多参考官方文档
############################# Service #############################

# 用整数作为Broker实例的唯一标识.
broker.id=1
#broker.id=2
#broker.id=3
# Socket监听端口
port=9092
# Broker实例绑定的主机名或网络接口地址
host.name=172.16.0.3
#host.name=172.16.0.4
#host.name=172.16.0.5
# 处理网络请求的线程数
num.network.threads=3
# 处理磁盘IO的线程数
num.io.threads=8
# Socket使用的发送消息缓冲区大小(SO_SNDBUF)
socket.send.buffer.bytes=102400
# Socket使用的接收消息缓冲区大小(SO_RCVBUF)
socket.receive.buffer.bytes=102400
# Socket接收的最大消息大小(防止OOM)
socket.request.max.bytes=104857600

############################# Log(Message) #############################

# 消息日志存储目录
log.dirs=/data/kafka
# 话题缺省的分区数量
num.partitions=1
# 每个数据目录用于日志恢复的线程数
num.recovery.threads.per.data.dir=1
# 消息数量达到此值将触发flush数据到磁盘
log.flush.interval.messages=10000
# 消息缓冲的时间达到此值将触发flush数据到磁盘
log.flush.interval.ms=1000
# 消息日志保留的最大时间(时间策略)
log.retention.hours=168
# 消息日志保留的最大尺寸(尺寸策略)
#log.retention.bytes=1073741824
# 片段的最大尺寸,超过将建立新的片段
log.segment.bytes=1073741824
# 检查片段是否应该删除的时间间隔
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper连接串,可追加路径(自行创建)
zookeeper.connect=node1:2181,node2:2181,node3:2181
  • 启动Kafka集群的三个Broker,同样的Zookeeper配置决定了他们属于同一个集群。
/opt/kafka/bin/kafka-server-start.sh ../config/server.properties -daemon

基本操作

# 启动Broker
/opt/kafka/bin/kafka-server-start.sh -daemon ../config/server.properties
# 停止Broker
/opt/kafka/bin/kafka-server-stop.sh
# 创建分区数量5,副本数量2的话题my-replicated-topic5
/opt/kafka/bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181/kafka --replication-factor 2 --partitions 5 --topic my-replicated-topic5
# 查看话题my-replicated-topic5
/opt/kafka/bin/kafka-topics.sh --describe --zookeeper node1:2181,node2:2181/kafka --topic my-replicated-topic5
# 删除话题test2
/opt/kafka/bin/kafka-topics.sh --delete --zookeeper node1:2181,node2:2181/kafka --topic test2

# 控制台生产者消费者例子。以下完成后在node1输入可以在node2、node3看到输出
# 在node1启动生产者示例
/opt/kafka/bin/kafka-console-producer.sh --broker-list node1:9092,node2:9092 --topic my-replicated-topic5
# 在node2启动消费者示例
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper node1:2181,node2:2181/kafka --from-beginning --topic my-replicated-topic5 
# 在node3启动消费者示例,并输出到/tmp/tmpout-kafka
/opt/kafka/bin/kafka-console-consumer.sh --zookeeper node1:2181,node2:2181/kafka --from-beginning --topic my-replicated-topic5 > /tmp/tmpout-kafka &

应用解析

kafka-002

  • 消息系统(MQ)。用于异步传递消息。
  • 服务状态实时监控。
  • 用户行为追踪。
  • 流式计算。
  • 数据仓库。用于离线批量计算等。

参考

http://kafka.apache.org/documentation.html
http://tech.meituan.com/kafka-fs-design-theory.html
http://www.jasongj.com/2015/01/02/Kafka%E6%B7%B1%E5%BA%A6%E8%A7%A3%E6%9E%90/

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但请保留文章署名wanghengbin(包含链接:https://wanghengbin.com),不得用于商业目的,基于本文修改后的作品请以相同的许可发布。

评论(1) “Kafka学习记录

发表评论