2015-05-01 2 views
2

У меня есть сценарий, который выбирает некоторые данные из базы данных и отправляет их на канал, который будет обрабатываться несколькими goroutines, а затем результаты отправляются обратно в основной поток, который будет обновляться в базе данных.Go Channels: Как сделать это неблокирующим?

Тем не менее, он висит (возможно, блокирует) при отправке данных на первый канал.

Каналы создаются на глобальном уровне с:

var chin = make(chan in) 
var chout = make(chan out) 

in и out оба structs

Сначала запускаются goroutines:

for i:=0; i<5; i++ { 
    go worker() 
} 

Код для загрузки каналов затем:

 if verbose { 
      fmt.Println(`Getting nextbatch2 and sending to workers`) 
     } 

     rows, err = nextbatch2.Query() 
     if err != nil { 
      panic(err) 
     } 

     var numtodo int 
     for rows.Next() { 
      err = rows.Scan(&id, &data) 
      if err != nil { 
       rows.Close() 
       panic(err) 
      } 

      // Start 
      var vin in 
      vin.id = id 
      vin.data = data 
      chin <- vin 
      numtodo++ 
     } 
     rows.Close() 

Тогда сразу после этого:

 if verbose { 
      fmt.Println(`Processing out channel from workers`) 
     } 

     for res := range chout { 
      update5.Exec(res.data, res.id) 
      if numtodo--; numtodo == 0 { 
       break 
      } 
     } 

А на заднем плане многообразны worker() goroutines работает:

func worker() { 
     for res := range chin { 
      var v out 
      v.id = res.id 
      v.data = process(res.data) 
      chout <- v 
     } 
} 

Этот код висит после печати Getting nextbatch2 and sending to workers. Он никогда не доходит до Processing out channel from workers. Таким образом, он висит где-то внутри цикла rows.Next(), для которого я не могу понять причину, поскольку канал chin должен быть неблокирующим - даже если gotoutines worker() не обрабатывал его, он должен по крайней мере закончить этот цикл.

Любые идеи?

EDIT:

Добавив на fmt.Println(" on", numtodo) в конце цикла rows.Next() я могу видеть, что он блокирует после 5, которые я не понимаю, как это должно быть без блокировки, верно?

EDIT 2:

Изменяя каналов на make(chan in/out, 100) теперь будет блокировать после 105.

ответ

3

Приемники всегда блокируют до тех пор, пока данные получить. Если канал не буферизирован, отправитель блокирует, пока приемник не получит значение.

https://golang.org/doc/effective_go.html#channels

Таким образом, вы можете переписать потребительскую-код что-то вроде:

go func(){ 
    for res := range chout { 
     update5.Exec(res.data, res.id) 
    } 
}() 

Кроме того, необходимо close(chin) и close(chout) для правильного использования range заявления.

+0

Я не понимаю, что вы имеете в виду, перед этим блокируется. Проблема заключается не в блокировке приемника, а в приемнике. Проблема заключается в блокировке канала chin.Также их не нужно закрывать, поскольку все это зацикливается бесконечно, хотя вы не видите этот код здесь. – Alasdair

+1

Ваш 'worker()' не может отправить в 'chout', поскольку goroutine еще не читает' chout'. –

+0

Не собирается ли он в очереди? – Alasdair

Смежные вопросы