在 PHP 中实现队列有多种方式,根据不同的需求和环境可以选择以下方案:
1. 基于数据库的队列
- 原理:利用 MySQL、PostgreSQL 等数据库存储任务。
- 实现:
- 创建任务表(字段如
id,payload,status,created_at)。 - 生产者插入任务,消费者轮询或定时扫描处理。
- 优点:简单易实现,无需额外服务。
- 缺点:性能低(高并发下 I/O 瓶颈),需自行处理并发竞争。
- 适用场景:低频任务、小型项目。
2. 基于 Redis 的队列
- 原理:使用 Redis 的 List 或 Stream 数据结构存储任务。
- 实现:
- List 命令:
LPUSH(生产任务) +BRPOP(阻塞消费)。 - PHP 扩展:
phpredis或predis库。
// 生产者
$redis->lpush('my_queue', json_encode(['task' => 'data']));
// 消费者(阻塞获取)
while (true) {
$task = $redis->brpop('my_queue', 30); // 阻塞30秒
if ($task) process($task);
}
- 优点:高性能(内存操作),支持阻塞获取。
- 缺点:需单独维护 Redis,无严格消息确认机制(需自行实现)。
- 适用场景:中小型高并发场景。
3. 专业消息队列系统
(1)RabbitMQ
- 原理:基于 AMQP 协议的消息代理,支持复杂路由、持久化、ACK 确认。
- PHP 库:
php-amqplib。
// 生产者
$channel->basic_publish(
new AMQPMessage('Task Data'),
'exchange_name',
'routing_key'
);
// 消费者
$channel->basic_consume('queue_name', '', false, false, false, false, $callback);
- 优点:高可靠、灵活的路由规则、消息确认、死信队列。
- 缺点:需部署维护 RabbitMQ,学习曲线较陡。
(2)Apache Kafka
- 原理:分布式流处理平台,高吞吐量。
- PHP 库:
php-rdkafka。 - 适用场景:大数据日志处理、实时流计算。
(3)Beanstalkd
- 原理:轻量级内存队列,支持优先级、延迟任务。
- PHP 库:
pheanstalk。
// 生产者
$pheanstalk->useTube('default')->put('task_data');
// 消费者
$job = $pheanstalk->watch('default')->reserve();
process($job->getData());
$pheanstalk->delete($job);
- 优点:简单高效,支持任务超时重试。
- 缺点:单点故障风险(需配合持久化工具)。
4. 云服务队列
- AWS SQS、Google Cloud Pub/Sub 等托管服务。
- 优点:免运维,高可用,弹性扩展。
- 缺点:依赖云厂商,成本较高。
5. 框架内置队列
- Laravel Queue:
- 支持多种驱动(Database, Redis, Beanstalkd, SQS)。
- 通过 Artisan 命令启动消费者:
bash php artisan queue:work --queue=high,default - Symfony Messenger:类似机制,支持 AMQP、Redis 等。
6. 文件队列
- 原理:用文件(如 CSV、JSON)存储任务。
- 实现:
// 生产者
file_put_contents('queue.txt', json_encode($task) . "\n", FILE_APPEND);
// 消费者
$file = fopen('queue.txt', 'r+');
while (flock($file, LOCK_EX)) {
$task = fgets($file);
process($task);
// 删除已处理任务(需自行处理)
}
- 优点:无外部依赖。
- 缺点:性能极低,并发安全性差。
- 适用场景:极低频任务或测试环境。
选择建议:
| 场景 | 推荐方案 |
|---|---|
| 小型项目/低频任务 | 数据库队列、文件队列 |
| 高性能要求 | Redis、Beanstalkd |
| 企业级可靠消息 | RabbitMQ、Kafka |
| 云环境 | AWS SQS、Google Pub/Sub |
| 使用 Laravel/Symfony | 框架内置队列 |
注意:生产环境务必考虑 消息持久化、消费者宕机重试、任务超时处理 等可靠性设计。

