温馨提示:本文共4408个字,读完预计12分钟。
一.基于redis
1.安装redis ,并进行相关配置
2.yii2 项目中使用composer 安装 yii2-redis
3.yii2 项目中使用compsoer 安装yii2-queue
-
配置文件设置:
-
-
'redis' => [
-
'class' => \common\components\cache\RedisCache::class,
-
'hostname' => 'localhost',
-
'port' => 6379,
-
'database' => 0,
-
'connectionTimeout'=>3
-
],
-
/*'queue' => [
-
//RabbitMq 队列方案
-
'class' => yii\queue\amqp\Queue::class,
-
'host' => '127.0.0.1',
-
'port' => 5672,
-
'user' => 'guest',
-
'password' => 'guest',
-
'queueName' => 'yii-queue',
-
],*/
-
-
'queue' => [
-
//Redis 队列方案
-
'class' => \yii\queue\redis\Queue::class,
-
// 连接组件或它的配置
-
'redis' => 'redis',
-
// Queue channel key
-
'channel' => 'queue',
-
'as log'=> \yii\queue\LogBehavior::class,
-
],
队列执行任务程序:
-
任务 一:
-
<?php
-
namespace services\queue\taskjob;
-
-
use services\queue\QueueTask;
-
use yii\base\BaseObject;
-
-
class ReporttaskJob extends BaseObject implements \yii\queue\JobInterface
-
{
-
public $scene;
-
public $content;
-
-
public function execute($queue)
-
{
-
//$transaction = \Yii::$app->getDb()->beginTransaction();
-
echo "{$this->scene}".PHP_EOL;
-
$saleSubjectServer = QueueTask::getSubject($this->scene);
-
QueueTask::addObserver($saleSubjectServer,$this->scene,$this->content);
-
//$transaction->commit();
-
-
}
-
}
-
任务 二:
-
<?php
-
namespace services\queue\taskjob;
-
-
-
use yii\base\BaseObject;
-
-
class TesttaskJob extends BaseObject implements \yii\queue\JobInterface
-
{
-
-
public $name;
-
public $content;
-
-
public function execute($queue)
-
{
-
//print_r($queue);
-
echo "{$this->name}--{$this->content}".PHP_EOL;
-
}
-
-
}
发布队列任务:
-
<?php
-
-
namespace console\controllers;
-
-
use services\queue\QueueTask;
-
use services\queue\taskjob\ReporttaskJob;
-
use services\queue\taskjob\TesttaskJob;
-
-
class QueuesController extends BaseConsoleController
-
{
-
-
/**
-
* 控制台运行命令:./console/yii-advanced queue/up-star
-
* @return array
-
*/
-
public function actionUpStar()
-
{
-
$this->doTestJob();
-
$this->doReportJob();
-
-
}
-
-
private function doReportJob()
-
{
-
for ($i = 1; $i <= 10; $i++)
-
{
-
$randNum = rand(10,100);
-
$job = new ReporttaskJob([
-
'scene' => QueueTask::SCENE_SALE,
-
'content' => ['id' => 'dapeng', 'name' => $i.'大鹏'.$randNum],
-
]);
-
//延迟执行
-
$res = \Yii::$app->queue->delay(2 * $i)->push($job);
-
//无延迟执行
-
//$res = \Yii::$app->queue->push($job);
-
echo $res . PHP_EOL;
-
}
-
}
-
-
private function doTestJob()
-
{
-
for ($i = 1; $i <= 10; $i++) {
-
$job = new TesttaskJob([
-
'name' => "{$i}-大鹏",
-
'content' => "{$i}-man",
-
]);
-
$res = \Yii::$app->queue->delay(2 * $i)->push($job);
-
echo $res . PHP_EOL;
-
}
-
}
-
}
队列正式执行命令:
//执行发布任务
./console/yii-advanced queues/up-star
//执行监听队列
console/yii-advanced queue/listen
console/yii-advanced
queue/listen
5 //
表示每隔5s监听一次队列
console/yii-advanced
queue/run
//运行队列任务直到空,一般用于定时
cron
console/yii-advanced
queue/info
//查看队列状态
//查看队列目前状态
console/yii-advanced queue/info
Jobs
– waiting: 0 等待处理的消息对象
– delayed: 6 延迟处理的消息对象
– reserved: 0 从队列获取作业,并执行它
– done: 387 已完成消息对象
// 将作业推送到队列并获得其ID
$id = Yii::$app->queue->push(new SomeJob());
// 这个作业等待执行。
Yii::$app->queue->isWaiting($id);
// Worker 从队列获取作业,并执行它。
Yii::$app->queue->isReserved($id);
// Worker 作业执行完成。
Yii::$app->queue->isDone($id);
参考文献:https://www.yiiframework.com/extension/yiisoft/yii2-queue/doc/guide/2.1/zh-cn/usage
如果worker使用PHP以外的东西实现,那么您必须更改序列化数据的方式。例如, JSON:
-
return [
-
'components' => [
-
'queue' => [
-
'class' => \yii\queue\<driver>\Queue::class,
-
'strictJobType' => false,
-
'serializer' => \yii\queue\serializers\JsonSerializer::class,
-
],
-
],
-
];
多生产多消费者
多进程守护执行,每个队列可以均衡的分配,如果为了更加省心可以使用同步锁机制执行
console/yii-advanced queue/listen
为了进程持续有效不掉档启用。可以使用:supervisor 服务
5. 启动/监听队列
yii2 使用command命令启动队列 命令有:
1. php yii queue/info
查看目前去队列信息,主要是读取存储的数据,貌似yii2框架队列的数据一直会存储在redis中
2. php yii queue/run
直接运行队列,跑完后会终止进程
3. php yii queue/listen
监听队列,只要有新的队列过来就会处理,进程直到自行关闭才回退出
- yii queue/clear//清空初始化
- yii queue/run // 执行
- yii queue/listen //监听队列
- yii queue/info 来//队列状态
yii queue/listen [wait]
listen命令启动一个守护进程,它可以无限查询队列。如果有新的任务,他们立即得到并执行。
wait是下一次查询队列的时间 当命令正确地通过supervisor来实现时,这种方法是最有效的。
yii queue/run
run命令获取并执行循环中的任务,直到队列为空。适用与cron。
run与listen命令的参数:
—verbose,-v: 将执行状态输出到控制台。
—isolate: 详细模式执行作业。如果启用,将打印每个作业的执行结果。
—color: 高亮显示输出结果。
yii queue/info
info命令打印关于队列状态的信息。
6. 使用Supervisor将队列任务启动 添加到守护进程中
推荐安装Supervisor,将 php yii queue/listen 等一系列队列进程,添加到进程保护中,防止中途崩溃时候,可以自救,哈哈~😄
关于Supervisor,可以参考《centos安装Supervisor以及简单配置(添加进程守护)》
Redis队列
queue.attempts: Hash 任务接收次数信息
queue.message_id: String 任务总发布数量
queue.messages: Hash 任务消息对象数据
queue.reserved: Zset
queue.waiting: List 等待处理的消息队列 (消息对象数据来自 queue.messages,用key进行关联)
queue.moving_lock:String 队列锁
Redis队列监听执行
RabbitMq队列监听执行【RabbitMq不支持失败重试一但失败整个对了将停止运行】
总结
- 安装yiisoft/yii2-queue
- 配置
- 创建队列任务类
- 控制器将数据添加到队列中
- 启动/监听队列
- 使用Supervisor将队列任务启动 添加到守护进程中