激情久久久_欧美视频区_成人av免费_不卡视频一二三区_欧美精品在欧美一区二区少妇_欧美一区二区三区的

服務器之家:專注于服務器技術及軟件下載分享
分類導航

PHP教程|ASP.NET教程|Java教程|ASP教程|編程技術|正則表達式|C/C++|IOS|C#|Swift|Android|VB|R語言|JavaScript|易語言|vb.net|

服務器之家 - 編程語言 - Java教程 - Java實現Redis延時消息隊列

Java實現Redis延時消息隊列

2021-11-16 13:28shikanatsu Java教程

本文主要介紹了Java實現Redis延時消息隊列,文中通過示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙伴們可以參考一下

什么是延時任務

延時任務,顧名思義,就是延遲一段時間后才執行的任務。舉個例子,假設我們有個發布資訊的功能,運營需要在每天早上7點準時發布資訊,但是早上7點大家都還沒上班,這個時候就可以使用延時任務來實現資訊的延時發布了。只要在前一天下班前指定第二天要發送資訊的時間,到了第二天指定的時間點資訊就能準時發出去了。如果大家有運營過公眾號,就會知道公眾號后臺也有文章定時發送的功能。總而言之,延時任務的使用還是很廣泛的。

延時任務的特點

  • 時間有序性
  • 時間具體性
  • 任務中攜帶詳細的信息 ,通常包括 任務ID, 任務的類型 ,時間點。

實現思路:

將整個Redis當做消息池,以kv形式存儲消息,key為id,value為具體的消息body
使用ZSET做優先隊列,按照score維持優先級(用當前時間+需要延時的時間作為score)
輪詢ZSET,拿出score比當前時間戳大的數據(已過期的)
根據id拿到消息池的具體消息進行消費
消費成功,刪除改隊列和消息
消費失敗,讓該消息重新回到隊列

代碼實現

Java實現Redis延時消息隊列

1.消息模型

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import lombok.Data;
import lombok.experimental.Accessors;
 
import javax.validation.constraints.NotNull;
import java.io.Serializable;
 
/**
 * Redis 消息隊列中的消息體
 * @author shikanatsu
 */
@Data
@Accessors(chain = true)
public class RedisMessage implements Serializable {
 
    /** 消息隊列組 **/
    private String group;
 
    /**
     * 消息id
     */
    private String id;
 
    /**
     * 消息延遲/ 秒
     */
    @NotNull(message = "消息延時時間不能為空")
    private long delay;
 
    /**
     * 消息存活時間 單位:秒
     */
    @NotNull(message = "消息存活時間不能為空")
    private int ttl;
    /**
     * 消息體,對應業務內容
     */
    private Object body;
    /**
     * 創建時間,如果只有優先級沒有延遲,可以設置創建時間為0
     * 用來消除時間的影響
     */
    private long createTime;
}

2.RedisMq 消息隊列實現類

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package com.shixun.base.redisMq;
 
import com.shixun.base.jedis.service.RedisService;
import org.springframework.stereotype.Component;
 
import javax.annotation.Resource;
 
/**
 * Redis消息隊列
 *
 * @author shikanatsu
 */
@Component
public class RedisMq {
 
    /**
     * 消息池前綴,以此前綴加上傳遞的消息id作為key,以消息{@link MSG_POOL}
     * 的消息體body作為值存儲
     */
    public static final String MSG_POOL = "Message:Pool:";
 
    /**
     * zset隊列 名稱 queue
     */
    public static final String QUEUE_NAME = "Message:Queue:";
 
//    private static final int SEMIH = 30 * 60;
 
 
    @Resource
    private RedisService redisService;
 
    /**
     * 存入消息池
     *
     * @param message
     * @return
     */
    public boolean addMsgPool(RedisMessage message) {
        if (null != message) {
            redisService.set(MSG_POOL + message.getGroup() + message.getId(), message, message.getTtl());
            return true;
        }
        return false;
    }
 
    /**
     * 從消息池中刪除消息
     *
     * @param id
     * @return
     */
    public void deMsgPool(String group, String id) {
        redisService.remove(MSG_POOL + group + id);
    }
 
    /**
     * 向隊列中添加消息
     *
     * @param key
     * @param score 優先級
     * @param val
     * @return 返回消息id
     */
    public void enMessage(String key, long score, String val) {
        redisService.zsset(key, val, score);
    }
 
    /**
     * 從隊列刪除消息
     *
     * @param id
     * @return
     */
    public boolean deMessage(String key, String id) {
        return redisService.zdel(key, id);
    }
}

3.消息生產者

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import cn.hutool.core.convert.Convert;
import cn.hutool.core.lang.Assert;
import cn.hutool.core.util.IdUtil;
 
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import org.springframework.validation.annotation.Validated;
 
import javax.annotation.Resource;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.TimeUnit;
 
/**
 * 消息生產者
 *
 * @author shikanatsu
 */
@Component
public class MessageProvider {
 
    static Logger logger = LoggerFactory.getLogger(MessageProvider.class);
 
    @Resource
    private RedisMq redisMq;
 
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
 
    public boolean sendMessage(@Validated RedisMessage message) {
        Assert.notNull(message);
        //The priority is if there is no creation time
//        message.setCreateTime(System.currentTimeMillis());
        message.setId(IdUtil.fastUUID());
        Long delayTime = message.getCreateTime() + Convert.convertTime(message.getDelay(), TimeUnit.SECONDS, TimeUnit.MILLISECONDS);
        try {
            redisMq.addMsgPool(message);
            redisMq.enMessage(RedisMq.QUEUE_NAME+message.getGroup(), delayTime, message.getId());
            logger.info("RedisMq發送消費信息{},當前時間:{},消費時間預計{}",message.toString(),new Date(),sdf.format(delayTime));
        }catch (Exception e){
            e.printStackTrace();
            logger.error("RedisMq 消息發送失敗,當前時間:{}",new Date());
            return false;
        }
        return true;
    }
}

4.消息消費者

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
/**
 * Redis消息消費者
 * @author shikanatsu
 */
@Component
public class RedisMqConsumer {
 
    private static final Logger log = LoggerFactory.getLogger(RedisMqConsumer.class);
 
    @Resource
    private RedisMq redisMq;
 
    @Resource
    private RedisService redisService;
 
    @Resource
    private MessageProvider provider;
 
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
 
    //@Scheduled(cron = "*/1 * * * * ? ")
    /**
     Instead of a thread loop, you can use Cron expressions to perform periodic tasks
     */
    public void baseMonitor(RedisMqExecute mqExecute){
        String queueName = RedisMq.QUEUE_NAME+mqExecute.getQueueName();
        //The query is currently expired
        Set<Object> set = redisService.rangeByScore(queueName, 0, System.currentTimeMillis());
        if (null != set) {
            long current = System.currentTimeMillis();
            for (Object id : set) {
                long  score = redisService.getScore(queueName, id.toString()).longValue();
                //Once again the guarantee has expired , And then perform the consumption
                if (current >= score) {
                    String str = "";
                    RedisMessage message = null;
                    String msgPool = RedisMq.MSG_POOL+mqExecute.getQueueName();
                    try {
                        message = (RedisMessage)redisService.get(msgPool + id.toString());
                        log.debug("RedisMq:{},get RedisMessage success now Time:{}",str,sdf.format(System.currentTimeMillis()));
                        if(null==message){
                            return;
                        }
                        //Do something ; You can add a judgment here and if it fails you can add it to the queue again
                        mqExecute.execute(message);
                    } catch (Exception e) {
                        e.printStackTrace();
                        //If an exception occurs, it is put back into the queue
                        // todo:  If repeated, this can lead to repeated cycles
                        log.error("RedisMq: RedisMqMessage exception ,It message rollback , If repeated, this can lead to repeated cycles{}",new Date());
                        provider.sendMessage(message);
                    } finally {
                        redisMq.deMessage(queueName, id.toString());
                        redisMq.deMsgPool(message.getGroup(),id.toString());
                    }
                }
            }
        }
    }
}

5. 消息執接口

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * @author shikanatsu
 */
 
 
public interface RedisMqExecute {
 
    /**
     * 獲取隊列名稱
     * @return
     */
    public String getQueueName();
 
 
    /**
     * 統一的通過執行期執行
     * @param message
     * @return
     */
    public boolean execute(RedisMessage message);
 
 
    /**
     * Perform thread polling
     */
 
    public void   threadPolling();
 
}

6. 任務類型的實現類:可以根據自己的情況去實現對應的隊列需求 

?
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
/**
 * 訂單執行
 *
 * @author shikanatsu
 */
@Service
public class OrderMqExecuteImpl implements RedisMqExecute {
 
 
    private static Logger logger = LoggerFactory.getLogger(OrderMqExecuteImpl.class);
 
    public final static String name = "orderPoll:";
 
    @Resource
    private RedisMqConsumer redisMqConsumer;
 
    private RedisMqExecute mqExecute = this;
 
    @Resource
    private OrderService orderService;
 
 
    @Override
    public String getQueueName() {
        return name;
    }
 
    @Override
    /**
     * For the time being, only all orders will be processed. You can change to make orders
     */
    public boolean execute(RedisMessage message) {
        logger.info("Do orderMqPoll ; Time:{}",new Date());
  //Do
        return true;
    }
 
    @Override
    /**  通過線程去執行輪詢的過程,時間上可以自由控制 **/
    public void threadPolling() {
        ThreadUtil.execute(() -> {
            while (true) {
                redisMqConsumer.baseMonitor(mqExecute);
                ThreadUtil.sleep(5, TimeUnit.MICROSECONDS);
            }
        });
    }
}

使用事例
 1. 實現RedisMqExecute 接口 創建對應的輪詢或者采取定時器的方式執行 和實現具體的任務。
 2.  通過MessageProvider 實現相對應的消息服務和綁定隊列組,通過隊列組的方式執行。
 3. 提示: 采取線程的方式需要在項目啟動過程中執行,采取定時器或者調度的方式可以更加動態的調整。

到此這篇關于Java實現Redis延時消息隊列的文章就介紹到這了,更多相關Java Redis延時消息隊列內容請搜索服務器之家以前的文章或繼續瀏覽下面的相關文章希望大家以后多多支持服務器之家!

原文鏈接:https://blog.csdn.net/qq_26093077/article/details/113179971

延伸 · 閱讀

精彩推薦
  • Java教程Java使用SAX解析xml的示例

    Java使用SAX解析xml的示例

    這篇文章主要介紹了Java使用SAX解析xml的示例,幫助大家更好的理解和學習使用Java,感興趣的朋友可以了解下...

    大行者10067412021-08-30
  • Java教程Java8中Stream使用的一個注意事項

    Java8中Stream使用的一個注意事項

    最近在工作中發現了對于集合操作轉換的神器,java8新特性 stream,但在使用中遇到了一個非常重要的注意點,所以這篇文章主要給大家介紹了關于Java8中S...

    阿杜7482021-02-04
  • Java教程Java實現搶紅包功能

    Java實現搶紅包功能

    這篇文章主要為大家詳細介紹了Java實現搶紅包功能,采用多線程模擬多人同時搶紅包,文中示例代碼介紹的非常詳細,具有一定的參考價值,感興趣的小伙...

    littleschemer13532021-05-16
  • Java教程小米推送Java代碼

    小米推送Java代碼

    今天小編就為大家分享一篇關于小米推送Java代碼,小編覺得內容挺不錯的,現在分享給大家,具有很好的參考價值,需要的朋友一起跟隨小編來看看吧...

    富貴穩中求8032021-07-12
  • Java教程20個非常實用的Java程序代碼片段

    20個非常實用的Java程序代碼片段

    這篇文章主要為大家分享了20個非常實用的Java程序片段,對java開發項目有所幫助,感興趣的小伙伴們可以參考一下 ...

    lijiao5352020-04-06
  • Java教程xml與Java對象的轉換詳解

    xml與Java對象的轉換詳解

    這篇文章主要介紹了xml與Java對象的轉換詳解的相關資料,需要的朋友可以參考下...

    Java教程網2942020-09-17
  • Java教程升級IDEA后Lombok不能使用的解決方法

    升級IDEA后Lombok不能使用的解決方法

    最近看到提示IDEA提示升級,尋思已經有好久沒有升過級了。升級完畢重啟之后,突然發現好多錯誤,本文就來介紹一下如何解決,感興趣的可以了解一下...

    程序猿DD9332021-10-08
  • Java教程Java BufferWriter寫文件寫不進去或缺失數據的解決

    Java BufferWriter寫文件寫不進去或缺失數據的解決

    這篇文章主要介紹了Java BufferWriter寫文件寫不進去或缺失數據的解決方案,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望...

    spcoder14552021-10-18
主站蜘蛛池模板: 91av久久| 一区在线不卡 | 国产精品亚洲精品日韩已方 | 亚洲成人中文字幕在线 | 精品在线视频播放 | 免费视频www在线观看 | 精品中文字幕久久久久四十五十骆 | 深夜激情视频 | 草操视频 | 五月激情久久 | 成年性羞羞视频免费观看无限 | 色阁阁69婷婷 | 欧美成人精品h版在线观看 国产一级淫片在线观看 | 久久久麻豆 | 99影视电影电视剧在线播放 | 欧美成人性色 | 午夜视频大全 | 久久国产一级 | 精品国产91一区二区三区 | 在线亚洲免费 | 国产成人自拍视频在线 | 久久综合伊人 | 一级片国语 | 欧美黑人伦理 | 精品黑人一区二区三区国语馆 | 亚洲影院久久久av天天蜜桃臀 | 日韩在线激情 | 日本羞羞的午夜电视剧 | 国产精品免费av一区二区三区 | 精品国产乱码一区二区 | 久久久综 | 欧美精品一区二区三区四区 | 一区二区久久电影 | 久久精品国产99久久6动漫亮点 | 国产噜噜噜噜噜久久久久久久久 | 91经典视频 | 久久久久9999 | 欧美成人二区 | 毛片在线视频观看 | 日韩三级伦理在线观看 | 久久爽久久爽久久av东京爽 |