2015-03-26 2 views
-1

Я пытаюсь запустить несколько задач одновременно и немедленно вернуться, если есть какая-либо ошибка, не дожидаясь возврата всех подпрограмм. Код выглядит следующим образом. Я удалил шум, чтобы было легче переваривать, но я могу опубликовать полный код, если утечка не очевидна. Следует отметить, что я развертываю это на движке Google. Я не могу воспроизвести утечку на моей машине, но когда я заменю параллелизм после комментария // Consume the results, приложение работает нормально, хотя я не понимаю, почему, потому что код выглядит правильно для меня.Где протекающая рутина Go?

package main 

import "fmt" 
import "sync" 
import "errors" 

func main() { 
    indexes := []int{1, 2, 3, 4, 5, 6, 7} 
    devCh := make(chan int, 7) 
    stopCh := make(chan struct{}) 
    errCh := make(chan error, 7) 
    var wg sync.WaitGroup 
    go func() { 
     for _, sub := range indexes { 
      wg.Add(1) 
      go func(sub int) { 
       defer wg.Done() 
       // some code which creates other 
       // wait groups and spans other go routines 
       // handle errors 
       if sub == 99 { // unreachable 
        errCh <- errors.New("new error") 

       } 
      }(sub) 
      select { 
      // If there is any error we better stop the 
      // loop 
      case <-stopCh: 
       return 
      default: 
      } 
      devCh <- sub 
     } 
     wg.Wait() 
     close(devCh) 
    }() 
    // Consume the results 
    var results []int 
    var wt sync.WaitGroup 
    wt.Add(1) 
    go func() { 
     defer wt.Done() 
     for s := range devCh { 
      results = append(results, s) 
     } 
     return 
    }() 
    done := make(chan struct{}) 
    go func() { 
     wt.Wait() 
     close(done) 
    }() 

L: 
    for { 
     select { 
     case err := <-errCh: 
      fmt.Printf("error was %v", err) 
      close(stopCh) 
      return 
     case <-done: 
      break L 
     default: 
     } 
    } 
    fmt.Printf("all done, %v", results) 
} 

Редактировать: добавлен рабочий код.

Редактировать: добавлен код ближе к реальному коду, который может объяснить необходимость цикла for.

package main 

import "fmt" 
import "sync" 
import "errors" 

func main() { 
    indexes := []int{1, 2, 3, 4, 5, 6, 7} 
    indexesString := []string{"a", "b", "c", "d"} 
    devChS := make(chan string, 1000) 

    devCh := make(chan int, 7) 
    stopCh := make(chan struct{}) 
    errCh := make(chan error, 7) 
    var wg sync.WaitGroup 
    go func() { 
     for _, sub := range indexes { 
      wg.Add(1) 
      go func(sub int) { 
       defer wg.Done() 
       // some code which creates other 
       // wait groups and spans other go routines 
       // handle errors 
       if sub == 99 { // unreachable 
        errCh <- errors.New("new error") 

       } 
       wg.Add(1) 
       go func(sub int) { 
        defer wg.Done() 
        for _, s := range indexesString { 
         devChS <- fmt.Sprintf("%s %s", s, sub) 

        } 

        return 
       }(sub) 
      }(sub) 
      select { 
      // If there is any error we better stop the 
      // loop 
      case <-stopCh: 
       return 
      default: 
      } 
      devCh <- sub 
     } 
     wg.Wait() 
     close(devCh) 
     close(devChS) 
    }() 
    // Consume the results 
    var results = struct { 
     integers []int 
     strings []string 
    }{} 
    var wt sync.WaitGroup 
    wt.Add(1) 
    go func() { 
     defer wt.Done() 
     for s := range devCh { 
      results.integers = append(results.integers, s) 
     } 
     return 
    }() 
    wt.Add(1) 
    go func() { 
     defer wt.Done() 
     for s := range devChS { 
      results.strings = append(results.strings, s) 
     } 
     return 
    }() 
    done := make(chan struct{}) 
    go func() { 
     wt.Wait() 
     close(done) 
    }() 

L: 
    for { 
     select { 
     case err := <-errCh: 
      fmt.Printf("error was %v", err) 
      close(stopCh) 
      return 
     case <-done: 
      break L 
     default: 
     } 
    } 
    fmt.Printf("all done, can return the results: %v", results) 
} 
+0

Этот канал 'done' создается и закрывается в пределах 3 строк. Это также то, как вы выходите из своего бесконечного цикла. Возможно, вам следует рассмотреть его использование? –

+0

Я не уверен, понимаю ли я. Я закрываю канал так, чтобы выбор мог прерываться.Он ломается по замкнутому каналу, не так ли? –

+0

@Simon Whitehead Вот фрагмент, который доказывает, что он работает. http://play.golang.org/p/hJg1fbfO5d –

ответ

1

Т.Л., д-р: Цикл, который ничего не делает, но повторить нелипкую проверку, пока не удается может привести к труднодоступному диагностировать проблемы (как минимум, он может злоупотребить процессор); использование проверки блокировки может исправить это.

Я не настолько уверен в деталях вашего дела; Я написал цикл, похожий на ваш, который постоянно зависает с «процессом, занятым слишком долго» на Playground, но когда я его запускаю локально, он завершается.

Как я уже говорил, я бы постарался и для более простого дизайна.


Go only has limited pre-emption of running goroutines: бег нить только дает управление к goroutine планировщиком, когда операция блокировки (как I/O или канал операционного или ждать, чтобы взять замок) происходит.

Так с GOMAXPROCS=1, если (одна) бегущая нить начинает зацикливаться, ничто еще не обязательно получит шанс запустить.

А for { select { ...default: } } может поэтому начать цикл проверки элементов в канале, но никогда не отказываться от контроля основного потока, чтобы другой голубь мог написать элемент. Другой код запускается в любом случае, когда GOMAXPROCS превышает 1, но не тогда, когда он равен 1, как в App Engine (или игровой площадке). Поведение зависит не только от GOMAXPROCS, но от того, что сначала запускается goroutine, что не обязательно определено.

Чтобы избежать этой ситуации, удалите default:, так что select - это операция блокировки, которая дает планировщику, когда он не может получить элемент, позволяя запускать другой код. Вы можете обобщить это на другие случаи, когда вы можете делать цикл без проверки; любой из них мог бы постоянно перегружать ресурсы, постоянно перепроверяя, когда блокирующий вызов не будет. Когда GOMAXPROCS>1 или ограниченное ограничение времени выполнения сохраняет вас, опрос (как называется повторная проверка) может по-прежнему потреблять больше ЦП, чем блокирование.

Например, это терпит неудачу с «процесс занимает слишком много времени» on the Playground, хотя раздражающе он завершает надежно на моей машине:

package main 

import "fmt" 

func main() { 
    c := make(chan struct{}) 
    go func() { c <- struct{}{} }() 
    for { 
     select { 
     case <-c: 
      fmt.Println("success") 
      return 
     default: 
     } 
    } 
} 

Я не могу сказать, если есть и другие проблемы, но зависнуть на Образец, похожий на образец, заслуживает внимания.

+0

Кажется, что ответ через изменение не имеет для меня большого смысла, поскольку цикл for определенно не голодает за ресурсы (то есть дешево). Некоторые сетевые коды также используют такие циклы, но, возможно, я просто ошибаюсь. –

+0

Один проход через цикл является дешевым, но он никогда не уступает планировщику, поэтому другой код может отправить элемент для его получения, поэтому он заканчивается бесконечным циклом. У меня нет источника, но в сетевом коде вы будете делать блокирующий вызов (в этом случае часто просто «Dial()»/'Read()'/'Write()'), который дает планировщик и позволяет другим гортанам выполнять другую работу. – twotwotwo

+0

Это может быть слишком много, но есть ли способ, которым я мог бы «обнаружить» эту проблему, не имея на самом деле знать внутренности/поведение планировщика? –

Смежные вопросы