av一区二区在线观看_亚洲男人的天堂网站_日韩亚洲视频_在线成人免费_欧美日韩精品免费观看视频_久草视

您的位置:首頁技術文章
文章詳情頁

Java實現Redis延時消息隊列

瀏覽:103日期:2023-02-11 15:31:28
目錄什么是延時任務延時任務的特點實現思路:代碼實現1.消息模型2.RedisMq 消息隊列實現類3.消息生產者4.消息消費者5. 消息執接口6. 任務類型的實現類:可以根據自己的情況去實現對應的隊列需求 什么是延時任務

延時任務,顧名思義,就是延遲一段時間后才執行的任務。舉個例子,假設我們有個發布資訊的功能,運營需要在每天早上7點準時發布資訊,但是早上7點大家都還沒上班,這個時候就可以使用延時任務來實現資訊的延時發布了。只要在前一天下班前指定第二天要發送資訊的時間,到了第二天指定的時間點資訊就能準時發出去了。如果大家有運營過公眾號,就會知道公眾號后臺也有文章定時發送的功能。總而言之,延時任務的使用還是很廣泛的。

延時任務的特點 時間有序性 時間具體性 任務中攜帶詳細的信息 ,通常包括 任務ID, 任務的類型 ,時間點。實現思路:

將整個Redis當做消息池,以kv形式存儲消息,key為id,value為具體的消息body使用ZSET做優先隊列,按照score維持優先級(用當前時間+需要延時的時間作為score)輪詢ZSET,拿出score比當前時間戳大的數據(已過期的)根據id拿到消息池的具體消息進行消費消費成功,刪除改隊列和消息消費失敗,讓該消息重新回到隊列

代碼實現

Java實現Redis延時消息隊列

1.消息模型

import lombok.Data;import lombok.experimental.Accessors;import javax.validation.constraints.NotNull;import java.io.Serializable;/** * Redis 消息隊列中的消息體 * @author shikanatsu */@Data@Accessors(chain = true)public class RedisMessage implements Serializable { /** 消息隊列組 **/ private String group; /** * 消息id */ private String id; /** * 消息延遲/ 秒 */ @NotNull(message = '消息延時時間不能為空') private long delay; /** * 消息存活時間 單位:秒 */ @NotNull(message = '消息存活時間不能為空') private int ttl; /** * 消息體,對應業務內容 */ private Object body; /** * 創建時間,如果只有優先級沒有延遲,可以設置創建時間為0 * 用來消除時間的影響 */ private long createTime;}2.RedisMq 消息隊列實現類

package com.shixun.base.redisMq;import com.shixun.base.jedis.service.RedisService;import org.springframework.stereotype.Component;import javax.annotation.Resource;/** * Redis消息隊列 * * @author shikanatsu */@Componentpublic class RedisMq { /** * 消息池前綴,以此前綴加上傳遞的消息id作為key,以消息{@link MSG_POOL} * 的消息體body作為值存儲 */ public static final String MSG_POOL = 'Message:Pool:'; /** * zset隊列 名稱 queue */ public static final String QUEUE_NAME = 'Message:Queue:';// private static final int SEMIH = 30 * 60; @Resource private RedisService redisService; /** * 存入消息池 * * @param message * @return */ public boolean addMsgPool(RedisMessage message) {if (null != message) { redisService.set(MSG_POOL + message.getGroup() + message.getId(), message, message.getTtl()); return true;}return false; } /** * 從消息池中刪除消息 * * @param id * @return */ public void deMsgPool(String group, String id) {redisService.remove(MSG_POOL + group + id); } /** * 向隊列中添加消息 * * @param key * @param score 優先級 * @param val * @return 返回消息id */ public void enMessage(String key, long score, String val) {redisService.zsset(key, val, score); } /** * 從隊列刪除消息 * * @param id * @return */ public boolean deMessage(String key, String id) {return redisService.zdel(key, id); }}3.消息生產者

import cn.hutool.core.convert.Convert;import cn.hutool.core.lang.Assert;import cn.hutool.core.util.IdUtil;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import org.springframework.validation.annotation.Validated;import javax.annotation.Resource;import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.TimeUnit;/** * 消息生產者 * * @author shikanatsu */@Componentpublic class MessageProvider { static Logger logger = LoggerFactory.getLogger(MessageProvider.class); @Resource private RedisMq redisMq; SimpleDateFormat sdf = new SimpleDateFormat('yyyy-MM-dd HH:mm:ss'); public boolean sendMessage(@Validated RedisMessage message) {Assert.notNull(message);//The priority is if there is no creation time//message.setCreateTime(System.currentTimeMillis());message.setId(IdUtil.fastUUID());Long delayTime = message.getCreateTime() + Convert.convertTime(message.getDelay(), TimeUnit.SECONDS, TimeUnit.MILLISECONDS);try { redisMq.addMsgPool(message); redisMq.enMessage(RedisMq.QUEUE_NAME+message.getGroup(), delayTime, message.getId()); logger.info('RedisMq發送消費信息{},當前時間:{},消費時間預計{}',message.toString(),new Date(),sdf.format(delayTime));}catch (Exception e){ e.printStackTrace(); logger.error('RedisMq 消息發送失敗,當前時間:{}',new Date()); return false;}return true; }}4.消息消費者

/** * Redis消息消費者 * @author shikanatsu */@Componentpublic class RedisMqConsumer { private static final Logger log = LoggerFactory.getLogger(RedisMqConsumer.class); @Resource private RedisMq redisMq; @Resource private RedisService redisService; @Resource private MessageProvider provider; SimpleDateFormat sdf = new SimpleDateFormat('yyyy-MM-dd HH:mm:ss'); //@Scheduled(cron = '*/1 * * * * ? ') /** Instead of a thread loop, you can use Cron expressions to perform periodic tasks */ public void baseMonitor(RedisMqExecute mqExecute){String queueName = RedisMq.QUEUE_NAME+mqExecute.getQueueName();//The query is currently expiredSet<Object> set = redisService.rangeByScore(queueName, 0, System.currentTimeMillis());if (null != set) { long current = System.currentTimeMillis(); for (Object id : set) {long score = redisService.getScore(queueName, id.toString()).longValue();//Once again the guarantee has expired , And then perform the consumptionif (current >= score) { String str = ''; RedisMessage message = null; String msgPool = RedisMq.MSG_POOL+mqExecute.getQueueName(); try {message = (RedisMessage)redisService.get(msgPool + id.toString());log.debug('RedisMq:{},get RedisMessage success now Time:{}',str,sdf.format(System.currentTimeMillis()));if(null==message){ return;}//Do something ; You can add a judgment here and if it fails you can add it to the queue againmqExecute.execute(message); } catch (Exception e) {e.printStackTrace();//If an exception occurs, it is put back into the queue// todo: If repeated, this can lead to repeated cycleslog.error('RedisMq: RedisMqMessage exception ,It message rollback , If repeated, this can lead to repeated cycles{}',new Date());provider.sendMessage(message); } finally {redisMq.deMessage(queueName, id.toString());redisMq.deMsgPool(message.getGroup(),id.toString()); }} }} }}5. 消息執接口

/** * @author shikanatsu */public interface RedisMqExecute { /** * 獲取隊列名稱 * @return */ public String getQueueName(); /** * 統一的通過執行期執行 * @param message * @return */ public boolean execute(RedisMessage message); /** * Perform thread polling */ public void threadPolling();}6. 任務類型的實現類:可以根據自己的情況去實現對應的隊列需求

/** * 訂單執行 * * @author shikanatsu */@Servicepublic class OrderMqExecuteImpl implements RedisMqExecute { private static Logger logger = LoggerFactory.getLogger(OrderMqExecuteImpl.class); public final static String name = 'orderPoll:'; @Resource private RedisMqConsumer redisMqConsumer; private RedisMqExecute mqExecute = this; @Resource private OrderService orderService; @Override public String getQueueName() {return name; } @Override /** * For the time being, only all orders will be processed. You can change to make orders */ public boolean execute(RedisMessage message) {logger.info('Do orderMqPoll ; Time:{}',new Date()); //Do return true; } @Override /** 通過線程去執行輪詢的過程,時間上可以自由控制 **/ public void threadPolling() {ThreadUtil.execute(() -> { while (true) {redisMqConsumer.baseMonitor(mqExecute);ThreadUtil.sleep(5, TimeUnit.MICROSECONDS); }}); }}

使用事例 1. 實現RedisMqExecute 接口 創建對應的輪詢或者采取定時器的方式執行 和實現具體的任務。 2. 通過MessageProvider 實現相對應的消息服務和綁定隊列組,通過隊列組的方式執行。 3. 提示: 采取線程的方式需要在項目啟動過程中執行,采取定時器或者調度的方式可以更加動態的調整。

到此這篇關于Java實現Redis延時消息隊列的文章就介紹到這了,更多相關Java Redis延時消息隊列內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!

標簽: Java
相關文章:
主站蜘蛛池模板: sese视频在线观看 | 999免费视频 | 日本免费黄色 | 国产精品中文字幕在线 | 久久精品国产一区 | 亚洲三区在线 | 成人影音 | 国内精品久久久久久影视8 最新黄色在线观看 | 午夜寂寞福利视频 | 精品啪啪 | yiren22 亚洲综合 | 亚洲视频二区 | 亚洲视频不卡 | 亚洲欧美日韩中文字幕一区二区三区 | 91精品国产91久久久久久 | 曰韩一二三区 | 精品在线一区二区三区 | 国产精品一区在线播放 | 午夜精品 | 日韩欧美在线一区 | 久久久精品视频免费看 | 国产精品久久久久一区二区三区 | 亚洲黄色一区二区三区 | 在线观看视频91 | 国产精品污污视频 | 毛片视频网站 | 性色av香蕉一区二区 | 久久精品亚洲成在人线av网址 | 午夜视频免费在线 | 欧美一级在线观看 | 国产激情一区二区三区 | 国产日韩av一区二区 | 亚洲国产免费 | 国产日韩欧美一区 | 91精品国产91久久综合桃花 | 亚洲人成在线播放 | 国产欧美精品一区二区三区 | 国产精品一码二码三码在线 | 激情综合五月 | 国产九九精品 | 久久99精品国产自在现线小黄鸭 |