swoole

2018-09-19 鲁鲁槟 收藏

一、前言

1.1、什么是 swoole

swoole是基于C开发的一个php扩展,类似你熟悉的Mysqli、cURL等等。

swoole的作用,其实更多的是解决php在某些方面的缺陷(当然,php是最好的语言),比如即时通讯、异步任务、消息队列等等。

Swoole是PHP语言的高性能网络通信框架,提供了PHP语言的异步多线程服务器,异步TCP/UDP网络客户端,异步MySQL,数据库连接池,AsyncTask,消息队列,毫秒定时器,异步文件读写,异步DNS查询。 Swoole虽然是标准的PHP扩展,实际上与普通的扩展不同。普通的扩展只是提供一个库函数。而swoole扩展在运行后会接管PHP的控制权,进入事件循环。当IO事件发生后,swoole会自动回调指定的PHP函数。

1.2、php的cli模式

PHP 除了可以被 Apache IIS 等服务器调用,还可以通过 cli 模式运行,因为 php 本质上还是 C 语言写的程序。

①、将 php.exe 加入环境变量

②、新建 cli.php

<?php
   echo "hello php cli\n";

③、打开 cmd ,切换到 cli.php 所在目录,输入

php cli.php

④、修改 cli.php

<?php
   echo "hello php cli\n";
   var_dump($_SERVER["argc"]);   //$_SERVER["argc"]  为传递的参数的个数
   var_dump($_SERVER["argv"]);   //S_SERVER["argv"]  为传递的参数的值,以数组表示

⑤、打开 cmd ,切换到 cli.php 所在目录,输入

 php cli.php one two

返回:

hello php cli

int(3)

array(3) {
  [0] =>
  string(8) "test.php"
  [1] =>
  string(3) "one"
  [2] =>
  string(3) "two"
}

1.3、进程和线程

①、进程

对于操作系统而言,进程就是一个任务,比方说你打开了一个记事本,那就启动了一个进程,打开了两个浏览器,就是另外开启了两个进程,再或者说我现在在word内写文章,打开word也会占用一个进程。也就是说,一个进程至少要干一件事情。

对于linux系统而言,如果你想要查看当前系统中运行着哪些进程,可以通过ps命令进行查看。

比如我现在打开一个终端,用vim打开一个文件

vim test.php

打开后这个终端不动,再新打开一个终端,执行ps命令后

ps aux | grep vim
root      8381  0.0  0.4 143844  4724 pts/0    S+   09:30   0:00 vim test.php
root      8876  0.0  0.0 103324   884 pts/2    S+   09:40   0:00 grep vim

可以看到,有两个vim相关的进程在我执行ps的那一霎那还在执行。

②、线程

有些情况下,一个进程会同时做一些事情,比如说word。它可以同时进行打字、拼写检查等操作。注意这里我们说的同时进行。像这样,在一个进程内部,同时运行着多个“子任务”,我们就可以把这些子任务称之为“线程”。即进程是由多个线程组成的,一个进程至少要有一个线程。实际上,线程是操作系统最小的执行单元。

③、多任务的实现

A、试想一下,如果我们要同时执行多个任务怎么办?

根据上文的理解,我们可以:启动多个进程

B、试想一下,如果我们要同时执行多个任务怎么办?根据上文的理解,我们可以

  • 启动多个进程

  • 启动一个进程,并在该进程内启动多个线程

  • 启动多个进程,每个进程内启动多个线程

④、多进程实现

我们举一个实际点的例子:各位熟悉的apache,其实就是一种多进程实现的案例。当父进程监听到有新的请求时,就会fork出新的子进程来对之进行处理。

Linux的fork()函数通过系统调用即可实现创建一个与原进程几乎相同的进程。对于多任务,通常我们会设计Master-Worker模式,即一个Master进程负责分配任务,多个Worker进程负责执行任务。同理,如果是多线程,Master就是主线程,Worker就是子线程。

⑤、多进程与多线程的区别

多进程的优点就是稳定性很高,如果一个进程挂了,不会影响其他子进程,当然,如果主进程挂了那就都玩完(主进程挂点的可能性微乎其微,后面讲进程模型会说到)。而对于多线程,这个恐怕就是致命的缺点了,因为所有线程共享内存,如果某一个线程挂了,那这个进程几乎就崩溃了。

性能方面,不论是进程还是线程,如果启动太多,无疑都会带来CPU的调度问题,因为进程或者线程的切换,本身就非常耗费资源。数量达到一定程度的时候,CPU和内存就消耗殆尽,电脑就死机了。

举一个例子:使用过windows的用户都知道,如果我们打开的软件越多(开启的进程也就越多),电脑就会越卡,甚至装死机没反应。

线程与进程相比,自然是要比进程更轻量一些,而且线程之间是共享内存的,所以不同线程之间的交互就显得容易实现。而对于多进程之间的通信,需要借助消息队列,共享内存等复杂的方式才可以实现。

1.4、IO模型

①、什么是 IO

IO即Input/Output,输入和输出的意思。在计算机的世界里,涉及到数据交换的地方,比如磁盘、网络等,就需要IO接口。

通常,IO是相对的。比如说你打开浏览器,通过网络IO获取我们网站的网页,浏览器首先会往服务器发送请求,这是一个Output操作,随后服务器给浏览器返回信息,这就是一个Input操作。以上都是基于浏览器而言。但是,有些操作就比较特殊。比如程序在运行时,数据被加载在内存中,通过程序往磁盘写数据,对内存而言,这就是单方面的的Output。

②、IO模型

IO模型通常有很多种,我们简单介绍下同步IO和异步IO。

③、同步IO

实际上我们刚刚介绍的浏览器请求服务器的过程正是同步IO的例子。

那我们再比如,假设我们要通过程序往磁盘写大量的数据,如果没有磁盘IO操作,php程序在内存中执行的速度是非常快的,但是磁盘写数据的过程相对而言就是漫长的,CPU就需要等待磁盘IO操作之后才能继续执行其他代码,像上面这两种情况,我们都称之为同步IO。

php本身是单线程的,当php进程被挂起的时候,像上面的读取磁盘数据,往磁盘写数据,在IO操作之前php代码就没办法继续执行了。

因为IO操作阻塞了当前线程,如果某用户也想从磁盘上读取或者写数据,就需要等待。

有些人要反驳了,这不对呀,我经历不是这样的,很多人可以同时访问我的网站,这没问题的。

这个没必要纠结,php本身是单进程单线程的,用户可以同时访问你的网站实际上是web服务器的功劳。这就是我们之前讨论过的,如何解决多任务的问题。

web服务器的进程模型暂时不多讨论,免得懵。

如果不考虑web服务器,是不是当前进程一旦阻塞,其他人访问php都会被阻塞啦?答案是肯定的。要解决这个问题,有回到我们一直强调的多进程或者多线程。

但是,如果为了解决并发问题,系统开启了大量的进程,就像我们之前说的,操作系统在进程或者线程间切换同样会造成CPU大量的开销。有没有更好的解决方案呢?

④、异步IO

答案就就是异步IO。我们再来强调一遍异步IO是要解决什么问题的:同一线程内,执行一些耗时的任务时,其他代码是不能继续执行的,要等待该任务操作完之后才可以。

异步IO是什么样的呢?当程序需要执行一个非常耗时的IO操作的时候,它只发出IO指令,不需要等待IO的结果,然后可以继续执行其他的代码了。当IO返回结果时,再通知CPU去处理,这就是异步IO。

总结:同步IO模型下,主线程只能被挂起等待,但是在异步IO模型中,主线程发起IO指令后,可以继续执行其他指令,没有被挂起,也没有切换线程的操作。由此看来,使用异步IO明显可以提高了系统性能。

1.5、TCP/IP和UDP

①、浏览器访问网站的过程

平时我们打开一个浏览器,然后输入网址后回车,即展现了一个网页的内容。这是一个非常简单的操作。我们来简单的概括下背后的逻辑。

  • 浏览器通过TCP/IP协议建立到服务器的TCP连接

  • 客户端向服务器发送HTTP协议请求包,请求服务器里的资源文档

  • 服务器向客户端发送HTTP协议应答包,如果请求的资源包含有动态语言的内容,那么服务器会调用动态语言的解释引擎负责处理“动态内容”,并将处理得到的数据返回给客户端

  • 客户端与服务器断开,由客户端解释HTML文档,在客户端屏幕上渲染图形结果

表面上看这就是两台电脑之间进行的一种通信。

更确切的说,是两台计算机上两个进程之间的通信。你打开浏览器相当于启动了一个浏览器进程,而服务端事先也启动了某个进程,在某个端口监听,时刻等待客户端的连接。

那么问题来了,为什么客户端可以请求到服务器呢?服务器上跑那么多服务,又是怎么知道客户端想要什么呢?

其实答案很简单,因为有网络。计算机为了联网,就必须遵循通信协议。早期的互联网有很多协议,但是最重要的就非TCP协议和IP协议莫属了。所以,我们把互联网的协议简称为TCP/IP协议。

②、IP协议

想必大家都有过这样的经历,客户端通过telnet连接服务器的时候,往往都需要一个ip地址和一个端口。如果客户端跟服务器之间有数据的交互,其过程大致是这样的:

IP协议负责把你本机的数据发送到服务端,数据被分割成一块一块的。然后通过IP包发送出去。IP包的特点是按块发送,但不保证能到达,也不保证数据块依次到达。

如果是这样进行数据传输,服务器根本不能保证接收到的数据的完整性和顺序性,这样是不是就会有很大的问题呢?

③、TCP协议

于是乎,TCP协议应运而生,它是建立在IP协议之上,专门负责建立可靠连接,并保证数据包顺序到达。TCP协议会通过握手建立连接,然后,对每个IP包编号,确保对方顺序收到,如果出现丢包,则重新发送。

这个时候再说TCP协议是一种面向连接、可靠的、基于IP之上的传出层协议就不难理解了吧。

TCP协议还有一个更重要的特点,它是基于数据流的。

什么意思呢?这就好比客户端和服务端要进行数据交互,中间有一个管子连接着,这个时候交互数据就好比管子中的水,当数据在传输(水在流动)的过程中,服务端是无法知道哪段数据是我们想要的完整数据。怎么解决这一问题呢?这个时候就需要双方约定一个协议来解决了。再往后说就说到应用层协议了,比如http协议,我们姑且不谈。

④、UDP协议

TCP懂了,UDP自然就不难理解了。

相对于TCP,使用UDP协议进行通信的最大区别就是,UDP不需要建立连接,给他一个ip和端口,就可以直接发送数据包了。但是,能不能成功到达就不知道了。虽然UDP传输不可靠,但是速度快。对于一些对数据要求不高的场景,使用UDP通信无疑是不错的选择。

二、swoole

2.1、swoole的安装与升级

windows用户可以使用cygwin环境来学习swoole,但是会有很多问题,下面我们主要介绍下linux环境下swoole的安装。

php版本推荐php5.4+

此外,你应该保证系统中安装了如下软件

gcc-4.4+
make
autoconf

gcc版本不够或者上述软件有一项未安装,下面的操作就没必要往下看了。

注:还需安装以下插件

A、pcre——主要用于swoole_server::connections-

yum install -y pcre pcre-devel

两种安装方式

①、方法一:编译安装

A、下载最新的稳定版,以pecl官网为准,后面针对swoole的学习,我们以1.9.6为准,如果本地已经安装过swoole了,但是版本不一致,可以直接看下面关于swoole升级的讲解。此外,由于swoole还很年轻,各个版本中可能会与我们所讲述的配置有所冲突,建议尽可能的保证你的版本跟我们一致,除非你知道版本本身的差异。

我们切换到 /usr/local/src 目录,你也可以下载到你期望的路径。利用wget下载,提示wget不是命令的请先下载一下这个命令

wget http://pecl.php.net/get/swoole-1.9.6.tgz

B、随后利用tar命令解压,同样tar命令不存在的自行下载

tar zxvf swoole-1.9.6.tgz

C、切换到 swoole-1.9.6 目录

cd swoole-1.9.6

D、找到phpize所在路径,注意要找你要给具体php版本安装的那个phpize,比如我的php源码在 /usr/local/php56/ 目录,phpize路径就是 /usr/local/php56/bin/phpize,在不确保终端下的phpize是哪个版本的php时候,建议指定绝对路径

/usr/local/php/bin/phpize

终端下输入上面的命令后回车即可

E、检查&&编译&&安装

./configure --with-php-config=/usr/local/php/bin/php-config
make 
sudo make install

依次输入上述命令进行操作。

注:如果要支持ssl,需要(当然,要先确保你的系统安装了openssl,php也安装了openssl扩展)

./configure --with-php-config=/usr/local/php/bin/php-config --enable-openssl

②、方法二:PECL安装

方法一的编译安装的过程稍微有一些麻烦,swoole也是pecl的项目,所以,我们还可以通过pecl进行一键安装

pecl install swoole

如果以上步骤一切正常的话,即表示swoole已经成功的安装了。

③、修改 php.ini

成功之后,我们打开php.ini,把swoole.so加入到文件最后

extension=swoole.so

随后通过命令php -m查看swoole是否被正确的安装

php -m | grep swoole

能看到结果即表示安装成功了,当然这是在我们安装过程中一切顺利的情况下进行的。

④、swoole升级

swoole现在还处于发展中,可能我们还没学完,新的版本又要出来了。有同学要说了,过段时间我估计就忘记现在安装的swoole的版本是多少了,这个怎么办?

我们可以通过 php --ri 命令查看swoole版本

php --ri swoole | grep Version

#结果
Version => 1.9.6

如果后期发现有新的版本发布了,怎么升级swoole呢?

  • 编译升级:编译升级,只需要从pecl官网下载最新的稳定版,按照我们一开始的编译安装步骤再走一遍就完事了。之前安装的版本不需要过问,这就相当于重新安装一次新版本就好了。友情提醒,尽可能的下载稳定版,非稳定版可能会发生很多意外的事。

  • pecl升级:这个更简单,一条命令搞定

pecl upgrade swoole

#结果
swoole

2.2、swoole初识之异步多线程服务器

①、同步和异步

我们在 IO模型 中解释过同步和异步的概念,并非是web开发模式下ajax这种异步的请求。在常见的web开发模式下,我们所碰到的几乎都是同步模式。

为什么这么说?无论是fpm还是httpd,同一时间内一个进程只能处理一个请求,如果当前进程处于繁忙,后面的请求也只能继续等待有新的空闲进程。如果负载稍微上去了些,我们还可以调整fpm和httpd的进程数,即增加worker进程的数量。但是,在服务器资源有限的情况下,随着worker进程数量的递增,系统消耗的资源也会逐步增加,直至over。

swoole是既支持全异步,也支持同步,同步模式我们后面结合fpm再说。

IO模型 中,我们也可以感受到异步很强大。为什么喃?

我们举一个一名老师指导多名学生解题的场景。

同步模式下,当该老师在给某学生A指导题目的时候,嘴里可能一边嘟囔着“这个要这么写...”,话没说完,另一个学生B喊道“老师快来,我这碰到难题了,快过来指导指导”。

“等会,没看见在忙吗?”

然后学生B只能乖乖的等老师给A解答完之后才可以。

异步模式就不同啦,老师在给A指导的同时,B又屁颠屁颠的喊着“老师老师...”,这个时候老师态度上就360大转弯,“来了来了”,顺便跟A说了“你先理解下我刚才说的,等会好了叫我”,然后呢,后面的剧情可能就是这样的

  • A解答完毕跟老师说“谢谢”,B喊老师

  • B先喊老师,A进入B一开始的状态,B解答完毕跟老师说“谢谢”

  • 剧情很多,自己没事想吧

又重温了下什么是同步和异步的概念,禁止混淆。

②、socket编程

socket是什么?

在大部分的书本或者网络文章中,你都能找到一个解释:套接字,是属于应用层和传输层之间的抽象层。真想把发明这词的人拉出来暴打一顿,这也太抽象了。

socket即套接字,是用来与另一个进程进行跨网络通信的文件,说是“文件”,也很好理解哈,因为在linux中一切都可以理解为“文件”。比如客户端可以借助socket与服务器之间建立连接。你也可以把socket理解为一组函数库,它确实也就是一堆函数。

我们知道,常见的网络应用都是基于Client-Server模型的。即一个服务器进程和多个客户端进程组合而成,如果你还理解为是一台电脑对另一台电脑,可以回去把 进程/线程 再看看了。在Client-Server模型中,服务器管理某种资源,并且通过对它管理的资源进行操作来为客户端提供服务。

那Client和Server又如何实现通信呢?这就要利用socket一系列的函数实现了。

基于套接字接口的网络应用的描述,用下面这张图来理解就好。

01.png

大致可以描述为:服务器创建一个socket,绑定ip和端口,在该端口处进行监听,然后通过accept函数阻塞。当有新的客户端连接进来时,server接收客户端数据并处理数据,然后返回给客户端,客户端关闭连接,server关闭该客户端,一次连接交互完成。

③、初识server

server,顾名思义,就是服务器。我们平时接触比较多的无非就是nginx和apache。作为webServer,二者都是通过监听某端口对外提供服务。

下面我们来创建一个简单的server。

A、创建一个server对象

server的创建,只需要绑定要监听的ip和端口,如果ip指定为127.0.0.1,则表示客户端只能位于本机才能连接,其他计算机无法连接。

$serv = new swoole_server("127.0.0.1", 9501);

端口这里指定为9501,可以通过netstat查看下该端口是否被占用。如果该端口被占用,可更改为其他端口,如9502,9503等。

B、配置

swoole的运行模式,默认是多进程模式,这根fpm有点像。怎么体现多进程呢?要开启几个进程才合适呢?

这个就需要我们做一些配置了,但是并非像fpm直接在文件内配置,我们可以在server创建后,通过$serv->set(array())指定配置项。当然,这个配置项也有很多,比如说我们可以指定日志文件记录具体的错误信息等等,你都可以在官网的手册上寻找有哪些配置项,我们也会在贯穿swoole的同时讲解一部分常用的配置项。

这里我们首要说明一下worker进程数的配置。

我们可以指定配置项worker_num等于某个正整数。这个正整数设置多少合适,即我要开多少个worker进程处理我们的业务逻辑才好呢?官方建议我们设置为CPU核数的1-4倍。因为我们开的进程越多,内存的占用也就更多,进程间切换也就需要耗费更多的资源。我们这里设置开启两个worker进程。默认该参数的值等于你机器的CPU核数。

$serv->set([
    "worker_num" => 2,
]);

C、事件驱动

swoole另外一个比较吸引人的地方,就是swoole_server是事件驱动的。我们在使用的过程中不需要关注底层怎么样怎么样,只需要对底层相应的动作注册相应的回调,在回调函数中处理我们的业务逻辑即可。

什么意思呢?我举个例子:

你启动了一个server,当客户端连接的时候,你不需要关心它是怎么连接的,你就单纯的注册一个connect函数,做一些连接后的处理即可。再比如server收到了client传递的数据,你用关心复杂的网络是怎么接受到的吗?不用,你只需要注册一个receive回调,处理数据就这么多。

让我们来看看几种常见的事件回调。

// 有新的客户端连接时,worker进程内会触发该回调
$serv->on("Connect", function ($serv, $fd) {
    echo "new client connected." . PHP_EOL;
});
  • 参数$serv是我们一开始创建的swoole_server对象,

  • 参数$fd是唯一标识,用于区分不同的客户端,同时该参数是1-1600万之间可以复用的整数。

我来解释下复用:假设现在客户端1、2、3处于连接中,客户端4要连接的话$fd就是4,但是不巧的是客户端3连接不稳定,断掉了,客户端4连接到server的话,$fd就是3,这样看的话,实际可能远不止1600W。那1600W个连接够用吗?我的妈呀,你丫单个业务先做到160W再考虑这个问题吧...

// server接收到客户端的数据后,worker进程内触发该回调
$serv->on("Receive", function ($serv, $fd, $fromId, $data) {
    // 收到数据后发送给客户端
    $serv->send($fd, "Server". $data);
});

Receive回调的前两个参数就不说了,刚说完。

上面说到的两个回调,都强调了是在worker进程内触发的。第三个参数$fromId指的是哪一个reactor线程,具体我们会在多进程模型一文中详细分析,先忽略吧。

我们看第四个参数,这个参数就是服务端接受到的数据,注意是字符串或者二进制内容哦,后面我们只谈字符串,不用怕。

注意我们在Receive回调内,调用了$serv的send方法,我们可以使用send方法,向client发起通知。

// 客户端断开连接或者server主动关闭连接时 worker进程内调用
$serv->on("Close", function ($serv, $fd) {
    echo "Client close." . PHP_EOL;
});

注意哦,当客户端与服务端的连接关闭的时候就会调用close回调,有些新手可能习惯性的会在close回调中继续调用$serv->close($fd),人都关闭了才去调用这个方法,你再调用是不是想找事?

到此呢,我们基本上已经搭建到了一个高性能的server。“我什么都没做,这就完啦?好没劲啊”

是的,非常简单,下面我们只需要调用start方法启动server即可。

// 启动server
$serv->start();

如此,便开启了一个server服务。

由于swoole_server只能运行在CLI模式下,所以不要试图通过浏览器进行访问,这样是无效的。不信的可以试试。

我们在命令行下面执行

php server.php

回车。

随后继续回车随便输入点什么都没有效果,感觉当前终端卡住了有木有?

我们平时执行完一个指令,执行完就结束了,但是现在的情况正好相反,当前程序一直处于执行中的状态,并没有退出终端。退出状态一般为当前终端的执行权交给了终端,即可用在终端下进行其他操作。

还记得我们第一步初始化server所填写的ip和端口吗,也就是说server现在正在监听9501端口提供服务。

当前终端暂时不动,我们新开一个终端,看看是不是这样。

netstat -an| grep 9501
tcp 0 0 127.0.0.1:9501 0.0.0.0:* LISTEN

发现本地的9501端口正在被监听对不对?server启动好了能干什么呢?常见的网络编程模式都是client-server的,也就是说我们还需要模拟一个客户端与之交互。

关于客户端,我们可以先通过telnet模拟

01.png

上图中上侧是开启的server窗口,下侧是我们用telnet模拟client的结果。

从结果中可以看出,客户端输入xxx,服务端就会直接返回 Server xxx,这正是我们在Receive回调方法中调用$serv->send方法发送给客户端的数据。而且在server启动的窗口下,也有我们在connect回调打印的信息。

在整个过程中,swoole server提供了类似web服务器的功能,监听端口,做出响应。

此外,swoole还提供了一套对socket客户端的封装,而且啊而且,这个要重点说一下,同步阻塞的swoole_client可以用于php-fpm或者apache环境。

swoole的大部分模块都只能运行在CLI模式下,像我们刚刚在cli下启动的server。但是对于面向web的应用怎么办?所以,swoole_client是我们与服务端交互的一个重要方法,先笔记记下。

下面我们用swoole_client来模拟下客户端。

新建一个Client.php文件。

代码如下:

connect("127.0.0.1", 9501) || exit("connect failed. Error: {$client->errCode}\n");
// 向服务端发送数据
$client->send("hello server.");
// 从服务端接收数据
$response = $client->recv();
// 输出接受到的数据
echo $response . PHP_EOL;
// 关闭连接
$client->close();

我们看到,客户端无非就是创建一个socket对象,然后指定ip和端口,连接server,随后向server发送了一段数据,而后接收server的数据并输出,最后关闭连接。

看下模拟结果

01.png

注意到无论是server还是client,都是在CLI下执行的。

从模拟的结果中我们也可以清晰的看到client与server交互的整个过程。

但是,相信很多人都会有疑问,尤其是phper,server和客户端都这么玩,完全看不到实际应用啊。先慢慢练习吧,我们这才刚打响与swoole之间的战役。

server的关闭,手动执行Ctrl+C即退出。

2.3、swoole之task初体验

①、task初体验

在上文和IO模型中我们都对同步和异步进行了详细的解释,可能你们都懂了,可能部分人还是没懂,毕竟异步始终是个抽象的概念。

今天我们再来强化下这个概念,说一说Async Task。

AsyncTask,即异步任务。我们可以利用AsyncTask将一个耗时的任务投递到队列中,由进程池异步去执行。

博主你说人话,啥是异步任务?

总有些人吐槽不知道swoole的应用场景是啥,我们就以实际中遇到的问题为例:

  • 情景一:管理员需要给指定的用户发送邮件,当勾选10封甚至更多封的时候,点击发送,浏览器会一直转圈,直到邮件全部发送完毕。

  • 情景二:大家都爱看小说,我们以某小说网站的一个需求为例:要求作者可以把他事先写好的小说直接批量导入到网站(根据某种规则),这个操作起来同样会比较耗时。

从我们理解的角度思考,这其实都是php线程一直被阻塞,客户端才一直在等待服务端的响应。

对用户而言,这就是漫长的等待。如何优雅的提高用户体验就是一个非常棘手的问题。

我们的目的就是当用户选了10000封邮件或者提交了他含有500章节的内容之后,及时的通知用户邮件正在发送中或者提示用户章节内容正在上传中,对不对?明白我们今天的重点了吗?

对,你没理解错,AsyncTask的目的就是这个。下面我们来介绍下AsyncTask的使用。

②、先创建一个server

$serv = new swoole_server("127.0.0.1", 9501);

③、开启task功能

task功能默认是关闭的,开启task功能需要满足两个条件:

  • 配置task进程的数量

  • 注册task的回调函数onTask和onFinish

配置task进程的数量,即配置task_worker_num这个配置项。比如我们开启一个task进程

$serv->set([
    "task_worker_num" => 1,
]);

④、task怎么使用?

task进程其实是要在worker进程内发起的,即我们把需要投递的任务,通过worker进程投递到task进程中去处理。

怎么操作呢?我们可以利用swoole_server->task函数把任务数据投递到task进程池中。

swoole_server->task函数是非阻塞函数,任务投递到task进程中后会立即返回,即不管任务需要在task进程内处理多久,worker进程也不需要任何的等待,不会影响到worker进程的其他操作。但是task进程却是阻塞的,如果当前task进程都处于繁忙状态即都在处理任务,你又投递过来100个甚至更多任务,这个时候新投递的任务就只能乖乖的排队等task进程空闲才能继续处理。

如果投递的任务量总是大于task进程的处理能力,建议适当的调大task_worker_num的数量,增加task进程数,不然一旦task塞满缓冲区,就会导致worker进程阻塞,这将是我们不期望的结果。

我们写一个例子来解释下上面所说的内容。

$serv->on("Connect", function ($serv, $fd) {
    echo "new client connected." . PHP_EOL;
});
$serv->on("Receive", function ($serv, $fd, $fromId, $data) {
    echo "worker received data: {$data}" . PHP_EOL;

    // 投递一个任务到task进程中
    $serv->task($data);

    // 通知客户端server收到数据了
    $serv->send($fd, "This is a message from server.");

    // 为了校验task是否是异步的,这里和task进程内都输出内容,看看谁先输出
    echo "worker continue run."  . PHP_EOL;
});

⑤、注册onTask回调

/**
 * $serv swoole_server
 * $taskId 投递的任务id,因为task进程是由worker进程发起,所以多worker多task下,该值可能会相同
 * $fromId 来自那个worker进程的id
 * $data 要投递的任务数据
 */
$serv->on("Task", function ($serv, $taskId, $fromId, $data) {
    echo "task start. --- from worker id: {$fromId}." . PHP_EOL;
    for ($i=0; $i < 5; $i++) {
        sleep(1);
        echo "task runing. --- {$i}" . PHP_EOL;
    }
    echo "task end." . PHP_EOL;
});

为了模拟判断到底是不是异步的,我们在task的回调中循环一个耗时任务,另一个需要注意的地方,我们在task回调内的结尾并没有return任何内容。

⑥、注册onFinish回调

/**
 * 只有在task进程中调用了finish方法或者return了结果,才会触发finish
 */
$serv->on("Finish", function ($serv, $taskId, $data) {
    echo "finish received data "{$data}"" . PHP_EOL;
});

⑦、最后,调用server的start方法

$serv->start();

整个过程是这样的:我们在worker进程收到数据后,直接调用swoole_server->task函数把数据投递给task进程,随后在swoole_server->task调用后和task进程内都输出内容。

⑧、执行结果

准备就绪之后我们在终端下启动server,执行

php server.php

客户端的测试,我们仍然利用上文在client.php写好的代码进行测试,新开一个终端,执行

php client.php

一起看下测试结果:

服务端

new client connected.
worker received data: hello server.
worker continue run.
task start. --- from worker id: 3.
client closed
task runing. --- 0
task runing. --- 1
task runing. --- 2
task runing. --- 3
task runing. --- 4
task end.

客户端

This is a message from server.

从测试结果中,我们看到在swoole_server的task函数之后输出的内容“worker continue run”在task进程开始之前输出。第二个应该引起你注意的是在结果中我们并没有看到在onFinish回调中输出的信息,我们把task回调函数的最后一句echo改为return再试一次。

return "task end." . PHP_EOL;

如果你修改了代码之后,直接去执行client.php,你会发现结果并没有任何变化。

我们在server启动的那个终端下,按Ctrl+C退出,然后再重新启动server

php server.php

发现了什么?有没有看到server终端下面的最后一行显示的信息变了?

finish received data "task end.";

怎么回事,为什么是这样的呢?大白天见鬼啦?为什么要重启下server代码才生效呢?

这个问题跟常驻内存有关,我们准备后面单独增加一个章节说说这个事。

在结果中我们看到了在onFinish回调中打印的信息。为什么这个时候能输出onFinish回调的内容了呢?

这是因为task进程内一旦return或者调用swoole_server->finish方法,就会通知到worker进程该任务已经完成,worker进程会继续触发onFinish回调,进一步对投递的结果进行处理。

这个过程有没有必要呢?讲真话,还真得看自己的业务需求。比如我们以开篇抛出的情境一发送邮件为例,如果我们在task进程内发送完邮件就完事了,不需要关注邮件是否发送成功,反正发不发也无所谓,这个时候就没必要调onFinish回调了。但是如果说我们还需要确认发送的邮件是否成功,没成功还要再继续发,这个时候我们就可以在onFinish回调中继续处理task的结果了。

⑨、总结

  • 没有耗时任务的情况下,worker直接运行,无需开启task

  • 对于耗时的任务,可以在worker内调用task函数,把异步任务投递给task进程进行处理,task进程的数量取决于task_worker_num的配置

  • task进程内可以选择调用finish方法或者return,来通知worker进程此任务已完成,worker进程会在onFinish回调中对task的执行结果进一步处理。如果worker进程不关心任务的结果,finish就不需要了。

2.4、swoole之进程模型

①、引入 Master-Manager-Worker 模式

swoole是事件驱动的。在使用swoole的过程中,我们也体会到,swoole的使用非常简单,仅仅注册相应的回调处理我们的业务逻辑即可。

但是,在继续学习swoole之前,我们有必要再看一看swoole的运行流程和进程模型。

前面两篇文章我们已经对server和task做了简单的介绍,后面再对server的创建以及脚本的执行,如无特殊说明均在CLI下执行,我就不啰嗦了。

$serv = new swoole_server("127.0.0.1", 9501);
$serv->set([
    "worker_num" => 2,
    "task_worker_num" => 1,
]);
$serv->on("Connect", function ($serv, $fd) {
});
$serv->on("Receive", function ($serv, $fd, $fromId, $data) {
});
$serv->on("Close", function ($serv, $fd) {
});
$serv->on("Task", function ($serv, $taskId, $fromId, $data) {
});
$serv->on("Finish", function ($serv, $taskId, $data) {
});

$serv->start();

注意这里我们选择了两个worker进程个一个task进程,那是不是就意味着创建这个server就是开启了3个进程呢?我们来看下

新开一个终端,我们用ps命令看下结果

ps aux | grep server-process
root     21843  xxx... php server-process.php
root     21844  xxx... php server-process.php
root     21846  xxx... php server-process.php
root     21847  xxx... php server-process.php
root     21848  xxx... php server-process.php
root     21854  xxx... grep --color=auto server-process

为了方便阅读,ps的结果中部分不重要数据已经被稍加处理了。

排除最后一个结果(最后一个是我们运行的ps命令)我们发现,竟然有多达5个相似的进程在运行,按照我们理解,不应该是3个吗,怎么多了两个呢?

还记得我们在进程/线程一文中说过的多进程的实现吗?我们说到多进程的实现一般会被设计Master-Worker模式,常见的nginx默认的多进程模式也正是如此,当然swoole默认的也是多进程模型。

相比Master-Worker模式,swoole的进程模型可以用Master-Manager-Worker来形容。即在Master-Worker的基础上又增加了一层Manager进程。这也就解答了我们开头抛出的问题为什么是5个进程而不是3个进程了。(1个Master进程+1个Manager进程+2个Worker进程+1个Task进程)

正所谓“存在即合理”,我们来看一下Master\Manager\Worker三种进程各自存在的原因。

②、介绍 Master\Manager\Worker 三种进程

Master进程是一个多线程程序。注解:按照我们之前的理解,多个线程是运行在单一进程的上下文中的,其实对于单一进程中的每一个线程,都有它自己的上下文,但是由于共同存在于同一进程,所以它们也共享这个进程,包括它的代码、数据等等。

再回来继续说Master进程,Master进程就是我们的主进程,掌管生杀大权,它挂了,那底下的都得玩完。Master进程,包括主线程,多个Reactor线程等。

每一个线程都有自己的用途,比如主线程用于Accept、信号处理等操作,而Reactor线程是处理tcp连接,处理网络IO,收发数据的线程。

A、说明两点:

  • 主线程的Accept操作,socket服务端经常用accept阻塞,上一节介绍socket编程的时候有一张配图,可以看看

  • 信号处理,信号就相当于一条消息,比如我们经常操作的Ctrl+C其实就是给Master进程的主线程发送一个SIGINT的信号,意思就是你可以终止啦,信号有很多种,后面还有介绍

B、Reactor线程

通常,主线程处理完新的连接后,会将这个连接分配给固定的Reactor线程,并且这个Reactor线程会一直负责监听此socket(上文中后面对socket更新为socket即套接字,是用来与另一个进程进行跨网络通信的文件,文件可读可写),换句话就是说当此socket可读时,会读取数据,并将该请求分配给worker进程,这也就解释了我们在swoole初识讲解worker进程内的回调onReceive的第三个参数$fromId的含义;当此socket可写时,会把数据发送给tcp客户端。

用一张图清晰的梳理下

01.png

那swoole为啥不能像Nginx一样,是Master-Worker进程结构的呢?Manager进程是干啥的?

这个我正准备说。

C、Manager进程

我们知道,在Master-Worker模型中,Master只有一个,Worker是由父进程Master进程复制出来的,且Worker进程可以有多个。

注解:在linux中,父进程可以通过调用fork函数创建一个新的子进程,子进程是父进程的一个副本,几乎但不完全相同,二者的最大区别就是都拥有自己独立的进程ID,即PID。

对于多线程的Master进程而言,想要多Worker进程就必须fork操作,但是fork操作是不安全的,所以,在swoole中,有一个专职的Manager进程,Manager进程就专门负责worker/task进程的fork操作和管理。换句话也就是说,对于worker进程的创建、回收等操作全权有“保姆”Manager进程进行管理。

通常,worker进程被误杀或者由于程序的原因会异常退出,Manager进程为了保证服务的稳定性,会重新拉起新的worker进程,意思就是Worker进程你发生意外“死”了,没关系,我自身不“死”,就可以fork千千万万个你。

当然,Master进程和Manager进程我们是不怎么关心的,从前面两篇文章我们了解到,真正实现业务逻辑,是在worker/task进程内完成的。

再来一张图梳理下Manager进程和Worker/Task进程的关系。

01.png

D、区分进程

再回到我们开篇抛出的的5个进程的问题,ps的结果简直一模一样,有没有办法能区分这5个进程哪个是哪个呢?

有同学要说啦,既然各个进程之间存在父子关系,那我们就可以通过linux的pstree命令查看结果。

pstree | grep server-process

 | |   \-+= 02548 manks php server-process.php

 | |     \-+- 02549 manks php server-process.php

 | |       |--- 02550 manks php server-process.php

 | |       |--- 02551 manks php server-process.php

 | |       \--- 02552 manks php server-process.php

 |     \--- 02572 manks grep server-process

注:centos下命令可修改为 pstree -ap | grep server-process

从结果中我们可以看出,进程id等于02548的进程就是Master进程,因为从结构上看就它是“父”嘛,02549是Manager进程,Worker进程和Task进程就是02550、02551和02552了(每个人的电脑上显示的进程id可能不同,但顺序是一致的,依照此模型分析即可)。

我们看到pstree命令也只能得到大致结果,而且在事先不知道的情况下,根本无法区分Worker进程和Task进程。

在swoole中,我们可以在各个进程启动和关闭的回调中去解决上面这个问题。各个进程的启动和关闭?那岂不是又要记住主进程、Manager进程、Worker进程,二三得六,6个回调函数?

是的,不过这6个是最简单也是最好记的,你实际需要了解的可能还要更多。

Master进程:
    启动:onStart
    关闭:onShutdown
Manager进程:
    启动:onManagerStart
    关闭:onManagerStop
Worker进程:
    启动:onWorkerStart
    关闭:onWorkerStop

提醒:task_worker也会触发onWorkerStart回调。

是不是很好记?那我们就在server-process.php中通过上面这几种回调来实现对各个进程名的修改。

$serv->on("start", function ($serv){
    swoole_set_process_name("server-process: master");
});
// 以下回调发生在Manager进程
$serv->on("ManagerStart", function ($serv){
    swoole_set_process_name("server-process: manager");
});
$serv->on("WorkerStart", function ($serv, $workerId){
    if($workerId >= $serv->setting["worker_num"]) {
        swoole_set_process_name("server-process: task");
    } else {
        swoole_set_process_name("server-process: worker");
    }
});
ps aux | grep server-process
root     27546  xxx... server-process: master
root     27547  xxx... server-process: manager
root     27549  xxx... server-process: task worker
root     27550  xxx... server-process: worker
root     27551  xxx... server-process: worker
root     27570  xxx... grep --color=auto simple

运行结果谁是谁一目了然,简直了!

有同学傻眼了,说在workerStart回调中写的看不明白,worker进程和task进程怎么区分的?

我来解释一下:在onWorkerStart回调中,$workerId表示的是一个值,这个值的范围是0~worker_num,worker_num是我们的对worker进程的配置,其中0~worker_num表示worker进程的标识,包括0但不包括worker_num;worker_num~worker_num+task_worker_num是task进程的标识,包括worker_num不包括worker_num+task_worker_num。

按照高中学的区间的知识可能更好理解,以我们案例的配置,workerId的值的范围就是[0,2],[0,2)表示worker进程,[2,3)就表示task_worker进程。

swoole的进程模型很重要,本节掌握不好,后面的理解可能就会有些问题。

补充:我们在onWorkerStart的回调中,用了serv−>setting去获取配置的 server 信息,在swoole中预留了一些swooleserver的属性,我们可以在回调函数中访问。比如说我们可以用serv->connections属性获取当前server的所有的连接,再比如我们可以通过$serv->master_pid属性获取当前server的 主进程id 等等。

2.5、常驻内存以及如何避免内存泄漏

①、传统 web 开发模式之内存开销

Task初体验一节中我们提到,server中的代码修改之后,要先按Ctrl+C终止server再重新启动下server才会生效,当时我们一言以过之,本节我们主要就来看看这个常驻内存相关的事。

在传统的web开发模式中,我们知道,每一次php请求,都要经过php文件从磁盘上读取、初始化、词法解析、语法解析、编译等过程,而且还要与nginx或者apache通信,如果再涉及数据库的交互,还要再算上数据库的握手、验权、关闭等操作,可见一次请求的背后其实是有相当繁琐的过程,无疑,这个过程也就带来了相当多的开销!当然,所有的这些资源和内存,在一次请求结束之前,都会得到释放。

②、swoole 常驻内存

但是,swoole是常驻内存运行的。这有几点不同,我们分别了解下。

  • 在运行server之后所加载的任何资源,都会一直持续在内存中存在。也就是说假设我们开启了一个server,有100个client要connect,加载一些配置文件、初始化变量等操作,只有在第一个client连接的时候才有这些操作,后面的client连接的时候就省去了重复加载的过程,直接从内存中读取就好了。这样好不好呢?很明显非常好,如此一来还可以提升不小的性能。但是,对开发人员的要求也更高了。因为这些资源常驻内存,并不会像web模式下,在请求结束之后会释放内存和资源。也就是说我们在操作中一旦没有处理好,就会发生内存泄漏,久而久之就可能会发生内存溢出。之前一直对swoole印象不错,没想到都是坑。其实这都不算坑,如果你觉得是坑,权且当做是一种提升自身能力的约束好了。

  • 回到我们的开篇提到的问题上,再啰嗦的解释一遍:server一开始就把我们的代码加载到内存中了,无论后期我们怎么修改本地磁盘上的代码,客户端再次发起请求的时候,永远都是内存中的代码在生效,所以我们只能终止server,释放内存然后再重启server,重新把新的代码加载到内存中,如此,明白否?那有同学要说了,感觉好麻烦,是不是说在swoole中申请的内存啥的都要自己手动unset释放呢?对于局部变量,就没必要操这个心了,swoole会在事件回调函数返回之后释放。但是对于全局变量你就要悠着点了,因为他们在使用完之后并不会被释放。不会被释放?那在php中,这几种全局变量:global声明的变量,static声明的对象属性或者函数内的静态变量和超全局变量谁还敢用?一个不小心服务器直接就玩完的节奏!

③、为什么要用全局变量?

我们想一下为什么要用全局变量?

是不是就是想全局共享?但是,在多进程开发模式下,进程内的全局变量所用的内存那也是保存在子进程内存堆的,也并非共享内存,所以在swoole开发中我们还是尽量避免使用全局变量!

那我要是非用不可呢?就是乐意,就是想用。

④、如何避免内存泄漏?

比如有一个static大数组,用于保存客户端的连接标识。我们就可以在onClose回调内清理变量。

此外,swoole还提供了max_request机制,我们可以配置max_request和task_max_request这两个参数来避免内存溢出。

  • max_request的含义是worker进程的最大任务数,当worker进程处理的任务数超过这个参数时,worker进程会自动退出,如此便达到释放内存和资源的目的。不必担心worker进程退出后,没“人”处理业务逻辑了,因为我们还有Manager进程,Worker进程退出后Manager进程会重新拉起一个新的Worker进程。

  • task_max_request针对task进程,含义同max_request。

光溜溜的说了半天,我们来看下是不是这么玩的。

server的代码简写如下

$serv = new swoole_server("127.0.0.1", 9501);

$serv->set([
    "worker_num" => 1,
    "task_worker_num" => 1,
    "max_request" => 3,
    "task_max_request" => 4,
]);
$serv->on("Connect", function ($serv, $fd) {
});
$serv->on("Receive", function ($serv, $fd, $fromId, $data) {
    $serv->task($data);
});
$serv->on("Task", function ($serv, $taskId, $fromId, $data) {

});
$serv->on("Finish", function ($serv, $taskId, $data) {
});
$serv->on("Close", function ($serv, $fd) {
});
$serv->start();

client代码如下

$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
$client->connect("127.0.0.1", 9501) || exit("connect failed. Error: {$client->errCode}\n");

// 向服务端发送数据
$client->send("Just a test.");
$client->close();

为了方便测试,我们开了一个Worker进程,一个Task进程,Worker进程的最大任务设置为3次,Task进程的最大任务设置为4次。

运行server后,在client未请求前我们看下当前的进程结构

01.png

注意进程id等于15644和15645哦,这两个一个是Worker进程,一个是Task进程。Mac下我们就不区分到底谁是谁了。

随后我们让客户端请求3次,再看下结果

01.png

有没有发现原先进程id等于15645的现在变成15680了?请求3次后我们确定是Worker进程自动退出了,并且Manager进程拉起了一个15680的Worker进程。

我们再请求一次,第四次

01.png

发现进程id等于15644的Task进程消失了,有一个新的子进程15704被重新创建了。

看来官方没有骗人,说的都对。

So…原来我在一开始介绍的那么多都是废话?

不全是,因为max_request参数对server有下面几种限制条件。

  • max_request只能用于同步阻塞、无状态的请求响应式服务器程序

  • 纯异步的Server不应当设置max_request

  • 使用Base模式时max_request是无效的,其中Base模式是swoole运行模式的一种,我们主要介绍多进程模式

⑤、总结

  • 常驻内存减少了不小开销,swoole不错

  • 应尽量避免使用全局变量,不用最好,没啥用

  • max_request可以解决php的内存溢出问题,但是主要还是要养成释放内存的习惯,因为max_request也有限制场景

2.6、swoole之守护进程、信号和平滑重启

①、守护进程

之前我们介绍过进程和线程,今天我们再来谈一谈守护进程。

无论是server初识还是task邂逅,不管我们程序写的多么精彩,都没有办法把项目应用到实际业务中,因为我们知道,把运行server的终端关闭之后,server也就不复存在了。

那有没有一种办法说仅且当电脑关机的时候才终止server的运行,不管终端怎么玩,server也能够在后台持续运行呢?

守护进程(daemon)就是一种长期生存的进程,它不受终端的控制,可以在后台运行。其实我们之前也有了解,比如说nginx,fpm等一般都是作为守护进程在后台提供服务。

熟悉linux的同学可能知道,我们可以利用nohup命令让程序在后台跑。swoole官方也为我们提供了配置选项daemonize,默认不启用守护进程,若要开启守护进程,daemonize设置为true即可。

守护进程有优点,必然也存在缺点。我们启用守护进程后,server内所有的标准输出都会被丢弃,这样的话我们也就无法跟踪进程在运行过程中是否异常之类的错误信息了。为方便起见,swoole为我们提供了另一个配置选项log_file,我们可以指定日志路径,这样swoole在运行时就会把所有的标准输出统统记载到该文件内。

②、信号

学习本文之前,我们了解到,swoole是常驻内存的,若想让修改后的代码生效,就必须Ctrl+C,然后再重启server。对于守护进程化的server呢?了解过kill命令的同学要说了,我直接把它干掉,然后终端下再重启,就可以了。

事实上,对于线上繁忙的server,如果你直接把它干掉了,可能某个进程刚好就只处理了一半的数据,对于天天来回倒腾的你来说,面对错乱的数据你不头疼,DBA也想long死你!

这个时候我们就需要考虑如何平滑重启server的问题了。所谓的平滑重启,也叫“热重启”,就是在不影响用户的情况下重启服务,更新内存中已经加载的php程序代码,从而达到对业务逻辑的更新。

swoole为我们提供了平滑重启机制,我们只需要向swoole_server的主进程发送特定的信号,即可完成对server的重启。

我们在进程模型一文中介绍主进程的主线程的时候也提到过主线程的主要任务之一就是处理信号。

那什么是信号呢?

信号是软件中断,每一个信号都有一个名字。通常,信号的名字都以“SIG”开头,比如我们最熟悉的Ctrl+C就是一个名字叫“SIGINT”的信号,意味着“终端中断”。

③、平滑重启

在swoole中,我们可以向主进程发送各种不同的信号,主进程根据接收到的信号类型做出不同的处理。比如下面这几个

  • SIGTERM,一种优雅的终止信号,会待进程执行完当前程序之后中断,而不是直接干掉进程

  • SIGUSR1,将平稳的重启所有的Worker进程

  • SIGUSR2,将平稳的重启所有的Task进程

如果我们要实现重启server,只需要向主进程发送SIGUSR1信号就好了。

平滑重启的原理:当主进程收到SIGUSR1信号时,主进程就会向一个子进程发送安全退出的信号,所谓的安全退出的意思是主进程并不会直接把Worker进程杀死,而是等这个子进程处理完手上的工作之后,再让其光荣的“退休”,最后再拉起新的子进程(重新载入新的PHP程序代码)。然后再向其他子进程发送“退休”命令,就这样一个接一个的重启所有的子进程。

我们注意到,平滑重启实际上就是让旧的子进程逐个退出并重新创建新的进程。为了在平滑重启时不影响到用户,这就要求进程中不要保存用户相关的状态信息,即业务进程最好是无状态的,避免由于进程退出导致信息丢失。

感觉很美好的样子,凡是重启只要简单的向主进程发送信号就完事了呗。

理想很丰满,现实并非如此。

在swoole中,重启只能针对Worker进程启动之后载入的文件才有效!什么意思呢,就是说只有在onWorkerStart回调之后加载的文件,重启才有意义。在Worker进程启动之前就已经加载到内存中的文件,如果想让它重新生效,还是只能乖乖的关闭server再重启。

lsof -i:9501
kill -9 pid

说了这么多,我们写个例子看看到底怎么样向主进程发送SIGUSR1信号以便有效重启Worker进程。

首先我们创建一个Test类,用于处理onReceive回调的数据,为什么要把onReceive回调的业务拿出来单独写,看完例子你就明白了。

<?php

class Test
{
    public function run($data)
    {
        echo $data;
    }
}

在Test::run方法中,我们第一步仅仅是echo输出swoole_server接收到的数据。

当前目录下我们创建一个swoole_server的类NoReload.php

_serv = new Swoole\Server("127.0.0.1", 9501);
        $this->_serv->set([
            "worker_num" => 1,
        ]);
        $this->_serv->on("Receive", [$this, "onReceive"]);

        $this->_test = new Test;
    }
    /**
     * start server
     */
    public function start()
    {
        $this->_serv->start();
    }
    public function onReceive($serv, $fd, $fromId, $data)
    {
        $this->_test->run($data);
    }
}

$noReload = new NoReload;
$noReload->start();

特别提醒:我们在初始化swoole_server的时候的写法是命名空间的写法

new Swoole\Server

该种风格的写法等同于下划线写法 ,swoole对这两种风格的写法都支持

new swoole_server

此外我们看下server的代码逻辑:类定义之前require_once了Test.php,初始化的时候设置了一个Worker进程,注册了NoReload::onReceive方法为swoole_server的onReceive回调,在onReceive回调内接收到的数据传递给了Test::run方法处理。

接下来我们写一个client脚本测试下运行结果

connect("127.0.0.1", 9501) || exit("connect failed. Error: {$client->errCode}\n");
// 向服务端发送数据
$client -> send("Just a test.\n");
$client->close();

客户端的测试代码也很简单,连接server并向server发一个字符串信息

01.png

正常,没发现问题,server所在终端输出了客户端send的内容。

在server不动的情况下我们修改下Test.php,代码如下

<?php
class Test
{
    public function run($data)
    {
        // echo $data;
        $data = json_decode($data, true);
        if (!is_array($data)) {
            echo "server receive \$data format error.\n";
            return ;
        } 
        var_dump($data);
    }
}

原先echo直接输出,现在我们改了下Test的代码,如果接收到的数据经过json_decode处理后不是数组,就返回一段内容并结束,否则打印receive到的数据

如果这个时候我们不对server进行重启,运行client的结果肯定还是一样的,看下结果

01.png

server端新的代码未生效,如果Test.php新的代码生效了,会在server所在终端输出“server receive $data format error.”,这符合我们的认知。

下面我们通过ps命令查看下左侧server的主进程的pid,然后通过kill命令向该进程发送SIGUSR1信号,看看结果如何

01.png结果发现即使向主进程发送了SIGUSR1信号,但是左侧server终端显示的依然是未生效的php代码,这也是对的,因为我们说过新的程序代码只针对在onWorkerStart回调之后才加载进来的php文件才能生效,我们事例中Test.php是在class定义之前就加载进来了,所以肯定不生效。

我们新建一个Reload.php文件,把server的代码修改如下

<?php

class Reload
{
    private $_serv;
    private $_test;

    /**
     * init
     */
    public function __construct()
    {
        $this->_serv = new Swoole\Server("127.0.0.1", 9501);
        $this->_serv->set([
            "worker_num" => 1,
        ]);
        $this->_serv->on("Receive", [$this, "onReceive"]);
        $this->_serv->on("WorkerStart", [$this, "onWorkerStart"]);
    }
    /**
     * start server
     */
    public function start()
    {
        $this->_serv->start();
    }
    public function onWorkerStart($serv, $workerId)
    {
        require_once("Test.php");
        $this->_test = new Test;
    }
    public function onReceive($serv, $fd, $fromId, $data)
    {
        $this->_test->run($data);
    }
}

$reload = new Reload;
$reload->start();

仔细观察,我们仅仅移除了在类定义之前引入Test.php以及在__construct中new Test的操作。

而是在__construct方法中增加了onWorkerStart回调,并在该回调内引入Test.php并初始化Test类。

Test.php的代码,我们仍然先后用上面的两处代码为例,运行client看下结果

01.png

图例右侧运行client过程中,给主进程发送SIGUSR1信号之前,记得修改Test.php的代码,然后再运行client脚本测试。

结果我们发现,在给主进程发送SIGUSR1信号之后,Test.php的新代码生效了。这也便实现了热重启的效果。

如此,我们在Test.php中不论如何更新代码,只需要找到主进程的PID,向它发送SIGUSR1信号即可。同理,SIGUSR2信号是只针对Task进程的,后面可以自行测试下。

热重启的效果实现了,现在针对Reload.php的server,让该server进程守护化看看。

__construct中,$serv->set代码修改如下

$this->_serv->set([   
    "worker_num" => 1,          
    "daemonize" => true,          
    "log_file" => __DIR__ . "/server.log",
]);

我们在终端下在运行下Reload.php

php Reload.php

代码好像突然就执行完毕了,现在终端不“卡”着了,终端的执行权又重新交给了终端,我们的server呢?怎么回事?

其实这就是守护进程化的概念,我们开启的swoole_server进程已经在后端跑着了,不信我们ps看下

ps aux | grep Reload
manks 14117   xxx...    1:51下午   0:07.49 php Reload.php
manks 14117   xxx...    1:51下午   0:07.47 php Reload.php
manks 36807   xxx...    1:54下午   0:00.00 grep Reload
manks 14116   xxx...    1:51下午   0:00.01 php Reload.php

发现还真有几个进程在跑着。

不光如此,我们再看下当前目录下是不是有一个server.log的日志文件,我们在swoole_server::set的log_file配置项指定了日志文件就是它,那么在server运行的过程中,所有的标准输出都会输出到这个文件中,此时我们再运行下client.php,然后打开server.log看看是不是终端输出的结果都显示在该文件内了呢?毋庸置疑。

注意!!!!当使用daemonize,需要在启动的回调里面同时改变一下目录,否则会有奇怪的事情发生

public function onWorkerStart($serv, $workerId){
        chdir(__DIR__);
        require_once "Test.php";
        $this->_test = new Test;
    }

2.7、swoole之定时器

①、前言

说起定时器,大家都不陌生。我最早接触定时器的概念,是javascript的setInterval和setTimeout这两个函数,前者会持续执行,后者仅会执行一次。

在后端开发中,一些涉及到定时器相关的需求,比如数据库备份,排行榜数据更新等,通常我们可以借助linux的crontab工具实现。但是对于一些想精确到秒级别或者想暂停定时器的需求,就相对麻烦一些了。

直到swoole的诞生,异步毫秒级的定时器真的是好用到没得说。

相对与javascript的setInterval和setTimeout,swoole也提供了永久性定时器和一次性定时器,我们分别来看下怎么玩。

②、永久性定时器

所谓的永久性定时器,就是在设定好定时器之后,该定时器就会按照一定的时间间隔执行,直到该定时器被删除。

这种类型的定时器,我们可以使用swoole_timer_tick函数创建,该函数接收3个参数,原型如下

int swoole_timer_tick(int $ms, callable $callback, mixed $params);
  • $ms 指时间,单位毫秒

  • $callback 回调函数,定时器创建后会调用该函数

  • $params 传递给回调函数的参数

即创建一个ms毫秒后执行callback的定时器。

来看一个简单的例子:tick.php

<?php
swoole_timer_tick(1000, function () {
    echo "This is a tick.\n";
});

案例中我们创建了一个永久性定时器,每1000毫秒即每秒执行一次回调函数,输出"This is a tick.\n"。

定时器的清除,可以使用swoole_timer_clear函数操作,该函数接收一个参数,定时器的id,函数原型如下

bool swoole_timer_clear(int $timerId)

再来看一个稍微完整的例子:tick-2.php

<?php

$i = 0;

swoole_timer_tick(1000, function ($timeId, $params) use (&$i) {
    $i ++;
    echo "hello, {$params} --- {$i}\n";
    if ($i >= 5) {
        swoole_timer_clear($timeId);
    }
}, 'world');

事例中我们创建了一个定时器,该定时器每秒执行一次,swoole_timer_tick的第二个参数即回调函数,该函数的参数$timeId是创建的定时器的id, params是swooletimertick的第三个参数传递的值,use闭包中我们取了变量i的地址,在回调函数中,我们对i++处理,当i >= 5的时候,用swoole_timer_clear函数清除了定时器。运行下该文件,我们看看结果

php tick.php
hello, world --- 1
hello, world --- 2
hello, world --- 3
hello, world --- 4
hello, world --- 5

需要说明的是,swoole_timer_tick函数是全局性的,通常情况下是可以在任意地方调用。

另外,如果在事件的回调函数内,我们还可以通过swoole_server->tick函数创建永久性定时器,并使用swoole_server->clearTimer函数清除定时器,比如上面的例子我们可以在回调函数onWorkerStart中这样写

<?php

$serv->set([
    'worker_num' => 2,
]);
$serv->on('WorkerStart', function ($serv, $workerId){
    if ($workerId == 0) {
        $i = 0;
        $params = 'world';
        $serv->tick(1000, function ($timeId) use ($serv, &$i, $params) {
            $i ++;
            echo "hello, {$params} --- {$i}\n";
            if ($i >= 5) {
                $serv->clearTimer($timeId);
            }
        });
    }
});

代码总体上就不分析了,只看一点,为什么在onWorkerStart回调内判断了$workerId是否等于0?

注意到我们开启了两个Worker进程,如果不判断,那么就会在两个Worker进程内各注册一个定时器,实际上也就是我们注册了两个相同的定时器,这是没有必要的。

注:swoole_server->tick等价于swoole_timer_tick,swoole_server->clearTimer等价于swoole_timer_clear。

③、一次性定时器

一次性定时器执行完一次之后,便会自动销毁。这种场景往往是当xxx几秒之后再执行。

同样也有两个函数供我们使用,全局的swoole_timer_after和回调内可调用的swoole_server->after。

前者的参数等同于swoole_timer_tick,只有一点不同,该函数所支持的最大毫秒数是86400000。

同样我们看两个简单的demo:tick-after.php

<?php
swoole_timer_after(3000, function () {
    echo "only once.\n";
});

回调内执行,我们这回举一个在onReceive内为例:tick-server.php

<?php
$serv = new swoole_server('127.0.0.1', 9501);
$serv->set([
    'worker_num' => 2,
]);
$serv->on('WorkerStart', function ($serv, $workerId){
    if ($workerId == 0) {
        $i = 0;
        $params = 'world';
        $serv->tick(1000, function ($timeId) use ($serv, &$i, $params) {
            $i ++;
            echo "hello, {$params} --- {$i}\n";
            if ($i >= 5) {
                $serv->clearTimer($timeId);
            }
        });
    }
});
$serv->on('Connect', function ($serv, $fd) {
});
$serv->on('Receive', function ($serv, $fd, $fromId, $data) {
    $serv->after(3000, function () {
        echo "only once.\n";
    });
});
$serv->on('Close', function ($serv, $fd) {
});
$serv->start();

以后再也不用crontab了,这随随便便简简单单的就实现了定时器的功能,so easy~

2.8、swoole之粘包问题

①、前言

什么是粘包问题,为什么我们要讲这个看起来比较奇怪的问题呢?

不着急解释,我们先看一个例子

创建一个server,server端代码如下:tcp-buffer-server.php

<?php

class TcpBufferServer
{
    private $_serv;

    /**
     * init
     */
    public function __construct()
    {
        $this->_serv = new Swoole\Server("127.0.0.1", 9501);
        $this->_serv->set([
            "worker_num" => 1,
        ]);
        $this->_serv->on("Receive", [$this, "onReceive"]);
    }
    public function onReceive($serv, $fd, $fromId, $data)
    {
        echo "Server received data: {$data}" . PHP_EOL;
    }
    /**
     * start server
     */
    public function start()
    {
        $this->_serv->start();
    }
}

$reload = new TcpBufferServer;
$reload->start();

server的代码很简单,仅仅是在收到客户端代码后,标准输出一句话而已,client的代码需要注意了,我们写了一个for循环,连续向server send三条信息,代码如下:tcp-buffer-client.php

<?php

$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
$client->connect("127.0.0.1", 9501) || exit("connect failed. Error: {$client->errCode}\n");

// 向服务端发送数据
for ($i = 0; $i < 3; $i++) {
    $client->send("Just a test.\n");
}
$client->close();

在未运行测试的情况下,我们期望server所在终端输出的结果应该是这样的

Server received data: Just a test.
Server received data: Just a test.
Server received data: Just a test.

注意哦,我们期望的结果是server被回调了3次,才有上述期望的结果值

实际运行的结果呢,是否与我们所期望的一致?我们看下

01.png

上图左边是server输出的信息。

我们看到,左侧显示的结果是server一次性输出的结果,按理论来说,client发起了3次请求,server应该跟我们期望的结果一致,会执行3次呀,这怎么回事呢?

这个问题,便是我们今天要说的粘包问题。

为了说清楚这个问题,我们先来看下client/server之间数据传递的过程

  • 客户端->发送数据

  • 服务端->接收数据

通常我们直觉性的认为,客户端直接向网络中传输数据,对端从网络中读取数据,但是这是不正确的。

socket有缓冲区buffer的概念,每个TCP socket在内核中都有一个发送缓冲区和一个接收缓冲区。客户端send操作仅仅是把数据拷贝到buffer中,也就是说send完成了,数据并不代表已经发送到服务端了,之后才由TCP协议从buffer中发送到服务端。此时服务端的接收缓冲区被TCP缓存网络上来的数据,而后server才从buffer中读取数据。

所以,在onReceive中我们拿到的数据并没有办法保证数据包的完整性,swoole_server可能会同时收到多个请求包,也可能只收到一个请求包的一部分数据。

这就是一个大问题呀,如此TCP协议不行呀,这货虽然能保证我们能正确的接收到数据但是数据不对呀,这麻烦不容小觑。

既然是个问题,那我们自然也就有解决问题的方法,不然我下面说啥呢,对吧。

swoole给我们提供了两种解决方案

②、方法一:EOF结束协议

EOF,end of file,意思是我们在每一个数据包的结尾加一个eof标记,表示这就是一个完整的数据包,但是如果你的数据本身含有EOF标记,那就会造成收到的数据包不完整,所以开启EOF支持后,应避免数据中含有EOF标记。

在swoole_server中,我们可以配置open_eof_check为true,打开EOF检测,配置package_eof来指定EOF标记。

swoole_server收到一个数据包时,会检测数据包的结尾是否是我们设置的EOF标记,如果不是就会一直拼接数据包,直到超出buffer或者超时才会终止,一旦认定是一个完整的数据包,就会投递给Worker进程,这时候我们才可以在回调内处理数据。

这样server就能保证接收到一个完整的数据包了?不能保证,这样只能保证server能收到一个或者多个完整的数据包。

为啥是多个呢?

我们说了开启EOF检测,即open_eof_check设置为true,server只会检测数据包的末尾是否有EOF标记,如果向我们开篇的案例连发3个EOF的数据,server可能还是会一次性收到,这样我们只能在回调内对数据包进行拆分处理。

我们拿开篇的案例为例

server开启eof检测并指定eof标记是\r\n,代码如下:server-eof-check.php

<?php
class ServerEofCheck
{
    private $_serv;
    /**
     * init
     */
    public function __construct()
    {
        $this->_serv = new Swoole\Server("127.0.0.1", 9501);
        $this->_serv->set([
            "worker_num" => 1,
            "open_eof_check" => true, //打开EOF检测
            "package_eof" => "\r\n", //设置EOF
        ]);
        $this->_serv->on("Connect", array($this, "onConnect"));
        $this->_serv->on("Close", array($this, "onClose"));
        $this->_serv->on("Receive", [$this, "onReceive"]);
    }
    public function onConnect($serv, $fd, $fromId)
    {
    }
    public function onReceive($serv, $fd, $fromId, $data)
    {
        echo "Server received data: {$data}" . PHP_EOL;
    }
    public function onClose($serv, $fd, $fromId)
    {
    }
    /**
     * start server
     */
    public function start()
    {
        $this->_serv->start();
    }
}
$reload = new ServerEofCheck;
$reload->start();

客户端设置发送的数据末尾是\r\n符号,代码如下:server-eof-client.php

<?php
$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
$client->connect("127.0.0.1", 9501) || exit("connect failed. Error: {$client->errCode}\n");
// 向服务端发送数据
for ($i = 0; $i < 3; $i++) {
    $client->send("Just a test.\r\n");
}
$client->close();

按照我们刚才的分析,server的效果可能会一次性收到多个完整的包,我们运行看看结果

01.png

因此我们还需要在onReceive回调内对收到的数据进行拆分处理

public function onReceive($serv, $fd, $fromId, $data)
{
    $datas = explode("\r\n", $data);
    foreach ($datas as $data)
    {
        if(!$data)
            continue;

        echo "Server received data: {$data}" . PHP_EOL;
    }
}

此时我们再看下运行结果

01.png

自行分包的效果便实现了,考虑到自行分包稍微麻烦,swoole提供了open_eof_split配置参数,启用该参数后,server会从左到右对数据进行逐字节对比,查找数据中的EOF标记进行分包,效果跟我们刚刚自行拆包是一样的,性能较差。

在案例的基础上我们看看open_eof_split配置

$this->_serv->set([
    "worker_num" => 1,
    "open_eof_check" => true, //打开EOF检测
    "package_eof" => "\r\n", //设置EOF
    "open_eof_split" => true,
]);

onReceive的回调,我们不需要自行拆包

public function onReceive($serv, $fd, $fromId, $data)
{
     echo "Server received data: {$data}" . PHP_EOL;
}

lient的测试代码使用\r\n(同server端package_eof标记一致),我们看下运行效果

01.png

EOF标记解决粘包就说这么多,下面我们再看看另一种解决方案

③、方法二:固定包头+包体协议

下面我们要说的,对于部分同学可能有点难度,对于不理解的,建议多看多操作多问多查,不躲避不畏惧,这样才能有所提高。

固定包头是一种非常通用的协议,它的含义就是在你要发送的数据包的前面,添加一段信息,这段信息了包含了你要发送的数据包的长度,长度一般是2个或者4个字节的整数。

在这种协议下,我们的数据包的组成就是包头+包体。其中包头就是包体长度的二进制形式。比如我们本来想向服务端发送一段数据 "Just a test." 共12个字符,现在我们要发送的数据就应该是这样的

pack("N", strlen("Just a test.")) . "Just a test."

其中php的pack函数是把数据打包成二进制字符串。

为什么这样就能保证Worker进程收到的是一个完整的数据包呢?我来解释一下:

当server收到一个数据包(可能是多个完整的数据包)之后,会先解出包头指定的数据长度,然后按照这个长度取出后面的数据,如果一次性收到多个数据包,依次循环,如此就能保证Worker进程可以一次性收到一个完整的数据包。

估计好多人都看蒙了,这都是神马玩意?我们以案例来分析

server代码

<?php
class ServerPack
{
    private $_serv;
    /**
     * init
     */
    public function __construct()
    {
        $this->_serv = new Swoole\Server("127.0.0.1", 9501);
        $this->_serv->set([
            "worker_num" => 1,
            "open_length_check"     => true,      // 开启协议解析
            "package_length_type"   => "N",     // 长度字段的类型
            "package_length_offset" => 0,       //第几个字节是包长度的值
            "package_body_offset"   => 4,       //第几个字节开始计算长度
            "package_max_length"    => 81920,  //协议最大长度
        ]);
        $this->_serv->on("Receive", [$this, "onReceive"]);
    }
    public function onReceive($serv, $fd, $fromId, $data)
    {
        $info = unpack("N", $data);
        $len = $info[1];
        $body = substr($data, - $len);
        echo "server received data: {$body}\n";
    }
    /**
     * start server
     */
    public function start()
    {
        $this->_serv->start();
    }
}
$reload = new ServerPack;
$reload->start();

客户端的代码

<?php
$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_SYNC);
$client->connect("127.0.0.1", 9501) || exit("connect failed. Error: {$client->errCode}\n");
// 向服务端发送数据
for ($i = 0; $i < 3; $i++) {
    $data = "Just a test.";
    $data = pack("N", strlen($data)) . $data;
    $client->send($data);
}
$client->close();

运行的结果

01.png

结果没错,是我们期望的结果。

我们来分析下这是为什么

  • 首先,在server端我们配置了open_length_check,该参数表明我们要开启固定包头协议解析

  • package_length_type配置,表明包头长度的类型,这个类型跟客户端使用pack打包包头的类型一致,一般设置为N或者n,N表示4个字节,n表示2个字节

  • 我们看下客户端的代码 pack('N', strlen($data)) . $data,这句话就是包头+包体的意思,包头是pack函数打包的二进制数据,内容便是真实数据的长度strlen(data)。在内存中,整数一般占用4个字节,所以我们看到,在这段数据中0-4字节表示的是包头,剩余的就是真实的数据。但是server不知道呀,怎么告诉server这一事实呢?看配置package_length_offset和package_body_offset,前者就是告诉server,从第几个字节开始是长度,后者就是从第几个字节开始计算长度。

  • 既然如此,我们就可以在onReceive回调对数据解包,然后从包头中取出包体长度,再从接收到的数据中截取真正的包体。

$info = unpack("N", $data);
$len = $info[1];
$body = substr($data, - $len);
echo "server received data: {$body}\n";

这便是swoole对于粘包问题的解决,你学会了吗?

2.9、异步发送邮件案例

①、引言

在介绍task的时候我们提到过邮件发送,不过当时是作为引子,主要讲的是swoole task方面的知识点,今天我们来实战性的完善一下“发送邮件”的案例。

先来总结一下我们在写代码的过程中应该注意哪些问题

  • 开启数量适中的Worker进程和Task进程

  • 守护进程化

  • 配置运行时日志

  • 平滑重启

  • 避免内存泄漏

  • 避免粘包问题

除此之外,跟swoole打交道,我们还应该注意下面这些

  • 为了避免Worker阻塞,避免使用sleep等睡眠函数

  • 不要使用die或者exit函数,即使在你调试的时候

  • 保持良好的代码风格,try/catch捕获异常

  • 如果Worker进程无法预料会发生异常退出,虽然Manager进程会重新拉起新的Worker进程,但是我们可以通过register_shutdown_function方法在进程退出前“善后”

那下面我们开始吧。

②、swiftmailer

首先发送邮件,我们借助第三方类库 swiftmailer。有些框架可能集成了swiftmailer,比如yii2,本来准备在yii2的基础之上来讲,考虑部分人可能对这个框架不熟悉,我们这里直接根据swiftmailer代码操作,框架中一样可以使用,无任何影响。

我们执行下面的命令,把swiftmailer下载到本地,下载好之后swiftmailer会被下载到一个叫vendor文件夹的目录里面

composer require "swiftmailer/swiftmailer"

然后我们封装一个简单的邮件类Mailer.php,同vendor目录同级,用于发送邮件,该类后期可自行完善,比如增加批量发送邮件或者增加发送模版邮件等操作。

<?php
require_once __DIR__ . "/vendor/autoload.php";
class Mailer
{
    public $transport;
    public $mailer;
    /**
     * 发送邮件类 参数 $data 需要三个必填项 包括 邮件主题`$data["subject"]`、接收邮件的人`$data["to"]`和邮件内容 `$data["content"]`
     * @param Array $data
     * @return bool $result 发送成功 or 失败
     */
    public function send($data)
    {
        $this->transport = (new Swift_SmtpTransport("smtp.qq.com", 587))
            ->setEncryption("tls")
            ->setUsername("452936616@qq.com")
            ->setPassword("xxxxxx");
        $this->mailer = new Swift_Mailer($this->transport);
        $message = (new Swift_Message($data["subject"]))
            ->setFrom(array("452936616@qq.com" => "lulublog"))
            ->setTo(array($data["to"]))
            ->setBody($data["content"]);
            
        $result = $this->mailer->send($message);
        // 释放
        $this->destroy();
        return $result;
    }
    public function destroy()
    {
        $this->transport = null;
        $this->mailer = null;
    }
}

在这段代码中,你需要修改的地方包括 Host、Post、Encryption、Username、Password和From。

Mailer类简单的封装好之后,我们写几行代码测试下你的邮件类是否可以正确的使用

ini_set("date.timezone","Asia/Shanghai");
require_once __DIR__ . "/Mailer.php";
$data = [
    "to" => "452936616@qq.com",
    "subject" => "just a test",
    "content" => "This is just a test.",
];
$mailer = new Mailer;
$mailer->send($data);

to是要发送给谁,subject邮件标题,content邮件内容。

如果不可以正常发送,请检查swiftmailer相关类正确引入并且保证Mailer类的配置可用。

③、TaskServer、TaskRun、TaskClient

邮件类准备好之后,我们正式开始写swoole server,主要代码如下:

<?php
class TaskServer
{
    private $_serv;
    private $_run;
    /**
    * init
    */
    public function __construct()
    {
        $this->_serv = new Swoole\Server("127.0.0.1", 9501);
        $this->_serv->set([
            "worker_num" => 2,
            "daemonize" => false,
            "log_file" => __DIR__ . "/server.log",
            "task_worker_num" => 2,
            "max_request" => 5000,
            "task_max_request" => 5000,
            "open_eof_check" => true, //打开EOF检测
            "package_eof" => "\r\n", //设置EOF
            "open_eof_split" => true, // 自动分包
        ]);
        $this->_serv->on("Connect", [$this, "onConnect"]);
        $this->_serv->on("Receive", [$this, "onReceive"]);
        $this->_serv->on("WorkerStart", [$this, "onWorkerStart"]);
        $this->_serv->on("Task", [$this, "onTask"]);
        $this->_serv->on("Finish", [$this, "onFinish"]);
        $this->_serv->on("Close", [$this, "onClose"]);
    }
    public function onConnect($serv, $fd, $fromId)
    {
    }
    public function onWorkerStart($serv, $workerId)
    {
        require_once __DIR__ . "/TaskRun.php";
        $this->_run = new TaskRun;
    }
    public function onReceive($serv, $fd, $fromId, $data)
    {
        $data = $this->unpack($data);
        $this->_run->receive($serv, $fd, $fromId, $data);
        // 投递一个任务到task进程中
        if (!empty($data["event"])) {
            $serv->task(array_merge($data , ["fd" => $fd]));
        }
    }
    public function onTask($serv, $taskId, $fromId, $data)
    {
        $this->_run->task($serv, $taskId, $fromId, $data);
    }
    public function onFinish($serv, $taskId, $data)
    {
        $this->_run->finish($serv, $taskId, $data);
    }
    public function onClose($serv, $fd, $fromId)
    {
    }
    /**
    * 对数据包单独处理,数据包经过`json_decode`处理之后,只能是数组
    * @param $data
    * @return bool|mixed
    */
    public function unpack($data)
    {
        $data = str_replace("\r\n", "", $data);
        if (!$data) {
            return false;
        }
        $data = json_decode($data, true);
        if (!$data || !is_array($data)) {
            return false;
        }
        return $data;
    }
    public function start()
    {
        $this->_serv->start();
    }
}
$reload = new TaskServer;
$reload->start();

有的人一看那么多代码就头疼,实际上这也就几行代码,仔细看的同学会发现,这个类就是server的基本配置和一些回调,不涉及任何业务逻辑。简单分析下

  • 配置项,对照开篇提到的注意项,挨个比较吧

  • 在onWorkerStart回调内,我们引入了实际处理业务逻辑的类TaskRun.php,为什么这么说呢?因为我们在onReceive\onTask\onFinish回调内均把数据交给了TaskRun对象去处理了

  • 我们约定,每个数据包都必须带有EOF标记\r\n,在server端为了更好的处理数据,onReceive回调内我们把数据包丢给了unpack方法处理,该方法的目的就是把数据包的EOF标记去掉,还原真实的数据包。我们还约定,server收到的数据包经过unpack处理之后只能是数组,非数组在unpack中就被直接处理掉了。

  • onReceive回调内,我们看到,只有数据包含有event项才会被投递给Task进程,这样做的原因是Task进程可能要处理各种任务,增加event项是为了表明投递过来的任务是要做什么的。

为什么要单独的把业务逻辑分开再另起一个文件处理呢?有疑问的可以回去再看看平滑重启一文,我们看TaskRun的实现

<?php
require_once ("./TaskClient.php");
require_once ("./Mailer.php");
class TaskRun
{
    public function receive($serv, $fd, $fromId, $data)
    {
    }
    public function task($serv, $taskId, $fromId, $data)
    {
        try {
            switch ($data["event"]) {
                case TaskClient::EVENT_TYPE_SEND_MAIL:
                    $mailer = new Mailer;
                    $result = $mailer->send($data);
                    break;
                default:
                    break;
            }
            return $result;
        } catch (\Exception $e) {
            throw new \Exception("task exception :" . $e->getMessage());
        }
    }
    public function finish($serv, $taskId, $data)
    {
        return true;
    }
}

目前,我们主要就一个业务,“发送邮件”,所以TaskRun类的实现现在看来非常简单。

因为发邮件是一件比较耗时的任务,所以我们这里完善的是task回调。我们根据投递给Task进程的数据类型,判断投递过来的数据是要做什么。比如我们这里有一项event,等于TaskClient::EVENT_TYPE_SEND_MAIL,这一项就是发送邮件的标识,如果要投递的任务的event项等于TaskClient::EVENT_TYPE_SEND_MAIL,就表明这个任务是邮件任务,程序上就可以通过switch去处理邮件了。

TaskClient是什么呢?这是一个封装好的客户端处理类,我们来看下

<?php
class TaskClient
{
    private $client;
    const EVENT_TYPE_SEND_MAIL = "send-mail";
    public function __construct ()
    {
        $this->client = new Swoole\Client(SWOOLE_SOCK_TCP);
        if (!$this->client->connect("127.0.0.1", 9501)) {
            $msg = "swoole client connect failed.";
            throw new \Exception("Error: {$msg}.");
        }
    }
    /**
     * @param $data Array
     * send data
     */
    public function sendData ($data)
    {
        $data = $this->togetherDataByEof($data);
        $this->client->send($data);
    }
    /**
     * 数据末尾拼接EOF标记
     * @param Array $data 要处理的数据
     * @return String json_encode($data) . EOF
     */
    public function togetherDataByEof($data)
    {
        if (!is_array($data)) {
            return false;
        }
        return json_encode($data) . "\r\n";
    }
}

我们看到,封装好的客户端类的功能就是连接server,并把需要send给server的数据,json_encode后追加EOF标记。这一点必须同server保持一致。

到此我们所有的封装基本结束,这是不是我们可以写一个client脚本在CLI下测试了呢?别急别急,这个时候在CLI下测试就没多大意义了,对于“发邮件”操作而言,往往这个动作都是在web环境下发起的,我们下面看下在web环境下,怎么把邮件投递给server。

在初识swoole的时候我让大家记过一个笔记:swoole的大部分模块都只能运行在CLI模式下,但是对于面向web的应用,swoole_client是我们与服务端交互的一个重要方法。不知道有多少人记得的。

下面我们在web可访问的目录下创建一个脚本文件index.php,写上一小段测试代码,如下

<?php
require_once ("./swoole-practice/TaskClient.php");
$data = [
    "event" => TaskClient::EVENT_TYPE_SEND_MAIL,
    "to" => "422744***@qq.com",
    "subject" => "just a test",
    "content" => "This just a test.",
];
$client = new TaskClient();
$client->sendData($data);

这段代码的含义就是引入TaskClient类,毕竟连接server发送数据包都是通过这个类实现的。

测试之前,我们先在CLI下把server跑起来,然后通过浏览器访问这个index.php。

在一切顺利的情况下,我们的web页面展示的仅仅是一段空白。但是此时,我们把接收邮件的邮箱打开,看看结果呢?有没有收到了一封测试邮件?

2.10、swoole与yii2的那些事

①、前言

由于yii2本身就是传统MVC框架的典型代表之一,所以这个问题又可以等同于swoole如何与传统的MVC框架整合,比如laravel\tp等。本文我们就以yii2为例,来简述下在传统MVC框架下是怎么运用swoole的。

没错,我们可以用yii2自带的console来配和操作swoole。明明很简单,你们却告诉我你不会,我总结一下可能有下面这些原因:

  • 命名空间的问题

  • 日志的使用问题

  • 热重启的问题

下面我们就这三个问题,来分别谈谈swoole与yii2的那些事。

②、命名空间的问题

前面我们介绍的swoole,基本上都很贴近原生,当然我们也没有引入过命名空间。

但是在yii2的console中,有命名空间的存在,所以,在实例化swoole_server等相关的对象时,我们就需要在这些扩展对象的前面添加反斜杠"",这也是很多基础薄弱的同学不明白为啥在yii2中new swoole_server等对象的时候报undefined 错误了。

比如我们这样写

new Swoole\Server("127.0.0.1", 9501);

就会抛出一个Fatal error:Class 'console\controllers\Swoole\Server' not found,这便是命名空间所引起的问题,如何避免这个问题呢?像我们上面说的,只需要在swoole对象的前面添加一个反斜杠即可。

比如像下面这样写就没问题

new \Swoole\Server("127.0.0.1", 9501);

③、日志的使用问题

在yii2中,平时我们在业务逻辑中做一些日志追踪,记录等操作,都来源于对yii\log\Logger类的操作。这个类很特殊,怎么个特殊法呢?且听我慢慢道来。

以文件存储为例,在脚本运行时记录的日志信息并没有在脚本运行到记录的那行代码就把日志写到你的文件内,当脚本运行到记录的那行代码时,这些信息只是被暂时性的写入到内存,等脚本完全结束的时候,才最终把这些日志统一一次性输出到文件系统中。

我们再来看一下swoole。swoole是常驻内存型,除非遇到致命错误或者其他非正常中断时,脚本才会执行完毕,所以在swoole程序正常运行时,你所记录的日志并没有正常的记录到文件内。

这就明显突出了常驻内存与yii2的日志管理的矛盾。

有些人可能还不明白,说的啥,听不懂。我们看一个例子

首先我们在console\config\main.php配置文件配置log组件

"components" => [
    "log" => [
        "targets" => [
            [
                "class" => "yii\log\FileTarget",
                "levels" => ["error", "warning"],
            ],
        ],
    ],
],

在console下,没有swoole_server的情况下,我们记录下面这样一行代码,看看日志中有没有记录

<?php
namespace console\controllers;
use Yii;
use yii\console\Controller;
/**
 * Test Console Application
 */
class TestController extends Controller
{
    public function actionIndex ()
    {
        yii::warning("This is a warning message.");
    }
}

控制台下输入如下命令执行

./yii test/index

执行完之后,我们打开console/runtime/logs/app.log,你会发现该文件内找到有类似下面这样的记录信息

2017-05-12 22:04:31 [-][-][-][warning][application] This is a warning message.

当然也有记录$_SERVER等其他信息。

现在我们在这段代码中增加swoole的代码,看看结果如何

_serv = new \Swoole\Server("127.0.0.1", 9501);
        $this->_serv->set([
            "worker_num" => 1,
        ]);
        $this->_serv->on("Start", [$this, "onStart"]);
        $this->_serv->on("Receive", [$this, "onReceive"]);
        $this->_serv->on("Close", [$this, "onClose"]);
    }
    public function actionStart ()
    {
        $this->_prepare();
        $this->_serv->start();
    }
    public function onStart($serv)
    {
        yii::warning("This is a warning message.");
    }
    public function onReceive($serv, $fd, $fromId, $data)
    {
    }
    public function onClose($serv)
    {
    }
}

命令行同样以./yii命令运行该脚本,即表示本地9501端口的swoole_server启动了,注意哦,我们在onStart回调内调起了yii::warning,打开日志文件你会发现,没有这条日志,真的什么都没有!其实这条信息有没有呢?有,只是没有到它该刷新出来的时候,很明显我们并不想要这样的结果,怎么办呢?我们可以手动直接把这条信息刷到文件中去。

$logObject = Yii::getLogger();
$logObject->log("This is a warning message.", \yii\log\Logger::LEVEL_WARNING);
$logObject->flush(true);

把上面这三行代码,替换掉我们在onStart回调内写的代码试试?

④、热重启的问题

在yii2的console下,不能有效的解决热重启问题,注意我们说的是在console下。

什么意思呢?

我们知道,热重启的实质是重启Worker进程,在Worker进程启动之前加载的文件都不能实现热更新,为了实现热更新,我们需要把被热更新的程序文件,在onWorkerStart回调内重新加载,更新内存中已经被加载过的程序。

在console中我们仍然可以这样做。

我们把主要的业务逻辑与控制器类剥离开来,在onWorkerStart中,实例化主要的业务逻辑类,跟我们邮件发送的案例中一致,只不过需要注意命名空间的问题,我们就不再举例了,这样肯定没问题。

但是为什么我要说yii2的console不能有效的解决热更新呢?

首先我们看看在Worker进程之前,都加载了哪些文件,当然这些已经加载过的文件,是不能被重新加载的。

public function onWorkerStart($serv, $workerId)
{
    print_r(get_included_files());
}

运行server之后,我们会在终端界面上看到大概有54个文件被加载了,其中就包括了common和console配置文件以及param和param-local文件,也就是说这些文件,一旦我们修改了,是没有办法实现热重启的,只能重启server。

这也就是我上面说的,在console下不能完全有效的实现热重启,如果这些你能接受,你觉得没问题,可以,就这么干没任何问题。

有些人就觉得有瑕疵的代码不好,接受不了,怎么办?

我们回顾下为什么会有这个瑕疵,这些提前加载的文件是怎么被加载的呢?如果我们把这些提前加载的文件放到onWorkerStart回调内再加载是不是就可以了避免了呢?

事实上可以。

我们注意到这个瑕疵是因为命令 ./yii 引起的,当我们运行这个命令的时候,实际上也就创建了一个console应用。(可以打开根目录下的yii文件一探究竟)

为了避免这个问题,我们转换思路,摒弃console,重新拾起 php xxx.php的执行方式,把需要自动加载的类文件以及components的初始化工作放在onWorkerStart回调内去处理即可。

关于swoole的基础,我们就先介绍这么多,有任何问题,尽量文章下留言哦,有好的意见也欢迎大家的反馈,谢谢。哦对了,关于swoole的学习,你可不能偷懒哦,正所谓师傅领进门,修行在个人,后面还需要多看文档,多做总结,多写实例。

补充:如何解决yii2日志按日期分割的问题?

有人说这很简单呀,设置logFile就可以了,代码如下

"log" => [
    "targets" => [
        [
            "class" => "yii\log\FileTarget",
            "levels" => ["error", "warning"],
            "logFile" => "@runtime/logs/app.log.".date("Ymd"),
        ],
    ],
],

这是正常思维的惯性,我不怪你。

我们刚刚也说了,对yii2来说,配置文件是在onWorkerStart之前加载的,而且只在第一次才被加载到内存中,也就是说这里设置的logFile对应的日期,仅仅是你启动server当天的日期,过了N天之后这些日志还是只会保存在这个文件内。

这如何是好?我们给出一个解决方案,不过最好的方案还是放弃console,在onWorkerStart之后初始化应用。

增加一个全局的类方法,在需要记录日志的时候调用该方法即可

use yii\log\Logger;
public static function flushLog($message)
{
    $dispatcher = new yii\log\Dispatcher();
    $dispatcher->targets = [
        Yii::createObject([
            "class" => "yii\log\FileTarget",
            "levels" => ["error", "warning"],
            "logFile" => "@runtime/logs/app.log.".date("Ymd"),
        ])
    ];
    
    $logger = $dispatcher->getLogger();
    $logger->log($message, Logger::LEVEL_ERROR);
    $logger->flush(true);
}

2.11、基于 swoole 扩展实现真正的 PHP 数据库连接池

①、问题来源

PHP的数据库连接池一直以来都是一个难题,很多从PHP语言转向Java的项目,大多数原因都是因为Java有更好的连接池实现。PHP的MySQL扩展提供了长连接的API,但在PHP机器数量较多,规模较大的情况下,mysql_pconnect非但不能节约MySQL资源,反而会加剧数据库的负荷。

假设有100台PHP的应用服务器,每个机器需要启动100个apache或fpm工作进程,那每个进程都会产生一个长连接到MySQL。这一共会产生1万个My SQL连接。大家都知道MySQL是每个连接会占用1个线程。那MYSQL就需要创建1万个线程,这样大量的系统资源被浪费在线程间上下文切换上。而你的业务代码中并不是所有地方都在做数据库操作,所以这个就是浪费的。

连接池就不同了,100个worker进程,公用10个数据库连接即可,当操作完数据库后,立即释放资源给其他worker进程。这样就算有100台PHP的服务器,那也只会创建1000个MySQL的连接,完全可以接受的。

以前确实没有好的办法来解决此问题的,现在有了swoole扩展,利用swoole提供的task功能可以很方便做出一个连接池来。

②、代码

$serv = new swoole_server("127.0.0.1", 9508);
$serv->set(array(
    'worker_num' => 100,
    'task_worker_num' => 10, //MySQL连接的数量
));
function my_onReceive($serv, $fd, $from_id, $data)
{
    //taskwait就是投递一条任务,这里直接传递SQL语句了
    //然后阻塞等待SQL完成
    $result = $serv->taskwait("show tables");
    if ($result !== false) {
        list($status, $db_res) = explode(':', $result, 2);
        if ($status == 'OK') {
            //数据库操作成功了,执行业务逻辑代码,这里就自动释放掉MySQL连接的占用
            $serv->send($fd, var_export(unserialize($db_res), true) . "\n");
        } else {
            $serv->send($fd, $db_res);
        }
        return;
    } else {
        $serv->send($fd, "Error. Task timeout\n");
    }
}
function my_onTask($serv, $task_id, $from_id, $sql)
{
    static $link = null;
    if ($link == null) {
        $link = mysqli_connect("127.0.0.1", "root", "root", "test");
        if (!$link) {
            $link = null;
            $serv->finish("ER:" . mysqli_error($link));
            return;
        }
    }
    $result = $link->query($sql);
    if (!$result) {
        $serv->finish("ER:" . mysqli_error($link));
        return;
    }
    $data = $result->fetch_all(MYSQLI_ASSOC);
    $serv->finish("OK:" . serialize($data));
}
function my_onFinish($serv, $data)
{
    echo "AsyncTask Finish:Connect.PID=" . posix_getpid() . PHP_EOL;
}
$serv->on('Receive', 'my_onReceive');
$serv->on('Task', 'my_onTask');
$serv->on('Finish', 'my_onFinish');
$serv->start();

这里task_worker_num就是要启用的数据库连接池数量,worker进程为100时,连接池数量为10就可以设置为worker_num = 100, task_worker_num = 10。

三、websocket

3.1、swoole之http服务器

①、引言

今天我们来了解一下swoole内置的http服务器,这篇我们一开始没打算更新,想着后面写websocket的时候可能有个别知识点会让人觉得突兀,这才有了今天我们要说的内容。

为什么一开始没打算写swoole http服务器的内容呢?考虑到一般的web应用,现在都有非常成熟的开发框架,比如yii2\laravel\tp5\yaf等框架。如果说让你自己基于swoole再写一套应用层面的逻辑呢?比如路由解析,日志封装等等,不仅费时而且费力。

②、swoole_http_server

swoole_http_server继承自swoole_server,用于创建swoole版的http服务器。php也可以自己创建http服务器了?对,你没听错,是http服务器。

所谓的http服务器,其含义就是一旦我们部署好,用户便可以直接通过浏览器访问该服务器。理论上而言,我们不需要再借用nginx或者httpd部署,是不是很神奇?但是,理论归理论,swoole_http_server对HTTP协议的支持还不成熟,所以实际,我们还是需要用nginx作为代理。

虽然swoole_http_server继承自swoole_server,但是除了onConnect/onReceive回调不能用之外,swoole_server提供的其他API函数都可以直接使用,跟我们先前学到的内容无缝对接!另外,虽然废除了onReceive回调,swoole_http_server却为我们新增了onRequest回调,该回调会在接收到一个http请求之后被调用。

下面我们创建一个简单的http服务器。

<?php
$http = new swoole_http_server("127.0.0.1", 8000);
$http->on("request", function (swoole_http_request $request, swoole_http_response $response) {
    $response->status(200);
    $response->end("hello world.");
});
$http->start();

$http 是swoole_http_server的一个对象,onRequest回调接收两个参数分别是swoole_http_request对象和swoole_http_response对象,分别负责request请求和response响应,这两个我们等会再说。

swoole_http_server的start方法用于启动该server。

CLI下运行该脚本,我们创建一个监听本地8000端口的HTTP服务器,如果该端口被占用,可以更换其他未被占用的端口。

server启动之后,我们就可以用浏览器对该监听该端口的http server发起一个http请求,浏览器地址栏访问 http://127.0.0.1:8000 (也可用curl http://127.0.0.1:8000)后我们可以看到页面上有输出"hello world."字样。

作为应用服务器,单纯的输出hello world的意义不大。如果要丰富应用层,就需要让onRequest回调丰富起来,这也就需要我们熟悉onRequest回调的两个参数,http请求和http响应的处理。

③、swoole_http_request

swoole_http_request,负责http请求的相关信息。我们可以在这个对象上,获取header\server\get\post\files\cookie等信息,这等同于php的超全局变量,但是这可不是超全局的,request关联的这些信息可能不存在。我们来看一个例子

$http->on("request", function (swoole_http_request $request, swoole_http_response $response) {

    print_r($request);

    $response->status(200);
    $response->end("hello world.");
});

server在onRequest回调内打印swoole_http_request信息,我们用chrome浏览器访问下 http://127.0.0.1:8000/

看server终端打印的信息,首先我解释一个非重要信息,server终端会打印出两个 swoole_http_request Object,这是因为第二个是 /favicon.ico 请求,忽略就好,后面我们用nginx部署好,在应用的根目录下添加一个 favicon.ico 就不会有这个请求问题了。

我们看server所在终端打印的 swoole_http_request Object,发现有下面这些属性

  • fd 客户端标识

  • header 请求的头信息,你可以对比下浏览器实际发出的request header,结果是一样的

  • server 请求相关的服务器信息

  • 还有一项data,HTTP请求的详细信息

get在哪里呢?木有get信息,我们再访问下这个路由 http://127.0.0.1:8000?id=123 ,看看server终端打印的信息里是不是多了一项get属性?

现在我们可以在onRequest回调内,通过request−>server来获取URI信息,也可以通过request->get获取get相关的信息等等。

可以进一步说,如果我们继续在onRequest回调内根据 swoole_http_request->server['request_uri']做好路由解析,把不同的路由解析到不同的 php 类文件,swoole版的http请求便被我们用活了。案例这里就不说了,有兴趣的可以去github上搜一搜大牛的源码学习学习。

④、swoole_http_response

swoole_http_response,负责处理HTTP响应信息,包括响应的头信息header\响应状态等。

比如上例中 response−>status()设置响应状态码,response->end() 发送响应体并结束请求。

其实都没有什么要说的,跟传统的HTTP请求和响应差别不大。

swoole_http_request和swoole_http_response对象的细节,还请多翻阅官方手册。

⑤、nginx代理

下面我们主要看看实际的部署问题。

即使swoole_http_server再牛逼,他现在也没赶超老牌的nginx,所以,在http请求上,我们还是靠nginx来处理,通过nginx把请求转发给swoole,如此一来你有没有发现整个逻辑少了fpm?没错,nginx+swoole顺利上位。

下面我们配置一个域名叫 "swoole.example.com"的站看看。

首先我们把监听8000端口的swoole_http_server打开,然后nginx添加如下的配置,重启nginx后,我们访问下 swoole.example.com 看看结果。不要忘记配置host哦

server {
    listen       80;
    root /var/www/test/;
    server_name  swoole.example.com;
    index index.php index.html;
    
    location / {
        if (!-e $request_filename) {
            proxy_pass http://127.0.0.1:8000;
        }
    }
}

默认 /var/www/test/ 目录下我们没有任何文件,空目录,location中我们判断如果访问的文件不存在,就转发到本地的8000端口即swoole_http_server。

我们访问下 http://swoole.example.com/index.html ,index.html是不存在的,按照nginx的逻辑,这个请求会被转发到本地的8000端口,所以结果页面展示的是hello world. 没有问题。也就是说现在我们成功的使用了nginx代理,swoole处理业务逻辑层。

后面,我们主要来说说websocket那些事。

3.2、swoole之websocket初识

①、什么是websocket

websocket != socket。

我猜有些人一看标题websocket就联想到socket,其实二者之间并没多大关系,这就好比javascript和java,千万不要混淆了。

那websocket是什么呢?

websocket是一个协议,它仅仅就是一个协议而已,跟我们所了解的http协议、https协议、ftp协议等等一样,都是一种单纯的协议。

websocket是一种怎样的协议呢?换句话说它有什么特点呢?

相对于Http这种非持久连接而言,websocket协议是一种持久化连接,它是一种独立的,基于TCP的协议。基于websocket,我们可以实现客户端和服务端双向通信。

你肯定听说过服务器推技术,在websocket出现之前,为了解决此类问题,常用的解决方法有轮询和long pull,这两种技术都是客户端和服务端建立源源不断的HTTP连接,非常消耗带宽和服务器资源。

websocket是双向持久连接,客户端和服务端只需要第一次建立连接即可实现双向通信,说到这里,你肯定明白我们学习websocket要做什么了。没错,基于websocket,我们可以做一些通讯,推送相关的服务。

swoole内置的websocket服务器,异步非阻塞多进程,牛逼的swoole!

②、创建websocket服务器

我们看一下在swoole中如何创建websocket服务器。

swoole的websocket服务器的创建同样非常简单,只需要我们处理好相应的回调即可。

<?php
class WebSocketServer
{
    private $_serv;
    public function __construct()
    {
        $this->_serv = new swoole_websocket_server("0.0.0.0", 9501);
        $this->_serv->set([
            "worker_num" => 1,
        ]);
        $this->_serv->on("open", [$this, "onOpen"]);
        $this->_serv->on("message", [$this, "onMessage"]);
        $this->_serv->on("close", [$this, "onClose"]);
    }
    /**
     * @param $serv
     * @param $request
     */
    public function onOpen($serv, $request)
    {
        echo "server: handshake success with fd{$request->fd}.\n";
    }
    /**
     * @param $serv
     * @param $frame
     */
    public function onMessage($serv, $frame)
    {
        $serv->push($frame->fd, "server received data :{$frame->data}");
    }
    public function onClose($serv, $fd)
    {
        echo "client {$fd} closed.\n";
    }
    public function start()
    {
        $this->_serv->start();
    }
}
$server = new WebSocketServer;
$server->start();

来看看我们创建的这个websocket服务器,我们介绍两个回调

  • onOpen回调:客户端与服务端建立连接的时候将触发该回调,回调的第二个参数是swoole_http_request对象,包括了http握手的一些信息,比如GET\COOKIE等

  • onMessage回调:这个是服务端收到客户端信息后回调,在该回调内我们调用了swoole_websocket_server::push方法向客户端推送了数据,注意哦,push的第一个参数只能是websocket客户端的标识

对应的,swoole是否也有提供websocket客户端呢?有,不过我们不准备使用,浏览器天生内置js版的websocket客户端,简单方便好用!当然,不包括低版本的IE浏览器,原因你懂得。

在js中,有一套操作websocket的API,我们可以用这个API创建websocket对象,建立与websocket服务端的连接,并且可以向server发送消息,接收server的消息,当然也少不了关闭连接的操作。我们看一个例子

首先我们创建一个index.html文件,写一段js代码,代码如下:

<script>
    var ws = new WebSocket("ws://139.199.201.210:9501");
    ws.onopen = function(event) {
        // 发送消息
        ws.send("This is websocket client.");
    };

    // 监听消息
    ws.onmessage = function(event) {
        console.log("Client received a message: ", event.data);
    };
    ws.onclose = function(event) {
        console.log("Client has closed.\n", event);
    };
</script>

js代码写好之后,回车之前记得先在CLI下把websocket服务器运行起来,不然哪能连接的上呢。

二者都准备完善之后,打开控制台,刷新下浏览器,我们看到控制台输出的结果

01.png

结果可能不是很明显,单独来看

  • 客户端先发送给server : This is websocket client.

  • server收到消息后,在onMessage回调内向客户端推送了 server received data :This is websocket client.

  • 客户端在onmessage回调内收到server的信息,并在server发过来的消息的前面增加了 Client received a message: ,这才有了控制台上面展示的信息

整个过程,完美的交互,至此,客户端和服务端便建立了持久化的双向连接。二者可以互发消息。

有些同学可能没转过弯来,这样就互通了,可以双向交互了?我们把js代码稍稍完善一下,做一个界面上的交互

创建一个文本框、一个点击发送的按钮和一个用于展示消息的div

<div>
    <textarea name="content" id="content" cols="30" rows="10"></textarea>
    <button onclick="send();">发送</button>
</div>
<div class="list" style="border: solid 1px #ccc; margin-top: 10px;">
    <ul id="ul">
    </ul>
</div>

01.png

然后我们看看js操作

<script>
var ws = new WebSocket("ws://139.199.201.210:9501");
ws.onopen = function(event) {
    ws.send("This is websocket client.");
};
ws.onmessage = function(event) {
    var data = event.data;
    var ul = document.getElementById("ul");
    var li = document.createElement("li");
    li.innerHTML = data;
    ul.appendChild(li);
};
ws.onclose = function(event) {
    console.log("Client has closed.\n", event);
};
function send() {
    var obj = document.getElementById("content");
    var content = obj.value;
    ws.send(content);
}
</script>

简单的说一下client处理的过程:连接上server之后,ws就发送了一条消息,然后onmessage回调中,会把接收自server的消息追加显示在我们创建的div#ul上,当我们在文本框输入消息内容的时候,点击发送,send方法会被调用,结果就是这个内容会被发送到server。

注:SSL的server,如果你的swoole安装的时候未配置ssl,那么必须重新编译安装。

$this->_serv = new swoole_websocket_server("0.0.0.0", 9501,SWOOLE_PROCESS, SWOOLE_SOCK_TCP | SWOOLE_SSL);
$this->_serv->set([
    "worker_num" => 1,
    "ssl_cert_file" => __DIR__."/source/test.lulublog.cn.crt",
    "ssl_key_file" => __DIR__."/source/test.lulublog.cn.key",
]);

注:SSL 客户端连接

var ws = new WebSocket("wss://test.lulublog.cn:9501");

我们看服务端做了哪些改动,server在前面的基础之上,只对onMessage回调做了修改,修改后的代码如下

public function onMessage($serv, $frame)
{
    // 循环当前的所有连接,并把接收到的客户端信息全部发送
    foreach ($serv->connections as $fd) {
        $serv->push($fd, $frame->data);
    }
}

还记得我们之前讲的swoole_server的一系列属性吗,比如setting,worker_id。今天我们再讲一个swoole_server::connections属性,这个属性是一个迭代器对象,记录着当前server所有的连接fd,所以我们这里循环所有的fd,并把客户端接收的消息给每一个客户端。

swoole_server::connections——此属性在1.7.16以上版本可用连接迭代器依赖pcre库(不是PHP的pcre扩展),未安装pcre库无法使用此功能,安装完成后重新编译swoole(可能需要安装新版本)。然后使用php --ri swoole查看swoole扩展相关信息中是否有pcre => enabled。

yum install -y pcre pcre-devel

为了对演示的结果看的更明显一些,我们同时打开两个客户端页面操作,看动图

01.gif

再往大了说,我们这个是不是可以扩展为二者的聊天了呢?

有兴趣的可以先捣鼓捣鼓,没兴趣的也可以捣鼓捣鼓了,因为后面我们基于websocket的实例可能不是聊天,而是web通知,敬请期待。

3.3、常见的websocket问题

①、引言

上一节我们讲述了websocket在swoole中的使用,并且我们也给出了一个简单的聊天模型,不同的客户端可以相互发消息。有些同学不以为然,server有swoole提供强大的API,客户端由h5提供websocket API,操作很方便,没感觉到什么问题呀,这一章节是否有存在的必要性呢?

有,非常有。今天我们就针对websocket中常见的几个问题做一个详细的总结说明,具体要说的重点大概有下面3个

  • 心跳检测的必要性

  • 校验客户端连接的有效性

  • 客户端的重连机制

②、心跳检测

还记得我们在进程模型一文中介绍的Master进程吗?当时我们说过,Master进程,包括主线程,多个Reactor线程等。其实主进程内还包括其他线程,比如我们现在讲的心跳检测,在Master进程内就有专门用于心跳检测的线程。

那到底什么是心跳检测呢?说着websocket,怎么谈到要医治病人了?这个心跳检测呢,是server定时检测客户端是否还连接的意思,即server定时检测client是否还活着,所以我们说的专业点就是所谓的心跳检测。

等等,老师你说“定时检测”?是不是说之前学的定时器可以派上用场了?

怎么感觉之前讲的不教你在实际场景中运用一次你就不会似的。当然,你要是用定时器也没问题,不过呢,我们都说有专门的心跳检测线程的存在了,所以,我们只需要简单的配置,开启这个心跳检测线程就可以了。

有同学还有疑问,server我们有onClose回调,客户端断开连接我们可以主动关闭连接或者删除客户端的映射关系,再者说,即使连接无效,断了就断了呗,反正我的server面向的client也没有多少,心跳检测就真的有存在的必要性么?

正常情况下,不需要。客户端断开连接能够通知到server,server自然也就可以主动关闭连接。但是,有很多非正常情况的存在,比如断电断网尤其是移动网络盛行的当下,二者之间建立的友好关系(连接)非常不稳定,这就必然会导致大量的fd(fd的数量是有限的,还记得最大是多少吗?)被浪费!所以为了解决这些问题,swoole内置了心跳检测机制。

我们只需要做如下简单的配置即可

$serv->set([
    "heartbeat_check_interval" => N,
    "heartbeat_idle_time" => M,
]);

如上,分别配置heartbeat_check_interval和heartbeat_idle_time参数,二者配合使用,其含义就是N秒检查一次,看看哪些连接M秒内没有活动的,就认为这个连接是无效的,server就会主动关闭这个无效的连接。

是不是说N秒server会主动向客户端发一个心跳包,没有收到客户端响应的才认为这个连接是死连接呢?那还要heartbeat_idle_time做什么,对吧?

swoole的实现原理是这样的:server每次收到客户端的数据包都会记录一个时间戳,N秒内循环检测下所有的连接,如果M秒内该连接还没有活动,才断开这个连接。

心跳检测的问题,记得自己动手实践实践哦,有不懂的可以下面给我留言。

③、校验客户端连接的有效性

按照我们上文创建的websocket server,当然只有本地的ip才能连接上,因为server监听的ip是127.0.0.1。实际项目上线后,如果你的websocket server是对外开放的,就需要把ip修改为服务器外网的ip地址或者修改为0.0.0.0。

如此,也便带来了新的问题:

任意客户端都可以连接到我们的server了,这个“任意”可不止我们自己认为有效的客户端,还包括你的我的所有的非有效或者恶意的连接,这可不是我们想要的。

如何避免这一问题呢?方法有很多种,比如我们可以在连接的时候认为只有get传递的参数valid=1才允许连接;或者我们只允许登录用户才可以连接server;再或者我们可以校验客户端每次send所携带的token,server对该值校验通过后才认为当前是有效连接等等。与此同时,server开启心跳检测,对于恶意无效的连接,直接干掉!

上面简单的介绍了一些解决方案,下面我们以client 连接server时携带token为例做一个实际说明。

首先我们只允许登录用户才可以连接server,假设某用户的唯一标识uid=100,token的生成规则我们约定如下:token=md5(md5(uid)+key),其中key=客户端和服务端双方约定的某个字符串,我们这里假设key="^www.lulublog.cn&swoole$",不包括双引号。

server的代码实现如下

<?php
class WebSocketServerValid
{
    private $_serv;
    public $key = "^www.lulublog.cn&swoole$";
    public function __construct()
    {
        $this->_serv = new swoole_websocket_server("0.0.0.0", 9501);
        $this->_serv->set([
            "worker_num" => 1,
            "heartbeat_check_interval" => 30,
            "heartbeat_idle_time" => 62,
        ]);
        $this->_serv->on("open", [$this, "onOpen"]);
        $this->_serv->on("message", [$this, "onMessage"]);
        $this->_serv->on("close", [$this, "onClose"]);
    }
    /**
     * @param $serv
     * @param $request
     */
    public function onOpen($serv, $request)
    {
        $this->checkAccess($serv, $request);
    }
    /**
     * @param $serv
     * @param $frame
     */
    public function onMessage($serv, $frame)
    {
        $this->_serv->push($frame->fd, "Server: " . $frame->data);
    }
    public function onClose($serv, $fd)
    {
        echo "client {$fd} closed.\n";
    }
    /**
     * 校验客户端连接的合法性,无效的连接不允许连接
     * @param $serv
     * @param $request
     * @return mixed
     */
    public function checkAccess($serv, $request)
    {
        // get不存在或者uid和token有一项不存在,关闭当前连接
        if (!isset($request->get) || !isset($request->get["uid"]) || !isset($request->get["token"])) {
            $this->_serv->close($request->fd);
            return false;
        }
        $uid = $request->get["uid"];
        $token = $request->get["token"];
        // 校验token是否正确,无效关闭连接
        if (md5(md5($uid) . $this->key) != $token) {
            $this->_serv->close($request->fd);
            return false;
        }
    }
    public function start()
    {
        $this->_serv->start();
    }
}
$server = new WebSocketServerValid;
$server->start();

可以看到,checkAccess是授权方法,我们在onOpen回调内对uid以及token进行了校验,无效则关闭连接。

为了模拟效果,我们分别贴上两种客户端代码,连接失败和连接成功

连接失败的主要jsdiamante如下

var ws = new WebSocket("ws://139.199.201.210:9501");
ws.onopen = function(event) {
    ws.send("This is websocket client.");
};
ws.onmessage = function(event) {
    console.log(event.data);
};
ws.onclose = function(event) {
    console.log("Client has closed.\n");
};

无论是console控制台还是server终端我们都可以看到客户端连接被关闭的提醒。下面我们再看模拟一种成功的结果

php代码和js代码如下

<?php
$key = "^www.lulublog.cn&swoole$";
$uid = 100;
$token = md5(md5($uid) . $key);
?>
<script>
var ws = new WebSocket("ws://139.199.201.210:9501?uid=<?php echo $uid; ?>&token=<?php echo $token; ?>");
ws.onopen = function(event) {
    ws.send("This is websocket client.");
};
ws.onmessage = function(event) {
    console.log(event.data);
};
ws.onclose = function(event) {
    console.log("Client has closed.\n");
};
</script>

可以看到,这次连接没有被关闭且console控制台会正常输出一些信息

Server: This is websocket client.

即我们完成了校验连接有效性的案例,下面我们接着看最后一个问题

④、客户端重连机制

有同学注意到,我们刚刚设置的心跳检测时间是30秒,如果客户端62秒内没有与server通信,server会关闭该连接,即部分人在上述success案例中的console控制台上会看到Client has closed.的提醒。这是我们设置的机制,属于正常现象。

那我们要说的重连机制又是什么呢?

客户端重连机制又可以理解为一种保活机制,你也可以跟服务端的心跳检测在一起理解为双向心跳。即我们有一种需求是,如何能保证客户端和服务端的连接一直是有效的,不断开的。

其实很简单,对客户端而言,只要触发error或者close再或者连接失败,就主动重连server,这便是我们的目的。

下面贴一段代码,来解决这个问题

<script>
    var ws; //websocket实例
    var lockReconnect = false; //避免重复连接
    var wsUrl = "ws://127.0.0.1:9501";

    function createWebSocket(url) {
        try {
            ws = new WebSocket(url);
            initEventHandle();
        } catch (e) {
            reconnect(url);
        }
    }

    function initEventHandle() {
        ws.onclose = function() {
            reconnect(wsUrl);
        };
        ws.onerror = function() {
            reconnect(wsUrl);
        };
        ws.onopen = function() {
            //心跳检测重置
            heartCheck.reset().start();
        };
        ws.onmessage = function(event) {
            //如果获取到消息,心跳检测重置
            //拿到任何消息都说明当前连接是正常的
            heartCheck.reset().start();
        }
    }

    function reconnect(url) {
        if (lockReconnect) return;
        lockReconnect = true;
        //没连接上会一直重连,设置延迟避免请求过多
        setTimeout(function() {
            createWebSocket(url);
            lockReconnect = false;
        }, 2000);
    }

    //心跳检测
    var heartCheck = {
        timeout: 60000, //60秒
        timeoutObj: null,
        serverTimeoutObj: null,
        reset: function() {
            clearTimeout(this.timeoutObj);
            clearTimeout(this.serverTimeoutObj);
            return this;
        },
        start: function() {
            var self = this;
            this.timeoutObj = setTimeout(function() {
                //这里发送一个心跳,后端收到后,返回一个心跳消息,
                //onmessage拿到返回的心跳就说明连接正常
                ws.send("");
                self.serverTimeoutObj = setTimeout(function() { //如果超过一定时间还没重置,说明后端主动断开了
                    ws.close(); //如果onclose会执行reconnect,我们执行ws.close()就行了.如果直接执行reconnect 会触发onclose导致重连两次
                }, self.timeout);
            }, this.timeout);
        }
    }

    createWebSocket(wsUrl);
</script>

在这种情况下,你可以尝试把server中断或者断网试试,结果是client会不停的每隔一定时间尝试连接server,直至连接成功。

完整代码

3.4、websocket通知案例以及多端口复合协议的使用

①、引言

最初是打算写个聊天室分享给大家,后来仔细斟酌了一下,还是讲个web通知吧,两个案例都差不多。

当然,在前面两篇介绍websocket的基础之上,相信你一定会觉得web通知这个功能就是一个小case。所以本文我们把重点放在后面多端口复合协议的使用。

websocket通知的实现方式,基本上跟websocket初识一文中最后介绍的案例差不多,只不过我们当时是循环所有的客户端推送消息,此时我们是一对一推送提醒。

②、需求分析

我们以评论被回复为例,当一条评论被其他某个用户(假设是用户B)回复,即发一条通知给被回复的评论所属人(假设是用户A),告诉A,他的评论被回复了。

③、功能分析

  • 我们不能保证用户B和用户A都处于连接状态,但是通常情况下,用户B至少是连接状态,用户A不一定跟server保持连接;

  • 任一用户都不止对应一个客户端。换言之,用户A和用户B都可能打开了多个tab页,对于一个tab页,就会有一个独立的fd标识,我们这里认为任一用户只有最新的fd有效,或者你可以认为用户所有的tab页的连接都有效;

  • 因为没有用户系统,我们以get传递的参数uid为标识,uid=100视为用户A,uid=101视为用户B;

  • 我们不准备做一个评论系统,我们模拟的tab页包将会包含一个输入内容的文本框、一个输入目标uid的input和一个发送的按钮以满足需求。

④、流程分析

  • 用户A($_GET['uid'] = 100)在某个tab页的输入框输入"回复xxx的内容"字样后,点击发送

  • 用户B($_GET['uid'] = 101)如果处于连接状态,则alert提醒用户B,他的评论被回复了

⑤、server 端代码

分析了半天,我们看server端代码的实现

<?php
class CommentServer
{
    private $_serv;
    public $key = "^www.lulublog.cn&swoole$";
    // 用户id和fd对应的映射,key => value,key是用户的uid,value是用户的fd
    public $user2fd = [];
    public function __construct()
    {
        $this->_serv = new swoole_websocket_server("0.0.0.0", 9501);
        $this->_serv->set([
            "worker_num" => 1,
            "heartbeat_check_interval" => 60,
            "heartbeat_idle_time" => 125,
        ]);
        $this->_serv->on("open", [$this, "onOpen"]);
        $this->_serv->on("message", [$this, "onMessage"]);
        $this->_serv->on("close", [$this, "onClose"]);
    }
    /**
     * @param $serv
     * @param $request
     * @return mixed
     */
    public function onOpen($serv, $request)
    {
        // 连接授权
        $accessResult = $this->checkAccess($serv, $request);
        if (!$accessResult) {
            return false;
        }
        // 始终把用户最新的fd跟uid映射在一起
        if (array_key_exists($request->get["uid"], $this->user2fd)) {
            $existFd = $this->user2fd[$request->get["uid"]];
            $this->close($existFd, "uid exists.");
            $this->user2fd[$request->get["uid"]] = $request->fd;
            return false;
        } else {
            $this->user2fd[$request->get["uid"]] = $request->fd;
        }
    }
    /**
     * @param $serv
     * @param $frame
     * @return mixed
     */
    public function onMessage($serv, $frame)
    {
        // 校验数据的有效性,我们认为数据被`json_decode`处理之后是数组并且数组的`event`项非空才是有效数据
        // 非有效数据,关闭该连接
        $data = $frame->data;
        $data = json_decode($data, true);
        if (!$data || !is_array($data) || empty($data["event"])) {
            $this->close($frame->fd, "data format invalidate.");
            return false;
        }
        // 根据数据的`event`项,判断要做什么,`event`映射到当前类具体的某一个方法,方法不存在则关闭连接
        $method = $data["event"];
        if (!method_exists($this, $method)) {
            $this->close($frame->fd, "event is not exists.");
            return false;
        }
        $this->$method($frame->fd, $data);
    }
    public function onClose($serv, $fd)
    {
        echo "client {$fd} closed.\n";
    }
    /**
     * 校验客户端连接的合法性,无效的连接不允许连接
     * @param $serv
     * @param $request
     * @return mixed
     */
    public function checkAccess($serv, $request)
    {
        // get不存在或者uid和token有一项不存在,关闭当前连接
        if (!isset($request->get) || !isset($request->get["uid"]) || !isset($request->get["token"])) {
            $this->close($request->fd, "access faild.");
            return false;
        }
        $uid = $request->get["uid"];
        $token = $request->get["token"];
        // 校验token是否正确,无效关闭连接
        if (md5(md5($uid) . $this->key) != $token) {
            $this->close($request->fd, "token invalidate.");
            return false;
        }
        return true;
    }
    /**
     * @param $fd
     * @param $message
     * 关闭$fd的连接,并删除该用户的映射
     */
    public function close($fd, $message = "")
    {
        // 关闭连接
        $this->_serv->close($fd);
        // 删除映射关系
        if ($uid = array_search($fd, $this->user2fd)) {
            unset($this->user2fd[$uid]);
        }
    }
    public function alertTip($fd, $data)
    {
        // 推送目标用户的uid非真或者该uid尚无保存的映射fd,关闭连接
        if (empty($data["toUid"]) || !array_key_exists($data["toUid"], $this->user2fd)) {
            $this->close($fd);
            return false;
        }
        $this->push($this->user2fd[$data["toUid"]], ["event" => $data["event"], "msg" => "收到一条新的回复."]);
    }
    /**
     * @param $fd
     * @param $message
     */
    public function push($fd, $message)
    {
        if (!is_array($message)) {
            $message = [$message];
        }
        $message = json_encode($message);
        // push失败,close
        if ($this->_serv->push($fd, $message) == false) {
            $this->close($fd);
        }
    }
    public function start()
    {
        $this->_serv->start();
    }
}
$server = new CommentServer;
$server->start();

满眼看下来,代码挺长的,没关系,我们整理了一下代码的逻辑

  • 我们给CommentServer类增加了一个属性 $user2fd,这个是key => value结构,用于保存uid和fd的映射关系

  • onOpen回调做两件事,验证授权和添加新的映射关系

  • onMessage回调只接收含有event项的数组,event等同于CommentServer类的方法名,我们这里只有一个用于web通知的alertTip方法

  • 此外,我们封装了该类的close方法和push方法,close方法用于server主动关闭连接,删除uid和fd的映射,push方法用于向指定的fd推送消息

⑥、客户端代码

<!--?php--><p><?php
$key = "^www.lulublog.cn&swoole$";
$uid = isset($_GET["uid"]) ? intval($_GET["uid"]) : 0;
$token = md5(md5($uid) . $key);
?>

<div>
    发送内容:<textarea name="content" id="content" cols="30" rows="10"></textarea><br>
    发送给谁:<input type="text" name="toUid" value="" id="toUid"><br>
    <button onclick="send();">发送</button>
</div>

<script>
    var ws = new WebSocket("ws://139.199.201.210:9501?uid=<?php echo $uid ?>&token=<?php echo $token; ?>");
    ws.onopen = function(event) {
    };
    ws.onmessage = function(event) {
        var data = event.data;
        data = eval("("+data+")");
        if (data.event == "alertTip") {
            alert(data.msg);
        }
    };
    ws.onclose = function(event) {
        console.log("Client has closed.\n");
    };
    function send() {
        var obj = document.getElementById("content");
        var content = obj.value;
        var toUid = document.getElementById("toUid").value;
        ws.send("{"event":"alertTip", "toUid": "+toUid+"}");
    }
</script>
</p><!--?php-->

server开启之后,演示的效果我们看下动图

01.gif

结果中,注意看地址栏,alert弹窗是在哪个tab页弹出的。

⑦、多端口复合协议:server与server之间的交互

上例中,我们模拟的是评论被回复的简单例子。

回顾过去讲的内容,无论是tcp server,http server还是websocket server,server都是独立的,server与server之间并没有太多的交互。

实际上有没有交互的必要呢?

假设现在有这么一个需求,在刚刚评论的案例中,前文用户的回复不是直接发送给被回复的用户,而是评论在后台被人审核成功的一瞬间,再通知被回复的用户呢?

审核操作改为ajax操作,success回调内再new一个websocket客户端,然后send?可以,但是这显然不是一个很好的操作。

在websocket初识的时候我们说过,要想与websocket server通信,客户端只能是websocket客户端!既然我们刚刚否决了new一个websocket客户端,那是要怎么做呢?

从程序的角度出发,如果我们在php的层面上直接就能通知到websocket服务器,换言之,如果我们能够从php的层面上,直接实现alertTip方法的功能是不是就对了?

前文我们介绍tcp server的时候了解到,首先我们要想让web应用同server进行“互撩”,swoole_client少不了,既然有swoole_client,swoole_server肯定也少不了。但是目前server正在跑websocket,难不成我们在单独跑一个tcp server?对,我们就是要在websocket server的基础之上,想办法再跑一个tcp server。

为了使用多端口复合协议,swoole为server提供了listen方法,可以让当前server监听新的端口。

比如我们可以让刚刚创建的websocket server额外监听9502端口,这个端口主要负责tcp的工作。

$this->_tcp = $this->_serv->listen("127.0.0.1", 9502, SWOOLE_SOCK_TCP);
$this->_tcp->set([
    "open_eof_check" => true, //打开EOF检测
    "package_eof" => "\r\n", //设置EOF
    "open_eof_split" => true, // 自动分包
]);
$this->_tcp->on("Receive", [$this, "onReceive"]);

listen函数返回的是swoole_server_port对象,需要注意的是swoole_server_port的set函数只能设置一些特定的参数,比如socket参数、协议相关等,像worker_num、log_file、max_request等等这些都是不支持的。就tcp服务器而言,swoole_server_port对象也仅仅对onConnect\onReceive\onClose这三个回调支持,其他的一律不可用,详细可翻阅swoole手册查看。

下面我们就以评论审核通知来看看多端口复合协议的玩法。

再来看下我们现在的流程

  • 用户回复某评论 => 评论进入审核状态 ;很明显这个过程我们不需要做什么

  • 管理员审核该评论 => 通知被回复的人;这个时候我们要做的就等同于alertTip函数要做的

server端除了刚刚设置的$this->_tcp一段代码之外,我们单独绑定了onReceive回调,下面看onReceive回调的实现

public function onReceive($serv, $fd, $fromId, $data)
{
    try {
        $data = json_decode($data, true);
        if (!isset($data["event"])) {
            throw new \Exception("params error, needs event param.", 1);
        }
        
        $method = $data["event"];

        // 调起对应的方法
        if(!method_exists($this, $method)) {
            throw new \Exception("params error, not support method.", 1);
        }
        $this->$method($fd, $data);

        return true;

    } catch (\Exception $e) {
        $msg = $e->getMessage();
        throw new \Exception("{$msg}", 1);
    }
}

可以看到,除了进行简单的判断之外,如果tcp客户单携带一个event=alertTip即可

在这之前,websocket客户端的代码我们依然以前面的为例,假设要回复的用户uid=100,我们运行server之后,先让uid=100的客户端连接到server,运行的客户端地址栏添加uid参数等于100即可

下面我们再写一个tcp client,连接9502端口,我们的tcp server在这个端口监听

<?php

class Client
{
    private $client;

    public function __construct ()
    {
        $this->client = new Swoole\Client(SWOOLE_SOCK_TCP);

        if (!$this->client->connect("127.0.0.1", 9502)) {
            $msg = "swoole client connect failed.";
            throw new \Exception("Error: {$msg}.");
        }
    }

    /**
     * @param $data Array
     * send data
     */
    public function sendData ($data)
    {
        $data = $this->togetherDataByEof($data);
        $this->client->send($data);
    }

    /**
     * 数据末尾拼接EOF标记
     * @param Array $data 要处理的数据
     * @return String json_encode($data) . EOF
     */
    public function togetherDataByEof($data)
    {
        if (!is_array($data)) {
            return false;
        }

        return json_encode($data) . "\r\n";
    }
}

$client = new Client;
$client->sendData([
    "event" => "alertTip",
    "toUid" => 100,
]);

现在无论是websocket服务器、tcp 服务器还是websocket客户端 tcp客户端都已经准备就绪了,下面我们浏览器直接访问下tcp client,如果正常的话,websocket客户端所在页面会弹出有新回复的通知。

看动图运行结果

01.gif

完整代码

共 1条评论

发表评论

您需要登录后才可以评论。登录 | 立即注册
阅读 301