2017-02-23 17 views
1

У нас есть куча файлов, которые будут загружены в удаленное хранилище blob после обработки.Обработка очереди в очереди с повторением при сбое

В настоящее время интерфейс (PHP) создает redis-список таких файлов и дает ему уникальный идентификатор, называемый JobID. Затем он передает уникальный идентификатор в трубку beanstalk, которая принимается процессом Go. Он использует библиотеку Go workers для обработки каждого идентификатора задания в соответствии с тем, что делает net/http. Он получает идентификатор задания, извлекает список redis и начинает обработку файлов.

Однако в настоящее время обрабатывается только один файл. Поскольку операция здесь связана с I/O, а не с привязкой к ЦП, интуиция предполагает, что было бы полезно использовать goroutine для каждого файла.

Однако мы хотим повторить загрузку при сбое, а также отслеживать количество обработанных элементов на одно задание. Мы не можем запустить несвязанное количество goroutines, потому что одно Job может содержать около ~ 10k файлов для обработки, и 100s таких Рабочих мест могут быть отправлены в секунду во время пиковых значений. Какой был бы правильный подход для этого?

NB: Мы можем изменить технологию стека немного, если это необходимо (например, замена из beanstalkd для чего-то)

ответ

2

Вы можете ограничить количество goroutines, используя буферный chan с размером максимального количества goroutines вы хотите. Затем вы можете заблокировать этот chan, если он достигнет максимальной емкости. По завершении ваших goroutines они освободят слоты, чтобы разрешить запуск новых goroutines.

Пример:

package main 

import (
    "fmt" 
    "sync" 
) 

var (
    concurrent = 5 
    semaphoreChan = make(chan struct{}, concurrent) 
) 

func doWork(wg *sync.WaitGroup, item int) { 
    // block while full 
    semaphoreChan <- struct{}{} 

    go func() { 
     defer func() { 
      // read to release a slot 
      <-semaphoreChan 
      wg.Done() 
     }() 
     // This is where your work actually gets done 
     fmt.Println(item) 
    }() 
} 

func main() { 
    // we need this for the example so that we can block until all goroutines finish 
    var wg sync.WaitGroup 
    wg.Add(10) 

    // start the work 
    for i := 0; i < 10; i++ { 
     doWork(&wg, i) 
    } 

    // block until all work is done 
    wg.Wait() 
} 

Go площадка ссылка: https://play.golang.org/p/jDMYuCe7HV

Вдохновленный этой Golang Великобритании конференции говорят: https://youtu.be/yeetIgNeIkc?t=1413

+0

Это помогло мне начать работу по ограничению параллелизм. Однако проблема, которая до сих пор остается в силе, заключается в том, как отслеживать успех или неудачу работы. Задание содержит N подзадач, все из которых должны быть успешно обработаны, а ошибки должны быть отправлены. Как я к этому подхожу? – agathver

+0

Создайте канал, который вы передадите гороходу. Горотин может записать результат операции, включая ошибки на этом канале. Вызывающий может вытащить информацию из этого канала, обрабатывая ошибку при необходимости (например, зарегистрировать ошибку или повторить операцию). Если вам нужно повторить операцию, сделайте канал с настраиваемым типом структуры с необходимым контекстом для повторной попытки (например, вход, который требуется goroutine для повторной попытки) и ошибка. – MahlerFive

Смежные вопросы