当前位置:

Yii2 queue的队列使用详解

温馨提示:本文共4408个字,读完预计12分钟。

文章目录
  • 5. 启动/监听队列
  • 6. 使用Supervisor将队列任务启动 添加到守护进程中

一.基于redis

1.安装redis ,并进行相关配置

2.yii2 项目中使用composer 安装 yii2-redis

3.yii2 项目中使用compsoer 安装yii2-queue


 
  1. 配置文件设置:

  2.  

  3. 'redis' => [

  4. 'class' => \common\components\cache\RedisCache::class,

  5. 'hostname' => 'localhost',

  6. 'port' => 6379,

  7. 'database' => 0,

  8. 'connectionTimeout'=>3

  9. ],

  10. /*'queue' => [

  11. //RabbitMq 队列方案

  12. 'class' => yii\queue\amqp\Queue::class,

  13. 'host' => '127.0.0.1',

  14. 'port' => 5672,

  15. 'user' => 'guest',

  16. 'password' => 'guest',

  17. 'queueName' => 'yii-queue',

  18. ],*/

  19.  

     

  20. 'queue' => [

  21.   //Redis 队列方案

  22. 'class' => \yii\queue\redis\Queue::class,

  23. // 连接组件或它的配置

  24. 'redis' => 'redis',

  25. // Queue channel key

  26. 'channel' => 'queue',

  27. 'as log'=> \yii\queue\LogBehavior::class,

  28. ],

 

队列执行任务程序:


 
  1. 任务 一:

  2. <?php

  3. namespace services\queue\taskjob;

  4.  

  5. use services\queue\QueueTask;

  6. use yii\base\BaseObject;

  7.  

  8. class ReporttaskJob extends BaseObject implements \yii\queue\JobInterface

  9. {

  10. public $scene;

  11. public $content;

  12.  

     

  13. public function execute($queue)

  14. {

  15. //$transaction = \Yii::$app->getDb()->beginTransaction();

  16. echo "{$this->scene}".PHP_EOL;

  17. $saleSubjectServer = QueueTask::getSubject($this->scene);

  18. QueueTask::addObserver($saleSubjectServer,$this->scene,$this->content);

  19. //$transaction->commit();

  20.  

     

  21. }

  22. }

 

 
  1. 任务 二:

  2. <?php

  3. namespace services\queue\taskjob;

  4.  

  5.  

  6. use yii\base\BaseObject;

  7.  

  8. class TesttaskJob extends BaseObject implements \yii\queue\JobInterface

  9. {

  10.  

  11. public $name;

  12. public $content;

  13.  

  14. public function execute($queue)

  15. {

  16. //print_r($queue);

  17. echo "{$this->name}--{$this->content}".PHP_EOL;

  18. }

  19.  

  20. }

 

 

发布队列任务:


 
  1. <?php

  2.  

  3. namespace console\controllers;

  4.  

  5. use services\queue\QueueTask;

  6. use services\queue\taskjob\ReporttaskJob;

  7. use services\queue\taskjob\TesttaskJob;

  8.  

  9. class QueuesController extends BaseConsoleController

  10. {

  11.  

  12. /**

  13. * 控制台运行命令:./console/yii-advanced queue/up-star

  14. * @return array

  15. */

  16. public function actionUpStar()

  17. {

  18. $this->doTestJob();

  19. $this->doReportJob();

  20.  

  21. }

  22.  

  23. private function doReportJob()

  24. {

  25. for ($i = 1; $i <= 10; $i++)

  26. {

  27. $randNum = rand(10,100);

  28. $job = new ReporttaskJob([

  29. 'scene' => QueueTask::SCENE_SALE,

  30. 'content' => ['id' => 'dapeng', 'name' => $i.'大鹏'.$randNum],

  31. ]);

  32. //延迟执行

  33. $res = \Yii::$app->queue->delay(2 * $i)->push($job);

  34. //无延迟执行

  35. //$res = \Yii::$app->queue->push($job);

  36. echo $res . PHP_EOL;

  37. }

  38. }

  39.  

  40. private function doTestJob()

  41. {

  42. for ($i = 1; $i <= 10; $i++) {

  43. $job = new TesttaskJob([

  44. 'name' => "{$i}-大鹏",

  45. 'content' => "{$i}-man",

  46. ]);

  47. $res = \Yii::$app->queue->delay(2 * $i)->push($job);

  48. echo $res . PHP_EOL;

  49. }

  50. }

  51. }

 

队列正式执行命令:

//执行发布任务

./console/yii-advanced queues/up-star 

//执行监听队列

console/yii-advanced queue/listen 

console/yii-advanced queue/listen 5  //表示每隔5s监听一次队列

console/yii-advanced queue/run            //运行队列任务直到空,一般用于定时cron

console/yii-advanced queue/info         //查看队列状态

//查看队列目前状态

console/yii-advanced queue/info

Jobs
  – waiting: 0    等待处理的消息对象
  – delayed: 6   延迟处理的消息对象
  – reserved: 0  从队列获取作业,并执行它
  – done: 387   已完成消息对象

 

 

// 将作业推送到队列并获得其ID

$id = Yii::$app->queue->push(new SomeJob());

// 这个作业等待执行。

Yii::$app->queue->isWaiting($id);

// Worker 从队列获取作业,并执行它。

Yii::$app->queue->isReserved($id);

// Worker 作业执行完成。

Yii::$app->queue->isDone($id);

参考文献:https://www.yiiframework.com/extension/yiisoft/yii2-queue/doc/guide/2.1/zh-cn/usage

如果worker使用PHP以外的东西实现,那么您必须更改序列化数据的方式。例如, JSON:


 
  1. return [

  2. 'components' => [

  3. 'queue' => [

  4. 'class' => \yii\queue\<driver>\Queue::class,

  5. 'strictJobType' => false,

  6. 'serializer' => \yii\queue\serializers\JsonSerializer::class,

  7. ],

  8. ],

  9. ];

 

多生产多消费者

多进程守护执行,每个队列可以均衡的分配,如果为了更加省心可以使用同步锁机制执行

console/yii-advanced queue/listen

 

为了进程持续有效不掉档启用。可以使用:supervisor 服务

 

5. 启动/监听队列

yii2 使用command命令启动队列 命令有:

1. php yii queue/info

查看目前去队列信息,主要是读取存储的数据,貌似yii2框架队列的数据一直会存储在redis中

2. php yii queue/run

直接运行队列,跑完后会终止进程

3. php yii queue/listen

监听队列,只要有新的队列过来就会处理,进程直到自行关闭才回退出

  • yii queue/clear//清空初始化
  • yii queue/run // 执行
  • yii queue/listen //监听队列
  • yii queue/info 来//队列状态

yii queue/listen [wait]
listen命令启动一个守护进程,它可以无限查询队列。如果有新的任务,他们立即得到并执行。
wait是下一次查询队列的时间 当命令正确地通过supervisor来实现时,这种方法是最有效的。
yii queue/run
run命令获取并执行循环中的任务,直到队列为空。适用与cron。
run与listen命令的参数:
—verbose,-v: 将执行状态输出到控制台。
—isolate: 详细模式执行作业。如果启用,将打印每个作业的执行结果。
—color: 高亮显示输出结果。
yii queue/info
info命令打印关于队列状态的信息。

6. 使用Supervisor将队列任务启动 添加到守护进程中

推荐安装Supervisor,将 php yii queue/listen 等一系列队列进程,添加到进程保护中,防止中途崩溃时候,可以自救,哈哈~😄
关于Supervisor,可以参考《centos安装Supervisor以及简单配置(添加进程守护)

 

Redis队列

Yii2 queue的队列使用详解-Mr.Li's Blog

queue.attempts: Hash   任务接收次数信息

queue.message_id: String  任务总发布数量

queue.messages: Hash 任务消息对象数据

queue.reserved: Zset   

queue.waiting: List     等待处理的消息队列 (消息对象数据来自 queue.messages,用key进行关联)

queue.moving_lock:String   队列锁

Yii2 queue的队列使用详解-Mr.Li's BlogYii2 queue的队列使用详解-Mr.Li's BlogYii2 queue的队列使用详解-Mr.Li's BlogYii2 queue的队列使用详解-Mr.Li's Blog

Yii2 queue的队列使用详解-Mr.Li's BlogYii2 queue的队列使用详解-Mr.Li's Blog

Redis队列监听执行

Yii2 queue的队列使用详解-Mr.Li's Blog

RabbitMq队列监听执行【RabbitMq不支持失败重试一但失败整个对了将停止运行

Yii2 queue的队列使用详解-Mr.Li's Blog

总结

  • 安装yiisoft/yii2-queue
  • 配置
  • 创建队列任务类
  • 控制器将数据添加到队列中
  • 启动/监听队列
  • 使用Supervisor将队列任务启动 添加到守护进程中

 

本文链接:,转发请注明来源!
评论已关闭。