Redis

1、NoSQL

1、概述

NoSQL(not only SQL ) 不仅仅是SQL,泛指非关系型数据库,NoSQL不依赖业务逻辑方式存储,而是以简单的key-Value 模式 存储,因此的 大大的增加了数据库的拓展能力

  • 不遵守SQL标准
  • 不支持ACID(原子性,一致性,隔离性,持久性)
  • 远超于SQL的性能

2、NoSQL 常用场景

  • 对数据高并发的读写
  • 海量数据的读写
  • 对数据高可延展性

3、NoSQL不支持的场景

  • 需要事务支持
  • 基于SQL的结构化查询存储,处理复杂的关系,需要即系查询
  • 用不着SQL的和用了SQL也不行的情况就用NoSQL

4、安装Redis

首先要在Linux环境中安装,并且要求安装gcc,当我们电脑中安装有gcc时,我们需要对版本进行升级

1
2
3
4
yum -y install centos-release-scl
yum -y install devtoolset-9-gcc devtoolset-9-gcc-c++ devtoolset-9-binutils
scl enable devtoolset-9 bash
echosource /opt/rh/devtoolset-9/enable” >>/etc/profile 

之后安装redis, 将文件夹解压在我们创建的目录中

进入包下 输入 make 指令,而后输入指令 make install

安装完成后 打开redis.config文件 将其中的

1
2
3
4
5
6
7
8
bind 127.0.0.1 #绑定ip:如果需要远程访问,可将此行注释,或绑定一个真实ip
port 6379 #端口号
protected-mode yes #是否开启保护模式 改为no
daemonize no #是否设为后台运行 改为 yes
#requirepass foobared #密码设置 123456
pidfile /var/run/redis_6379.pid #进程文件保存位置,redis运行后会在此位置自动生成
logfile “” #日志文件保存位置
dir ./ #redis位置

配置防火墙

1
2
3
4
firewall-cmd --zone=public --add-port=6379/tcp --permanent ----添加6379端口
firewall-cmd --reload ----重启防火墙
firewall-cmd --list-port -----查看所有开放端口号
firewall-cmd --query-port=6379/tcp -----查看指定端口是否开放

关闭redis:

1
redis-cli shutdown #没有设置密码,运行此行代码

redis启动服务

1
./redis-server   配置文件的绝对路径     (在/usr/local/bin目录下) 

redis 开启连接

1
redis-cli

2、Redis的数据类型

redis有五大数据类型:String(字符串)、List(列表)、Set(集合)、Hash(哈希),Zset(有序集合)

Redis对键的操作

  • keys * 查看当前库的所有key
  • exists key 判断某个key是否存在
  • type key 查看你的key是什么类型
  • del key 查处指定的key数据
  • unlink key 根据value选择非阻塞删除(如果数据是个BigKey数据处理起来会非常慢,那么由于redis是单线程的,就会导致出现缓存队列,甚至会出现队列溢出)
  • expire key 10 给key设置10s的过期时间
  • ttl key 查看还有多少秒过期,-1为永不过期,-2表示已经过期
  • select dbindex 切换到数据库 0-15 默认为0
  • move key dbindex 将数据移动到该索引的数据库内
  • Redis一共有16个库,select 命令为切换数据库
  • dbsize查看当前数据库的key的数量
  • flushdb 清空当前库
  • flushall 通杀所有库

Redis中的命令不区分大小写,但key与value区分大小写

image-20230511124318066

Redis字符串(String)

概述

  • String是Redis最基础的类型,一个key对应一个value

  • String类型是二进制安全的,意味着Redis的String可以包含任何数据,例如jpg图片或者序列化的对象

  • String类型是Redis最基本的数据类型,Redis中字符串value最多可以使512M

常用命令

set < key> <value> 添加键值对,倘若要设置两个相同的key那么其后者的将会覆盖前者

可选项:

  • EX – 设置指定的过期时间(以秒为单位),给key设置过期时间若超过已过期,则该key以及对应的value会被直接删除。

  • PX 毫秒 – 设置指定的过期时间(以毫秒为单位)。

  • EXAT timestamp-seconds – 设置key过期的指定 Unix 时间(以秒为单位)。

  • PXAT timestamp-毫秒 – 设置key过期的指定 Unix 时间(以毫秒为单位)。

  • NX– 仅当key尚不存在时才设置key,若设置成功返回ok,若存在key则返回nil

  • XX– 仅当key已存在时才设置密钥,若设置成功返回ok,若不存在key则返回nil

  • KEEPTTL– 保留与key关联的生存时间,当我们对key设置了过期时间t1,当key被修改时,ttl就变成了当次set所制定的过期时间

    而并非最开始设置的过期时间t1,但我们在某些业务场景下,需要保持这种key的ttl而不被set所修改,所以就用到了keepttl

  • GET– 返回存储在键的旧字符串,如果键不存在,则返回 nil。如果存储在键的值不是字符串,则会返回并中止错误。

示例:

image-20230511130815473image-20230511130940612image-20230511131348655

image-20230511131504714image-20230511131651165image-20230511132852631

get <key> 通过key获得value

append <key> <value> 将给定的 <value>追加到原值的末尾

strlen <key>获得值的长度

setnx <key> <value> 就是set < key> <value> if not exist <key>

incr <key> 将key中存储的数字值增1,只能对数字值的操作,如果为空,新增值为1

decr <key> 将key中存储的数字值减1

mset < key1> <value1> < key2> <value2> < key3> <value3> …… 同时设置一个或多个key-value对

mget< key1> < key2> < key3> …… 同时获取获取一个或多个value

msetnx < key1> <value1> < key2> <value2> < key3> <value3> …… 同时设置一个或多个key-value对,当且仅当所有给定的key都不存在时

原子性有一个失败则都失败

incr <key>执行一次value +1,且返回+1后的值

incrby <key> 步长 执行一次value+步长,且返回执行后的值

image-20230511134626243

decr <key> 执行一次 value -1 ,并返回执行后的值

decrby <key> 步长 执行一次value-1步长,且返回执行后的值

image-20230511134927565

getrange <key> <起始位置> <结束位置> 获得值的范围,类似于java中的subString ,如果是 getrange k1 0 -1那么就是将value全部取出

image-20230511133955005

setrange <key> <起始位置> <value> 用 <value> 覆盖<key> 所储存的字符串值,从起始位置开始

setex <key><过期时间><vlalue> 设置键值的同时,设置过期时间,单位为秒

getset <key><value> 以旧换新 设置了新值同时获得旧值

String类型的应用场景

作品点赞,浏览量增加

Redis列表(List)

概述

单键多值

Redis列表是最简单的字符串列表,按照插入顺序排序,你可以添加一个元素到列表头部(左边)或者尾部(右边)

他的底层实际上是一个双向链表,容量是2的32次方减1个元素,大概40多亿,主要功能有push/pop等,一般用在栈、队列、消息队列等场景。对两端操作性能很高,通过索引下标的操作中间的节点性能较差

image-20230511140132281

常见命令

lpush /rpush key value1 vlaue2 value3 …… 从左边或右边插入一个或多个值

image-20230511140445259

image-20230511140531466

lpop/rpop key 从左边或右边吐出一个值。 (值在键在,值光键亡)

image-20230511140630589image-20230511140653944

rpoplpush <key1> <key2> 从<key1>列表右边吐出一个值插到<key2>列表的左边

Irange <key><start><stop> 按照索引下标获得元素(从左到右)

lindex <key> <index> 按照索引下表获得元素(从左到右)

llen <key>获得列表的长度

linsert <key> before/after <value> <newvalue> 在所指定的value前边/后边插入newvalue

lrem <key> n <value> 从左边删除n个value(从左到右)

ltrim <key> <start> <end> 截取 索引 start 到end之间的 元素 并返回给 key

lset <key> <index> <value> 将列表key下表为index的值替换为value

rpoplpush <key1> <key2> 从 key1的最右侧队列弹出一个元素到key2队列的最左侧

应用场景

​ 例如,倘若我在订阅的公众号发布两个文章,文章id分别是 9,10,我的喜欢文章 的key value为 likeArtcle:zf 9,10,……

列表的数据结构

List的数据结构为快速链表quickList,首先列表元素较少的情况下会使用一块连续的内存存储,这个结构是ziplist即压缩列表

他将所有的元素紧挨着一起存储,分配的是一块连续的内存,当数据量较多的时候才会改成quicklist

因为普通链表需要的附加指针空间太大,会比较浪费空间,比如这个列表存的只是int类型的数据,结构还需要额外增加两个指针prev和newx

image-20221226002540698

Redis将链表和ziplist结合起来组成quicklist,也就是将多个自拍list使用双向指针串联起来使用,这样既满足了快速的插入删除性能,又不会出现太大的空间冗余

Redis集合Set

Set中的元素自动排重,不可重复

常用指令

sadd <key> <value1><value2>…… 将一个或多个member元素加入到集合key中,已经存在的member将被忽略

smembers <key> 取出该集合的所有值

sismember <key> <value> 判断集合<key>是否含有<value>值,有1,没有0

scard <key> 返回该集合元素的个数

srem <key> <value1> <value2> …… 删除集合中的某个元素

spop <pop> 随即从该集合中吐出一个值,集合中会去除这个值

srandmember <key> <n> 随机丛该集合中取出n个值,不会从集合中删除

smove <source> <desination> value 把一个集合中的一个值 移动到另一个集合中

sinter <key1> <key2> 返回两个集合的交集元素

sunion <key1> <key2> 返回两个集合并集的元素

sdiff <key1> <key2> 返回两个集合的差集元素 (key1中,但不在 key2中的 即为 k1 -k2)

set常用于社交软件中,例如我们的微信朋友圈点赞,假如我们发布了微信朋友圈,这时正好有两位朋友点赞,

我们就可以写成

1
sadd pub:msgId 点赞用户1id 点赞用户2id 

​ 有一人取消点赞

1
srem pub:masgId 点赞用户id

展示所有点赞的用户

1
smembers pub:msgId 

点赞用户数量统计

1
scard pub:msgId 

再比如我们的交友软件

推荐你可能认识的人, 张三与李四是好友 李四与王五是好友

这时候我们是张三,系统推荐我们可能认识的人是王五那么可以用set来实现

1
sdiff firend:diff 张三好友set集合 李四好友set集合

例如抖音中的共同好友数量

1
sdiff firend:inter 张三好友set 李四好友set

Set的数据结构

Set数据结构是dict字典,字典是用哈希表实现的

Java中的HashSet的内部是用的是HashMap,只不过所有的value都指向了同一个对象,Redis中的Set结构也是一样,他的内部也使用了hash结构,所有的value都只想同一个内部值

Redis 哈希(Hash)

概述

Redis hash是一个键值对集合

Redis hash 是一个String类型的field和value 映射表,hash特别适合存储对象,类似于java中的Map<key,Map<String,Object>>

常用的命令

hset <key> <field> <value> 给 <key> 集合中的 <field>键赋值<value>

hget <key1> <field>从<key1>集合 <field>取出value

image-20230511180722640

hash数据结构的用法非常广泛, 上述的例子就是一个典型的将对象存入redis中,利用的就是我们Hash数据结构,将对象001的信息存入其中

hmset<key1><field><value1> <field2><value2> …… 批量设置hash的值

hmget<key1> <field> <field2> …… 批量获取hash的值

image-20230511181641468

hgetall <key> 获取key中所有field以及对应的value值

image-20230511181550242

hexists <key1> <field>查看哈希表key中,给定域field 是否存在

hkeys <key> 列出该hash集合的所有field

hvals <key> 列出该hash 集合所有的value

hincrby <key><field><increment> 为哈希表 key 中的域 field的值加上 整数increment

hincrbyfloat <key><field><increment> 为哈希表 key 中的域 field的值加上 小数increment

hsetnx <key> <field> <value>将哈希表key 中的域field 的值设置为value,当且仅当field不存在

Hash的数据结构

Hash类型对应的数据结构是两种,ziplist(压缩列表),hashtable(哈希表),当field-value长度较短且个数较少时,使用ziplist否则使用hashtable

Redis有序集合Zset(sorted set)

Redis有序集合zset与普通集合set非常类似,是一个没有重复元素的字符串集合

不同之处是有序集合的每个成员都关联了一个评分(score) ,这个评分被用来按照最低分到最高分的方式排序集合中的成员,集合成员是唯一的,但是评分可以是重复的

因为元素是有序的,所以你可可以很快的根据评分或者次序来获取一个范围的元素,访问有序集合的中间元素也是非常快的因此你能够使用有序集合作为一个没有重复成员的智能列表

常用的命令

zadd <key> <score1><value> <score2><value2>……

将一个或多个member元素及其score值加入到有序集key当中

zrange <key> <start> <stop> [WITHSCORES] 返回有序集key中,下标在< start> <stop>之间的元素 带WITHSCORES,可以让分数一起和值返回到结果集

zrangebyscore key min max [withscores] [limit offset count] 返回有序集(升序)key中,所有score值介于 min和max 之间(包括等于min或max)的成员

zrevrangebyscore key max min 返回有序集key中,所有score值介于min和max之间,降序

zincrby <key> <increment><value> 为元素的score加上增加 increment

zrem <key> <value> 删除该集合下,指定值的元素

zcount <key> <min> <max> 统计该集合,分数之间的元素个数

zpopMax <key> conunt 弹出key集合中最大值的score的value

zpopMin <key> conunt 弹出key集合中最小值的score的value

zrank <key> <value> 返回该值在集合中的排名,从0开始

应用非常滴广泛,例如我们要对商城中的商品进行销量统计

其中 12件2002 13 件2003 ……

1
zadd good:sellsort 12 2002 13 2003 18 2004 24 2005 241 2006 1 2007 0 2008

又有两名用户下单了商品2004 且有20名用户下单2008

1
2
ZINCRBY good:sellsort 2 2004
ZINCRBY good:sellsort 20 2008

求前三名的销量

1
ZrevRANGE good:sellsort 0 2

Zset数据结构

ScoreSet(Zset)是Redis提供的一个非常特别的数据结构,一方面它等价于Java的数据结构Map<String,Double> 可以给每个元素value赋予一个权重score,另一方面他又类似于TreeSet,内部元素会按照权重score进行排序,可以得到每个元素的名词,还可以通过score的范围来获取元素的列表

Zset底层使用了两个数据结构:

  1. hash,hash的作用就是关联元素value和权重score,保证元素value的唯一性,可以通过元素value找到相应的score值

  2. 跳跃表,表余额表的目的在于给元素value进行排序,根据score的范围获取元素列表

    跳跃表

    有序集合在生活中比较常见,例如根据成绩对学生排名,根据得分对玩家排名等,对于有序集合的底层实现就可以用数组、平衡树、链表等,数组不便元素的插入删除;平衡树或红黑树虽然效率高但是结构复杂,连表查询需要遍历所有效率低,Redis使用的是跳跃表,跳跃表效率堪比红黑树,实现远比红黑树简单 , 其他了解请见博客https://blog.csdn.net/weixin_45480785/article/details/116293416

3、Redis中的发布与订阅

简介

Redis发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息,Redis客户端可以订阅任意数量的频道

发布订阅命令行实现

1、打开一个客户端订阅channnel1

SUBSCRIBE channel1

2、打开另一个客户端发送消息

publish channel1 hello

3、回到订阅消息的客户端

image-20221227001734316

4、Redis7新数据类型

redis 位域(bitfield)

通过bitfield命令可以一次性操作多个比特位域(指的是连续的多个比特位),它会执行一系列操作并返回一个响应数组,这个数组中的元素对应参数列表中的相应操作的执行结果。
说白了就是通过bitfield命令我们可以一次性对多个比特位域进行操作。

redis流(stream)

Redis Stream主要用于消息队列(MQ,Message Queue), Redis 本身是有一个Redis 发布订阅(pub/sub)来实现消息队列的功能,但它有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。

简单来说发布订阅(pub/sub)可以分发消息,但无法记录历史消息。而Redis Stream 提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失

stream流

实现消息队列,它支持消息的持久化、支持自动生成全局唯一ID、支持ack确认消息的模式、支持消费组模式等,让消息队列更加的稳定和可靠

  • 添加队列消息到队列末尾
1
2
xadd key ID field1 value1 field2 value2
# * 表示为消息id,使用* 后表示又redis替我们生成,可以自定义,但是要保证递增性,*表示让 Redis 自动生成唯一的消息 ID

image-20230515161314141

  • 用于获取消息列表,忽略已经删除的消息
1
2
3
xrange key  start  end   count 升序
xrerange key end start count 降序
#其中 start 表示开始值 + 表示最大值,end表示结束值 +表示为最大值 count表示最多获取多少个值
  • 用于消息删除
1
xdel key  id   

image-20230515161943733

  • 返回消息队列的长度
1
xlen  key	
  • 用于对stream的长度进行截取,如超长会进行截取
1
2
3
4
xtrim  key  maxlen  n
#n 为最大长度
xtrim key minid id
#去除队列中最小的id,倘若比该id还要小也一起被删除
  • 读取队列中的消息
    • 非阻塞
1
2
3
4
xread count n streams key  id
#n为读取的最大数
# $代表特殊ID,表示以当前Stream已经存储的最大的ID作为最后一个ID,当前Stream中不存在大于当前最大ID的消息,因此此时返回nil
# 0-O代表从最小的ID开始获取Stream中的消息,当不指定count,将会返回Stream中的所有消息,注意也可以使用O(O0/000也都是可以的
  • 阻塞
1
2
3
xread count n   block  timeout   stream key id
#读取消息时,只需要增加 BLOCK 参数即可支持「阻塞式」拉取消息。
# BLOCK timeout 表示阻塞等待,timeout表示超时时间, 0为 不设置超时时间

利用redis stream的阻塞队列实现 生产者 消费者

我们开启两个redis 会话 session1 session2

此时 session1 (消费者)作为我们的消费者开始实时监听 消息队列mystream中的新消息

1
2
xread count 1 block 0  streams  mystream  $
#我们设置的获取消息的数量为1 超时时间为 0 ,$为比当前队列中的最大id更大的id 监听队列中的新消息

此时我们的session2(生产者) 作为我们的生产者开始生产一条消息

1
xadd mystream * kf vf

此时session1 获取到了消息

image-20230515170447244

消费者组

1
2
3
4
5
6
7
#创建消费组
xgroup create key groupname id
#对于id $ 表示从队尾开始消费
0 表示从队首开始消费 队尾新来
#指定消费组的消费者消费消息
XREADGROUP group groupname consumername count n streams key >
# >表示游标读取 n为读取消息数量

stream中的消息—旦被消费组里的一个消费者读取了,就不能再被该消费组内的具他消费者读取了,即同—个消费组里的消费者不能消费同—条消息XREADGROUP命令再执行—次,此时读到的就是空值

消费组的目的:

让组内的多个消费者共同分担读取消息,所以,我们通常会让每个消费者读取部分消息,从而实现消息读取负载在多个消费者间是均衡分布的

如何知道读取的消息是否ack了呢

1
2
XPENDING key groupname start end  n  consumername
#n为显示的数量

image-20230515215649319

ACK 答复

1
XACK key groupName id 	

bitmaps(位图)

Bitmaps 称为位图,它不是一种数据类型。网上很多视频教程把Bitmaps称为数据类型,应该是不正确的。Bitmaps 是Redis提供给使用者用于操作位的“数据类型”。它主要有如下的基本特性:

  • Bitmaps 不是数据类型,底层就是字符串(key-value),byte数组。我们可以使用普通的get/set直接获取和设值位图的内容,也可以通过Redis提供的位图操作getbit/setbit等将byte数组看成“位数组”来处理
  • Bitmaps 的“位数组”每个单元格只能存储0和1,数组的下标在Bitmaps中称为偏移量
  • Bitmaps设置时key不存在会自动生成一个新的字符串,如果设置的偏移量超出了现有内容的范围,就会自动将位数组进行零扩充

image-20221227201847546

常用命令

setbit key offset value 对key存储的字符串,设置或者清除指定偏移量上的位(bit),位的设置或者清除取决于value参数,0/1;当key不存在时,自动生成一个新的字符串。字符串会进行伸展确保value保存在指定的偏移量上。字符串进行伸展时,空白位置以0填充

例如使用Bitmaps来存储用户是否打卡,打卡记做1,未打卡为0,用户的id作为偏移量
假设存在10个用户,此时用户1、3、5、9、10打了卡,其他人未打卡,Bitmaps的初始化结果如下所示:

image-20221227202452540

GETBIT key offset

获取指定偏移量上的位(bit),当offset比字符串长度大,或者key不存在,返回0;

image-20221227202717804

strlen key

获取bitmap的字节数

不是字符串长度而是占据几个字节,超过8位后自己按照8位一组一byte再扩容

BITCOUNT key [start] [end]

计算给定字符串中,被设置为1的bit位的数量。start和end参数可以指定查询的范围,可以使用负数值。-1代表最后一个字节,-2代表倒是第二个字节。
注意:start和end是字节索引,因此每增加1 代表的是增加一个字符,也就是8位,所以位的查询范围必须是8的倍数。

当我插入 1 3 5 8 10 占位为1 时

当我们使用命令BITCOUNT clock:20221227 0 1 时, 我们会输出5(01010100 10100000 00000000)

image-20221227203541187

当我们输入命令 bitcount clock:20221227 0 0 时 我们会输出 3 (01010100)

image-20221227203850444

当我们输入命令 bitcount clock:20221227 1 1 时我们会输出 2(10100000)

image-20221227203932393

当我们输入命令 bitcount clock:20221227 2 2 时我们会输出 0 (00000000)

image-20221227204150513

BITOP operation destkey key [key …]

Redis的Bitmaps提供BITOP指令来对一个或多个(除了NOT操作)二进制位的字符串key进行位元操作,操作的结果保存到destkey上,operation是操作类型,有四种分别是:AND、OR、NOT、XOR

  • BITOP AND destkey key [key …] ,对一个或多个 key 求逻辑并,并将结果保存到 destkey

  • BITOP OR destkey key [key …] ,对一个或多个 key 求逻辑或,并将结果保存到 destkey

  • BITOP XOR destkey key [key …] ,对一个或多个 key 求逻辑异或,并将结果保存到 destkey

  • BITOP NOT destkey key ,对给定 key 求逻辑非,并将结果保存到 destkey

HyperLogLog

Redis HyperLogLog 是用来做基数统计的算法,HyperLogLog 的优点是,在输入元素的数量或者体积非常非常大时,计算基数所需的空间总是固定 的、并且是很小的。

在 Redis 里面,每个 HyperLogLog 键只需要花费 12 KB 内存,就可以计算接近 2^64 个不同元素的基 数。这和计算基数时,元素越多耗费内存就越多的集合形成鲜明对比。

但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。

pfadd key value1 value2 value3 …… 存储多个基数,自动去重

pfcount key 计算基数的数量

pfmerge list key1 key2 将key1 key2 中的基数的并集储存在list中

Geospatial

可储存地理位置的经纬度,如果出现中文乱码则要在进入客户端之前添加属性 –raw

geoadd key 经度 维度 城市 经度2 维度2 城市2 …… 储存各个城市的经纬度

geopos key 城市1 城市2 …… 输出城市的经纬度

geohash key 城市1 城市 2 返回城市坐标使用geohash 表示

geodist key 城市1 城市2 单位(m,km,mi,ft) 输出城市之间的距离

georadius key 经度 维度 半径 单位(m,km,mi,ft) 以该经纬度圆心做半径的圆包括哪些城市

5、jedis

导入依赖坐标

1
2
3
4
5
6
7
8
9
<dependency>

    <groupId>redis.clients</groupId>

    <artifactId>jedis</artifactId>

    <version>3.6.0</version>

</dependency>

使用java操作redis数据库

创建链接

1
2
3
Jedis jedis=new Jedis("192.168.26.131",6379,0); 远程链接的IP地址,端口号
jedis.auth("123456"); 密码

其中有许多API操作

list 集合操作

1
2
3
4
5
6
jedis.set("key","value")增添键值
jedis.del("key") 删除键key
jedis.get("key") 获取键key的value值
jedis.append("key","value2")追加增添键key的值value2
jedis.lpush("key","1","2","3","4");向key为键的list增加值 1 2 3 4
jedis.lrange("key",0,-1) 将key键的list 从0索引到-1索引全部到一个java类型的list中

set集合操作

1
2
jedis.sadd("names","zf","zhm","666666"); 以names为键,后续的几个字符串作为值
jedis.smembers("names"); 将value作为Java类型的Set进行封装

hash集合操作

1
2
jedis.hset("users","name","zf");  创建以 users 为键 ,name 为 field  zf 为 值的一个hash 数据结构
jedis.hget("users","name") 获取 users 为键,name为field的hash 的value

Zset集合操作

1
2
3
4
5
6
7
jedis.zadd("programs",100,"python"); 
jedis.zadd("programs",200,"c++");
jedis.zadd("programs",300,"javaScript");
jedis.zadd("programs",400,"java");
创建key为programs score 为 100 200 300 400 值分别为 python c++ javaScript java 的Zset对象
Set<String> programs = jedis.zrange("programs", 0, -1);
将key为programs的Zset 从索引0到-1(输出全部)的value值,按照其分别的score进行排序并且输出

SpringDataRedis

SpringData是Spring中数据操作的模块,包含对各种数据库的集成 ,其中对Redis的集成模块就叫做SpringDataRedis

  • 提供了对不同Redis客户端的整合(Lettuce和Jedis)
  • 提供了RedisTemplate统一API来操作Redis
  • 支持Redis的发布订阅模型
  • 支持Redis烧饼和Redis集群
  • 支持基于lettuce的响应式编程
  • 支持基于JDK、JSON、字符串、Spring对象的数据序列化以及反序列化
  • 支持基于Redis的JDKCollection实现

SpringDataRedis快速入门

SpringDataRedis提供了RedisTemplate工具类,其中就封装了各种对Redis的操作,并将不同数据类型的操作API封装到不同的类型中

API:redisTemplate.opsForValue() 返回值类型: ValueOperations 说明:操作String类型数据

API:redisTemplate.opsForHash 返回值类型: HashOperations 说明:操作Hash类型数据

API:redisTemplate.opsForList() 返回值类型: ListOperations 说明:操作List类型数据

API:redisTemplate.opsForSet() 返回值类型: SetOperations 说明:操作Set类型数据

API:redisTemplate.opsForZSet() 返回值类型: ZSetOperations 说明:操作ZSet类型数据

使用SpringDataRedis

首先要导入依赖

1
2
3
4
5
6
7
8
9
10
<!--Redis依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--连接池依赖-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
spring:
redis:
#redis服务器地址
host: 192.168.26.132
#redis服务器端口
port: 6379
#redis数据库索引
database: 0
lettuce:
pool:
#连接池最大连接
max-active: 20
#连接池最大连接数
max-idle: 5
#连接池中的最小空闲连接
min-idle: 0
#最大连接等待时长
max-wait: 100
password: 123456

编码测试

1
2
3
4
5
6
7
8
9
10
11
12
13
public class RedisTestController {
@Autowired
private RedisTemplate redisTemplate;

@GetMapping("/")
public String testRedis(){
//设置值到redis
redisTemplate.opsForValue().set("name","lucy");
//从redis获取值
String name =redisTemplate.opsForValue().get("name");
return name;
}
}

但是这样操作会带来一些列问题,当我们在redis去get key时我们并不能get到 我们在RedisTemplate所set的值,当我们查看却发现

image-20230114203640729

这是为什么呢?

当RedisTemplate可以接收人意Object作为值写入Redis,只不过写入前会把Object序列化为字节形式,默认是采用JDK序列化,得到的结果就是这样,这样写不仅可读性差,而且内存占用大

如何处理?

在RedisTemplate源码中,有RedisSerializer与Jackson2JsonRedisSerializer会实现key与value 的序列化,我们使用RedisSerializer作为key 的序列化方式,将Jackson2JsonRedisSerializer作为Value的Object类型的转换,我们可以将配置写在RedisConfig中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.example.redisboot.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
//创建RedisTemplate对象
RedisTemplate<String, Object> template = new RedisTemplate<>();
//设置连接工厂
template.setConnectionFactory(redisConnectionFactory);
//创建JSON序列化连接工具
GenericJackson2JsonRedisSerializer genericJackson2JsonRedisSerializer = new GenericJackson2JsonRedisSerializer();
//设置key的序列化
template.setKeySerializer(StringRedisSerializer.UTF_8);
template.setHashKeySerializer(StringRedisSerializer.UTF_8);
//设置value的序列化
template.setValueSerializer(genericJackson2JsonRedisSerializer);
template.setHashValueSerializer(genericJackson2JsonRedisSerializer);
return template;
}
}

当我们存入对象时

image-20230114213024518

我们会存入对象类的包名,还有其中的属性及其值

6、Springboot整合redis

首先要在SpringBoot项目中导入maven依赖

1
2
3
4
5
6
7
8
9
10
11
<!--redis-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!--spring集成redis所需commons-pool2-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.11.1</version>
</dependency>

在配置文件中写入相关配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
spring:
redis:
#redis服务器地址
host: hasdsd.cn
#redis服务器端口
port: 6379
#redis数据库索引
database: 0
lettuce:
pool:
shutdown-timeout: 100
max-active: 20
#最大阻塞等待时间(负数没有限制)
max-wait: -1
#连接池中的最大空闲连接
max-idle: 5
#连接池中的最小空闲连接
min-idle: 0
timeout: 5000

添加redis配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package com.example.redisboot.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.*;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
* redis配置类
*/
@Configuration
@EnableCaching
public class RedisConfig extends CachingConfigurerSupport {

/**
* retemplate相关配置
* @param factory
* @return
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {

RedisTemplate<String, Object> template = new RedisTemplate<>();
// 配置连接工厂
template.setConnectionFactory(factory);

//使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
Jackson2JsonRedisSerializer jacksonSeial = new Jackson2JsonRedisSerializer(Object.class);

ObjectMapper om = new ObjectMapper();
// 指定要序列化的域,field,get和set,以及修饰符范围,ANY是都有包括private和public
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// 指定序列化输入的类型,类必须是非final修饰的,final修饰的类,比如String,Integer等会跑出异常
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jacksonSeial.setObjectMapper(om);

// 值采用json序列化
template.setValueSerializer(jacksonSeial);
//使用StringRedisSerializer来序列化和反序列化redis的key值
template.setKeySerializer(new StringRedisSerializer());

// 设置hash key 和value序列化模式
template.setHashKeySerializer(new StringRedisSerializer());
template.setHashValueSerializer(jacksonSeial);
template.afterPropertiesSet();

return template;
}

/**
* 对hash类型的数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public HashOperations<String, String, Object> hashOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForHash();
}

/**
* 对redis字符串类型数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public ValueOperations<String, Object> valueOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForValue();
}

/**
* 对链表类型的数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public ListOperations<String, Object> listOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForList();
}

/**
* 对无序集合类型的数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public SetOperations<String, Object> setOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForSet();
}

/**
* 对有序集合类型的数据操作
*
* @param redisTemplate
* @return
*/
@Bean
public ZSetOperations<String, Object> zSetOperations(RedisTemplate<String, Object> redisTemplate) {
return redisTemplate.opsForZSet();
}
}

Redis工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
package com.xxxx.utils;

import java.util.*;
import java.util.concurrent.TimeUnit;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;

/**
* Redis工具类
*
* @author zlpt
* @date 2020年11月17日
*/
@Component
@Slf4j
public final class RedisUtils {
// 直接用RedisTemplate操作Redis,需要很多行代码,因此直接封装好一个RedisUtil,这样写代码更方便点。这个RedisUtil交给Spring容器实例化,使用时直接注解注入。
@Autowired
private RedisTemplate<String, Object> redisTemplate;
// =============================common============================

/**
* 指定缓存失效时间
*
* @param key 键
* @param time 时间(秒)
* @return
*/
public boolean expire(String key, long time) {
try {
if (time > 0) {
redisTemplate.expire(key, time, TimeUnit.SECONDS);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 根据key 获取过期时间
*
* @param key 键 不能为null
* @return 时间(秒) 返回0代表为永久有效
*/
public long getExpire(String key) {
return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}

/**
* 判断key是否存在
*
* @param key 键
* @return true 存在 false不存在
*/
public boolean hasKey(String key) {
try {
return redisTemplate.hasKey(key);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 删除缓存
*
* @param key ...表示可以传一个值 或多个
*/
@SuppressWarnings("unchecked")
public void del(String... key) {
if (key != null && key.length > 0) {
if (key.length == 1) {
redisTemplate.delete(key[0]);
} else {
redisTemplate.delete((Collection<String>) CollectionUtils.arrayToList(key));
}
}
}
// ============================String=============================

/**
* 普通缓存获取
*
* @param key 键
* @return
*/
public Object get(String key) {
return key == null ? null : redisTemplate.opsForValue().get(key);
}

/**
* 普通缓存放入
*
* @param key 键
* @param value 值
* @return true成功 false失败
*/
public boolean set(String key, Object value) {
try {
redisTemplate.opsForValue().set(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 普通缓存放入并设置时间
*
* @param key 键
* @param value 值
* @param time 时间(秒) time要大于0 如果time小于等于0 将设置无限期
* @return true成功 false 失败
*/
public boolean set(String key, Object value, long time) {
try {
if (time > 0) {
redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
} else {
set(key, value);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 递增
*
* @param key 键
* @param delta 要增加几(大于0)
* @return
*/
public long incr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递增因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, delta);
}

/**
* 递减
*
* @param key 键
* @param delta 要减少几(小于0)
* @return
*/
public long decr(String key, long delta) {
if (delta < 0) {
throw new RuntimeException("递减因子必须大于0");
}
return redisTemplate.opsForValue().increment(key, -delta);
}

// ================================Map=================================

/**
* HashGet
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return
*/
public Object hget(String key, String item) {
return redisTemplate.opsForHash().get(key, item);
}

/**
* 获取所有给定字段的值
* author: xu
* @param key
* @return
*/
public Map<Object, Object> hgetall(String key) {
return redisTemplate.opsForHash().entries(key);
}

/**
* 获取hashKey对应的所有键值
*
* @param key 键
* @return 对应的多个键值
*/
public Map<Object, Object> hmget(String key) {
return redisTemplate.opsForHash().entries(key);
}

/**
* HashSet
*
* @param key 键
* @param map 对应多个键值
* @return true 成功 false 失败
*/
public boolean hmset(String key, Map<String, Object> map) {
try {
redisTemplate.opsForHash().putAll(key, map);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* HashSet 并设置时间
*
* @param key 键
* @param map 对应多个键值
* @param time 时间(秒)
* @return true成功 false失败
*/
public boolean hmset(String key, Map<String, Object> map, long time) {
try {
redisTemplate.opsForHash().putAll(key, map);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value) {
try {
redisTemplate.opsForHash().put(key, item, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 向一张hash表中放入数据,如果不存在将创建
*
* @param key 键
* @param item 项
* @param value 值
* @param time 时间(秒) 注意:如果已存在的hash表有时间,这里将会替换原有的时间
* @return true 成功 false失败
*/
public boolean hset(String key, String item, Object value, long time) {
try {
redisTemplate.opsForHash().put(key, item, value);
if (time > 0) {
expire(key, time);
}
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 删除hash表中的值
*
* @param key 键 不能为null
* @param item 项 可以使多个 不能为null
*/

public void hdel(String key, Object... item) {
redisTemplate.opsForHash().delete(key, item);
}

/**
* 判断hash表中是否有该项的值
*
* @param key 键 不能为null
* @param item 项 不能为null
* @return true 存在 false不存在
*/
public boolean hHasKey(String key, String item) {
return redisTemplate.opsForHash().hasKey(key, item);
}

/**
* hash递增 如果不存在,就会创建一个 并把新增后的值返回
*
* @param key 键
* @param item 项
* @param by 要增加几(大于0)
* @return
*/
public double hincr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, by);
}

/**
* hash递减
*
* @param key 键
* @param item 项
* @param by 要减少记(小于0)
* @return
*/
public double hdecr(String key, String item, double by) {
return redisTemplate.opsForHash().increment(key, item, -by);
}
// ============================set=============================

/**
* 根据key获取Set中的所有值
*
* @param key 键
* @return
*/
public Set<Object> sGet(String key) {
try {
return redisTemplate.opsForSet().members(key);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}

/**
* 根据value从一个set中查询,是否存在
*
* @param key 键
* @param value 值
* @return true 存在 false不存在
*/
public boolean sHasKey(String key, Object value) {
try {
return redisTemplate.opsForSet().isMember(key, value);
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 将数据放入set缓存
*
* @param key 键
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSet(String key, Object... values) {
try {
return redisTemplate.opsForSet().add(key, values);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}

/**
* 将set数据放入缓存
*
* @param key 键
* @param time 时间(秒)
* @param values 值 可以是多个
* @return 成功个数
*/
public long sSetAndTime(String key, long time, Object... values) {
try {
Long count = redisTemplate.opsForSet().add(key, values);
if (time > 0)
expire(key, time);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}

/**
* 获取set缓存的长度
*
* @param key 键
* @return
*/
public long sGetSetSize(String key) {
try {
return redisTemplate.opsForSet().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}

/**
* 移除值为value的
*
* @param key 键
* @param values 值 可以是多个
* @return 移除的个数
*/
public long setRemove(String key, Object... values) {
try {
Long count = redisTemplate.opsForSet().remove(key, values);
return count;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
// ===============================list=================================

/**
* 获取list缓存的内容
*
* @param key 键
* @param start 开始
* @param end 结束 0 到 -1代表所有值
* @return
*/
public List<Object> lGet(String key, long start, long end) {
try {
return redisTemplate.opsForList().range(key, start, end);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}

/**
* 获取list缓存的长度
*
* @param key 键
* @return
*/
public long lGetListSize(String key) {
try {
return redisTemplate.opsForList().size(key);
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}

/**
* 通过索引 获取list中的值
*
* @param key 键
* @param index 索引 index>=0时, 0 表头,1 第二个元素,依次类推;index<0时,-1,表尾,-2倒数第二个元素,依次类推
* @return
*/
public Object lGetIndex(String key, long index) {
try {
return redisTemplate.opsForList().index(key, index);
} catch (Exception e) {
e.printStackTrace();
return null;
}
}

/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, Object value) {
try {
redisTemplate.opsForList().rightPush(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, Object value, long time) {
try {
redisTemplate.opsForList().rightPush(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @return
*/
public boolean lSet(String key, List<Object> value) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 将list放入缓存
*
* @param key 键
* @param value 值
* @param time 时间(秒)
* @return
*/
public boolean lSet(String key, List<Object> value, long time) {
try {
redisTemplate.opsForList().rightPushAll(key, value);
if (time > 0)
expire(key, time);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 根据索引修改list中的某条数据
*
* @param key 键
* @param index 索引
* @param value 值
* @return
*/
public boolean lUpdateIndex(String key, long index, Object value) {
try {

redisTemplate.opsForList().set(key, index, value);
return true;
} catch (Exception e) {
e.printStackTrace();
return false;
}
}

/**
* 移除N个值为value
*
* @param key 键
* @param count 移除多少个
* @param value 值
* @return 移除的个数
*/
public long lRemove(String key, long count, Object value) {
try {
Long remove = redisTemplate.opsForList().remove(key, count, value);
return remove;
} catch (Exception e) {
e.printStackTrace();
return 0;
}
}
}

7、Redis事务锁机制

定义

Redis事务是一个单独的操作:事务中的所有命令都会被序列化、按顺序地执行,事务在执行的过程中,不会其他客户端发送来的请求命令打断

Redis事务的主要作用就是串联多个命令防止命令插队

Multi、Exec、duscard

从输入Multi命令开始,输入的命令会被依次进入命令队列中,但不会被执行,直到输入Exec后,Redis会将之前的命令队列中的命令依次执行

组队的过程中可以通过discard来放弃组队

image-20230108010357345

在组队过程中如果有语句发生错误,那么整个队伍的语句都不可以执行,当执行的过程中有语句发生错误,那么只会有发生错误的语句不执行

事务冲突

脏读就是最典型的事务冲突,例如:A事务访问并修改了数据,但未提交;B事务读取了A事务修改后的数据,发生了脏读。若A回滚,则B所做的一切操作都是错的。如何规避事务冲突,那么就可以用到乐观锁和悲观锁

乐观锁

image-20230108155430540

乐观锁(Optimistic Lock) ,顾名思义,就很乐观,每次去拿数据的时候都会认为别人不会修改,所以不会上锁,但是在下一次更新的时候会判断一下在此期间别人有没有去更新这个数据,可以使用版本号等机制,乐观锁适用于多读写的应用类型,这样可以提高吞吐量,Redis就是利用这种check-and-set机制实现事务

悲观锁

image-20230108154731365

悲观锁(pessimistic Lock) ,顾名思义,就是很悲观,每次去拿数据的时候都认为别人会修改,所以每次拿数据的时候都会先上锁,这样别人想拿这个数据就会被阻塞(block),直到他拿到锁,传统的关系型数据库就是用道理喝多这种锁的机制,比如行锁,表锁等,读锁,写锁等都是在这个到做之前先上锁

乐观锁在Redis中的实现

watch key1 [key2]

在执行multi之前,限制性watch key1 [key2],可以监视一个或多个key,如果在事务执行之前这个(这些)key被其他命令改动那么事务将会被打断

image-20230108161954038

左边先执行改动了 balance的值,而右边也进行了改动,但发现balance在 watch之后被改动过,所以不执行该事务

Redis事务的三大特性

  • 单独的隔离操作
    • 事务中的所有命令都会被序列化、按顺序地执行,食物在执行的过程中不会被其他客户端发来的命令请求所打断
  • 没有隔离级别的概念
    • 队列中的命令没有提交之前都不会实际被执行,因为事务提交前任何指令都不会被世纪执行
  • 不保证原子性
    • 十五中如果有一条命令执行失败,其后的命令仍然会被执行,没有回滚

8、redis持久化操作

Redis提供了两种持久化操作方式:

  • RDB(Redis database)
  • AOF (Append of File)

RDB

什么是RDB?

在指定的时间间隔内将内存中的数据集快照写到磁盘中,也就是行话所讲的SnapShot快照,它恢复时是将快照文件直接读到内存里

RDB备份是怎么执行的

Redis会单独创建(fork)一个子进程来进行持久化,会先将数据写到一个临时文件中,待持久化过程都结束了。再将这个临时文件替换上持久化好的文件。整个过程中,主进程时不进行任何IO操作的,这就确保了极高的性能,如果需要进行大规模的数据恢复,且对于数据恢复的完整性不是特别敏感,那么RDB方式是要比AOF方式更加高效,RDB的缺点是最后一次持久化的数据可能丢失

Fork

  • Fork的作用是复制一个与当前进程一样的进程,新进程的所有数据(变量、环境变量、程序计数器等)数值和原进程一致,但是是一个全新的进程,并作为原进程的子进程
  • 在Linux系统中,fork()会产生一个和父进程完全相同的子进程,但子进程再次后多会exec系统调用,处于效率的考虑,Linux中引用了写时复制技术
  • 一般情况父进程和子进程会共用一段物理内存,只有进程空间的隔断的内容要发生变化时,才会将父进程的内容复制一份给子进程

利用配置文件管理save

  • save 900 1 表示在 900 秒内,至少更新了 1 条数据,Redis 自动触发 BGSAVE 命令,将数据保存到硬盘。

  • save 300 10 表示在 300 秒内,至少更新了 10 条数据,Redis 自动触 BGSAVE 命令,将数据保存到硬盘

  • save 60 10000 表示 60 秒内,至少更新了 10000 条数据,Redis 自动触发 BGSAVE 命令,将数据保存到硬盘。

  • image-20230111152835505

我们知道,在 RDB 持久化的过程中,子进程会把 Redis 的所有数据都保存到新建的 dump.rdb 文件中,这是一个既消耗资源又浪费时间的操作。因此 Redis 服务器不能过于频繁地创建 rdb 文件,否则会严重影响服务器的性能。

RDB 持久化的最大不足之处在于,最后一次持久化的数据可能会出现丢失的情况。我们可以这样理解,在 持久化进行过程中,服务器突然宕机了,这时存储的数据可能并不完整,比如子进程已经生成了 rdb 文件,但是主进程还没来得及用它覆盖掉原来的旧 rdb 文件,这样就把最后一次持久化的数据丢失了。

RDB 数据持久化适合于大规模的数据恢复,并且还原速度快,如果对数据的完整性不是特别敏感(可能存在最后一次丢失的情况),那么 RDB 持久化方式非常合适。

自动触发RDB持久化操作

当我们对redis的操作达到了配置文件中的条件,那么就会生成dump文件在我们所指定的目录中(配置文件中可以指定dump文件的位置和名称),当我们的redis 突然宕机,那么当我们重启redis服务时,redis会帮我们将原来的数据恢复

手动触发RDB持久化操作

savebgsave(默认)

save:生产上,是绝对不可以用save的,因为save为一个阻塞命令,当执行时redis会放下手头的工作全力以赴的去生成dump文件,在生产中,每秒产生数以万计的缓存,如果发生阻塞,那么缓存中的信息会直接消失

bgsave:Redis会在后台异步进行快照操作,不阻塞快照同时还可以响应客户端请求,该触发方式会fork一个子进程由子进程复制持久化过程

lastsave: 上一次执行快照的时间

原理如下

RDB快照模式原理

RDB 即快照模式,它是 Redis 默认的数据持久化方式,它会将数据库的快照保存在 dump.rdb 这个二进制文件中。

提示:所谓“快照”就是将内存数据以二进制文件的形式保存起来。

我们知道 Redis 是单线程的,也就说一个线程要同时负责多个客户端套接字的并发读写,以及内存数据结构的逻辑读写。

Redis 服务器不仅需要服务线上请求,同时还要备份内存快照。在备份过程中 Redis 必须进行文件 IO 读写,而 IO 操作会严重服务器的性能。那么如何实现既不影响客户端的请求,又实现快照备份操作呢,这时就用到了多进程。

Redis 使用操作系统的多进程 COW(Copy On Write) 机制来实现快照持久化操作。

RDB 实际上是 Redis 内部的一个定时器事件,它每隔一段固定时间就去检查当前数据发生改变的次数和改变的时间频率,看它们是否满足配置文件中规定的持久化触发条件。当满足条件时,Redis 就会通过操作系统调用 fork() 来创建一个子进程,该子进程与父进程享有相同的地址空间。

Redis 通过子进程遍历整个内存空间来获取存储的数据,从而完成数据持久化操作。注意,此时的主进程则仍然可以对外提供服务,父子进程之间通过操作系统的 COW 机制实现了数据段分离,从而保证了父子进程之间互不影响。

RDB文件修复

当出现rdb文件中出现缺失的信息该怎么办

例如:我们的rdb文件中写着1k条数据,当我下次正在写入rdb文件时 突然redis宕机了,rdb文件中传入了残次的数据,导致rdb文件无法运行数据无法恢复这时该怎么办

在我们的 redis-server 与 redis- cli的同级文件下有这样的一个方法 redis-check-rdb 他可以检查指定rdb文件的完整性与正确性

触发RDB快照模式的条件

  1. 配置文件中默认的快照配置
  2. 手动save/bgsave命令
  3. 执行flushall/flushdb命令也会产生dump.rdb文件
  4. 执行shutdown且没有设置开启AOF持久化
  5. 主从复制时,主节点的自动触发

redis快照模式调优参数

stop-srites-on-bgsave-error: 是否在bgsave操作出错时停止写入操作

默认yes
如果配置成no,表示你不在乎数据不一致或者有其他的手段发现和控制这种不一致,那么在快照写入失败时也能确保redis继续接受新的写请求

rdbcomporession:对于存储到磁盘中的快照,可以设置是否进行压缩存储,默认为yes,redis会采用LZF算法进行压缩,如果不想消耗CPU来进行压缩的话,可以设置关闭此功能

rdbchecksum

在存储快照后,还可以让redis使用CRC64算法来进行数据校验,但是这样做会增加大约10%的性能消耗,如果希望得到性能提升可以关闭此功能

rdb-del-sync-files :在没有持久性的情况下删除复制中使用的RDB文件启用。默认情况下no,此选禁用的

AOF

什么是AOF?

以日志的形式来记录每个写操作(增量保存),将Redis执行过程的所有写指令记录下来(读操作不记录),只追加文件但不可以修改写文件,Redis启动之初会读取该文件重新构建数据,换而言之,redis重启的话就根据日志文件的内容将写指令从前到后执行一次以完成数据的回复操作

AOF的持久化流程

  1. 客户端的请求写命令会被append追加到 AOF缓冲区内
  2. AOF缓冲区根据AOF持久化策略(always,everysec,no)将操作sync同步到磁盘的AOF文件中
  3. AOF文件大小超过重写策略或手动重写时,会对AOF文件rewrite’重写,压缩AOF文件容量
  4. Redis服务重启时,会重新load加载AOF文件中的写操作达到数据恢复的目的

image-20230517170420924

异常恢复

如果AOF文件遇到损坏,通过 /usr/local/bin/redis-check-aof –fix appendonly.aof的相对路径 进行修复,修复后重启redis

AOF同步频率配置

appendfsync always 始终同步,每次Redis的写入都会立即写入日志;性能较差但数据完整性比较好

appendfsync everysec 每秒同步,每秒计入日志一次,如果宕机,本秒的数据可能会丢失

appendfsync no redis不主动进行同步,把同步时机交给操作系统

AOF的文件存储规则与命名规则

reids7中 AOF目录默认存储来dump文件下,其中含有AOF文件,同时AOF引入了MP AOF原先单个的AOF文件变成了多个

顾名思义,Multi-Part-AOF就是将原来的单个AOF文件拆分成多个AOF文件。在MP-AOF中,我们将AOF分为三种类型,

分别为:

  • BASE:基础的AOF,一般由子进程重写完成,改文件最多有有一个
  • INCR:表示增量AOF,一般在AOFRW时被构建一般有多个
  • HISTORY:表示历史AOF,它由BASE与INCR AOF变化而来,每次AOFRW成功完成时,本次AOFRW之前对应的BASE与INCR AOF将会变为HISTORY,HISTORY类型的AOF也会被redis自动删除
  • 为了管理这些AOF文件,我们因付了一个manifest(清单)文件来追踪,管理这些AOF。同时,为了便于AOF备份和靠背,我们将所有的AOF文件和mainfest文件放在一个独立的文件夹目录中,目录由appenddirname配置决定

Rewrite压缩

什么是rewrite

AOF采用文件追加的方式,文件会越来越大,为了避免出现此种情况,新增了重写机制,当AOF文件的大小超过所设定的阈值时,Redis就会启动AOF文件的内容压缩,只保留可以恢复数据的最小指令集,可以使用命令bgrewriteaof

重写原理,如何实现重写

AOF文件持续增长而过大时,会fork出一条新进程来将文件重写(也是先写临时文件最后再rename),redis4.0后的重写,是指就是吧RDB的快照以2进制的形式附在新的aof头部,作为已有历史数据替换掉原来的流水账操作,我们可以设置一个基准,当文件大小大于这个基准的一倍时则会实现重写操作

优势
  • 备份机制更加稳健,丢失数据概率更低
  • 可读的日志文本,通过操作AOF文件,可以处理误操作
劣势
  • 比起RDB占用更多的磁盘空间
  • 恢复备份速度更慢
  • 每次读写都同步的话对性能有一定压力
  • 存在个别BUG造成回复不能

何时用RDB何时用AOF

  • 数据不敏感可单独选用RDB
  • 不建议单独用AOF,会出现BUG
  • 当Redis只是做缓存时可以都不用

9、主从复制

什么是主从复制

主机数据更新后根据配置和策略,自动同步到备机的master/slaver机制,Master以写为主,slave以读为主

能干嘛

  • 读写分离,性能扩展
  • 容灾快速恢复
1
info repliction  #查看主从复制的相关信息

10、缓存

1、什么是缓存

缓存就是数据交换的缓冲区(cache),就是存储数据的临时地方,一般读写性能比较高

2、缓存的作用与成本

  • 作用:降低后端负载,提高读写效率降低响应时间
  • 成本:数据一致性成本,代码维护成本,运维成本

image-20230131152202214

  • 缓存的使用:主动更新下的修改数据库信息后再删除缓存内容

3、缓存穿透

缓存穿透是指客户端请求的数据在缓存中和数据库中都不存在,这样缓存永远不会生效,这些请求都会达到数据库

常见的解决方案:

  • 缓存空对象

    • 优点:实现简单,维护方便
    • 缺点
      • 额外的内存消耗
      • 可能造成短期的不一致image-20230131162849236
  • 布隆过滤

    • 优点:内存占用较少,没有多余的key
    • 缺点:
      • 实现复杂
      • 存在误判的可能

image-20230131163449717

4、缓存雪崩

缓存雪崩是指同一时段大量的缓存key同时失效或redis服务宕机,导致大量请求到达数据库,带来巨大压力

解决方案:

  • 给不同的key的TTL添加随机值
  • 利用Redis集群提高服务的可用性
  • 给缓存业务添加降级限流策略
  • 给业务添加多级缓存

5、缓存击穿

​ 缓存击穿问题也叫做热点key问题,就是一个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击

解决方案:

  • 互斥锁

  • 逻辑过期

image-20230131174445701