Skip to content

ayaka-icu/MyRedis

Repository files navigation

Redis 相关命令

关于安装

Redis安装

Redis是基于C语言编写的,因此首先需要安装Redis所需要的gcc依赖:

yum install -y gcc tcl

解压缩:

压缩在 /usr/local/src

tar -xzf redis-6.2.6.tar.gz

解压后:

进入redis目录:

cd redis-6.2.6

运行编译命令:

make && make install

如果没有出错,应该就安装成功了。

默认的安装路径是在 /usr/local/bin目录下:

该目录以及默认配置到环境变量,因此可以在任意目录下运行这些命令。其中:

  • redis-cli:是redis提供的命令行客户端
  • redis-server:是redis的服务端启动脚本
  • redis-sentinel:是redis的哨兵启动脚本

如果没有任意目录下启动

修改profile文件:

vi /etc/profile 

在最后行添加:

export PATH=$PATH:/usr/local/bin

注意:/usr/local/bin表示的是redis-server 命令存在的目录路径

重新加载/etc/profile

source /etc/profile  

在任意目录执行命令

redis-server 

Redis配置文件

我们先将这个配置文件备份一份:

cp redis.conf redis.conf.bck

然后修改redis.conf文件中的一些配置:

# 允许访问的地址,默认是127.0.0.1,会导致只能在本地访问。修改为0.0.0.0则可以在任意IP访问,生产环境不要设置为0.0.0.0
bind 0.0.0.0
# 守护进程,修改为yes后即可后台运行
daemonize yes 
# 密码,设置后访问Redis必须输入密码
requirepass 123321

Redis的其它常见配置:

# 监听的端口
port 6379
# 工作目录,默认是当前目录,也就是运行redis-server时的命令,日志、持久化等文件会保存在这个目录
dir .
# 数据库数量,设置为1,代表只使用1个库,默认有16个库,编号0~15
databases 1
# 设置redis能够使用的最大内存
maxmemory 512mb
# 日志文件,默认为空,不记录日志,可以指定日志文件名
logfile "redis.log"

启动Redis:

# 进入redis安装目录 
cd /usr/local/src/redis-6.2.6
# 启动
redis-server redis.conf

停止服务:

# 利用redis-cli来执行 shutdown 命令,即可停止 Redis 服务,
# 因为之前配置了密码,因此需要通过 -u 来指定密码
redis-cli -u 123321 shutdown

设置自启动

我们也可以通过配置来实现开机自启。

首先,新建一个系统服务文件:

vi /etc/systemd/system/redis.service

内容如下:

[Unit]
Description=redis-server
After=network.target

[Service]
Type=forking
ExecStart=/usr/local/bin/redis-server /usr/local/src/redis-6.2.6/redis.conf
PrivateTmp=true

[Install]
WantedBy=multi-user.target

然后重载系统服务:

systemctl daemon-reload

现在,我们可以用下面这组命令来操作redis了:

# 启动
systemctl start redis
# 停止
systemctl stop redis
# 重启
systemctl restart redis
# 查看状态
systemctl status redis

执行下面的命令,可以让redis开机自启:

systemctl enable redis

进入客户端

Redis安装完成后就自带了命令行客户端:redis-cli,使用方式如下:

redis-cli [options] [commonds]

其中常见的options有:

  • -h 127.0.0.1:指定要连接的redis节点的IP地址,默认是127.0.0.1
  • -p 6379:指定要连接的redis节点的端口,默认是6379
  • -a 123321:指定redis的访问密码

其中的commonds就是Redis的操作命令,例如:

  • ping:与redis服务端做心跳测试,服务端正常会返回pong

不指定commond时,会进入redis-cli的交互控制台:

还可以这样:

[root@gangajiang ~]# redis-cli -h 127.0.0.1 -p 6379
127.0.0.1:6379> auth 密码
OK
127.0.0.1:6379> ping
PONG

可视化工具

resp for windows

Windows版本


通用命令

Redis通用命令

help + 关键字 :查看该关键字用法

通用指令是部分数据类型的,都可以使用的指令,常见的有:

  • KEYS:查看符合模板的所有key

    • *全部 / 占多个字符
    • ?占一个字符
  • DEL:删除一个指定的key

  • EXISTS:判断key是否存在

  • EXPIRE:给一个key设置有效期,有效期到期时该key会被自动删除

  • TTL:查看一个KEY的剩余有效期

  • RENAME: 对一个key重命名


String类型的常见命令

String类型,也就是字符串类型,是Redis中最简单的存储类型。 其value是字符串,不过根据字符串的格式不同,又可以分为3类:

  • string:普通字符串
  • int:整数类型,可以做自增、自减操作
  • float:浮点类型,可以做自增、自减操作

String的常见命令有:

  • SET:添加或者修改已经存在的一个String类型的键值对
  • GET:根据key获取String类型的value
  • MSET:批量添加多个String类型的键值对
  • MGET:根据多个key获取多个String类型的value
  • INCR:让一个整型的key自增1
  • INCRBY: 让一个整型的key自增并指定步长,例如:incrby num 2 让num值自增2
  • INCRBYFLOAT:让一个浮点类型的数字自增并指定步长
  • SETNX:添加一个String类型的键值对,前提是这个key不存在,否则不执行
  • SETEX:添加一个String类型的键值对,并且指定有效期

key的结构

Redis的key允许有多个单词形成层级结构,多个单词之间用':'隔开,格式如下:

项目名:业务名:类型:id

这个格式并非固定,也可以根据自己的需求来删除或添加词条。

例如我们的项目名称叫 heima,有user和product两种不同类型的数据,我们可以这样定义key:

◆ user相关的key:heima:user:1

◆ product相关的key:heima:product:1

如果Value是一个Java对象,例如一个User对象,则可以将对象序列化为JSON字符串后存储:

KEY VALUE
ganga:user:1 {"id":1, "name": "Jack", "age": 21}
ganga:product:1 {"id":1, "name": "小米11", "price": 4999}
set ganga:user:1 '{"id":1, "name": "Jack", "age": 21}'
set ganga:product:1 '{"id":1, "name": "小米11", "price": 4999}'

KEY VALUE
ganga:user:2 {"id":2, "name":"Rose", "age": 18}
ganga:product:2 {"id":2, "name":"荣耀6", "price": 2999}
set ganga:user:2 '{"id":2, "name":"Rose", "age": 18}'
set ganga:product:2 '{"id":2, "name":"荣耀6", "price": 2999}'

Hash类型

Hash类型,也叫散列,其value是一个无序字典,类似于Java中的HashMap结构。

String结构是将对象序列化为JSON字符串后存储,当需要修改对象某个字段时很不方便:

Hash结构可以将对象中的每个字段独立存储,可以针对单个字段做CRUD:

Hash的常见命令有:

  • HSET key field value:添加或者修改hash类型key的field的值
  • HGET key field:获取一个hash类型key的field的值
  • HMSET:批量添加多个hash类型key的field的值
  • HMGET:批量获取多个hash类型key的field的值
  • HGETALL:获取一个hash类型的key中的所有的field和value
  • HKEYS:获取一个hash类型的key中的所有的field
  • HVALS:获取一个hash类型的key中的所有的value
  • HINCRBY:让一个hash类型key的字段值自增并指定步长
  • HSETNX:添加一个hash类型的key的field值,前提是这个field不存在,否则不执行

List类型

Redis中的List类型与Java中的LinkedList类似,可以看做是一个双向链表结构。

既可以支持正向检索和也可以支持反 向检索。

特征也与LinkedList类似:

  • 有序

  • 元素可以重复

  • 插入和删除快

  • 查询速度一般

常用来存储一个有序数据,例如:朋友圈点赞列表,评论列表等。

List类型的常见命令

List的常见命令有:

  • LPUSH key element ... :向列表左侧插入一个或多个元素
  • LPOP key:移除并返回列表左侧的第一个元素,没有则返回nil // 第一个元素是一
  • RPUSH key element ... :向列表右侧插入一个或多个元素
  • RPOP key:移除并返回列表右侧的第一个元素
  • LRANGE key star end:返回一段角标范围内的所有元素 l // 第一个范围是零
  • BLPOP和BRPOP:与LPOP和RPOP类似,只不过在没有元素时等待指定时间,而不是直接返回nil

Set类型

Redis的Set结构与Java中的HashSet类似,可以看做是一个value为null的HashMap。

因为也是一个hash表,因此具 备与HashSet类似的特征:

  • 无序
  • 元素不可重复
  • 查找快
  • 支持交集、并集、差集等功能

Set类型的常见命令

String的常见命令有:

  • SADD key member ... :向set中添加一个或多个元素
  • SREM key member ... : 移除set中的指定元素
  • SCARD key: 返回set中元素的个数
  • SISMEMBER key member:判断一个元素是否存在于set中
  • SMEMBERS:获取set中的所有元素
  • SINTER key1 key2 ... :求key1与key2的交集
  • SDIFF key1 key2 ... :求key1与key2的差集
  • SUNION key1 key2 ..:求key1和key2的并集


SortedSet类型

Redis的SortedSet是一个可排序的set集合,与Java中的TreeSet有些类似,但底层数据结构却差别很大。SortedSet 中的每一个元素都带有一个score属性,可以基于score属性对元素排序,底层的实现是一个跳表(SkipList)加 hash表

  • SortedSet具备下列特性:*
  • 可排序
  • 元素不重复
  • 查询速度快

因为SortedSet的可排序特性,经常被用来实现排行榜这样的功能。


SortedSet类型的常见命令

SortedSet的常见命令有:

  • ZADD key score member:添加一个或多个元素到sorted set ,如果已经存在则更新其score值
  • ZREM key member:删除sorted set中的一个指定元素
  • ZSCORE key member : 获取sorted set中的指定元素的score值
  • ZRANK key member:获取sorted set 中的指定元素的排名
  • ZCARD key:获取sorted set中的元素个数
  • ZCOUNT key min max:统计score值在给定范围内的所有元素的个数
  • ZINCRBY key increment member:让sorted set中的指定元素自增,步长为指定的increment值
  • ZRANGE key min max:按照score排序后,获取指定排名范围内的元素
  • ZRANGEBYSCORE key min max:按照score排序后,获取指定score范围内的元素
  • ZDIFF、ZINTER、ZUNION:求差集、交集、并集 注意:所有的排名默认都是升序,如果要降序则在命令的Z后面添加REV即可

Redis 实现短信登陆功能

pom.xml 依赖文件

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-pool2</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <scope>runtime</scope>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>com.baomidou</groupId>
        <artifactId>mybatis-plus-boot-starter</artifactId>
        <version>3.4.3</version>
    </dependency>

    <!--hutool-->
    <dependency>
        <groupId>cn.hutool</groupId>
        <artifactId>hutool-all</artifactId>
        <version>5.8.5</version>
    </dependency>
</dependencies>

图解

image-20221008110140875

image-20221008110331140

image-20221008110903511

UserService 实现

package com.hmdp.service.impl;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.RandomUtil;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.dto.LoginFormDTO;
import com.hmdp.dto.Result;
import com.hmdp.dto.UserDTO;
import com.hmdp.entity.User;
import com.hmdp.mapper.UserMapper;
import com.hmdp.service.IUserService;
import com.hmdp.utils.RedisConstants;
import com.hmdp.utils.RegexUtils;
import com.hmdp.utils.SystemConstants;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import javax.servlet.http.HttpSession;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;


@Service
public class UserServiceImpl extends ServiceImpl<UserMapper, User> implements IUserService {


    /**
     * 注入RedisTemplate
     */
    @Resource  //Resource 注入时 对象名不能瞎起
    //@Autowired
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 发送手机验证码 redis
     */
    @Override
    public Result sendCode(String phone, HttpSession session) {
        //验证手机号格式是否正确
        boolean phoneInvalid = RegexUtils.isPhoneInvalid(phone);
        if (RegexUtils.isPhoneInvalid(phone)) {
            return Result.fail("手机号格式错误!");
        }

        //生成验证码
        String code = RandomUtil.randomNumbers(6);

        //将验证码发送在Redis当中
        stringRedisTemplate.opsForValue().set(
                RedisConstants.LOGIN_CODE_KEY + phone,
                code,
                RedisConstants.LOGIN_CODE_TTL, // 5分钟
                TimeUnit.MINUTES);

        //发送验证码  模拟
        log.debug("[模拟]发送验证码:" + code);
        //返回结果 ok
        return Result.ok();
    }

    /**
     * 登录功能 redis
     */
    @Override
    public Result login(LoginFormDTO loginForm, HttpSession session) {
        //验证手机号是否正确
        String phone = loginForm.getPhone();
        if (RegexUtils.isPhoneInvalid(phone)) {
            return Result.fail("手机号格式错误!");
        }
        //用户输入的验证码
        String code = loginForm.getCode();

        //取出redis中的验证码
        String codeRedis = stringRedisTemplate.opsForValue().get(RedisConstants.LOGIN_CODE_KEY + phone);

        //与redis存储的验证码进行比对
        if (!Objects.equals(codeRedis, code)) {
            return Result.fail("验证码错误!");
        }
        //当前手机号是否存在用户
        User user = query().eq("phone", phone).one();
        //不存在:通过手机号创建新用户  登录用户 保持到session当中
        if (ObjectUtils.isEmpty(user)) {
            user = createUserWithPhone(phone);
        }
        //存在:  登录用户 保持到redis当中
        //不管用户是否存在都要 登录用户 保持到reids当中

        //转成 UserDTO 对象 【增加安全】
        UserDTO userDTO = BeanUtil.copyProperties(user, UserDTO.class);

        //将 UserDTO 对象 保存在redis 使用hash结构
        // 键:token
        String token = RedisConstants.LOGIN_USER_KEY + UUID.randomUUID().toString(true);
        // 值:用户信息
        Map<String, String> userMap = new HashMap<>();
        userMap.put(UserDTO.USER_DTO_ID, userDTO.getId().toString());
        userMap.put(UserDTO.USER_DTO_NICKNAME, userDTO.getNickName());
        userMap.put(UserDTO.USER_DTO_ICON, userDTO.getIcon());
        //存储
        stringRedisTemplate.opsForHash().putAll(token, userMap);
        //设置有效时间
        stringRedisTemplate.expire(token, RedisConstants.LOGIN_USER_TTL, TimeUnit.MINUTES);

        //返回给前端用户数据 token
        return Result.ok(token);
    }

    /**
     * 通过手机号创建用户
     */
    public User createUserWithPhone(String phone) {
        //生成一个随机用户名
        String userName = SystemConstants.USER_NICK_NAME_PREFIX + RandomUtil.randomString(8);
        User user = new User();
        user.setPhone(phone);
        user.setNickName(userName);
        save(user);
        return user;
    }
}

拦截器的实现

package com.hmdp.config;

import com.hmdp.interceptor.LoginInterceptor;
import com.hmdp.interceptor.RefreshTokenInterceptor;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

import javax.annotation.Resource;

@Configuration
public class MvcConfig implements WebMvcConfigurer {

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 关于拦截器的顺序问题:
     *      当添加多个拦截器的时候,第一个添加的默认的拦截器顺序是0,后面添加的依次递增
     *          顺序的两种解决方案:
     *              1. 先过滤的过滤器 先添加 即:按过滤的顺序依次 registry.addInterceptor()
     *              2. 使用 registry.addInterceptor().order() order方法手动设置过滤顺序: 0 > 1 > 2 > ...
     * @param registry
     */
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(new LoginInterceptor())
                .excludePathPatterns(
                        "/shop/**",
                        "/voucher/**",
                        "/shop-type/**",
                        "/upload/**",
                        "/blog/hot",
                        "/user/code",
                        "/user/login"
                ).order(1);
        registry.addInterceptor(new RefreshTokenInterceptor(stringRedisTemplate)).order(0);
    }


}


Redis 缓存相关

什么是缓存

image-20221008111515914

缓存作用和成本

image-20221008111603068

缓存更新策略

image-20221008112021404

主动更新策略

image-20221008112131756

Cache Adide Pattern

image-20221008112148352

image-20221008112331473

缓存更新策略最佳方案

image-20221008112431914

缓存穿透

什么是缓存穿透

image-20221008112509532

缓存空对象解决方案

image-20221008112632643

缓存穿透产生的原因是什么?

  • 用户请求的数据在缓存中和数据库中都不存在,不断发起这样的请 求,给数据库带来巨大压力 缓存穿透的解决方案有哪些?

  • 被动防御

    • 缓存null值
    • 布隆过滤
  • 主动防御

    • 增强id的复杂度,避免被猜测id规律

    • 做好数据的基础格式校验

    • 加强用户权限校验

    • 做好热点参数的限流


缓存雪崩

image-20221008112934964

缓存击穿

什么是缓存击穿

image-20221008113030208

缓存击穿解决方案

image-20221008113325878

两种方案的优缺点

image-20221008113352925

**一致还是性能,这是个值得思考的问题~ **

案例解决方案

image-20221008113428273

image-20221008113440575

缓存案例

@Override
public Result queryById(Long id) {

    //使用缓存
    //Result result = queryByIdCache(id);

    //使用缓存解决 [缓存穿透] 问题
    //Result result = queryByIdCachePenetration(id);

    //使用缓存解决 [缓存击穿] 问题 方案一: [互斥锁方案]
    //Result result = queryByIdCacheMutex(id);

    //使用缓存解决 [缓存击穿] 问题 方案一: [逻辑过期方案]
    //Result result = queryByIdCacheLogicalExpire(id);

    //不存在,返回404
    return result;
}

缓存穿透 ---缓存空对象

/**
 * 解决缓存穿透问题
 * 使用 缓存空对象 解决方案
 *
 * @param id
 * @return
 */
public Result queryByIdCachePenetration(Long id) {

    //1. 获取商品id的key
    String key = RedisConstants.CACHE_SHOP_KEY + id;
    //2. 通过key向redis缓存中查询数据
    String shopStr = stringRedisTemplate.opsForValue().get(key);
    //3. 如果命中,返回查询结果
    if (ObjectUtil.isNotEmpty(shopStr)) {
        Shop shop = JSONUtil.toBean(shopStr, Shop.class); //转成对象
        return Result.ok(shop);
    }

    //3.1 判断是否命中的是空值
    if (shopStr != null) {
        return null;
    }

    //4. 如果未命中,查询数据库
    //5. 通过id查询数据库
    Shop shopById = this.getById(id);
    //6. 存在,写入redis缓存,并返回查询结果
    if (ObjectUtil.isNotEmpty(shopById)) {
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shopById));
        return Result.ok(shopById);
    }

    //7. 不存在,向redis写入空对象
    stringRedisTemplate.opsForValue().set(
        RedisConstants.CACHE_SHOP_KEY + id, "", RedisConstants.CACHE_SHOP_NOLL_TTL, TimeUnit.SECONDS);

    //8. 向前端返回信息
    return Result.fail("查询的商品不存在");
}

缓存击穿 ---互斥锁方案

/**
 * 解决缓存击穿问题
 * 使用 互斥锁解决方案
 *
 * @param id
 * @return
 */
public Result queryByIdCacheMutex(Long id) {

    //1. 获取商品id的key
    String key = RedisConstants.CACHE_SHOP_KEY + id;
    //2. 通过key向redis缓存中查询数据
    String shopStr = stringRedisTemplate.opsForValue().get(key);
    //3. 如果命中,返回查询结果
    if (ObjectUtil.isNotEmpty(shopStr)) {
        Shop shop = JSONUtil.toBean(shopStr, Shop.class); //转成对象
        return Result.ok(shop);
    }

    //3.1 判断是否命中的是空值
    if (shopStr != null) {
        return null;
    }

    //4. 如果未命中,进行缓存重建
    try {

        //4.1 获取互斥锁
        boolean lock = setShopMutex(id);

        //4.2 判断互斥锁是否获取成功 不成功,等待 + 递归
        if (!lock) {
            Thread.sleep(500); //等到500毫秒
            queryByIdCacheMutex(id); //递归
        }

        //4.3 获取互斥锁成功 通过id查询数据库
        Shop shopById = this.getById(id);

        //4.4 存在,写入redis缓存,并返回查询结果
        if (ObjectUtil.isNotEmpty(shopById)) {
            stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shopById));
            return Result.ok(shopById);
        }

        //4.5 不存在,向redis写入空对象
        stringRedisTemplate.opsForValue()
            .set
            (RedisConstants.CACHE_SHOP_KEY + id, "", RedisConstants.CACHE_SHOP_NOLL_TTL, TimeUnit.SECONDS);

    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    } finally {

        //4.6 释放互斥锁
        delShopMutex(id);
    }

    //5. 向前端返回信息
    return Result.fail("查询的商品不存在");
}

互斥锁的简单构建

//获取互斥锁
public boolean setShopMutex(Long id) {
    //使用 .setIfAbsent()方法
    Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent(RedisConstants.LOCK_SHOP_KEY + id, "lock", RedisConstants.LOCK_SHOP_TTL, TimeUnit.SECONDS);
    return BooleanUtil.isTrue(lock);
}

//删除互斥锁
public void delShopMutex(Long id) {
    stringRedisTemplate.delete(RedisConstants.LOCK_SHOP_KEY + id);
}

缓存击穿 ---逻辑过期方案

数据的变动

import lombok.Data;
import java.time.LocalDateTime;

/**
 * 存储 数据 + 逻辑时间
 */
@Data
public class RedisData {
    private LocalDateTime expireTime;
    private Object data;
}

缓存重建方法

//缓存重建
public void saveShopRedis(Long id, Long expireSeconds) {

    //模拟重建时间
    System.out.println("休眠中");
    try {
        Thread.sleep(200);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    System.out.println("休眠结束");

    //查询数据库数据
    Shop shop = this.getById(id);
    //封装 数据+逻辑时间
    RedisData redisData = new RedisData();
    redisData.setData(shop); //数据
    redisData.setExpireTime(LocalDateTime.now().plusSeconds(expireSeconds)); //时间
    //序列化
    String json = JSONUtil.toJsonStr(redisData);
    //写入redis缓存当中
    stringRedisTemplate.opsForValue().set(RedisConstants.CACHE_SHOP_KEY + id, json);
}

业务逻辑

/**
 * 解决缓存击穿问题
 * 使用 逻辑过期解决方案
 *
 * @param id
 * @return
 */
public Result queryByIdCacheLogicalExpire(Long id) {

    //1. 查询缓存是否命中
    String jsonData = stringRedisTemplate.opsForValue().get(RedisConstants.CACHE_SHOP_KEY + id);
    //2. 未命中直接返回前端信息
    if (ObjectUtil.isEmpty(jsonData)) {
        return Result.fail("查询的商品不存在!");
    }
    //3. 命中
    //4. 反序列化数据
    RedisData redisData = JSONUtil.toBean(jsonData, RedisData.class);
    JSONObject data = (JSONObject) redisData.getData(); //转成 JSONObject对象
    Shop shop = JSONUtil.toBean(data, Shop.class); //转成 Shop 对象
    LocalDateTime expireTime = redisData.getExpireTime(); //获取逻辑时间
    //5. 判断逻辑时间是否过去
    if (expireTime.isAfter(LocalDateTime.now())) { //expireTime.isAfter(LocalDateTime.now()) --> true:未过期  false:过期
        //6. 逻辑时间未过期 直接向前端返回数据
        return Result.ok(shop);
    }
    //7. 逻辑时间过期 尝试获取互斥锁
    boolean lock = setShopMutex(id);
    //8. 获取互斥成功
    if (lock) {
        //9. 开启新的线程
        CACHE_REBUILD_EXECUTOR.submit(() -> {
            try {
                //10. 重建缓存
                saveShopRedis(id, 10L);
            } finally {
                //11. 释放锁
                delShopMutex(id);
            }
        });
    }

    //12. 返回旧数据
    return Result.ok(shop);
}

缓存工具封装

**U基于StringRedisTemplate封装一个缓存工具类,满足下列需求: **

✓ 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间

✓ 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存 击穿问题

✓ 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题

✓ 方法4:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题

package com.hmdp.utils;


import cn.hutool.core.util.BooleanUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/**
 * 封装缓存工具类
 */
@Slf4j
@Component
public class CacheClient {

    //注入RedisTemplate
    @Resource
    private StringRedisTemplate stringRedisTemplate;


    //自定义线程池
    private static final ExecutorService CACHE_REBUILD_EXECUTOR = Executors.newFixedThreadPool(10);



    //✓ 方法1:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置TTL过期时间
    public <ID> void setCache(String prefix, ID id, Object value, Long time, TimeUnit timeUnit){
        //序列化 并写入redis缓存
        String key = prefix + id;
        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(value),time,timeUnit);
    }


    //✓ 方法2:将任意Java对象序列化为json并存储在string类型的key中,并且可以设置逻辑过期时间,用于处理缓存 击穿问题
    public <ID> void setWithLogicalExpire(String prefix, ID id, Object value, Long timeSeconds){
        String key = prefix + id;
        //封装成 RedisData类型
        RedisData redisData = new RedisData();
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(timeSeconds));
        redisData.setData(value);
        //序列化
        String json = JSONUtil.toJsonStr(redisData);
        //写入redis缓存
        stringRedisTemplate.opsForValue().set(key, json);
    }
    // ✓ 方法2的另一种写法,+缓存重建
    public <R,ID> R newCache(String key, ID id, Long time, TimeUnit timeUnit, Function<ID, R> sqlFunction){
        //获取数据库数据
        R rs = sqlFunction.apply(id);
        //是否存在数据 不存在
        if (ObjectUtil.isEmpty(rs)){
            return null;
        }
        //序列化 并 封装
        RedisData redisData = new RedisData();
        redisData.setExpireTime(LocalDateTime.now().plusSeconds(timeUnit.toSeconds(time)));
        redisData.setData(rs);
        String value = JSONUtil.toJsonStr(redisData);
        //写入缓存
        stringRedisTemplate.opsForValue().set(key,value);

        //将数据库中的数据返回
        return rs;
    }


    //======================================================================================


    // 缓存击穿
    //✓ 方法3:根据指定的key查询缓存,并反序列化为指定类型,利用缓存空值的方式解决缓存穿透问题
    public <R,ID> R queryWithPassThrough(
            String keyPrefix,
            ID id, Class<R> type,
            Function<ID,R> sqlFunction,
            Long time, TimeUnit timeUnit,
            Long timeNull, TimeUnit timeUnitNull){

        String key = keyPrefix + id;
        //从数据库中查询 key 数据
        String jsonStr = stringRedisTemplate.opsForValue().get(key);
        //判断是否命中 命中
        if (ObjectUtil.isNotEmpty(jsonStr)){
            //反序列化对象 并返回调用者
            return JSONUtil.toBean(jsonStr, type);
        }

        //未命中 先判断是否为空值
        if (jsonStr != null){
            //返回错误信息
            return null;
        }

        //数据库调用 日志
        log.info("数据库被调用...");

        //未命中 并且 没对象对象 重建缓存
        R sqlData = sqlFunction.apply(id);
        //判断是否存在
        if (ObjectUtil.isNotEmpty(sqlData)){
            //存在 序列化 并且写入缓存
            this.setCache(keyPrefix,id,sqlData,time,timeUnit);
            //返回调用者数据
            return sqlData;
        }
        //不存在 缓存空对象
        stringRedisTemplate.opsForValue().set(keyPrefix+id,"",timeNull,timeUnitNull);

        return null;
    }


    //缓存击穿

    //互斥锁方案
    //✓ 方法4.1:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
    public <R,ID> R queryWithMutex(
            String keyPrefix, String lockPrefix,
            Long keyTime, TimeUnit keyTimeUnit,
            Long nullTime, TimeUnit nullTimeUnit,
            Long lockTime, TimeUnit lockTimeUnit,
            ID id,
            Class<R> type, //返回值类型
            //函数式 sql具体实现
            Function<ID,R> sqlFunction){

        String key = keyPrefix + id;
        //通过缓存进行查询
        String jsonStr = stringRedisTemplate.opsForValue().get(key);
        //命中 反序列化并返回调用者
        if(ObjectUtil.isNotEmpty(jsonStr)){
            return JSONUtil.toBean(jsonStr,type);
        }
        //未命中 判断是否为空值
        if (jsonStr != null){
            //命中空对象 返回null
            return null;
        }

        //未命中 且 不为空对象 --> 缓存中无key-value
        //重建缓存
        //获取互斥锁
        try {
            boolean isLock = getCacheLock(lockPrefix, id, lockTime, lockTimeUnit);
            if (!isLock){

                //失败 线程等待
                Thread.sleep(600);
                //回调 递归
                this.queryWithMutex(
                        keyPrefix,lockPrefix,
                        keyTime,keyTimeUnit,
                        nullTime,nullTimeUnit,
                        lockTime,lockTimeUnit,
                        id,type,sqlFunction);
            }


            String str = stringRedisTemplate.opsForValue().get(key);
            if (ObjectUtil.isNotEmpty(str)){
                return JSONUtil.toBean(str,type);
            }

            //获取互斥锁成功 查询数据库数据
            R rd = sqlFunction.apply(id);
            if (ObjectUtil.isNotEmpty(rd)){
                //重建缓存
                //存在: 调用 this.setCache
                this.setCache(keyPrefix,id,rd,keyTime,keyTimeUnit);
                return rd;
            }

            //不存在: 写入缓存空对象
            stringRedisTemplate.opsForValue().set(key,"",nullTime,nullTimeUnit);

        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            //释放锁
            this.delCacheLock(lockPrefix,id);
        }

        return null;
    }


    //✓ 方法4.2:根据指定的key查询缓存,并反序列化为指定类型,需要利用逻辑过期解决缓存击穿问题
    public <R,ID> R queryWithLogicalExpire(
            String keyPrefix, String lockPrefix,
            Long time, TimeUnit timeUnit,
            Long lockTime, TimeUnit lockTimeUnit,
            ID id, R r, Class<R> type,
            Function<ID,R> sqlFunction){


        String key = keyPrefix + id;
        //通过缓存查询
        String jsonStr = stringRedisTemplate.opsForValue().get(key);
        //判断是否命中 未命中
        if (ObjectUtil.isEmpty(jsonStr)){
            //直接向前端返回null
        }
        //命中
        //反序列化 并 拆分 RedisData
        RedisData redisData = JSONUtil.toBean(jsonStr, RedisData.class);
        LocalDateTime expireTime = redisData.getExpireTime();
        JSONObject jsonObject = (JSONObject) redisData.getData();
        R rd = JSONUtil.toBean(jsonObject, type);

        //判断逻辑时间是否过期
        if (expireTime.isAfter(LocalDateTime.now())){
            //未过期 直接返回数据
            return rd;
        }
        //逻辑时间过期 获取互斥锁
        boolean cacheLock = this.getCacheLock(lockPrefix, id, lockTime, lockTimeUnit);
        //获取互斥锁成功
        if (cacheLock){
            //获取新线程
            CACHE_REBUILD_EXECUTOR.submit(()->{
                try {
                    //重建缓存
                    this.newCache(key,id,time,timeUnit,sqlFunction);
                } finally {
                    //释放锁
                    this.delCacheLock(lockPrefix,id);
                }
            });
        }

        //先将旧数据返回给调用者  失去的一致性 但提高了性能!
        return rd;
    }


    //另外的 : 互斥锁 Redis简单的实现

    //获取互斥锁
    public <ID> boolean getCacheLock(String prefix, ID id, Long time, TimeUnit timeUnit){
        String key = prefix + id;
        //创建Mutex
        Boolean ifs = stringRedisTemplate.opsForValue().setIfAbsent(key, RedisConstants.CACHE_LOCK_VALUE, time, timeUnit );
        //log.info("互斥锁已 [创建] ...");
        return BooleanUtil.isTrue(ifs); //为了防止自动拆箱 使用了工具类
    }

    //释放锁
    public <ID> void delCacheLock(String prefix, ID id){
        String key = prefix + id;
        // 删除Mutex
        stringRedisTemplate.delete(key);
        //log.info("互斥锁已 [删除] ...");
    }



}

Redis生成全局ID方案

解决方案:

package com.hmdp.utils;

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

@Component
public class RedisIdWorker {

    /**
     * 开始时间戳
     */
    public static final long BEGIN_TIMESTAMP = 1640995200L; //2022,1,1,0,0

    /**
     * 时间戳移动的位数
     */
    public static final int COUNT_BITS = 32;

    /**
     * StringRedisTemplate对象
     */
    public StringRedisTemplate stringRedisTemplate;
    public RedisIdWorker(StringRedisTemplate stringRedisTemplate) {
        this.stringRedisTemplate = stringRedisTemplate;
    }

    /**
     * 生成全局id
     * @param keyPrefix
     * @return
     */
    public long nextId(String keyPrefix) {

        //当前时间戳
        LocalDateTime now = LocalDateTime.now();
        long nowTimestamp = now.toEpochSecond(ZoneOffset.UTC);
        long timestamp = nowTimestamp - BEGIN_TIMESTAMP;

        //获取自增
        //1.获取当前日期
        String format = now.format(DateTimeFormatter.ofPattern("yyyy:MM:dd"));
        //2.获取自增
        long count = stringRedisTemplate.opsForValue().increment("icr:" + keyPrefix + format);

        //计算并返回
        return timestamp << COUNT_BITS | count;
    }


/*
public static void main(String[] args) {
        LocalDateTime time = LocalDateTime.of(2022, 1, 1, 0, 0);
        long times = time.toEpochSecond(ZoneOffset.UTC);
        System.out.println(times);
}
*/

}

测试代码

package com.hmdp;

import com.hmdp.utils.RedisIdWorker;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;

import javax.annotation.Resource;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

@SpringBootTest
@ContextConfiguration(classes = RedisIdWorkerApp.class)
class RedisIdWorkerTests {

    @Resource
    private RedisIdWorker redisIdWorker;

    public ExecutorService es= Executors.newFixedThreadPool(500);

    @Test
    void gitIdTest() throws InterruptedException {

        CountDownLatch countDownLatch = new CountDownLatch(300);

        //开始时间戳
        long startTimestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);

        Runnable task = ()->{
            //一个任务生成100个id
            for (int i = 0; i < 100; i++) {
                long id = redisIdWorker.nextId("order");
                System.out.println(id);
            }
            countDownLatch.countDown();
        };

        //300个任务
        for (int i = 0; i < 300; i++) {
            es.submit(task);
        }

        countDownLatch.await();
        long stopTimestamp = LocalDateTime.now().toEpochSecond(ZoneOffset.UTC);
        System.out.println(stopTimestamp - startTimestamp);
    }


}

Redis解决秒杀卷问题

秒杀卷库表设计

image-20221028150115925

tb_voucher 优惠卷的基本信息

image-20221028150847270

tb_seckill_voucher 特价/秒杀 优惠卷的拓展信息

image-20221028152907291

SQL

# tb_voucher
CREATE TABLE `tb_voucher` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `shop_id` bigint(20) unsigned DEFAULT NULL COMMENT '商铺id',
  `title` varchar(255) NOT NULL COMMENT '代金券标题',
  `sub_title` varchar(255) DEFAULT NULL COMMENT '副标题',
  `rules` varchar(1024) DEFAULT NULL COMMENT '使用规则',
  `pay_value` bigint(10) unsigned NOT NULL COMMENT '支付金额,单位是分。例如200代表2元',
  `actual_value` bigint(10) NOT NULL COMMENT '抵扣金额,单位是分。例如200代表2元',
  `type` tinyint(1) unsigned NOT NULL DEFAULT '0' COMMENT '0,普通券;1,秒杀券',
  `status` tinyint(1) unsigned NOT NULL DEFAULT '1' COMMENT '1,上架; 2,下架; 3,过期',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT;

# tb_seckill_voucher
CREATE TABLE `tb_seckill_voucher` (
  `voucher_id` bigint(20) unsigned NOT NULL COMMENT '关联的优惠券的id',
  `stock` int(8) NOT NULL COMMENT '库存',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `begin_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '生效时间',
  `end_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '失效时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`voucher_id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT COMMENT='秒杀优惠券表,与优惠券是一对一关系';

添加秒杀卷

VoucherController

/**
 * 新增秒杀券
 * @param voucher 优惠券信息,包含秒杀信息
 * @return 优惠券id
 */
@PostMapping("seckill")
public Result addSeckillVoucher(@RequestBody Voucher voucher) {
    voucherService.addSeckillVoucher(voucher);
    return Result.ok(voucher.getId());
}

VoucherServiceImpl

//也就是说 秒杀卷 也包含的普通卷的创建 
//这里调用的seckillVoucherService业务
@Override
@Transactional
public void addSeckillVoucher(Voucher voucher) {
    // 保存优惠券
    save(voucher);
    // 保存秒杀信息
    SeckillVoucher seckillVoucher = new SeckillVoucher();
    seckillVoucher.setVoucherId(voucher.getId());
    seckillVoucher.setStock(voucher.getStock());
    seckillVoucher.setBeginTime(voucher.getBeginTime());
    seckillVoucher.setEndTime(voucher.getEndTime());
    seckillVoucherService.save(seckillVoucher);
}

也就是说 秒杀卷 也包含的普通卷的创建

POST	http://localhost:8081/voucher/seckill

{
    "shopId":1,
    "title":"100元代金券",
    "subTitle":"周一至周五均可使用",
    "rules":"全场通用\\n无需预约\n可无限叠加\\不兑现、不找零\\n仅限堂食",
    "payValue":8000,
    "actualValue":10000,
    "type":1,
    "stock":100,
    "beginTime":"2022-10-26T10:01:00",
    "endTime":"2022-10-31T23:01:00"
}

秒杀卷下单

库表设计:tb_voucher_order

image-20221028151740027

# tb_vouche_order
CREATE TABLE `tb_voucher_order` (
  `id` bigint(20) NOT NULL COMMENT '主键',
  `user_id` bigint(20) unsigned NOT NULL COMMENT '下单的用户id',
  `voucher_id` bigint(20) unsigned NOT NULL COMMENT '购买的代金券id',
  `pay_type` tinyint(1) unsigned NOT NULL DEFAULT '1' COMMENT '支付方式 1:余额支付;2:支付宝;3:微信',
  `status` tinyint(1) unsigned NOT NULL DEFAULT '1' COMMENT '订单状态,1:未支付;2:已支付;3:已核销;4:已取消;5:退款中;6:已退款',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '下单时间',
  `pay_time` timestamp NULL DEFAULT NULL COMMENT '支付时间',
  `use_time` timestamp NULL DEFAULT NULL COMMENT '核销时间',
  `refund_time` timestamp NULL DEFAULT NULL COMMENT '退款时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT;

实体类:Voucher

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_voucher")
public class Voucher implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 主键
     */
    @TableId(value = "id", type = IdType.AUTO)
    private Long id;

    /**
     * 商铺id
     */
    private Long shopId;

    /**
     * 代金券标题
     */
    private String title;

    /**
     * 副标题
     */
    private String subTitle;

    /**
     * 使用规则
     */
    private String rules;

    /**
     * 支付金额
     */
    private Long payValue;

    /**
     * 抵扣金额
     */
    private Long actualValue;

    /**
     * 优惠券类型
     */
    private Integer type;

    /**
     * 优惠券类型
     */
    private Integer status;
    /**
     * 库存
     */
    @TableField(exist = false)
    private Integer stock;

    /**
     * 生效时间
     */
    @TableField(exist = false)
    private LocalDateTime beginTime;

    /**
     * 失效时间
     */
    @TableField(exist = false)
    private LocalDateTime endTime;

    /**
     * 创建时间
     */
    private LocalDateTime createTime;


    /**
     * 更新时间
     */
    private LocalDateTime updateTime;


}

下单流程:

image-20221029165002258

业务代码:

package com.hmdp.service.impl;

import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.Voucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.service.IVoucherService;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.time.LocalDateTime;


/**
 * <p>
 *  服务实现类
 * </p>
 *
 */
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService iSeckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    /**
     * 实现订单秒杀业务
     * @param voucherId
     * @return
     */
    @Override
    @Transactional //添加上事务
    public Result seckillVoucher(Long voucherId) {
        //1.查询优惠卷信息
        SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
        Integer stock = voucher.getStock();
        System.out.println(voucher);
        //2.判断优惠卷是否开始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())){
            //尚未开始,返回异常给前端
            return Result.fail("秒杀尚未开始!");
        }
        //3.判断优惠卷是否过期
        if (voucher.getEndTime().isBefore(LocalDateTime.now())){
            //秒杀已结束,返回前端异常信息
            return Result.fail("秒杀已结束!");
        }
        //4.判断优惠卷库存是否充足
        if (stock < 1){
            //秒杀卷库存不足,返回给前端异常信息
            return Result.fail("库存不足!");
        }
        //5.扣减库存
        boolean isOK = iSeckillVoucherService
                .update()
                .setSql("stock =stock - 1")
                .eq("voucher_id", voucherId)
                .update();
        if (!isOK){
            //秒杀失败,返回给前端异常信息
            return Result.fail("库存不足!");
        }
        //6.创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        //6.1.生成订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        //6.2.设置用户id
        voucherOrder.setUserId(UserHolder.getUser().getId());
        //6.3.设置代金卷id
        voucherOrder.setVoucherId(voucherId);
        //6.4.当生产的订单id写入数据库
        this.save(voucherOrder);

        //7.返回订单ID
        return Result.ok(orderId);
    }

}

存在问题:

  1. 【超卖问题】 并发情况下,会出现线程安全问题 【超卖问题】
  2. 【一人一单】 一人可以下多单,应该是一人只能抢一个秒杀卷 【一人一单】

问题--超卖问题

image-20221029170512498

解决方案 : 加锁

锁的选择:

image-20221029170556067

乐观锁:

版本号法

image-20221029170746672

CAS法

image-20221029170826616

超卖这样的线程安全问题,解决方案有哪些?

  1. 悲观锁:添加同步锁,让线程串行执行

    • 优点:简单粗暴

    • 缺点:性能一般

  2. 乐观锁:不加锁,在更新时判断是否有其它线程在修改

    • 优点:性能好

    • 缺点:存在成功率低的问题

解决问题:

image-20221029173628445

package com.hmdp.service.impl;

import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.Voucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.service.IVoucherService;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.time.LocalDateTime;


/**
 * <p>
 *  服务实现类
 * </p>
 *
 */
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService iSeckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    /**
     * 实现订单秒杀业务
     * @param voucherId
     * @return
     */
    @Override
    @Transactional //添加上事务
    public Result seckillVoucher(Long voucherId) {
        //1.查询优惠卷信息
        SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
        Integer stock = voucher.getStock();
        System.out.println(voucher);
        //2.判断优惠卷是否开始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())){
            //尚未开始,返回异常给前端
            return Result.fail("秒杀尚未开始!");
        }
        //3.判断优惠卷是否过期
        if (voucher.getEndTime().isBefore(LocalDateTime.now())){
            //秒杀已结束,返回前端异常信息
            return Result.fail("秒杀已结束!");
        }
        //4.判断优惠卷库存是否充足
        if (stock < 1){
            //秒杀卷库存不足,返回给前端异常信息
            return Result.fail("库存不足!");
        }
        //5.扣减库存
        boolean isOK = iSeckillVoucherService
                .update()
                .setSql("stock =stock - 1")
                .eq("voucher_id", voucherId)
                //.eq("stock",stock) // CAS乐观锁
                .gt("stock",0) 
            	// CAS乐观锁改进  stock > 0 就可以执行下单业务
                .update();
        if (!isOK){
            //秒杀失败,返回给前端异常信息
            return Result.fail("库存不足!");
        }
        //6.创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        //6.1.生成订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        //6.2.设置用户id
        voucherOrder.setUserId(UserHolder.getUser().getId());
        //6.3.设置代金卷id
        voucherOrder.setVoucherId(voucherId);
        //6.4.当生产的订单id写入数据库
        this.save(voucherOrder);

        //7.返回订单ID
        return Result.ok(orderId);
    }

}

问题--一人一单

image-20221029233136089

重点!!!!!!!

/**
几个重点:
1. 一人一单问题使用 悲观锁 还是 乐观锁?
     使用悲观锁
2. 使用悲观锁 synchronized 加载 方法上 还是 内部?
     内部,如果加载方法上,那整个订单业务都是串行,那刚刚解决的 超卖问题[乐观锁]也没意义了
3. synchronized 的锁对象是什么
     userId.toString().intern()
         而不是 userId.toString()
         Long的 toString() 底层是:
         return new String(buf, UTF16);
         .intern()方法是:返回字符串对象的规范表示。
4. 对于事务的添加 是 锁释放完了再提交 还是 提交完了再释放锁
     提交完了再释放锁
     具体操作:
         1.在方法上加上@Transactional
         2.在调用者 调用语句外加上 synchronized
            synchronized (userId.toString().intern()) {
                return queryOrderVoucherSave(voucherId);
            }
5. 事务失效问题
     spring的事务是 AOP动态代理的
         this.queryOrderVoucherSave(voucherId) //并非是代理对象
     解决方法
         这里使用 获取动态代理的方式 :
             //获取动态代理对象
             IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
             //通过代理对象调用
             proxy.queryOrderVoucherSave(voucherId);
         注意: 使用AopContext.currentProxy()
                 导入aspectjweaver依赖
                 开启 @EnableAspectJAutoProxy(exposeProxy = true) 暴露代理对象
                 
*/

业务的实现

@Override
public Result seckillVoucher(Long voucherId) {
    //1.查询优惠卷信息
    SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
    Integer stock = voucher.getStock();
    System.out.println(voucher);
    //2.判断优惠卷是否开始
    if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
        //尚未开始,返回异常给前端
        return Result.fail("秒杀尚未开始!");
    }
    //3.判断优惠卷是否过期
    if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
        //秒杀已结束,返回前端异常信息
        return Result.fail("秒杀已结束!");
    }
    //4.判断优惠卷库存是否充足
    if (stock < 1) {
        //秒杀卷库存不足,返回给前端异常信息
        return Result.fail("库存不足!");
    }


    //一人一单问题
    //获取用户id
    Long userId = UserHolder.getUser().getId();

    //缩小悲观锁范围
    synchronized (userId.toString().intern()) {
        //获取当前代理对象
        IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
        //通过代理对象调用 保证事务的正常
        return proxy.queryOrderVoucherSave(voucherId);
    }

}


@Transactional
public Result queryOrderVoucherSave(Long voucherId) {

    //获取用户id
    Long userId = UserHolder.getUser().getId();

    //判断订单表中是否已经存在
    int count = query()
            .eq("user_id", userId)
            .eq("voucher_id", voucherId)
            .count();
    if (count > 0) {
        //存在,返回前端信息
        return Result.fail("你已领取,每人只能领取一份!");
    }

    //以上都满足 施行扣减下单业务
    //5.扣减库存
    boolean isOK = iSeckillVoucherService
            .update()
            .setSql("stock =stock - 1")
            .eq("voucher_id", voucherId)
            //.eq("stock",stock) // CAS乐观锁
            // CAS乐观锁改进  stock > 0 就可以执行下单业务
            .gt("stock", 0)
            .update();
    if (!isOK) {
        //秒杀失败,返回给前端异常信息
        return Result.fail("库存不足!");
    }
    //6.创建订单
    VoucherOrder voucherOrder = new VoucherOrder();
    //6.1.生成订单id
    long orderId = redisIdWorker.nextId("order");
    voucherOrder.setId(orderId);
    //6.2.设置用户id
    voucherOrder.setUserId(userId);
    //6.3.设置代金卷id
    voucherOrder.setVoucherId(voucherId);
    //6.4.当生产的订单id写入数据库
    this.save(voucherOrder);

    //7.返回订单ID
    return Result.ok(orderId);


}

暴露代理对象

<dependency>
    <groupId>org.aspectj</groupId>
    <artifactId>aspectjweaver</artifactId>
    <version>1.9.9.1</version>
</dependency>
//暴露动态代理
@EnableAspectJAutoProxy(exposeProxy = true)
@MapperScan("com.hmdp.mapper")
@SpringBootApplication
public class SeckillVouchersApp {
    public static void main(String[] args) {
        SpringApplication.run(SeckillVouchersApp.class, args);
    }
}

以上依然存在问题!

image-20221030200606796

在集群 或 分布式系统下 , 每个JVM的锁监视器是独立的,就会出现并发安全问题

解决方案:使用 分布式锁

下面👇


分布式锁

分布式锁

什么是分布式锁

image-20221030200931213

分布式锁的实现

image-20221030201024610

基于Redis的分布式锁

image-20221030201149540

一个简单的实现:

/**
 * 基于Redis的分布式锁
 */
public interface ILock {

    /**
     * 尝试获取锁
     * @param timeoutSec 兜底过期时间
     * @return 获取是否成功 true成功
     */
    boolean tryLock(long timeoutSec);

    /**
     * 释放锁
     */
    void unLock();

}

impl

package com.hmdp.utils;

import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.BooleanUtil;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;

public class LockImpl implements ILock{

    /**
     * redis
     */
    private StringRedisTemplate stringRedisTemplate;
    /**
     * 锁名称
     */
    private String name;
    public LockImpl(StringRedisTemplate stringRedisTemplate, String name) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.name = name;
    }

    /**
     * 锁前缀
     */
    private static final String KEY_PREFIX = "lock:";

    /**
     * 锁的唯一标识
     */
    private String ID_PREFIX = UUID.randomUUID().toString(true);

    /**
     * 尝试获取锁
     * @param timeoutSec 兜底过期时间
     * @return 获取是否成功 true成功
     */
    @Override
    public boolean tryLock(long timeoutSec) {
        // 锁的唯一标识:这里用 UUID + 线程id
        String value = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁的 key
        String key = KEY_PREFIX + name;

        //尝试获取锁
        Boolean isLock = stringRedisTemplate
                .opsForValue()
                .setIfAbsent(key, value, timeoutSec, TimeUnit.SECONDS);

        //返回结果
        //return Boolean.TRUE.equals(isLock); //或者 👇
        return BooleanUtil.isTrue(isLock);
    }


    /**
     * 释放锁
     */
    @Override
    public void unLock() {
        //释放锁
        Boolean delete = stringRedisTemplate.delete(KEY_PREFIX + name);
    }

}

业务:

package com.hmdp.service.impl;

import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.LockImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.springframework.aop.framework.AopContext;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.time.LocalDateTime;


/**
 * <p>
 * 服务实现类
 * </p>
 */
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService iSeckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 实现订单秒杀业务
     *
     * @param voucherId
     * @return
     */
    @Override
    public Result seckillVoucher(Long voucherId) {
        //1.查询优惠卷信息
        SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
        Integer stock = voucher.getStock();
        System.out.println(voucher);
        //2.判断优惠卷是否开始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            //尚未开始,返回异常给前端
            return Result.fail("秒杀尚未开始!");
        }
        //3.判断优惠卷是否过期
        if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
            //秒杀已结束,返回前端异常信息
            return Result.fail("秒杀已结束!");
        }
        //4.判断优惠卷库存是否充足
        if (stock < 1) {
            //秒杀卷库存不足,返回给前端异常信息
            return Result.fail("库存不足!");
        }


        //一人一单问题
        //获取用户id
        Long userId = UserHolder.getUser().getId();

        //缩小悲观锁范围
        /*synchronized (userId.toString().intern()) {
            //获取当前代理对象
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            //通过代理对象调用 保证事务的正常
            return proxy.queryOrderVoucherSave(voucherId);
        }*/

        //获取分布式锁对象
        LockImpl lock = new LockImpl(stringRedisTemplate,"order:" + userId);
        //尝试获取锁
        boolean isLock = lock.tryLock(1200);
        //判断是否获取成功
        if (!isLock){
            //获取锁失败
            return Result.fail("不能重复下单!");
        }
        //成功 执行业务
        try{
            //获取当前代理对象
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            //通过代理对象调用 保证事务的正常
            return proxy.queryOrderVoucherSave(voucherId);
        }finally{
            //确保锁的释放
            lock.unLock();
        }


    }


    /**
     *<br> 几个重点:
     *<br> 1. 一人一单问题使用 悲观锁 还是 乐观锁?
     *<br>      使用悲观锁
     *<br> 2. 使用悲观锁 synchronized 加载 方法上 还是 内部?
     *<br>      内部,如果加载方法上,那整个订单业务都是串行,那刚刚解决的 超卖问题[乐观锁]也没意义了
     *<br> 3. synchronized 的锁对象是什么
     *<br>      userId.toString().intern()
     *<br>          而不是 userId.toString()
     *<br>          Long的 toString() 底层是:
     *<br>          return new String(buf, UTF16);
     *<br>          .intern()方法是:返回字符串对象的规范表示。
     *<br> 4. 对于事务的添加 是 锁释放完了再提交 还是 提交完了再释放锁
     *<br>      提交完了再释放锁
     *<br>      具体操作:
     *<br>          1.在方法上加上@Transactional
     *<br>          2.在调用者 调用语句外加上 synchronized
     *<br>             synchronized (userId.toString().intern()) {
     *<br>                 return queryOrderVoucherSave(voucherId);
     *<br>             }
     *<br> 5. 事务失效问题
     *<br>      spring的事务是 AOP动态代理的
     *<br>          this.queryOrderVoucherSave(voucherId) //并非是代理对象
     *<br>      解决方法
     *<br>          这里使用 获取动态代理的方式 :
     *<br>              //获取动态代理对象
     *<br>              IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
     *<br>              //通过代理对象调用
     *<br>              proxy.queryOrderVoucherSave(voucherId);
     *<br>          注意: 使用AopContext.currentProxy()
     *<br>                  导入aspectjweaver依赖
     *<br>                  开启 @EnableAspectJAutoProxy(exposeProxy = true) 暴露代理对象
     *<br>
     *<br> @param voucherId
     *<br> @return
     */
    @Transactional
    public Result queryOrderVoucherSave(Long voucherId) {

        //获取用户id
        Long userId = UserHolder.getUser().getId();

        //判断订单表中是否已经存在
        int count = query()
                .eq("user_id", userId)
                .eq("voucher_id", voucherId)
                .count();
        if (count > 0) {
            //存在,返回前端信息
            return Result.fail("你已领取,每人只能领取一份!");
        }

        //以上都满足 施行扣减下单业务
        //5.扣减库存
        boolean isOK = iSeckillVoucherService
                .update()
                .setSql("stock =stock - 1")
                .eq("voucher_id", voucherId)
                //.eq("stock",stock) // CAS乐观锁
                // CAS乐观锁改进  stock > 0 就可以执行下单业务
                .gt("stock", 0)
                .update();
        if (!isOK) {
            //秒杀失败,返回给前端异常信息
            return Result.fail("库存不足!");
        }
        //6.创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        //6.1.生成订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        //6.2.设置用户id
        voucherOrder.setUserId(userId);
        //6.3.设置代金卷id
        voucherOrder.setVoucherId(voucherId);
        //6.4.当生产的订单id写入数据库
        this.save(voucherOrder);

        //7.返回订单ID
        return Result.ok(orderId);

    }


}

存在的问题:误删问题

上面的简单实现

在正常情况下:

image-20221030213559181

极端情况下:

image-20221030214121368

解决方案:

image-20221030214149274

对 上面代码优化:

impl

package com.hmdp.utils;

import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.BooleanUtil;
import org.springframework.data.redis.core.StringRedisTemplate;

import java.util.concurrent.TimeUnit;

public class LockImpl implements ILock{

    /**
     * redis
     */
    private StringRedisTemplate stringRedisTemplate;
    /**
     * 锁名称
     */
    private String name;
    public LockImpl(StringRedisTemplate stringRedisTemplate, String name) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.name = name;
    }

    /**
     * 锁前缀
     */
    private static final String KEY_PREFIX = "lock:";

    /**
     * 锁的唯一标识
     */
    private String ID_PREFIX = UUID.randomUUID().toString(true);

    /**
     * 尝试获取锁
     * @param timeoutSec 兜底过期时间
     * @return 获取是否成功 true成功
     */
    @Override
    public boolean tryLock(long timeoutSec) {
        // 锁的唯一标识:这里用 UUID + 线程id
        String value = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁的 key
        String key = KEY_PREFIX + name;

        //尝试获取锁
        Boolean isLock = stringRedisTemplate
                .opsForValue()
                .setIfAbsent(key, value, timeoutSec, TimeUnit.SECONDS);

        //返回结果
        //return Boolean.TRUE.equals(isLock); //或者 👇
        return BooleanUtil.isTrue(isLock);
    }


    /**
     * 释放锁
     */
    @Override
    public void unLock() {

        //判断将要释放的锁 的 线程表示是否一致 解决分布式锁误删问题

        //锁的唯一标识:这里用 UUID + 线程id
        String value = ID_PREFIX + Thread.currentThread().getId();
        //获取锁的 key
        String key = KEY_PREFIX + name;
        //获取锁的标识
        String value2 = stringRedisTemplate.opsForValue().get(key);

        //判断将要释放的锁 的 线程表示是否一致 解决分布式锁误删问题
        if (value.equals(value2)){
            //释放锁
            stringRedisTemplate.delete(key);
        }
        //否则 不释放锁
    }

}

依然存在问题:原子性

删除锁时 判断锁的标识 和 释放锁 并发问题

极端情况下:

判断锁的标识 后 发生阻塞,超时释放了锁,此时其它线程获取锁,那么这个线程释放的锁 就是 其他线程的锁了

image-20221030221249309

改进方案:

  1. Redis的事务功能:麻烦不用
  2. Redis的Lua脚本

Redis的Lua脚本

Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令执行时的原子性。Lua是一种 编程语言,它的基本语法大家可以参考网站:https://www.runoob.com/lua/lua-tutorial.html 这里重点介绍Redis提供的调用函数,语法如下:

# 执行redis命令
redis.call('命令名称', 'key', '其它参数', ...)

例如,我们要执行set name jack,则脚本是这样:

# 执行 set name jack
redis.call('set', 'name', 'jack')

例如,我们要先执行set name Rose,再执行get name,则脚本如下:

 先执行 set name jack
redis.call('set', 'name', 'jack')
# 再执行 get name
local name = redis.call('get', 'name')
# 返回
return name

Redis的Lua脚本的执行

image-20221030222016825

释放锁的业务流程是这样的:

  1. 获取锁中的线程标示

  2. 判断是否与指定的标示(当前线程标示)一致

  3. 如果一致则释放锁(删除)

  4. 如果不一致则什么都不做 如果用Lua脚本来表示则是这样的:

    -- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
    -- 获取锁中的标示,判断是否与当前线程标示一致
    if (redis.call('GET', KEYS[1]) == ARGV[1]) then
    -- 一致,则删除锁
    return redis.call('DEL', KEYS[1])
    end
    -- 不一致,则直接返回
    return 0

对之前的impl进行优化:

resources/unlock.lua

-- 这里的 KEYS[1] 就是锁的key,这里的ARGV[1] 就是当前线程标示
-- 获取锁中的标示,判断是否与当前线程标示一致

if (redis.call('GET', KEYS[1]) == ARGV[1]) then
    -- 一致,则删除锁
    return redis.call('DEL', KEYS[1])
end

-- 不一致,则直接返回
return 0

impl

package com.hmdp.utils;

import cn.hutool.core.lang.UUID;
import cn.hutool.core.util.BooleanUtil;
import org.springframework.core.io.ClassPathResource;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;

import java.util.Collections;
import java.util.concurrent.TimeUnit;

public class LockImpl implements ILock{

    /**
     * redis
     */
    private StringRedisTemplate stringRedisTemplate;
    /**
     * 锁名称
     */
    private String name;
    public LockImpl(StringRedisTemplate stringRedisTemplate, String name) {
        this.stringRedisTemplate = stringRedisTemplate;
        this.name = name;
    }

    /**
     * 锁前缀
     */
    private static final String KEY_PREFIX = "lock:";

    /**
     * 锁的唯一标识
     */
    private String ID_PREFIX = UUID.randomUUID().toString(true);

    /**
     * 初始化Lua脚本对象   RedisScript的实现类
     */
    private static final  DefaultRedisScript<Long> UNLOCK_SCRIPT;
    static {
        //创建 RedisScript的实现类 DefaultRedisScript
        UNLOCK_SCRIPT = new DefaultRedisScript<Long>();
        //设置Lua脚本位置
        UNLOCK_SCRIPT.setLocation(new ClassPathResource("unlock.lua"));
        //设置脚本执行后的返回值类型
        UNLOCK_SCRIPT.setResultType(Long.class);
    }


    /**
     * 尝试获取锁
     * @param timeoutSec 兜底过期时间
     * @return 获取是否成功 true成功
     */
    @Override
    public boolean tryLock(long timeoutSec) {
        // 锁的唯一标识:这里用 UUID + 线程id
        String value = ID_PREFIX + Thread.currentThread().getId();
        // 获取锁的 key
        String key = KEY_PREFIX + name;

        //尝试获取锁
        Boolean isLock = stringRedisTemplate
                .opsForValue()
                .setIfAbsent(key, value, timeoutSec, TimeUnit.SECONDS);

        //返回结果
        //return Boolean.TRUE.equals(isLock); //或者 👇
        return BooleanUtil.isTrue(isLock);
    }


    /**
     * 释放锁
     */
    @Override
    public void unLock() {

        //判断将要释放的锁 的 线程表示是否一致 解决分布式锁误删问题

        //锁的唯一标识:这里用 UUID + 线程id
        String value = ID_PREFIX + Thread.currentThread().getId();
        //获取锁的 key
        String key = KEY_PREFIX + name;

        //判断将要释放的锁 的 线程表示是否一致 解决分布式锁误删问题
        //使用Lua脚本 确保 [判断标识] 和 [释放锁] 的 原子性
        stringRedisTemplate
                .execute(UNLOCK_SCRIPT, //Lua脚本对象
                         Collections.singletonList(key), //KEYS[1] list
                         value); //ARGV[1] object

        //否则 不释放锁
    }


}

到此 实现了一个较为完善的 基于Redis的分布式锁

但是.......在某些场景下 依然需要优化.......


基于Redis的分布式锁优化


还有些问题可以进一步优化:

image-20221030231315439

这些实现起来比较繁琐

可以使用开源框架去解决:

**使用 Redisson **👇


Redisson解决Redis分布式锁

Redisson介绍


Redisson Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。

它不仅提供了一系列的分布 式的Java常用对象,还提供了许多分布式服务,其中就包含了各种分布式锁的实现。

image-20221031105827347

官网地址: https://redisson.org

GitHub地址: https://github.com/redisson/redisson

简单的使用

package com.hmdp.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
 * Redisson 初始化配置
 */
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient1() {

        Config config = new Config();
        //链接Redis
        config.useSingleServer()
                .setAddress("redis://ayaka520:6379")
                .setPassword("gangajiang521");
	    //解耦合 可以使用yaml的方式 解耦合
        //通过Redisson.create(config) 指定配置文件 创建RedissonClient
        return Redisson.create(config);
    }

    //@Bean
    public RedissonClient redissonClient2() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://ayaka521:6380");
        return Redisson.create(config);
    }

    //@Bean
    public RedissonClient redissonClient3() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://ayaka521:6381");
        return Redisson.create(config);
    }

}

业务改造

package com.hmdp.service.impl;

import com.hmdp.dto.Result;
import com.hmdp.entity.SeckillVoucher;
import com.hmdp.entity.VoucherOrder;
import com.hmdp.mapper.VoucherOrderMapper;
import com.hmdp.service.ISeckillVoucherService;
import com.hmdp.service.IVoucherOrderService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.utils.RedisIdWorker;
import com.hmdp.utils.UserHolder;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.aop.framework.AopContext;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.time.LocalDateTime;


/**
 * <p>
 * 服务实现类
 * </p>
 */
@Service
public class VoucherOrderServiceImpl extends ServiceImpl<VoucherOrderMapper, VoucherOrder> implements IVoucherOrderService {

    @Resource
    private ISeckillVoucherService iSeckillVoucherService;

    @Resource
    private RedisIdWorker redisIdWorker;

    //@Resource
    //private StringRedisTemplate stringRedisTemplate;

    /**
     * Redisson
     */
    @Resource
    private RedissonClient redissonClient;

    /**
     * 实现订单秒杀业务
     *
     * @param voucherId
     * @return
     */
    @Override
    public Result seckillVoucher(Long voucherId) {
        //1.查询优惠卷信息
        SeckillVoucher voucher = iSeckillVoucherService.getById(voucherId);
        Integer stock = voucher.getStock();
        //2.判断优惠卷是否开始
        if (voucher.getBeginTime().isAfter(LocalDateTime.now())) {
            //尚未开始,返回异常给前端
            return Result.fail("秒杀尚未开始!");
        }
        //3.判断优惠卷是否过期
        if (voucher.getEndTime().isBefore(LocalDateTime.now())) {
            //秒杀已结束,返回前端异常信息
            return Result.fail("秒杀已结束!");
        }
        //4.判断优惠卷库存是否充足
        if (stock < 1) {
            //秒杀卷库存不足,返回给前端异常信息
            return Result.fail("库存不足!");
        }


        //一人一单问题
        //获取用户id
        Long userId = UserHolder.getUser().getId();

        //缩小悲观锁范围
        /*synchronized (userId.toString().intern()) {
            //获取当前代理对象
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            //通过代理对象调用 保证事务的正常
            return proxy.queryOrderVoucherSave(voucherId);
        }*/

        //获取分布式锁对象
        //LockImpl lock = new LockImpl(stringRedisTemplate,"order:" + userId);

        //从Redisson中获取锁
        RLock lock = redissonClient.getLock("lock:order:" + userId);

        //尝试获取锁
        boolean isLock = lock.tryLock();

        //判断是否获取成功
        if (!isLock){
            //获取锁失败
            return Result.fail("不能重复下单!");
        }
        //成功 执行业务
        try{
            //获取当前代理对象
            IVoucherOrderService proxy = (IVoucherOrderService) AopContext.currentProxy();
            //通过代理对象调用 保证事务的正常
            return proxy.queryOrderVoucherSave(voucherId);
        }finally{
            //确保锁的释放
            lock.unlock();
        }
    }

    @Transactional
    public Result queryOrderVoucherSave(Long voucherId) {

        //获取用户id
        Long userId = UserHolder.getUser().getId();

        //判断订单表中是否已经存在
        int count = query()
                .eq("user_id", userId)
                .eq("voucher_id", voucherId)
                .count();
        if (count > 0) {
            //存在,返回前端信息
            return Result.fail("你已领取,每人只能领取一份!");
        }

        //以上都满足 施行扣减下单业务
        //5.扣减库存
        boolean isOK = iSeckillVoucherService
                .update()
                .setSql("stock =stock - 1")
                .eq("voucher_id", voucherId)
                //.eq("stock",stock) // CAS乐观锁
                // CAS乐观锁改进  stock > 0 就可以执行下单业务
                .gt("stock", 0)
                .update();
        if (!isOK) {
            //秒杀失败,返回给前端异常信息
            return Result.fail("库存不足!");
        }
        //6.创建订单
        VoucherOrder voucherOrder = new VoucherOrder();
        //6.1.生成订单id
        long orderId = redisIdWorker.nextId("order");
        voucherOrder.setId(orderId);
        //6.2.设置用户id
        voucherOrder.setUserId(userId);
        //6.3.设置代金卷id
        voucherOrder.setVoucherId(voucherId);
        //6.4.当生产的订单id写入数据库
        this.save(voucherOrder);

        //7.返回订单ID
        return Result.ok(orderId);

    }


}

image-20221030231315439

Redisson可重入锁问题

原理:

image-20221031105235063


使用Lua脚本实现 --- 获取锁

image-20221031110822288

源码:

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
            "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                    "end; " +
                    "return redis.call('pttl', KEYS[1]);",
            Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}

使用Lua脚本实现 --- 释放锁

image-20221031111022292

源码:

protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                    "end; " +
                    "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                    "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                    "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; " +
                    "end; " +
                    "return nil;",
            Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

可重试--WatchDog机制


源码jiji看 jiji分析

超时释放--发布订阅/信号量


源码jiji看 jiji分析

Redisson分布式锁的原理

image-20221102084703840

Redisson分布式锁原理:

  • 可重入:利用hash结构记录线程id和重入次数
  • 可重试:利用信号量和PubSub功能实现等待、唤醒,获取 锁失败的重试机制
  • 超时续约:利用watchDog,每隔一段时间(releaseTime / 3),重置超时时间


Redisson主从一致性问题


image-20221102090850998

image-20221102091109610

使用:

RedissonConfig

package com.hmdp.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;

/**
 * Redisson 初始化配置
 */
@Configuration
public class RedissonConfig {

    @Bean
    public RedissonClient redissonClient1() throws IOException {

        Config config = new Config();
        //链接Redis
        config.useSingleServer()
                .setAddress("redis://ayaka520:6379")
                .setPassword("gangajiang521");

        //通过Redisson.create(config) 指定配置文件 创建RedissonClient
        return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient2() throws IOException {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://ayaka521:6380");
        return Redisson.create(config);
    }

    @Bean
    public RedissonClient redissonClient3() throws IOException {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://ayaka521:6381");
        return Redisson.create(config);
    }


}

RedissonTests

package com.hmdp;

import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.concurrent.TimeUnit;

@Slf4j
@SpringBootTest
class RedissonTest {

    @Resource
    private RedissonClient redissonClient1;

    @Resource
    private RedissonClient redissonClient2;

    @Resource
    private RedissonClient redissonClient3;

    
    private RLock lock;
    //创建连锁
    @BeforeEach
    void setUp() {
        //获取 RLock对象
        RLock lock1 = redissonClient1.getLock("lock:test");
        RLock lock2 = redissonClient2.getLock("lock:test");
        RLock lock3 = redissonClient3.getLock("lock:test");
        //创建连锁
        lock = redissonClient1.getMultiLock(lock1,lock2,lock3);
    }


    @Test
    void method1() throws InterruptedException {
        // 尝试获取锁
        boolean isLock = lock.tryLock(1L, TimeUnit.SECONDS);
        if (!isLock) {
            log.error("获取锁失败 .... 1");
            return;
        }
        try {
            log.info("获取锁成功 .... 1");
            method2();
            log.info("开始执行业务 ... 1");
        } finally {
            log.warn("准备释放锁 .... 1");
            lock.unlock();
        }
    }
    void method2() {
        // 尝试获取锁
        boolean isLock = lock.tryLock();
        if (!isLock) {
            log.error("获取锁失败 .... 2");
            return;
        }
        try {
            log.info("获取锁成功 .... 2");
            log.info("开始执行业务 ... 2");
        } finally {
            log.warn("准备释放锁 .... 2");
            lock.unlock();
        }
    }
}

总结


总结

1)不可重入Redis分布式锁:

  • 原理:利用setnx的互斥性;利用ex避免死锁;释放锁时判 断线程标示

  • 缺陷:不可重入、无法重试、锁超时失效

2)可重入的Redis分布式锁:

  • 原理:利用hash结构,记录线程标示和重入次数;利用 watchDog延续锁时间;利用>信号量控制锁重试等待

  • 缺陷:redis宕机引起锁失效问题

image-20221102084703840

3)Redisson的multiLock:

  • 原理:多个独立的Redis节点,必须在所有节点都获取重入锁,才算获取锁成功
  • 缺陷:运维成本高、实现复杂

image-20221102091109610

这样就就觉了分布式锁的问题

但是,还可以继续优化:


基于Redis的消息队列


秒杀业务的优化

image-20221104151321789

改进方案:

image-20221104151803752

image-20221104151933960

image-20221104152114257

Redis 解决消息队列效果并不好 这里就不实现了 还dai是MQ



Redis解决 [博客] [点赞] [排行榜]

分析 与 问题

Redis类型的选择:

  • 一个用户不能重复点赞 -- 集合元素不可重复
  • 点赞榜 Top5 需要排序 -- 集合需要排序 最终选出前5个数据 || Top5(最先点赞的前5人)

image-20221123193728144

所以 选择了 SortedSet 数据类型

  • key : blog:liked: + 博客id
  • member:用户id
  • score:时间戳

SorteddSet 要用的的命令

添加成员 --- 新增点赞

image-20221123194309452

  • reids命令:zadd blog:liked:博客id 用户id 时间戳
  • java命令: stringRedisTemplate.opsForZSet().add(key,userId,System.currentTimeMillis());

判断是否为该成员 --- 判断该用户是否为该博客点赞过

image-20221123194947893

  • reids命令:zscore blog:liked:博客id 时间戳 返回部位 nil 为该成员
  • java命令: Double isMemberScore = stringRedisTemplate.opsForZSet().score(key, userId); 不为 null 为该成员

按分数顺序查询成员 --- 实现点赞 Top5 功能

image-20221123195204150

  • reids命令:zscore blog:liked:博客id 时间戳 返回部位 nil 为该成员
  • java命令: Set<String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);

几个坑

数据库问题:

/**
根据博客id 获取点赞排行榜前 5 名 <br>
     */
@Override
    public Result likesBlogTop(Long id) {
        String key = BLOG_LIKED_KEY + id.toString();
        //根据博客id查询点赞前 5 名
        Set<String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);// top5
        if (range == null){
            return Result.ok();
        }
        List<Long> ids = range.stream().map(Long::valueOf).collect(Collectors.toList());
        if (ObjectUtil.isEmpty(ids)){
            return Result.ok();
        }
        String idStr = StrUtil.join("," , ids);
        System.out.println(idStr);
        //根据这些用户 id 获取用户信息
        List<UserDTO> userDTOS = userService
                //.listByIds(ids)
                .query()
                .in("id",ids)
                .last("order by field(id,"+ idStr + ")")
                .list()
                .stream()//封装成 UserDTO 防止敏感信息泄露
                .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());

        return Result.ok(userDTOS);
    }

根据博客id 获取点赞排行榜前 5 名

有一个坑:

select * from tb_user where id in(3,2,1); 的查询结果顺序 是 1 2 3

select * from tb_user where id in(3,2,1) order by field(id,3,2,1)

这样才能保证按给定的顺序查询

这里依然有个坑!!

当未有用户点赞时 :SQL语句:SELECT * FROM tb_user WHERE (id IN ()) order by field(id,)语法错误

解决方法 :提前判断

if (ObjectUtil.isEmpty(ids)){
	return Result.ok();
}

空指针异常

//判断当前用户是否为当前博客点赞
private void isLiked(Blog blog){
    UserDTO userDTO = UserHolder.getUser();
    if (userDTO == null){
        //用户未登录,不需要查询是否点赞过
        return;
    }
    String userId = userDTO.getId().toString();
    String key = BLOG_LIKED_KEY + blog.getId().toString();
    //Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
    Double isMember = stringRedisTemplate.opsForZSet().score(key, userId.toString());
    //封装blog
    blog.setIsLike(isMember != null);
}

用户未登录 会报空指针异常 提前判断一下

同一用户多地登录 并发安全问题 TODO: T.T

更多异常 TODO: T.T

库表设置

image-20221123112636816

CREATE TABLE `tb_blog` (
  `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT '主键',
  `shop_id` bigint(20) NOT NULL COMMENT '商户id',
  `user_id` bigint(20) unsigned NOT NULL COMMENT '用户id',
  `title` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '标题',
  `images` varchar(2048) NOT NULL COMMENT '探店的照片,最多9张,多张以","隔开',
  `content` varchar(2048) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT '探店的文字描述',
  `liked` int(8) unsigned DEFAULT '0' COMMENT '点赞数量',
  `comments` int(8) unsigned DEFAULT NULL COMMENT '评论数量',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=26 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT;

实体类

package com.hmdp.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import java.io.Serializable;
import java.time.LocalDateTime;


@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_blog")
public class Blog implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 主键
     */
    @TableId(value = "id", type = IdType.AUTO)
    private Long id;
    /**
     * 商户id
     */
    private Long shopId;
    /**
     * 用户id
     */
    private Long userId;
    /**
     * 用户图标
     */
    @TableField(exist = false)
    private String icon;
    /**
     * 用户姓名
     */
    @TableField(exist = false)
    private String name;
    /**
     * 是否点赞过了
     */
    @TableField(exist = false)
    private Boolean isLike;

    /**
     * 标题
     */
    private String title;

    /**
     * 探店的照片,最多9张,多张以","隔开
     */
    private String images;

    /**
     * 探店的文字描述
     */
    private String content;

    /**
     * 点赞数量
     */
    private Integer liked;

    /**
     * 评论数量
     */
    private Integer comments;

    /**
     * 创建时间
     */
    private LocalDateTime createTime;

    /**
     * 更新时间
     */
    private LocalDateTime updateTime;


}

BlogController

package com.hmdp.controller;


import com.hmdp.dto.Result;
import com.hmdp.dto.UserDTO;
import com.hmdp.entity.Blog;
import com.hmdp.service.IBlogService;
import com.hmdp.utils.UserHolder;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

/**
 * <p>
 * 前端控制器
 * </p>
 */
@RestController
@RequestMapping("/blog")
public class BlogController {

    @Resource
    private IBlogService blogService;

    @PostMapping
    public Result saveBlog(@RequestBody Blog blog) {

        // 获取登录用户
        UserDTO user = UserHolder.getUser();
        blog.setUserId(user.getId());
        // 保存探店博文
        blogService.save(blog);
        // 返回 id
        return Result.ok(blog.getId());
    }


    @PutMapping("/like/{id}")
    public Result likeBlog(@PathVariable("id") Long id) {
        // 修改点赞数量
        return blogService.likeBlog(id);
    }

    @GetMapping("/likes/{id}")
    public Result likesBlogTop(@PathVariable("id") Long id){
        // 点赞榜单 前 5 位
        return blogService.likesBlogTop(id);
    }


    @GetMapping("/of/me")
    public Result queryMyBlog(@RequestParam(value = "current", defaultValue = "1") Integer current) {
        return blogService.queryMyBlog(current);
    }

    @GetMapping("/of/user")
    public Result queryOfUserBlog(@RequestParam("id") Long id,
                                  @RequestParam(value = "current", defaultValue = "1") Integer current){
        return blogService.queryOfUserBlog(id, current);
    }

    @GetMapping("/hot")
    public Result queryHotBlog(@RequestParam(value = "current", defaultValue = "1") Integer current) {
        return blogService.queryHotBlog(current);
    }

    @GetMapping("/{id}")
    public Result queryByIdBlog(@PathVariable Long id){
        return blogService.queryByIdBlog(id);
    }

}

BlogServer

package com.hmdp.service.impl;

import cn.hutool.core.bean.BeanUtil;
import cn.hutool.core.util.ObjectUtil;
import cn.hutool.core.util.StrUtil;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.hmdp.dto.Result;
import com.hmdp.dto.UserDTO;
import com.hmdp.entity.Blog;
import com.hmdp.entity.User;
import com.hmdp.mapper.BlogMapper;
import com.hmdp.service.IBlogService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.service.IUserService;
import com.hmdp.utils.SystemConstants;
import com.hmdp.utils.UserHolder;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static com.hmdp.utils.RedisConstants.BLOG_LIKED_KEY;

/**
 * <p>
 *  服务实现类
 * </p>
 */
@Service
public class BlogServiceImpl extends ServiceImpl<BlogMapper, Blog> implements IBlogService {

    @Resource
    private IUserService userService;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public Result queryHotBlog(Integer current) {
        // 根据用户查询
        Page<Blog> page = this.query()
                .orderByDesc("liked")
                .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
        // 获取当前页数据
        List<Blog> records = page.getRecords();
        // 查询用户
        records.forEach(blog -> {
            // 封装blog
            queryUserBlog(blog);
            // 判断用户是否点赞
            isLiked(blog);
        });


        return Result.ok(records);
    }


    /**
     * 根据id查询博客
     * @param id
     * @return
     */
    @Override
    public Result queryByIdBlog(Long id) {
        //根据id查询博客
        Blog blog = this.getById(id);
        if (ObjectUtil.isEmpty(blog)){
            return Result.fail("博客不存在!");
        }
        //封装后的 blog 对象
        this.queryUserBlog(blog);
        // 判断用户是否点赞
        isLiked(blog);
        return Result.ok(blog);
    }

    /**
     * 更改点赞
     * @param id
     * @return
     */
    @Override
    public Result likeBlog(Long id) {

        //1.获取用户 id
        String userId = UserHolder.getUser().getId().toString();
        String key = BLOG_LIKED_KEY + id;

        //2.判断当前用户是否点赞
        //Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId);
        Double isMemberScore = stringRedisTemplate.opsForZSet().score(key, userId);
        if (isMemberScore == null){
            //3.未点赞
            //3.1 数据库点赞数 +1
            boolean isSuccess = update().setSql("liked = liked + 1").eq("id", id).update();
            //3.2 redis中 添加zset 成员
            if (isSuccess){
                //stringRedisTemplate.opsForSet().add(key,userId);
                stringRedisTemplate.opsForZSet().add(key,userId,System.currentTimeMillis()); //key value score
            }
        }else{
            //4.未点赞
            //4.1 数据库点赞数 -1
            boolean isSuccess = update().setSql("liked = liked - 1").eq("id", id).update();
            //4.2 redis中 删除zset 成员
            if (isSuccess){
                //stringRedisTemplate.opsForSet().remove(key,userId);
                stringRedisTemplate.opsForZSet().remove(key,userId);
            }
        }

        return Result.ok();
    }

    /**
     * 根据博客id 获取点赞排行榜前 5 名 <br>
     *      注意:                   <br>
     *      有一个坑:                <br>
     *      select * from tb_user where id in(3,2,1); 的查询结果顺序 是 1 2 3     <br>
     *      select * from tb_user where id in(3,2,1) order by field(id,3,2,1)  <br>
     *      这样才能保证按给定的顺序查询
     *
     * @param id
     * @return
     */
    @Override
    public Result likesBlogTop(Long id) {
        String key = BLOG_LIKED_KEY + id.toString();
        //根据博客id查询点赞前 5 名
        Set<String> range = stringRedisTemplate.opsForZSet().range(key, 0, 4);// top5
        if (range == null){
            return Result.ok();
        }
        List<Long> ids = range.stream().map(Long::valueOf).collect(Collectors.toList());
        if (ObjectUtil.isEmpty(ids)){
            return Result.ok();
        }
        String idStr = StrUtil.join("," , ids);
        //根据这些用户 id 获取用户信息
        List<UserDTO> userDTOS = userService
                //.listByIds(ids)
                .query()
                .in("id",ids)
                .last("order by field(id,"+ idStr + ")")
                .list()
                .stream()//封装成 UserDTO 防止敏感信息泄露
                .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());

        return Result.ok(userDTOS);
    }

    @Override
    public Result queryOfUserBlog(Long id, Integer current) {
        //根据用户查询blog
        Page<Blog> page = query()
                .eq("user_id", id)
                .page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
        //获取当前页面数据
        List<Blog> records = page.getRecords();
        return Result.ok(records);
    }

    @Override
    public Result queryMyBlog(Integer current) {
        // 获取登录用户
        UserDTO user = UserHolder.getUser();
        // 根据用户查询
        Page<Blog> page = query()
                .eq("user_id", user.getId()).page(new Page<>(current, SystemConstants.MAX_PAGE_SIZE));
        // 获取当前页数据
        List<Blog> records = page.getRecords();
        return Result.ok(records);
    }


    //为博客 并设置用户信息
    private void queryUserBlog(Blog blog) {
        Long userId = blog.getUserId();
        User user = userService.getById(userId);
        blog.setName(user.getNickName());
        blog.setIcon(user.getIcon());
    }

    //判断当前用户是否为当前博客点赞
    private void isLiked(Blog blog){
        UserDTO userDTO = UserHolder.getUser();
        if (userDTO == null){
            //用户未登录,不需要查询是否点赞过
            return;
        }
        String userId = userDTO.getId().toString();
        String key = BLOG_LIKED_KEY + blog.getId().toString();
        //Boolean isMember = stringRedisTemplate.opsForSet().isMember(key, userId.toString());
        Double isMember = stringRedisTemplate.opsForZSet().score(key, userId.toString());
        //封装blog
        blog.setIsLike(isMember != null);
    }

}

Redis的效果

image-20221123193540629

Redis解决 [关注] [共同关注]

分析

单纯的关注 取消关注功能 只用数据库就行

查看共同关注

Redis数据类型选择 Set类型

  • key : follow:user: + 用户id
  • member:关注用户的id

Set类型 有一个命令可以查询 诺干个key的交集

image-20221124104641565

  • reids命令:sinter follow:user:3 follow:user:6 用户id:3 和 用户id:6 的共同关注
  • java命令: stringRedisTemplate.opsForSet().intersect(key1,key2);

库表设计

image-20221124103904419

CREATE TABLE `tb_follow` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键',
  `user_id` bigint(20) unsigned NOT NULL COMMENT '用户id',
  `follow_user_id` bigint(20) unsigned NOT NULL COMMENT '关联的用户id',
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=22 DEFAULT CHARSET=utf8mb4 ROW_FORMAT=COMPACT;

实体类

package com.hmdp.entity;

import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;

import java.io.Serializable;
import java.time.LocalDateTime;

/**
 * <p>
 * 
 * </p>
 *
 * @author 虎哥
 * @since 2021-12-22
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("tb_follow")
public class Follow implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * 主键
     */
    @TableId(value = "id", type = IdType.AUTO)
    private Long id;

    /**
     * 用户id
     */
    private Long userId;

    /**
     * 关联的用户id
     */
    private Long followUserId;

    /**
     * 创建时间
     */
    private LocalDateTime createTime;


}

FollowController

package com.hmdp.controller;


import com.hmdp.dto.Result;
import com.hmdp.service.IFollowService;
import org.springframework.web.bind.annotation.*;

import javax.annotation.Resource;

/**
 * <p>
 *  前端控制器
 * </p>
 */
@RestController
@RequestMapping("/follow")
public class FollowController {

    @Resource
    private IFollowService followService;

    /**
     * 根据用户id 修改关注 取消关注
     * @param id
     * @param isFollow
     * @return
     */
    @PutMapping("/{id}/{isFollow}")
    public Result followUser(@PathVariable("id") Long id, @PathVariable("isFollow") boolean isFollow){
        return followService.followUser(id,isFollow);
    }

    /**
     * 根据 id 判断是否关注
     * @param id
     * @return
     */
    @GetMapping("/or/not/{id}")
    public Result notFollow(@PathVariable("id") Long id){
        return followService.notFollow(id);
    }

    /**
     * 共同关注查询
     * @param id
     * @return
     */
    @GetMapping("/common/{id}")
    public Result followCommon(@PathVariable("id") Long id){
        return followService.followCommon(id);
    }

}

FollowService

package com.hmdp.service.impl;

import cn.hutool.core.bean.BeanUtil;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.ObjectUtils;
import com.hmdp.dto.Result;
import com.hmdp.dto.UserDTO;
import com.hmdp.entity.Follow;
import com.hmdp.mapper.FollowMapper;
import com.hmdp.service.IFollowService;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.hmdp.service.IUserService;
import com.hmdp.utils.UserHolder;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import static com.hmdp.utils.RedisConstants.Follow_USER_KEY;

/**
 * <p>
 *  服务实现类
 * </p>
 *
 * @author 虎哥
 * @since 2021-12-22
 */
@Service
public class FollowServiceImpl extends ServiceImpl<FollowMapper, Follow> implements IFollowService {

    @Resource
    private IUserService userService;

    @Resource
    private StringRedisTemplate stringRedisTemplate;

    /**
     * 根据 id 修改 关注/取消关注
     * @param id
     * @param isFollow
     * @return
     */
    @Override
    public Result followUser(Long id, boolean isFollow) {
        //1.获取该用户 id
        Long userId = UserHolder.getUser().getId();
        //2.获取 要判断是否关注的用户id
        Long followUserId = userService.getById(id).getId();
        //3.判断 关注 还是 取消关注
        String key = Follow_USER_KEY + userId;
        if (isFollow){
            //关注逻辑
            Follow follow = new Follow();
            follow.setUserId(userId);
            follow.setFollowUserId(followUserId);
            boolean isSuccess = save(follow);
            //存入Redis
            if (isSuccess){
                stringRedisTemplate.opsForSet().add(key, id.toString());
            }

        }else{
            //取消关注的逻辑
            //数据库中移除
            LambdaQueryWrapper<Follow> queryWrapper = new LambdaQueryWrapper<Follow>();
            queryWrapper.eq(Follow::getUserId,userId).eq(Follow::getFollowUserId,followUserId);
            boolean isSuccess = remove(queryWrapper);
            //Redis中移除
            if (isSuccess){
                stringRedisTemplate.opsForSet().remove(key, id.toString());
            }
        }

        return Result.ok();
    }

    /**
     * 根据 id 判断是否关注
     * @param id
     * @return
     */
    @Override
    public Result notFollow(Long id) {
        //获取用户id
        Long userId = UserHolder.getUser().getId();
        //从数据库中查询
        Integer count = query().eq("user_id", userId)
                .eq("follow_user_id", id)
                .count();
        //判断是否已经关注
        return Result.ok(count > 0);
    }

    /**
     * 共同关注查询
     * @param id
     * @return
     */
    @Override
    public Result followCommon(Long id) {
        //1.获取当前用户id key
        String key1 = Follow_USER_KEY + UserHolder.getUser().getId().toString();
        //2.获取查询用户id key
        String key2 = Follow_USER_KEY + id.toString();
        //3.从Redis中求交集
        Set<String> intersect = stringRedisTemplate.opsForSet().intersect(key1, key2);
        //4.转换为集合
        if (ObjectUtils.isEmpty(intersect)){
            return Result.ok(Collections.emptyList());
        }
        List<Long> idList = intersect.stream().map(Long::valueOf).collect(Collectors.toList());
        //查询用户
        List<UserDTO> users =
                userService.listByIds(idList)
                .stream()
                .map(user -> BeanUtil.copyProperties(user, UserDTO.class))
                .collect(Collectors.toList());
        return Result.ok(users);
    }
}

Redis效果

image-20221124104624870

Redis解决 [Feed流] [博客推送]

Feed流

关注推送也叫做Feed流,直译为投喂。为用户持续的提供“沉浸式”的体验,通过无限下拉刷新获取新的信息。

image-20221124231338092

image-20221124231401963

Feed流的模式

Feed流产品有两种常见模式:

  • Timeline:不做内容筛选,简单的按照内容发布时间排序,常用于好友或关注。例如朋友圈

    • ➢ 优点:信息全面,不会有缺失。并且实现也相对简单
    • ➢ 缺点:信息噪音较多,用户不一定感兴趣,内容获取效率低
  • 智能排序[Rank]:利用智能算法屏蔽掉违规的、用户不感兴趣的内容。推送用户感兴趣信息来吸引用户

    • ➢ 优点:投喂用户感兴趣信息,用户粘度很高,容易沉迷
    • ➢ 缺点:如果算法不精准,可能起到反作用

Feed流 Timeline模式

Feed流,Timeline的模式,该模式的实现方案有三种:

  1. 拉模式
  2. 推模式
  3. 推拉结合

拉模式


image-20221124231652089


推模式


image-20221124231726647


推拉结合


image-20221124231742522


Feed流的实现方案


image-20221124231929634


案例分析

案例要求:

点击关注后,会查询出该用户关注的其他用户的博客,按时间降序排列,滚动分页。

使用 Timeline模式 的 推模式

接口:


image-20221124234334506


Redis的类型选择

使用 SortedSet 数据类型

  • key : feed:user: + 用户id
  • member:关注用户的博客id (被关注者发布的博客)
  • score:博客发布的时间戳

当一个用户发布博客时被关注者发布的博客id

根据当前用户,查询所有的关注者,

得到所有关注者的id 即: feed:user: + 用户id 后,

保存到 Redis 当中:

image-20221125001818690

要使用的命令

发布订阅时,向用户推送,被关在的用户会保存该博客id

image-20221125000345829

  • reids命令:zadd feed:user:3 5 521 id为x的用户,在521时刻,发布了id为5的博客,并推送给了id为3用户。
  • java命令: stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());

用户获取订阅的内容,可以进行分页

image-20221125002735581

  • reids命令:ZREMRANGEBYSCORE follow:user:5 最大时间戳 0 withscores limit 0 3 首次

  • reids命令:ZREMRANGEBYSCORE follow:user:5 上次最小时间戳 0 withscores limit 上次最小时间戳次数 3 下次

  • java命令: stringRedisTemplate.opsForZSet().reverseRangeByScoreWithScores(key, 0, maxTime, offset, 2);

    • 返回一个 ZSetOperations.TypedTuple<String>>

    //TypedTuple接口

    @Nullable V getValue(); //成员

    @Nullable Double getScore(); //分数

    
    
    
    
    
    
    
    
    
    
    

===开始实现===


返回给前端实体

用于分页擦寻后 返回的数据

package com.hmdp.dto;

import lombok.Data;

import java.util.List;

@Data
public class ScoreResult {

    //分页后的数据
    private List<?> list;
    //最小时间戳 用于下次请求分页
    private Long minTime;
    //最小时间戳出现的次数 用于下次请求分页的偏移量
    private Integer offset;

}

[发布] 信息的发布

保存博客 并 实现推送功能

image-20221124230414472

/**
 * 接口
 *
 * 保存博客 并 推送
 */
@PostMapping
public Result saveBlog(@RequestBody Blog blog) {
    //保存博客 并推送Feed流
    return blogService.saveBlog(blog);
}


/**
 * 业务
 *
 * 保存博客 并 推送
 * @param blog
 * @return
 */
@Override
public Result saveBlog(Blog blog) {
    // 获取登录用户
    UserDTO user = UserHolder.getUser();
    blog.setUserId(user.getId());
    // 保存探店博文
    boolean isSuccess = save(blog);
    if (!isSuccess){
        return Result.fail("新增笔记失败!");
    }
    // 推送给关注的 用户 Feet流 推模式
    //1.获取粉丝id
    List<Follow> follows = followService.query().eq("follow_user_id", UserHolder.getUser().getId()).list();
    //2.推送到邮箱当中
    for (Follow follow: follows) {
        //3.以 SortedSet 类型存储
        String key = FEED_USER_KEY + follow.getUserId();
        stringRedisTemplate.opsForZSet().add(key,blog.getId().toString(),System.currentTimeMillis());
    }
    
    // 返回 id
    return Result.ok(blog.getId());
}

[接收] 滚动分页查询的实现

image-20221125032511467

/**
 * 前端控制器
 * 
 * 分页查询当前用户 关注用户的博客
 *      Feed流 取
 * @param maxTime
 * @param offset
 * @return
 */
@GetMapping("/of/follow")
private Result queryOfFollow(@RequestParam("lastId") Long maxTime,
                             @RequestParam(value = "offset",defaultValue = "0") Integer offset){
    return blogService.queryOfFollow(maxTime,offset);
}


/**
 * 业务层
 * 
 * 分页查询当前用户 关注用户的博客
 * @param maxTime
 * @param offset
 * @return
 */
@Override
public Result queryOfFollow(Long maxTime, Integer offset) {
    //1.获取用户id
    Long userId = UserHolder.getUser().getId();
    //2.从Redis用户信箱中取出 关注用的发布的博客
    String key = FEED_USER_KEY + userId.toString();
    Set<ZSetOperations.TypedTuple<String>> typedTuples = stringRedisTemplate
                    .opsForZSet()
                    .reverseRangeByScoreWithScores(key, 0, maxTime, offset, 2);
    //3.判断邮件是否为空
    if (ObjectUtil.isEmpty(typedTuples)){
        return Result.ok();
    }

    //4.进行解析 博客id集合 当前页最小时间戳:minTime 最小时间戳出现的次数
    ArrayList<Long> blogIds = new ArrayList<>(typedTuples.size());
    long minTime = 0;
    int minCount = 1;
    for (ZSetOperations.TypedTuple<String> tuple : typedTuples) {
        //关注的博客id
        Long blogId = Long.valueOf(tuple.getValue());
        blogIds.add(blogId);
        //获取最小分数 和 最小分数个数
        long score = tuple.getScore().longValue();
        if (score == minTime){
            minCount ++;
        }else{
            minTime = score;
            minCount = 1;
        }
    }

    //5.根据 博客id集合 查询博客对象
    String idStr = StrUtil.join(",", blogIds);
    List<Blog> blogs = query()
            .in("id", blogIds)
            .last("order by field(id," + idStr + ")")
            .list();
    //给博客封装非数据库字段
    blogs.forEach(blog -> {
        //封装后的 blog 对象
        queryUserBlog(blog);
        // 判断用户是否点赞
        isLiked(blog);
    });

    //6.封装 ScoreResult 对象
    ScoreResult scoreResult = new ScoreResult();
    scoreResult.setList(blogs);
    scoreResult.setMinTime(minTime);
    scoreResult.setOffset(minCount);

    //7.返回个前端
    return Result.ok(scoreResult);
}

TODO:

About

我的Redis学习笔记,加案例实战。

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors