您当前的位置: 首页 > 慢生活 > 程序人生 网站首页程序人生
workerman常見問題-如何實現異步任務
发布时间:2022-01-01 13:07:33编辑:雪饮阅读()
如何实现异步任务
问:
如何异步处理繁重的业务,避免主业务被长时间阻塞。例如我要给1000用户发送邮件,这个过程很慢,可能要阻塞数秒,这个过程中因为主流程被阻塞,会影响后续的请求,如何将这样的繁重任务交给其它进程异步处理。
答:
可以在本机或者其它服务器甚至服务器集群预先建立一些任务进程处理繁重的业务,任务进程数可以开多一些,例如cpu的10倍,然后调用方利用AsyncTcpConnection将数据异步发送给这些任务进程异步处理,异步得到处理结果。
任务进程服务端
<?php
use Workerman\Worker;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// task worker,使用Text协议
$task_worker = new Worker('Text://0.0.0.0:12345');
// task进程数可以根据需要多开一些
$task_worker->count = 100;
$task_worker->name = 'TaskWorker';
$task_worker->onMessage = function(TcpConnection $connection, $task_data)
{
// 假设发来的是json数据
$task_data = json_decode($task_data, true);
// 根据task_data处理相应的任务逻辑.... 得到结果,这里省略....
if($task_data){
foreach($task_data as $key=>$val){
}
}
$task_result =["msg"=>"ok"];
// 发送结果
$connection->send(json_encode($task_result));
};
Worker::runAll();
任務進程客戶端:
<?php
use Workerman\Worker;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Connection\TcpConnection;
require_once __DIR__ . '/vendor/autoload.php';
// websocket服务
$worker = new Worker('websocket://0.0.0.0:8080');
$worker->onMessage = function(TcpConnection $ws_connection, $message)
{
// 与远程task服务建立异步连接,ip为远程task服务的ip,如果是本机就是127.0.0.1,如果是集群就是lvs的ip
$task_connection = new AsyncTcpConnection('Text://127.0.0.1:12345');
// 任务及参数数据
$task_data = array(
'function' => 'send_mail',
'args' => array('from'=>'xxx', 'to'=>'xxx', 'contents'=>'xxx'),
);
// 发送数据
$task_connection->send(json_encode($task_data));
// 异步获得结果
$task_connection->onMessage = function(AsyncTcpConnection $task_connection, $task_result)use($ws_connection)
{
// 结果
var_dump($task_result);
// 获得结果后记得关闭异步连接
$task_connection->close();
// 通知对应的websocket客户端任务完成
$ws_connection->send('task complete');
};
// 执行异步连接
$task_connection->connect();
};
Worker::runAll();
JavaScript的websocket客戶端:
<script>
ws2 = new WebSocket("ws://192.168.43.170:8080");
ws2.onopen=function(evt){
console.log("ws2連接成功!",evt);
var order={
order_id:220807,
uid:2022
}
ws2.send(JSON.stringify(order));
};
ws2.onmessage = function(e) {
console.log("ws2收到服务端的消息:" + e.data);
};
</script>
任務進程服務端啓動:
[root@localhost www.fpm.com]# /usr/local/php734/bin/php taskServer.php start
Workerman[taskServer.php] start in DEBUG mode
----------------------------------------- WORKERMAN ------------------------------------------
Workerman version:4.0.22 PHP version:7.3.4
------------------------------------------ WORKERS -------------------------------------------
proto user worker listen processes status
tcp root TaskWorker text://0.0.0.0:12345 100 [OK]
----------------------------------------------------------------------------------------------
Press Ctrl+C to stop. Start success.
任務進程客戶端啓動,并有websocket客戶端連接並發送了消息時如:
[root@localhost www.fpm.com]# /usr/local/php734/bin/php taskClient.php start
Workerman[taskClient.php] start in DEBUG mode
------------------------------------------- WORKERMAN --------------------------------------------
Workerman version:4.0.22 PHP version:7.3.4
-------------------------------------------- WORKERS ---------------------------------------------
proto user worker listen processes status
tcp root none websocket://0.0.0.0:8080 1 [OK]
--------------------------------------------------------------------------------------------------
Press Ctrl+C to stop. Start success.
string(12) "{"msg":"ok"}"
JavaScript的websocket客戶端運行結果:
ws2連接成功! Event {isTrusted: true, type: 'open', target: WebSocket, currentTarget: WebSocket, eventPhase: 2, …}
index2.html:15 ws2收到服务端的消息:task complete
这样,繁重的任务交给本机或者其它服务器的进程去做,任务完成后会异步收到结果,业务进程就不会阻塞了。
关键字词:workerman,常見問題,異步,任務