在PHP中实现消息队列(Message Queue)有多种方式,下面介绍几种常见的实现方案,从简单到复杂逐步展开:
一、基于数据库的简易队列(适合轻量级场景)
原理:利用数据库表作为队列存储
表结构示例:
CREATE TABLE `message_queue` (
`id` INT AUTO_INCREMENT,
`queue_name` VARCHAR(50) NOT NULL DEFAULT 'default',
`payload` TEXT NOT NULL, -- 存储JSON格式的任务数据
`status` TINYINT NOT NULL DEFAULT 0, -- 0=等待, 1=处理中, 2=完成
`created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB;
生产者代码:
// 生产者 producer.php
$db = new PDO('mysql:host=localhost;dbname=test', 'user', 'pass');
$payload = json_encode(['task' => 'send_email', 'email' => 'user@example.com']);
$stmt = $db->prepare("INSERT INTO message_queue (queue_name, payload) VALUES (?, ?)");
$stmt->execute(['email_queue', $payload]);
消费者代码:
// 消费者 worker.php
$db = new PDO('mysql:host=localhost;dbname=test', 'user', 'pass');
$db->beginTransaction();
// 锁定并获取任务(避免多进程重复消费)
$stmt = $db->prepare("SELECT * FROM message_queue
WHERE status = 0 AND queue_name = ?
ORDER BY id ASC LIMIT 1 FOR UPDATE SKIP LOCKED");
$stmt->execute(['email_queue']);
$task = $stmt->fetch(PDO::FETCH_ASSOC);
if ($task) {
// 标记为处理中
$db->prepare("UPDATE message_queue SET status = 1 WHERE id = ?")
->execute([$task['id']]);
$db->commit();
// 处理任务
$data = json_decode($task['payload'], true);
send_email($data['email']); // 实际业务逻辑
// 标记完成
$db->prepare("UPDATE message_queue SET status = 2 WHERE id = ?")
->execute([$task['id']]);
} else {
$db->rollBack();
sleep(1); // 无任务时休眠
}
二、使用 Redis 队列(推荐,高性能)
优势:内存操作,支持阻塞弹出,原子性操作
需要安装:php-redis 扩展
生产者
// producer.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
$task = [
'type' => 'log_cleanup',
'path' => '/var/logs'
];
$redis->lPush('work_queue', json_encode($task)); // 左进
消费者(阻塞版)
// worker.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
while (true) {
// 阻塞右出(0表示无限等待)
$taskJson = $redis->brPop(['work_queue'], 0)[1];
$task = json_decode($taskJson, true);
// 执行任务
cleanup_logs($task['path']); // 业务逻辑
}
三、专业消息队列系统(生产环境推荐)
方案1:RabbitMQ + PHP AMQP 扩展
安装:
# 安装扩展
pecl install amqp
生产者:
$connection = new AMQPConnection(['host' => 'localhost']);
$connection->connect();
$channel = $connection->channel();
// 声明队列
$channel->queue_declare('task_queue', false, true, false, false);
// 创建消息
$msg = new AMQPMessage(json_encode(['data' => '...']), [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
// 发布
$channel->basic_publish($msg, '', 'task_queue');
$channel->close();
消费者:
$callback = function ($msg) {
$task = json_decode($msg->body, true);
process_task($task); // 业务处理
$msg->ack(); // 手动确认
};
$channel->basic_consume('task_queue', '', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
方案2:Beanstalkd + Pheanstalk 库
安装:
composer require pda/pheanstalk
生产者:
$pheanstalk = Pheanstalk\Pheanstalk::create('127.0.0.1');
$pheanstalk->useTube('email_tube')
->put(json_encode(['to' => 'user@example.com']));
消费者:
$pheanstalk->watch('email_tube');
while ($job = $pheanstalk->reserve()) { // 阻塞获取
$data = json_decode($job->getData(), true);
send_email($data['to']);
$pheanstalk->delete($job); // 确认删除
}
四、队列管理建议
- 守护进程:使用 Supervisor 管理消费者进程
[program:queue_worker]
command=php /path/to/worker.php
autorestart=true
numprocs=4 ; 启动4个进程
- 失败重试:
- Redis:用有序集合(ZSET)记录失败任务+重试时间
- RabbitMQ:开启死信队列(Dead Letter Exchange)
- 监控:
- RabbitMQ:自带管理界面
- Redis:
redis-cli monitor或 RedisInsight
- 序列化:推荐使用 JSON(可读性好)或 MsgPack(高效)
方案对比
| 方案 | 优点 | 缺点 | 适用场景 |
|---|---|---|---|
| 数据库队列 | 无需新组件,事务支持 | 性能差,锁竞争严重 | 低吞吐量(<10TPS) |
| Redis | 高性能,简单易用 | 无持久化(需配置AOF) | 中小项目,实时性要求高 |
| RabbitMQ | 功能完整,企业级特性 | 需要维护Erlang环境 | 复杂业务,生产环境 |
| Beanstalkd | 轻量级,协议简单 | 社区较小 | 快速部署,PHP友好 |
生产环境建议:优先选择 RabbitMQ 或 Redis(开启持久化),配合 Supervisor 做进程管理。

