缓存穿透是指用户查询不存在的数据,mysql中查不到数据无法写入redis,导致每次查询都会落在mysql上
缓存空数据,数据库查询结果为空,任然把这个空结果缓存至redis {key:'查询条件',value:null}
参考布隆过滤器 - 问尤龙の时光 (wenyoulong.com)
有如下接口方法,根据用户ID查询用户信息,用户id随便编一个都行
/**
* 根据ID获取的用户,用于测试缓存穿透
* @param userId 用户IO
* @return 用户信息
*/
@GetMapping("/getById/{userId}")
public String getById(@PathVariable(value = "userId") String userId){
return userService.getById(userId);
}
@Override
public String getById(String userId) {
try {
// 先从缓存中查询用户
String cachedUserInfo = redisTemplate.opsForValue().get(userId);
if(cachedUserInfo == null){
System.out.println("redis中没有查询到数据,进入数据库查询");
// 模拟数据库查询用户,数据库中也没有这个用户,返回null
Thread.sleep(100);
return null;
}
return cachedUserInfo;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
调用postman发起200个请求,然后发现每次请求缓存中找不到数据,都会去数据库中查询数据,执行时间总共为21856毫秒
新增一个v2接口测试缓存空数据的情况
/**
* 根据ID获取的用户,设置null值,用于测试缓存穿透
* @param userId 用户IO
* @return 用户信息
*/
@GetMapping("/getByIdv2/{userId}")
public String getByIdv2(@PathVariable(value = "userId") String userId){
return userService.getByIdv2(userId);
}
@Override
public String getByIdv2(String userId) {
try {
// 先从缓存中查询用户
String cachedUserInfo = redisTemplate.opsForValue().get(userId);
if(cachedUserInfo == null){
System.out.println("redis中没有查询到数据,进入数据库查询");
// 模拟数据库查询用户,数据库中也没有这个用户,返回null,将null值设置在redis
Thread.sleep(100);
redisTemplate.opsForValue().set(userId,"null");
return null;
}
return cachedUserInfo;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
此时在postman中请求这个接口,可以看到时间大大缩短
再来新建一个布隆过滤器测试一下
@Configurable
@ConfigurationProperties(prefix = "spring.redis")
@Data
public class RedissonConfig {
private String host;
private String port;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer()
.setAddress("redis://" + host + ":" + port);
return Redisson.create(config);
}
}
@Configuration
@Import(RedissonConfig.class)
public class BloomFilterConfig {
@Autowired
private RedissonClient redissonClient;
@Bean
public RBloomFilter<String> userInfoBloomFilter() {
// 创建一个布隆过滤器,这里使用了默认的序列化器
RBloomFilter<String> bloomFilter = redissonClient.getBloomFilter("userInfo");
// 初始化布隆过滤器,设置期望的元素数量和期望的误报率
// 这里假设我们期望存储大约 10 万个元素,且希望误报率为 1%
// 向布隆过滤器中添加元素
long phoneNum = 100000L;
double falseProbability = 0.01;
bloomFilter.tryInit(phoneNum, falseProbability);
for(int i=0;i<phoneNum;i++){
bloomFilter.add(String.valueOf(i));
}
return bloomFilter;
}
}
/**
* 根据ID获取的用户,设置null值,用于测试布隆过滤器
* @param userId 用户IO
* @return 用户信息
*/
@GetMapping("/getByIdv3/{userId}")
public String getByIdv3(@PathVariable(value = "userId") String userId){
return userService.getByIdv3(userId);
}
@Override
public String getByIdv3(String userId) {
try {
boolean exist = userInfoBloomFilter.contains(userId);
// 过滤器中不存在这个用户,查询数据库,有的话将数据存入redis,没有就在redis缓存一个null
if(!exist){
// 这里从redis查询是因为布隆过滤器为空时,也把null值缓存到了redis
String cachedUserInfo = redisTemplate.opsForValue().get(userId);
// redis还没有设置过null值
if(cachedUserInfo == null){
System.out.println("布隆过滤器不存在数据,查询数据库是否有数据并缓存");
/*
* 正常这里应该还要判断数据库返回值是否为null,不为null则缓存数据到redis,同时把数据添加到过滤器,
* 若为null则把null缓存到redis
* 方便演示我直接缓存一个null作为用户信息
* */
Thread.sleep(100);
redisTemplate.opsForValue().set(userId,"null值");
return "null值";
}else{
return cachedUserInfo;
}
}else{
/*
* 过滤器存判断存在则查询缓存中的数据,这里要注意因为布隆过滤器可能存在误报
* 所以在判断一次缓存值是否存在并进行对应操作
* */
String cachedUserInfo = redisTemplate.opsForValue().get(userId);
if(cachedUserInfo == null){
System.out.println("布隆过滤器误报,查询数据库");
Thread.sleep(100);
// 如果有则返回并设置缓存,更新布隆过滤器,没有就设置缓存为null
redisTemplate.opsForValue().set(userId,"null值");
return "null值";
}
return cachedUserInfo;
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
运行结果如下
缓存击穿是指在高并发情况下,当缓存中某个热点数据失效时,大量的请求会直接打到后端数据库或其他数据源上,导致后端系统的压力瞬间增大,甚至可能导致后端系统崩溃的现象。
缓存击穿通常是由于以下几个原因造成的:
常用的解决方案有两种分别是互斥锁和逻辑过期,互斥锁保证了数据强一致性,但是可能会造成死锁,可用性相对低,更新数据期间,其他线程都会阻塞,逻辑过期提升了可用性,但是不能确保数据一致性
在数据即将过期时,使用一个互斥锁(如 Redis 的 SETNX 或者其他锁机制)来保证只有一个线程去加载数据。其他请求在获取锁失败后,可以选择等待一段时间再重试,或者返回一个临时的结果
设置热点数据在redis中存储时间永不过期,然后使用定时任务,去定时刷新数据,或者检测有变动时把数据更新到缓存
逻辑过期是指给数据新增一个过期时间字段,查询时,发现过期则新开一个线程去更新缓存,并直接返回老的数据
这里演示一下缓存击穿的情况
/**
* 缓存击穿演示
* @param goodsId 商品IO
* @return 商品信息
*/
@GetMapping("/getById/{goodsId}")
public String getById(@PathVariable(value = "goodsId") String goodsId){
return goodsService.getById(goodsId);
}
下述代码中,查询缓存不存在则去数据库加载数据把数据放到缓存中
@Override
public String getById(String goodsId) {
String cachedGoodsInfo = redisTemplate.opsForValue().get(goodsId);
//这里数据是存在的只是缓存中没有,所以直接模拟返回数据
if(!StringUtils.hasLength(cachedGoodsInfo)){
try{
System.out.println("缓存没有数据,查询数据库");
// 模拟从数据库加载
Thread.sleep(100);
// 这里查询到数据后是要更新到缓存的,但是高并发下,可能存在多个线程同时更新的问题
redisTemplate.opsForValue().set(goodsId,"商品信息");
return "商品信息";
}catch (InterruptedException e){
throw new RuntimeException("线程休眠异常");
}
}
return cachedGoodsInfo;
}
jmeter建立一个200的并发,然后发现总共查询了100多次数据库
这里我们用互斥锁来解决缓存击穿的问题,保证只会有一个请求去缓存数据,其他请求等待
private static final String GOODS_LOCK_KEY = "GOODS_LOCK_KEY:";
@Override
public String getByIdv2(String goodsId) {
String cachedGoodsInfo = redisTemplate.opsForValue().get(goodsId);
//数据不存在
if(!StringUtils.hasLength(cachedGoodsInfo)){
// 缓存没有数据
RLock rLock = redissonClient.getLock(GOODS_LOCK_KEY+goodsId);
try{
rLock.lock();
// 再次判断若缓存无数据则才更新缓存,确定只有一个线程会执行
if(!StringUtils.hasLength(redisTemplate.opsForValue().get(goodsId))){
System.out.println("缓存没有数据,获取锁更新数据到缓存");
// 模拟从数据库加载
Thread.sleep(500);
String goodsInfo = "商品信息";
redisTemplate.opsForValue().set(goodsId,goodsInfo);
System.out.println("成功更新数据到缓存");
return goodsInfo;
}
}catch (InterruptedException e){
throw new RuntimeException("线程休眠异常");
}finally {
System.out.println("释放锁");
rLock.unlock();
}
}
return cachedGoodsInfo;
}
逻辑过期代码演示
private static final String GOODS_LOCK_KEY = "GOODS_LOCK_KEY:";
@Override
public String getByIdv3(String goodsId) {
// 查询缓存
String goodsInfoStr = redisTemplate.opsForValue().get(goodsId);
// 缓存不存在
if(!StringUtils.hasLength(goodsInfoStr)){
/*
* 这里需要根据数据是否真实存在,采取缓存穿透或缓存击穿的处理方法
* 这里我默认数据存在,缓存一个数据,然后返回
* */
try {
Thread.sleep(100);
String goodsName = "手机";
cacheData(goodsId,goodsName,2);
return goodsName;
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
// 缓存存在,判断逻辑时间是否过期
GoodsCacheData cachedObj = JSON.toJavaObject(JSON.parseObject(goodsInfoStr), GoodsCacheData.class);
// 当前时间超过逻辑时间
if(LocalDateTime.now().isAfter(cachedObj.getExpireTime())){
RLock lock = redissonClient.getLock(GOODS_LOCK_KEY + goodsId);
try{
lock.lock();
cachedObj = JSON.toJavaObject(JSON.parseObject(goodsInfoStr), GoodsCacheData.class);
if(LocalDateTime.now().isAfter(cachedObj.getExpireTime())){
System.out.println("缓存逻辑过期");
ExecutorService executorService = Executors.newFixedThreadPool(1);
executorService.execute(() -> {
// 重新查询数据,更新缓存
cacheData(goodsId,"手机更新后",2);
});
}
return cachedObj.getGoodsName();
}catch (Exception e){
log.error("异常: ", e);
}finally {
System.out.println("释放锁");
lock.unlock();
}
}
// 这里默认直接返回老的数据值
return cachedObj.getGoodsName();
}
/**
* 商品添加逻辑过期时间并缓存
* @param goodsId 商品id
* @param goodsName 商品名称
* @param expireTime 逻辑过期时间(秒)
*/
private void cacheData(String goodsId,String goodsName,long expireTime){
GoodsCacheData goodsCacheData = new GoodsCacheData();
goodsCacheData.setGoodsId(goodsId);
goodsCacheData.setGoodsName(goodsName);
goodsCacheData.setExpireTime(LocalDateTime.now().plusSeconds(expireTime));
redisTemplate.opsForValue().set(goodsId, JSON.toJSONString(goodsCacheData));
}
缓存雪崩是指在同一时间段内,大量key过期或者redis宕机,导致后端数据库访问量激增
随机时间代码如下
/**
* 设置带有随机过期时间的缓存值
* @param key 缓存键
* @param value 缓存值
* @param baseExpireTime 基础过期时间(秒)
*/
public void setWithRandomExpire(String key, String value, int baseExpireTime) {
// 计算一个随机的过期时间,例如基础时间加上一个随机偏移量
long randomExpire = baseExpireTime + (long) (Math.random() * baseExpireTime);
// 设置缓存并指定过期时间
stringRedisTemplate.opsForValue().set(key, value, randomExpire, TimeUnit.SECONDS);
}