欧美亚洲中文,在线国自产视频,欧洲一区在线观看视频,亚洲综合中文字幕在线观看

      1. <dfn id="rfwes"></dfn>
          <object id="rfwes"></object>
        1. 站長資訊網(wǎng)
          最全最豐富的資訊網(wǎng)站

          Redis如何實現(xiàn)延遲隊列?方法介紹

          Redis如何實現(xiàn)延遲隊列?方法介紹

          延遲隊列,顧名思義它是一種帶有延遲功能的消息隊列。那么,是在什么場景下我才需要這樣的隊列呢?

          1. 背景

          我們先看看以下業(yè)務場景:

          • 當訂單一直處于未支付狀態(tài)時,如何及時的關閉訂單
          • 如何定期檢查處于退款狀態(tài)的訂單是否已經(jīng)退款成功
          • 在訂單長時間沒有收到下游系統(tǒng)的狀態(tài)通知的時候,如何實現(xiàn)階梯式的同步訂單狀態(tài)的策略
          • 在系統(tǒng)通知上游系統(tǒng)支付成功終態(tài)時,上游系統(tǒng)返回通知失敗,如何進行異步通知實行分頻率發(fā)送:15s 3m 10m 30m 30m 1h 2h 6h 15h

          1.1 解決方案

          • 最簡單的方式,定時掃表。例如對于訂單支付失效要求比較高的,每2S掃表一次檢查過期的訂單進行主動關單操作。優(yōu)點是簡單缺點是每分鐘全局掃表,浪費資源,如果遇到表數(shù)據(jù)訂單量即將過期的訂單量很大,會造成關單延遲。

          • 使用RabbitMq或者其他MQ改造實現(xiàn)延遲隊列,優(yōu)點是,開源,現(xiàn)成的穩(wěn)定的實現(xiàn)方案,缺點是:MQ是一個消息中間件,如果團隊技術棧本來就有MQ,那還好,如果不是,那為了延遲隊列而去部署一套MQ成本有點大

          • 使用Redis的zset、list的特性,我們可以利用redis來實現(xiàn)一個延遲隊列RedisDelayQueue

          2. 設計目標

          • 實時性:允許存在一定時間的秒級誤差
          • 高可用性:支持單機、支持集群
          • 支持消息刪除:業(yè)務會隨時刪除指定消息
          • 消息可靠性:保證至少被消費一次
          • 消息持久化:基于Redis自身的持久化特性,如果Redis數(shù)據(jù)丟失,意味著延遲消息的丟失,不過可以做主備和集群保證。這個可以考慮后續(xù)優(yōu)化將消息持久化到MangoDB中

          3. 設計方案

          設計主要包含以下幾點:

          • 將整個Redis當做消息池,以KV形式存儲消息
          • 使用ZSET做優(yōu)先隊列,按照Score維持優(yōu)先級
          • 使用LIST結構,以先進先出的方式消費
          • ZSET和LIST存儲消息地址(對應消息池的每個KEY)
          • 自定義路由對象,存儲ZSET和LIST名稱,以點對點的方式將消息從ZSET路由到正確的LIST
          • 使用定時器維護路由
          • 根據(jù)TTL規(guī)則實現(xiàn)消息延遲

          3.1 設計圖

          還是基于有贊的延遲隊列設計,進行優(yōu)化改造及代碼實現(xiàn)。有贊設計
          Redis如何實現(xiàn)延遲隊列?方法介紹

          3.2 數(shù)據(jù)結構

          • ZING:DELAY_QUEUE:JOB_POOL 是一個Hash_Table結構,里面存儲了所有延遲隊列的信息。KV結構:K=prefix+projectName field = topic+jobId V=CONENT;V由客戶端傳入的數(shù)據(jù),消費的時候回傳
          • ZING:DELAY_QUEUE:BUCKET 延遲隊列的有序集合ZSET,存放K=ID和需要的執(zhí)行時間戳,根據(jù)時間戳排序
          • ZING:DELAY_QUEUE:QUEUE LIST結構,每個Topic一個LIST,list存放的都是當前需要被消費的JOB

          Redis如何實現(xiàn)延遲隊列?方法介紹
          圖片僅供參考,基本可以描述整個流程的執(zhí)行過程,圖片源于文末的參考博客中

          3.3 任務的生命周期

          1. 新增一個JOB,會在ZING:DELAY_QUEUE:JOB_POOL中插入一條數(shù)據(jù),記錄了業(yè)務方消費方。ZING:DELAY_QUEUE:BUCKET也會插入一條記錄,記錄執(zhí)行的時間戳
          2. 搬運線程會去ZING:DELAY_QUEUE:BUCKET中查找哪些執(zhí)行時間戳的RunTimeMillis比現(xiàn)在的時間小,將這些記錄全部刪除;同時會解析出每個任務的Topic是什么,然后將這些任務PUSH到TOPIC對應的列表ZING:DELAY_QUEUE:QUEUE
          3. 每個TOPIC的LIST都會有一個監(jiān)聽線程去批量獲取LIST中的待消費數(shù)據(jù),獲取到的數(shù)據(jù)全部扔給這個TOPIC的消費線程池
          4. 消費線程池執(zhí)行會去ZING:DELAY_QUEUE:JOB_POOL查找數(shù)據(jù)結構,返回給回調(diào)結構,執(zhí)行回調(diào)方法。

          3.4 設計要點

          3.4.1 基本概念

          • JOB:需要異步處理的任務,是延遲隊列里的基本單元
          • Topic:一組相同類型Job的集合(隊列)。供消費者來訂閱

          3.4.2 消息結構

          每個JOB必須包含以下幾個屬性

          • jobId:Job的唯一標識。用來檢索和刪除指定的Job信息
          • topic:Job類型??梢岳斫獬删唧w的業(yè)務名稱
          • delay:Job需要延遲的時間。單位:秒。(服務端會將其轉(zhuǎn)換為絕對時間)
          • body:Job的內(nèi)容,供消費者做具體的業(yè)務處理,以json格式存儲
          • retry:失敗重試次數(shù)
          • url:通知URL

          3.5 設計細節(jié)

          3.5.1 如何快速消費ZING:DELAY_QUEUE:QUEUE

          最簡單的實現(xiàn)方式就是使用定時器進行秒級掃描,為了保證消息執(zhí)行的時效性,可以設置每1S請求Redis一次,判斷隊列中是否有待消費的JOB。但是這樣會存在一個問題,如果queue中一直沒有可消費的JOB,那頻繁的掃描就失去了意義,也浪費了資源,幸好LIST中有一個BLPOP阻塞原語,如果list中有數(shù)據(jù)就會立馬返回,如果沒有數(shù)據(jù)就會一直阻塞在那里,直到有數(shù)據(jù)返回,可以設置阻塞的超時時間,超時會返回NULL;具體的實現(xiàn)方式及策略會在代碼中進行具體的實現(xiàn)介紹

          3.5.2 避免定時導致的消息重復搬運及消費

          • 使用Redis的分布式鎖來控制消息的搬運,從而避免消息被重復搬運導致的問題
          • 使用分布式鎖來保證定時器的執(zhí)行頻率

          4. 核心代碼實現(xiàn)

          4.1 技術說明

          技術棧:SpringBoot,Redisson,Redis,分布式鎖,定時器

          注意:本項目沒有實現(xiàn)設計方案中的多Queue消費,只開啟了一個QUEUE,這個待以后優(yōu)化

          4.2 核心實體

          4.2.1 Job新增對象

          /**  * 消息結構  *  * @author 睜眼看世界  * @date 2020年1月15日  */ @Data public class Job implements Serializable {      private static final long serialVersionUID = 1L;      /**      * Job的唯一標識。用來檢索和刪除指定的Job信息      */     @NotBlank     private String jobId;       /**      * Job類型??梢岳斫獬删唧w的業(yè)務名稱      */     @NotBlank     private String topic;      /**      * Job需要延遲的時間。單位:秒。(服務端會將其轉(zhuǎn)換為絕對時間)      */     private Long delay;      /**      * Job的內(nèi)容,供消費者做具體的業(yè)務處理,以json格式存儲      */     @NotBlank     private String body;      /**      * 失敗重試次數(shù)      */     private int retry = 0;      /**      * 通知URL      */     @NotBlank     private String url; }

          4.2.2 Job刪除對象

          /**  * 消息結構  *  * @author 睜眼看世界  * @date 2020年1月15日  */ @Data public class JobDie implements Serializable {      private static final long serialVersionUID = 1L;      /**      * Job的唯一標識。用來檢索和刪除指定的Job信息      */     @NotBlank     private String jobId;       /**      * Job類型??梢岳斫獬删唧w的業(yè)務名稱      */     @NotBlank     private String topic; }

          4.3 搬運線程

          /**  * 搬運線程  *  * @author 睜眼看世界  * @date 2020年1月17日  */ @Slf4j @Component public class CarryJobScheduled {      @Autowired     private RedissonClient redissonClient;      /**      * 啟動定時開啟搬運JOB信息      */     @Scheduled(cron = "*/1 * * * * *")     public void carryJobToQueue() {         System.out.println("carryJobToQueue --->");         RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);         try {             boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);             if (!lockFlag) {                 throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);             }             RScoredSortedSet<Object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE);             long now = System.currentTimeMillis();             Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true);             List<String> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());             RList<String> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE);             readyQueue.addAll(jobList);             bucketSet.removeAllAsync(jobList);         } catch (InterruptedException e) {             log.error("carryJobToQueue error", e);         } finally {             if (lock != null) {                 lock.unlock();             }         }     } }

          4.4 消費線程

          @Slf4j @Component public class ReadyQueueContext {      @Autowired     private RedissonClient redissonClient;      @Autowired     private ConsumerService consumerService;      /**      * TOPIC消費線程      */     @PostConstruct     public void startTopicConsumer() {         TaskManager.doTask(this::runTopicThreads, "開啟TOPIC消費線程");     }      /**      * 開啟TOPIC消費線程      * 將所有可能出現(xiàn)的異常全部catch住,確保While(true)能夠不中斷      */     @SuppressWarnings("InfiniteLoopStatement")     private void runTopicThreads() {         while (true) {             RLock lock = null;             try {                 lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK);             } catch (Exception e) {                 log.error("runTopicThreads getLock error", e);             }             try {                 if (lock == null) {                     continue;                 }                 // 分布式鎖時間比Blpop阻塞時間多1S,避免出現(xiàn)釋放鎖的時候,鎖已經(jīng)超時釋放,unlock報錯                 boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);                 if (!lockFlag) {                     continue;                 }                  // 1. 獲取ReadyQueue中待消費的數(shù)據(jù)                 RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE);                 String topicId = queue.poll(60, TimeUnit.SECONDS);                 if (StringUtils.isEmpty(topicId)) {                     continue;                 }                  // 2. 獲取job元信息內(nèi)容                 RMap<String, Job> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY);                 Job job = jobPoolMap.get(topicId);                  // 3. 消費                 FutureTask<Boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getUrl(), job.getBody()), job.getTopic() + "-->消費JobId-->" + job.getJobId());                 if (taskResult.get()) {                     // 3.1 消費成功,刪除JobPool和DelayBucket的job信息                     jobPoolMap.remove(topicId);                 } else {                     int retrySum = job.getRetry() + 1;                     // 3.2 消費失敗,則根據(jù)策略重新加入Bucket                      // 如果重試次數(shù)大于5,則將jobPool中的數(shù)據(jù)刪除,持久化到DB                     if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {                         jobPoolMap.remove(topicId);                         continue;                     }                     job.setRetry(retrySum);                     long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;                     log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));                     RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);                     delayBucket.add(nextTime, topicId);                     // 3.3 更新元信息失敗次數(shù)                     jobPoolMap.put(topicId, job);                 }             } catch (Exception e) {                 log.error("runTopicThreads error", e);             } finally {                 if (lock != null) {                     try {                         lock.unlock();                     } catch (Exception e) {                         log.error("runTopicThreads unlock error", e);                     }                 }             }         }     } }

          4.5 添加及刪除JOB

          /**  * 提供給外部服務的操作接口  *  * @author why  * @date 2020年1月15日  */ @Slf4j @Service public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {      @Autowired     private RedissonClient redissonClient;       /**      * 添加job元信息      *      * @param job 元信息      */     @Override     public void addJob(Job job) {          RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId());         try {             boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);             if (!lockFlag) {                 throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);             }             String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());              // 1. 將job添加到 JobPool中             RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);             if (jobPool.get(topicId) != null) {                 throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);             }              jobPool.put(topicId, job);              // 2. 將job添加到 DelayBucket中             RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);             delayBucket.add(job.getDelay(), topicId);         } catch (InterruptedException e) {             log.error("addJob error", e);         } finally {             if (lock != null) {                 lock.unlock();             }         }     }       /**      * 刪除job信息      *      * @param job 元信息      */     @Override     public void deleteJob(JobDie jobDie) {          RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId());         try {             boolean lockFlag = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);             if (!lockFlag) {                 throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);             }             String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());              RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);             jobPool.remove(topicId);              RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);             delayBucket.remove(topicId);         } catch (InterruptedException e) {             log.error("addJob error", e);         } finally {             if (lock != null) {                 lock.unlock();             }         }     } }

          5. 待優(yōu)化的內(nèi)容

          1. 目前只有一個Queue隊列存放消息,當需要消費的消息大量堆積后,會影響消息通知的時效。改進的辦法是,開啟多個Queue,進行消息路由,再開啟多個消費線程進行消費,提供吞吐量
          2. 消息沒有進行持久化,存在風險,后續(xù)會將消息持久化到MangoDB中

          6. 源碼

          贊(0)
          分享到: 更多 (0)
          網(wǎng)站地圖   滬ICP備18035694號-2    滬公網(wǎng)安備31011702889846號