一、消息队列

1.1、什么是 MQ ?

MQ 即 Message Queue ,即消息队列中间件,很多人说 MQ 通过将消息的发送和接收分离来实现应用程序的异步和解耦,这给人的直觉是 – MQ 是异步的,用来解耦的,但这只是 MQ 的效果而不是目的。

MQ 真正的目的是为了通讯,它屏蔽了底层复杂的通讯系统,定义了一套应用层的、更加简单的通讯协议。一个分布式系统中两个模块之间的通信要么是 HTTP ,要么是自己开发的 TCP (RPC),但是这两种协议其实都是原始的协议。

  • HTTP 协议很难实现两端通信

A 可以调 B ,B 也可以主动调用 A ,但 HTTP 协议不支持长连接

  • TCP 更加原始

MQ 做的就是在这些协议上构建一个更加简单的协议 – 生产者 / 消费者模型,它定义了两个对象 – 发送消息的称为发送者,接收数据的称为接收者,它同时提供了一个 SDK 让我们可以自定义生产者与消费者实现消息通讯且无视底层协议。

1.2、有 Broker 的 MQ

这种 MQ 通常有一台服务器作为 Broker ,所有的消息都经由这个 Broker 进行中转。生产者在把消息发送给它后就结束自己的任务了,而消费者可以通过主动拉取或者 Broker 推送的方式获取消息。

1、重 Topic

Kafka 就属于这个流派,生产者会发送 Key 和数据到 Broker ,由 Broker 比较 Key 后再决定给哪个消费者。这种模式是最常见的模式,该模式下, Topic 往往是一个比较大的概念,甚至一个系统中只有一个 Topic , Topic 某种意义上就是 Queue。

image-20211009180234957

如上图所示,Broker 定义了三个队列,key1,key2,key3,⽣产者发送数据的时候会发送 key1 和 data,Broker 在推送数据的时候则推送 data(也可能把 key 带上)。

2、轻 Topic

这种的代表是 RabbitMQ(或者说是 AMQP)。⽣产者发送 key 和数据,消费者定义订阅的队列,Broker 收到数据之后会通过⼀定的逻辑计算出 key 对应的队列,然后把数据交给队列。

这种模式下解耦了 key和 queue,在这种架构中 queue是⾮常轻量级的(在 RabbitMQ中它的上限取决于你的内存),消费者关⼼的只是⾃⼰的 queue;⽣产者不必关⼼数据最终给谁只要指定 key 就⾏了,中间的那层映射在 AMQP中叫 exchange(交换机)。

  • 可以这样理解,在这些 MQ 中, Topic 只是其中一种类型
  • AMQP 有四种 exchange
  1. Direct exchange:key就等于 queue
  2. Fanout exchange:⽆视 key,给所有的 queue都来⼀份
  3. Topic exchange:key可以⽤ “宽字符” 模糊匹配 queue
  4. Headers exchange:⽆视 key,通过查看消息的头部元数据来决定发给那个 queue (AMQP头部元数据⾮常丰富⽽且可以⾃定义)

这种结构的架构给通讯带来了很⼤的灵活性,我们能想到的通讯⽅式都可以⽤这四种 exchange 表达出来。如果你需要⼀个企业数据总线(在乎灵活性)那么 RabbitMQ 绝对的值得⼀⽤。

1.3、无 Broker 的 MQ

⽆ Broker的 MQ的代表是 ZeroMQ。该作者⾮常睿智,他⾮常敏锐的意识到——MQ是更⾼级的 Socket,它是解决通讯问题的。

所以 ZeroMQ 被设计成了⼀个“库”⽽不是⼀个中间件,这种实现也可以达到——没有 Broker 的⽬的节点之间通讯的消息都是发送到彼此的队列中,每个节点都既是⽣产者⼜是消费者ZeroMQ 做的事情就是封装出⼀套类似于 Socket 的 API 可以完成发送数据,读取数据

  • ZeroMQ 其实就是⼀个跨语⾔的、重量级的 Actor 模型邮箱库。
  • ZeroMQ可以实现同⼀台机器的 RPC通讯也可以实现不同机器的 TCP、UDP通讯,如果你需要⼀个强⼤的、灵活、野蛮的通讯能⼒,别犹豫 ZeroMQ

二、Kafka 介绍

Apache Kafka 是⼀个分布式⽀持分区的(partition)、多副本的(replica),基于 zookeeper 协调分布式消息系统

它的最⼤的特性就是可以实时的处理⼤量数据以满⾜各种需求场景:⽐如基于 hadoop 的批处理系统、低延迟的实时系统、Storm / Spark流式处理引擎,web / nginx⽇志、访问⽇志,消息服务等等,⽤ scala 语⾔编写。

2.1、Kafka 的使用场景

1、日志收集

⼀个公司可以⽤ Kafka 收集各种服务的 log ,通过 Kafka 以统⼀接⼝服务的⽅式开放给各种 consumer,例如 hadoopHbaseSolr 等。

2、消息系统

解耦和⽣产者和消费者、缓存消息等。

3、用户活动跟踪

Kafka 经常被⽤来记录 web ⽤户或者 app ⽤户的各种活动,如浏览⽹⻚、搜索、点击等活动,这些活动信息被各个服务器发布到 kafkatopic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 hadoop、数据仓库中做离线分析和挖掘。

4、运营指标

Kafka 也经常⽤来记录运营监控数据。包括收集各种分布式应⽤的数据,⽣产
各种操作的集中反馈,⽐如报警和报告。

2.2、Kafka 的基本概念

Kafka 是⼀个分布式的,分区的消息(官⽅称之为 commit log )服务。它提供⼀个消息系统应该具备的功能,但是确有着独特的设计。

可以这样来说,Kafka 借鉴了 JMS 规范的思想,但是确并没有完全遵循 JMS 规范。

名称描述
Broker消息中间件处理节点,⼀个 Kafka 节点就是⼀个broker,⼀个或者多个Broker可以组成⼀个 Kafka 集群
TopicKafka 根据 topic 对消息进行归类,发布到 Kafka 集群中的每条消息都需要指定一个 Topic
Producer消息⽣产者,向Broker发送消息的客户端
Consumer消息消费者,从Broker读取消息的客户端
Consumer Group每个 Consumer 属于⼀个特定的 Consumer Group ,⼀条消息可以被多个不同的 Consumer Group 消费,但是⼀个 Consumer Group 中只能有⼀个Consumer 能够消费该消息
Partition物理上的概念,⼀个topic可以分为多个partition,每个partition内部消息是有序的

2.3、Kafka 的基本知识

1、Kafka 的安装

  • Kafka 中下载安装包,解压即可
  • 进入解压后的目录,使用 ls 命令查看情况

image-20211009183603443

  • 进入 config 目录,查看情况

image-20211009183716805

  • server.properties 配置文件是主要配置文件,我们需要修改以下内容
1
2
3
4
5
6
7
8
# broker.id 属性在 kafka 集群中必须要是唯⼀
broker.id=0
# kafka 部署的机器 ip 和提供服务的端⼝号
isteners=PLAINTEXT://IP:9092
# kafka 的消息存储⽂件
log.dir=/usr/local/kafka-2.5.1/data/kafka-logs
# kafka 连接 zookeeper 的地址
zookeeper.connect=IP:2181

2、启动

  • 进入 bin 目录下,使用命令启动
1
./kafka-server-start.sh -daemon ../config/server.properties
  • 验证是否启动成功

进入 Zookeeper 中查看 id 为 0 的 broker 是否存在

3、server.properties 核心配置详解

属性默认值描述
broker.id0每个broker都可以⽤⼀个唯⼀的⾮负整数id进⾏标识;这个id可以作为broker的“名字”,你可以选择任意你喜欢的数字作为id,只要id是唯⼀的即可。
log.dirs/tmp/kafka-logskafka存放数据的路径。这个路径并不是唯⼀的,可以是多个,路径之间只需要使⽤逗号分隔即可;每当创建新partition 时,都会选择在包含最少 partitions 的路径下进⾏。
listenersPLAINTEXT://:9092Server 接收客户端连接的端口,IP 配置 Kafka 本机 IP 即可
zookeeper.connectlocalhost:2181Zookeeper连接字符串的格式为:hostname:port,此处hostname和port分别是 Zookeeper 中某个节点的 host 和 port ;如果是集群,那么连接方式为 hostname1:port1,host2:port2…
log.retention.hours168每个日志文件删除之前保存的时间,默认数据保存时间对 所有 topic 都一样
num.partitions1默认 topic 的默认分区数
default.replication.factor1自动创建 topic 的默认副本数量,建议设置为大于等于 2
min.insync.replicas1当 producer 设置 ack 为 -1 时,min.insync.replicas 指定 replicas 的最小数目(必须确认每一个 replicas 的写数据都是成功的),如果这个数目没有达到,那么 producer 发送消息会产生异常。
delete.topic.enablefalse是否允许删除主题

4、创建主题 topic

什么是主题 Topic ?Topic 可以实现消息的分类,不同的消费者订阅不同的 topic ,就可以接收到不同的消息

image-20211009201038161

  • 执行以下命令创建一个名为 test 的 topic ,这个 topic 只有一个分区 partitions ,而且备份 replication 也只有一份
1
./kafka-topics.sh --create --zookeeper IP:2181 --replication-factor 1 --partitions 1 --topic test

image-20211009202715116

  • 查看当前 kafka 中有哪些 topic
1
./kafka-topics.sh --list --zookeeper IP:2181

image-20211009203110064

5、发送消息

kafka ⾃带了⼀个 producer 命令客户端,可以从本地⽂件中读取内容,或者我们也可以以命令⾏中直接输⼊内容,并将这些内容以消息的形式发送到kafka集群中

在默认情况下,每⼀个⾏会被当做成⼀个独⽴的消息。使⽤ kafka 的发送消息的客户端,指定发送到的kafka服务器地址和topic

  • 发送消息
1
./kafka-console-producer.sh --broker-list IP:9092 --topic test

image-20211009204054807

我们可以在命令客户端中输入消息,一行就是一条消息,比如我们可以发送两条消息,分别是 hello world

image-20211009204140934

6、接收消息

对于 consumer,kafka 同样也携带了⼀个命令⾏客户端,会将获取到内容在命令中进⾏输出,默认是消费最新的消息。

使⽤ kafka 的消费者消息的客户端,从指定 kafka 服务器的指定 topic 中消费消息

  • 方式一:从最后一条消息的偏移量 + 1 开始消费
1
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

可以看到,由于是从最后一条消息的偏移量开始消费,所以这里消费者会阻塞住

image-20211009204627667

我们在生产者处发送消息后,可以看到消费者接收到了新消息

  1. 生产者

image-20211009204735550

  1. 消费者

image-20211009204751974

  • 方式二:从头开始消费
1
2
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning
--topic test

演示效果,连接上后,可以看到读取到了所有生产者发送的消息

image-20211009205326807

7、几个特点

  • Kafka 中的消息会被存储

  • Kafka 中的消息是顺序存储的

  • Kafka 中的消息是有偏移量的

消费者消费消息时也是通过偏移量来描述当前要消费的消息的位置

  • Kafka 中的消息在消费时可以指明偏移量进行消费

image-20211009210633183

  • 生产者发送消息给 broker 后,broker 会将消息保存在本地的日志文件中,日志文件的位置在 server.properties 文件中可以配置

2.4、Kafka 中的关键细节

1、单播消息的实现

⼀个消费组⾥只会有⼀个消费者能消费到某⼀个 topic 中的消息。于是可以创建多
个消费者,这些消费者在同⼀个消费组中。

如果要实现单播消息,那么只需要让多个消费者同一个消费组中即可。

1
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup --topic test

2、多播消息的实现

在⼀些业务场景中需要让⼀条消息被多个消费者消费,那么就可以使⽤多播模式。

Kafka 实现多播,只需要让不同的消费者处于不同的消费组即可。

1
2
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup1 --topic test
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --consumer-property group.id=testGroup2 --topic test

3、消息的顺序存储

消息的发送⽅会把消息发送到 broker 中,broker 会存储消息,消息是按照发送的顺序进⾏存储

因此消费者在消费消息时可以指明主题中消息的偏移量。默认情况下,是从最后⼀个消息的下⼀个偏移量开始消费

4、查看消费组及消息

  • 查看当前主题下有哪些消费组
1
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
  • 查看消费组中的具体信息

比如当前偏移量,最后一条消息的偏移量,堆积的消息数量等等

1
2
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --
describe --group testGroup

三、Kafka 主题和分区的概念

3.1、主题 Topic

主题 Topic 在 Kafka 中是一个逻辑的概念,Kafka 通过 Topic 将消息进行分类,不同 Topic 的信息会被订阅不同 Topic 的消费者所消费。

可以这样理解,Kafka 使用 Topic 对消息进行分类维护。

3.2、分区 Partition

1、概念

  • 当一个 Topic 中的消息非常非常多时,多到需要用几 T 的文件来存储消息(Kafka 将消息持久化到文件中)时,为了解决这个文件过大的问题, Kafka 提出了分区(Partition)的概念
  • 每个分区都是一个单独的 log 文件,每条记录都以追加的形式写入
  • 分区中的每条记录都会被分配一个唯一的序号,成为偏移量(Offset)
  • Offset 是一个递增的、不可变的数字,由 Kafka 自动维护。
  • 当一条记录写入 Partition 的时候,它就被追加到 log 文件的末尾,并被分配一个序号,作为 Offset。

image-20211010145934615

如上图所示,一个文件中的消息量非常大,这个时候就可以通过分区来分布式地存储这些消息,比如说一个 Topic 创建了三个分区,那么 Topic 中地消息就会被分别存放在这三个分区中

2、优点

  • 可以对消息进行分布式存储

可以解决单个文件过大的问题,同时可以将 IO 压力分散到多个文件上,提高性能

  • 可以提高吞吐量

生产者可以向多个分区并行写入数据,消费者也可以同时消费多个分区中的数据,让读和写同时在多个分区中进行。

3、消息的有序性

在向 Topic 发送消息时,实际上是被写入某一个分区,并赋予偏移量(Offset)

  • 一个 Topic 如果有多个分区,那么从 Topic 的层面上看,消息是无序的
  • 如果单独只看一个分区,那么一个分区内的消息是有序的。

一个分区内部消息有序,一个 Topic 跨分区的消息无序,如果强制要求 Topic 整体有序,那么只能让 Topic 拥有一个分区。

4、分区操作

  • 为一个主题创建多个分区

下面的命令为 test1 主题创建了两个分区

1
./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test1
  • 查看 topic 的分区信息
1
./kafka-topics.sh --describe --zookeeper localhost:2181 --topic test1

image-20211010153438377

5、生产者分区写入策略

生产者将消息写入到 Topic 时, Kafka 将依据不同的策略将数据分配到不同的分区中,常用的有四种分区写入策略

  • 轮询分区策略

默认的策略,也是使用最多的策略,可以最大限度保证所有消息平均分配到一个分区。如果在生产消息时,key 为 null,则使用轮询算法均衡地分配分区

image-20211012162422678

  • 随机分区策略(不用)

随机策略,每次都随机地将消息分配到每个分区。在较早的版本,默认的分区策略就是随机策略,也是为了将消息均衡地写入到每个分区。但后续轮询策略表现更佳,所以基本上很少会使用随机策略。

image-20211012162631447

  • Key 分区分配策略

Key 分配策略,有可能会出现「数据倾斜」,例如:某个 Key 包含了大量的数据,因为 Key 值一样,所有所有的数据将都分配到一个分区中,造成该分区的消息数量远大于其他的分区。

image-20211012162747213

  • 自定义分区策略

通过自定义分区组件实现自定义分区策略,下面给出实现步骤

image-20211012162934109

  1. 创建一个自定义分区类 MyPartitioner,这个类需要实现 Partitioner 接口,重写接口方法
  2. 在 Kafka 生产者配置类中,配置上一步编写的自定义分区类的类名即可。
1
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class.getName());

6、分区策略导致的消息乱序问题

  • 分区写入策略中的轮询策略随机策略都会导致一个问题,即生产者发送到 Broker 中的消息是乱序的

  • 分区写入策略中的Key 分区策略可以在一定程度上实现消息的有序存储(同一个 Key 的数据被保存到同一个分区中,而一个分区中的消息是有序的),但又可能导致一个新的问题 – 数据倾斜(一定时间内很多数据堆积在一个分区中)

所以实际开发中需要考虑实际情况来做取舍。

3.3、副本 Replication

1、概念说明

副本是对分区的备份,在集群中,不同的副本会被部署在不同的 broker 上

  • 在一个集群中创建 1 个主题,2 个分区、3 个副本。
1
./kafka-topics.sh --create --zookeeper localhost:2181 --replication- factor 3 --partitions 2 --topic my-replicated-topic

image-20211010155300383

可以看到,在一个 Broker 中存在着两个分区,而每个 Broker 中存在着一个副本,每个 Broker 上的数据都是完备的,同时有另外两个 Broker 作为备份存在。

image-20211010155357613

  • 副本中 Leader 的含义

在上面的输出中,我们可以得出两个信息:

  1. 在分区 0 的三个副本中,副本的 Leader 是编号 0 的副本,即第一个副本。
  2. 在分区 1 的三个副本中,副本的 Leader 是编号 1 的副本,即第二个副本。

副本的 Leader 负责消息的读取和写入,同时 Leader 需要向其他的两个 Follower 副本进行数据同步

当 Leader 副本挂掉时,那么剩下两个副本就可以起到数据备份的作用。

2、主题信息关键数据

image-20211010155300383

从上图中,我们可以得到该主题的一些关键信息

  • Leader

副本中的概念,在 Kafka 集群中,一个分区可能有多个副本,这些副本分散在不同的 Broker 中,一个分区的多个副本中需要有一个 Broker 上的副本作为 Leader

Leader 专门用来接收消息,在接收到消息后,其他 Follower 通过 poll 的方式来同步数据

  • Follower

Leader 处理所有针对这个分区的读写请求,⽽follower被动复制leader,不提供读写(主要是为了保证多副本数据与消费的⼀致性),如果 Leader 所在的 Broker 挂掉,那么剩下的 Follower 中就会进行新 Leader 的选举。

  • Isr
  1. 可以同步的 broker 节点(和 Leader 连接畅通)和已同步(数据是最新的)的 broker 节点,存放在isr集合中。
  2. 在 Leader 挂掉后,Kafka 会从给 Isr 集合中选举新的节点(要求被选举节点中的数据是最新的,且是正常的)
  3. 如果处于 Isr 列表中的节点性能较差,那么它会被提出 Isr 集合中。

3.4、概念总结

  • Kafka 集群由多个 Broker 组成,一台 Kafka Server 可以看为一个 Broker
  • 一个 Broker 可以有多个主题,一个 Broker 中存放了一个 Topic 的所有分区,当前集群中的其他 Broker 中存放的该主题的数据作为备份存在。
  • 对于每一个分区的所有副本中,会有一个副本作为 Leader 存在,剩下的副本作为这个 Leader 的 Follower

3.5、Kafka 集群消息

1、集群消息的发送

1
./kafka-console-producer.sh --broker-list IP1:Port1,IP2:Port2,IP3:Port3 --topic my-replicated-topic

这样就可以打开消息发送控制台,进行消息发送

2、集群消息的消费

1
./kafka-console-consumer.sh --bootstrap-server IP1:Port1,IP2:Port2,IP3:Port3 --from- beginning --topic my-replicated-topic

3、关于分区消费组消费者的细节

image-20211010181133577

  • 一个分区中的消息最多被同一个消费组里的某一个消费者消费(单播消息)

假设上面 broker-0 中的第一个分区的消息 Message1 被 Comsumer1 消费后, Comsumer2 就无法消费 Message1 了。

  • 一个消费者可以消费多个分区中的消息

  • 消费组中消费者的数量不能比一个 Topic 中的分区的数量多,否则多出来的消费者消费不到消息。

  • 为什么一个分区中的消息不能被多个消费者消费?

为了保证单个分区中消息消费的有序性

  • 如果消费者挂了,那么会触发 rebalance 机制,让其他消费者来消费该分区