2016-05-25 3 views
5

Я пытаюсь провести качели на параллелизм в Голанге, реорганизовывая одну из моих утилит командной строки за последние несколько дней, но я застрял.Почему мой канал Голанг пишет блокировку навсегда?

Here's Оригинальный код (главная ветка).

Here's ветка с параллелизмом (ветка x_concurrent).

Выполнения параллельного кода go run jira_open_comment_emailer.go, то defer wg.Done() никогда не выполняется, если проблема JIRA добавляется в канал here, что вызывает мое wg.Wait() повесить навсегда.

Идея состоит в том, что у меня есть большое количество проблем JIRA, и я хочу открутить goroutine для каждого, чтобы увидеть, есть ли у него комментарий, на который мне нужно ответить. Если это так, я хочу добавить его в некоторую структуру (я выбрал канал после некоторого исследования), который позже я могу прочитать, как очередь, чтобы создать напоминание по электронной почте.

Вот соответствующий раздел кода:

// Given an issue, determine if it has an open comment 
// Returns true if there is an open comment on the issue, otherwise false 
func getAndProcessComments(issue Issue, channel chan<- Issue, wg *sync.WaitGroup) { 
    // Decrement the wait counter when the function returns 
    defer wg.Done() 

    needsReply := false 

    // Loop over the comments in the issue 
    for _, comment := range issue.Fields.Comment.Comments { 
     commentMatched, err := regexp.MatchString("~"+config.JIRAUsername, comment.Body) 
     checkError("Failed to regex match against comment body", err) 

     if commentMatched { 
      needsReply = true 
     } 

     if comment.Author.Name == config.JIRAUsername { 
      needsReply = false 
     } 
    } 

    // Only add the issue to the channel if it needs a reply 
    if needsReply == true { 
     // This never allows the defered wg.Done() to execute? 
     channel <- issue 
    } 
} 

func main() { 
    start := time.Now() 

    // This retrieves all issues in a search from JIRA 
    allIssues := getFullIssueList() 

    // Initialize a wait group 
    var wg sync.WaitGroup 

    // Set the number of waits to the number of issues to process 
    wg.Add(len(allIssues)) 

    // Create a channel to store issues that need a reply 
    channel := make(chan Issue) 

    for _, issue := range allIssues { 
     go getAndProcessComments(issue, channel, &wg) 
    } 

    // Block until all of my goroutines have processed their issues. 
    wg.Wait() 

    // Only send an email if the channel has one or more issues 
    if len(channel) > 0 { 
     sendEmail(channel) 
    } 

    fmt.Printf("Script ran in %s", time.Since(start)) 
} 
+3

У вас есть len (channel) 'повсюду, но этот канал не имеет длины, потому что он не забуферирован. Вам необходимо получить от канала для любых отправок для завершения (и в целом принятие решений на основе длины буферизованного канала является ошибкой, поскольку параллельные операции могут гоняться за изменением этого значения) – JimB

+0

Итак, если я делаю все моей записи на канал, ожидая их завершения, а затем чтения из канала ... это никогда не произойдет, потому что отправления никогда не будут завершены и не будут запускать 'defer wg.Done()'? Как бы вы решили реализовать этот параллелизм в целом? Кроме того, я не уверен, что вы правы на 'len (channel)', так как godocs заявляют, что он возвращает текущее количество элементов в канале, а не емкость, например 'cap (channel)' будет , https://golang.org/pkg/builtin/#len –

+0

'len (channel)' возвращает текущее количество элементов в буферизованном канале, но поскольку каналы обычно используются одновременно, результат 'len' «устаревает», как только вы его прочитаете. Как правило, у вас есть параллельные goroutines, отправляющие и получающие от канала. Я бы посоветовал снова перейти в раздел [Concurrency] (https://tour.golang.org/concurrency/1) в Tour Of Go, чтобы лучше понять, как работают каналы. – JimB

ответ

8

В goroutines блок на отправку в небуферизованном канала. Минимального изменения разблокирует goroutines является созданием буферного канала с емкостью для всех вопросов:

channel := make(chan Issue, len(allIssues)) 

и закрыть канал после вызова wg.Wait().

+2

Но это своего рода поражение цели канала как трубы между параллельными блоками .... – RickyA

+0

@RickyA Нет ничего плохого в использовании канала, это очередь элементов. –

+0

true, это экономит вам накладные расходы на прохождение мьютекса. – RickyA

Смежные вопросы