think-queue
think-queue copied to clipboard
migrateExpiredJobs job 过多
当 migrateExpiredJobs 中同时到期的 job 过多时，会导致内存溢出。
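/**
 * Move delayed jobs that have come due (score <= current time) from one
 * sorted set onto another queue.
 *
 * @param string $from    Redis key of the sorted set holding the pending jobs
 * @param string $to      Redis key of the queue that receives the due jobs
 * @param bool   $attempt forwarded unchanged to pushExpiredJobsOntoNewQueue()
 */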
public function migrateExpiredJobs($from, $to, $attempt = true)
{
    // WATCH $from for the duration of the migration; it is always released
    // in the finally block below, even if something in between throws.
    $this->redis->watch($from);

    try {
        $time  = time();
        // Page size: expired jobs are read and pushed in chunks of this many
        // so a large backlog is never loaded into PHP memory at once.
        $limit = 100;

        $jobs = $this->getExpiredJobs($from, $time, ['limit' => [0, $limit]]);
        if (count($jobs) === 0) {
            return;
        }

        $this->transaction(function () use ($from, $to, $time, $jobs, $attempt, $limit) {
            $reader = null;
            $page   = 0;

            do {
                $this->pushExpiredJobsOntoNewQueue($to, $jobs, $attempt);

                // A short page means there is nothing left to fetch.
                if (count($jobs) < $limit) {
                    break;
                }

                // Further pages must be read through a separate connection:
                // per the original author's note, a read issued on
                // $this->redis inside the transaction returns the client
                // object itself instead of data. The extra connection is only
                // opened once a second page is actually needed.
                // NOTE(review): `Redis` presumably resolves to this connector
                // class (getExpiredJobs() is called on it) — confirm.
                if ($reader === null) {
                    $reader = new Redis(array_merge($this->options, ['persistent' => false]));
                }

                $page++;
                $jobs = $reader->getExpiredJobs($from, $time, ['limit' => [$page * $limit, $limit]]);
            } while (count($jobs) > 0);

            // Remove the migrated jobs only after every page has been pushed,
            // so an error while paging cannot leave a removal queued without
            // the matching pushes.
            $this->removeExpiredJobs($from, $time);
        });
    } finally {
        $this->redis->unwatch();
    }
}
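/**
 * Fetch expired jobs (members scored from -inf up to $time) from a sorted set.
 *
 * @param string $from    sorted-set key to read from
 * @param int    $time    upper score bound
 * @param array  $options extra options passed through to zRangeByScore,
 *                        e.g. ['limit' => [offset, count]] to fetch one page
 * @return array
 */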
protected function getExpiredJobs($from, $time, $options = [])
{
    // Every member scored between -inf and $time is considered expired;
    // $options is handed to zRangeByScore untouched.
    $lowerBound = '-inf';
    $expired    = $this->redis->zRangeByScore($from, $lowerBound, $time, $options);

    return $expired;
}