laravel队列实现阶梯降频轮询 | 开源可商用方案 | laravel china 社区-大发黄金版app下载
例如有这样的场景,某电商系统的订单支付状态需要和上游的支付系统保持实时同步,需要每秒检查一次订单状态,如果订单状态为未支付,则继续检查,直到订单已完成。
然而针对每一笔订单,每秒检查一次会对服务器和上游api造成很大的负载。
如果我们能动态调整检查频率,比如前五次尝试中每1秒检查一次订单状态, 然后在接下来的五次尝试中每5秒检查一次,依此类推。这样既能减少服务器和外部api的负载,又能确保及时更新。
我们给他起个名字叫做阶梯降频轮询。
真实的例子比如对接支付宝时,支付成功的异步通知以逐渐减低请求频次的的方式通知业务方:
在进行异步通知交互时,如果支付宝收到的应答不是 success ,支付宝会认为通知失败,会通过一定的策略定期重新发起通知。通知的间隔频率为:4m、10m、10m、1h、2h、6h、15h。
准备工作
创建一个空的 laravel 11 项目,接下来我们来实现每间隔几秒钟请求上游api同步订单状态,同时确保控制好频率,不要发起太多请求,也不要在短时间内发起重复请求。
设置队列任务
创建基础任务类
首先,我们需要一个基础任务类来实现轮询逻辑。下面是一个 basepollingjob 类的示例,它实现了基本的轮询功能:
namespace app\jobs;
use illuminate\contracts\queue\shouldbeunique;
use illuminate\contracts\queue\shouldqueue;
use illuminate\foundation\queue\queueable;
use illuminate\queue\maxattemptsexceededexception;
use illuminate\support\facades\log;
abstract class basepollingjob implements shouldqueue, shouldbeunique
{
    use queueable;
    protected $jobdesc = '任务轮询队列';
    protected $jobpayload;
    const timeout_second = 60 * 3;
    public function retryuntil()
    {
        return now()->addseconds(self::timeout_second);
    }在这个类中,我们定义了任务的描述、有效负载和重试的超时时间。retryuntil 方法用于指定任务的最大重试时间,如果超过该时间队列任务尚未完成,laravel-queue会抛出一个异常maxattemptsexceededexception表示已达最大重试次数,业务上可以在fail()方法里标记处理超时。
调度下次运行
接下来,我们实现一个 schedulenextrunning 方法,根据尝试次数调节下次执行的时间间隔:
    protected function schedulenextrunning()
    {
        $attempts = $this->job->attempts();
        if ($attempts <= 5) {
            $this->release(1); // 前5次,每隔1秒执行一次
        } elseif ($attempts <= 10) {
            $this->release(5); // 接下来5次,每隔5秒执行一次
        } elseif ($attempts <= 20) {
            $this->release(10); // 接下来10次,每隔10秒执行一次
        } else {
            $this->release(30); // 超过20次后,每隔30秒执行一次
        }
    }这个方法根据当前的尝试次数来决定下次任务的执行间隔,确保在初始阶段短时间间隔检查,然后逐渐减少频率。
处理任务失败
在任务失败时或者处理超时,我们需要记录错误并执行相应的逻辑:
    public function failed(\throwable $exception)
    {
        if ($exception instanceof maxattemptsexceededexception) {
            log::info($this->jobdesc . '超时退出执行', [
                'payload' => $this->jobpayload,
                'error' => $exception->getmessage(),
            ]);
            $this->aftermaxattemptsexceeded();
        } else {
            log::error($this->jobdesc . '异常失败', [
                'payload' => $this->jobpayload,
                'error' => $exception->getmessage(),
                'file' => $exception->getfile(),
                'line' => $exception->getline(),
                'trace' => $exception->gettrace()
            ]);
        }
    }在这里,我们可以记录错误信息,并根据需要进行后续处理。
实现轮询任务
接下来,我们创建一个具体的轮询任务类 pollingorderstatusjob,用于检查订单状态:
namespace app\jobs;
use app\enums\orderstatus;
use app\models\order;
use illuminate\support\facades\log;
class pollingorderstatusjob extends basepollingjob
{
    protected $jobdesc = '订单同步队列';
    private order $order;
    public function __construct(order $order)
    {
        $this->order = $order;
        $this->jobpayload = $order->toarray();
    }在这个类中,我们定义了任务的描述和订单对象。在构造函数中,我们将订单信息存储为有效负载,以便后续使用。
为了确保同一个订单号不会重复入队列,我们需设置订单号为唯一锁获取的依据。
    // 使用订单号来获取唯一锁
    public function uniqueid()
    {
        return $this->order->trade_no;
    }处理任务逻辑
在 handle 方法中,我们实现具体的任务逻辑:
    public function handle()
    {
        try {
            log::info($this->jobdesc . '开始执行', [$this->order->id, $this->order->trade_no]);
            // 前置检测:订单状态如果已是终态,无需操作,退出队列
            if (!$this->checkorderbeforepolling($this->order)) {
                $this->delete();
                return;
            }
            // 查询订单状态:比如发起一个http请求,获取最新状态
            if (!$this->orderquery($this->order)) {
                $this->delete();
                return;
            }
            // 调度下次运行的时机
            $this->schedulenextrunning();
        } catch (\throwable $e) {
            $this->fail($e);
        }
    }在这个方法中,我们首先记录任务开始的日志。接着,调用 checkorderbeforepolling 方法检查订单是否需要继续轮询。如果不需要,则删除任务并返回。然后,我们调用 orderquery 方法查询订单状态,最后调度下次运行。
前置检测
在 checkorderbeforepolling 方法中,如果订单状态已是终态,比如已超时或已完成,就不需要同步了。防止因上层业务异常,这里可以做拦截。
    private function checkorderbeforepolling(order $order): bool
    {
        $orderstatus = orderstatus::from($order->status);
        if (!in_array($orderstatus, [orderstatus::default, orderstatus::paying])) {
            return false;
        }
        return true;
    }查询订单状态
orderquery 方法模拟查询订单状态的过程:
    private function orderquery(order $order)
    {
        // 模拟订单查询请求
        sleep(1);
        // 支付成功返回
//        $result = [
//            'amount' => 100,
//            'payment_no' => 'p123456',
//            'status' => 'success'
//        ];
        // 正在支付中返回
        $result = [
            'amount' => null,
            'payment_no' => null,
            'status' => 'pending'
        ];
       return $result['status'] == 'pending';
    }这里我们简单模拟了一个查询请求,实际应用中可能会调用外部 api。
处理超时
    public function aftermaxattemptsexceeded()
    {
        try {
            $this->order->status = orderstatus::timeout->value;
            $this->order->save();
            log::info($this->jobdesc . '-更新订单状态为超时', [$this->order->trade_no]);
        } catch (\throwable $e) {
            log::error($this->jobdesc . '-超时后置逻辑执行异常', [$this->order, $e->getmessage()]);
        }
    }最后
完整代码包括日志记录、数据库初始化、手把手教程,开箱即用。已开源在这里:
来自亿级别流水项目实战经验总结,开源不易,还请各位看官点个爱心/star,您的支持是我前进的动力~
本作品采用《cc 协议》,转载必须注明作者和本文链接
 
 
推荐文章: