# redis-03-消息队列

现在用的消息队列主要有kafka,rabbitmq和redis。相比另外两个,用redis做消息队列的优点是吞吐量高,延迟低,简单易用,易于部署和运维。缺点是没有ACK机制,可靠性方面不足。

本次用swoole + redis 来模拟个消息推送系统。

# 思路

1. 某个业务调用消息接口,将内容和对应的uid存入redis列表中,key为msg。

2. 用户连接swoole后,会有个fd来标识这个会话,将fd和uid关联起来,存到redis的hash中,key为uid_fd。

3. swoole不断地拉取redis中的msg,有内容时,通过uid_fd找出对应的fd,将消息推送到客户端。

关键点在于redis列表的处理

回顾下redis的列表操作:

127.0.0.1:6379> lpush msg abc
(integer) 1
127.0.0.1:6379> lpush msg def
(integer) 2
127.0.0.1:6379> lrange msg 0 -1 #产生了两个消息
1) "def"
2) "abc"
127.0.0.1:6379> lpop msg #开始消费
"def"
127.0.0.1:6379> lpop msg
"abc"
127.0.0.1:6379> lpop msg #消费完,返回空值
(nil)
127.0.0.1:6379> lpop msg
(nil)

通过rpush/lpush将消息放入redis中,再通过lpop/rpop来获取存入的消息,将消息发送给客户端。

伪代码如下:

while(True){
	$msg = redis::lpop($key)
	if($msg){
	    swoole::send($uid,$msg) //有消息时,用swoole推送
	}
}

这里有个问题,当msg的消费完后,会陷入lpop的死循环,增加大量的redis空查询,浪费服务端资源。一个简单粗暴的方法是sleep一下,如:

while(True){
	$msg = redis::lpop($key)
	if($msg){
	    swoole::send($uid,$msg) //有消息时,用swoole推送
	}
	usleep(10000)//睡0.01秒
}

但通过sleep来缓解不是很优雅,redis提供了一个阻塞读的方法blpop/brpop,当队列是空的时候会进入休眠状态,收到新的消息时会立刻醒来。根据这个特性,可以将代码改为如下:

while(True){
	$msg = redis::blpop($key,1) #阻塞1秒
	if($msg){
	    swoole::send($uid,$msg) //有消息时,用swoole推送
	}
}

# 示例

swoole服务端

$server = new \swoole_server("0.0.0.0", 9501);

$server->set(['worker_num' => 4]);

$server->on('start',function ($server){
    echo "Start\n";
});

$server->on('connect',function ($server,$fd, $from_id){
    $server->send( $fd, "Hello {$fd}!" );
});

$server->on('receive',function ($server,$fd, $from_id,$data){
    echo "Get Message From Client {$fd}:{$data}\n";
    $server->send($fd, $data);
});

$server->on('close',function ($server,$fd, $from_id){
    echo "Client {$fd} close connection\n";
});

$process = new \swoole_process(function($process) use ($server) {
    $queue = Redis::blpop('uid',1); #将uid单独存到一个列表中,通过blpop获取
    if($queue){
        $uid = $queue[1];
        $msg = $this->getMsgByUid($uid); #通过uid获取到对应的fd和消息内容
        $server->send($msg['fd'], $msg['data']);
    }

});
$server->addProcess($process);

$server->start();

客户端

$client = new swoole_client(SWOOLE_SOCK_TCP, SWOOLE_SOCK_ASYNC);
$client->on("connect", function(swoole_client $cli) {
    $cli->send("GET / HTTP/1.1\r\n\r\n");
});
$client->on("receive", function(swoole_client $cli, $data){
    echo "Receive: $data\n";
    sleep(1);
});
$client->on("error", function(swoole_client $cli){
    echo "error\n";
});
$client->on("close", function(swoole_client $cli){
    echo "Connection close\n";
});
$client->connect('127.0.0.1', 9501);

# 后续问题

推送消息时,需考虑用户是否在线。这个问题可以用有序集+心跳来处理。

思路如下:

fd做value,时间戳做score,用户id做key,存入redis有序集
客户端定期发送个心跳包到服务端,swoole收到后更新fd对应的时间戳
推送消息时,取这个范围内的fd,有则是在线,无则按照离线处理

有兴趣的可以试试