2016-06-09 3 views
1

Синхронный Пример:Golang Неблокирующий буфера

type job struct { 
    Id int 
    Message string 
} 

for { 
    // getJob() blocks until job is received 
    job := getJob() 
    doSomethingWithJob(job) 
} 

Я хочу, чтобы обрабатывать рабочие места, как они поступают из getJob с doSomethingWithJob. например getJob может быть полезной нагрузкой, полученной от MessageQueue, такой как RabbitMQ/Beanstalkd или обработкой HTTP-запроса.

Я не хочу блокировать getJob, пока я doSomethingWithJob & наоборот. Однако я хочу контролировать/буферизовать количество заданий, чтобы я не перегружал систему. например макс. параллелизм 5.

Концепция рутирования меня смущает в данный момент, поэтому любые указатели в правильном направлении были бы очень благодарны, чтобы помочь мне учиться.

Обновление: Спасибо @JimB за вашу помощь. Почему работник 5 всегда подбирает работу?

jobCh := make(chan *job) 

// Max 5 Workers 
for i := 0; i < 5; i++ { 

    go func() { 

     for job := range jobCh { 
      time.Sleep(time.Second * time.Duration(rand.Intn(3))) 
      log.Println(i, string(job.Message)) 
     } 
    }() 
} 

for { 
    job, err := getJob() 
    if err != nil { 
     log.Println("Closing Channel") 
     close(jobCh) 
     break 
    } 

    jobCh <- job 
} 

log.Println("Complete") 

Пример выходных

2016/06/09 22:19:57 5 {"id":10692,"name":"Test Message"} 
2016/06/09 22:19:57 5 {"id":10687,"name":"Test Message"} 
2016/06/09 22:19:57 5 {"id":10699,"name":"Test Message"} 
2016/06/09 22:19:57 5 {"id":10701,"name":"Test Message"} 
2016/06/09 22:19:57 5 {"id":10703,"name":"Test Message"} 
2016/06/09 22:19:57 5 {"id":10704,"name":"Test Message"} 
+0

Это не 5-й рабочий, каждый рабочий получил i = 5. Это должно быть 'go func (i int) {...} (i)' https://golang.org/doc/faq#closures_and_goroutines – Darigaaz

ответ

4

Вы можете начать 5 goroutines чтения из канала для вызова doSomethingWithJob. Таким образом, одновременно обрабатывается не более 5 заданий.

jobCh := make(chan *job) 

// start 5 workers to process jobs 
for i := 0; i < 5; i++ { 
    go func() { 
     for job := range jobCh { 
      doSomethingWithJob(job) 
     } 
    }() 
} 

// send jobs to workers as fast as we can 
for { 
    jobCh <- getJob() 
} 
+0

Вау ... Я смотрел на все это назад и смущающе! – Gravy

+0

Я попробовал метод выше. И это работает, однако похоже, что только работник 5 подбирает работу. См. Обновление, о котором идет речь. – Gravy

+0

@Gravy: https://golang.org/doc/faq#closures_and_goroutines. Каждый goroutine использует одну и ту же переменную 'i', а последнее значение i -' 5'. – JimB

Смежные вопросы