Я пытаюсь запустить несколько задач одновременно и немедленно вернуться, если есть какая-либо ошибка, не дожидаясь возврата всех подпрограмм. Код выглядит следующим образом. Я удалил шум, чтобы было легче переваривать, но я могу опубликовать полный код, если утечка не очевидна. Следует отметить, что я развертываю это на движке Google. Я не могу воспроизвести утечку на моей машине, но когда я заменю параллелизм после комментария // Consume the results
, приложение работает нормально, хотя я не понимаю, почему, потому что код выглядит правильно для меня.Где протекающая рутина Go?
package main
import "fmt"
import "sync"
import "errors"
func main() {
indexes := []int{1, 2, 3, 4, 5, 6, 7}
devCh := make(chan int, 7)
stopCh := make(chan struct{})
errCh := make(chan error, 7)
var wg sync.WaitGroup
go func() {
for _, sub := range indexes {
wg.Add(1)
go func(sub int) {
defer wg.Done()
// some code which creates other
// wait groups and spans other go routines
// handle errors
if sub == 99 { // unreachable
errCh <- errors.New("new error")
}
}(sub)
select {
// If there is any error we better stop the
// loop
case <-stopCh:
return
default:
}
devCh <- sub
}
wg.Wait()
close(devCh)
}()
// Consume the results
var results []int
var wt sync.WaitGroup
wt.Add(1)
go func() {
defer wt.Done()
for s := range devCh {
results = append(results, s)
}
return
}()
done := make(chan struct{})
go func() {
wt.Wait()
close(done)
}()
L:
for {
select {
case err := <-errCh:
fmt.Printf("error was %v", err)
close(stopCh)
return
case <-done:
break L
default:
}
}
fmt.Printf("all done, %v", results)
}
Редактировать: добавлен рабочий код.
Редактировать: добавлен код ближе к реальному коду, который может объяснить необходимость цикла for.
package main
import "fmt"
import "sync"
import "errors"
func main() {
indexes := []int{1, 2, 3, 4, 5, 6, 7}
indexesString := []string{"a", "b", "c", "d"}
devChS := make(chan string, 1000)
devCh := make(chan int, 7)
stopCh := make(chan struct{})
errCh := make(chan error, 7)
var wg sync.WaitGroup
go func() {
for _, sub := range indexes {
wg.Add(1)
go func(sub int) {
defer wg.Done()
// some code which creates other
// wait groups and spans other go routines
// handle errors
if sub == 99 { // unreachable
errCh <- errors.New("new error")
}
wg.Add(1)
go func(sub int) {
defer wg.Done()
for _, s := range indexesString {
devChS <- fmt.Sprintf("%s %s", s, sub)
}
return
}(sub)
}(sub)
select {
// If there is any error we better stop the
// loop
case <-stopCh:
return
default:
}
devCh <- sub
}
wg.Wait()
close(devCh)
close(devChS)
}()
// Consume the results
var results = struct {
integers []int
strings []string
}{}
var wt sync.WaitGroup
wt.Add(1)
go func() {
defer wt.Done()
for s := range devCh {
results.integers = append(results.integers, s)
}
return
}()
wt.Add(1)
go func() {
defer wt.Done()
for s := range devChS {
results.strings = append(results.strings, s)
}
return
}()
done := make(chan struct{})
go func() {
wt.Wait()
close(done)
}()
L:
for {
select {
case err := <-errCh:
fmt.Printf("error was %v", err)
close(stopCh)
return
case <-done:
break L
default:
}
}
fmt.Printf("all done, can return the results: %v", results)
}
Этот канал 'done' создается и закрывается в пределах 3 строк. Это также то, как вы выходите из своего бесконечного цикла. Возможно, вам следует рассмотреть его использование? –
Я не уверен, понимаю ли я. Я закрываю канал так, чтобы выбор мог прерываться.Он ломается по замкнутому каналу, не так ли? –
@Simon Whitehead Вот фрагмент, который доказывает, что он работает. http://play.golang.org/p/hJg1fbfO5d –