Laravel 5.5 队列 ゞ 浴缸里的玫瑰 2022-04-23 10:36 190阅读 0赞 ## 简介 ## Laravel 队列为不同的后台队列服务提供了统一的 API,例如 Beanstalk,Amazon SQS,Redis,甚至其他基于关系型数据库的队列。 队列的目的是将耗时的任务延时处理,比如发送邮件,从而大幅度缩短 Web 请求和响应的时间。 队列的配置文件存放在 config/queue.php。 每一种队列驱动的配置都可以在该文件中找到,包括数据库、Beanstalkd、Amazon SQS、Redis以及sync同步(本地使用)驱动。其中还包含了一个 null 队列驱动用于那些放弃队列的任务。 sync 同步驱动是指采用同步队列的方式执行任务,它是 Laravel 队列的默认配置。(开发环境建议使用 sync 同步驱动,方便调试。) 注:Laravel 现在提供了基于 Redis 的,拥有美观的后台和配置系统的 Horizon 队列扩展包,完整信息请参考 [Horizon文档][Horizon]。 ### 连接 ### 首先,我们必须知道连接和队列的关系。 在配置文件 config/queue.php 中,有一个 connections 配置项。该配置项定义了后台队列服务的特定连接,如 Amazon SQS、Redis、Database 等。 每种连接都可以有很多队列,可以想象在银行办理现金业务的各个窗口队列。 配置文件中的每个连接配置示例都有一个 queue 属性。当新的队列任务被添加到指定的连接时,该配置项的值就是默认监听的队列名称。如果你没有指定队列名称,那么 queue 的值,就是该任务默认添加到的队列名称。 queue 的值默认为 default。 // 以下的任务将被委派到默认队列 dispatch(new Job); // 以下任务将被委派到 "emails" 队列 dispatch((new Job)->onQueue('emails')); 有些应用并不需要将任务分配到多个队列,单个队列已经非常适用。但是,应用的任务有优先级差异或者类别差异时,推送任务到多个队列将是更好地选择。 Laravel 的队列进程支持通过优先级指定处理的队列。例如,你可以将高优先级的任务委派到 high (高优先级)队列,从而让它优先执行。 php artisan queue:work --queue=high,default ### 驱动 ### 数据库 要使用 database 队列驱动,需要先创建数据表来保存任务信息。 要生成创建这些表的迁移,可以运行 Artisan 命令 queue:table,迁移被创建之后,可以使用 migrate 命令生成这些表。 php artisan queue:table php artisan migrate 之后,就会自动在对应的数据库中创建下面的 jobs 表。 CREATE TABLE `jobs` ( `id` bigint(20) unsigned NOT NULL AUTO_INCREMENT, `queue` varchar(191) COLLATE utf8mb4_unicode_ci NOT NULL, `payload` longtext COLLATE utf8mb4_unicode_ci NOT NULL, `attempts` tinyint(3) unsigned NOT NULL, `reserved_at` int(10) unsigned DEFAULT NULL, `available_at` int(10) unsigned NOT NULL, `created_at` int(10) unsigned NOT NULL, PRIMARY KEY (`id`), KEY `jobs_queue_index` (`queue`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; 分发(委派)任务后,任务会被自动保存到 jobs 表。 当运行队列进程时,就可以处理队列中的任务。如果任务处理(handle方法)过程中没有抛出异常,则认为该任务处理成功,队列中的任务就会被自动删除。如果 handle 方法中有异常抛出,该任务会重新回到队列自动重试,直到达到最大尝试次数,之后会从 jobs 表中删除,转而存储到 failed\_jobs 表。 Redis 要使用 redis 队列驱动,需要在配置文件 config/database.php 中配置 Redis 连接。 如果 Redis 队列连接使用 Redis Cluster(集群),队列名称必须包含 key hash tag,以确保给定队列对应的所有 Redis keys 都存放到同一个 hash slot 中。 'redis' => [ 'driver' => 'redis', 'connection' => 'default', 'queue' => '{default}', 'retry_after' => 90, ], 注:对一般中小型应用推荐使用 Redis 作为队列驱动。 使用以下几种队列驱动时,需要安装相应的依赖: * Amazon SQS: aws/aws-sdk-php ~3.0 * Beanstalkd: pda/pheanstalk ~3.0 * Redis: predis/predis ~1.0 ## 创建任务 ## ### 生成任务类 ### 通常,所有的任务类都保存在 app/Jobs 目录。如果 app/Jobs 不存在,在运行 Artisan 命令 make:job 时,它将会自动创建。你可以通过 Artisan CLI 来生成队列任务类: php artisan make:job ProcessPodcast 生成的类都实现了 Illuminate\\Contracts\\Queue\\ShouldQueue 接口,它告诉 Laravel 将该任务推送到队列,而不是立即运行。 任务类非常简单,通常只包含处理该任务的 handle 方法,让我们看一个任务类的例子。 <?php namespace App\Jobs; use App\Podcast; use App\AudioProcessor; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; class ProcessPodcast implements ShouldQueue { use InteractsWithQueue, Queueable, SerializesModels; protected $podcast; /** * 创建任务实例 * * @param Podcast $podcast * @return void */ public function __construct(Podcast $podcast) { $this->podcast = $podcast; } /** * 执行任务 * * @param AudioProcessor $processor * @return void */ public function handle(AudioProcessor $processor) { // 处理逻辑 } } 在本示例中,我们将 Eloquent 模型作为参数直接传递到构造函数。因为该任务使用了 SerializesModels trait,Eloquent 模型将会在任务被执行时优雅地序列化和反序列化。如果你的队列任务在构造函数中接收 Eloquent 模型,只有模型的主键会被序列化到队列,当任务真正被执行的时候,队列系统会自动从数据库中获取整个模型实例。这对应用而言是完全透明的,从而避免序列化整个 Eloquent 模型实例引起的问题。 handle 方法在任务被处理的时候调用,可以在 handle 方法中进行依赖注入。Laravel 服务容器会自动注入这些依赖。 注:二进制数据,如原生图片内容,在传递给队列任务之前先经过 base64\_encode 方法处理,此外,该任务被推送到队列时将不会被序列化为 JSON 格式。 ## 分发任务 ## 创建好任务类后,就可以通过任务自身的 dispatch 方法将其分发到队列。 dispatch 方法需要的唯一参数就是该任务的实例。 <?php namespace App\Http\Controllers; use App\Jobs\ProcessPodcast; use Illuminate\Http\Request; use App\Http\Controllers\Controller; class PodcastController extends Controller { /** * Store a new podcast. * * @param Request $request * @return Response */ public function store(Request $request) { // Create podcast... ProcessPodcast::dispatch($podcast); } } ### 延时分发 ### 有时你可能想要延迟队列任务的执行,这可以通过在分发任务时使用 delay 方法实现。 例如,你希望让某个任务在创建 10 分钟以后才执行: ProcessPodcast::dispatch($podcast) ->delay(now()->addMinutes(10)); 注:Amazon SQS 的队列服务最长延时 15 分钟。 ### 任务链 ### 任务链允许你指定一个需要在一个序列中执行的队列任务列表,如果序列中的某个任务失败,其它任务将不再运行。 要执行一个队列任务链,可以使用任意可分发任务上的 withChain 方法: ProcessPodcast::withChain([ new OptimizePodcast, new ReleasePodcast ])->dispatch(); ### 分发到指定的队列 ### 通过推送任务到不同的队列名称,可以将队列任务进行“分类”,甚至根据优先级来分配每个队列的进程数。 请注意,这并不意味着使用了配置项中那些不同的连接来管理队列,实际上只有单一连接会被用到。 要指定队列,可以使用 onQueue 方法。 ProcessPodcast::dispatch($podcast)->onQueue('default2'); ### 分发到指定的连接 ### 如果你使用了多个连接来管理队列,也可以分发任务到指定的连接。 要指定连接,可以使用 onConnection 方法。 ProcessPodcast::dispatch($podcast)->onConnection('sqs'); 当然,你可以同时使用 onConnection 和 onQueue 方法来指定任务的连接和队列。 $job = (new ProcessPodcast($podcast)) ->onConnection('sqs') ->onQueue('default2'); ## 最大失败次数 ## 指定队列任务最大失败次数的一种实现方式是通过 Artisan 命令 --tries 选项。 php artisan queue:work --tries=3 也可以在任务类中定义最大失败次数,来实现更加细粒度的控制。如果最大失败次数在任务类中指定,则其优先级高于命令行的指定。 <?php namespace App\Jobs; class ProcessPodcast implements ShouldQueue { /** * The number of times the job may be attempted. * * @var int */ public $tries = 5; } ## 基于时间的尝试次数 ## 除了定义在任务失败前的最大尝试次数外,还可以定义在指定时间内允许任务的最大尝试次数,这可以通过在任务类中添加 retryUntil 方法来实现。 /** * Determine the time at which the job should timeout. * * @return \DateTime */ public function retryUntil() { return now()->addSeconds(5); } 注:还可以在队列时间监听器中定义 retryUntil 方法。 ## 超时时间 ## 注:timeout 方法为 PHP 7.1+ 和 pcntl 扩展做了优化。 队列任务最大运行时长(秒)可以通过 Artisan 命令上的 --timeout 开关来指定。 php artisan queue:work --timeout=30 你也可以在任务类中定义该任务允许运行的最大时长(秒),任务中指定的超时时间优先级高于命令行定义的数值。 <?php namespace App\Jobs; class ProcessPodcast implements ShouldQueue { /** * The number of seconds the job can run before timing out. * * @var int */ public $timeout = 120; } ## 频率限制 ## 注:该功能要求应用可以与 Redis 服务器进行交互。 如果使用了 Redis,那么可以使用时间或并发来控制队列任务。当队列任务与有频率限制的 API 交互时很有帮助。 通过 throttle 方法,可以限定给定类型任务每 60 秒只运行 10 次。如果不能获取锁,需要将任务释放回队列以便可以再次执行。 Redis::throttle('key')->allow(10)->every(60)->then(function () { // Job logic... }, function () { // Could not obtain lock... return $this->release(10); }); 注:在上面的例子中,key 可以是任意可以唯一标识你想要限定访问频率的任务类型的字符串。例如,这个键可以基于任务类名和操作 Eloquent 模型的 ID 进行构建。 除此之外,还可以指定同时处理给定任务的最大进程数量。这个功能在队列任务正在编辑一次只能由一个任务进行处理的资源时很有用。例如,使用 funnel 方法你可以让给定类型任务一次只能由一个工作进程进行处理。 Redis::funnel('key')->limit(1)->then(function () { // Job logic... }, function () { // Could not obtain lock... return $this->release(10); }); 注:使用频率限制时,任务在运行成功之前需要的最大尝试次数很难权衡,因此,将频率限制和基于时间的尝试次数结合起来使用是个不错的选择。 ## 处理错误 ## 如果任务在处理的时候有异常抛出,则该任务将会被自动释放回队列以便再次尝试执行。任务会持续被释放直到尝试次数达到应用允许的最大次数。 最大尝试次数通过 Artisan 命令 queue:work 上的 --tries 开关来定义。此外,该次数也可以在任务类自身上定义。 ## 运行队列进程 ## Laravel 自带了一个队列进程用来**处理被推送到队列的任务**。可以使用 queue:work 命令运行这个队列进程。 **注意:** 在 Linux 中,队列进程开始运行后,会持续监听队列,直至你手动停止或关闭终端。 php artisan queue:work 注:为了保持队列进程 queue:work 持续在后台运行,需要使用进程守护程序,比如 Supervisor 来确保队列进程持续运行。 **注意:** 队列进程是长生命周期的进程,会在启动后驻留内存。若应用有任何改动将不会影响到已经启动的进程。所以请在发布程序后,重启队列进程。 ### 处理单个任务 ### \--once 选项可用于告知进程只处理队列中的单个任务。 php artisan queue:work --once ### 指定连接和队列 ### 队列进程可以自定义连接和队列。 传递给 work 命令的连接名需要与配置文件 config/queue.php 中定义的某个连接配置相匹配。 php artisan queue:work redis 可以自定义将某个队列进程指定某个连接来管理。例如,如果所有的邮件任务都是通过 redis 连接上的 emails 队列处理,那么可以用以下命令来启动单一进程只处理单一队列。 php artisan queue:work redis --queue=emails ### 资源注意事项 ### 后台队列进程不会在处理每个任务前重启框架,因此你需要在每次任务完成后释放所有重量级的资源。 例如,如果你在使用 GD 库处理图片,需要在完成的时候使用 imagedestroy 来释放内存。 ### 队列优先级 ### 有时你需要区分任务的优先级。比如,在配置文件 config/queue.php 中,你可以定义 redis 连接的默认 queue 为 low。不过,如果需要将任务分发到高优先级 high,可以这么做。 dispatch((new Job)->onQueue('high')); 如果期望所有 high 高优先级的队列都将先于 low 低优先级的任务执行,可以像这样启动队列进程。 php artisan queue:work --queue=high,low ### 重启队列进程 ### 队列进程是长生命周期的进程,在重启以前,所有源码的修改并不会对其产生影响。 所以,最简单的方法是在每次发布新版本后重新启动队列进程。你可以通过 Aritisan 命令 queue:restart 来优雅地重启队列进程。 php artisan queue:restart 该命令将在队列进程完成正在进行的任务后,结束该进程,避免队列任务的丢失或错误。由于队列进程会在执行 queue:restart 命令后死掉,你仍然需要通过进程守护程序如 Supervisor 来自动重启队列进程。 注:队列使用缓存来存储重启信号,所以在使用此功能前你需要验证缓存驱动配置正确。 ### 任务过期 ### 在配置文件 config/queue.php 中,每个连接都定义了 retry\_after 项。该配置项的目的是定义任务在执行多少秒后释放回队列。 如果 retry\_after 设定的值为 90, 任务在运行 90 秒后还未完成,那么将被释放回队列而不是删除掉。毫无疑问,你需要把 retry\_after 的值设定为任务执行时间的最大可能值。 注:只有 Amazon SQS 配置信息不包含 retry\_after 项。Amazon SQS 的任务执行时间基于 Default Visibility Timeout ,该项在 Amazon AWS 控制台配置。 ### 队列进程超时 ### 队列进程 queue:work 可以设定超时 --timeout 项。它控制队列进程执行每个任务的最长时间,如果超时,该进程将被关闭。 各种错误都可能导致某个任务处于“冻结”状态,比如 HTTP 无响应等。队列进程超时就是为了将这些“冻结”的进程关闭。 php artisan queue:work --timeout=60 配置项 retry\_after 和 Aritisan 参数项 --timeout 不同,但目的都是为了确保任务的安全,并且只被成功的执行一次。 **注意:** 参数项 --timeout 的值应该始终小于配置项 retry\_after 的值,这是为了确保队列进程总在任务重试以前关闭。如果 --timeout 比retry\_after 大,那么你的任务可能被执行两次。 ### 进程休眠时间 ### 当任务在队列中有效时,进程会持续处理任务,没有延迟。不过,我们可以使用 sleep 配置项来指定没有新的有效任务产生时的休眠时间。 php artisan queue:work --sleep=3 ## 处理失败的任务 ## 不可避免会出现运行失败的任务。你不必为此担心,Laravel 可以轻松设置任务允许的最大尝试次数,若是执行次数达到该限定,该任务会被插入到 failed\_jobs 表。 要创建一个 failed\_jobs 表的迁移,可以使用 queue:failed-table 命令。 php artisan queue:failed-table php artisan migrate 【失败的任务表】的表结构为: CREATE TABLE `failed_jobs` ( `id` int(10) unsigned NOT NULL AUTO_INCREMENT, `connection` text COLLATE utf8_unicode_ci NOT NULL, `queue` text COLLATE utf8_unicode_ci NOT NULL, `payload` longtext COLLATE utf8_unicode_ci NOT NULL, `failed_at` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci COMMENT='失败的任务表'; 然后,运行队列进程时,通过 --tries 参数项来设置队列任务允许的最大尝试次数,如果没有指定 --tries 选项,任务失败后会被无限重试。 **说明:** 失败的任务表和队列驱动的具体类型无关,只要采用异步驱动(database、beanstalkd、sqs或者redis),都需要创建该失败的任务表,来保存超过最大尝试次数的失败任务。否则,任务失败后会一直保存在队列中。 失败的任务表的表名可以自定义。 php artisan queue:work redis --tries=3 ## 失败的任务通知 ## 可以在任务类中定义 failed 方法, 从而允许你在失败发生时执行指定的动作,比如发送任务失败的通知,记录日志等。导致任务失败的 Exception 会被传递到 failed 方法。 <?php namespace App\Jobs; use Exception; use App\Podcast; use App\AudioProcessor; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; class ProcessPodcast implements ShouldQueue { use InteractsWithQueue, Queueable, SerializesModels; protected $podcast; /** * Create a new job instance. * * @param Podcast $podcast * @return void */ public function __construct(Podcast $podcast) { $this->podcast = $podcast; } /** * Execute the job. * * @param AudioProcessor $processor * @return void */ public function handle(AudioProcessor $processor) { // Process uploaded podcast... } /** * The job failed to process. * * @param Exception $exception * @return void */ public function failed(Exception $exception) { // 发送失败通知, etc... } } ## 任务失败事件 ## 如果你期望在任务失败的时候触发某个事件,可以使用 Queue::failing 方法。该事件通过邮件或 HipChat 通知团队。 例如,可以在 Laravel 自带的 AppServiceProvider 中添加一个回调到该事件。 <?php namespace App\Providers; use Illuminate\Support\Facades\Queue; use Illuminate\Queue\Events\JobFailed; use Illuminate\Support\ServiceProvider; class AppServiceProvider extends ServiceProvider { /** * 启动应用服务. * * @return void */ public function boot() { Queue::failing(function (JobFailed $event) { // $event->connectionName // $event->job // $event->exception }); } /** * 注册服务提供者. * * @return void */ public function register() { // } } ## 重试失败的任务 ## 要查看已插入到 failed\_jobs 数据表中的所有失败任务,可以使用 Artisan 命 queue:failed。 php artisan queue:failed 该命令将会列出失败任务 ID(主键)、连接、队列和失败时间,主键 ID 可用于重试失败任务。 **重试失败的任务是指将失败的任务重新放回队列。** 例如,要重试一个 ID 为 5 的失败任务,可以运行下面的命令。 php artisan queue:retry 5 要重试所有失败任务,运行如下命令即可。 php artisan queue:retry all 如果要删除一个失败任务,可以使用 queue:forget 命令。 php artisan queue:forget 5 要删除所有失败任务,可以使用 queue:flush 命令。 php artisan queue:flush ## 任务事件 ## 通过 Queue 门面提供的 before 和 after 方法可以在任务被处理之前或之后指定要执行的回调。这些回调可用来记录日志或者记录统计数据。 通常,你可以在服务提供者中使用这些方法。比如,可以在 AppServiceProvider 这样用。 <?php namespace App\Providers; use Illuminate\Support\Facades\Queue; use Illuminate\Support\ServiceProvider; use Illuminate\Queue\Events\JobProcessed; use Illuminate\Queue\Events\JobProcessing; class AppServiceProvider extends ServiceProvider { /** * Bootstrap any application services. * * @return void */ public function boot() { Queue::before(function (JobProcessing $event) { // $event->connectionName // $event->job // $event->job->payload() }); Queue::after(function (JobProcessed $event) { // $event->connectionName // $event->job // $event->job->payload() }); } /** * Register the service provider. * * @return void */ public function register() { // } } 使用 Queue 门面上的 looping 方法,可以在进程尝试从队列中获取任务之前指定要执行的回调。 例如,你可以注册一个闭包来回滚之前失败任务遗留下来的事务。 Queue::looping(function () { while (DB::transactionLevel() > 0) { DB::rollBack(); } }); ## Supervisor 的用法 ## ### 安装 Supervisor ### Supervisor 是 Linux 系统中常用的进程守护程序。如果队列进程 queue:work 意外关闭,它会自动重启队列进程。 在 Ubuntu 安装 Supervisor 非常简单。 sudo apt-get install supervisor 注:如果自己配置 Supervisor 有困难,可以考虑使用 Laravel Forge,它会为 Laravel 项目自动安装并配置 Supervisor。 ### 配置 Supervisor ### Supervisor 配置文件通常存放在 /etc/supervisor/conf.d 目录,在该目录下,可以创建多个配置文件指示 Supervisor 如何监视进程。 例如,让我们创建一个开启并监视 queue:work 进程的 laravel-worker.conf 文件。 [program:laravel-worker] process_name=%(program_name)s_%(process_num)02d command=php /home/forge/app.com/artisan queue:work sqs --sleep=3 --tries=3 autostart=true autorestart=true user=forge numprocs=8 redirect_stderr=true stdout_logfile=/home/forge/app.com/worker.log 在本例中,numprocs 指令让 Supervisor 运行 8 个 queue:work 进程并监视它们,如果失败的话自动重启。当然,你需要修改 queue:work sqs 的 command 指令来映射你的队列连接。 ### 启动 Supervisor ### 当成功创建配置文件后,需要刷新 Supervisor 的配置信息并使用如下命令启动进程。 sudo supervisorctl reread sudo supervisorctl update sudo supervisorctl start laravel-worker:* 更多信息请查看 [Supervisor官方文档][Supervisor] 。 ## 监听队列 ## 监听队列,可以使用 queue:listen 命令。 php artisan queue:listen --queue=default2 php artisan queue:listen --queue=default --tries=3 Laravel 5.3 的文档已经不写 queue:listen 这个指令怎么用了,因此可以看出可能官方已经不怎么建议使用 queue:listen了。 但是在本地调试时,要使用 queue:listen。因为 queue:work 在启动后,如果代码修改了,queue:work 不会自动 Load 上下文,但是 queue:listen 就会重新 Load 新代码。 其余情况,全部推荐使用 queue:work,因为效率更高。 ## 常用命令 ## # 监听队列 php artisan queue:listen --queue=default --tries=3 # 执行 worker 进程(以守护进程的方式) php artisan queue:work --queue=default --tries=3 --daemon # 查看所有失败的任务 php artisan queue:failed # 重试所有失败任务(重新回到队列中) php artisan queue:retry all # 重试指定的失败任务 php artisan queue:retry <id> # 要删除一个失败任务,可以使用 queue:forget 命令 php artisan queue:forget 5 # 要删除所有失败任务,可以使用 queue:flush 命令 php artisan queue:flush # 重启队列 worker 进程 php artisan queue:restart # 查询帮助信息 php artisan queue:work --help [Horizon]: https://laravel.com/docs/5.5/horizon [Supervisor]: http://supervisord.org/index.html
还没有评论,来说两句吧...