RabbitMQ 是一个开源的消息中间件,广泛用于解耦、异步处理和分布式系统的消息传递。它支持多种消息传递模式,以下是 RabbitMQ 的几种常用模式及其 PHP 示例。

1. 基本概念

  • 生产者(Producer):发送消息的应用程序。
  • 消费者(Consumer):接收并处理消息的应用程序。
  • 队列(Queue):消息存放的地方。
  • 交换机(Exchange):接收生产者发送的消息,并将消息路由到一个或多个队列。
  • 绑定(Binding):交换机和队列之间的联系。
  • 路由键(Routing Key):路由消息的规则,通常由生产者指定。

2. 常见的消息模式

RabbitMQ 支持多种消息传递模式,常见的有:

  • 简单模式(Point-to-Point)
  • 发布/订阅模式(Publish/Subscribe)
  • 路由模式(Routing)
  • 主题模式(Topic)
  • 工作队列模式(Work Queues)

3. 简单模式(Point-to-Point)

这种模式下,生产者发送消息到一个队列,消费者从队列中接收消息。每个消息仅被一个消费者处理。

示例

生产者代码(Producer)

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 创建连接和通道
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();

// 声明一个队列
$channel->queue_declare('hello', false, true, false, false);

// 发送消息
$msg = new AMQPMessage('Hello World!');
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent 'Hello World!'\n";

// 关闭通道和连接
$channel->close();
$connection->close();
?>

消费者代码(Consumer)

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();

// 声明队列
$channel->queue_declare('hello', false, true, false, false);

// 消费消息
echo " [*] Waiting for messages. To exit press Ctrl+C\n";
$callback = function($msg) {
    echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume('hello', '', false, true, false, false, $callback);

// 等待消息
while($channel->is_consuming()) {
    $channel->wait();
}

// 关闭通道和连接
$channel->close();
$connection->close();
?>

4. 发布/订阅模式(Publish/Subscribe)

在这种模式下,生产者将消息发送到一个交换机,多个消费者订阅该交换机,接收消息。这是一个多对多的通信模式。

示例

生产者代码(Producer)

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 创建连接和通道
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();

// 声明一个交换机
$channel->exchange_declare('logs', 'fanout', false, true, false);

// 发送消息到交换机
$msg = new AMQPMessage('Hello, this is a log message');
$channel->basic_publish($msg, 'logs');

echo " [x] Sent 'Hello, this is a log message'\n";

// 关闭通道和连接
$channel->close();
$connection->close();
?>

消费者代码(Consumer)

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();

// 声明交换机
$channel->exchange_declare('logs', 'fanout', false, true, false);

// 声明一个临时队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// 将队列绑定到交换机
$channel->queue_bind($queue_name, 'logs');

echo " [*] Waiting for logs. To exit press Ctrl+C\n";
$callback = function($msg) {
    echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

// 等待消息
while($channel->is_consuming()) {
    $channel->wait();
}

// 关闭通道和连接
$channel->close();
$connection->close();
?>

5. 路由模式(Routing)

这种模式下,生产者将消息发送到指定的交换机,并使用路由键将消息路由到匹配的队列。消费者通过路由键来订阅感兴趣的消息。

示例

生产者代码(Producer)

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 创建连接和通道
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();

// 声明交换机
$channel->exchange_declare('direct_logs', 'direct', false, true, false);

// 发送消息,指定路由键
$routing_key = 'info';
$msg = new AMQPMessage('This is an info message');
$channel->basic_publish($msg, 'direct_logs', $routing_key);

echo " [x] Sent 'This is an info message' with routing_key 'info'\n";

// 关闭通道和连接
$channel->close();
$connection->close();
?>

消费者代码(Consumer)

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();

// 声明交换机
$channel->exchange_declare('direct_logs', 'direct', false, true, false);

// 声明一个临时队列
list($queue_name, ,) = $channel->queue_declare("", false, false, true, false);

// 绑定队列到交换机,指定路由键
$routing_key = 'info';
$channel->queue_bind($queue_name, 'direct_logs', $routing_key);

echo " [*] Waiting for 'info' messages. To exit press Ctrl+C\n";
$callback = function($msg) {
    echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

// 等待消息
while($channel->is_consuming()) {
    $channel->wait();
}

// 关闭通道和连接
$channel->close();
$connection->close();
?>

6. 主题模式(Topic)

在主题模式下,消费者可以订阅多个路由键模式,使用 *# 作为通配符。

示例

生产者代码(Producer)

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// 创建连接和通道
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();

// 声明交换机
$channel->exchange_declare('topic_logs', 'topic', false, true, false);

// 发送消息,指定路由键
$routing_key = 'kern.critical';
$msg = new AMQPMessage('Critical kernel message');
$channel->basic_publish($msg, 'topic_logs', $routing_key);

echo " [x] Sent 'Critical kernel message' with routing_key 'kern.critical'\n";

// 关闭通道和连接
$channel->close();
$connection->close();
?>

消费者代码(Consumer)

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest', '/');
$channel = $connection->channel();

// 声明交换机
$channel->exchange_declare('topic_logs', 'topic', false, true, false);

// 声明一个临时队列
list($queue_name, ,) = $channel->

queue_declare("", false, false, true, false);

// 绑定队列到交换机,指定路由模式
$routing_key = 'kern.*';
$channel->queue_bind($queue_name, 'topic_logs', $routing_key);

echo " [*] Waiting for messages. To exit press Ctrl+C\n";
$callback = function($msg) {
    echo " [x] Received ", $msg->body, "\n";
};

$channel->basic_consume($queue_name, '', false, true, false, false, $callback);

// 等待消息
while($channel->is_consuming()) {
    $channel->wait();
}

// 关闭通道和连接
$channel->close();
$connection->close();
?>

总结

  • 简单模式:生产者发送消息到队列,消费者从队列中获取消息,适合点对点传输。
  • 发布/订阅模式:生产者将消息发送到交换机,多个消费者订阅消息,适合广播。
  • 路由模式:生产者根据路由键将消息发送到队列,消费者根据路由键过滤消息,适合点对点的定向传输。
  • 主题模式:生产者使用通配符指定路由键,消费者可以订阅一个或多个模式,适合复杂的消息过滤。

通过上述示例,你可以在 PHP 中实现不同的 RabbitMQ 消息传递模式。

作者:admin  创建时间:2025-01-15 21:58
最后编辑:admin  更新时间:2025-01-15 21:59