主页/PHP笔记/技术文章/PHP消息队列实现方案详解

PHP消息队列实现方案详解

在PHP中实现消息队列(Message Queue)有多种方式,下面介绍几种常见的实现方案,从简单到复杂逐步展开:


一、基于数据库的简易队列(适合轻量级场景)

原理:利用数据库表作为队列存储
表结构示例

CREATE TABLE `message_queue` (
  `id` INT AUTO_INCREMENT,
  `queue_name` VARCHAR(50) NOT NULL DEFAULT 'default',
  `payload` TEXT NOT NULL,        -- 存储JSON格式的任务数据
  `status` TINYINT NOT NULL DEFAULT 0, -- 0=等待, 1=处理中, 2=完成
  `created_at` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB;

生产者代码

// 生产者 producer.php
$db = new PDO('mysql:host=localhost;dbname=test', 'user', 'pass');
$payload = json_encode(['task' => 'send_email', 'email' => 'user@example.com']);

$stmt = $db->prepare("INSERT INTO message_queue (queue_name, payload) VALUES (?, ?)");
$stmt->execute(['email_queue', $payload]);

消费者代码

// 消费者 worker.php
$db = new PDO('mysql:host=localhost;dbname=test', 'user', 'pass');
$db->beginTransaction();

// 锁定并获取任务(避免多进程重复消费)
$stmt = $db->prepare("SELECT * FROM message_queue 
                     WHERE status = 0 AND queue_name = ?
                     ORDER BY id ASC LIMIT 1 FOR UPDATE SKIP LOCKED");
$stmt->execute(['email_queue']);
$task = $stmt->fetch(PDO::FETCH_ASSOC);

if ($task) {
    // 标记为处理中
    $db->prepare("UPDATE message_queue SET status = 1 WHERE id = ?")
       ->execute([$task['id']]);
    $db->commit();

    // 处理任务
    $data = json_decode($task['payload'], true);
    send_email($data['email']); // 实际业务逻辑

    // 标记完成
    $db->prepare("UPDATE message_queue SET status = 2 WHERE id = ?")
       ->execute([$task['id']]);
} else {
    $db->rollBack();
    sleep(1); // 无任务时休眠
}

二、使用 Redis 队列(推荐,高性能)

优势:内存操作,支持阻塞弹出,原子性操作
需要安装php-redis 扩展

生产者

// producer.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

$task = [
    'type' => 'log_cleanup',
    'path' => '/var/logs'
];

$redis->lPush('work_queue', json_encode($task)); // 左进

消费者(阻塞版)

// worker.php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);

while (true) {
    // 阻塞右出(0表示无限等待)
    $taskJson = $redis->brPop(['work_queue'], 0)[1]; 
    $task = json_decode($taskJson, true);

    // 执行任务
    cleanup_logs($task['path']); // 业务逻辑
}

三、专业消息队列系统(生产环境推荐)

方案1:RabbitMQ + PHP AMQP 扩展

安装

# 安装扩展
pecl install amqp

生产者

$connection = new AMQPConnection(['host' => 'localhost']);
$connection->connect();
$channel = $connection->channel();

// 声明队列
$channel->queue_declare('task_queue', false, true, false, false);

// 创建消息
$msg = new AMQPMessage(json_encode(['data' => '...']), [
    'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);

// 发布
$channel->basic_publish($msg, '', 'task_queue');
$channel->close();

消费者

$callback = function ($msg) {
    $task = json_decode($msg->body, true);
    process_task($task); // 业务处理
    $msg->ack(); // 手动确认
};

$channel->basic_consume('task_queue', '', false, false, false, false, $callback);

while (count($channel->callbacks)) {
    $channel->wait();
}

方案2:Beanstalkd + Pheanstalk 库

安装

composer require pda/pheanstalk

生产者

$pheanstalk = Pheanstalk\Pheanstalk::create('127.0.0.1');
$pheanstalk->useTube('email_tube')
           ->put(json_encode(['to' => 'user@example.com']));

消费者

$pheanstalk->watch('email_tube');

while ($job = $pheanstalk->reserve()) { // 阻塞获取
    $data = json_decode($job->getData(), true);
    send_email($data['to']);
    $pheanstalk->delete($job); // 确认删除
}

四、队列管理建议

  1. 守护进程:使用 Supervisor 管理消费者进程
   [program:queue_worker]
   command=php /path/to/worker.php
   autorestart=true
   numprocs=4 ; 启动4个进程
  1. 失败重试
  • Redis:用有序集合(ZSET)记录失败任务+重试时间
  • RabbitMQ:开启死信队列(Dead Letter Exchange)
  1. 监控
  • RabbitMQ:自带管理界面
  • Redis:redis-cli monitor 或 RedisInsight
  1. 序列化:推荐使用 JSON(可读性好)或 MsgPack(高效)

方案对比

方案优点缺点适用场景
数据库队列无需新组件,事务支持性能差,锁竞争严重低吞吐量(<10TPS)
Redis高性能,简单易用无持久化(需配置AOF)中小项目,实时性要求高
RabbitMQ功能完整,企业级特性需要维护Erlang环境复杂业务,生产环境
Beanstalkd轻量级,协议简单社区较小快速部署,PHP友好

生产环境建议:优先选择 RabbitMQ 或 Redis(开启持久化),配合 Supervisor 做进程管理。