think-queue icon indicating copy to clipboard operation
think-queue copied to clipboard

migrateExpiredJobs job 过多

Open yanqibin opened this issue 4 years ago • 2 comments

migrateExpiredJobs 中同时符合条件（已到期）的 job 过多时，会一次性全部读入内存，导致内存溢出。

yanqibin avatar Nov 25 '21 03:11 yanqibin

/**
 * Move expired (delayed) jobs from one queue to another.
 *
 * Expired jobs are read in pages of 100 through getExpiredJobs() instead of
 * all at once, so a large backlog is never held in memory as a single array.
 * The source key is WATCHed for the duration of the call and is always
 * un-watched on the way out, including when an exception escapes.
 *
 * @param string $from    Key of the queue the expired jobs are read from.
 * @param string $to      Key of the queue the jobs are pushed onto.
 * @param bool   $attempt Forwarded unchanged to pushExpiredJobsOntoNewQueue().
 */
public function migrateExpiredJobs($from, $to, $attempt = true)
{
    $this->redis->watch($from);

    try {
        $time  = time();
        $limit = 100;

        // First page is read before the transaction starts, on the watched connection.
        $jobs = $this->getExpiredJobs($from, $time, ['limit' => [0, $limit]]);

        if (count($jobs) <= 0) {
            // Nothing is due; the finally block below still releases the watch.
            return;
        }

        $this->transaction(function () use ($from, $to, $time, $jobs, $attempt, $limit) {
            if (count($jobs) < $limit) {
                // Small batch: everything that is due fits into the first page.
                $this->removeExpiredJobs($from, $time);
                $this->pushExpiredJobsOntoNewQueue($to, $jobs, $attempt);
            } else {
                $page = 0;

                // Reads issued on $this->redis while the transaction is open return the
                // client object itself rather than data, so further pages are fetched
                // through a separate, non-persistent connection.
                // NOTE(review): this connection is never closed explicitly here — confirm
                // it is released when $new_self goes out of scope.
                $new_self = (new Redis(array_merge($this->options, ['persistent' => false])));

                while (true) {
                    $this->pushExpiredJobsOntoNewQueue($to, $jobs, $attempt);
                    if (count($jobs) < $limit) {
                        // A short page means the end of the expired range was reached.
                        break;
                    }
                    $page++;

                    // Fetched outside the transaction (see note on $new_self above).
                    $jobs = $new_self->getExpiredJobs($from, $time, ['limit' => [$page * $limit, $limit]]);
                    if (count($jobs) <= 0) {
                        break;
                    }
                }

                // Issued only after every page has been pushed.
                // Presumably removes everything in $from that is due at $time — verify
                // against removeExpiredJobs().
                $this->removeExpiredJobs($from, $time);
            }
        });
    } finally {
        // Always drop the WATCH so it cannot linger on a reused connection
        // when something above throws.
        $this->redis->unwatch();
    }
}

yanqibin avatar Nov 25 '21 07:11 yanqibin

/**
 * Fetch the jobs of a queue whose score is at or below the given time.
 *
 * @param  string $from    Key handed to zRangeByScore().
 * @param  int    $time    Upper score bound; the lower bound is always '-inf'.
 * @param  array  $options Passed through to zRangeByScore() unchanged,
 *                         e.g. ['limit' => [$offset, $count]].
 * @return array  Whatever zRangeByScore() returns for that range.
 */
protected function getExpiredJobs($from, $time, $options = [])
{
    $lowerBound = '-inf';
    $expired    = $this->redis->zRangeByScore($from, $lowerBound, $time, $options);

    return $expired;
}

yanqibin avatar Nov 25 '21 07:11 yanqibin