Очереди
Очередь позволяет поместить медленную работу в буфер и дать отдельному воркер-процессу выполнить её позже — сохраняя HTTP-ответы быстрыми. Lift поставляет четыре драйвера (sync, в памяти array, Redis, база данных), базовый класс AbstractJob, повторные попытки, задержки, отслеживание проваленных задач и CLI-Worker с корректным завершением.
Ментальная модель: задача — это сериализуемый PHP-объект с методом
handle(). Очередь — это список задач, ожидающих выполнения. Воркер — это долгоживущий процесс, который извлекает задачи и вызываетhandle(). Сбой, повтор, провал, повтор сначала.
Когда ставить в очередь
| Операция | Sync или очередь? |
|---|---|
| Отправка транзакционного письма | Очередь — SMTP может занять секунды |
| Генерация PDF-отчёта | Очередь — держите запрос < 100 мс |
| Push-уведомление тысячам | Очередь — рассылка веером в воркере |
| Общение с ненадёжным сторонним API | Очередь — повторные попытки бесплатно |
| Чтение пользователя из БД для рендера страницы | Sync — пользователь ждёт |
| Обновление несвязанного кэша после записи | Очередь — некритичный путь |
Правило: всё, что строго не должно завершиться до возврата HTTP-ответа, должно ставиться в очередь.
Тур за 60 секунд
use Lift\Queue\AbstractJob;
// 1. Определить задачу
final class SendWelcomeEmail extends AbstractJob
{
public function __construct(private readonly string $email) {}
public function handle(): void
{
// …собственно отправить письмо…
}
}
// 2. Поместить из обработчика
$app->post('/signup', function (Request $req) use ($app) {
$email = $req->validate(['email' => 'required|email'])['email'];
$app->dispatch(new SendWelcomeEmail($email));
return Response::noContent();
});
// 3. Запустить воркер (отдельный CLI-процесс)
// vendor/bin/lift queue:work
Это весь жизненный цикл. Обработчик возвращается за миллисекунды; воркер делает медленную работу в фоне.
Обзор драйверов
| Драйвер | Класс | Где живут задачи | Использовать, когда |
|---|---|---|---|
| Sync | SyncQueue |
Нигде — выполняется сразу | По умолчанию. Разработка/тесты. Простые приложения. |
| Array | ArrayQueue |
Память PHP | Пакетирование в рамках запроса. Тесты. |
| Redis | RedisQueue |
Списки + sorted sets Redis | Продакшен по умолчанию. Дёшево, быстро, распределённо. |
| Database | DatabaseQueue |
SQL-таблица, которую вы контролируете | Когда у вас уже есть Postgres/MySQL и не хочется Redis. Идёт с таблицей проваленных задач бесплатно. |
Все четыре реализуют QueueInterface — код приложения не меняется при смене драйверов.
Выбор драйвера
Логика загрузки:
use Lift\Queue\QueueInterface;
use Lift\Queue\SyncQueue;
use Lift\Queue\RedisQueue;
$app->singleton(QueueInterface::class, function () use ($app) {
return match ($_ENV['QUEUE_DRIVER'] ?? 'sync') {
'redis' => new RedisQueue(
$app->make(\Lift\Redis\RedisClientInterface::class),
secret: $_ENV['QUEUE_SECRET'],
),
'db' => new \Lift\Queue\DatabaseQueue($app->make(\Lift\Database\Connection::class)),
'array' => new \Lift\Queue\ArrayQueue(),
default => new SyncQueue(),
};
});
// Необязательное сокращение:
$app->setQueue($app->make(QueueInterface::class));
Теперь $app->queue() / $app->dispatch(...) используют настроенный драйвер.
Определение задач
Наследуйте AbstractJob, переопределите handle(). У всего остального есть разумное значение по умолчанию.
final class ProcessReport extends AbstractJob
{
// Переопределения на класс — необязательны
protected string $queue = 'reports'; // имя очереди; по умолчанию 'default'
protected int $delay = 0; // задержка (с) перед тем, как стать доступной
protected int $tries = 5; // макс. попыток до failed()
public function __construct(
private readonly int $reportId,
) {}
public function handle(): void
{
// …делать работу; выбросить исключение при провале для автоматического повтора…
}
// Переопределите, чтобы оповестить кого-то после провала всех повторов
public function failed(\Throwable $e): void
{
error_log("Report {$this->reportId} permanently failed: " . $e->getMessage());
}
}
Задачи сериализуются при помещении (кроме
SyncQueue). Все свойства конструктора должны быть сериализуемыми — никаких ресурсов, никаких замыканий, никаких PDO-дескрипторов. Передавайте идентификаторы и ищите богатые объекты внутриhandle().
Плохо vs хорошо
// ❌ НЕПРАВИЛЬНО — $user, $logger не сериализуемы
new EmailJob($user, $logger);
// ✅ ПРАВИЛЬНО — только идентификаторы в конструкторе; искать внутри handle()
final class EmailJob extends AbstractJob
{
public function __construct(private readonly int $userId) {}
public function handle(): void
{
$user = User::find($this->userId);
$logger = $this->container()->get(LoggerInterface::class);
// …
}
}
Внедрение сервисов в handle()
Lift не внедряет автоматически в handle(). Самые чистые варианты:
- Статический поиск через шаг загрузки
RegisterContainerStatically. Многословно. - Получение из
Appчерез хукsetUp(). Связывает задачи с App. - Создание базового класса, который предоставляет помощник
container()подклассам.
Наименее болезненный паттерн — вариант 3 плюс подкласс Worker, внедряющий контейнер во время process(). Фреймворк намеренно держит JobInterface минимальным — вы выбираете связывание.
Отправка задач
$app->dispatch($job); // через QueueInterface — использует настроенный драйвер
// ИЛИ
$queue = $app->queue();
$queue->push($job); // то же самое
$queue->later(60, $job); // доступна через 60 секунд от текущего момента
Осмотр / очистка:
$queue->size('default'); // количество ожидающих
$queue->size('reports'); // на очередь
$queue->clear('default'); // стереть одну очередь
push() уважает $job->getDelay() — передача задачи, у которой getDelay() равно 60, эквивалентна later(60, $job).
Запуск воркера
Долгоживущий процесс, который зацикливается, извлекает задачи, выполняет их, спит, когда пусто:
vendor/bin/lift queue:work
vendor/bin/lift queue:work --queue=reports --sleep=2 --max-jobs=100
Программный эквивалент:
use Lift\Queue\Worker;
$worker = new Worker($app->queue(), $app->logger());
$worker->run(queue: 'default', sleep: 1, maxJobs: 0);
Аргумент run() |
Значение |
|---|---|
queue |
Имя для опроса. По умолчанию 'default'. |
sleep |
Секунды ожидания между пустыми опросами. |
maxJobs |
Остановиться после N задач (0 = без ограничения). Используйте для гигиены памяти. |
Корректное завершение
Воркер устанавливает обработчики SIGTERM / SIGINT (требует ext-pcntl). Когда вы делаете kill <pid> или нажимаете Ctrl-C:
- Он завершает текущую задачу.
- Чисто выходит с количеством обработанных задач.
Запускайте несколько воркеров под systemd, supervisord или Kubernetes Deployment, чтобы они автоматически перезапускались при выходе. Распространённый паттерн — --max-jobs=1000, чтобы каждый воркер перерабатывал свою память после каждых 1000 задач.
Пример systemd-юнита:
[Unit]
Description=Lift queue worker
After=network.target redis.service
[Service]
ExecStart=/usr/bin/php /var/www/myapp/vendor/bin/lift queue:work --max-jobs=1000
Restart=always
RestartSec=1
User=www-data
KillSignal=SIGTERM
TimeoutStopSec=60
[Install]
WantedBy=multi-user.target
Повторные попытки и окончательный провал
По умолчанию $tries = 3. Если handle() выбрасывает исключение:
- Воркер логирует
Job attempt failedи пробует снова немедленно. - После
getTries()всего попыток он вызывает$job->failed($exception). - С
DatabaseQueueпроваленная строка также помечается персистентно (см. ниже).
Чтобы перебросить исключение «сдаться сейчас», пропускающее повторы, пометьте его сигнальным значением и проверьте в handle():
public function handle(): void
{
try {
$this->doWork();
} catch (PermanentlyBrokenException $e) {
$this->tries = 1; // нестандартно — Lift уважает только getTries()
throw $e;
}
}
(Фреймворк не поставляет PermanentFailureException — держите задачи простыми или вызывайте failed() напрямую + return.)
Отложенные задачи
$queue->later(300, new SendReminderEmail($userId)); // через 5 минут от текущего момента
Поведение зависит от драйвера:
- Sync — задержка игнорируется, выполняется сразу.
- Array / Redis / Database — хранятся отдельно; воркер видит их только после того, как пройдёт их временная метка
ready-at.
RedisQueue использует sorted set с оценкой по ready-at, и pop() атомарно мигрирует подошедшие задачи в основной список. DatabaseQueue использует столбец available_at с SELECT … FOR UPDATE SKIP LOCKED.
Несколько очередей
«Очередь» — это просто имя. Используйте их для приоритизации:
final class CriticalPayment extends AbstractJob { protected string $queue = 'high'; }
final class CleanupOrphans extends AbstractJob { protected string $queue = 'low'; }
// Запустите воркер на очередь, масштабируйте их независимо:
vendor/bin/lift queue:work --queue=high # 4 таких
vendor/bin/lift queue:work --queue=low # 1 такой
Вы не получаете строгий приоритет «осушить high перед low» «из коробки»; вместо этого поднимайте больше воркеров на загруженной очереди.
Проваленные задачи (DatabaseQueue)
Только DatabaseQueue персистентно хранит проваленные задачи. Он предоставляет небольшой API управления:
$queue = $app->queue(); // предполагается, что это DatabaseQueue
$queue->failedCount('default'); // int
$queue->listFailed('default'); // массив строк (новейшие первыми)
// Перепоставить одну строку по ID
$queue->retry($rowId);
// Перепоставить каждую проваленную задачу
$queue->retryAll('default');
// Окончательно удалить все проваленные строки
$queue->clearFailed('default');
Проваленная строка хранит payload, attempts, error, failed_at — достаточно, чтобы отладить без повторного запуска.
Восстановление после сбоя
Если воркер-процесс убит посреди задачи (OOM, kill -9, потеря питания), строка остаётся reserved_at = <some-time> навсегда. DatabaseQueue::pop() вызывает pruneReserved() на каждом опросе — любая строка, зарезервированная дольше $reservedTimeout секунд (по умолчанию 60), освобождается для повтора. Настройте таймаут, если ваши задачи законно занимают дольше.
Добавление столбцов приложения
Наследуйте HasDatabaseExtra, если хотите сохранять идентификаторы арендаторов, корреляционные идентификаторы и т. п. в таблицу задач:
use Lift\Queue\AbstractJob;
use Lift\Queue\HasDatabaseExtra;
final class TenantJob extends AbstractJob implements HasDatabaseExtra
{
public function __construct(public readonly string $tenantId, public readonly int $id) {}
public function getDatabaseExtra(): array
{
return ['tenant_id' => $this->tenantId];
}
public function handle(): void { /* … */ }
}
// Определите дополнительный столбец при конструировании очереди:
new DatabaseQueue(
$db,
extraColumns: fn($t) => $t->string('tenant_id', 36)->nullable()->index(),
secret: $_ENV['QUEUE_SECRET'],
);
Теперь вы можете делать SELECT … WHERE tenant_id = '…' прямо к таблице очереди — удобно для операционных дашбордов.
Безопасность: подписанные полезные нагрузки
RedisQueue, DatabaseQueue и AmqpQueue сериализуют задачи через serialize(). Любой с доступом на запись к вашему ключу Redis, строке БД или AMQP-обменнику мог бы сконструировать полезную нагрузку, запускающую инъекцию PHP-объекта через unserialize. Все три драйвера с общим backend по умолчанию требуют подписанные payload'ы. Передавайте один и тот же непустой $secret всем producer'ам и worker'ам:
new RedisQueue($redis, secret: $_ENV['QUEUE_SECRET']);
new DatabaseQueue($db, secret: $_ENV['QUEUE_SECRET']);
new AmqpQueue(['secret' => $_ENV['QUEUE_SECRET'], /* broker config... */]);
Каждая полезная нагрузка подписывается HMAC-SHA256, а неподписанные payload'ы отклоняются до unserialize(). Используйте один и тот же секрет на каждом воркере.
Начиная с 1.3.0:
RedisQueue,DatabaseQueueиAmqpQueueпо умолчанию отказываются создавать или читать неподписанные payload'ы. Для доверенной legacy/local очереди включите это явно:allowUnsignedPayloads: true(DatabaseQueue/RedisQueue) или['allow_unsigned_payloads' => true](AmqpQueue). Не включайте это для общих production-backend'ов.
Тестирование
Три варианта, от самого дешёвого к самому реалистичному.
1. SyncQueue (по умолчанию в тестах)
$app->setQueue(new SyncQueue());
$app->dispatch(new SendWelcomeEmail($email)); // выполняется сразу
self::assertSame(1, $mailerSpy->count); // можно проверять побочные эффекты
По умолчанию — если задача выбрасывает исключение, ваш тест выбрасывает. Воркер не нужен.
2. ArrayQueue + ручной pop()
$queue = new ArrayQueue();
$app->setQueue($queue);
$response = $this->post('/signup', ['email' => '[email protected]'])->assertCreated();
self::assertSame(1, $queue->size());
self::assertInstanceOf(SendWelcomeEmail::class, $queue->pop());
Позволяет проверить «задача была поставлена в очередь», не запуская её фактически.
3. Настоящий драйвер в CI
Для e2e-тестов направьте QUEUE_DRIVER=redis на тестовый Redis (или используйте SQLite :memory: + DatabaseQueue). Запустите воркер в отдельном процессе и используйте Worker::process() напрямую для синхронных утверждений.
Операционные советы
- Утечки памяти накапливаются. PHP не возвращает память между запросами — используйте
--max-jobs=1000и дайте systemd перезапускать воркер. - Один воркер, одна очередь. Если смешать
--queue=default,reports, пропускная способность загруженной очереди голодит тихую. Поднимайте выделенные воркеры. - Мониторьте
size()— настройте оповещения, когда глубина очереди непрерывно растёт. - Идемпотентность. Воркеры могут выполнить задачу более одного раза (сбой до подтверждения, двойная отправка). Используйте ключи идемпотентности:
if (Db::exists("emails_sent.{$jobId}")) return;в началеhandle(). - Не ставьте в очередь внутри транзакции, которую вы ещё не зафиксировали — воркер может попытаться найти строки, которых ещё нет. Помещайте после возврата
$db->transaction(...).
Частые подводные камни
| Симптом | Причина | Исправление |
|---|---|---|
Job processed в логах, но ничего не произошло |
Использовался SyncQueue, и handle() был молчалив |
Перейдите на настоящий драйвер. |
Class not found при старте воркера после деплоя |
Воркер выполняет старый код | Перезапускайте воркер при каждом деплое. |
Предупреждение unserialize |
Класс задачи был переименован / удалён | Осушите старую очередь перед деплоем переименований; или прочитайте столбец payload вручную и перепоставьте. |
| Одна задача выполняется N раз | Нет идемпотентности; воркер упал после handle(), но до подтверждения |
Добавьте ключ идемпотентности (INSERT IGNORE INTO processed_jobs). |
| Воркер ест ОЗУ | Долгоживущий процесс накапливает состояние | --max-jobs=N; цикл перезапуска через systemd. |
pcntl_signal not available |
PHP скомпилирован без pcntl | Установите php-pcntl или примите некорректные kill (задачи могут быть переобработаны). |
| Отложенные задачи никогда не выполняются | Воркеры опрашивают неверное имя очереди | --queue=default совпадает с getQueue(); проверьте написание. |
Шпаргалка
// Определить
final class EmailJob extends AbstractJob {
protected int $tries = 5;
public function __construct(private int $userId) {}
public function handle(): void { /* … */ }
public function failed(\Throwable $e): void { /* оповестить */ }
}
// Драйвер
$app->setQueue(new RedisQueue($redis, secret: $_ENV['QUEUE_SECRET']));
// Поместить
$app->dispatch(new EmailJob(42));
$app->queue()->later(60, new EmailJob(42));
// Воркер (CLI)
vendor/bin/lift queue:work --queue=default --sleep=1 --max-jobs=1000
// Операции с проваленными задачами (только DatabaseQueue)
$queue->failedCount();
$queue->retry($rowId);
$queue->retryAll();
$queue->clearFailed();