1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98
| <?php
* 消费者模型 */ namespace console\controllers; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; class MqReceiveController extends \yii\console\Controller { * 多组交换机的收发 * @throws \ErrorException */ public function actionIndex() { $connection = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest');
$channel = $connection->channel();
$channel->exchange_declare('程序监控' , 'fanout'); $channel->exchange_declare('服务器监控', 'fanout'); $channel->exchange_declare('程序监控通配转发', 'topic',false,false,true,true); $channel->exchange_bind('程序监控通配转发', '程序监控');
$channel->queue_declare('归档存储'); $channel->queue_declare('日志统计'); $channel->queue_declare('消息推送');
$channel->queue_bind('归档存储','程序监控'); $channel->queue_bind('归档存储','服务器监控'); $channel->queue_bind('日志统计','程序监控通配转发','Log.#'); $channel->queue_bind('消息推送','程序监控通配转发','Log.ERROR');
$channel->basic_consume("归档存储", "", false, false, false, false, function ($message) { var_dump(iconv('utf-8','gbk','归档存储'.$message->body)); } ); $channel->basic_consume("日志统计", "", false, false, false, false, function ($message) { var_dump(iconv('utf-8','gbk','日志统计'.$message->body)); } );
$channel->basic_consume("消息推送", "", false, false, false, false, function ($message) { var_dump(iconv('utf-8','gbk','消息推送'.$message->body)); } ); while (count($channel->callbacks)) { $channel->wait(); } $channel->close(); $connection->close(); }
* 单组消息的收发 * @throws \ErrorException */ public function actionReceive(){ $conn = new AMQPStreamConnection('127.0.0.1', '5672', 'guest', 'guest');
$channel = $conn->channel();
$channel->exchange_declare('yb', 'fanout');
$channel->queue_declare('ybphpmq');
$channel->queue_bind('ybphpmq','yb');
echo "[Waiting for you ack:]\n";
$callback = function ($message) { echo $message->body."\n"; $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); };
$channel->basic_consume("ybphpmq", "", false, false, false, false, $callback ); while ( count($channel->callbacks) ){ $channel->wait(); }
$channel->close(); $conn->close(); } }
|