2013-11-15 3 views
80

Чтобы запустить бесконечный цикл выполнения двух goroutines, я могу использовать следующий код:Как слушать N каналов? (оператор динамического выбора)

после получения сообщения об ошибке он запустит новый goroutine и продолжит вечно.

c1 := make(chan string) 
c2 := make(chan string) 

go DoShit(c1, 5) 
go DoShit(c2, 2) 

for ; true; { 
    select { 
    case msg1 := <-c1: 
     fmt.Println("received ", msg1) 
     go DoShit(c1, 1) 
    case msg2 := <-c2: 
     fmt.Println("received ", msg2) 
     go DoShit(c2, 9) 
    } 
} 

Теперь я хотел бы иметь такое же поведение для N goroutines, но как будет выглядеть оператор выбора в этом случае?

Это бит кода я начал с, но я запутался как код оператора выбора

numChans := 2 

//I keep the channels in this slice, and want to "loop" over them in the select statemnt 
var chans = [] chan string{} 

for i:=0;i<numChans;i++{ 
    tmp := make(chan string); 
    chans = append(chans, tmp); 
    go DoShit(tmp, i + 1) 

//How shall the select statment be coded for this case? 
for ; true; { 
    select { 
    case msg1 := <-c1: 
     fmt.Println("received ", msg1) 
     go DoShit(c1, 1) 
    case msg2 := <-c2: 
     fmt.Println("received ", msg2) 
     go DoShit(c2, 9) 
    } 
} 
+3

Я думаю, что вы хотите, это мультиплексирование каналов. http://golang.org/doc/effective_go.html#chan_of_chan В принципе, у вас есть один канал, который вы слушаете, а затем несколько дочерних каналов, которые входят в основной канал. Связанный SO Вопрос: http://stackoverflow.com/questions/10979608/is-it-possible-to-multiplex-several-channels-into-one – Brenden

ответ

94

Вы можете сделать это с помощью функции Select из reflect пакета:

func Select(cases []SelectCase) (chosen int, recv Value, recvOK bool)

Выберите, чтобы выполнить операцию выбора, описанную в списке случаев. Например, оператор выбора Go, он блокирует до тех пор, пока, по крайней мере, один из случаев не сможет выполнить , делает равномерный псевдослучайный выбор, а затем выполняет этот случай . Он возвращает индекс выбранного случая и, если в этом случае была принята операция , полученное значение и логическое значение, указывающее, соответствует ли значение посылке по каналу (в отличие от нулевого значения , полученного, поскольку канал закрыто).

Вы передаете в массиве SelectCase структур, которые определяют канал, чтобы выбрать на направление работы, а значение для отправки в случае операции отправки.

Так что вы могли бы сделать что-то вроде этого:

cases := make([]reflect.SelectCase, len(chans)) 
for i, ch := range chans { 
    cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)} 
} 
chosen, value, ok := reflect.Select(cases) 
# ok will be true if the channel has not been closed. 
ch := chans[chosen] 
msg := value.String() 

Вы можете экспериментировать с более конкретизирован примером здесь: http://play.golang.org/p/8zwvSk4kjx

+3

Существует ли практическое ограничение на количество случаев в таком выборе ? Тот, который, если вы выходите за пределы этого, то производительность сильно влияет? –

+0

Возможно, это моя некомпетентность, но я нашел этот шаблон очень трудным для работы, когда вы отправляете и получаете сложные структуры через канал. Передача общего «совокупного» канала, как сказал Тим Экллейр, в моем случае намного проще. – boramalper

+1

Это своего рода классный –

39

Вы можете выполнить это, обернув каждый канал в goroutine который «вперед» сообщений на общий «совокупный» канал. Например:

agg := make(chan string) 
for _, ch := range chans { 
    go func(c chan string) { 
    for msg := range c { 
     agg <- msg 
    } 
    }(ch) 
} 

select { 
case msg <- agg: 
    fmt.Println("received ", msg) 
} 

Если вам нужно знать, какой канал сообщения исходят от, вы можете обернуть его в структурах с какой-либо дополнительной информацией, прежде чем направить его в агрегированный канал.

В моем (ограниченный) тестирования, этот метод в значительной мере не выполняет, используя отражал пакет:

$ go test dynamic_select_test.go -test.bench=. 
... 
BenchmarkReflectSelect   1 5265109013 ns/op 
BenchmarkGoSelect    20  81911344 ns/op 
ok  command-line-arguments 9.463s 

Benchmark код here

+1

Ваш тестовый код неверен, вам нужно [перебирать букву «b.N'» (https://golang.org/pkg/testing/#hdr-Benchmarks) в рамках теста. В противном случае результаты (которые делятся на 'b.N', 1 и 2000000000 в вашем выходе) будут совершенно бессмысленными. –

+1

@DaveC Спасибо! Вывод не меняется, но результаты гораздо более разумны. –

+0

Действительно, я быстро взломал ваш тестовый код, чтобы получить [некоторые фактические цифры] (https://gist.github.com/dchapes/5ed761c4c4023850b2de). Там очень хорошо может быть что-то, что все еще отсутствует/неверно из этого теста, но единственное, что делает более сложный код отражения, заключается в том, что настройка выполняется быстрее (с GOMAXPROCS = 1), так как ему не нужна куча goroutines.В каждом другом случае простой канал слияния goroutine сдувает решение отражения (на ~ 2 порядка). –

12

Чтобы расширить на некоторые замечания относительно предыдущих ответов и обеспечить более четкое сравнение здесь является примером обоих подходов, представленных до сих пор с учетом того же ввода, фрагмента каналов для чтения и функции для вызова каждого значения, которое также должно знать, из какого канала было получено значение.

Есть три основные различия между подходами:

  • Сложность. Хотя это может быть частично предпочтительным для читателя, я считаю, что канальный подход более идиоматичен, прямолинейен и читабель.

  • Спектакль. В моей системе Xeon amd64 каналы goroutines + channels выполняют решение отражения примерно на два порядка (в общем отражение в Go часто медленнее и должно использоваться только тогда, когда это абсолютно необходимо). Конечно, если есть какая-либо значительная задержка либо в функции обработки результатов, либо при записи значений на входные каналы, это различие в производительности может стать незначительным.

  • Семантика блокировки/буферизации. Важность этого зависит от варианта использования. Чаще всего это либо не имеет значения, либо небольшая дополнительная буферизация в решении слияния goroutine может быть полезна для пропускной способности. Однако, если желательно иметь семантику, что только один писатель разблокирован, а его значение полностью обработано до, любой другой писатель разблокирован, то это может быть достигнуто только с помощью решения отражения.

Примечание. Оба подхода могут быть упрощены, если «идентификатор» передаваемого канала не требуется или если исходные каналы никогда не будут закрыты.

Goroutine слияние канал:

// Process1 calls `fn` for each value received from any of the `chans` 
// channels. The arguments to `fn` are the index of the channel the 
// value came from and the string value. Process1 returns once all the 
// channels are closed. 
func Process1(chans []<-chan string, fn func(int, string)) { 
    // Setup 
    type item struct { 
     int // index of which channel this came from 
     string // the actual string item 
    } 
    merged := make(chan item) 
    var wg sync.WaitGroup 
    wg.Add(len(chans)) 
    for i, c := range chans { 
     go func(i int, c <-chan string) { 
      // Reads and buffers a single item from `c` before 
      // we even know if we can write to `merged`. 
      // 
      // Go doesn't provide a way to do something like: 
      //  merged <- (<-c) 
      // atomically, where we delay the read from `c` 
      // until we can write to `merged`. The read from 
      // `c` will always happen first (blocking as 
      // required) and then we block on `merged` (with 
      // either the above or the below syntax making 
      // no difference). 
      for s := range c { 
       merged <- item{i, s} 
      } 
      // If/when this input channel is closed we just stop 
      // writing to the merged channel and via the WaitGroup 
      // let it be known there is one fewer channel active. 
      wg.Done() 
     }(i, c) 
    } 
    // One extra goroutine to watch for all the merging goroutines to 
    // be finished and then close the merged channel. 
    go func() { 
     wg.Wait() 
     close(merged) 
    }() 

    // "select-like" loop 
    for i := range merged { 
     // Process each value 
     fn(i.int, i.string) 
    } 
} 

Отражения выберите:

// Process2 is identical to Process1 except that it uses the reflect 
// package to select and read from the input channels which guarantees 
// there is only one value "in-flight" (i.e. when `fn` is called only 
// a single send on a single channel will have succeeded, the rest will 
// be blocked). It is approximately two orders of magnitude slower than 
// Process1 (which is still insignificant if their is a significant 
// delay between incoming values or if `fn` runs for a significant 
// time). 
func Process2(chans []<-chan string, fn func(int, string)) { 
    // Setup 
    cases := make([]reflect.SelectCase, len(chans)) 
    // `ids` maps the index within cases to the original `chans` index. 
    ids := make([]int, len(chans)) 
    for i, c := range chans { 
     cases[i] = reflect.SelectCase{ 
      Dir: reflect.SelectRecv, 
      Chan: reflect.ValueOf(c), 
     } 
     ids[i] = i 
    } 

    // Select loop 
    for len(cases) > 0 { 
     // A difference here from the merging goroutines is 
     // that `v` is the only value "in-flight" that any of 
     // the workers have sent. All other workers are blocked 
     // trying to send the single value they have calculated 
     // where-as the goroutine version reads/buffers a single 
     // extra value from each worker. 
     i, v, ok := reflect.Select(cases) 
     if !ok { 
      // Channel cases[i] has been closed, remove it 
      // from our slice of cases and update our ids 
      // mapping as well. 
      cases = append(cases[:i], cases[i+1:]...) 
      ids = append(ids[:i], ids[i+1:]...) 
      continue 
     } 

     // Process each value 
     fn(ids[i], v.String()) 
    } 
} 

[Полный код on the Go playground.]

+1

Также стоит отметить, что решение goroutines + channels не может выполнять все 'select' или' reflect.Select'. Горотины будут продолжать вращаться, пока они не будут потреблять все из каналов, поэтому нет четкого пути, по которому вы могли бы выпустить «Process1» раньше. Существует также проблема для проблем, если у вас много читателей, поскольку goroutines буферизуют один элемент из каждого из каналов, чего не будет с 'select'. –

+0

@JamesHenstridge, ваша первая заметка об остановке не соответствует действительности. Вы бы остановили Process1 точно так же, как вы бы остановили Process2; например добавив «стоп-канал», который закрыт, когда горуты должны остановиться. Process1 понадобился бы в двух случаях 'select' в цикле' for' вместо более простого в данный момент цикла 'for range'. Process2 нужно было бы вставить другой случай в 'cases' и специальный дескриптор этого значения' i'. –

+0

Это все еще не решает проблему, что вы читаете значения из каналов, которые не будут использоваться в раннем случае остановки. –

Смежные вопросы