十二、主从复制

12.1、是什么?

主机数据更新后根据配置和策略, 自动同步到备机的master/slaver机制,Master以写为主,Slave以读为主

12.2、能干嘛?

  • 读写分离,性能扩展

  • 容灾快速恢复

image-20210424162613819

一主多从

12.3、搭建一主双从

1、在根目录下创建 myredis 文件夹

1
mkdir /myredis

2、进入 /myredis 文件夹,复制 redis.conf 到该文件夹中

1
cp /etc/redis.conf /myredis/redis.conf

3、创建三个文件,配置一主两从

  1. redis6379.conf
  2. redis6380.conf
  3. redis6381.conf

4、在三个配置文件中写入内容

  • 关闭 appendonly
1
appendonly no
  • 写入内容
1
2
3
4
include /myredis/redis.conf
pidfile /var/run/redis_6379.pid
port 6379
dbfilename dump6379.rdb

image-20210424171931341

以同样的方式创建 6380、6381

  • redis6380.conf

image-20210424193917941

  • redis6381.conf

image-20210424192511908

5、分别启动三台 Redis

image-20210424194756746

6、查看三台主机运行情况

分别打开三个会话,连接上三台 Redis ,然后使用以下命令查看运行情况

1
info replication
  • 6379

image-20210424200245131

  • 6380

image-20210424200321659

  • 6381

image-20210424200308562

7、配从不配主

令当前 Redis 实例成为某台 Redis 的从机

1
slaveof <ip><port>
  • 在 6380 和 6381 进行配置,使其成为 6379 的从机
1
slaveof 127.0.0.1 6379

image-20210424200530137

8、简单测试

  • 往主服务器中写入数据,写入的数据可以在从服务器中读到

往主服务器中写入数据

image-20210424201144761

在从服务器中可以读取到写入主服务器的键值对

image-20210424201322811

  • 往从服务器写入数据时会报错,这是因为从服务器一般只负责读操作

image-20210424201428595

12.4、主从服务器重启问题

1、主服务器重启

主服务器挂掉,那么直接重启就行,一切如初

2、从服务器重启

如果从服务器挂掉,那么在从服务器进行重启后,我们需要重新绑定该服务器与主服务器的重属关系,我们可以直接在配置文件中配置从属关系,一劳永逸

  • 关闭 6381

image-20210424201747115

  • 此时重新启动 6381 ,并使用 info replication 查看其情况

image-20210424201936618

可以看到此时 6381 又成为了主机,我们在配置文件中直接配置从属关系

image-20210424202127205

  • 重启 6381 ,查看信息

image-20210424202224428

可以看到直接绑定了 6379

  • 查看 6381 的键信息

image-20210424202320460

12.5、主从复制原理

image-20210424202640538

  • Slave 启动成功连接到 Master 后会发送一个同步消息(sync)
  • Master 接到命令启动后台的存盘进程,同时收集所有接收到的用于修改数据集命令, 在后台进程执行完毕之后,Master 将传送整个数据文件到 slave,以完成一次完全同步

主服务器在接收到从服务器发过来的同步消息后,会将主服务器的数据进行持久化,形成一个 .RDB 文件,然后将 .RDB 文件发送给从服务器,从服务器拿到 .RDB 文件后进行读取,形成数据复制

  • 全量复制:而 Slave 服务在接收到数据库文件数据后,将其存盘并加载到内存中。
  • 增量复制:Master 继续将新的所有收集到的修改命令依次传给 Slave,完成同步
  • 但是只要是重新连接 Master,一次完全同步(全量复制)将被自动执行

12.6、薪火相传

1、介绍

上一个 Slave 可以是下一个 Slave 的Master,Slave 同样可以接收其他 Slaves 的连接和同步请求,那么该 Slave 作为了链条中下一个的 master,可以有效减轻 master 的写压力,去中心化降低风险。

image-20210424205432726

例如在上面的示意图中:从1 和从2 是主机的仆从,而从1 和 从2 下面又各自存在三个仆从

2、注意点

  • 如果中途变更从属关系,那么会自动清除之前的数据,重新建立连接并拷贝最新的数据

使用以下命令进行从属关系的更换

1
slaveof <ip> <port>

3、风险

  • 一旦某个 Slave 宕机,那么以这个 Slave 为主机的其他 Slave 都无法进行备份

如果从 1 挂了,那么从1 下面的其他三台机器就无法进行备份了

  • 如果主机挂了,由于此时其他机器还是从机,就无法进行写数据了

12.7、反客为主

当一个master宕机后,后面的slave可以立刻升为master,其后面的slave不用做任何修改。

  • 可以用以下命令将从机变为主机
1
slaveof no one

12.8、哨兵模式(Sentinel)

1、介绍

反客为主的自动版,能够后台监控主机是否故障,如果故障了根据投票数自动将从库转换为主库.

image-20210424211222636

仍然使用一主二仆,下面实现哨兵模式

1、创建 sentinel.conf ,名字不能写错

/myredis 文件夹下新建 sentinel.conf

1
sentinel monitor mymaster 127.0.0.1 6379 1

其中 mymaster 为监控对象起的服务器名称, 1 为至少有多少个哨兵同意迁移的数量,表示只需要有一个哨兵同意,就将从机切换为主机

2、启动哨兵

使用以下命令启动哨兵

1
redis-sentinel /myredis/sentinel.conf

image-20210424212531055

3、关闭主服务器,查看效果

  • 在主机中执行指令 shutdown

  • 查看哨兵窗口

可以看到此时哨兵已经发现主机挂掉了,此时它重新推举一个从机作为主机,新的主机为 6380

image-20210424213316372

  • 查看 6380 的详细信息
1
info replication

image-20210424213740011

此时 6380 已经变成了主机

4、重启 6379 ,查看情况

  • 重启 6379 后,可以看到哨兵输出以下日志

image-20210424214106835

  • 此时使用 info replication 查看 6379 情况,发现 6379 此时已经变成了从机

image-20210424214020147

5、复制延迟

由于所有的写操作都是先在Master上操作,然后同步更新到Slave上,所以从Master同步到Slave机器有一定的延迟,当系统很繁忙的时候,延迟问题会更加严重,Slave机器数量的增加也会使这个问题更加严重。

6、故障恢复

  • 新主登基

从下线的主服务的仆从中选择一个,将其推举为主服务,筛选条件依次为:

  1. 选择优先级靠前的(优先级在 redis.conf 中默认:replice-priority 100,值越小优先级越高)
  2. 选择偏移量最大的(偏移量是指获得原主机数据最全的)
  3. 选择 runid 最小的从服务(每个 redis 实例启动后都会随机生成一个40位的 runid
  • 群仆俯首

挑选出新的主服务后, sentinel 向原主服务器的从服务器发送 slaveof 新主服务的命令,让其他从服务器成为新主机的从属

  • 旧主俯首

当原来的主服务器重新上线后,sentinel 会向其发送 slaveof 命令,使其成为新主的仆从

十三、Redis 集群

13.1、问题

容量不够,Redis 如何进行扩容?

并发写操作, Redis 如何分摊?

另外,主从模式,薪火相传模式,主机宕机,导致 ip 地址发生变化,应用程序中配置需要修改对应的主机地址、端口等信息。

之前通过代理主机来解决,但是 Redis3.0 中提供了解决方案。就是无中心化集群配置

  • 无中心化集群配置

任何一个节点都可以作为集群的入口,从 A 节点进入集群的请求,也可以访问其他节点

image-20210424221314742

13.2、什么是集群?

Redis 集群实现了对 Redis 的水平扩容,即启动N个 Redis节点,将整个数据库分布存储在这N个节点中,每个节点存储总数据的1/N。

Redis 集群通过分区(partition)来提供一定程度的可用性(availability): 即使集群中有一部分节点失效或者无法进行通讯, 集群也可以继续处理命令请求。

13.3、集群搭建

使用六个实例

1、删除目录下生成的 rdb 文件

1
rm -rf dump*

2、Redis cluster 配置修改

添加以下配置

1
2
3
cluster-enabled yes # 开启集群模式
cluster-config-file nodes-6379.conf # 设置节点配置文件名
cluster-node-timeout 15000 # 设定节点失联时间,超过该时间(毫秒),集群自动进行主从切换

3、创建六个实例

即 6379、6380、6381 、6389 、6390 和 6391六个实例,其中6379为主,6389为从,以此类推

  • 修改后的 6379 文件

image-20210424223303212

  • 拷贝 6379 ,创建五个副本

然后修改端口号等信息

vim 编辑器普通模式下输入以下命令

1
:%s/6379/63xx

4、启动 6 个 Redis 服务

此时可以看到六个 nodes 文件都生成出来了,而且进程都已正常启动

image-20210424224947357

5、将六个节点合成一个集群

组合前,先确保所有 Redis 正常启动,且 nodes-xxx.conf 生成正常

  • 切换到 /opt/redis-6.2.1/src ,即压缩包解压缩后的目录
1
cd /opt/redis-6.2.1/src
  • src 下执行以下命令
1
redis-cli --cluster create --cluster-replicas 1 192.168.20.100:6379 192.168.20.100:6380 192.168.20.100:6381 192.168.20.100:6389 192.168.20.100:6390 192.168.20.100:6391

注意,这里需要使用真实的 IP 地址,不能使用 127.0.0.1

--replicas 1 采用最简单的方式配置集群,一台主机,一台从机,正好构成三组

image-20210424230239279

在以上窗口中输入 yes

image-20210424230912382

到这里,集群就搭建完毕了

13.4、通过 cluster nodes 命令查看集群信息

这个命令需要连接客户端

1
127.0.0.1:6379> cluster nodes

查看结果

image-20210424231607247

13.5、Redis Cluster 如何分配这六个节点?

一个集群中至少有三个节点

选项 --cluster-replicas 1 表示我们希望为集群中的每个主节点创建一个从节点。

分配原则尽量保证每个主数据库运行在不同的IP地址,每个从库和主库不在一个IP地址上

1、什么是 slots

image-20210424232115883

一个 Redis 集群包含 16384 个插槽(hash slot), 数据库中的每个键都属于这 16384 个插槽的其中一个,
集群使用公式 CRC16(key) % 16384 来计算键 key 属于哪个槽, 其中 CRC16(key) 语句用于计算键 key 的 CRC16 校验和 。
集群中的每个节点负责处理一部分插槽。 举个例子, 如果一个集群可以有主节点, 其中:
节点 A 负责处理 0 号至 5460 号插槽。

节点 B 负责处理 5461 号至 10922 号插槽。

节点 C 负责处理 10923 号至 16383 号插槽。

2、在集群中录入值

redis-cli 每次录入、查询键值,redis 都会计算出该key应该送往的插槽,如果不是该客户端对应服务器的插槽,redis 会报错,并告知应前往的 redis 实例地址和端口。
redis-cli 客户端提供了 –c 参数实现自动重定向。

redis-cli -c –p 6379 登入后,再录入、查询键值对可以自动重定向。

  • 使用命令 redis-cli -c -p 6381 连接服务器
1
redis-cli -c -p 6381
  • 此时随便放入一个插槽不在 6381 的键
1
192.168.20.100:6379> set k4 v4

查看结果,此时可以看到自动帮我们重定向到了 6380

image-20210424233221229

3、根据键从集群中取出值

同理,如果要取的键值对不在此节点中,那么会重定向到该键值对存放的节点

  • 从集群中取值
1
192.168.20.100:6380> get k2

查看结果,发现自动帮我们重定向到了 6379

image-20210424233505536

4、批量存储/取出时要注意的点

必须保证批量存储/取出的键都在一个插槽中,否则无法进行

  • 批量取出两个不在同一个插槽中的键值对

image-20210424233906275

  • 如果需要添加多个值,那么可以通过 {} 来定义组的概念,从而使 key 中 {} 内相同内容的键值对放入同一个插槽中,比如定义一个 user 组,将姓名和年龄放入一个插槽中
1
192.168.20.100:6379> mset name{user} wuhu age{user} qifei 
  • 取出刚才存入的值
1
192.168.20.100:6380> mget name{user} age{user}

image-20210424234531980

5、查询键在插槽中的位置

1
cluster keyslot k1

image-20210424234638900

13.6、故障恢复

如果主节点下线,那么在15s超时后,从节点自动升级为主节点,在一段时间后,如果下线的主节点重新恢复,那么该主节点会成为新主的仆从。

  • 如果所有某一段插槽的主从节点都宕掉,redis 服务是否还能继续?
  1. 如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为yes ,那么 ,整个集群都挂掉
  2. 如果某一段插槽的主从都挂掉,而cluster-require-full-coverage 为no ,那么,该插槽数据全都不能使用,也无法存储。

取决于 redis.conf 中的 cluster-require-full-coverage 参数

13.7、优势和不足

1、优势

  • 实现扩容
  • 分摊压力
  • 无中心配置相对简单

2、不足

  • 多键操作是不被支持的
  • 多键的 Redis 事务是不被支持的。lua 脚本不被支持
  • 由于集群方案出现较晚,很多公司已经采用了其他的集群方案,而代理或者客户端分片的方案想要迁移至 redis cluster ,需要整体迁移而不是逐步过渡,复杂度较大。

十四、应用问题解决

14.1、缓存穿透

1、介绍

key 对应的数据在数据源并不存在,每次针对此 key 的请求从缓存获取不到,请求都会压到数据源,从而可能压垮数据源。比如用一个不存在的用户 id 获取用户信息,不论缓存还是数据库都没有,若黑客利用此漏洞进行攻击可能压垮数据库。

​ 查询数据库和缓存中都不存在的数据。

image-20210425145655933

2、现象

  • 应用服务器压力突然变大了。
  • Redis 命中率降低
  • 一直查询数据库

3、解决方法

​ 一个一定不存在缓存及查询不到的数据,由于缓存是不命中时被动写的,并且出于容错考虑,如果从存储层查不到数据则不写入缓存,这将导致这个不存在的数据每次请求都要到存储层去查询,失去了缓存的意义。

  • 对传入参数进行检验,非法参数直接拒绝请求或者返回一个特殊值(空值)
  • 对空值缓存

如果一个查询返回的数据为空(不管是数据是否不存在),我们仍然把这个空结果(null)进行缓存,设置空结果的过期时间会很短,最长不超过五分钟

  • 设置可访问的名单(白名单)

使用bitmaps类型定义一个可以访问的名单,名单id作为bitmaps的偏移量,每次访问和bitmap里面的id进行比较,如果访问id不在bitmaps里面,进行拦截,不允许访问。

  • 采用布隆过滤器

布隆过滤器(Bloom Filter)是1970年由布隆提出的。它实际上是一个很长的二进制向量(位图)和一系列随机映射函数(哈希函数)。
布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法,缺点是有一定的误识别率和删除困难。
将所有可能存在的数据哈希到一个足够大的bitmaps中,一个一定不存在的数据会被 这个bitmaps拦截掉,从而避免了对底层存储系统的查询压力。

  • 进行实时监控

当发现 Redis 的命中率开始急速降低,需要排查访问对象和访问的数据,和运维人员配合,可以设置黑名单限制服务

14.2、缓存击穿

1、介绍

​ key对应的数据存在,但在 Redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端 DB 压垮。

image-20210425150535356

2、现象

  • 数据库访问压力瞬时增加
  • Redis 中没有出现大量 key 过期
  • Redis 正常运行

3、解决方法

​ key可能会在某些时间点被超高并发地访问,是一种非常“热点”的数据。这个时候,需要考虑一个问题:缓存被“击穿”的问题。

  • 预先设置热门数据

redis 高峰访问之前,把一些热门数据提前存入到 redis 里面,加大这些热门数据key的时长

  • 实时调整

现场监控哪些数据热门,实时调整key的过期时长

  • 设置热门数据永不过期
  • 使用锁:
    • 就是在缓存失效的时候(判断拿出来的值为空),不是立即去 load db
    • 先使用缓存工具的某些带成功操作返回值的操作(比如 Redis 的SETNX)去set一个mutex key
    • 当操作返回成功时,再进行load db 的操作,并回设缓存,最后删除mutex key;
    • 当操作返回失败,证明有线程在 load db ,当前线程睡眠一段时间再重试整个get缓存的方法。

14.3、缓存雪崩

1、介绍

​ key对应的数据存在,但在 redis 中过期,此时若有大量并发请求过来,这些请求发现缓存过期一般都会从后端DB加载数据并回设到缓存,这个时候大并发的请求可能会瞬间把后端DB压垮。

缓存雪崩与缓存击穿的区别在于这里针对很多key缓存,前者则是某一个key,雪崩问题是短时间内大量 key 集中过期

2、现象

数据库压力变大造成服务器崩溃

  • 正常情况

image-20210425152605127

  • 缓存雪崩

image-20210425152619481

3、解决方法

缓存失效时的雪崩效率对底层系统的冲击非常可怕!

  • 构建多级缓存架构

nginx 缓存 + Redis 缓存 + 其他缓存(ehcache等)

  • 使用锁或者队列

用加锁或者队列的方式保证来保证不会有大量的线程对数据库一次性进行读写,从而避免失效时大量的并发请求落到底层存储系统上。不适用高并发情况

  • 设置过期标志更新缓存

记录缓存数据是否过期(设置提前量),如果过期会触发通知另外的线程在后台去更新实际key的缓存。

  • 将缓存失效时间分散开

比如我们可以在原有的失效时间基础上增加一个随机值,比如1-5分钟随机,这样每一个缓存的过期时间的重复率就会降低,就很难引发集体失效的事件。

十五、分布式锁

15.1、概述

1、基本介绍

​ 随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。

​ 为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!

2、主流实现方案

  • 基于数据库实现分布式锁。
  • 基于缓存(Redis 等)。
  • 基于 Zookeeper.

3、实现方案的优缺点

  • 性能:Redis 最高
  • 可靠性:Zookeeper 最可靠

这里我们基于 Redis 实现分布式锁

15.2、基于 Redis 实现分布式锁

1、核心命令 SETNX

Redis SetnxSET if Not eXists)命令在指定的 key 不存在时,为 key 设置指定的值,这种情况下等同 SET 命令。当 key存在时,什么也不做。

image-20210425160301095

  • 多个客户端同时使用 SETNX 获取锁
  • 如果获取成功,那么执行业务逻辑(从 DB 中获取数据,放入缓存),执行完成后使用 del 命令释放锁。在此期间,其他客户端等待重试。

2、演示 SETNX

  • 使用 SETNX 设置一个键值对

此时可以看到添加成功

image-20210425155710137

  • 再次使用 SETNX 命令,修改上述 wuhu 键对应的值

可以看到修改失败

image-20210425155813350

  • 此时这个键已经被锁定了,删除这个键可以释放锁

删除 wuhu 键,再次进行添加操作

image-20210425155958656

  • 为键设置过期时间,让一段时间后自动释放锁,避免因业务逻辑出现异常,导致锁无法释放的情况发生。

可以看到,现在 wuhu 键已经被上了一把锁

image-20210425161142406

wuhu 键设置过期时间,使用 expire 键 过期时间(s) 设置键的过期时间,单位为秒,设置 wuhu 键过期时间为 5 s

1
127.0.0.1:6379> expire wuhu 5 

在五秒后,发现 wuhu 键的锁已被自动释放

image-20210425161716056

可以使用 ttl 键名 查看键的过期时间,-1表示永不过期

1
ttl wuhu

image-20210425161803151

wuhu 设置过期时间,继续 ttl

key 不存在时返回 -2

image-20210425161915184

3、上锁的同时设置过期时间

在我们使用 SETNX 为某个键上锁后,由于 SETNXexpire 不是原子操作,在两个命令间如服务器宕机,那么就无法设置过期时间。

为此,我们可以在上锁的同时就设置过期时间。

  • 使用命令如下
1
set key value nx ex 过期时间 
  • 设置键值对(wuhu-qifei),过期时间为12 s
1
127.0.0.1:6379> set wuhu qifei nx ex 12

使用 ttl 查看过期时间

image-20210425163449633

此时 wuhu 键已经被上锁

image-20210425163514432

过期时间一到自动释放锁

image-20210425163541222

15.3、Java 模拟分布式锁

1、使用 Spring Boot 整合 Redis 集群

  • 引入依赖
  • 编写配置文件

nodes 中填入集群信息,nodes 为一个列表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
spring:
redis:
database: 0
password:
lettuce:
pool:
max-active: 20 #最大连接数,负值表示没有限制,默认8
max-wait: -1 #最大阻塞等待时间,负值表示没限制,默认-1
max-idle: 8 #最大空闲连接,默认8
min-idle: 0 #最小空闲连接,默认0
cluster:
nodes:
- 192.168.20.100:6379
- 192.168.20.100:6380
- 192.168.20.100:6381
- 192.168.20.100:6389
- 192.168.20.100:6390
- 192.168.20.100:6391
  • 编写配置类
  • 进行测试

2、使用 Spring Boot 整合 Redis 演示分布式锁

  • RedisTestController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@GetMapping("testLock")
public void testLock() {
//1 获取锁,使用 SETNX
//redisTemplate.opsForValue()的setIfAbsent方法返回一个Boolean
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","111");
//2 根据返回值进行判断
if(lock) {
//2.1 如果成功获取锁,那么执行下面的操作
Object result = redisTemplate.opsForValue().get("num");
if(StringUtils.isEmpty(result)) {
System.out.println("获取的值为空!");
return;
}
int num = Integer.parseInt(result + "");
//2.2 将redis中num的值 + 1
redisTemplate.opsForValue().set("num",++num);
//2.3 释放锁
redisTemplate.delete("lock");
} else {
//3 获取锁失败,此时休眠 0.1 秒后继续获取
try {
Thread.sleep(100);
testLock();
} catch (Exception e) {
e.printStackTrace();
}
}
}
  • 使用 ab 进行测试

在命令行中输入以下指令,表示请求1000次,每次并发数为 100

1
ab -n 1000 -c 100 http://192.168.150.1:8081/redis/testLock
  • Redis 中预先设置 num 键,初始值为0

image-20210425200447845

  • 此时再次查看 Redisnum 的值

image-20210425200715666

3、在上锁的同时为键添加过期时间

同样使用 setIfAbsent 方法,需要参数列表中填入过期时间和时间单位

1
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock","111",10, TimeUnit.SECONDS);

15.4、添加 UUID 防止误删

1、问题描述

可能会释放其他服务器的锁。

场景:如果业务逻辑执行时间为 7 s。执行流程如下

  1. index1业务逻辑没执行完,3秒后锁被自动释放。

  2. index2获取到锁,执行业务逻辑,3秒后锁被自动释放。

  3. index3获取到锁,执行业务逻辑

  4. index1业务逻辑执行完成,开始调用del释放锁,这时释放的是index3的锁,导致index3的业务只执行1s就被别人释放。

解决:SETNX 获取锁时,需要设置一个指定的唯一值(UUID),释放前获取这个值,判断是否是自己的锁

2、引入 UUID

image-20210425202300602

3、改造 testLock 方法

  • 在获取锁之前先生成一个 UUID ,将生成的 UUID 作为值存入Redis
  • 在释放锁时需要获取键值对中的值,然后与自己生成 UUID 进行比较,如果相同证明是同一把锁,此时释放锁,否则不释放
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@GetMapping("testLock")
public void testLock() {
//0 生成 UUID
String uuid = UUID.randomUUID().toString();
//1 获取锁,使用 SETNX
//redisTemplate.opsForValue()的setIfAbsent方法返回一个Boolean
Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock",uuid,10, TimeUnit.SECONDS);
//2 根据返回值进行判断
if(lock) {
//2.1 如果成功获取锁,那么执行下面的操作
Object result = redisTemplate.opsForValue().get("num");
if(StringUtils.isEmpty(result)) {
System.out.println("获取的值为空!");
return;
}
int num = Integer.parseInt(result + "");
//2.2 将redis中num的值 + 1
redisTemplate.opsForValue().set("num",++num);
//2.3 释放锁
String lockUUID = (String)redisTemplate.opsForValue().get("lock");
// 只有当redis中的uuid和该锁的uuid相同时,释放锁
if(uuid.equals(lockUUID)) {
redisTemplate.delete("lock");
}
} else {
//3 获取锁失败,此时休眠 0.1 秒后继续获取
try {
Thread.sleep(100);
testLock();
} catch (Exception e) {
e.printStackTrace();
}
}
}

15.5、LUA 保证删除原子性

1、问题描述

删除操作缺乏原子性操作

场景:

  • index 1 执行删除时,查询到的 lock 值确实与本机产生的 UUID 相等

  • index 1 在执行删除钱,lock 刚好过期时间已到,被 Redis 自动释放

此时在 Redis 中没有了 lock

  • index 2 获取到了 lock,开始执行方法

  • 此时index 1 执行 redisTemplate.delete("lock"),删除了 index 2 的锁

index1 因为已经在方法中了,所以不需要重新上锁。index1有执行的权限。index1已经比较完成了,这个时候,开始执行

2、LUA 脚本

  • 介绍

Lua 是一个小巧的脚本语言,Lua 脚本可以很容易的被 C/C++ 代码调用,也可以反过来调用 C/C++ 的函数,Lua 并没有提供强大的库,一个完整的Lua解释器不过200k,所以Lua不适合作为开发独立应用程序的语言,而是作为嵌入式脚本语言。
很多应用程序、游戏使用LUA作为自己的嵌入式脚本语言,以此来实现可配置性、可扩展性。
这其中包括魔兽争霸地图、魔兽世界、博德之门、愤怒的小鸟等众多游戏插件或外挂。

  • LUA 脚本在 Redis 中的优势
  1. 将复杂的或者多步的 Redis 操作,写为一个脚本,一次提交给 Redis 执行,减少反复连接 Redis 的次数。提升性能。
  2. LUA脚本是类似 Redis 事务,有一定的原子性,不会被其他命令插队,可以完成一些 Redis 事务性的操作
  3. 但是注意 RedisLUA 脚本功能,只有在 Redis 2.6 以上的版本才可以使用。
    利用 LUA 脚本淘汰用户,解决超卖问题。
  4. Redis 2.6 版本以后,通过 LUA 脚本解决争抢问题,实际上是 Redis 利用其单线程的特性,用任务队列的方式解决多任务并发问题。

3、使用 LUA 脚本保证删除的原子性

  • 在执行完操作后,要释放所之前,定义一段 LUA 脚本,保证删除的原子性

  • 添加 testLockLua 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@GetMapping("testLockLua")
public void testLockLua() {
//1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
String uuid = UUID.randomUUID().toString();
//2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
String skuId = "25"; // 访问skuId 为25号的商品 100008348542
String locKey = "lock:" + skuId; // 锁住的是每个商品的数据

// 3 获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);

// 第一种: lock 与过期时间中间不写任何的代码。
// redisTemplate.expire("lock",10, TimeUnit.SECONDS);//设置过期时间
// 如果true
if (lock) {
// 执行的业务逻辑开始
// 获取缓存中的num 数据
Object value = redisTemplate.opsForValue().get("num");
// 如果是空直接返回
if (StringUtils.isEmpty(value)) {
return;
}
// 不是空 如果说在这出现了异常! 那么delete 就删除失败! 也就是说锁永远存在!
int num = Integer.parseInt(value + "");
// 使num 每次+1 放入缓存
redisTemplate.opsForValue().set("num", String.valueOf(++num));
/*使用lua脚本来锁*/
// 定义lua 脚本
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 使用redis执行lua执行
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
// 设置一下返回值类型 为Long
// 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
// 那么返回字符串与0 会有发生错误。
redisScript.setResultType(Long.class);
// 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
} else {
// 其他线程等待
try {
// 睡眠
Thread.sleep(1000);
// 睡醒了之后,调用方法。
testLockLua();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

4、总结

  • 加锁
1
2
3
4
5
6
7
8
//1 声明一个uuid ,将做为一个value 放入我们的key所对应的值中
String uuid = UUID.randomUUID().toString();
//2 定义一个锁:lua 脚本可以使用同一把锁,来实现删除!
String skuId = "25"; // 访问skuId 为25号的商品 100008348542
String locKey = "lock:" + skuId; // 锁住的是每个商品的数据

// 3 获取锁
Boolean lock = redisTemplate.opsForValue().setIfAbsent(locKey, uuid, 3, TimeUnit.SECONDS);
  • 使用 LUA 脚本释放锁,使用了 LUA 脚本的原子性
1
2
3
4
5
6
7
8
9
10
11
12
/*使用lua脚本来锁*/
// 定义lua 脚本
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
// 使用redis执行lua执行
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>();
redisScript.setScriptText(script);
// 设置一下返回值类型 为Long
// 因为删除判断的时候,返回的0,给其封装为数据类型。如果不封装那么默认返回String 类型,
// 那么返回字符串与0 会有发生错误。
redisScript.setResultType(Long.class);
// 第一个要是script 脚本 ,第二个需要判断的key,第三个就是key所对应的值。
redisTemplate.execute(redisScript, Arrays.asList(locKey), uuid);
  • 重试

为了确保分布式锁可用,我们至少要确保锁的实现同时满足以下四个条件

  1. 互斥锁

在任意时刻,只有一个客户端能持有锁。

  1. 不会发生死锁

即使有一个客户端在持有锁的期间崩溃而没有主动解锁,也能保证后续其他客户端能加锁。

  1. 解铃还须系铃人

加锁和解锁必须是同一个客户端,客户端自己不能把别人加的锁给解了。

  1. 加锁和解锁必须具有原子性

5、项目中正确使用

  • 定义 Key ,Key 应该是为每个 sku 定义的,也就是每个 sku 必须有一把锁

image-20210425212623498