Implementing Concurrency in a PHP Database Message Queue

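Implementing concurrent processing in a database-backed message queue means solving two core problems: distributing tasks safely and processing them efficiently. Below I describe the implementation approach in detail and provide a complete example, including a web page that visualizes the queue state.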

Implementation approach

1. Core concurrency mechanism

graph TD
    A[Multiple worker processes] --> B[Database queue]
    B --> C[Use FOR UPDATE SKIP LOCKED]
    C --> D[Safely claim a task]
    D --> E[Process the task]
    E --> F[Update the task status]

2. Key techniques

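  1. SELECT ... FOR UPDATE SKIP LOCKED
  • Lets concurrent workers safely claim different tasks: rows already locked by another transaction are skipped instead of waited on
  • Supported by MySQL 8.0+ and PostgreSQL 9.5+
  • Avoids lock contention and improves concurrent throughput
  2. Atomic status updates
  • A transaction makes "select the task and mark it as processing" a single atomic step
  • Prevents multiple workers from processing the same task
  3. Process management
  • Use Supervisor to run and manage multiple worker processes
  • Failed processes are restarted automatically
  4. Timeout handling
  • Give tasks in the processing state a timeout
  • Periodically reset timed-out tasks to pending
  • A reset task can run twice if its original worker was only slow rather than dead, so task handlers should be idempotent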
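Taken together, these points imply a small state machine for the `status` column used in the complete example (0 = pending, 1 = processing, 2 = done, 3 = failed). The sketch below encodes those transitions in plain PHP. `TaskStatus` and `canTransition()` are hypothetical names that do not appear in the original code, and treating "failed" as terminal mirrors the example, which never requeues failed tasks.

```php
<?php
// Hypothetical helper: codes mirror the message_queue.status column
// (0 = pending, 1 = processing, 2 = done, 3 = failed).
final class TaskStatus
{
    public const PENDING = 0;
    public const PROCESSING = 1;
    public const DONE = 2;
    public const FAILED = 3;

    // Allowed transitions:
    //   pending    -> processing          (a worker claims the row with SKIP LOCKED)
    //   processing -> done | failed       (the worker finishes the task)
    //   processing -> pending             (the timeout reset requeues a stuck task)
    private const TRANSITIONS = [
        self::PENDING    => [self::PROCESSING],
        self::PROCESSING => [self::DONE, self::FAILED, self::PENDING],
        self::DONE       => [],
        self::FAILED     => [],
    ];

    public static function canTransition(int $from, int $to): bool
    {
        // Unknown source states have no allowed transitions.
        return in_array($to, self::TRANSITIONS[$from] ?? [], true);
    }
}
```

A worker could use `TaskStatus::canTransition()` as a sanity check before issuing an UPDATE; the database enforces none of this by itself, so the actual concurrency guarantee still comes from the row lock taken by the SKIP LOCKED query.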

Complete example

<?php
// Database configuration
define('DB_HOST', 'localhost');
define('DB_NAME', 'message_queue_db');
define('DB_USER', 'root');
define('DB_PASS', '');
define('DB_CHARSET', 'utf8mb4');

class DBQueue {
    private $pdo;

    public function __construct() {
        $dsn = "mysql:host=".DB_HOST.";dbname=".DB_NAME.";charset=".DB_CHARSET;
        $options = [
            PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION,
            PDO::ATTR_DEFAULT_FETCH_MODE => PDO::FETCH_ASSOC,
            PDO::ATTR_EMULATE_PREPARES => false,
        ];

        try {
            $this->pdo = new PDO($dsn, DB_USER, DB_PASS, $options);
        } catch (PDOException $e) {
            die("Database connection failed: " . $e->getMessage());
        }
    }

    // Add a task to the queue
    public function addTask($queueName, $payload) {
        $sql = "INSERT INTO message_queue (queue_name, payload) VALUES (?, ?)";
        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([$queueName, json_encode($payload)]);
        return $this->pdo->lastInsertId();
    }

    // Claim one pending task: lock it and mark it as processing
    public function getTask($queueName) {
        $this->pdo->beginTransaction();

        try {
            // SKIP LOCKED skips rows locked by other workers instead of waiting, avoiding lock contention
            $sql = "SELECT * FROM message_queue 
                    WHERE queue_name = ? AND status = 0
                    ORDER BY created_at ASC 
                    LIMIT 1 
                    FOR UPDATE SKIP LOCKED";

            $stmt = $this->pdo->prepare($sql);
            $stmt->execute([$queueName]);
            $task = $stmt->fetch();

            if (!$task) {
                $this->pdo->rollBack();
                return null;
            }

            // Mark the task as processing before the commit releases the row lock
            $updateSql = "UPDATE message_queue 
                         SET status = 1, started_at = NOW() 
                         WHERE id = ?";
            $updateStmt = $this->pdo->prepare($updateSql);
            $updateStmt->execute([$task['id']]);

            $this->pdo->commit();
            return $task;
        } catch (Exception $e) {
            $this->pdo->rollBack();
            error_log("Failed to claim task: " . $e->getMessage());
            return null;
        }
    }

    // Mark a task as completed
    public function completeTask($taskId) {
        $sql = "UPDATE message_queue 
               SET status = 2, completed_at = NOW() 
               WHERE id = ?";
        $stmt = $this->pdo->prepare($sql);
        return $stmt->execute([$taskId]);
    }

    // Mark a task as failed and record the error
    public function failTask($taskId, $error) {
        $sql = "UPDATE message_queue 
               SET status = 3, error_message = ?, retry_count = retry_count + 1 
               WHERE id = ?";
        $stmt = $this->pdo->prepare($sql);
        return $stmt->execute([$error, $taskId]);
    }

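    // Reset tasks stuck in the processing state (e.g. their worker crashed)
    // and return the number of tasks reset
    public function resetTimedOutTasks($timeoutMinutes = 5) {
        $sql = "UPDATE message_queue 
               SET status = 0, started_at = NULL 
               WHERE status = 1 
               AND started_at < NOW() - INTERVAL ? MINUTE";
        $stmt = $this->pdo->prepare($sql);
        $stmt->execute([$timeoutMinutes]);
        // execute() only returns a bool; rowCount() gives the number of affected rows
        return $stmt->rowCount();
    }

    // Expose the connection for the monitoring page (displayQueueStatus() calls this)
    public function getPDO() {
        return $this->pdo;
    }
}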

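// Initialize the database and table (safe to run on every start)
function initializeDatabase() {
    $pdo = new PDO(
        "mysql:host=".DB_HOST.";charset=".DB_CHARSET, 
        DB_USER, 
        DB_PASS,
        [PDO::ATTR_ERRMODE => PDO::ERRMODE_EXCEPTION]
    );

    $pdo->exec("CREATE DATABASE IF NOT EXISTS ".DB_NAME);
    $pdo->exec("USE ".DB_NAME);

    // Indexes are declared inside CREATE TABLE so repeated runs do not fail: MySQL has no
    // CREATE INDEX IF NOT EXISTS, and a standalone CREATE INDEX raises "Duplicate key name"
    // once the index exists. idx_queue_status matches the claim query
    // (WHERE queue_name = ? AND status = 0 ORDER BY created_at); idx_status_started
    // serves the timeout reset (WHERE status = 1 AND started_at < ...).
    $pdo->exec("CREATE TABLE IF NOT EXISTS message_queue (
        id INT AUTO_INCREMENT PRIMARY KEY,
        queue_name VARCHAR(50) NOT NULL DEFAULT 'default',
        payload TEXT NOT NULL,
        status TINYINT NOT NULL DEFAULT 0 COMMENT '0=pending, 1=processing, 2=done, 3=failed',
        retry_count INT NOT NULL DEFAULT 0,
        error_message TEXT,
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        started_at TIMESTAMP NULL,
        completed_at TIMESTAMP NULL,
        INDEX idx_queue_status (queue_name, status, created_at),
        INDEX idx_status_started (status, started_at)
    )");
}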

// Example task processor
class TaskProcessor {
    private $queue;

    public function __construct() {
        $this->queue = new DBQueue();
    }

    public function processTask($task) {
        $payload = json_decode($task['payload'], true);
        // Guard against invalid JSON or a missing "type" key
        $type = $payload['type'] ?? null;

        try {
            // Simulate handling of different task types
            switch ($type) {
                case 'email':
                    $this->sendEmail($payload);
                    break;
                case 'report':
                    $this->generateReport($payload);
                    break;
                case 'cleanup':
                    $this->cleanupData($payload);
                    break;
                default:
                    throw new Exception("Unknown task type: ".($type ?? '(missing)'));
            }

            // Mark the task as completed
            $this->queue->completeTask($task['id']);
            return true;
        } catch (Exception $e) {
            // Mark the task as failed
            $this->queue->failTask($task['id'], $e->getMessage());
            return false;
        }
    }

    private function sendEmail($data) {
        // Simulate sending an email
        sleep(rand(1, 3)); // random processing time
        if (rand(1, 10) === 1) { // simulate a 10% failure rate
            throw new Exception("Email sending failed: SMTP error");
        }
        echo "Email sent to: {$data['to']}\n";
    }

    private function generateReport($data) {
        // Simulate report generation
        sleep(rand(2, 5));
        echo "Report generated: {$data['report_name']}\n";
    }

    private function cleanupData($data) {
        // Simulate data cleanup
        sleep(rand(1, 4));
        echo "Table cleaned up: {$data['table_name']}\n";
    }
}

// Worker process
function runWorker($queueName) {
    $queue = new DBQueue();
    $processor = new TaskProcessor();

    echo "Worker started, listening on queue: $queueName\n";

    while (true) {
        $task = $queue->getTask($queueName);

        if ($task) {
            echo "Processing task ID: {$task['id']}\n";
            $processor->processTask($task);
        } else {
            // No task available: sleep briefly before polling again
            sleep(1);
        }
    }
}

// Periodic job: reset timed-out tasks
function runTimeoutReset() {
    $queue = new DBQueue();
    while (true) {
        $resetCount = $queue->resetTimedOutTasks(5); // 5-minute timeout
        if ($resetCount > 0) {
            echo "Reset {$resetCount} timed-out task(s)\n";
        }
        sleep(60); // check once a minute
    }
}

// Command-line argument handling
if (php_sapi_name() === 'cli') {
    initializeDatabase();

    $command = $argv[1] ?? null;

    switch ($command) {
        case 'worker':
            $queueName = $argv[2] ?? 'default';
            runWorker($queueName);
            break;

        case 'timeout-reset':
            runTimeoutReset();
            break;

        case 'add-task':
            $type = $argv[2] ?? 'email';
            $queue = new DBQueue();

            $payload = ['type' => $type];

            // Add type-specific parameters
            switch ($type) {
                case 'email':
                    $payload['to'] = 'user'.rand(1,100).'@example.com';
                    $payload['subject'] = 'Test email';
                    break;
                case 'report':
                    $payload['report_name'] = 'daily_report_'.date('Ymd');
                    break;
                case 'cleanup':
                    $payload['table_name'] = 'logs_'.date('Ym');
                    break;
            }

            $taskId = $queue->addTask('default', $payload);
            echo "Task added, ID: $taskId\n";
            break;

        default:
            echo "Available commands:\n";
            echo "  php queue.php worker [queue_name] - start a worker process\n";
            echo "  php queue.php timeout-reset - start the timeout-reset process\n";
            echo "  php queue.php add-task [task_type] - add a new task (email, report, cleanup)\n";
            break;
    }
} else {
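    // Web access: show the queue status page
    displayQueueStatus();
}

// Show queue status (web access). Unconditional top-level functions are hoisted
// by PHP, so the call above works even though this definition comes later.
function displayQueueStatus() {
    $queue = new DBQueue();
    $pdo = $queue->getPDO();

    // Fetch per-queue statistics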
    $stats = $pdo->query("
        SELECT 
            queue_name,
            SUM(CASE WHEN status = 0 THEN 1 ELSE 0 END) as pending,
            SUM(CASE WHEN status = 1 THEN 1 ELSE 0 END) as processing,
            SUM(CASE WHEN status = 2 THEN 1 ELSE 0 END) as completed,
            SUM(CASE WHEN status = 3 THEN 1 ELSE 0 END) as failed,
            COUNT(*) as total
        FROM message_queue
        GROUP BY queue_name
    ")->fetchAll(PDO::FETCH_ASSOC);

    // Fetch the most recent tasks
    $recentTasks = $pdo->query("
        SELECT * FROM message_queue 
        ORDER BY created_at DESC 
        LIMIT 10
    ")->fetchAll(PDO::FETCH_ASSOC);

    // Render the HTML page
    echo '<!DOCTYPE html>
    <html lang="en">
    <head>
        <meta charset="UTF-8">
        <meta name="viewport" content="width=device-width, initial-scale=1.0">
        <title>Database Message Queue Monitor</title>
        <style>
            body { font-family: Arial, sans-serif; margin: 20px; }
            .container { max-width: 1200px; margin: 0 auto; }
            .stats { display: flex; flex-wrap: wrap; gap: 20px; margin-bottom: 30px; }
            .stat-card { background: #f8f9fa; border: 1px solid #dee2e6; border-radius: 5px; padding: 15px; flex: 1; min-width: 200px; }
            .stat-card h3 { margin-top: 0; }
            .queue-name { font-weight: bold; color: #0d6efd; }
            table { width: 100%; border-collapse: collapse; margin-bottom: 20px; }
            th, td { border: 1px solid #dee2e6; padding: 8px; text-align: left; }
            th { background-color: #f8f9fa; }
            .status-0 { background-color: #fff3cd; }
            .status-1 { background-color: #cff4fc; }
            .status-2 { background-color: #d1e7dd; }
            .status-3 { background-color: #f8d7da; }
        </style>
    </head>
    <body>
        <div class="container">
            <h1>Database Message Queue Monitor</h1>

            <div class="stats">';

    foreach ($stats as $stat) {
        // Escape database values before echoing them into HTML
        echo '<div class="stat-card">
                <h3>Queue: <span class="queue-name">'.htmlspecialchars($stat['queue_name'], ENT_QUOTES, 'UTF-8').'</span></h3>
                <p>Pending: '.$stat['pending'].'</p>
                <p>Processing: '.$stat['processing'].'</p>
                <p>Completed: '.$stat['completed'].'</p>
                <p>Failed: '.$stat['failed'].'</p>
                <p><strong>Total: '.$stat['total'].'</strong></p>
            </div>';
    }

    echo '</div>

        <h2>Recent tasks</h2>
        <table>
            <thead>
                <tr>
                    <th>ID</th>
                    <th>Queue</th>
                    <th>Type</th>
                    <th>Status</th>
                    <th>Created</th>
                    <th>Started</th>
                    <th>Completed</th>
                    <th>Retries</th>
                </tr>
            </thead>
            <tbody>';

    foreach ($recentTasks as $task) {
        $payload = json_decode($task['payload'], true);
        $statusText = ['Pending', 'Processing', 'Completed', 'Failed'][$task['status']] ?? 'Unknown';
        $type = (string) ($payload['type'] ?? '');

        // Escape database values before echoing them into HTML
        echo '<tr class="status-'.(int) $task['status'].'">
                <td>'.$task['id'].'</td>
                <td>'.htmlspecialchars($task['queue_name'], ENT_QUOTES, 'UTF-8').'</td>
                <td>'.htmlspecialchars($type, ENT_QUOTES, 'UTF-8').'</td>
                <td>'.$statusText.'</td>
                <td>'.$task['created_at'].'</td>
                <td>'.$task['started_at'].'</td>
                <td>'.$task['completed_at'].'</td>
                <td>'.$task['retry_count'].'</td>
            </tr>';
    }

    echo '</tbody>
        </table>
    </div>
    </body>
    </html>';
}
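In the example above, `failTask()` records the error and increments `retry_count`, but nothing ever picks a failed task up again. One possible retry policy, sketched here under assumed values (at most 3 attempts, exponential backoff starting at 30 seconds), decides from the task's current `retry_count` whether a failure should requeue the task (status 0) or mark it permanently failed (status 3). `retryDecision()` is a hypothetical helper, not part of the original `DBQueue` class.

```php
<?php
// Hypothetical retry policy for failed tasks (not part of the original DBQueue class).
// $failuresSoFar is the task's retry_count before the current failure.
// Returns the status to write back (0 = requeue, 3 = give up) and the delay in seconds.
function retryDecision(int $failuresSoFar, int $maxAttempts = 3, int $baseDelaySeconds = 30): array
{
    $failures = $failuresSoFar + 1; // include the failure being handled now

    if ($failures >= $maxAttempts) {
        // Every allowed attempt has failed: mark the task as permanently failed.
        return ['status' => 3, 'delay' => null];
    }

    // Exponential backoff: base, 2 * base, 4 * base, ...
    $delay = $baseDelaySeconds * (2 ** ($failures - 1));
    return ['status' => 0, 'delay' => $delay];
}
```

A `failTask()` variant would write the returned status back together with `retry_count = retry_count + 1`. Honouring the delay would additionally need something like a hypothetical `available_at` column that the claim query filters on; the original schema has no such column.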

Concurrency management best practices

  1. Manage processes with Supervisor
   ; /etc/supervisor/conf.d/queue_worker.conf

   [program:queue_worker]
   command=php /path/to/queue.php worker default
   process_name=%(program_name)s_%(process_num)02d
   ; start 4 worker processes (process_name must include process_num when numprocs > 1)
   numprocs=4
   autostart=true
   autorestart=true
   user=www-data
   redirect_stderr=true
   stdout_logfile=/var/log/queue_worker.log

   [program:queue_timeout_reset]
   command=php /path/to/queue.php timeout-reset
   autostart=true
   autorestart=true
   user=www-data
   redirect_stderr=true
   stdout_logfile=/var/log/queue_timeout_reset.log

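  2. Database optimization tips
  • Index the columns the claim query filters and sorts on: a composite index on (queue_name, status, created_at) lets the SKIP LOCKED query read pending rows in order and stop at the LIMIT, which separate single-column indexes on status and queue_name cannot do as well
  • Periodically archive completed tasks so the hot table stays small
   -- Create indexes (only needed if the table does not already have them)
   CREATE INDEX idx_queue_status ON message_queue (queue_name, status, created_at);
   CREATE INDEX idx_status_started ON message_queue (status, started_at);

   -- Archive old tasks. The cutoff is fixed once and both statements run in one
   -- transaction, so rows crossing the 30-day boundary between them are not deleted unarchived.
   CREATE TABLE IF NOT EXISTS message_queue_archive LIKE message_queue;

   START TRANSACTION;
   SET @cutoff = NOW() - INTERVAL 30 DAY;
   INSERT INTO message_queue_archive 
   SELECT * FROM message_queue 
   WHERE status = 2 AND completed_at < @cutoff;

   DELETE FROM message_queue 
   WHERE status = 2 AND completed_at < @cutoff;
   COMMIT;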
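  3. Handle deadlocks and retries
   // Variant of DBQueue::getTask() that retries the claim when MySQL reports a deadlock
   public function getTask($queueName, $maxRetries = 3) {
       for ($attempt = 1; $attempt <= $maxRetries; $attempt++) {
           $this->pdo->beginTransaction();
           try {
               $stmt = $this->pdo->prepare("SELECT * FROM message_queue
                   WHERE queue_name = ? AND status = 0
                   ORDER BY created_at ASC
                   LIMIT 1
                   FOR UPDATE SKIP LOCKED");
               $stmt->execute([$queueName]);
               $task = $stmt->fetch();

               if (!$task) {
                   $this->pdo->rollBack();
                   return null;
               }

               $this->pdo->prepare("UPDATE message_queue SET status = 1, started_at = NOW() WHERE id = ?")
                   ->execute([$task['id']]);
               $this->pdo->commit();
               return $task;
           } catch (PDOException $e) {
               if ($this->pdo->inTransaction()) {
                   $this->pdo->rollBack();
               }
               // 1213 = MySQL deadlock error code (ER_LOCK_DEADLOCK): back off and retry
               if (($e->errorInfo[1] ?? null) == 1213 && $attempt < $maxRetries) {
                   usleep(100000 * $attempt); // wait 100 ms, 200 ms, ...
                   continue;
               }
               throw $e; // any other error, or out of retries
           }
       }
       return null;
   }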
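The error-1213 check can also be factored out so that `completeTask()`, `failTask()` and other writes get the same treatment. The helper below is a sketch under stated assumptions: `withDeadlockRetry()` is a hypothetical name, each call of the callable must be a complete unit of work (its own transaction, or a single autocommitted statement) so that re-running it is safe, and only MySQL error 1213 (`ER_LOCK_DEADLOCK`, read from `PDOException::$errorInfo[1]`) is retried.

```php
<?php
// Hypothetical helper (not in the original code): run $work and retry it only when
// MySQL reports a deadlock. PDOException::$errorInfo[1] holds the driver error code.
function withDeadlockRetry(callable $work, int $maxAttempts = 3, int $baseDelayUs = 100000)
{
    for ($attempt = 1; ; $attempt++) {
        try {
            return $work();
        } catch (PDOException $e) {
            // 1213 = ER_LOCK_DEADLOCK; any other error is rethrown immediately
            $isDeadlock = (int) ($e->errorInfo[1] ?? 0) === 1213;
            if (!$isDeadlock || $attempt >= $maxAttempts) {
                throw $e;
            }
            // Linear backoff: 100 ms, 200 ms, ... with the default base delay
            usleep($baseDelayUs * $attempt);
        }
    }
}
```

For example, `withDeadlockRetry(fn () => $queue->completeTask($id));` (arrow functions need PHP 7.4+). The wrapper only helps if the wrapped code lets the `PDOException` escape; the original `getTask()` catches every exception and returns null, so it would never trigger a retry.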

Performance optimization tips

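  1. Table splitting and partitioning
  • Split tables by queue name, e.g. queue_emails, queue_reports
  • Partition by time, e.g. one partition per month (MySQL requires the partitioning column to be part of every unique key, so the primary key would have to include created_at)
  2. Connection pooling
  • Use a connection pool to reduce connection overhead; a long-running worker already keeps its connection open, so pooling mainly helps short-lived web requests or coroutine-based workers
  • Recommended tool: Swoole\Database\PDOPool (requires the Swoole extension)
  3. Batch processing
   // Claim several tasks at once, then mark all of them as processing before committing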
   $sql = "SELECT * FROM message_queue 
          WHERE queue_name = ? AND status = 0
          ORDER BY created_at ASC 
          LIMIT 10 
          FOR UPDATE SKIP LOCKED";
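  4. Read/write splitting
  • Send writes to the primary
  • Send non-locking reads, such as the monitoring queries, to replicas; the claim query (SELECT ... FOR UPDATE SKIP LOCKED) is a locking read and must run on the primary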
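For the batch claim in item 3, the SELECT only locks the rows; the worker still has to mark every claimed row as processing in the same transaction before it commits, otherwise the locks are released and other workers can claim the same rows. A minimal sketch of building that UPDATE, assuming the claimed rows have already been fetched; `buildClaimUpdate()` is a hypothetical helper:

```php
<?php
// Hypothetical helper: build the UPDATE that marks a batch of claimed tasks as processing.
// Run it inside the same transaction as the SELECT ... FOR UPDATE SKIP LOCKED, then commit.
function buildClaimUpdate(array $ids): array
{
    if ($ids === []) {
        throw new InvalidArgumentException('No task IDs to claim');
    }
    // One positional placeholder per ID, e.g. "?, ?, ?" for three tasks.
    $placeholders = implode(', ', array_fill(0, count($ids), '?'));
    $sql = "UPDATE message_queue SET status = 1, started_at = NOW() WHERE id IN ($placeholders)";
    // Cast IDs to int and reindex so they bind positionally.
    return [$sql, array_values(array_map('intval', $ids))];
}
```

Inside the transaction this would be used as `[$sql, $params] = buildClaimUpdate(array_column($tasks, 'id')); $pdo->prepare($sql)->execute($params); $pdo->commit();`, where `$tasks` holds the rows returned by the batch SELECT (skip the UPDATE when no rows came back).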

Summary

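By making good use of the database's SKIP LOCKED feature, process management, and monitoring, a database-backed message queue can process tasks concurrently in a safe and efficient way. The key points are:

  1. Claim tasks safely with SELECT ... FOR UPDATE SKIP LOCKED
  2. Run and manage multiple worker processes with Supervisor
  3. Implement a timeout-reset mechanism for stuck tasks
  4. Add appropriate database indexes
  5. Provide a monitoring page that visualizes the queue status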

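For higher-throughput workloads (as a rough guide, above about 100 TPS), consider moving to Redis or a dedicated message queue system such as RabbitMQ or Kafka. For small and medium-sized applications, however, a database queue remains a simple and reliable solution.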