Java實現Redis延時消息隊列
延時任務,顧名思義,就是延遲一段時間后才執行的任務。舉個例子,假設我們有個發布資訊的功能,運營需要在每天早上7點準時發布資訊,但是早上7點大家都還沒上班,這個時候就可以使用延時任務來實現資訊的延時發布了。只要在前一天下班前指定第二天要發送資訊的時間,到了第二天指定的時間點資訊就能準時發出去了。如果大家有運營過公眾號,就會知道公眾號后臺也有文章定時發送的功能。總而言之,延時任務的使用還是很廣泛的。
延時任務的特點 時間有序性 時間具體性 任務中攜帶詳細的信息 ,通常包括 任務ID, 任務的類型 ,時間點。實現思路:將整個Redis當做消息池,以kv形式存儲消息,key為id,value為具體的消息body使用ZSET做優先隊列,按照score維持優先級(用當前時間+需要延時的時間作為score)輪詢ZSET,拿出score比當前時間戳大的數據(已過期的)根據id拿到消息池的具體消息進行消費消費成功,刪除改隊列和消息消費失敗,讓該消息重新回到隊列
代碼實現import lombok.Data;import lombok.experimental.Accessors;import javax.validation.constraints.NotNull;import java.io.Serializable;/** * Redis 消息隊列中的消息體 * @author shikanatsu */@Data@Accessors(chain = true)public class RedisMessage implements Serializable { /** 消息隊列組 **/ private String group; /** * 消息id */ private String id; /** * 消息延遲/ 秒 */ @NotNull(message = '消息延時時間不能為空') private long delay; /** * 消息存活時間 單位:秒 */ @NotNull(message = '消息存活時間不能為空') private int ttl; /** * 消息體,對應業務內容 */ private Object body; /** * 創建時間,如果只有優先級沒有延遲,可以設置創建時間為0 * 用來消除時間的影響 */ private long createTime;}2.RedisMq 消息隊列實現類
package com.shixun.base.redisMq;import com.shixun.base.jedis.service.RedisService;import org.springframework.stereotype.Component;import javax.annotation.Resource;/** * Redis消息隊列 * * @author shikanatsu */@Componentpublic class RedisMq { /** * 消息池前綴,以此前綴加上傳遞的消息id作為key,以消息{@link MSG_POOL} * 的消息體body作為值存儲 */ public static final String MSG_POOL = 'Message:Pool:'; /** * zset隊列 名稱 queue */ public static final String QUEUE_NAME = 'Message:Queue:';// private static final int SEMIH = 30 * 60; @Resource private RedisService redisService; /** * 存入消息池 * * @param message * @return */ public boolean addMsgPool(RedisMessage message) {if (null != message) { redisService.set(MSG_POOL + message.getGroup() + message.getId(), message, message.getTtl()); return true;}return false; } /** * 從消息池中刪除消息 * * @param id * @return */ public void deMsgPool(String group, String id) {redisService.remove(MSG_POOL + group + id); } /** * 向隊列中添加消息 * * @param key * @param score 優先級 * @param val * @return 返回消息id */ public void enMessage(String key, long score, String val) {redisService.zsset(key, val, score); } /** * 從隊列刪除消息 * * @param id * @return */ public boolean deMessage(String key, String id) {return redisService.zdel(key, id); }}3.消息生產者
import cn.hutool.core.convert.Convert;import cn.hutool.core.lang.Assert;import cn.hutool.core.util.IdUtil;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.stereotype.Component;import org.springframework.validation.annotation.Validated;import javax.annotation.Resource;import java.text.SimpleDateFormat;import java.util.Date;import java.util.concurrent.TimeUnit;/** * 消息生產者 * * @author shikanatsu */@Componentpublic class MessageProvider { static Logger logger = LoggerFactory.getLogger(MessageProvider.class); @Resource private RedisMq redisMq; SimpleDateFormat sdf = new SimpleDateFormat('yyyy-MM-dd HH:mm:ss'); public boolean sendMessage(@Validated RedisMessage message) {Assert.notNull(message);//The priority is if there is no creation time//message.setCreateTime(System.currentTimeMillis());message.setId(IdUtil.fastUUID());Long delayTime = message.getCreateTime() + Convert.convertTime(message.getDelay(), TimeUnit.SECONDS, TimeUnit.MILLISECONDS);try { redisMq.addMsgPool(message); redisMq.enMessage(RedisMq.QUEUE_NAME+message.getGroup(), delayTime, message.getId()); logger.info('RedisMq發送消費信息{},當前時間:{},消費時間預計{}',message.toString(),new Date(),sdf.format(delayTime));}catch (Exception e){ e.printStackTrace(); logger.error('RedisMq 消息發送失敗,當前時間:{}',new Date()); return false;}return true; }}4.消息消費者
/** * Redis消息消費者 * @author shikanatsu */@Componentpublic class RedisMqConsumer { private static final Logger log = LoggerFactory.getLogger(RedisMqConsumer.class); @Resource private RedisMq redisMq; @Resource private RedisService redisService; @Resource private MessageProvider provider; SimpleDateFormat sdf = new SimpleDateFormat('yyyy-MM-dd HH:mm:ss'); //@Scheduled(cron = '*/1 * * * * ? ') /** Instead of a thread loop, you can use Cron expressions to perform periodic tasks */ public void baseMonitor(RedisMqExecute mqExecute){String queueName = RedisMq.QUEUE_NAME+mqExecute.getQueueName();//The query is currently expiredSet<Object> set = redisService.rangeByScore(queueName, 0, System.currentTimeMillis());if (null != set) { long current = System.currentTimeMillis(); for (Object id : set) {long score = redisService.getScore(queueName, id.toString()).longValue();//Once again the guarantee has expired , And then perform the consumptionif (current >= score) { String str = ''; RedisMessage message = null; String msgPool = RedisMq.MSG_POOL+mqExecute.getQueueName(); try {message = (RedisMessage)redisService.get(msgPool + id.toString());log.debug('RedisMq:{},get RedisMessage success now Time:{}',str,sdf.format(System.currentTimeMillis()));if(null==message){ return;}//Do something ; You can add a judgment here and if it fails you can add it to the queue againmqExecute.execute(message); } catch (Exception e) {e.printStackTrace();//If an exception occurs, it is put back into the queue// todo: If repeated, this can lead to repeated cycleslog.error('RedisMq: RedisMqMessage exception ,It message rollback , If repeated, this can lead to repeated cycles{}',new Date());provider.sendMessage(message); } finally {redisMq.deMessage(queueName, id.toString());redisMq.deMsgPool(message.getGroup(),id.toString()); }} }} }}5. 消息執接口
/** * @author shikanatsu */public interface RedisMqExecute { /** * 獲取隊列名稱 * @return */ public String getQueueName(); /** * 統一的通過執行期執行 * @param message * @return */ public boolean execute(RedisMessage message); /** * Perform thread polling */ public void threadPolling();}6. 任務類型的實現類:可以根據自己的情況去實現對應的隊列需求
/** * 訂單執行 * * @author shikanatsu */@Servicepublic class OrderMqExecuteImpl implements RedisMqExecute { private static Logger logger = LoggerFactory.getLogger(OrderMqExecuteImpl.class); public final static String name = 'orderPoll:'; @Resource private RedisMqConsumer redisMqConsumer; private RedisMqExecute mqExecute = this; @Resource private OrderService orderService; @Override public String getQueueName() {return name; } @Override /** * For the time being, only all orders will be processed. You can change to make orders */ public boolean execute(RedisMessage message) {logger.info('Do orderMqPoll ; Time:{}',new Date()); //Do return true; } @Override /** 通過線程去執行輪詢的過程,時間上可以自由控制 **/ public void threadPolling() {ThreadUtil.execute(() -> { while (true) {redisMqConsumer.baseMonitor(mqExecute);ThreadUtil.sleep(5, TimeUnit.MICROSECONDS); }}); }}
使用事例 1. 實現RedisMqExecute 接口 創建對應的輪詢或者采取定時器的方式執行 和實現具體的任務。 2. 通過MessageProvider 實現相對應的消息服務和綁定隊列組,通過隊列組的方式執行。 3. 提示: 采取線程的方式需要在項目啟動過程中執行,采取定時器或者調度的方式可以更加動態的調整。
到此這篇關于Java實現Redis延時消息隊列的文章就介紹到這了,更多相關Java Redis延時消息隊列內容請搜索好吧啦網以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持好吧啦網!
相關文章:
1. Python獲取抖音關注列表封號賬號的實現代碼2. ajax請求添加自定義header參數代碼3. Python數據分析之pandas函數詳解4. 解決Python 進程池Pool中一些坑5. php測試程序運行速度和頁面執行速度的代碼6. 無線標記語言(WML)基礎之WMLScript 基礎第1/2頁7. 三個不常見的 HTML5 實用新特性簡介8. 使用.net core 自帶DI框架實現延遲加載功能9. php網絡安全中命令執行漏洞的產生及本質探究10. Warning: require(): open_basedir restriction in effect,目錄配置open_basedir報錯問題分析
