2016-11-22 4 views
4

Amazon объявила о своем new FIFO SQS service, и я бы хотел использовать его в очереди Laravel для решения некоторых проблем параллелизма.Как разместить Amazon FIFO SQS в очереди Laravel?

Я создал несколько новых очередей и изменил конфигурации. Тем не менее, я получил ошибку, которая говорит MissingParameter

The request must contain the parameter MessageGroupId. 

Так я изменил файл vendor/laravel/framework/src/Illuminate/Queue/SqsQueue.php

public function pushRaw($payload, $queue = null, array $options = []) 
{ 
    $response = $this->sqs->sendMessage(['QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 
     'MessageGroupId' => env('APP_ENV', getenv('APP_ENV'))]); 

    return $response->get('MessageId'); 
} 

public function later($delay, $job, $data = '', $queue = null) 
{ 
    $payload = $this->createPayload($job, $data); 

    $delay = $this->getSeconds($delay); 

    return $this->sqs->sendMessage([ 
     'QueueUrl' => $this->getQueue($queue), 'MessageBody' => $payload, 'DelaySeconds' => $delay, 
     'MessageGroupId' => env('APP_ENV', getenv('APP_ENV')) 
    ])->get('MessageId'); 
} 

Я использую APP_ENV как идентификатор группы (это одна очередь сообщений, так на самом деле это не Я просто хочу, чтобы все было FIFO).

Но я все еще получаю то же сообщение об ошибке. Как я могу это исправить? Любая помощь будет оценена по достоинству.

(кстати, где имеет SDK определен sendMessage? Я могу найти заглушку для него, но я не нашел подробную реализацию)

ответ

8

Я хочу указать другим, которые могут споткнуться по той же проблеме, что, хотя редактирование SqsQueue.php работает, оно легко сбрасывается с помощью composer install или composer update. Альтернативой является внедрение нового Illuminate\Queue\Connectors\ConnectorInterface для SQS FIFO, а затем добавление его в менеджер очереди Laravel.

Мой подход заключается в следующем:

  1. Создать новый SqsFifoQueue класс, который расширяет Illuminate\Queue\SqsQueue но поддерживает SQS FIFO.
  2. Создайте новый класс SqsFifoConnector, который расширит Illuminate\Queue\Connectors\SqsConnector, который установит соединение, используя SqsFifoQueue.
  3. Создайте новый SqsFifoServiceProvider, который регистрирует SqsFifoConnector менеджеру очереди Laravel.
  4. Добавить SqsFifoServiceProvider на ваш config/app.php.
  5. Обновление config/queue.php для использования нового драйвера очереди SQS FIFO.

Пример:

  1. Создать новый SqsFifoQueue класс, который расширяет Illuminate\Queue\SqsQueue но поддерживает SQS FIFO.

    <?php 
    
    class SqsFifoQueue extends \Illuminate\Queue\SqsQueue 
    { 
        public function pushRaw($payload, $queue = null, array $options = []) 
        { 
         $response = $this->sqs->sendMessage([ 
          'QueueUrl' => $this->getQueue($queue), 
          'MessageBody' => $payload, 
          'MessageGroupId' => uniqid(), 
          'MessageDeduplicationId' => uniqid(), 
         ]); 
    
         return $response->get('MessageId'); 
        } 
    } 
    
  2. Создать новый SqsFifoConnector класс, который расширяет Illuminate\Queue\Connectors\SqsConnector, который установит соединение с использованием SqsFifoQueue.

    <?php 
    
    use Aws\Sqs\SqsClient; 
    use Illuminate\Support\Arr; 
    
    class SqsFifoConnector extends \Illuminate\Queue\Connectors\SqsConnector 
    { 
        public function connect(array $config) 
        { 
         $config = $this->getDefaultConfiguration($config); 
    
         if ($config['key'] && $config['secret']) { 
          $config['credentials'] = Arr::only($config, ['key', 'secret']); 
         } 
    
         return new SqsFifoQueue(
          new SqsClient($config), $config['queue'], Arr::get($config, 'prefix', '') 
         ); 
        } 
    } 
    
  3. Создать новую SqsFifoServiceProvider, которая регистрирует SqsFifoConnector менеджера очередей Laravel в.

    <?php 
    
    class SqsFifoServiceProvider extends \Illuminate\Support\ServiceProvider 
    { 
        public function register() 
        { 
         $this->app->afterResolving('queue', function ($manager) { 
          $manager->addConnector('sqsfifo', function() { 
           return new SqsFifoConnector; 
          }); 
         }); 
        } 
    } 
    
  4. Добавить SqsFifoServiceProvider в свой config/app.php.

    <?php 
    
    return [ 
        'providers'  => [ 
         ... 
         SqsFifoServiceProvider::class, 
        ], 
    ]; 
    
  5. Update config/queue.php использовать новый драйвер SQS FIFO очереди.

    <?php 
    
    return [ 
    
        'default' => 'sqsfifo', 
    
        'connections' => [ 
         'sqsfifo' => [ 
          'driver' => 'sqsfifo', 
          'key' => 'my_key' 
          'secret' => 'my_secret', 
          'queue' => 'my_queue_url', 
          'region' => 'my_sqs_region', 
         ], 
        ], 
    ]; 
    

Тогда ваша очередь теперь должна поддерживать SQS FIFO очереди.

Бесстыдная штепсельная вилка: Во время работы над вышеприведенными шагами я создал комплект композитора laravel-sqs-fifo, чтобы обработать его по адресу https://github.com/maqe/laravel-sqs-fifo.

+1

Я должен был добавить в свой префикс в 'sqsfifo' соединение в' config/queue.php', но кроме этого, это работает безупречно! –

+0

'MessageGroupId' => uniqid(), нарушает гарантию FIFO. Почему бы не использовать регулярную очередь в этот момент? https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/using-messagegroupid-property.html – Greg

0

Помимо MessageGroupId, она нуждается в MessageDeduplicationId или позволяя на основе контента дедупликации на ,

Смежные вопросы