数据库消息队列的并发实现
在基于数据库的消息队列中实现并发处理需要解决两个核心问题:安全地分配任务和高效地处理任务。下面我将详细介绍实现方案并提供一个完整的可视化示例。
实现方案
1. 并发处理的核心机制
graph TD
A[多个Worker进程] --> B[数据库队列]
B --> C[使用FOR UPDATE SKIP LOCKED]
C --> D[安全获取任务]
D --> E[处理任务]
E --> F[更新任务状态]
2. 关键技术点
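- **SELECT ... FOR UPDATE SKIP LOCKED**
  - 允许多个worker并发、安全地获取不同的任务
  - MySQL 8.0+ 和 PostgreSQL 9.5+ 支持
  - 跳过已被其他事务锁定的行而不是等待,避免锁竞争,提高并发性能
- **原子状态更新**
  - 在同一个事务中完成“查询并锁定”和“标记为处理中”,确保任务状态变更的原子性
  - 防止多个worker处理同一任务
- **进程管理**
  - 使用Supervisor管理多个worker进程
  - 自动重启失败的进程
- **超时处理**
  - 为处理中状态的任务设置超时时间
  - 定期将超时任务重置为待处理状态
  - 注意:被重置的任务可能仍在原worker中执行,因此同一任务可能被处理不止一次(“至少一次”语义),任务处理逻辑应保持幂等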
完整实现示例
<?php
// Database connection settings - adjust these for your environment.
const DB_HOST = 'localhost';
const DB_NAME = 'message_queue_db';
const DB_USER = 'root';
const DB_PASS = '';
const DB_CHARSET = 'utf8mb4';
/**
 * Database-backed message queue on MySQL.
 *
 * Status values used by this class: 0 = pending, 1 = processing,
 * 2 = completed, 3 = failed. Workers claim tasks with
 * SELECT ... FOR UPDATE SKIP LOCKED, so concurrent workers skip rows that
 * another worker has already locked instead of waiting for them.
 */
class DBQueue {
    /** @var PDO */
    private $pdo;

    /**
     * Connect using the DB_* constants.
     *
     * Prints the error and terminates the script (die) if the connection fails.
     */
    public function __construct() {
        $dsn = "mysql:host=".DB_HOST.";dbname=".DB_NAME.";charset=".DB_CHARSET;
        $options = [
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
            PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
            PDO::ATTR_EMULATE_PREPARES => false,
        ];
        try {
            $this->pdo = new PDO($dsn, DB_USER, DB_PASS, $options);
        } catch (PDOException $e) {
            die("数据库连接失败: " . $e->getMessage());
        }
    }

    /**
     * Return the underlying PDO connection.
     *
     * displayQueueStatus() reads its statistics through this connection.
     *
     * @return PDO
     */
    public function getPDO() {
        return $this->pdo;
    }

    /**
     * Append a task to a queue.
     *
     * @param string $queueName Target queue.
     * @param mixed  $payload   JSON-encodable task data.
     * @return string ID of the inserted row (PDO::lastInsertId()).
     * @throws InvalidArgumentException When $payload cannot be encoded as JSON.
     */
    public function addTask($queueName, $payload) {
        // json_encode() returns false on failure. Reject that instead of binding
        // it into the NOT NULL payload column. JSON_UNESCAPED_UNICODE keeps
        // non-ASCII text readable in the table.
        $json = json_encode($payload, JSON_UNESCAPED_UNICODE);
        if ($json === false) {
            throw new InvalidArgumentException("任务负载无法编码为JSON: " . json_last_error_msg());
        }
        $sql = "INSERT INTO message_queue (queue_name, payload) VALUES (?, ?)";
        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([$queueName, $json]);
        return $this->pdo->lastInsertId();
    }

    /**
     * Claim the oldest pending task of a queue and mark it as processing.
     *
     * The locking SELECT and the status UPDATE run in one transaction, so no
     * other worker can claim the same row.
     *
     * @param string $queueName Queue to read from.
     * @return array|null The task row as read before the update (its 'status'
     *                    field still shows 0), or null when the queue is empty
     *                    or the claim failed (the error is written to error_log).
     */
    public function getTask($queueName) {
        $this->pdo->beginTransaction();
        try {
            // SKIP LOCKED: rows locked by other workers are skipped rather than waited on.
            // id breaks ties because created_at only has one-second resolution.
            $sql = "SELECT * FROM message_queue
                    WHERE queue_name = ? AND status = 0
                    ORDER BY created_at ASC, id ASC
                    LIMIT 1
                    FOR UPDATE SKIP LOCKED";
            $stmt = $this->pdo->prepare($sql);
            $stmt->execute([$queueName]);
            $task = $stmt->fetch();
            if (!$task) {
                $this->pdo->rollBack();
                return null;
            }
            // Mark the task as processing while the row lock is still held.
            $updateSql = "UPDATE message_queue
                          SET status = 1, started_at = NOW()
                          WHERE id = ?";
            $updateStmt = $this->pdo->prepare($updateSql);
            $updateStmt->execute([$task['id']]);
            $this->pdo->commit();
            return $task;
        } catch (Exception $e) {
            // commit() itself may have been what failed. Only roll back while a
            // transaction is still open; otherwise rollBack() throws again.
            if ($this->pdo->inTransaction()) {
                $this->pdo->rollBack();
            }
            error_log("获取任务失败: " . $e->getMessage());
            return null;
        }
    }

    /**
     * Mark a task as completed (status 2) and stamp completed_at.
     *
     * @param int|string $taskId
     * @return bool Result of PDOStatement::execute().
     */
    public function completeTask($taskId) {
        $sql = "UPDATE message_queue
                SET status = 2, completed_at = NOW()
                WHERE id = ?";
        $stmt = $this->pdo->prepare($sql);
        return $stmt->execute([$taskId]);
    }

    /**
     * Mark a task as failed (status 3), store the error and increment retry_count.
     *
     * Nothing in this class moves failed tasks back to pending.
     *
     * @param int|string $taskId
     * @param string     $error
     * @return bool Result of PDOStatement::execute().
     */
    public function failTask($taskId, $error) {
        $sql = "UPDATE message_queue
                SET status = 3, error_message = ?, retry_count = retry_count + 1
                WHERE id = ?";
        $stmt = $this->pdo->prepare($sql);
        return $stmt->execute([$error, $taskId]);
    }

    /**
     * Move tasks stuck in processing for longer than $timeoutMinutes back to pending.
     *
     * A reset task can be claimed again by getTask() even if its original
     * worker is still running it, so handlers should tolerate repeated execution.
     *
     * @param int $timeoutMinutes
     * @return int Number of tasks reset (PDOStatement::rowCount()).
     */
    public function resetTimedOutTasks($timeoutMinutes = 5) {
        $sql = "UPDATE message_queue
                SET status = 0, started_at = NULL
                WHERE status = 1
                AND started_at < NOW() - INTERVAL ? MINUTE";
        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([(int) $timeoutMinutes]);
        // Callers report this value as a count, so return rows affected rather
        // than execute()'s boolean.
        return $stmt->rowCount();
    }
}
/**
 * Create the database and the message_queue table if they do not exist.
 *
 * This runs on every CLI start-up, so it must be repeatable. The indexes are
 * declared inside CREATE TABLE IF NOT EXISTS instead of as separate
 * CREATE INDEX statements. MySQL rejects a separate CREATE INDEX with
 * "Duplicate key name" once the index already exists.
 */
function initializeDatabase() {
    $pdo = new PDO(
        "mysql:host=".DB_HOST.";charset=".DB_CHARSET,
        DB_USER,
        DB_PASS,
        // Report DDL failures as exceptions regardless of PHP's default error mode.
        [PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION]
    );
    // Quote the database name as an identifier.
    $dbName = '`' . str_replace('`', '``', DB_NAME) . '`';
    $pdo->exec("CREATE DATABASE IF NOT EXISTS ".$dbName);
    $pdo->exec("USE ".$dbName);
    // InnoDB provides the row-level locks that FOR UPDATE SKIP LOCKED relies on.
    // idx_queue_name matches the claim query: WHERE queue_name = ? AND status = 0 ORDER BY created_at.
    // idx_status matches the timeout reset: WHERE status = 1 AND started_at < ...
    $pdo->exec("CREATE TABLE IF NOT EXISTS message_queue (
        id INT AUTO_INCREMENT PRIMARY KEY,
        queue_name VARCHAR(50) NOT NULL DEFAULT 'default',
        payload TEXT NOT NULL,
        status TINYINT NOT NULL DEFAULT 0 COMMENT '0=等待, 1=处理中, 2=完成, 3=失败',
        retry_count INT NOT NULL DEFAULT 0,
        error_message TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        started_at TIMESTAMP NULL,
        completed_at TIMESTAMP NULL,
        INDEX idx_queue_name (queue_name, status, created_at),
        INDEX idx_status (status, started_at)
    ) ENGINE=InnoDB DEFAULT CHARSET=".DB_CHARSET);
}
/**
 * Example task processor.
 *
 * Dispatches a claimed task by its payload "type" and records the outcome
 * in the queue.
 */
class TaskProcessor {
    private $queue;

    /**
     * @param DBQueue|null $queue Queue used to record task results. When omitted,
     *                            a new DBQueue (and its own connection) is created.
     */
    public function __construct($queue = null) {
        $this->queue = $queue ?? new DBQueue();
    }

    /**
     * Run one task and mark it completed or failed.
     *
     * @param array $task Row from message_queue; 'id' and 'payload' are read.
     * @return bool true on success; false if the task failed (recorded via failTask()).
     */
    public function processTask($task) {
        try {
            // Decode inside the try so a malformed payload becomes a recorded
            // failure instead of warnings from indexing into null.
            $payload = json_decode($task['payload'], true);
            if (!is_array($payload)) {
                throw new Exception("任务负载不是有效的JSON对象");
            }
            $type = isset($payload['type']) && is_scalar($payload['type'])
                ? (string) $payload['type']
                : '';
            // Simulated handling of the different task types
            switch ($type) {
                case 'email':
                    $this->sendEmail($payload);
                    break;
                case 'report':
                    $this->generateReport($payload);
                    break;
                case 'cleanup':
                    $this->cleanupData($payload);
                    break;
                default:
                    throw new Exception("未知任务类型: ".$type);
            }
            // Mark the task as completed
            $this->queue->completeTask($task['id']);
            return true;
        } catch (Throwable $e) {
            // Catch Throwable, not only Exception, so that PHP Errors raised by a
            // handler are recorded as failures. Otherwise they would kill the
            // worker and leave the task in the processing state.
            $this->queue->failTask($task['id'], $e->getMessage());
            return false;
        }
    }

    // Simulated email sending: random 1-3 s delay and a 10% failure rate.
    private function sendEmail($data) {
        sleep(rand(1, 3));
        if (rand(1, 10) === 1) {
            throw new Exception("邮件发送失败: SMTP错误");
        }
        echo "发送邮件至: {$data['to']}\n";
    }

    // Simulated report generation: random 2-5 s delay.
    private function generateReport($data) {
        sleep(rand(2, 5));
        echo "生成报告: {$data['report_name']}\n";
    }

    // Simulated data cleanup: random 1-4 s delay.
    private function cleanupData($data) {
        sleep(rand(1, 4));
        echo "清理数据表: {$data['table_name']}\n";
    }
}
/**
 * Worker loop for one queue. Never returns.
 *
 * Repeatedly claims a task from $queueName and hands it to a TaskProcessor.
 * When the queue is empty it sleeps for one second before polling again.
 *
 * @param string $queueName Queue to consume.
 */
function runWorker($queueName) {
    $queue = new DBQueue();
    $processor = new TaskProcessor();
    echo "Worker启动,监听队列: $queueName\n";
    for (;;) {
        $task = $queue->getTask($queueName);
        if (!$task) {
            // Nothing to do right now; back off briefly.
            sleep(1);
            continue;
        }
        echo "开始处理任务 ID: {$task['id']}\n";
        $processor->processTask($task);
    }
}
/**
 * Periodic job that moves tasks stuck in processing back to pending. Never returns.
 *
 * @param int $timeoutMinutes  Tasks started longer ago than this are reset (default 5).
 * @param int $intervalSeconds Pause between checks in seconds (default 60).
 */
function runTimeoutReset($timeoutMinutes = 5, $intervalSeconds = 60) {
    $queue = new DBQueue();
    while (true) {
        $resetCount = $queue->resetTimedOutTasks($timeoutMinutes);
        // Only report a real row count; a boolean result would otherwise be
        // printed as "1" on every check.
        if (is_int($resetCount) && $resetCount > 0) {
            echo "已重置 {$resetCount} 个超时任务\n";
        }
        sleep($intervalSeconds);
    }
}
// Command-line entry point; any non-CLI (web) request renders the monitoring page.
if (php_sapi_name() === 'cli') {
    // Make sure the database and table exist before running any command.
    initializeDatabase();
    $command = $argv[1] ?? null;
    switch ($command) {
        case 'worker':
            $queueName = $argv[2] ?? 'default';
            runWorker($queueName);
            break;
        case 'timeout-reset':
            runTimeoutReset();
            break;
        case 'add-task':
            $type = $argv[2] ?? 'email';
            // Optional target queue, so tasks can reach workers started with
            // "worker <queue>" on a non-default queue.
            $queueName = $argv[3] ?? 'default';
            $queue = new DBQueue();
            $payload = ['type' => $type];
            // Type-specific sample parameters
            switch ($type) {
                case 'email':
                    $payload['to'] = 'user'.rand(1,100).'@example.com';
                    $payload['subject'] = '测试邮件';
                    break;
                case 'report':
                    $payload['report_name'] = '日报_'.date('Ymd');
                    break;
                case 'cleanup':
                    $payload['table_name'] = 'logs_'.date('Ym');
                    break;
            }
            $taskId = $queue->addTask($queueName, $payload);
            echo "添加任务成功,ID: $taskId\n";
            break;
        default:
            echo "可用命令:\n";
            echo " php queue.php worker [队列名称] - 启动worker进程\n";
            echo " php queue.php timeout-reset - 启动超时重置进程\n";
            echo " php queue.php add-task [任务类型] [队列名称] - 添加新任务\n";
            break;
    }
} else {
    // Web access: render the queue status page
    displayQueueStatus();
}
/**
 * Render the HTML monitoring page (web access).
 *
 * Shows per-queue counts by status and the ten most recently created tasks.
 * Every database value is HTML-escaped before output, because queue names and
 * payload contents come from whoever enqueued the task.
 */
function displayQueueStatus() {
    $queue = new DBQueue();
    $pdo = $queue->getPDO();
    // Escape a value for HTML text/attribute context; null renders as ''.
    $h = function ($value) {
        return htmlspecialchars((string) $value, ENT_QUOTES, 'UTF-8');
    };
    $statusLabels = ['等待', '处理中', '完成', '失败'];
    // Per-queue counts grouped by status
    $stats = $pdo->query("
        SELECT
            queue_name,
            SUM(CASE WHEN status = 0 THEN 1 ELSE 0 END) as pending,
            SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as processing,
            SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as completed,
            SUM(CASE WHEN status = 3 THEN 1 ELSE 0 END) as failed,
            COUNT(*) as total
        FROM message_queue
        GROUP BY queue_name
    ")->fetchAll(PDO::FETCH_ASSOC);
    // Most recent tasks; id breaks ties between equal created_at values
    $recentTasks = $pdo->query("
        SELECT * FROM message_queue
        ORDER BY created_at DESC, id DESC
        LIMIT 10
    ")->fetchAll(PDO::FETCH_ASSOC);
    // Render the HTML page
    echo '<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>数据库消息队列监控</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; }
.container { max-width: 1200px; margin: 0 auto; }
.stats { display: flex; flex-wrap: wrap; gap: 20px; margin-bottom: 30px; }
.stat-card { background: #f8f9fa; border: 1px solid #dee2e6; border-radius: 5px; padding: 15px; flex: 1; min-width: 200px; }
.stat-card h3 { margin-top: 0; }
.queue-name { font-weight: bold; color: #0d6efd; }
table { width: 100%; border-collapse: collapse; margin-bottom: 20px; }
th, td { border: 1px solid #dee2e6; padding: 8px; text-align: left; }
th { background-color: #f8f9fa; }
.status-0 { background-color: #fff3cd; }
.status-1 { background-color: #cff4fc; }
.status-2 { background-color: #d1e7dd; }
.status-3 { background-color: #f8d7da; }
</style>
</head>
<body>
<div class="container">
<h1>数据库消息队列监控</h1>
<div class="stats">';
    foreach ($stats as $stat) {
        echo '<div class="stat-card">
<h3>队列: <span class="queue-name">'.$h($stat['queue_name']).'</span></h3>
<p>待处理: '.$h($stat['pending']).'</p>
<p>处理中: '.$h($stat['processing']).'</p>
<p>已完成: '.$h($stat['completed']).'</p>
<p>失败: '.$h($stat['failed']).'</p>
<p><strong>总计: '.$h($stat['total']).'</strong></p>
</div>';
    }
    echo '</div>
<h2>最近任务</h2>
<table>
<thead>
<tr>
<th>ID</th>
<th>队列</th>
<th>类型</th>
<th>状态</th>
<th>创建时间</th>
<th>开始时间</th>
<th>完成时间</th>
<th>重试次数</th>
</tr>
</thead>
<tbody>';
    foreach ($recentTasks as $task) {
        $payload = json_decode($task['payload'], true);
        $type = is_array($payload) && isset($payload['type']) && is_scalar($payload['type'])
            ? $payload['type']
            : '';
        $status = (int) $task['status'];
        // Fall back for status codes outside the known 0-3 range.
        $statusText = $statusLabels[$status] ?? '未知';
        echo '<tr class="status-'.$status.'">
<td>'.$h($task['id']).'</td>
<td>'.$h($task['queue_name']).'</td>
<td>'.$h($type).'</td>
<td>'.$h($statusText).'</td>
<td>'.$h($task['created_at']).'</td>
<td>'.$h($task['started_at']).'</td>
<td>'.$h($task['completed_at']).'</td>
<td>'.$h($task['retry_count']).'</td>
</tr>';
    }
    echo '</tbody>
</table>
</div>
</body>
</html>';
}
并发管理最佳实践
- 使用Supervisor管理进程
; /etc/supervisor/conf.d/queue_worker.conf
; Each option must be on its own line; otherwise supervisord treats the rest
; of the line as part of the command.
[program:queue_worker]
command=php /path/to/queue.php worker default
process_name=%(program_name)s_%(process_num)02d
; start 4 worker processes
numprocs=4
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/queue_worker.log

; a single timeout-reset process is enough
[program:queue_timeout_reset]
command=php /path/to/queue.php timeout-reset
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/queue_timeout_reset.log
- 数据库优化技巧
- 为queue_name、status等查询条件字段创建复合索引
- 定期归档已完成的任务
-- Create indexes. Column order follows the claim query
-- (WHERE queue_name = ? AND status = 0 ORDER BY created_at)
-- and the timeout reset (WHERE status = 1 AND started_at < ...).
CREATE INDEX idx_queue_name ON message_queue (queue_name, status, created_at);
CREATE INDEX idx_status ON message_queue (status, started_at);
-- Archive old tasks.
-- Use one fixed cutoff inside a single transaction. With two separate NOW()
-- evaluations, rows crossing the 30-day threshold between the INSERT and the
-- DELETE would be deleted without ever being archived.
CREATE TABLE IF NOT EXISTS message_queue_archive LIKE message_queue;
SET @cutoff = NOW() - INTERVAL 30 DAY;
START TRANSACTION;
INSERT INTO message_queue_archive
SELECT * FROM message_queue
WHERE status = 2 AND completed_at < @cutoff;
DELETE FROM message_queue
WHERE status = 2 AND completed_at < @cutoff;
COMMIT;
- 处理死锁和重试
/**
 * Variant of DBQueue::getTask() that retries on transient lock errors.
 *
 * @param string $queueName
 * @return array|null The claimed task row, or null when the queue is empty or
 *                    every attempt hit a deadlock / lock-wait timeout.
 * @throws PDOException For any other database error.
 */
public function getTask($queueName) {
    $maxRetries = 3;
    for ($attempt = 1; $attempt <= $maxRetries; $attempt++) {
        $this->pdo->beginTransaction();
        try {
            $stmt = $this->pdo->prepare(
                "SELECT * FROM message_queue
                 WHERE queue_name = ? AND status = 0
                 ORDER BY created_at ASC, id ASC
                 LIMIT 1
                 FOR UPDATE SKIP LOCKED"
            );
            $stmt->execute([$queueName]);
            $task = $stmt->fetch();
            if (!$task) {
                $this->pdo->rollBack();
                return null;
            }
            $this->pdo->prepare(
                "UPDATE message_queue SET status = 1, started_at = NOW() WHERE id = ?"
            )->execute([$task['id']]);
            $this->pdo->commit();
            return $task;
        } catch (PDOException $e) {
            // Close the failed transaction so the connection is clean before retrying.
            if ($this->pdo->inTransaction()) {
                $this->pdo->rollBack();
            }
            // MySQL error codes: 1213 = deadlock, 1205 = lock wait timeout.
            $driverCode = isset($e->errorInfo[1]) ? (int) $e->errorInfo[1] : 0;
            if ($driverCode !== 1213 && $driverCode !== 1205) {
                throw $e;
            }
            if ($attempt < $maxRetries) {
                usleep(100000 * $attempt); // back off a little longer on each attempt
            }
        }
    }
    return null;
}
性能优化建议
- 分表分区
- 按队列名称分表:queue_emails、queue_reports
- 按时间分区:每月一个分区
- 连接池
- 在Swoole等常驻内存的运行环境中,使用连接池减少数据库连接开销(传统的PHP-FPM/CLI进程通常各自持有独立连接)
- 推荐工具:Swoole\Database\PDOPool
- 批量处理
// Claim up to 10 tasks at once. The locking SELECT and the status UPDATE must
// run in the same transaction. FOR UPDATE locks are released when the
// transaction ends (immediately under autocommit), so selecting without also
// marking the rows as processing would let other workers claim the same tasks.
$this->pdo->beginTransaction();
try {
    $stmt = $this->pdo->prepare("SELECT * FROM message_queue
        WHERE queue_name = ? AND status = 0
        ORDER BY created_at ASC, id ASC
        LIMIT 10
        FOR UPDATE SKIP LOCKED");
    $stmt->execute([$queueName]);
    $tasks = $stmt->fetchAll();
    if ($tasks) {
        $ids = array_column($tasks, 'id');
        $placeholders = implode(',', array_fill(0, count($ids), '?'));
        $this->pdo->prepare("UPDATE message_queue
            SET status = 1, started_at = NOW()
            WHERE id IN ($placeholders)")->execute($ids);
    }
    $this->pdo->commit();
} catch (Exception $e) {
    if ($this->pdo->inTransaction()) {
        $this->pdo->rollBack();
    }
    throw $e;
}
- 读写分离
- 写操作使用主库
- 读操作使用从库
总结
通过合理使用数据库的SKIP LOCKED特性、进程管理和监控机制,可以在基于数据库的消息队列中实现安全高效的并发处理。关键点包括:
- 使用SELECT ... FOR UPDATE SKIP LOCKED安全获取任务
- 使用Supervisor管理多个worker进程
- 实现任务超时重置机制
- 添加适当的数据库索引
- 提供监控界面可视化队列状态
对于更高吞吐量的场景(>100 TPS),建议考虑迁移到Redis或专业消息队列系统(如RabbitMQ、Kafka)。但对于中小型应用,数据库队列仍然是一个简单可靠的解决方案。

