各位用户为了找寻关于基于Redis延迟队列的实现代码的资料费劲了很多周折。这里教程网为您整理了关于基于Redis延迟队列的实现代码的相关资料,仅供查阅,以下为您介绍关于基于Redis延迟队列的实现代码的详细内容
使用场景
工作中大家往往会遇到类似的场景:
1.对于红包场景,账户 A 对账户 B 发出红包通常在 1 天后会自动归还到原账户。
2.对于实时支付场景,如果账户 A 对商户 S 付款 100 元,5秒后没有收到支付方回调将自动取消订单。
解决方案分析
方案一:
采用通过定时任务采用数据库/非关系型数据库轮询方案。
优点:
1. 实现简单,对于项目前期这样是最容易的解决方案。
缺点:
1. DB 有效使用率低,需要将一部分的数据库的QPS分配给 JOB 的无效轮询。
2. 服务资源浪费,因为轮询需要对所有的数据做一次 SCAN 扫描 JOB 服务的资源开销很大。
方案二:
采用延迟队列:
优点:
1. 服务的资源使用率较高,能够精确的实现超时任务的执行。
2. 减少 DB 的查询次数,能够降低数据库的压力
缺点:
1. 对于延迟队列来说本身设计比较复杂,目前没有通用的比较好过的方案。
基于 Redis 的延迟队列实现
基于以上的分析,我决定通过 Redis 来实现分布式队列。
设计思路:
1. 第一步将需要发放的消息发送到延迟队列中。
2. 延迟队列将数据存入 Redis 的 ZSet 有序集合中score 为当前时间戳,member 存入需要发送的数据。
3. 添加一个 schedule 来进行对 Redis 有序队列的轮询。
4. 如果到达达到消息的执行时间,那么就进行业务的执行。
5. 如果没有达到消息的执行是将,那么消息等待下轮执行。
实现步骤:
由于本处篇幅有限,所以只列举部分代码,完整的代码可以在本文最后访问 GitHub 获取。由于本人阅历/水平有限,如有建议/或更正欢迎留言或提问。先在此谢谢大家驻足阅读
需要注意的问题:
单个 Redis 命令的执行是原子性的,但 Redis 没有在事务上增加任何维持原子性的机制,所以 Redis 事务的执行并不是原子性的。
事务可以理解为一个打包的批量执行脚本,但批量指令并非原子化的操作,中间某条指令的失败不会导致前面已做指令的回滚,也不会造成后续的指令不做。
我们可以通过 Redis 的 eval 命令来执行 lua 脚本来保证原子性实现Redis的事务。
实现步骤如下:
1. 延迟队列接口
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23/**
* 延迟队列
*
* @author zhengsh
* @date 2020-03-27
*/
public
interface
RedisDelayQueue<E
extends
DelayMessage> {
String META_TOPIC_WAIT =
"delay:meta:topic:wait"
;
String META_TOPIC_ACTIVE =
"delay:meta:topic:active"
;
String TOPIC_ACTIVE =
"delay:active:9999"
;
/**
* 拉取消息
*/
void
poll();
/**
* 推送延迟消息
*
* @param e
*/
void
push(E e);
}
2. 延迟队列消息
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30/**
* 消息体
*
* @author zhengsh
* @date 2020-03-27
*/
@Setter
@Getter
public
class
DelayMessage {
/**
* 消息唯一标识
*/
private
String id;
/**
* 消息主题
*/
private
String topic =
"default"
;
/**
* 具体消息 json
*/
private
String body;
/**
* 延时时间, 格式为时间戳: 当前时间戳 + 实际延迟毫秒数
*/
private
Long delayTime = System.currentTimeMillis() + 30000L;
/**
* 消息发送时间
*/
private
LocalDateTime createTime;
}
3. 延迟队列实现
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63/**
* 延迟队列实现
*
* @author zhengsh
* @date 2020-03-27
*/
@Component
public
class
RedisDelayQueueImpl<E
extends
DelayMessage>
implements
RedisDelayQueue<E> {
private
Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private
StringRedisTemplate redisTemplate;
@Override
public
void
poll() {
// todo
}
/**
* 发送消息
*
* @param e
*/
@SneakyThrows
@Override
public
void
push(E e) {
try
{
String jsonStr = JSON.toJSONString(e);
String topic = e.getTopic();
String zkey = String.format(
"delay:wait:%s"
, topic);
String u =
"redis.call('sadd', KEYS[1], ARGV[1])n"
+
"redis.call('zadd', KEYS[2], ARGV[2], ARGV[3])n"
+
"return 1"
;
Object[] keys =
new
Object[]{serialize(META_TOPIC_WAIT), serialize(zkey)};
Object[] values =
new
Object[]{ serialize(zkey), serialize(String.valueOf(e.getDelayTime())),serialize(jsonStr)};
Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
Object nativeConnection = connection.getNativeConnection();
if
(nativeConnection
instanceof
RedisAsyncCommands) {
RedisAsyncCommands commands = (RedisAsyncCommands) nativeConnection;
return
(Long) commands.getStatefulConnection().sync().eval(u, ScriptOutputType.INTEGER, keys, values);
}
else
if
(nativeConnection
instanceof
RedisAdvancedClusterAsyncCommands) {
RedisAdvancedClusterAsyncCommands commands = (RedisAdvancedClusterAsyncCommands) nativeConnection;
return
(Long) commands.getStatefulConnection().sync().eval(u, ScriptOutputType.INTEGER, keys, values);
}
return
0L;
});
logger.info(
"延迟队列[1],消息推送成功进入等待队列({}), topic: {}"
, result !=
null
&& result >
0
, e.getTopic());
}
catch
(Throwable t) {
t.printStackTrace();
}
}
private
byte
[] serialize(String key) {
RedisSerializer<String> stringRedisSerializer =
(RedisSerializer<String>) redisTemplate.getKeySerializer();
//lettuce连接包下序列化键值,否则无法用默认的ByteArrayCodec解析
return
stringRedisSerializer.serialize(key);
}
}
4. 定时任务
? 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70/**
* 分发任务
*/
@Component
public
class
DistributeTask {
private
static
final
String LUA_SCRIPT;
private
Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private
StringRedisTemplate redisTemplate;
static
{
StringBuilder sb =
new
StringBuilder(
128
);
sb.append(
"local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1], 'limit', 0, 1)n"
);
sb.append(
"if(next(val) ~= nil) thenn"
);
sb.append(
" redis.call('sadd', KEYS[2], ARGV[2])n"
);
sb.append(
" redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)n"
);
sb.append(
" for i = 1, #val, 100 don"
);
sb.append(
" redis.call('rpush', KEYS[3], unpack(val, i, math.min(i+99, #val)))n"
);
sb.append(
" endn"
);
sb.append(
" return 1n"
);
sb.append(
"endn"
);
sb.append(
"return 0"
);
LUA_SCRIPT = sb.toString();
}
/**
* 2秒钟扫描一次执行队列
*/
@Scheduled
(cron =
"0/5 * * * * ?"
)
public
void
scheduledTaskByCorn() {
try
{
Set<String> members = redisTemplate.opsForSet().members(META_TOPIC_WAIT);
assert
members !=
null
;
for
(String k : members) {
if
(!redisTemplate.hasKey(k)) {
// 如果 KEY 不存在元数据中删除
redisTemplate.opsForSet().remove(META_TOPIC_WAIT, k);
continue
;
}
String lk = k.replace(
"delay:wait"
,
"delay:active"
);
Object[] keys =
new
Object[]{serialize(k), serialize(META_TOPIC_ACTIVE), serialize(lk)};
Object[] values =
new
Object[]{serialize(String.valueOf(System.currentTimeMillis())), serialize(lk)};
Long result = redisTemplate.execute((RedisCallback<Long>) connection -> {
Object nativeConnection = connection.getNativeConnection();
if
(nativeConnection
instanceof
RedisAsyncCommands) {
RedisAsyncCommands commands = (RedisAsyncCommands) nativeConnection;
return
(Long) commands.getStatefulConnection().sync().eval(LUA_SCRIPT, ScriptOutputType.INTEGER, keys, values);
}
else
if
(nativeConnection
instanceof
RedisAdvancedClusterAsyncCommands) {
RedisAdvancedClusterAsyncCommands commands = (RedisAdvancedClusterAsyncCommands) nativeConnection;
return
(Long) commands.getStatefulConnection().sync().eval(LUA_SCRIPT, ScriptOutputType.INTEGER, keys, values);
}
return
0L;
});
logger.info(
"延迟队列[2],消息到期进入执行队列({}): {}"
, result !=
null
&& result >
0
, TOPIC_ACTIVE);
}
}
catch
(Throwable t) {
t.printStackTrace();
}
}
private
byte
[] serialize(String key) {
RedisSerializer<String> stringRedisSerializer =
(RedisSerializer<String>) redisTemplate.getKeySerializer();
//lettuce连接包下序列化键值,否则无法用默认的ByteArrayCodec解析
return
stringRedisSerializer.serialize(key);
}
}
GitHub 地址
https://github.com/zhengsh/redis-delay-queue
参考地址
1.https://www.runoob.com/redis/redis-transactions.html
到此这篇关于基于Redis延迟队列的实现代码的文章就介绍到这了,更多相关Redis 延迟队列内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
原文链接:https://www.cnblogs.com/cbread/p/12630945.html