2013-10-04 2 views
4

Примечание - новичок в Go.Канальный мультиплексор

Я написал мультиплексор, который должен объединить выходы массива каналов в один. Счастлив конструктивной критикой.

func Mux(channels []chan big.Int) chan big.Int { 
    // Count down as each channel closes. When hits zero - close ch. 
    n := len(channels) 
    // The channel to output to. 
    ch := make(chan big.Int, n) 

    // Make one go per channel. 
    for _, c := range channels { 
     go func() { 
      // Pump it. 
      for x := range c { 
       ch <- x 
      } 
      // It closed. 
      n -= 1 
      // Close output if all closed now. 
      if n == 0 { 
       close(ch) 
      } 
     }() 
    } 
    return ch 
} 

Я проверяю его:

func fromTo(f, t int) chan big.Int { 
    ch := make(chan big.Int) 

    go func() { 
     for i := f; i < t; i++ { 
      fmt.Println("Feed:", i) 
      ch <- *big.NewInt(int64(i)) 
     } 
     close(ch) 
    }() 
    return ch 
} 

func testMux() { 
    r := make([]chan big.Int, 10) 
    for i := 0; i < 10; i++ { 
     r[i] = fromTo(i*10, i*10+10) 
    } 
    all := Mux(r) 
    // Roll them out. 
    for l := range all { 
     fmt.Println(l) 
    } 
} 

но мой выход очень странно:

Feed: 0 
Feed: 10 
Feed: 20 
Feed: 30 
Feed: 40 
Feed: 50 
Feed: 60 
Feed: 70 
Feed: 80 
Feed: 90 
Feed: 91 
Feed: 92 
Feed: 93 
Feed: 94 
Feed: 95 
Feed: 96 
Feed: 97 
Feed: 98 
Feed: 99 
{false [90]} 
{false [91]} 
{false [92]} 
{false [93]} 
{false [94]} 
{false [95]} 
{false [96]} 
{false [97]} 
{false [98]} 
{false [99]} 

Так мои вопросы:

  • Есть ли что-то я делать неправильно в Mux?
  • Почему я получаю только последние 10 из моего выходного канала?
  • Почему подача выглядит так странно? (1-й из каждого входного канала, весь последний канал, а затем ничего)
  • Есть ли лучший способ сделать это?

Мне нужно все входные каналы имеют равные права на выходной канал - то есть я не могу иметь все выходные данные из одного канала, а затем все из следующего и т.д.


Для тех, кто интересно - это было последний код после исправления и правильное использование (предположительно) из sync.WaitGroup

import (
    "math/big" 
    "sync" 
) 

/* 
    Multiplex a number of channels into one. 
*/ 
func Mux(channels []chan big.Int) chan big.Int { 
    // Count down as each channel closes. When hits zero - close ch. 
    var wg sync.WaitGroup 
    wg.Add(len(channels)) 
    // The channel to output to. 
    ch := make(chan big.Int, len(channels)) 

    // Make one go per channel. 
    for _, c := range channels { 
     go func(c <-chan big.Int) { 
      // Pump it. 
      for x := range c { 
       ch <- x 
      } 
      // It closed. 
      wg.Done() 
     }(c) 
    } 
    // Close the channel when the pumping is finished. 
    go func() { 
     // Wait for everyone to be done. 
     wg.Wait() 
     // Close. 
     close(ch) 
    }() 
    return ch 
} 

ответ

2

Каждые из вашего goroutines порождены от Mux заканчивает вытягивать из того же самого канала, так c обновляется на каждой итерации цикла –, они не просто фиксируют значение c. Вы получите ожидаемые результаты, если передать канал в goroutine так:

for _, c := range channels { 
    go func(c <-chan big.Int) { 
     ... 
    }(c) 
} 

Вы можете проверить эту модификацию here.

Еще одна возможная проблема заключается в том, что вы обрабатываете переменную n: если вы работаете с GOMAXPROCS != 1, у вас могут быть два goroutines, пытающихся обновить их сразу. Тип sync.WaitGroup был бы более безопасным способом дождаться завершения goroutines.

+0

Спасибо - это объясняет мою проблему точно. Получит ли результат одинаковые права на все каналы на любой архитектуре? – OldCurmudgeon

+0

Вы спрашиваете, будет ли планомерно распределяться каждый горутин, который кормит 'ch'? Я не знаю, определено это или нет. Если вам требуется конкретное чередование результатов, вам может понадобиться нечто большее. –

+0

Я обеспокоен тем, что в некоторых средах каждый канал может быть истощен до исчерпания, прежде чем следующий получит внешний вид. Этого следует избегать. Мне не нужна определенная последовательность, но мне нужен справедливый баланс между всеми каналами. – OldCurmudgeon

2

Немного после факта, я знаю, но я написал пакет, который реализует общую мультиплексную функцию, подобную этой. Он использует вызов «select» в пакете отражения для обеспечения эффективного и сбалансированного мультиплексирования без необходимости блокировки или группы ожидания.

1

Чтобы построить на James Hentridge ответ, идиоматический способ справиться с повторным присваиванием проблемы при использовании range заявления будет назначить локальную переменный в стоимость на карту:

for _, c := range channels { 
    c := c 
    go func() { 
    ... 
    }() 
} 
Смежные вопросы