最近在重构云昭云耀项目时,又遇到了一个有趣的问题,就是抢票服务在高并发场景下的问题。
在这篇文章中,我将记录一下我是如何重构抢票服务的,以应对高并发场景下的问题。

抢票部分重构

先来看一看之前的业务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package com.yundingshuyuan.recruit.service;

// import ...

@Slf4j
@Service
public class TicketGrabServiceImpl implements TicketGrabService {
// 用户信息Mapper
@Autowired
UserInfoMapper userInfoMapper;
// 宣讲会Mapper
@Autowired
LectureMapper lectureMapper;
// 宣讲会售票Mapper
@Autowired
LectureTicketMapper lectureTicketMapper;
@Autowired
RedisUtils redisUtils;
@Resource
QrCodeUtils qrCodeUtils;
@Autowired
Converter converter;
@Resource
StringRedisTemplate stringRedisTemplate;
@Autowired
RedissonClient redissonClient;


// 抢票并生成二维码
@Override
@Transactional(rollbackFor = {RuntimeException.class, InvalidParameterException.class})
public BasicResultVO<Boolean> ticketGrab(Integer ticketId, Integer userId) {
// 限制单个用户
String garbTicketLock = RedisConstant.GRAB_TICKET_LOCK + userId;
// 获取锁 + 加锁 (锁住校验抢票和添加到redis的逻辑)
RLock lock = redissonClient.getLock(garbTicketLock);
if (lock.tryLock()) {
// 业务实现
try {
// 1. 资格检验
if (isQualify(ticketId, userId)) {
// 2. 抢票
// 获取 Key-> value 修改
String lectureTicketKey = RedisConstant.LECTURE_TICKET_PREFIX + ticketId;
RAtomicLong atomicLong = redissonClient.getAtomicLong(lectureTicketKey);
// 乐观,允许抢票->校验remain
long remainAfterOperate = atomicLong.decrementAndGet();
// 超卖恢复 限制多个用户
if (remainAfterOperate < 0) {
// 恢复库存
atomicLong.set(0);
// 打印
log.info("非常遗憾!user {} 同学没有抢到票!( º﹃º )", userId);
//TODO: 自定义异常替换
throw new RuntimeException(TicketGrabRespStatusEnum.NOT_GET_TICKET.getMsg());
}
log.info("恭喜!user {} 同学抢到了 第{}场宣讲会 的门票(*´▽`*),还剩票数{}张..", userId, ticketId, remainAfterOperate);
// 插入key
stringRedisTemplate.opsForValue().setIfAbsent(RedisConstant.GRAB_USER_RECORD + userId + ":" + ticketId,
"ok", 120, TimeUnit.SECONDS);
return BasicResultVO.success(StrUtil.format(TicketGrabRespStatusEnum.GET_TICKET.getMsg(), ticketId),true);
}
} finally {
lock.unlock();
}
}
log.error(TicketGrabRespStatusEnum.FREQUENT_REQUEST.getMsg());
return new BasicResultVO<>(TicketGrabRespStatusEnum.FREQUENT_REQUEST, false);
}

// 将redis中的数据持久化到mysql中,并删除已经持久化过的数据
@Override
@Scheduled(fixedDelay = 60000) //1分钟
public void redisToMysql() {
//System.currentTimeMillis() - Long.parseLong(requestTime)
long start = System.currentTimeMillis();
log.info("定时任务开始执行");
String prefix = "*grab:";
Set<String> keys = stringRedisTemplate.keys(prefix + "*");
for (String key : keys) {
LectureTicket lectureTicketVo = new LectureTicket();
String[] split = key.split(":");
String userId = split[1];
String lectureId = split[2];
lectureTicketVo.setLectureId(Integer.valueOf(lectureId));
lectureTicketVo.setUserId(Integer.valueOf(userId));
lectureTicketMapper.insert(lectureTicketVo);
stringRedisTemplate.delete("grab:" + userId + ":" + lectureId);
}

long end = System.currentTimeMillis();
log.info("定时任务执行完毕,用时:" + (end - start) + "毫秒");
}

// 资格检验 (校验是否抢过票 | 是否到达抢票时间 | 是否票售空)
public boolean isQualify(Integer ticketId, Integer userId) {
// 是否到达抢票时间
if (!isReachOpenTime(ticketId)) {
//TODO: 自定义异常替换
throw new RuntimeException(TicketGrabRespStatusEnum.TIME_NOT_ALLOW.getMsg());
}
// 不可重复抢票校验
if (isExistTicket(ticketId, userId)) {
//TODO: 自定义异常替换
throw new RuntimeException(TicketGrabRespStatusEnum.TIME_NOT_ALLOW.getMsg());
}
// 票售空检验
if (isNoSurplus(ticketId)) {
//TODO: 自定义异常替换
throw new RuntimeException(TicketGrabRespStatusEnum.TIME_NOT_ALLOW.getMsg());
}
return true;
}

// 校验是否抢过票
public boolean isExistTicket(Integer ticketId, Integer userId) {
// redis exist || mysql exist
if ((stringRedisTemplate.opsForValue().get(RedisConstant.GRAB_USER_RECORD + userId + ":" + ticketId) != null)
|| lectureTicketMapper.checkCount(userId, ticketId) > 0) {
return true;
}
return false;
}

// 校验是否到达抢票时间
public boolean isReachOpenTime(Integer ticketId) {
String garbTimeKey = RedisConstant.GRAB_TIME_PREFIX + ticketId;
// 时间
String grabTimeStr = stringRedisTemplate.opsForValue().get(garbTimeKey);
LocalDateTime grabTime = LocalDateTime.parse(grabTimeStr, DateTimeFormatter.ofPattern(CommonConstant.DATE_TIME_FORMAT_YMDHMS));
LocalDateTime nowTime = LocalDateTime.now();
// 获取两个 LocalDateTime 对象的毫秒表示
long grabT = grabTime.atZone(java.time.ZoneId.systemDefault()).toInstant().toEpochMilli();
long nowT = nowTime.atZone(java.time.ZoneId.systemDefault()).toInstant().toEpochMilli();
// 计算时间差
if (nowT < grabT - 5000) {
return false;
}
return true;
}

// 校验是否票售空
private boolean isNoSurplus(Integer ticketId) {
RAtomicLong atomicLong = redissonClient.getAtomicLong(RedisConstant.LECTURE_TICKET_PREFIX + ticketId);
return atomicLong.get() <= 0;
}

}

根据项目落地后的反馈,这段代码在高并发场景下可能会面临以下问题:

  • 数据库一致性:余票数量使用Redis进行缓存,然后使用定时任务将缓存数据同一存储到MySQL并进行持久化,这样做的弊端就是Redis和MySQL之间的数据同步可能会延迟,导致数据不一致。在某些Timing下,同学抢到票后并不能第一时间获取所抢到的票的详细信息,
    如果此时前端也没有做相应的处理,同学可能会因为忽略的抢票提示信息而误以为自己没有抢到票。同时,定时任务频繁的操作数据库,可能会对数据库造成压力。

  • 错误处理:这里是由于开发周期比较短,没有对异常进行处理,直接抛出了RuntimeException,但这样可能会导致系统不稳定。

  • 可扩展性:没有采用设计模式,代码耦合度高,可扩展性差。
  • Redis键过期:代码为记录用户抢票的Redis键设置了过期时间。然而,它没有处理键在数据持久化到MySQL之前过期的情况 (这里键过期时间2min,数据同步间隔1min,但可能会出现连接超时等情况导致Redis键在同步前过期)。
  • 缺乏速率限制:代码没有实现任何速率限制。在高并发场景下,这可能导致某些同学用脚本请求接口淹没系统。
  • 阻塞操作:代码使用了阻塞操作,如lock.tryLock()。在高并发场景下,这可能导致线程饥饿或死锁。
  • 缺乏异步处理:代码没有使用任何形式的异步处理或队列系统。在高并发场景下,这可能导致响应时间慢,同学体验差。

重构方案

先看一下重构后的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
package com.yundingshuyuan.recruit.biz.ticketservice.service.impl;
// import ...

@Slf4j
@Service
@RequiredArgsConstructor
public class TicketServiceImpl implements TicketService, CommandLineRunner {

// 票务服务
private TicketService ticketService;
// 邮件服务
private final EmailService emailService;
// 用户服务
private final WxUserInfoService userService;
// 抢票校验过滤器
private final AbstractChainContext<GrabTicketReqDTO> garbTicketAbstractChainContext;
// redisson 客户端
private final RedissonClient redissonClient;
// redis 字符串操作工具类
private final StringRedisTemplateUtil stringRedisTemplateUtil;
// order mapper
private final OrderMapper orderMapper;
// lecture mapper
private final LectureMapper lectureMapper;
// 锁的缓存库
private final Cache<String, ReentrantLock> localLockMap = CacheUtil.newTimedCache(TimeUnit.DAYS.toMillis(1));
// kafka template
private final KafkaTemplate<String, String> kafkaTemplate;
// 转换器
private final Converter converter;
// 延时双删线程池
private final ExecutorService cachedThreadPool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>());
// Hash salt
private final String SALT = "YDSYsalthaSH";
/**
* 获取所有宣讲会

* @return
*/
@Override
public List<LectureDO> getAllTickets() {
return lectureMapper.selectList(null);
}

/**
* 抢票
* @param requestParam
* @param loginId
* @return
*/
@Override
public TicketGrabRespDTO grabTicket(GrabTicketReqDTO requestParam, String loginId) {
// 使用责任链模式进行参数验证,包括参数校验等
garbTicketAbstractChainContext.handler(TicketChainMarkEnum.GRAB_TICKET_FILTER.name(), requestParam);

// 限流: 从 Sentinel 中尝试获取一个令牌,如果获取失败(即令牌桶中没有足够的令牌),则抛出 BlockException 异常
Entry entry = null;
try {

entry = SphU.entry("grabTicket");
// 对于每一种票,创建一个本地锁和一个分布式锁
String lockKey = StrUtil.format(RedisKeyConstant.LOCK_GRAB_TICKETS, requestParam.getLectureId());
ReentrantLock localLock = localLockMap.get(lockKey);
if (localLock == null) {
synchronized (TicketService.class) {
// 再次尝试获取锁
if ((localLock = localLockMap.get(lockKey)) == null) {
localLock = new ReentrantLock(true);
localLockMap.put(lockKey, localLock);
}
}
}
RLock distributedLock = redissonClient.getFairLock(lockKey);

// 尝试获取所有的锁,然后执行扣减余票操作
try {
localLock.lock();
distributedLock.lock();
// 判断是否有剩余的票
checkTicketReserved(requestParam);
// 判断用户是否有票
checkOrderExist(requestParam, loginId);
// 执行抢票
return ticketService.excuteGrabTicket(requestParam, loginId);
} finally {
// 释放所有的锁
try {
localLock.unlock();
} catch (Throwable ignored) {
}
try {
distributedLock.unlock();
} catch (Throwable ignored) {
}
}
} catch (BlockException ex) {
throw new ServiceException("系统繁忙,请稍后再试");
} finally {
if (entry != null) {
entry.exit();
}
}
}

/**
* 判断是否有余票
* @param requestParam
* @return
*/
private void checkTicketReserved(GrabTicketReqDTO requestParam) {
// 判断是否有余票
long cnt = getTicketReservedCount(requestParam);
if (cnt <= 0) {
throw new ServiceException("已无余票");
}
}

/**
* 判断用户是否已经抢到票
* @param requestParam
* @param loginId
*/
private void checkOrderExist(GrabTicketReqDTO requestParam, String loginId) {
// 从Redis中获取订单的存在性
RSetCache<String> set = redissonClient.getSetCache(StrUtil.format(RedisKeyConstant.TICKET_ORDERS, requestParam.getLectureId()));
if (!set.contains(loginId)) {
// 如果Redis中没有这个信息,就查询数据库
TicketOrder ticketOrder = orderMapper.selectOne(new QueryWrapper<TicketOrder>()
.eq("user_id", loginId).eq("lecture_id", requestParam.getLectureId()));
if (ticketOrder != null) {
// 数据库中存在,加入缓存,并设置过期时间为1小时
set.add(loginId, 1, TimeUnit.HOURS);
throw new ServiceException("已存在订单");
}
} else {
// 缓存中存在
throw new ServiceException("已存在订单");
}
}

/**
* 执行抢票
* @param requestParam
* @param loginId
* @return
*/
@Override
@Transactional(rollbackFor = Exception.class)
public TicketGrabRespDTO excuteGrabTicket(GrabTicketReqDTO requestParam, String loginId) {
// 生成抢票的 key
String ticketKey = StrUtil.format(RedisKeyConstant.LOCK_TICKET_CNT, requestParam.getLectureId());
// 删除缓存
redissonClient.getAtomicLong(ticketKey).delete();

// 执行数据库操作,例如保存订单信息到数据库
OrderRespDTO orderRespDTO = ticketService.saveOrderAndUpdateTicketCount(requestParam);

// 通过消息队列 通知邮件服务 发送邮件通知用户抢票成功 将requestParam 传递给邮件服务
if (orderRespDTO != null) {
// 如果抢票成功,就发送邮件
log.info("抢票成功,发送邮件通知");
TicketEmailInfoDTO ticketEmailInfoDTO = new TicketEmailInfoDTO(requestParam, orderRespDTO.getLectureDO(), loginId);
kafkaTemplate.send("yzyy-ticket-service_send-mail_p1", JSON.toJSONString(ticketEmailInfoDTO));
}
// 创建响应消息
TicketGrabRespDTO ticketGrabRespDTO = new TicketGrabRespDTO();
ticketGrabRespDTO.setOrderSn(orderRespDTO.getOrderSn());

// 延迟一段时间后再次删除缓存
cachedThreadPool.execute(new delCacheByThread(ticketKey));
// 如果删除缓存没有成功,发送到消息队列通知消息队列进行删除缓存
return ticketGrabRespDTO;
}

/**
* 获取剩余票数
*
* @param param
*/
public long getTicketReservedCount(GrabTicketReqDTO param) {
String key = StrUtil.format(RedisKeyConstant.LOCK_TICKET_CNT, param.getLectureId());
// 从 Redis 中获取剩余票数
long ticketCount = redissonClient.getAtomicLong(key).get();
if (ticketCount == 0) {
// 如果 Redis 中没有对应的键,就查询数据库中的剩余票数
LectureDO lecture = lectureMapper.selectById(param.getLectureId());
if (lecture == null) {
throw new ServiceException("请求的宣讲会不存在");
}
ticketCount = lecture.getRemainingTickets();
// 将剩余票数存储到 Redis 中
redissonClient.getAtomicLong(key).set(ticketCount);
}
return ticketCount;
}

/**
* 保存订单并更新票数
*
* @param param
* @return
*/
@Transactional
public OrderRespDTO saveOrderAndUpdateTicketCount(GrabTicketReqDTO param) {
String loginId = StpUtil.getLoginId("");

// 创建一个新的订单
TicketOrder ticketOrder = new TicketOrder();
ticketOrder.setUserId(loginId);
ticketOrder.setLectureId(param.getLectureId());
// 保存订单到数据库
orderMapper.insert(ticketOrder);
// 查询票
LectureDO lecture = lectureMapper.selectById(param.getLectureId());
if (lecture != null && lecture.getRemainingTickets() > 0) {
// 如果票存在且剩余票数大于0,就减少剩余票数
lecture.setRemainingTickets(lecture.getRemainingTickets() - 1);
// 更新票的信息,使用乐观锁
if (lectureMapper.updateById(lecture) == 0) {
// 如果更新失败,说明乐观锁生效,抛出异常
log.error("乐观锁失效,更新失败");
throw new ServiceException("乐观锁失效,更新失败");
}
log.info("USER : " + loginId +
" 抢到了 LETURE : " +
param.getLectureId() +
" 在 TIME : " +
LocalDateTime.now() +
" 剩余票数 : " + (lecture.getRemainingTickets()));
return new OrderRespDTO(ticketOrder.getId(), lecture);
} else {
log.error("票不存在或已售罄");
// 如果票不存在或剩余票数为0,就抛出异常
throw new ServiceException("票不存在或已售罄");
}
}

/**
* 监听邮件发送消息队列
* @param ticketInfoJson
*/
@Idempotent(
uniqueKeyPrefix = "yzyy_ticket:lock_send_email:",
key = "#ticketInfoJson.hashCode()",
type = IdempotentTypeEnum.SPEL,
scene = IdempotentSceneEnum.MQ,
keyTimeout = 7200L
)
@KafkaListener(topics = "yzyy-ticket-service_send-mail_p1", groupId = "mail-group")
public void sendTicketInfoEmail(String ticketInfoJson) {
// 尝试将 JSON 字符串解析为 GrabTicketReqDTO 对象
TicketEmailInfoDTO reqDto = JSON.parseObject(ticketInfoJson, TicketEmailInfoDTO.class);
// 获取接收者的邮箱地址
String toEmail = userService.queryUserInfo(reqDto.getLoginId()).getEmail();
// 校验接收者的邮箱地址是否为空
if (toEmail == null || toEmail.isEmpty()) {
throw new IllegalArgumentException("接收者的邮箱地址为空");
}
// 创建邮件内容
LectureDO lectureDO = reqDto.getLectureDO();
MailContent content = MailContent.builder().subjection(TICKET_MAIL_TEMPLATE.getSubjection())
.content(StrUtil.format(TICKET_MAIL_TEMPLATE.getContent(), lectureDO.getLectureTheme(), lectureDO.getRemainingTickets()))
.build();
// 发送邮件
emailService.sendEmail(content, "云昭云耀<ruafafa6170@163.com>", toEmail, true);
log.info("向{}邮件发送成功", toEmail);
}

/**
* 发布宣讲会门票
* @param requestParam
*/
@Override
@Transactional
public void releaseLectureTickets(LectureInfoReqDTO requestParam) {
LectureDO convert = converter.convert(requestParam, LectureDO.class);
lectureMapper.insert(convert);
}

@Override
public Boolean cancelTicket() {

return null;
}

/**
* 获取接口验证值
* @param lectureId
* @return
*/
@Override
public String getVerifyHash(String lectureId) {
// 验证是否在抢购时间内
LectureDO lectureDO = lectureMapper.selectOne(new QueryWrapper<LectureDO>().eq("id", lectureId));
// 检查宣讲会合法性
if (lectureDO == null) {
throw new ServiceException("宣讲会不存在");
}
LocalDateTime grabTime = lectureDO.getGrabTime();
LocalDateTime now = LocalDateTime.now();
if (now.isBefore(grabTime)) {
throw new ServiceException("不在抢购时间内");
}
String loginId = StpUtil.getLoginId("");
// 生成hash
String verifyHash = DigestUtils.md5DigestAsHex((loginId + lectureId + SALT).getBytes());

// 将hash和宣讲会信息存入redis
String hashKey = StrUtil.format(RedisKeyConstant.VERIFY_HASH_KEY, lectureId, loginId);
stringRedisTemplateUtil.put(hashKey, verifyHash, 1, TimeUnit.HOURS);

return verifyHash;
}

@Override
public Ticket queryTicket() {
return null;
}

@Override
public Boolean grantTicket() {
return null;
}

@Override
public void run(String... args) throws Exception {
ticketService = SpringUtil.getBean(TicketService.class);
}

/**
* 删除缓存的线程
*/
private class delCacheByThread implements Runnable {
private final String key;

public delCacheByThread(String key) {
this.key = key;
}

public void run() {
try {
Thread.sleep(1000);
stringRedisTemplateUtil.delete(key);
} catch (Exception e) {
log.error("delCacheByThread执行出错", e);
}
}
}
}

1. 确保数据库一致性

重构后的代码首要解决的就是数据库一致性问题,为了让同学们能够第一时间确认自己是否抢到票,
更好的体验紧张刺激的抢票流程, 我们必须废弃之前使用定时任务将Redis中的数据同步到MySQL的方案,选用更为稳妥的方案。

缓存与数据库双写数据一致性的解决方案有很多种:

  • 读写分离:在这种模式下,所有的写操作(增、删、改)都先写数据库,然后再更新缓存。读操作则直接从缓存中读取。如果缓存中没有数据,再去数据库中读取并更新到缓存中。
  • 延时双删:在更新数据库后,先删除缓存,然后延时一段时间再次删除。这种方式可以防止在更新数据库后查询操作将旧的数据写入缓存的情况。
  • 消息队列:使用消息队列来保证数据库和缓存的一致性。所有的写操作都写入消息队列,然后有一个消费者来消费消息,执行数据库的写操作和缓存的更新。
  • 数据库触发器:在数据库中设置触发器,当数据发生变化时,触发器自动更新缓存。
  • 分布式锁:使用分布式锁来保证在更新数据库和缓存时的原子性。在获取到锁后,先更新数据库,然后更新缓存,最后释放锁

针对本秒杀项目需要明确的是,同学频繁访问访问接口,发起的读请求将会远大于写请求,这是因为可发放的票只占抢票同学的很小一部分。
书院每年宣讲会抢票人数大概有2000人,而报告厅仅能容纳100人,发放100张票,大部分人访问接口后已经没有余票,不会再进行写操作。
对于系统余票数据多读取少写入的特点,需要使用缓存降低对数据库的压力,并尽量使得过程缓存数据库一致,保证缓存和数据库的最终一致性。

所以在重构时采用了放弃原有的异步同步缓存和数据库的方式,并使用 缓存双删 的方案保证数据一致性,关于缓存双删的详细内容,可以查看这篇文章
秒杀系统实战(四)| 缓存与数据库双写问题的争议

以下是 缓存双删 的实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
/**
* 执行抢票
* @param requestParam
* @param loginId
* @return
*/
@Override
@Transactional(rollbackFor = Exception.class)
public TicketGrabRespDTO excuteGrabTicket(GrabTicketReqDTO requestParam, String loginId) {
// 生成抢票的 key
String ticketKey = StrUtil.format(RedisKeyConstant.LOCK_TICKET_CNT, requestParam.getLectureId());
// 删除缓存
redissonClient.getAtomicLong(ticketKey).delete();

// 执行数据库操作,例如保存订单信息到数据库
OrderRespDTO orderRespDTO = ticketService.saveOrderAndUpdateTicketCount(requestParam);

// 通过消息队列 通知邮件服务 发送邮件通知用户抢票成功 将requestParam 传递给邮件服务
if (orderRespDTO != null) {
// 如果抢票成功,就发送邮件
log.info("抢票成功,发送邮件通知");
TicketEmailInfoDTO ticketEmailInfoDTO = new TicketEmailInfoDTO(requestParam, orderRespDTO.getLectureDO(), loginId);
kafkaTemplate.send("yzyy-ticket-service_send-mail_p1", JSON.toJSONString(ticketEmailInfoDTO));
}
// 创建响应消息
TicketGrabRespDTO ticketGrabRespDTO = new TicketGrabRespDTO();
ticketGrabRespDTO.setOrderSn(orderRespDTO.getOrderSn());

// 延迟一段时间后再次删除缓存
cachedThreadPool.execute(new delCacheByThread(ticketKey));
// 如果删除缓存没有成功,发送到消息队列通知消息队列进行删除缓存
return ticketGrabRespDTO;
}

TicketServiceImpl实现了TicketService接口,涵盖了抢票系统的核心功能。grabTicket方法通过责任链模式进行参数验证,然后尝试从Sentinel获取令牌,若失败则抛出异常。对每一种票,创建本地和分布式锁,执行扣减余票操作。excuteGrabTicket方法生成抢票key,执行数据库操作并发送成功邮件通知。getTicketReservedCount从Redis获取剩余票数,否则查询数据库并存入Redis。saveOrderAndUpdateTicketCount保存订单并更新票数,sendTicketInfoEmail监听邮件消息队列发送邮件,releaseLectureTickets发布宣讲会门票。getVerifyHash验证抢购时间并生成hash存入Redis。run方法实现CommandLineRunner接口,用于应用程序启动后执行。delCacheByThread是内部类,用于新线程中删除缓存。

2. 防止单用户频繁请求接口

在高并发场景下,同学可能会用脚本请求接口淹没系统,为了防止这种情况的发生,我们要对单个同学频繁请求。
这里通过对接口进行幂等完成,幂等是指对同一个接口的多次请求,返回的结果是一致的,不会因为多次请求而产生副作用。
幂等标志为用户请求的唯一标识,可以是用户ID、订单号等,通过幂等标志判断是否重复请求,若重复则直接返回结果,不再执行后续操作。

1
2
3
4
5
6
7
8
9
10
11
12
@Idempotent(
uniqueKeyPrefix = "yzyy_ticket:lock_grab-tickets:",
key = "T(cn.dev33.satoken.stp.StpUtil).getLoginId('')",
message = "正在执行抢票流程,请稍后...",
scene = IdempotentSceneEnum.RESTAPI,
type = IdempotentTypeEnum.SPEL
)
@PostMapping("/grab")
public Result<TicketGrabRespDTO> grabTicket(@RequestBody GrabTicketReqDTO requestParam) {
String loginId = StpUtil.getLoginId("");
return Results.success(ticketService.grabTicket(requestParam, loginId));
}

有关该幂等注解的实现,可以查看一个注解实现接口幂等,这样才优雅!
这段代码使用了一个名为 @Idempotent 的注解来实现接口的幂等性。用于处理抢票请求。注解的参数定义了如何实现幂等性:

  • uniqueKeyPrefix:这是幂等性键的前缀,用于区分不同的接口或操作。
  • key:这是幂等性键的主体部分,通常是一个能唯一标识请求的值。在这里,它是通过 StpUtil.getLoginId(‘’) 获取的当前登录用户的 ID。
  • message:当发生重复请求时,返回给用户的消息。
  • scene:幂等性的应用场景,这里是 REST API。
  • type:幂等性键的类型,这里是 SPEL(Spring Expression Language),表示 key 参数是一个 SPEL 表达式。
    当一个用户尝试抢票时,系统会根据 uniqueKeyPrefix 和 key 生成一个幂等性键,然后检查这个键是否已经存在。如果键已经存在,说明这个用户已经发起过抢票请求,系统就直接返回 message 参数指定的消息,而不会再次执行抢票操作。这样就实现了接口的幂等性,防止了重复提交请求。

3. 限流

在高并发场景下,同学可能会用脚本请求接口淹没系统,为了防止这种情况的发生,我们要对请求限流。
常见的限流算法有令牌桶算法和漏桶算法,这里我不过多赘述,这里通过Sentinel实现限流,Sentinel是阿里巴巴开源的一款流量控制组件,可以实现限流、熔断、降级等功能。
这里通过Sentinel实现限流,Sentinel是阿里巴巴开源的一款流量控制组件,可以实现限流、熔断、降级等功能。

1
2
3
4
5
6
7
8
9
10
11
12
// 限流: 从 Sentinel 中尝试获取一个令牌,如果获取失败(即令牌桶中没有足够的令牌),则抛出 BlockException 异常
Entry entry = null;
try {
entry = SphU.entry("grabTicket");
// ... 其他代码
} catch (BlockException ex) {
throw new ServiceException("系统繁忙,请稍后再试");
} finally {
if (entry != null) {
entry.exit();
}
}

4. 防止超卖

在剩余票数很少的情况下,请求并发量很大,由于没有正确的控制并发操作,多个线程同时操作同一份资源,可能会出现超卖的情况,为了防止这种情况的发生,我们要对剩余票数进行校验。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55

// 尝试获取所有的锁,然后执行扣减余票操作
try {
localLock.lock();
distributedLock.lock();
// 判断是否有剩余的票
checkTicketReserved(requestParam);
// 判断用户是否有票
checkOrderExist(requestParam, loginId);
// 执行抢票
return ticketService.excuteGrabTicket(requestParam, loginId);
} finally {
// 释放所有的锁
try {
localLock.unlock();
} catch (Throwable ignored) {}
try {
distributedLock.unlock();
} catch (Throwable ignored) {}
}

@Transactional
public OrderRespDTO saveOrderAndUpdateTicketCount(GrabTicketReqDTO param) {
String loginId = StpUtil.getLoginId("");

// 创建一个新的订单
TicketOrder ticketOrder = new TicketOrder();
ticketOrder.setUserId(loginId);
ticketOrder.setLectureId(param.getLectureId());
// 保存订单到数据库
orderMapper.insert(ticketOrder);
// 查询票
LectureDO lecture = lectureMapper.selectById(param.getLectureId());
if (lecture != null && lecture.getRemainingTickets() > 0) {
// 如果票存在且剩余票数大于0,就减少剩余票数
lecture.setRemainingTickets(lecture.getRemainingTickets() - 1);
// 更新票的信息,使用乐观锁
if (lectureMapper.updateById(lecture) == 0) {
// 如果更新失败,说明乐观锁生效,抛出异常
log.error("乐观锁失效,更新失败");
throw new ServiceException("乐观锁失效,更新失败");
}
log.info("USER : " + loginId +
" 抢到了 LETURE : " +
param.getLectureId() +
" 在 TIME : " +
LocalDateTime.now() +
" 剩余票数 : " + (lecture.getRemainingTickets()));
return new OrderRespDTO(ticketOrder.getId(), lecture);
} else {
log.error("票不存在或已售罄");
// 如果票不存在或剩余票数为0,就抛出异常
throw new ServiceException("票不存在或已售罄");
}
}

需要注意的是,这里的乐观锁用于在并发环境下,多个线程同时修改同一份数据导致的数据不一致问题。
在更新数据时,乐观锁会检查数据是否被其他线程修改,如果数据已经被其他线程修改,那么这次更新就会失败。
但是并不能解决防止超卖的问题,因为乐观锁只能保证数据的一致性,不能保证数据的准确性。
例如,如果两个线程几乎同时读取到了剩余票数,并且都判断剩余票数大于0,然后都尝试去更新票数,即使使用了乐观锁,也可能会出现超卖的情况。
这是因为乐观锁只能在更新数据时检查数据是否被其他线程修改,而不能防止多个线程同时读取并使用旧的数据。
所以,我们需要给剩余票数的更新操作加锁,保证只有一个线程能够执行更新操作,从而避免超卖的情况。

压力测试

有关压力测试的内容,可以查看我的压力测试实战Jemeter博客