手撸原生swoole完整版(HTTP、WebSocket服务端、代码热更新等)

在上一篇文章中,我们介绍了swoole的安装与基本使用,
在这篇文章中,完整的演示了swoole的使用,
HTTP服务端、WebSocket服务端、代码热更新,
接收到请求时,转发到真正的逻辑文件进行执行,
在逻辑文件中,如何完成数据交互与执行,
在后期文件中,我们将结合redis队列实现商品的抢购

新建文件 swoole.phpfeng/Index.php

swoole.php

<?php
/**
 * @Author: [FENG] <1161634940@qq.com>
 * @Date:   2024-08-20 13:26:14
 * @Last Modified by:   [FENG] <1161634940@qq.com>
 * @Last Modified time: 2024-08-31 17:38:36
 */
use \Swoole\Server\Helper;
use \Swoole\WebSocket\Frame;
use \Swoole\WebSocket\Server as WebSocketServer;
use \Swoole\Server;
use \Swoole\Timer;
use \Swoole\Process;

/**
 * Swoole WebSocket Server 命令行服务类
 * 此文件的修改不支持热更新,请于更新后重启swoole-websocket服务
 */
class WebSocket
{
    protected $monitor;
    protected $md5file;

    // Swoole对象
    public $swoole;

    // Socket的类型
    protected $sockType = SWOOLE_SOCK_TCP;

    // 运行模式
    protected $mode = SWOOLE_PROCESS;

    // swooleCommon 类实例
    protected $swooleCommon;

    /**
     * swooleCommon 类实例
     */
    protected $config = [
        "wss_switch"    => "0",
        // "ssl_cert_file" => "/www/wwwroot/yourDomain/cert/im.pem",
        // "ssl_key_file"  => "/www/wwwroot/yourDomain/cert/im.key",
        "websocket_port" => "9501",
        "worker_num"    => "2",
        "task_worker_num" => "2",
        "reactor_num"   => "2",
        "max_connections" => "20480",

        'daemonize' => false,
        'max_request' => 100000,
        'task_enable_coroutine' => true, // 关闭Task协程
        'max_wait_time' => 10,
        'reload_async' => true, // 异步模式下启动管理进程
    ];

    /**
     * 支持的响应事件
     * @var array
     */
    protected $event = [
        'Start',
        'Shutdown',
        'WorkerStart',
        'WorkerStop',
        'WorkerExit',
        'Receive',
        'Close',
        'Task',
        'Finish',
        'PipeMessage',
        'WorkerError',
        'ManagerStart',
        'ManagerStop',
        'Open',
        'Request',
        'Message',
        'HandShake',
    ];

    function __construct()
    {
        if ($this->config['wss_switch']) {
            if (file_exists($this->config['ssl_cert_file']) && file_exists($this->config['ssl_key_file'])) {
                $this->sockType = SWOOLE_SOCK_TCP | SWOOLE_SSL;
            } else {
                throw new \Exception('SSL certificate file does not exist!');
            }
        }

        $this->swoole = new WebSocketServer('127.0.0.1', $this->config['websocket_port'], $this->mode, $this->sockType);

        // 设置文件监控(监控时间秒,监控目录)(正式环境勿开,很占cpu)
        $this->setMonitor(1000, ['./feng/']);
        // 初始化swoole配置
        $this->option($this->config);

        $agreement = $this->config['wss_switch'] ? 'wss://' : 'ws://';
        $address   = $agreement . "127.0.0.1:" . $this->config['websocket_port'];

        echo "Swoole WebSocket server started: <" . $address . ">\n";
        echo "You can exit with `CTRL-C`\n";

        $this->swoole->start();
    }

    /**
     * Worker 进程启动
     * @param  $server
     * @param  $worker_id
     */
    public function onWorkerStart($server, $worker_id)
    {
        if (0 == $worker_id && $this->monitor) {
            // print_r(get_included_files());// 查看不支持热更新的文件列表
            $this->monitor($server);
        }
    }

    /**
     * 链接握手成功
     * @param  $server
     * @param  $frame
     */
    public function onOpen($server, $frame)
    {
        $server->push($frame->fd, json_encode([
            'type' => 'init',
            'client_id' => $frame->fd,
        ]));
    }

    /**
     * request回调
     * 当请求过来时,会调用此方法,然后转发到真正的逻辑文件执行
     * @param $request
     * @param $response
     */
    public function onRequest($request, $response) {
        $data = $request->post ?? ($request->get ?? []);
        // array_walk_recursive($data, ['app\worker\logic\Common', 'checkVariable']);
        $server = $this->swoole;//调用外部的server
        $client_id = $request->fd;

        if ($request->server['path_info'] == '/favicon.ico' || $request->server['request_uri'] == '/favicon.ico') {
            $response->status(404);
            $response->end();
            return;
        }

        $uri = array_filter(explode('/', ltrim($request->server['request_uri'], '/wss')));
        [$c, $a] = (count($uri) == 1) ? [$uri[0], ''] : ($uri ?: [$data['c'] ?? '', $data['a'] ?? '']);

        $response->header('Content-Type', 'application/json; charset=utf-8');
        // 检查要访问的类是否存在
        if (empty($data)) {
            $response->end(msg(0, '参数缺失,请填写完整参数'));
            return;
        }

        $filename = __DIR__ . '/feng/' . ucfirst($c) . '.php';
        if (file_exists($filename) && !empty($c)) { // 判定文件是否存在
            if (is_readable($filename)) {
                require_once $filename;
            }
            $className = "\\feng\\" . ucfirst($c);

            // 检查要访问的类是否存在
            if (!class_exists($className, false)) {
                $response->end(msg(0, '访问的控制器不存在!'));
                return;
            }
        } else {
            $response->end(msg(0, '访问的控制器文件不存在!'));
            return;
        }

        $o = new $className([$server, $response, $this->swooleCommon]); // 新建对象
        if (!empty($a) && !method_exists($o, $a)) {
            $response->end(msg(0, '访问的方法不存在!'));
            return;
        }

        if (isset($o) && $a) {
            $result = call_user_func_array([$o, $a], [$data]); //调用对象$o($c)里的方法$a
            if ($result) {
                $json = is_array($result) ? msg($result) : $result;
                $response->end($json);
            }
        } else {
            $json = msg(0, '参数缺失,请填写要执行的操作');
            $response->end($json);
        }
    }

    /**
     * 收到数据帧
     * 与request请求类似,转发到真正的逻辑文件执行
     * @param  $server
     * @param  $frame
     */
    public function onMessage($server, $frame)
    {
        $message = json_decode($frame->data, true) ?: $frame->data;
        // 安全检查过i滤
        // array_walk_recursive($message, ['app\worker\logic\Common', 'checkVariable']);
        $data = $message['data'] ?? [];
        $client_id = $frame->fd;

        if (!is_array($message) || !isset($message['c']) || !isset($message['a'])) {
            $server->push($client_id, msg('show_msg', '参数缺失,请填写完整参数'));
            return;
        }

        $filename = __DIR__ . '/feng/' . ucfirst($message['c']) . '.php';
        if (file_exists($filename)) { // 判定文件是否存在
            if (is_readable($filename)) {
                require_once $filename;
            }
            $className = "\\feng\\" . ucfirst($message['c']);

            // 检查要访问的类是否存在
            if (!class_exists($className, false)) {
                $server->push($client_id, msg('show_msg', '访问的控制器不存在!'));
                $server->close($client_id);
                return;
            }
        } else {
            $server->push($client_id, msg('show_msg', '错误的请求2!'));
            $server->close($client_id);
            return;
        }

        $o = new $className([$server, $frame, $this->swooleCommon]); // 实例化类
        $a = $message['a']; // 方法名称

        if (!method_exists($o, $message['a'])) {
            $server->push($client_id, msg('show_msg', '访问的方法不存在!'));
            return;
        }

        $result = call_user_func_array([$o, $a], [$data]); //调用对象$o($c)里的方法$a
        $json = is_array($result) ? msg($result) : $result;
        $server->push($client_id, $json); // 发送消息
    }

    /**
     * 链接关闭
     */
    public function onClose($server, $fd, $reactorId)
    {
        // 解除所有绑定关系
        // $this->swooleCommon->unbindFd($fd);
    }

    /**
     * @param $serv
     * @param $taskId
     * @param $workerId
     * @param $data
     */
    public function onTask($server, $taskId, $workerId, $data)
    {
        // 分发 task 任务机制,让不同的任务 走不同的逻辑
        // $obj = new app\common\lib\task\Task;

        // $method = $data['method'];

        // Timer::tick(5000, function() use ($taskId){
        //     echo "taskId  " . $taskId . "  timeout " . rand(1111,9999) . "\n";
        //     $flag = rand(1111,9999);
        //     return $flag; // 告诉worker
        // });

        $flag = rand(1111,9999);
        return $flag; // 告诉worker
    }

    /**
     * @param $server
     * @param $taskId
     * @param $data
     */
    public function onFinish($server, $taskId, $data) {
        echo "taskId:{$taskId}\n";
        echo "finish-data-sucess:{$data}\n";
    }

    public function option(array $option)
    {
        if (!empty($option)) {
            $this->swoole->set($this->checkOptions($option));
        }

        // 注册回调
        foreach ($this->event as $event) {
            if (method_exists($this, 'on' . $event)) {
                $this->swoole->on($event, [$this, 'on' . $event]);
            }
        }

        // 实例化swooleCommon类
        // $this->swooleCommon = new \app\worker\library\Common($option['max_connections'], $this->swoole);
    }

    protected function checkOptions(array $options)
    {
        if (class_exists(Helper::class)) {
            $constOptions = Helper::GLOBAL_OPTIONS + Helper::SERVER_OPTIONS + Helper::PORT_OPTIONS + Helper::HELPER_OPTIONS;
            foreach ($options as $k => $v) {
                if (!array_key_exists(strtolower($k), $constOptions)) {
                    unset($options[$k]);
                }
            }
        }
        return $options;
    }

    public function setMonitor($interval = 2, $path = [])
    {
        $this->monitor['interval'] = $interval;
        $this->monitor['path']     = (array)$path;
    }

    /**
     * 文件监控
     * @param $server
     */
    public function monitor($server)
    {
        if ($this->monitor['path']) {
            $serverId = $server->tick($this->monitor['interval'], function ($serverId) use ($server) {
                $md5Arr=[];
                foreach ($this->monitor['path'] as $path) {
                    $files = glob(rtrim($path, '/') . "/*.php");
                    foreach ($files as $file){
                        $md5Arr[] = md5_file($file);
                    }
                }

                $md5Value = md5(implode('',$md5Arr));
                if ($this->md5file==''){
                    $this->md5file = $md5Value;
                    return;
                }
                //文件有改动
                if (strcmp($this->md5file, $md5Value)!==0){
                    $this->md5file = $md5Value;
                    echo "[update] The service folder has changed and is currently reloading...\n";
                    $server->clearTimer($serverId);
                    Process::kill($server->master_pid, SIGUSR1); // 重启服务
                    return;
                }
            });

            // // 创建一个inotify实例(linux下安装inotify服务,pecl install inotify)
            // $fd = inotify_init();

            // // 添加需要监控的事件
            // $mask = IN_DELETE | IN_CREATE | IN_MOVED_FROM | IN_MOVED_TO | IN_CLOSE_WRITE;
            // foreach ($this->monitor['path'] as $path) {
            //     inotify_add_watch($fd, $path, $mask); // 添加监控目录
            // }

            // // 循环读取inotify事件
            // $serverId = $server->tick($this->monitor['interval'], function ($serverId) use ($server, $fd) {
            //     $events = inotify_read($fd);
            //     if ($events) {
            //         foreach ($events as $event) {
            //             // 检测到文件更改,重启服务
            //             echo "[update] The service folder has changed and is currently reloading...\n";
            //             $server->clearTimer($serverId);
            //             Process::kill($server->master_pid, SIGUSR1); // 重启服务
            //             return;
            //         }
            //     }
            // });
        }
    }

    /**
     * 魔术方法 有不存在的操作的时候执行
     * @access public
     * @param string $method 方法名
     * @param array $args 参数
     * @return mixed
     */
    public function __call($method, $args)
    {
        call_user_func_array([$this->swoole, $method], $args);
    }


}

new WebSocket();


/**
 * [result 返回状态数组]
 * @param  [type] $code [错误码]
 * @param  string $data [具体数据]
 * @return [type]       [description]
 */
function msg($code, $msg=false, $data=false)
{
    $code = is_string($code) ? trim($code) : $code;
    if (is_string($code) && preg_match('/^[A-Za-z_-]+$/', $code) === 1) {
        $result = ['type'=>$code, 'data'=>$msg];
    } else {
        if (is_numeric($code)) {
            $result = ['code'=>$code, 'msg'=>$msg, 'data'=>$data, 'time'=>time()];
        } else {
            $msg || $msg = '操作成功';
            $data = $data ?: $code;
            $result = ['code'=>1, 'msg'=>$msg, 'data'=>$code, 'time'=>time()];
        }
    }
    return json_encode($result, JSON_UNESCAPED_UNICODE);
}

feng/Index.php

<?php
/**
 * @Author: [FENG] <1161634940@qq.com>
 * @Date:   2024-08-28 13:26:14
 * @Last Modified by:   [FENG] <1161634940@qq.com>
 * @Last Modified time: 2024-08-31 17:49:07
 */
namespace feng;

class Index
{
    public $userInfo = [];
    protected $server;
    protected $reids;
    protected $frame;
    protected $request;

    protected $swooleCommon;

    public function __construct($base = [])
    {
        if ($base) {
            [$this->server, $this->frame, $this->swooleCommon] = $base;
        }
        // $this->redis = (new \Redis())->init($config); //实例化redis
    }

    public function ceshi($message=[])
    {
    	$data = [
    		'content'=>'这是一段消息',
    		'date' => date('Y-m-d H:i:s'),
    	];
    	return array_merge($data, $message);
    }


    public function index($value='')
    {
        return 'ceshi消息';
    }

}

开启swoole服务

(windows下使用 swoole swoole.php 开启服务)

php swoole.php

访问HTTP服务接口 http://127.0.0.1:9501/index/ceshi?id=1 QQ截图20240831175602.png

冯奎博客
请先登录后发表评论
  • latest comments
  • 总共0条评论