Java应用学习(十)-Kafka 学习笔记一
一、消息队列
1.1、什么是 MQ ?
MQ 即 Message Queue ,即消息队列中间件,很多人说 MQ 通过将消息的发送和接收分离来实现应用程序的异步和解耦,这给人的直觉是 – MQ 是异步的,用来解耦的,但这只是 MQ 的效果而不是目的。
MQ 真正的目的是为了通讯,它屏蔽了底层复杂的通讯系统,定义了一套应用层的、更加简单的通讯协议。一个分布式系统中两个模块之间的通信要么是 HTTP ,要么是自己开发的 TCP (RPC),但是这两种协议其实都是原始的协议。
- HTTP 协议很难实现两端通信
A 可以调 B ,B 也可以主动调用 A ,但 HTTP 协议不支持长连接
- TCP 更加原始
MQ 做的就是在这些协议上构建一个更加简单的协议 – 生产者 / 消费者模型,它定义了两个对象 – 发送消息的称为发送者,接收数据的称为接收者,它同时提供了一个 SDK 让我们可以自定义生产者与消费者实现消息通讯且无视底层协议。
1.2、有 Broker 的 MQ
这种 MQ 通常有一台服务器作为 Broker ,所有的消息都经由这个 Broker 进行中转。生产者在把消息发送给它后就结束自己的任务了,而消费者可以通过主动拉取或者 Broker 推送的方式获取消息。
1、重 Topic
Kafka 就属于这个流派,生产者会发送 Key 和数据到 Broker ,由 Broker 比较 Key 后再决定给哪个消费者。这种模式是最常见的模式,该模式下, Topic 往往是一个比较大的概念,甚至一个系统中只有一个 Topic , Topic 某种意义上就是 Queue。
如上图所示,Broker 定义了三个队列,key1,key2,key3,⽣产者发送数据的时候会发送 key1 和 data,Broker 在推送数据的时候则推送 data(也可能把 key 带上)。
2、轻 Topic
这种的代表是 RabbitMQ(或者说是 AMQP)。⽣产者发送 key 和数据,消费者定义订阅的队列,Broker 收到数据之后会通过⼀定的逻辑计算出 key 对应的队列,然后把数据交给队列。
这种模式下解耦了 key和 queue,在这种架构中 queue是⾮常轻量级的(在 RabbitMQ中它的上限取决于你的内存),消费者关⼼的只是⾃⼰的 queue;⽣产者不必关⼼数据最终给谁只要指定 key 就⾏了,中间的那层映射在 AMQP中叫 exchange(交换机)。
- 可以这样理解,在这些 MQ 中, Topic 只是其中一种类型
- AMQP 有四种 exchange
- Direct exchange:key就等于 queue
- Fanout exchange:⽆视 key,给所有的 queue都来⼀份
- Topic exchange:key可以⽤ “宽字符” 模糊匹配 queue
- Headers exchange:⽆视 key,通过查看消息的头部元数据来决定发给那个 queue (AMQP头部元数据⾮常丰富⽽且可以⾃定义)
这种结构的架构给通讯带来了很⼤的灵活性,我们能想到的通讯⽅式都可以⽤这四种 exchange 表达出来。如果你需要⼀个企业数据总线(在乎灵活性)那么 RabbitMQ 绝对的值得⼀⽤。
1.3、无 Broker 的 MQ
⽆ Broker的 MQ的代表是 ZeroMQ。该作者⾮常睿智,他⾮常敏锐的意识到——MQ是更⾼级的 Socket,它是解决通讯问题的。
所以 ZeroMQ 被设计成了⼀个“库”⽽不是⼀个中间件,这种实现也可以达到——没有 Broker 的⽬的节点之间通讯的消息都是发送到彼此的队列中,每个节点都既是⽣产者⼜是消费者。ZeroMQ 做的事情就是封装出⼀套类似于 Socket 的 API 可以完成发送数据,读取数据
- ZeroMQ 其实就是⼀个跨语⾔的、重量级的 Actor 模型邮箱库。
- ZeroMQ可以实现同⼀台机器的 RPC通讯也可以实现不同机器的 TCP、UDP通讯,如果你需要⼀个强⼤的、灵活、野蛮的通讯能⼒,别犹豫 ZeroMQ
二、Kafka 介绍
Apache Kafka 是⼀个分布式、⽀持分区的(partition)、多副本的(replica),基于 zookeeper 协调的分布式消息系统。
它的最⼤的特性就是可以实时的处理⼤量数据以满⾜各种需求场景:⽐如基于 hadoop 的批处理系统、低延迟的实时系统、Storm / Spark流式处理引擎,web / nginx⽇志、访问⽇志,消息服务等等,⽤ scala 语⾔编写。
2.1、Kafka 的使用场景
1、日志收集
⼀个公司可以⽤
Kafka
收集各种服务的log
,通过Kafka
以统⼀接⼝服务的⽅式开放给各种consumer
,例如hadoop
、Hbase
、Solr
等。
2、消息系统
解耦和⽣产者和消费者、缓存消息等。
3、用户活动跟踪
Kafka
经常被⽤来记录web
⽤户或者app
⽤户的各种活动,如浏览⽹⻚、搜索、点击等活动,这些活动信息被各个服务器发布到kafka
的topic
中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到hadoop
、数据仓库中做离线分析和挖掘。
4、运营指标
Kafka 也经常⽤来记录运营监控数据。包括收集各种分布式应⽤的数据,⽣产
各种操作的集中反馈,⽐如报警和报告。
2.2、Kafka 的基本概念
Kafka
是⼀个分布式的,分区的消息(官⽅称之为commit log
)服务。它提供⼀个消息系统应该具备的功能,但是确有着独特的设计。可以这样来说,Kafka 借鉴了 JMS 规范的思想,但是确并没有完全遵循 JMS 规范。
名称 | 描述 |
---|---|
Broker | 消息中间件处理节点,⼀个 Kafka 节点就是⼀个broker,⼀个或者多个Broker可以组成⼀个 Kafka 集群 |
Topic | Kafka 根据 topic 对消息进行归类,发布到 Kafka 集群中的每条消息都需要指定一个 Topic |
Producer | 消息⽣产者,向Broker发送消息的客户端 |
Consumer | 消息消费者,从Broker读取消息的客户端 |
Consumer Group | 每个 Consumer 属于⼀个特定的 Consumer Group ,⼀条消息可以被多个不同的 Consumer Group 消费,但是⼀个 Consumer Group 中只能有⼀个Consumer 能够消费该消息 |
Partition | 物理上的概念,⼀个topic可以分为多个partition,每个partition内部消息是有序的 |
2.3、Kafka 的基本知识
1、Kafka 的安装
- 到 Kafka 中下载安装包,解压即可
- 进入解压后的目录,使用
ls
命令查看情况
- 进入 config 目录,查看情况
- server.properties 配置文件是主要配置文件,我们需要修改以下内容
1 | # broker.id 属性在 kafka 集群中必须要是唯⼀ |
2、启动
- 进入 bin 目录下,使用命令启动
1 | ./kafka-server-start.sh -daemon ../config/server.properties |
- 验证是否启动成功
进入 Zookeeper 中查看 id 为 0 的 broker 是否存在
3、server.properties 核心配置详解
属性 | 默认值 | 描述 |
---|---|---|
broker.id | 0 | 每个broker都可以⽤⼀个唯⼀的⾮负整数id进⾏标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯⼀的即可。 |
log.dirs | /tmp/kafka-logs | kafka存放数据的路径。这个路径并不是唯⼀的,可以是多个,路径之间只需要使⽤逗号分隔即可;每当创建新partition 时,都会选择在包含最少 partitions 的路径下进⾏。 |
listeners | PLAINTEXT://:9092 | Server 接收客户端连接的端口,IP 配置 Kafka 本机 IP 即可 |
zookeeper.connect | localhost:2181 | Zookeeper连接字符串的格式为:hostname:port,此处hostname和port分别是 Zookeeper 中某个节点的 host 和 port ;如果是集群,那么连接方式为 hostname1:port1,host2:port2… |
log.retention.hours | 168 | 每个日志文件删除之前保存的时间,默认数据保存时间对 所有 topic 都一样 |
num.partitions | 1 | 默认 topic 的默认分区数 |
default.replication.factor | 1 | 自动创建 topic 的默认副本数量,建议设置为大于等于 2 |
min.insync.replicas | 1 | 当 producer 设置 ack 为 -1 时,min.insync.replicas 指定 replicas 的最小数目(必须确认每一个 replicas 的写数据都是成功的),如果这个数目没有达到,那么 producer 发送消息会产生异常。 |
delete.topic.enable | false | 是否允许删除主题 |
4、创建主题 topic
什么是主题 Topic ?Topic 可以实现消息的分类,不同的消费者订阅不同的 topic ,就可以接收到不同的消息
- 执行以下命令创建一个名为
test
的 topic ,这个 topic 只有一个分区 partitions ,而且备份 replication 也只有一份
1 | ./kafka-topics.sh --create --zookeeper IP:2181 --replication-factor 1 --partitions 1 --topic test |
- 查看当前 kafka 中有哪些 topic
1 | ./kafka-topics.sh --list --zookeeper IP:2181 |
5、发送消息
kafka ⾃带了⼀个 producer 命令客户端,可以从本地⽂件中读取内容,或者我们也可以以命令⾏中直接输⼊内容,并将这些内容以消息的形式发送到kafka集群中。
在默认情况下,每⼀个⾏会被当做成⼀个独⽴的消息。使⽤ kafka 的发送消息的客户端,指定发送到的kafka服务器地址和topic
- 发送消息
1 | ./kafka-console-producer.sh --broker-list IP:9092 --topic test |
我们可以在命令客户端中输入消息,一行就是一条消息,比如我们可以发送两条消息,分别是 hello world
6、接收消息
对于 consumer,kafka 同样也携带了⼀个命令⾏客户端,会将获取到内容在命令中进⾏输出,默认是消费最新的消息。
使⽤ kafka 的消费者消息的客户端,从指定 kafka 服务器的指定 topic 中消费消息
- 方式一:从最后一条消息的偏移量 + 1 开始消费
1 | ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test |
可以看到,由于是从最后一条消息的偏移量开始消费,所以这里消费者会阻塞住
我们在生产者处发送消息后,可以看到消费者接收到了新消息
- 生产者
- 消费者
- 方式二:从头开始消费
1 | ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning |
演示效果,连接上后,可以看到读取到了所有生产者发送的消息
7、几个特点
Kafka 中的消息会被存储
Kafka 中的消息是顺序存储的
Kafka 中的消息是有偏移量的
消费者消费消息时也是通过偏移量来描述当前要消费的消息的位置
- Kafka 中的消息在消费时可以指明偏移量进行消费
- 生产者发送消息给 broker 后,broker 会将消息保存在本地的日志文件中,日志文件的位置在 server.properties 文件中可以配置
2.4、Kafka 中的关键细节
1、单播消息的实现
⼀个消费组⾥只会有⼀个消费者能消费到某⼀个 topic 中的消息。于是可以创建多
个消费者,这些消费者在同⼀个消费组中。如果要实现单播消息,那么只需要让多个消费者同一个消费组中即可。
1 | ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test |
2、多播消息的实现
在⼀些业务场景中需要让⼀条消息被多个消费者消费,那么就可以使⽤多播模式。
Kafka 实现多播,只需要让不同的消费者处于不同的消费组即可。
1 | ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 --topic test |
3、消息的顺序存储
消息的发送⽅会把消息发送到 broker 中,broker 会存储消息,消息是按照发送的顺序进⾏存储。
因此消费者在消费消息时可以指明主题中消息的偏移量。默认情况下,是从最后⼀个消息的下⼀个偏移量开始消费。
4、查看消费组及消息
- 查看当前主题下有哪些消费组
1 | ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list |
- 查看消费组中的具体信息
比如当前偏移量,最后一条消息的偏移量,堆积的消息数量等等
1 | ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 -- |
三、Kafka 主题和分区的概念
3.1、主题 Topic
主题 Topic 在 Kafka 中是一个逻辑的概念,Kafka 通过 Topic 将消息进行分类,不同 Topic 的信息会被订阅不同 Topic 的消费者所消费。
可以这样理解,Kafka 使用 Topic 对消息进行分类维护。
3.2、分区 Partition
1、概念
- 当一个 Topic 中的消息非常非常多时,多到需要用几 T 的文件来存储消息(
Kafka
将消息持久化到文件中)时,为了解决这个文件过大的问题, Kafka 提出了分区(Partition)的概念 - 每个分区都是一个单独的 log 文件,每条记录都以追加的形式写入。
- 分区中的每条记录都会被分配一个唯一的序号,成为偏移量(Offset)
- Offset 是一个递增的、不可变的数字,由 Kafka 自动维护。
- 当一条记录写入 Partition 的时候,它就被追加到 log 文件的末尾,并被分配一个序号,作为 Offset。
如上图所示,一个文件中的消息量非常大,这个时候就可以通过分区来分布式地存储这些消息,比如说一个 Topic 创建了三个分区,那么 Topic 中地消息就会被分别存放在这三个分区中。
2、优点
- 可以对消息进行分布式存储
可以解决单个文件过大的问题,同时可以将 IO 压力分散到多个文件上,提高性能
- 可以提高吞吐量
生产者可以向多个分区并行写入数据,消费者也可以同时消费多个分区中的数据,让读和写同时在多个分区中进行。
3、消息的有序性
在向 Topic 发送消息时,实际上是被写入某一个分区,并赋予偏移量(Offset)
- 一个 Topic 如果有多个分区,那么从 Topic 的层面上看,消息是无序的
- 如果单独只看一个分区,那么一个分区内的消息是有序的。
一个分区内部消息有序,一个 Topic 跨分区的消息无序,如果强制要求 Topic 整体有序,那么只能让 Topic 拥有一个分区。
4、分区操作
- 为一个主题创建多个分区
下面的命令为 test1 主题创建了两个分区
1 | ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test1 |
- 查看 topic 的分区信息
1 | ./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1 |
5、生产者分区写入策略
生产者将消息写入到 Topic 时, Kafka 将依据不同的策略将数据分配到不同的分区中,常用的有四种分区写入策略
- 轮询分区策略
默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区。如果在生产消息时,key 为 null,则使用轮询算法均衡地分配分区
- 随机分区策略(不用)
随机策略,每次都随机地将消息分配到每个分区。在较早的版本,默认的分区策略就是随机策略,也是为了将消息均衡地写入到每个分区。但后续轮询策略表现更佳,所以基本上很少会使用随机策略。
- 按
Key
分区分配策略
按
Key
分配策略,有可能会出现「数据倾斜」,例如:某个Key
包含了大量的数据,因为Key
值一样,所有所有的数据将都分配到一个分区中,造成该分区的消息数量远大于其他的分区。
- 自定义分区策略
通过自定义分区组件实现自定义分区策略,下面给出实现步骤
- 创建一个自定义分区类
MyPartitioner
,这个类需要实现Partitioner
接口,重写接口方法 - 在 Kafka 生产者配置类中,配置上一步编写的自定义分区类的类名即可。
1 | props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName()); |
6、分区策略导致的消息乱序问题
分区写入策略中的轮询策略、随机策略都会导致一个问题,即生产者发送到 Broker 中的消息是乱序的
分区写入策略中的按
Key
分区策略可以在一定程度上实现消息的有序存储(同一个 Key 的数据被保存到同一个分区中,而一个分区中的消息是有序的),但又可能导致一个新的问题 – 数据倾斜(一定时间内很多数据堆积在一个分区中)
所以实际开发中需要考虑实际情况来做取舍。
3.3、副本 Replication
1、概念说明
副本是对分区的备份,在集群中,不同的副本会被部署在不同的 broker 上。
- 在一个集群中创建 1 个主题,2 个分区、3 个副本。
1 | ./kafka-topics.sh --create --zookeeper localhost:2181 --replication- factor 3 --partitions 2 --topic my-replicated-topic |
可以看到,在一个 Broker 中存在着两个分区,而每个 Broker 中存在着一个副本,每个 Broker 上的数据都是完备的,同时有另外两个 Broker 作为备份存在。
- 副本中 Leader 的含义
在上面的输出中,我们可以得出两个信息:
- 在分区 0 的三个副本中,副本的 Leader 是编号 0 的副本,即第一个副本。
- 在分区 1 的三个副本中,副本的 Leader 是编号 1 的副本,即第二个副本。
副本的 Leader 负责消息的读取和写入,同时 Leader 需要向其他的两个 Follower 副本进行数据同步。
当 Leader 副本挂掉时,那么剩下两个副本就可以起到数据备份的作用。
2、主题信息关键数据
从上图中,我们可以得到该主题的一些关键信息
- Leader
副本中的概念,在 Kafka 集群中,一个分区可能有多个副本,这些副本分散在不同的 Broker 中,一个分区的多个副本中需要有一个 Broker 上的副本作为 Leader 。
Leader 专门用来接收消息,在接收到消息后,其他 Follower 通过 poll 的方式来同步数据
- Follower
Leader 处理所有针对这个分区的读写请求,⽽follower被动复制leader,不提供读写(主要是为了保证多副本数据与消费的⼀致性),如果 Leader 所在的 Broker 挂掉,那么剩下的 Follower 中就会进行新 Leader 的选举。
- Isr
- 可以同步的 broker 节点(和 Leader 连接畅通)和已同步(数据是最新的)的 broker 节点,存放在isr集合中。
- 在 Leader 挂掉后,Kafka 会从给 Isr 集合中选举新的节点(要求被选举节点中的数据是最新的,且是正常的)
- 如果处于 Isr 列表中的节点性能较差,那么它会被提出 Isr 集合中。
3.4、概念总结
- Kafka 集群由多个 Broker 组成,一台 Kafka Server 可以看为一个 Broker
- 一个 Broker 可以有多个主题,一个 Broker 中存放了一个 Topic 的所有分区,当前集群中的其他 Broker 中存放的该主题的数据作为备份存在。
- 对于每一个分区的所有副本中,会有一个副本作为 Leader 存在,剩下的副本作为这个 Leader 的 Follower
3.5、Kafka 集群消息
1、集群消息的发送
1 | ./kafka-console-producer.sh --broker-list IP1:Port1,IP2:Port2,IP3:Port3 --topic my-replicated-topic |
这样就可以打开消息发送控制台,进行消息发送
2、集群消息的消费
1 | ./kafka-console-consumer.sh --bootstrap-server IP1:Port1,IP2:Port2,IP3:Port3 --from- beginning --topic my-replicated-topic |
3、关于分区消费组消费者的细节
- 一个分区中的消息最多被同一个消费组里的某一个消费者消费(单播消息)
假设上面 broker-0 中的第一个分区的消息 Message1 被 Comsumer1 消费后, Comsumer2 就无法消费 Message1 了。
一个消费者可以消费多个分区中的消息
消费组中消费者的数量不能比一个 Topic 中的分区的数量多,否则多出来的消费者消费不到消息。
为什么一个分区中的消息不能被多个消费者消费?
为了保证单个分区中消息消费的有序性
- 如果消费者挂了,那么会触发 rebalance 机制,让其他消费者来消费该分区