2014-01-06 4 views
0

У меня есть вариация классической проблемы «производитель-потребитель». В моей программе есть 10 производителей, работающих параллельно, и их целью является производство N продуктов в целом.Как «попробовать отправить» на канал и прервать, если канал заполнен?

Я думал об использовании буферизованного канала:

products := make([]int, 100) // In total, produce 100 products 

// The producers 
for i := 0; i < 10; i++ { 
    go func() { 
     products <- 1 // !! 
    }() 
} 

Однако, это не будет работать:

  • goroutine не понимает, что цель достигнута, канал отправки блоков, и функция никогда возвращается.
  • if len(products) < 100 { products <- 1 } не является атомной операцией, поэтому это не помогает.

Есть ли другой подход?

+1

Могу ли я вежливо предположить, что ваша терминология здесь немного вводит в заблуждение. Модель производителя-потребителя часто относится к одному производителю и многим потребителям, иначе известному как шаблон наблюдателя. Вы не используете его в этом смысле. Вместо этого вы описываете шаблон фермы задач, при этом многие рабочие производят результаты работы для одного контроллера. –

+0

@ Rick-777 Спасибо, ты прав. –

ответ

2

products := make([]int, 100) делает ломтик, а не chan. Вы хотите:

products := make(chan int, 100) 

Если вы действительно хотите сделать неблокирующая отправки, вы можете использовать выбрать:

select { 
case products <- 1: 
default: 
} 

Это первый попытаться отправить на продукты, и если она полна запуска по умолчанию код (no-op) и продолжить.

+0

Использование параметров по умолчанию имеет проблемы. Если потребитель должен выполнять большую работу с каждым отдельным пакетом по мере его отправки, это может привести к ошибочному срабатыванию по умолчанию (особенно если производители находятся в другом потоке, или пользователь выгружается средой выполнения). Этот метод также основывается на том, что потребительская процедура запускается до производственных процедур. – LinearZoetrope

+0

По умолчанию никогда не следует запускать ошибочно. Кроме того, подпрограмма потребителя должна быть запущена ПОСЛЕ производства для этого, чтобы вернуть только 100. Я согласен, что ваш ответ лучше, но я не согласен с тем, почему вы считаете, что моя работа не работает. –

+0

Вы правы, я пропустил, что вы используете буферный канал. Метод по умолчанию работает, когда канал буферизуется. – LinearZoetrope

1

Лучшим способом сделать это, вероятно, является использование канала выхода и оператора select в производителе. Go гарантирует, что замкнутый канал будет всегда регистрироваться как чтение без блокировки.

Here's a working version on the playground

package main 

import "fmt" 
import "time" 

func main() { 
    productionChan := make(chan int) 
    quit := make(chan struct{}) 
    for i := 0; i < 110; i++ { 
     go produce(productionChan, quit) 
    } 

    consume(productionChan, quit) 
    time.Sleep(5 * time.Second) // Just so we can observe the excess production channels quitting correctly 
} 

func consume(productionChan <-chan int, quit chan<- struct{}) { 
    payload := make([]int, 100) 

    for i := range payload { 
     payload[i] = <-productionChan 
    } 

    close(quit) 
    fmt.Println("Complete payload received, length of payload slice: ", len(payload)) 
} 

func produce(productionChan chan<- int, quit <-chan struct{}) { 
    select { 
    case <-quit: 
     fmt.Println("No need to produce, quitting!") 
    case productionChan <- 1: 
    } 
} 

Идея заключается в том, что отдельные потребитель goroutine перебирает полезной нагрузки кусочек нужного размера, после чего кусок заполнен, цикл завершается и закрывает канал бросить курить. Все производители блокируются в отдельном заявлении о том, отправлять или получать из канала выхода. Когда канал выхода закрыт, каждый производитель, запущенный с этим выходом, немедленно выйдет.

Эта идиома также может быть довольно легко модифицировать, если у вас есть меньшее количество производителей, которые отправляют несколько значений каждый.

-1

Отказ от ответственности: Это не идиоматично. Идите в малейшей степени, и я не предлагаю на самом деле использовать этот код на практике. Это сказало ...

Go недавно введено reflect.Value.TryRecv и reflect.Value.TrySend. Они делают именно то, что вы ищете.

products := make(chan int, 100) 

for i := 0; i < 10; i++ { 
    go func() { 
     for { 
      if !reflect.ValueOf(products).TrySend(1) { 
       return 
      } 
     } 
    }() 
} 

Просмотреть код на go playground.

+1

Woah чувак, это глубоко. –

+1

Вы не должны использовать рефлекс, если это абсолютно необходимо. Это намного менее эффективно, чем использование select (см. Мой ответ). Однако вы, вероятно, действительно хотите использовать ответ @ Jsor. –

0

Вы можете использовать select, чтобы попытаться отправить буферном канал: если вы выбираете на отправке и буфер канала заполнен, вы достигнете default случая:

//this goroutine sends to the channel until it can't 
func f(c chan int, wg *sync.WaitGroup) { 

    for i := 0; ; i++ { 
     select { 
     case c <- i: //sent successfully 
      continue 
     default: 
      fmt.Println("Can't send, aborting!") 
      wg.Done() 
      return 
     } 
    } 
} 

func main() { 

    c := make(chan int, 100) 
    wg := sync.WaitGroup{} 
    for i := 0; i < 10; i++ { 
     wg.Add(1) 
     go f(c, &wg) 
    } 

    wg.Wait() 
    fmt.Println("Done!") 

} 

Недостатка этого подхода является что, если потребитель начинает потреблять до того, как вы закончите производить, вы войдете в бесконечный цикл.

Вы также можете закрыть канал со стороны потребителей в результате чего производители паниковать, и ловить эту панику:

func f(c chan int) { 

    defer func() { 
     _ = recover() 
     fmt.Println("Done!") 
    }() 

    for i := 0; ; i++ { 
     c <- i 
    } 
} 

func main() { 

    c := make(chan int) 

    for i := 0; i < 10; i++ { 

     go f(c) 
    } 

    n := 0 
    for { 
     <-c 
     n++ 
     if n > 100 { 
      close(c) 
      break 
     } 
    } 

Но если вы просто хотите, чтобы не производить больше деталей, чем указанное количество, почему не икру N goroutines, каждый из которых производит 1 пункт? или нерестилищ K goroutines, каждый из которых производит предметы N/K? Лучше знать это заранее.

Смежные вопросы