温馨提示:本文共4408个字,读完预计12分钟。
一.基于redis
1.安装redis ,并进行相关配置
2.yii2 项目中使用composer 安装 yii2-redis
3.yii2 项目中使用compsoer 安装yii2-queue
-
配置文件设置: -
-
'redis' => [ -
'class' => \common\components\cache\RedisCache::class, -
'hostname' => 'localhost', -
'port' => 6379, -
'database' => 0, -
'connectionTimeout'=>3 -
], -
/*'queue' => [ -
//RabbitMq 队列方案 -
'class' => yii\queue\amqp\Queue::class, -
'host' => '127.0.0.1', -
'port' => 5672, -
'user' => 'guest', -
'password' => 'guest', -
'queueName' => 'yii-queue', -
],*/ -
-
'queue' => [ -
//Redis 队列方案 -
'class' => \yii\queue\redis\Queue::class, -
// 连接组件或它的配置 -
'redis' => 'redis', -
// Queue channel key -
'channel' => 'queue', -
'as log'=> \yii\queue\LogBehavior::class, -
],
队列执行任务程序:
-
任务 一: -
<?php -
namespace services\queue\taskjob; -
-
use services\queue\QueueTask; -
use yii\base\BaseObject; -
-
class ReporttaskJob extends BaseObject implements \yii\queue\JobInterface -
{ -
public $scene; -
public $content; -
-
public function execute($queue) -
{ -
//$transaction = \Yii::$app->getDb()->beginTransaction(); -
echo "{$this->scene}".PHP_EOL; -
$saleSubjectServer = QueueTask::getSubject($this->scene); -
QueueTask::addObserver($saleSubjectServer,$this->scene,$this->content); -
//$transaction->commit(); -
-
} -
}
-
任务 二: -
<?php -
namespace services\queue\taskjob; -
-
-
use yii\base\BaseObject; -
-
class TesttaskJob extends BaseObject implements \yii\queue\JobInterface -
{ -
-
public $name; -
public $content; -
-
public function execute($queue) -
{ -
//print_r($queue); -
echo "{$this->name}--{$this->content}".PHP_EOL; -
} -
-
}
发布队列任务:
-
<?php -
-
namespace console\controllers; -
-
use services\queue\QueueTask; -
use services\queue\taskjob\ReporttaskJob; -
use services\queue\taskjob\TesttaskJob; -
-
class QueuesController extends BaseConsoleController -
{ -
-
/** -
* 控制台运行命令:./console/yii-advanced queue/up-star -
* @return array -
*/ -
public function actionUpStar() -
{ -
$this->doTestJob(); -
$this->doReportJob(); -
-
} -
-
private function doReportJob() -
{ -
for ($i = 1; $i <= 10; $i++) -
{ -
$randNum = rand(10,100); -
$job = new ReporttaskJob([ -
'scene' => QueueTask::SCENE_SALE, -
'content' => ['id' => 'dapeng', 'name' => $i.'大鹏'.$randNum], -
]); -
//延迟执行 -
$res = \Yii::$app->queue->delay(2 * $i)->push($job); -
//无延迟执行 -
//$res = \Yii::$app->queue->push($job); -
echo $res . PHP_EOL; -
} -
} -
-
private function doTestJob() -
{ -
for ($i = 1; $i <= 10; $i++) { -
$job = new TesttaskJob([ -
'name' => "{$i}-大鹏", -
'content' => "{$i}-man", -
]); -
$res = \Yii::$app->queue->delay(2 * $i)->push($job); -
echo $res . PHP_EOL; -
} -
} -
}
队列正式执行命令:
//执行发布任务
./console/yii-advanced queues/up-star
//执行监听队列
console/yii-advanced queue/listen
console/yii-advanced
queue/listen5 //表示每隔5s监听一次队列console/yii-advanced
queue/run//运行队列任务直到空,一般用于定时cronconsole/yii-advanced
queue/info//查看队列状态
//查看队列目前状态
console/yii-advanced queue/info
Jobs
– waiting: 0 等待处理的消息对象
– delayed: 6 延迟处理的消息对象
– reserved: 0 从队列获取作业,并执行它
– done: 387 已完成消息对象
// 将作业推送到队列并获得其ID
$id = Yii::$app->queue->push(new SomeJob());
// 这个作业等待执行。
Yii::$app->queue->isWaiting($id);
// Worker 从队列获取作业,并执行它。
Yii::$app->queue->isReserved($id);
// Worker 作业执行完成。
Yii::$app->queue->isDone($id);
参考文献:https://www.yiiframework.com/extension/yiisoft/yii2-queue/doc/guide/2.1/zh-cn/usage
如果worker使用PHP以外的东西实现,那么您必须更改序列化数据的方式。例如, JSON:
-
return [ -
'components' => [ -
'queue' => [ -
'class' => \yii\queue\<driver>\Queue::class, -
'strictJobType' => false, -
'serializer' => \yii\queue\serializers\JsonSerializer::class, -
], -
], -
];
多生产多消费者
多进程守护执行,每个队列可以均衡的分配,如果为了更加省心可以使用同步锁机制执行
console/yii-advanced queue/listen
为了进程持续有效不掉档启用。可以使用:supervisor 服务
5. 启动/监听队列
yii2 使用command命令启动队列 命令有:
1. php yii queue/info
查看目前去队列信息,主要是读取存储的数据,貌似yii2框架队列的数据一直会存储在redis中
2. php yii queue/run
直接运行队列,跑完后会终止进程
3. php yii queue/listen
监听队列,只要有新的队列过来就会处理,进程直到自行关闭才回退出
- yii queue/clear//清空初始化
- yii queue/run // 执行
- yii queue/listen //监听队列
- yii queue/info 来//队列状态
yii queue/listen [wait]
listen命令启动一个守护进程,它可以无限查询队列。如果有新的任务,他们立即得到并执行。
wait是下一次查询队列的时间 当命令正确地通过supervisor来实现时,这种方法是最有效的。
yii queue/run
run命令获取并执行循环中的任务,直到队列为空。适用与cron。
run与listen命令的参数:
—verbose,-v: 将执行状态输出到控制台。
—isolate: 详细模式执行作业。如果启用,将打印每个作业的执行结果。
—color: 高亮显示输出结果。
yii queue/info
info命令打印关于队列状态的信息。
6. 使用Supervisor将队列任务启动 添加到守护进程中
推荐安装Supervisor,将 php yii queue/listen 等一系列队列进程,添加到进程保护中,防止中途崩溃时候,可以自救,哈哈~😄
关于Supervisor,可以参考《centos安装Supervisor以及简单配置(添加进程守护)》
Redis队列
queue.attempts: Hash 任务接收次数信息
queue.message_id: String 任务总发布数量
queue.messages: Hash 任务消息对象数据
queue.reserved: Zset
queue.waiting: List 等待处理的消息队列 (消息对象数据来自 queue.messages,用key进行关联)
queue.moving_lock:String 队列锁




Redis队列监听执行
RabbitMq队列监听执行【RabbitMq不支持失败重试一但失败整个对了将停止运行】
总结
- 安装yiisoft/yii2-queue
- 配置
- 创建队列任务类
- 控制器将数据添加到队列中
- 启动/监听队列
- 使用Supervisor将队列任务启动 添加到守护进程中