最近在学习RabbitMQ, 基于yii2框架写了一个简单的生产者/消费者模型Demo。自己懒得整理了,直接复制代码了,备忘和参考……

消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
<?php
/**
* 消费者模型
*/

namespace console\controllers;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
class MqReceiveController extends \yii\console\Controller
{

/**
* 多组交换机的收发
* @throws \ErrorException
*/

public function actionIndex()
{

//建立连接
$connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest');

//通过链接获得一个新通道.
$channel = $connection->channel();

$channel->exchange_declare('程序监控' , 'fanout');
$channel->exchange_declare('服务器监控', 'fanout');
$channel->exchange_declare('程序监控通配转发', 'topic',false,false,true,true);
$channel->exchange_bind('程序监控通配转发', '程序监控');

$channel->queue_declare('归档存储');
$channel->queue_declare('日志统计');
$channel->queue_declare('消息推送');

$channel->queue_bind('归档存储','程序监控');
$channel->queue_bind('归档存储','服务器监控');
$channel->queue_bind('日志统计','程序监控通配转发','Log.#');
$channel->queue_bind('消息推送','程序监控通配转发','Log.ERROR');

$channel->basic_consume("归档存储", "", false, false, false, false,
function ($message)
{

var_dump(iconv('utf-8','gbk','归档存储'.$message->body));
}
);
$channel->basic_consume("日志统计", "", false, false, false, false,
function ($message)
{

var_dump(iconv('utf-8','gbk','日志统计'.$message->body));
}
);

$channel->basic_consume("消息推送", "", false, false, false, false,
function ($message)
{

var_dump(iconv('utf-8','gbk','消息推送'.$message->body));
}
);
while (count($channel->callbacks)) {
$channel->wait();
}
//关闭通道
$channel->close();
//关闭链接
$connection->close();
}

/**
* 单组消息的收发
* @throws \ErrorException
*/

public function actionReceive(){
//建立连接
$conn = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest');

//打开通道
$channel = $conn->channel();

$channel->exchange_declare('yb', 'fanout');

$channel->queue_declare('ybphpmq');

$channel->queue_bind('ybphpmq','yb');

echo "[Waiting for you ack:]\n";

$callback = function ($message)
{

echo $message->body."\n";
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
};

$channel->basic_consume("ybphpmq", "", false, false, false, false, $callback );
while ( count($channel->callbacks) ){
$channel->wait();
}

//关闭连接
$channel->close();
$conn->close();
}
}

生产者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
<?php
/**
* 生产者DEMO
*/

namespace console\controllers;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use yii\base\InvalidArgumentException;

class MqSendController extends \yii\console\Controller
{

private $host;
private $port;
private $user;
private $pwd;
private $exchangeName;
private $queueName;
/**
* 在执行action(如actionIndex)之前, 会先执行这些behaviors, 如果没有通过, 则action不会继续执行下去
* 所以这里可做初始化
*/

// public function behaviors()
// {
// $conf = \Yii::$app->params;
// if(empty($conf) || !isset( $conf['rabbitmq'] )){
// throw new InvalidArgumentException("获取RabbitMQ参数失败");
// }
// list($this->host, $this->port, $this->user, $this->pwd, $this->exchangeName, $this->queueName) = $conf['rabbitmq'];
// }

/**
* @throws \Exception
*/

public function actionIndex()
{

//建立连接
$connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest');


//通过链接获得一个新通道.
$channel = $connection->channel();

//声明初始化交换机
$channel->exchange_declare('程序监控' , 'fanout');
$channel->exchange_declare('服务器监控', 'fanout');
$channel->exchange_declare('程序监控通配转发', 'topic',false,false,true,true);
$channel->exchange_bind('程序监控通配转发', '程序监控');

//声明初始化一条队列
$channel->queue_declare('归档存储');
$channel->queue_declare('日志统计');
$channel->queue_declare('消息推送');

//将队列与某个交换机进行绑定
$channel->queue_bind('归档存储','程序监控');
$channel->queue_bind('归档存储','服务器监控');
$channel->queue_bind('日志统计','程序监控通配转发','Log.#');
$channel->queue_bind('消息推送','程序监控通配转发','Log.ERROR');

$channel->basic_publish(new AMQPMessage('PHP警告'),'程序监控','PHP.WARNING');
$channel->basic_publish(new AMQPMessage('PHP错误'),'程序监控','PHP.ERROR');
$channel->basic_publish(new AMQPMessage('MYSQL警告'),'程序监控','MYSQL.WARNING');
$channel->basic_publish(new AMQPMessage('MYSQL错误'),'程序监控','MYSQL.ERROR');
$channel->basic_publish(new AMQPMessage('服务器错误'),'服务器监控','SERVER.ERROR');
//关闭通道
$channel->close();
//关闭链接
$connection->close();
}

public function actionSend(){
$conn = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest');

//新建通道
$channel = $conn->channel();

//声明初始化交换机;交换器类型: 直连模式: DIRECT("direct"), 广播模式:FANOUT("fanout"), 主题模式:TOPIC("topic"), HEADERS("headers");
$channel->exchange_declare('yb' , 'fanout');

//声明初始化一条队列
$channel->queue_declare('ybphpmq');

//将队列与某个交换机进行绑定
$channel->queue_bind('ybphpmq','yb');

// 发布消息
$msg = new AMQPMessage("Hello RabbitMQ !");

$channel->basic_publish($msg, 'yb');

// $channel->basic_publish($msg);
echo "[sent to 4 :] \n";

// 关闭连接
$channel->close();
$conn->close();
}
}