shmilyzxt 2016-11-29 17:42:33 57098次浏览 27条评论 72 17 0

yii2-queue

a yii2 extension to make simple to use queue.

yii2-queue让队列的使用在yii2中变得更轻松,她为各种队列组件的使用提供了一个标准的接口,您只需要配置好需要使用的队列组件,就能轻松使用,
同时您在不同队列组件之间的切换也只需要修改下配置文件,重启下队列监听进程即可,目前支持数据库队列,redis队列,beanstalkd队列,
其它队列中间件支持正在添加中(当然,聪明的你也可以自行扩展)。

Installation

The preferred way to install this extension is through composer.

Either run

php composer.phar require --prefer-dist shmilyzxt/yii2-queue "dev-master"

or add

"shmilyzxt/yii2-queue": "dev-master"

to the require section of your composer.json file.

Usage

1:在配置文件中配置好需要使用的队列,完整的配置代码如下:

数据库队列

'queue' => [
            'class' => 'shmilyzxt\queue\queues\DatabaseQueue', //队列使用的类
            'jobEvent' => [ //队列任务事件配置,目前任务支持2个事件
                'on beforeExecute' => ['shmilyzxt\queue\base\JobEventHandler','beforeExecute'],
                'on beforeDelete' => ['shmilyzxt\queue\base\JobEventHandler','beforeDelete'],
            ],
            'connector' => [//队列中间件链接器配置(这是因为使用数据库,所以使用yii\db\Connection作为数据库链接实例)
                'class' => 'yii\db\Connection',
                'dsn' => 'mysql:host=localhost;dbname=yii2advanced',
                'username' => 'root',
                'password' => '',
                'charset' => 'utf8',
            ],
            'table' => 'jobs', //存储队列数据表名
            'queue' => 'default', //队列的名称
            'expire' => 60, //任务过期时间
            'maxJob' =>0, //队列允许最大任务数,0为不限制
            'failed' => [//任务失败日志记录(目前只支持记录到数据库)
                'logFail' => true, //开启任务失败处理
                'provider' => [ //任务失败处理类
                    'class' => 'shmilyzxt\queue\failed\DatabaseFailedProvider',
                    'db' => [ //数据库链接
                        'class' => 'yii\db\Connection',
                        'dsn' => 'mysql:host=localhost;dbname=yii2advanced',
                        'username' => 'root',
                        'password' => '',
                        'charset' => 'utf8',
                    ],
                    'table' => 'failed_jobs' //存储失败日志的表名
                ],
            ],
        ]

redis队列

'queue' => [
            'class' => 'shmilyzxt\queue\queues\RedisQueue',
            'jobEvent' => [
                'on beforeExecute' => ['shmilyzxt\queue\base\JobEventHandler','beforeExecute'],
                'on beforeDelete' => ['shmilyzxt\queue\base\JobEventHandler','beforeDelete'],
            ],
            'connector' => [ //需要安装 predis\predis 扩展来操作redis
                'class' => 'shmilyzxt\queue\connectors\PredisConnector',
                'parameters' => [
                    'scheme' => 'tcp',
                    'host' => '127.0.0.1',
                    'port' => 6379,
                    //'password' => '1984111a',
                    'db' => 0
                ],
                'options'=> [],
            ],
            'queue' => 'default',
            'expire' => 60,
            'maxJob' => 0,
            'failed' => [
                'logFail' => true,
                'provider' => [
                    'class' => 'shmilyzxt\queue\failed\DatabaseFailedProvider',
                    'db' => [
                        'class' => 'yii\db\Connection',
                        'dsn' => 'mysql:host=localhost;dbname=yii2advanced',
                        'username' => 'root',
                        'password' => '',
                        'charset' => 'utf8',
                    ],
                    'table' => 'failed_jobs'
                ],
            ],
        ]

beanstalkd队列

'queue' => [
            'class' => 'shmilyzxt\queue\queues\BeanstalkdQueue',
            'jobEvent' => [
                'on beforeExecute' => ['shmilyzxt\queue\base\JobEventHandler','beforeExecute'],
                'on beforeDelete' => ['shmilyzxt\queue\base\JobEventHandler','beforeDelete'],
            ],
            'connector' => [ //需要安装 pad\pheanstalk 扩展来操作beastalkd
                'class' => 'shmilyzxt\queue\connectors\PheanstalkConnector',
                'host' => '114.55.142.6',
                'port' => 11300
            ],
            'queue' => 'default',
            'expire' => 60,
            'maxJob' => 0,
            'failed' => [
                'logFail' => true,
                'provider' => [
                    'class' => 'shmilyzxt\queue\failed\DatabaseFailedProvider',
                    'db' => [
                        'class' => 'yii\db\Connection',
                        'dsn' => 'mysql:host=localhost;dbname=yii2advanced',
                        'username' => 'root',
                        'password' => '',
                        'charset' => 'utf8',
                    ],
                    'table' => 'failed_jobs'
                ],
            ],
        ],

2:在components数组配置项中配置好队列后,就可以开始使用队列了,首先是任务入队列,提供两个方法:
\Yii::$app->queue->pushOn($hander,$data,$queue='default')
即时任务入队列:这样的任务入队列后,如果队列监听在运行,那么任务会立刻进入ready状态,可以被监听进程执行。
该方法有3个参数,第一个为任务处理handler,第二个为任务数据,第三个为队列名称,默认为 default。
\Yii::$app->queue->laterOn($delay,$handler,$data,$queue='default')
延时任务入队列:这样的任务入队列后不会立刻被队列监听进程之行,需要等待 $delay秒后任务才就绪。

目前支持的handler有:
1,新建自己的队列处理handler,继承、shmilyzxt\queue\base\JobHandler,并实现任务处理方法handle()和失败处理方法failed()。
2, 一个php闭包,形如 function($job,$data){}

\Yii::$app->queue->pushOn(new SendMial(),['email'=>'49783121@qq.com','title'=>'test','content'=>'email test'],'email');
\Yii::$app->queue->pushOn(function($job,$data){var_dump($data)},['email'=>'49783121@qq.com','title'=>'test','content'=>'email test'],'email');

\Yii::$app->queue->laterOn(120,new SendMial(),['email'=>'49783121@qq.com','title'=>'test','content'=>'email test'],'email');
\Yii::$app->queue->pushOn(120,function($job,$data){var_dump($data)},['email'=>'49783121@qq.com','title'=>'test','content'=>'email test'],'email');

3:新建自己的队列处理handler,继承shmilyzxt\queue\base\JobHandler,并实现任务处理方法handle和失败处理方法failed,一个发邮件的jobhandler类似:

class SendMail extends JobHandler
{

    public function handle($job,$data)
    {
        if($job->getAttempts() > 3){
            $this->failed($job);
        }

        $payload = $job->getPayload();

        //$payload即任务的数据,你拿到任务数据后就可以执行发邮件了
        //TODO 发邮件
    }

    public function failed($job,$data)
    {
        die("发了3次都失败了,算了");
    }
}

4:启动后台队列监听进程,对任务进行处理,您可以使用yii console app来启动,你也可以使用更高级的如swoole来高效的运行队列监听,
目前提供了一个Worker类,在控制台程序使用Worker::listen(Queue $queue,$queueName='default',$attempt=10,$memory=512,$sleep=3,$delay=0)可以
启动队列监听进程,其中 $attempt是任务尝试次数,$memory是允许使用最大内存,$sleep表示每次尝试从队列中获取任务的间隔时间,$delay代表把任务重新加入队列
时是否延时(0代表不延时),一个标准yii console app 启动队列监听进程代码如下;

class WorkerController extends \yii\console\Controller
{
    public function actionListen($queueName='default',$attempt=10,$memeory=128,$sleep=3 ,$delay=0){
        Worker::listen(\Yii::$app->queue,$queueName,$attempt,$memeory,$sleep,$delay);
    }
}
yii worker/listen default 10 128 3 0

当后台监听任务启动起来后,一但有任务加入队列,队列就会调用入队列时设置的handler对队列任务进行处理了。每次会pop出一个任务进行处理,处理完成后删除任务,直到队列为空。

5:关于任务失败处理:
默认情况下,一个任务在执行时出现异常或者一个任务失败时并不是认为它真正失败了,此时会检测它的尝试次数是否已经超出设置的attempt,如果没超出会重新入队列尝试,如果超出了,
则该任务才是真正失败,这是会先调用任务处理handler类的failed()方法处理失败操作,如果没有failed()方法(比如handler为闭包或者您自定义的继承自shmilyzxt\queue\base\JobHandler
的类没有写failed()方法),则会尝试使用扩展自身的失败日志处理机制(配置项里的failed配置),会尝试把失败任务的详细信息写入到数据库表中(目前只支持数据库方式)。
建议您采用继承shmilyzxt\queue\base\JobHandler的方式生成任务处理handler并写自己的failed方法处理失败任务。

6:任务事件支持:
目前任务支持2个事件(beforeExecute,beforeDelete), beforeExecute是在任务被pop出来之后,执行之前执行。beforeDelete是任务在被删除之前执行
您可以使用这两个事件做自定易操作,只需要像上面配置文件里配置 jobEvent那样绑定事件处理handler即可 1。

觉得很赞
  • 评论于 2016-12-05 09:35 举报

    楼主就、讲解的不错!

    1 条回复
    评论于 2018-10-23 09:44 回复

    请问以下yii2 基础版本 怎么安装queue呢,高级版本安装成功了,但是我现在项目用的是基础版本,最后没有init.bat来重置生成 在cmd下 yii后,都没有关于queue的命令

  • 评论于 2016-12-23 09:15 举报

    不错,学习了!

  • 评论于 2016-12-24 16:06 举报

    真的很不错,之前没有看,所以在做邮件营销系统和短信营销系统的时候还自己造了个轮子~

  • 评论于 2016-12-28 16:42 举报

    为啥不用phpredis来支持redis呢,phpredis相对于predis性能上差距甚大

    1 条回复
    评论于 2018-10-23 09:44 回复

    请问以下yii2 基础版本 怎么安装queue呢,高级版本安装成功了,但是我现在项目用的是基础版本,最后没有init.bat来重置生成 在cmd下 yii后,都没有关于queue的命令

  • 评论于 2017-01-26 17:30 举报

    2017-01-27更新:添加Apahce ActiveMQ队列支持。添加错误日志文件格式支持
    activeMQ配置如下:

    'queue' => [
        'class' => 'shmilyzxt\queue\queues\ActivemqQueue',
        'jobEvent' => [
            'on beforeExecute' => ['shmilyzxt\queue\base\JobEventHandler','beforeExecute'],
            'on beforeDelete' => ['shmilyzxt\queue\base\JobEventHandler','beforeDelete'],
        ],
        'connector' => [ 
            'class' => 'shmilyzxt\queue\connectors\ActivemqConnector',
            'broker' => 'tcp://127.0.0.1:61613',
            'timeout' => 10
        ],
        'queue' => 'default',
        'expire' => 60,
        'maxJob' => 0,
        'failed' => [
            'logFail' => true,
            'provider' => [
                'class' => 'shmilyzxt\queue\failed\FileFaildProvider',//文件错误日志
                'filePath' => 'D://xampp/htdocs/advanced/vendor/shmilyzxt/yii2-queue/failed/log/failed.log',
            ],
        ],
    ]
    
  • 评论于 2017-02-03 10:13 举报

    ActiveMQ不支持延时队列任务。如果大家有好的实现方法欢迎给建议。

  • 评论于 2017-02-15 14:02 举报

    mark!

  • 评论于 2017-02-28 09:28 举报

    由于我的疏忽,很多文件编码弄成GBK了,请注意!

    1 条回复
    评论于 2017-03-23 09:38 回复

    能不能给个详细使用教程

  • 评论于 2017-03-16 12:04 举报

    任务失败的数据表(failed_jobs)表结构是什么样的?没有看见sql脚本

    1 条回复
    评论于 2017-03-23 09:40 回复

    sql脚本已提交

  • 评论于 2017-03-23 06:05 举报

    如何使用??

    5 条回复
    评论于 2017-03-23 09:40 回复

    我已经写的很清楚了吧~

    评论于 2017-03-23 13:54 回复

    比如发送邮件 在handle邮件发送成功之后如何自动终止掉?

    评论于 2017-03-23 14:04 回复

    你的意思是终止队列监听任务?你为何要终止它呢?,如果没有任务它就会处于等待状态,只到有新的邮件任务来它又继续发,你如果终止了,有新的发邮件任务入队列了怎么办?

    评论于 2017-03-23 14:12 回复

    我pushOn 了 三条数据做测试 。 接着我 在 handle 写了 发送邮件。 接着我启动监听。邮件全部发送完了之后 程序还在请求。整个程序都死掉了

    评论于 2017-04-28 21:38 回复

    多看多学多听多做

  • 评论于 2017-03-23 14:19 举报

    QQ图片20170323031720.png

    QQ图片20170323031752.png

    我做了下测试 邮件全部发送完毕之后 监听事件还在请求 程序卡死

    5 条回复
    评论于 2017-03-23 14:20 回复

    图片已发

    评论于 2017-03-23 14:31 回复

    Worker只是提供了一种思路去处理队列任务,它会不停的请求(请求间隔时间可以自定义,默认3秒)。扩展关注的是各种队列使用接口的统一,在任务进入队列后,怎么处理最佳,应该由你来决定,不一定使用我提供的Worker。

    评论于 2017-03-23 14:33 回复

    比如你可以写个crontab任务,每小时检测一次,调用JobHandler

    评论于 2017-03-23 14:43 回复

    突然间傻了 明白了 谢谢

    评论于 2019-12-18 18:20 回复

    您好,麻烦问一下,您是在command下边创建了一个文件里边写的listen方法和movderify方法吗,我用的是yii2-queue的扩展,参考的文章https://www.jb51.net/article/165724.htm 是这个,我是应该在command下边创建一个queue的控制器文件,里边写个方法,放入Yii::$app->queue->push执行下载图片的队列吗,目前我是这么写的,但是程序执行不成功,一直没有执行完毕,并且图片没有下载下来,麻烦问一下这个应该怎么用

  • 评论于 2017-04-25 17:14 举报

    你的这个消息队列组建在使用redis的时候,不是用redis的发布和订阅模式啊,而是用死循环读取数据,严格来说,这个不是消息队列

  • 评论于 2017-05-10 14:33 举报

    你好,今天刚用yii2,现在业务需求准备用消息队列,想问下:队列监听是服务器一直监听是吧,会不会给性能造成影响?会不会造成服务器卡死的情况?

    1 条回复
    评论于 2017-07-13 09:33 回复

    队列监听是会一直监听的,只要运行了就肯定有影响性能

  • 评论于 2017-07-07 13:34 举报

    多谢分享,mark----yii2队列扩展

  • 评论于 2017-07-19 09:33 举报

    控制台监听后,但是一直处于 sleep,怎么才算判断执行成功?redis 没有保存到东西

  • 评论于 2017-07-21 10:03 举报

    最近较忙,统一回复一下关于队列监听的问题,我这里只是提供了一种yii console的方式来监听队列,一旦运行就会一直监听,关于队列监听,不是该扩展重点关注的地方,您应该实现自己的方式,不一定使用我的方式,毕竟消费者可能都不是php程序。yii发布了官方队列扩展,实现思路跟我很像,也是提供了yii console的方式监听队列,大家可以参考 yiisoft/yii2-queue.

  • 评论于 2017-08-17 17:52 举报

    made,不,是mark

  • 评论于 2017-09-18 23:14 举报

    请问这是阻塞队列吗?我用这个经常会有任务在重新打开监听的时候才处理

  • 评论于 2017-12-21 20:16 举报

    php composer.phar require --prefer-dist yiisoft/yii2-queue 还不如用yii的

    4 条回复
    评论于 2017-12-26 11:07 回复

    是的,建议用官方的,如果官方早一年出来,我也不用费力去搞了~!

    评论于 2017-12-26 15:48 回复

    大侠 yiisoft/yii2-queue 这个怎么用 新人 可以知道一下吗?

    评论于 2017-12-26 15:49 回复

    yiisoft/yii2-queue 我配置出错

    评论于 2017-12-28 11:41 回复

    github文档很详细呀

  • 评论于 2018-04-25 17:55 举报

    大神 predis扩展也安装了。 报错Setting unknown property: shmilyzxt\queue\queues\RedisQueue::table 求解

    1 条回复
    评论于 2018-04-27 16:01 回复

    shmilyzxt\queue\queues\RedisQueue 这个类里没有table属性啊,你是不是配置错了。完整报错发来看看

您需要登录后才可以评论。登录 | 立即注册