2016-06-05 2 views
2

Я использую golang для реализации простого управляемого событиями работника. Это как это:Событие, управляемое рисунком в golang

go func() { 
     for { 
      select { 
      case data := <-ch: 
       time.Sleep(1) 
       someGlobalMap[data.key] = data.value 
      } 
     } 
    }() 

И главная функция создает несколько goroutines, и каждый из них будет делать вещи, как это:

ch <- data 
fmt.Println(someGlobalMap[data.key]) 

Как вы можете видеть, что, потому что мой рабочий, некоторые время, чтобы сделать работу, я получу нулевой результат в своей основной функции. Как я могу правильно управлять этим рабочим процессом?

+0

Чтение карты должно быть синхронизировано с письмами goroutines. Попробуйте добавить 'ch <- data' в качестве другого случая в предложении select. – Nadh

ответ

5

EDIT: Может быть, я неправильно понял ваш вопрос, я вижу, что вы говорите, что главный начнет много производителей goroutines. Я думал, что было много потребителя goroutines и одного производителя. Оставляя ответ здесь, если он может быть полезен для других, которые ищут этот шаблон, хотя пункты маркера все еще применяются к вашему делу.

Так что, если я правильно понимаю ваш прецедент, вы не можете ожидать отправки по каналу и сразу же прочитать результаты. Вы не знаете, когда рабочий обработает эту отправку, вам необходимо установить связь между goroutines, и это делается с помощью каналов. Предполагая, что просто вызов функции с возвращаемым значением не работает в вашем сценарии, если вам действительно нужно отправить работнику, затем заблокируйте, пока не получите результат, вы можете отправить канал как часть структуры данных, получить на него после отправки, то есть:

resCh := make(chan Result) 
ch <- Data{key, value, resCh} 
res := <- resCh 

Но вы, вероятно, должны попытаться сломать работу в качестве трубопровода самостоятельных шагов вместо этого, увидеть сообщение в блоге, что я связан в первоначальном ответе.


Оригинальный ответ, где я думал, что это был один производитель - несколько потребителей/работников картина:

Это общая картина, для которой Гоу goroutines и каналы семантика очень хорошо подходит. Есть несколько вещей, которые вам нужно иметь в виду:

  • Основная функция не будет автоматически ждать завершения горутин. Если больше нечего делать в основном, программа выходит и у вас нет ваших результатов.

  • Глобальная карта, используемая вами не для потоковой передачи. Вам нужно синхронизировать доступ с помощью мьютекса, но есть лучший способ - использовать выходной канал для результатов, который уже синхронизирован.

  • Вы можете использовать канал ... по каналу, и вы можете безопасно обмениваться каналом между несколькими goroutines. Как мы увидим, эта модель довольно элегантна для написания.

площадка: https://play.golang.org/p/WqyZfwldqp

Более подробную информацию о Go трубопроводов и моделей параллелизма, ввести обработку ошибок, раннее списание и т.д.: https://blog.golang.org/pipelines

Комментарии код потребительной случае вы упоминаете:

// could be a command-line flag, a config, etc. 
const numGoros = 10 

// Data is a similar data structure to the one mentioned in the question. 
type Data struct { 
    key string 
    value int 
} 

func main() { 
    var wg sync.WaitGroup 

    // create the input channel that sends work to the goroutines 
    inch := make(chan Data) 
    // create the output channel that sends results back to the main function 
    outch := make(chan Data) 

    // the WaitGroup keeps track of pending goroutines, you can add numGoros 
    // right away if you know how many will be started, otherwise do .Add(1) 
    // each time before starting a worker goroutine. 
    wg.Add(numGoros) 
    for i := 0; i < numGoros; i++ { 
     // because it uses a closure, it could've used inch and outch automaticaly, 
     // but if the func gets bigger you may want to extract it to a named function, 
     // and I wanted to show the directed channel types: within that function, you 
     // can only receive from inch, and only send (and close) to outch. 
     // 
     // It also receives the index i, just for fun so it can set the goroutines' 
     // index as key in the results, to show that it was processed by different 
     // goroutines. Also, big gotcha: do not capture a for-loop iteration variable 
     // in a closure, pass it as argument, otherwise it very likely won't do what 
     // you expect. 
     go func(i int, inch <-chan Data, outch chan<- Data) { 
      // make sure WaitGroup.Done is called on exit, so Wait unblocks 
      // eventually. 
      defer wg.Done() 

      // range over a channel gets the next value to process, safe to share 
      // concurrently between all goroutines. It exits the for loop once 
      // the channel is closed and drained, so wg.Done will be called once 
      // ch is closed. 
      for data := range inch { 
       // process the data... 
       time.Sleep(10 * time.Millisecond) 
       outch <- Data{strconv.Itoa(i), data.value} 
      } 
     }(i, inch, outch) 
    } 

    // start the goroutine that prints the results, use a separate WaitGroup to track 
    // it (could also have used a "done" channel but the for-loop would be more complex, with a select). 
    var wgResults sync.WaitGroup 
    wgResults.Add(1) 
    go func(ch <-chan Data) { 
     defer wgResults.Done() 

     // to prove it processed everything, keep a counter and print it on exit 
     var n int 
     for data := range ch { 
      fmt.Println(data.key, data.value) 
      n++ 
     } 

     // for fun, try commenting out the wgResults.Wait() call at the end, the output 
     // will likely miss this line. 
     fmt.Println(">>> Processed: ", n) 
    }(outch) 

    // send work, wherever that comes from... 
    for i := 0; i < 1000; i++ { 
     inch <- Data{"main", i} 
    } 

    // when there's no more work to send, close the inch, so the goroutines will begin 
    // draining it and exit once all values have been processed. 
    close(inch) 

    // wait for all goroutines to exit 
    wg.Wait() 

    // at this point, no more results will be written to outch, close it to signal 
    // to the results goroutine that it can terminate. 
    close(outch) 

    // and wait for the results goroutine to actually exit, otherwise the program would 
    // possibly terminate without printing the last few values. 
    wgResults.Wait() 
} 

В реальных сценариях, где объем работ не известен заранее, закрытие в-канала может прийти от, например сигнал SIGINT. Просто убедитесь, что ни один кодовый путь не может отправлять работу после закрытия канала, поскольку это будет паниковать.

+0

Вы сделали симуляции в go? Не могли бы вы поделиться кодом или просто сказать, какие дополнительные функции или библиотеки golang вы использовали? –