什么是消息隊列?下面本篇文章帶大家了解一下消息隊列的基本概念,介紹一下node中如何使用消息隊列,希望對大家有所幫助!
1.消息隊列
什么是消息隊列
消息隊列就是消息的傳輸過程中保存消息的容器,本質(zhì)是一個隊列(先進(jìn)先出)
消息
指的是需要傳輸?shù)臄?shù)據(jù),可以是一些文本,字符串,或者是對象等信息。
消息隊列
則是兩個應(yīng)用間的通信服務(wù),消息的產(chǎn)生者
將數(shù)據(jù)存放到消息隊列中就可以立即返回,不需要等待消息的接收者
應(yīng)答。即:生產(chǎn)者
保證數(shù)據(jù)插入隊列,誰來取這條消息不需要管。消息的接收者
則只專注于接受消息并處理?!鞠嚓P(guān)教程推薦:nodejs視頻教程、編程教學(xué)】
消息隊列能做什么
-
解耦 上面介紹了,消息隊列將消息的生產(chǎn)者和消息的接收者分開,彼此都不受影響。
-
異步 異步就是為了減少請求的響應(yīng)時間,消息的生產(chǎn)者只需要處理簡單的邏輯,并將數(shù)據(jù)放到消息隊列中即可返回,復(fù)雜的邏輯,比如:數(shù)據(jù)庫操作,IO操作由消息的接收者處理。
-
削峰 消息隊列應(yīng)用在服務(wù)時,能將瞬時大量涌入的請求信息保存到消息隊列中,并立即返回。再由消息的接收者根據(jù)數(shù)據(jù)處理請求。
-
應(yīng)用場景 游戲活動,秒殺活動,下單等會造成瞬時流量暴增的應(yīng)用。
2.消息隊列的概念
介紹完消息隊列的基本信息,在開發(fā)消息隊列之前先介紹一下消息隊列的一些基本概念~
消息的生產(chǎn)者(producer)與消費(fèi)者(customer)
上文提到的生產(chǎn)者
與消費(fèi)者
,提供的是
鏈接,通道與隊列
-
鏈接(connection):表示服務(wù)程序與消息隊列之間的一條鏈接。一個服務(wù)程序可以創(chuàng)建多條鏈接。
-
通道(channel):消息隊列鏈接之間的一個通,一個鏈接可以有多個通道。
-
隊列(queue):消息隊列中存放數(shù)據(jù)的隊列,一個消息隊列服務(wù)可以有多個隊列。
總結(jié)一下,鏈接,通道隊列之間的關(guān)系是這樣的
交換機(jī)(exchange)
消息隊列發(fā)送消息時必須要有一個交換機(jī),如果沒有指定則用的是默認(rèn)的交換機(jī)。交換機(jī)的作用就是將消息才推到對應(yīng)的隊列中。消息隊列中一共有4種交換機(jī)
-
Direct: 指定隊列模式,消息來了,只發(fā)給指定的Queue,其他Queue都收不到。
-
fanout: 廣播模式,消息來了,就會發(fā)送給所有的隊列。
-
topic: 模糊匹配模式,通過模糊匹配的方式進(jìn)行相應(yīng)轉(zhuǎn)發(fā)。
-
header: 與Direct模式類似。
3.node使用rabbitMQ
安裝rabbitMQ
- 安裝rabbitMQ可以通過官網(wǎng)上進(jìn)行下載安裝,傳送門
- MAC可以直接用brew命令安裝
brew install rabbitmq
登錄后復(fù)制 - 安裝完成后啟動rabbitmq服務(wù)
然后再本地中訪問 http://localhost:15672/ 就可以看到rabbitmq服務(wù)的后臺。初始的賬號密碼均為 guest
node項目安裝amqplib
amqplib是node中使用消息隊列的一套工具,可以讓我們快速地使用消息隊列
地址:https://www.npmjs.com/package/amqplib
創(chuàng)建生產(chǎn)者
/** product.js 消費(fèi)者 */ const amqplib = require('amqplib'); const config = require('./config'); const { connectUrl } = config; (async () => { const connection = await amqplib.connect(connectUrl); const channel = await connection.createChannel(); const exchangeName = 'testExchange'; const key = 'testQueue'; const sendMsg = 'hello rabbitmq'; // 知道交換機(jī)類型 await channel.assertExchange(exchangeName, 'fanout', { durable: true, }); // 指定一個隊列 await channel.assertQueue(key); for (let i = 0; i < 100; i++) { channel.publish(exchangeName, key, Buffer.from(`${sendMsg} ${i}`)); } await channel.close(); await connection.close(); })();
運(yùn)行后在后臺可以看到新增了一個有100條消息的隊列
創(chuàng)建消費(fèi)者
/** customer.js 消費(fèi)者 */ const amqplib = require('amqplib'); const config = require('./config'); const { connectUrl } = config; (async () => { let connection = await amqplib.connect(connectUrl); const exchangeName = 'testExchange'; const key = 'testQueue'; // 創(chuàng)建兩個通道 const channel1 = await connection.createChannel(); const channel2 = await connection.createChannel(); // 指定一個交換機(jī) await channel1.assertExchange(exchangeName, 'fanout', { durable: true, }); // 指定一個隊列 await channel1.assertQueue(key); await channel1.bindQueue(key, exchangeName, key); channel1.consume(key, (msg) => { console.log('channel 1', msg.content.toString()); }); await channel2.assertExchange(exchangeName, 'fanout', { durable: true, }); await channel2.assertQueue(key); await channel2.bindQueue(key, exchangeName, key); channel2.consume(key, (msg) => { console.log('channel 2', msg.content.toString()); }); })();
執(zhí)行后可以看到,兩個通道可以同時工作接收消息