RabbitMQ 是一个开源的消息中间件,广泛用于解耦、异步处理和分布式系统的消息传递。它支持多种消息传递模式,以下是 RabbitMQ 的几种常用模式及其 PHP 示例。
1. 基本概念
- 生产者(Producer):发送消息的应用程序。
- 消费者(Consumer):接收并处理消息的应用程序。
- 队列(Queue):消息存放的地方。
- 交换机(Exchange):接收生产者发送的消息,并将消息路由到一个或多个队列。
- 绑定(Binding):交换机和队列之间的联系。
- 路由键(Routing Key):路由消息的规则,通常由生产者指定。
2. 常见的消息模式
RabbitMQ 支持多种消息传递模式,常见的有:
- 简单模式(Point-to-Point)
- 发布/订阅模式(Publish/Subscribe)
- 路由模式(Routing)
- 主题模式(Topic)
- 工作队列模式(Work Queues)
3. 简单模式(Point-to-Point)
这种模式下,生产者发送消息到一个队列,消费者从队列中接收消息。每个消息仅被一个消费者处理。
示例
生产者代码(Producer)
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 创建连接和通道
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();
// 声明一个队列
$channel->queue_declare('hello', false, true, false, false);
// 发送消息
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent 'Hello World!'\n";
// 关闭通道和连接
$channel->close();
$connection->close();
?>
消费者代码(Consumer)
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();
// 声明队列
$channel->queue_declare('hello', false, true, false, false);
// 消费消息
echo " [*] Waiting for messages. To exit press Ctrl+C\n";
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);
// 等待消息
while($channel->is_consuming()) {
$channel->wait();
}
// 关闭通道和连接
$channel->close();
$connection->close();
?>
4. 发布/订阅模式(Publish/Subscribe)
在这种模式下,生产者将消息发送到一个交换机,多个消费者订阅该交换机,接收消息。这是一个多对多的通信模式。
示例
生产者代码(Producer)
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 创建连接和通道
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();
// 声明一个交换机
$channel->exchange_declare('logs', 'fanout', false, true, false);
// 发送消息到交换机
$msg = new AMQPMessage('Hello, this is a log message');
$channel->basic_publish($msg, 'logs');
echo " [x] Sent 'Hello, this is a log message'\n";
// 关闭通道和连接
$channel->close();
$connection->close();
?>
消费者代码(Consumer)
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();
// 声明交换机
$channel->exchange_declare('logs', 'fanout', false, true, false);
// 声明一个临时队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// 将队列绑定到交换机
$channel->queue_bind($queue_name, 'logs');
echo " [*] Waiting for logs. To exit press Ctrl+C\n";
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
// 等待消息
while($channel->is_consuming()) {
$channel->wait();
}
// 关闭通道和连接
$channel->close();
$connection->close();
?>
5. 路由模式(Routing)
这种模式下,生产者将消息发送到指定的交换机,并使用路由键将消息路由到匹配的队列。消费者通过路由键来订阅感兴趣的消息。
示例
生产者代码(Producer)
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 创建连接和通道
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();
// 声明交换机
$channel->exchange_declare('direct_logs', 'direct', false, true, false);
// 发送消息,指定路由键
$routing_key = 'info';
$msg = new AMQPMessage('This is an info message');
$channel->basic_publish($msg, 'direct_logs', $routing_key);
echo " [x] Sent 'This is an info message' with routing_key 'info'\n";
// 关闭通道和连接
$channel->close();
$connection->close();
?>
消费者代码(Consumer)
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();
// 声明交换机
$channel->exchange_declare('direct_logs', 'direct', false, true, false);
// 声明一个临时队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);
// 绑定队列到交换机,指定路由键
$routing_key = 'info';
$channel->queue_bind($queue_name, 'direct_logs', $routing_key);
echo " [*] Waiting for 'info' messages. To exit press Ctrl+C\n";
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
// 等待消息
while($channel->is_consuming()) {
$channel->wait();
}
// 关闭通道和连接
$channel->close();
$connection->close();
?>
6. 主题模式(Topic)
在主题模式下,消费者可以订阅多个路由键模式,使用 *
或 #
作为通配符。
示例
生产者代码(Producer)
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
// 创建连接和通道
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();
// 声明交换机
$channel->exchange_declare('topic_logs', 'topic', false, true, false);
// 发送消息,指定路由键
$routing_key = 'kern.critical';
$msg = new AMQPMessage('Critical kernel message');
$channel->basic_publish($msg, 'topic_logs', $routing_key);
echo " [x] Sent 'Critical kernel message' with routing_key 'kern.critical'\n";
// 关闭通道和连接
$channel->close();
$connection->close();
?>
消费者代码(Consumer)
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();
// 声明交换机
$channel->exchange_declare('topic_logs', 'topic', false, true, false);
// 声明一个临时队列
list($queue_name, ,) = $channel->
queue_declare("", false, false, true, false);
// 绑定队列到交换机,指定路由模式
$routing_key = 'kern.*';
$channel->queue_bind($queue_name, 'topic_logs', $routing_key);
echo " [*] Waiting for messages. To exit press Ctrl+C\n";
$callback = function($msg) {
echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume($queue_name, '', false, true, false, false, $callback);
// 等待消息
while($channel->is_consuming()) {
$channel->wait();
}
// 关闭通道和连接
$channel->close();
$connection->close();
?>
总结
- 简单模式:生产者发送消息到队列,消费者从队列中获取消息,适合点对点传输。
- 发布/订阅模式:生产者将消息发送到交换机,多个消费者订阅消息,适合广播。
- 路由模式:生产者根据路由键将消息发送到队列,消费者根据路由键过滤消息,适合点对点的定向传输。
- 主题模式:生产者使用通配符指定路由键,消费者可以订阅一个或多个模式,适合复杂的消息过滤。
通过上述示例,你可以在 PHP 中实现不同的 RabbitMQ 消息传递模式。
作者:admin 创建时间:2025-01-15 21:58
最后编辑:admin 更新时间:2025-01-15 21:59
最后编辑:admin 更新时间:2025-01-15 21:59