本文實例講述了PHP消息隊列實現(xiàn)及應(yīng)用。分享給大家供大家參考,具體如下:
在互聯(lián)網(wǎng)項目開發(fā)者經(jīng)常會遇到『給用戶群發(fā)短信』、『訂單系統(tǒng)有大量的日志需要記錄』或者在秒殺業(yè)務(wù)的時候服務(wù)器無法承受瞬間并發(fā)的壓力。
這種情況下,我們怎么保證系統(tǒng)正常有效的運行呢?
這個時候,我們可以引入一個叫『消息隊列』的概念來解決上面的需求。
消息隊列的概念、原理和場景
在高并發(fā)的時候,程序往往無法做到及時的處理。我們引入一個中間的系統(tǒng),來進行分流和減壓。
所以從本質(zhì)上講:消息隊列就是一個隊列結(jié)構(gòu)的中間件。也就是說,你把消息和內(nèi)容放入這個容器之后就可以直接返回,不用等它后期處理的結(jié)果。另外會有一個程序,讀取這些數(shù)據(jù)并按照順序處理。
1、隊列結(jié)構(gòu)的中間件
2、消息放入后,不必立即處理
3、由訂閱者/消費者按順序處理
也就是說:當遇到一個比較大或者耗時比較長的環(huán)節(jié)的時候,而同時你的業(yè)務(wù)又不需要立即知道這個環(huán)節(jié)的結(jié)果,使用消息隊列是好的選擇。
核心結(jié)構(gòu)如下面:

消息隊列 適用場景
一、數(shù)據(jù)需要冗余的時候
比如訂單系統(tǒng)中,后續(xù)需要進行數(shù)據(jù)的轉(zhuǎn)換和記錄。消息隊列可以把這些數(shù)據(jù)持久化的存儲在隊列中,然后由訂單后期處理程序進行處理,處理完成之后再把這條記錄從隊列中刪除。
二、系統(tǒng)的解耦
消息隊列解決了2套系統(tǒng)之間深度耦合的問題。
使用消息隊列后,入隊的系統(tǒng)和出隊的系統(tǒng)沒有直接的關(guān)系。
入隊系統(tǒng)和出隊系統(tǒng),其中一個崩潰之后不會影響另外一個的正常運行。
三、流量削峰
就是秒殺和搶購的時候,會出現(xiàn)明顯的流量劇增,對服務(wù)器的壓力非常大。
實際項目開發(fā)中,配合緩存來使用消息隊列,一種很好的方案。
四、異步通信
消息隊列本身就實現(xiàn)了程序的異步操作,因此只要適合于異步的場景都可以使用消息隊列
五、擴展性
比如訂單系統(tǒng),訂單入隊之后,后期或許還有財務(wù)系統(tǒng)處理,但是如果還要加一個配貨系統(tǒng)。
只需要讓這個配貨系統(tǒng) 訂閱這個 消息隊列 即可。
六、排序保證
在有些場景下,數(shù)據(jù)的處理順序是非常重要的,隊列本身就可以做成單線程的單進單出的系統(tǒng)。
從而有效的保證數(shù)據(jù)按照順序進行處理。
常見 隊列實現(xiàn) 的優(yōu)缺點
隊列介質(zhì):
Mysql:可靠性高、易實現(xiàn)、速度慢
Redis:速度快,單條大消息包時效率低
消息系統(tǒng):專業(yè)性強、可靠,學(xué)習(xí)成本高(比如:RabbtiMQ)
消息處理的觸發(fā)機制:
死循環(huán)方式讀取:易實現(xiàn),故障時無法及時恢復(fù);
定時任務(wù):壓力均分,有處理量上限。(最大的缺陷:定位任務(wù)時間的間隔和處理的數(shù)據(jù)需要精準把握,不能上一個任務(wù)還沒有處理完成,下一個認為就已經(jīng)啟動了)
守護進程:類似于PHP-FPM和PHP-CGI,需要shell知識
解耦案列:隊列處理 訂單系統(tǒng)和配送系統(tǒng)
我們在前面了解過消息隊列的使用場景
這里,我們要來處理其中一個場景:系統(tǒng)的解耦。
在電商項目中,當客戶提交了一個訂單之后,客戶在個人中心可以看到訂單處于配送中。
這個時候就要參與進來一個系統(tǒng),叫做『配送系統(tǒng)』。如果我們在做架構(gòu)的時候,把訂單系統(tǒng)和配送系統(tǒng)設(shè)計在一起的話就會出現(xiàn)一些問題:訂單系統(tǒng)的壓力比較大,但是配送系統(tǒng)沒有必要對這些壓力做及時的反應(yīng);我們不需要訂單系統(tǒng)出現(xiàn)故障之后導(dǎo)致配送系統(tǒng)故障。
所以我們需要把這2個系統(tǒng)分開,通過一個中間的隊列表來實現(xiàn)這2個系統(tǒng)的溝通。
如下圖架構(gòu):

具體到我們的程序代碼大致邏輯如下圖:

大致流程:order.php來接收用戶訂單,生成訂單號并對訂單進行處理(訂單系統(tǒng));在訂單系統(tǒng)會把配送系統(tǒng)所需要的數(shù)據(jù)放入隊列表中;我們的配送系統(tǒng)goods.php會有個定時腳本每分鐘執(zhí)行一次,處理隊列表中的數(shù)據(jù)。
簡單設(shè)計隊列表order_queue:
CREATE TABLE `order_queue` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`order_id` int(11) unsigned NOT NULL COMMENT '訂單ID(從訂單系統(tǒng)來的)',
`user_info` varchar(255) NOT NULL DEFAULT '' COMMENT '可以是用戶手機號/用戶id等(這里只是演示)',
`created_at` datetime NOT NULL COMMENT '訂單創(chuàng)建時間',
`updated_at` datetime NOT NULL COMMENT '本記錄最后處理完成時間',
`status` tinyint(2) NOT NULL COMMENT '0未處理,1已處理,2處理中',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
mysql訂單隊列
前面我們已經(jīng)分析清楚了邏輯,剩下的就是代碼實現(xiàn)了。
注意:我這里只是演示代碼,單純?yōu)榱苏故緦崿F(xiàn)過程。
1、接收訂單,處理訂單order.php
?php
// 這個文件是用來接收用戶的訂單信息 并寫入隊列的一個文件
if(!empty($_GET['user_info'])){
// 驗證 過濾 接收的數(shù)據(jù)
// todo...
// 這里是應(yīng)該首先是訂單中心的處理流程
// 因為訂單系統(tǒng)是一套單獨的系統(tǒng) 這里就不編寫這個系統(tǒng)了
// todo...
$order_id = rand(100000,99999); // 正常的訂單號從 訂單系統(tǒng)來,我們這里只是演示
// 把配送系統(tǒng)需要的訂單數(shù)據(jù)存入隊列表中
$insert_data = array(
'order_id'=>$order_id,
'user_info'=>$_GET['user_info'],
'created_at'=>date('Y-m-d H:i:s',time()),
'status'=>0
);
// 把上面的數(shù)據(jù) 插入到order_queue表中
// insert into order_queue
}
2、配送系統(tǒng)goods.php
?php
// 這個文件主要是配送系統(tǒng)處理隊列表中的訂單并進行標記的文件
//分析:
//第一步:先把要處理的記錄更新為『等待處理』
//第二步:選擇剛剛標記為『等待處理』的記錄,然后進行配送系統(tǒng)的處理
//第三步:把上面前面處理過的程序標記『已完成』
/////////////////////這里很重要,你一定要明白哦//////////////////////////////////////////////
//疑問:為什么不直接處理最后更新為『已完成』,多了先標記為『等待處理』?
//這是因為配送系統(tǒng)很可能不是及時完成的,它中間會有一段處理的時間,如果還在處理中有其他程序來進行讀取和操作,就沖突了。
//這樣設(shè)計其實也是一個鎖的機制
//1、
$waiting = array('status'=>0);
$lock = array('status'=>2);
//把狀態(tài)為0的記錄標記為2,每次更新3條(具體每次幾條看情況)
$sql = "update order_queue set status=2 where status=0 limit 3";
//2、
if(上面update成功){
// 選擇出要處理訂單內(nèi)容
// select * from order_queue where status = 2;
// 然后由配貨系統(tǒng)進行處理
// todo...
//3、處理完成把訂單狀態(tài)更新為已完成
$success = array(
'status'=>1,
'updated_at'=>date('Y-m-d H:i:s',time())
);
}else{
echo 'All Finished';
}
3、linux服務(wù)器 定時任務(wù)
寫個shell腳本:goods.sh
#!/bin/bash
date "+%G-%m-%d %H:%M:%S"
cd /var/www/
php goods.php
這個腳本就是去執(zhí)行orders.php這個程序的。
在linux服務(wù)器部署定時任務(wù):
crontab -e
*/1 * * * * /var/www/goods.sh >> /var/www/goods_shell.log 2>$1
每分鐘執(zhí)行一次goods.sh文件,并記錄日志到goods_shell.log文件(在對應(yīng)目錄新建該文件)
更多關(guān)于PHP相關(guān)內(nèi)容感興趣的讀者可查看本站專題:《PHP數(shù)據(jù)結(jié)構(gòu)與算法教程》、《php程序設(shè)計算法總結(jié)》、《php字符串(string)用法總結(jié)》、《PHP數(shù)組(Array)操作技巧大全》、《PHP常用遍歷算法與技巧總結(jié)》及《PHP數(shù)學(xué)運算技巧總結(jié)》
希望本文所述對大家PHP程序設(shè)計有所幫助。
您可能感興趣的文章:- PHP使用ActiveMQ實例
- PHP使用ActiveMQ實現(xiàn)消息隊列的方法詳解
- php ActiveMQ的安裝與使用方法圖文教程
- PHP Beanstalkd消息隊列的安裝與使用方法實例詳解
- PHP高級編程之消息隊列原理與實現(xiàn)方法詳解
- PHP PDO和消息隊列的個人理解與應(yīng)用實例分析
- PHP+RabbitMQ實現(xiàn)消息隊列的完整代碼
- PHP多進程通信-消息隊列使用
- php基于Redis消息隊列實現(xiàn)的消息推送的方法
- 使用PHP訪問RabbitMQ消息隊列的方法示例
- php實現(xiàn)通過stomp協(xié)議連接ActiveMQ操作示例