在上一篇文章中,我们介绍了swoole的安装与基本使用,
在这篇文章中,完整的演示了swoole的使用,
HTTP服务端、WebSocket服务端、代码热更新,
接收到请求时,转发到真正的逻辑文件进行执行,
在逻辑文件中,如何完成数据交互与执行,
在后期文件中,我们将结合redis队列实现商品的抢购
新建文件 swoole.php
及 feng/Index.php
<?php
/**
* @Author: [FENG] <1161634940@qq.com>
* @Date: 2024-08-20 13:26:14
* @Last Modified by: [FENG] <1161634940@qq.com>
* @Last Modified time: 2024-08-31 17:38:36
*/
use \Swoole\Server\Helper;
use \Swoole\WebSocket\Frame;
use \Swoole\WebSocket\Server as WebSocketServer;
use \Swoole\Server;
use \Swoole\Timer;
use \Swoole\Process;
/**
* Swoole WebSocket Server 命令行服务类
* 此文件的修改不支持热更新,请于更新后重启swoole-websocket服务
*/
class WebSocket
{
protected $monitor;
protected $md5file;
// Swoole对象
public $swoole;
// Socket的类型
protected $sockType = SWOOLE_SOCK_TCP;
// 运行模式
protected $mode = SWOOLE_PROCESS;
// swooleCommon 类实例
protected $swooleCommon;
/**
* swooleCommon 类实例
*/
protected $config = [
"wss_switch" => "0",
// "ssl_cert_file" => "/www/wwwroot/yourDomain/cert/im.pem",
// "ssl_key_file" => "/www/wwwroot/yourDomain/cert/im.key",
"websocket_port" => "9501",
"worker_num" => "2",
"task_worker_num" => "2",
"reactor_num" => "2",
"max_connections" => "20480",
'daemonize' => false,
'max_request' => 100000,
'task_enable_coroutine' => true, // 关闭Task协程
'max_wait_time' => 10,
'reload_async' => true, // 异步模式下启动管理进程
];
/**
* 支持的响应事件
* @var array
*/
protected $event = [
'Start',
'Shutdown',
'WorkerStart',
'WorkerStop',
'WorkerExit',
'Receive',
'Close',
'Task',
'Finish',
'PipeMessage',
'WorkerError',
'ManagerStart',
'ManagerStop',
'Open',
'Request',
'Message',
'HandShake',
];
function __construct()
{
if ($this->config['wss_switch']) {
if (file_exists($this->config['ssl_cert_file']) && file_exists($this->config['ssl_key_file'])) {
$this->sockType = SWOOLE_SOCK_TCP | SWOOLE_SSL;
} else {
throw new \Exception('SSL certificate file does not exist!');
}
}
$this->swoole = new WebSocketServer('127.0.0.1', $this->config['websocket_port'], $this->mode, $this->sockType);
// 设置文件监控(监控时间秒,监控目录)(正式环境勿开,很占cpu)
$this->setMonitor(1000, ['./feng/']);
// 初始化swoole配置
$this->option($this->config);
$agreement = $this->config['wss_switch'] ? 'wss://' : 'ws://';
$address = $agreement . "127.0.0.1:" . $this->config['websocket_port'];
echo "Swoole WebSocket server started: <" . $address . ">\n";
echo "You can exit with `CTRL-C`\n";
$this->swoole->start();
}
/**
* Worker 进程启动
* @param $server
* @param $worker_id
*/
public function onWorkerStart($server, $worker_id)
{
if (0 == $worker_id && $this->monitor) {
// print_r(get_included_files());// 查看不支持热更新的文件列表
$this->monitor($server);
}
}
/**
* 链接握手成功
* @param $server
* @param $frame
*/
public function onOpen($server, $frame)
{
$server->push($frame->fd, json_encode([
'type' => 'init',
'client_id' => $frame->fd,
]));
}
/**
* request回调
* 当请求过来时,会调用此方法,然后转发到真正的逻辑文件执行
* @param $request
* @param $response
*/
public function onRequest($request, $response) {
$data = $request->post ?? ($request->get ?? []);
// array_walk_recursive($data, ['app\worker\logic\Common', 'checkVariable']);
$server = $this->swoole;//调用外部的server
$client_id = $request->fd;
if ($request->server['path_info'] == '/favicon.ico' || $request->server['request_uri'] == '/favicon.ico') {
$response->status(404);
$response->end();
return;
}
$uri = array_filter(explode('/', ltrim($request->server['request_uri'], '/wss')));
[$c, $a] = (count($uri) == 1) ? [$uri[0], ''] : ($uri ?: [$data['c'] ?? '', $data['a'] ?? '']);
$response->header('Content-Type', 'application/json; charset=utf-8');
// 检查要访问的类是否存在
if (empty($data)) {
$response->end(msg(0, '参数缺失,请填写完整参数'));
return;
}
$filename = __DIR__ . '/feng/' . ucfirst($c) . '.php';
if (file_exists($filename) && !empty($c)) { // 判定文件是否存在
if (is_readable($filename)) {
require_once $filename;
}
$className = "\\feng\\" . ucfirst($c);
// 检查要访问的类是否存在
if (!class_exists($className, false)) {
$response->end(msg(0, '访问的控制器不存在!'));
return;
}
} else {
$response->end(msg(0, '访问的控制器文件不存在!'));
return;
}
$o = new $className([$server, $response, $this->swooleCommon]); // 新建对象
if (!empty($a) && !method_exists($o, $a)) {
$response->end(msg(0, '访问的方法不存在!'));
return;
}
if (isset($o) && $a) {
$result = call_user_func_array([$o, $a], [$data]); //调用对象$o($c)里的方法$a
if ($result) {
$json = is_array($result) ? msg($result) : $result;
$response->end($json);
}
} else {
$json = msg(0, '参数缺失,请填写要执行的操作');
$response->end($json);
}
}
/**
* 收到数据帧
* 与request请求类似,转发到真正的逻辑文件执行
* @param $server
* @param $frame
*/
public function onMessage($server, $frame)
{
$message = json_decode($frame->data, true) ?: $frame->data;
// 安全检查过i滤
// array_walk_recursive($message, ['app\worker\logic\Common', 'checkVariable']);
$data = $message['data'] ?? [];
$client_id = $frame->fd;
if (!is_array($message) || !isset($message['c']) || !isset($message['a'])) {
$server->push($client_id, msg('show_msg', '参数缺失,请填写完整参数'));
return;
}
$filename = __DIR__ . '/feng/' . ucfirst($message['c']) . '.php';
if (file_exists($filename)) { // 判定文件是否存在
if (is_readable($filename)) {
require_once $filename;
}
$className = "\\feng\\" . ucfirst($message['c']);
// 检查要访问的类是否存在
if (!class_exists($className, false)) {
$server->push($client_id, msg('show_msg', '访问的控制器不存在!'));
$server->close($client_id);
return;
}
} else {
$server->push($client_id, msg('show_msg', '错误的请求2!'));
$server->close($client_id);
return;
}
$o = new $className([$server, $frame, $this->swooleCommon]); // 实例化类
$a = $message['a']; // 方法名称
if (!method_exists($o, $message['a'])) {
$server->push($client_id, msg('show_msg', '访问的方法不存在!'));
return;
}
$result = call_user_func_array([$o, $a], [$data]); //调用对象$o($c)里的方法$a
$json = is_array($result) ? msg($result) : $result;
$server->push($client_id, $json); // 发送消息
}
/**
* 链接关闭
*/
public function onClose($server, $fd, $reactorId)
{
// 解除所有绑定关系
// $this->swooleCommon->unbindFd($fd);
}
/**
* @param $serv
* @param $taskId
* @param $workerId
* @param $data
*/
public function onTask($server, $taskId, $workerId, $data)
{
// 分发 task 任务机制,让不同的任务 走不同的逻辑
// $obj = new app\common\lib\task\Task;
// $method = $data['method'];
// Timer::tick(5000, function() use ($taskId){
// echo "taskId " . $taskId . " timeout " . rand(1111,9999) . "\n";
// $flag = rand(1111,9999);
// return $flag; // 告诉worker
// });
$flag = rand(1111,9999);
return $flag; // 告诉worker
}
/**
* @param $server
* @param $taskId
* @param $data
*/
public function onFinish($server, $taskId, $data) {
echo "taskId:{$taskId}\n";
echo "finish-data-sucess:{$data}\n";
}
public function option(array $option)
{
if (!empty($option)) {
$this->swoole->set($this->checkOptions($option));
}
// 注册回调
foreach ($this->event as $event) {
if (method_exists($this, 'on' . $event)) {
$this->swoole->on($event, [$this, 'on' . $event]);
}
}
// 实例化swooleCommon类
// $this->swooleCommon = new \app\worker\library\Common($option['max_connections'], $this->swoole);
}
protected function checkOptions(array $options)
{
if (class_exists(Helper::class)) {
$constOptions = Helper::GLOBAL_OPTIONS + Helper::SERVER_OPTIONS + Helper::PORT_OPTIONS + Helper::HELPER_OPTIONS;
foreach ($options as $k => $v) {
if (!array_key_exists(strtolower($k), $constOptions)) {
unset($options[$k]);
}
}
}
return $options;
}
public function setMonitor($interval = 2, $path = [])
{
$this->monitor['interval'] = $interval;
$this->monitor['path'] = (array)$path;
}
/**
* 文件监控
* @param $server
*/
public function monitor($server)
{
if ($this->monitor['path']) {
$serverId = $server->tick($this->monitor['interval'], function ($serverId) use ($server) {
$md5Arr=[];
foreach ($this->monitor['path'] as $path) {
$files = glob(rtrim($path, '/') . "/*.php");
foreach ($files as $file){
$md5Arr[] = md5_file($file);
}
}
$md5Value = md5(implode('',$md5Arr));
if ($this->md5file==''){
$this->md5file = $md5Value;
return;
}
//文件有改动
if (strcmp($this->md5file, $md5Value)!==0){
$this->md5file = $md5Value;
echo "[update] The service folder has changed and is currently reloading...\n";
$server->clearTimer($serverId);
Process::kill($server->master_pid, SIGUSR1); // 重启服务
return;
}
});
// // 创建一个inotify实例(linux下安装inotify服务,pecl install inotify)
// $fd = inotify_init();
// // 添加需要监控的事件
// $mask = IN_DELETE | IN_CREATE | IN_MOVED_FROM | IN_MOVED_TO | IN_CLOSE_WRITE;
// foreach ($this->monitor['path'] as $path) {
// inotify_add_watch($fd, $path, $mask); // 添加监控目录
// }
// // 循环读取inotify事件
// $serverId = $server->tick($this->monitor['interval'], function ($serverId) use ($server, $fd) {
// $events = inotify_read($fd);
// if ($events) {
// foreach ($events as $event) {
// // 检测到文件更改,重启服务
// echo "[update] The service folder has changed and is currently reloading...\n";
// $server->clearTimer($serverId);
// Process::kill($server->master_pid, SIGUSR1); // 重启服务
// return;
// }
// }
// });
}
}
/**
* 魔术方法 有不存在的操作的时候执行
* @access public
* @param string $method 方法名
* @param array $args 参数
* @return mixed
*/
public function __call($method, $args)
{
call_user_func_array([$this->swoole, $method], $args);
}
}
new WebSocket();
/**
* [result 返回状态数组]
* @param [type] $code [错误码]
* @param string $data [具体数据]
* @return [type] [description]
*/
function msg($code, $msg=false, $data=false)
{
$code = is_string($code) ? trim($code) : $code;
if (is_string($code) && preg_match('/^[A-Za-z_-]+$/', $code) === 1) {
$result = ['type'=>$code, 'data'=>$msg];
} else {
if (is_numeric($code)) {
$result = ['code'=>$code, 'msg'=>$msg, 'data'=>$data, 'time'=>time()];
} else {
$msg || $msg = '操作成功';
$data = $data ?: $code;
$result = ['code'=>1, 'msg'=>$msg, 'data'=>$code, 'time'=>time()];
}
}
return json_encode($result, JSON_UNESCAPED_UNICODE);
}
<?php
/**
* @Author: [FENG] <1161634940@qq.com>
* @Date: 2024-08-28 13:26:14
* @Last Modified by: [FENG] <1161634940@qq.com>
* @Last Modified time: 2024-08-31 17:49:07
*/
namespace feng;
class Index
{
public $userInfo = [];
protected $server;
protected $reids;
protected $frame;
protected $request;
protected $swooleCommon;
public function __construct($base = [])
{
if ($base) {
[$this->server, $this->frame, $this->swooleCommon] = $base;
}
// $this->redis = (new \Redis())->init($config); //实例化redis
}
public function ceshi($message=[])
{
$data = [
'content'=>'这是一段消息',
'date' => date('Y-m-d H:i:s'),
];
return array_merge($data, $message);
}
public function index($value='')
{
return 'ceshi消息';
}
}
(windows下使用 swoole swoole.php
开启服务)
php swoole.php
访问HTTP服务接口 http://127.0.0.1:9501/index/ceshi?id=1
本文为冯奎原创文章,转载无需和我联系,但请注明来自冯奎博客fengkui.net
最新评论