2012-06-11 6 views
11

Идея состоит в том, чтобы иметь переменное количество каналов в срезе, нажимать каждое полученное через них значение в один канал и закрывать этот выходной канал после закрытия последнего из входных каналов. Нечто подобное, но для ряда каналов более два:Можно ли мультиплексировать несколько каналов в один?

func multiplex(cin1, cin2, cout chan int) { 
    n := 2 
    for { 
     select { 
     case v, ok := <-cin1: 
      if ok { 
       cout <- v 
      } else { 
       n -= 1 
      } 

     case v, ok := <-cin2: 
      if ok { 
       cout <- v 
      } else { 
       n -= 1 
      } 
     } 

     if n == 0 { 
      close(cout) 
      break 
     } 
    } 
} 

выше код избегает занято зацикливания, так как нет default случая, который хорошо (EDIT: это похоже на присутствие «хорошо» делает оператор select неблокирующим, и цикл занят в конце концов. Но для примера подумайте о коде, как будто он будет блокироваться). Может ли такая же функциональность быть достигнута и с произвольным количеством входных каналов? Очевидно, что это можно было бы сделать, уменьшив разрез попарно до одного канала, но я был бы более заинтересован в более простом решении, если это возможно.

ответ

24

Я считаю, что этот фрагмент делает то, что вы ищете. Я изменил подпись, чтобы было ясно, что входы и выходы должны использоваться только для связи в одном направлении. Обратите внимание на добавление sync.WaitGroup, вам нужно каким-то образом для всех входов сообщить, что они были выполнены, и это довольно легко.

func combine(inputs []<-chan int, output chan<- int) { 
    var group sync.WaitGroup 
    for i := range inputs { 
    group.Add(1) 
    go func(input <-chan int) { 
     for val := range input { 
     output <- val 
     } 
     group.Done() 
    } (inputs[i]) 
    } 
    go func() { 
    group.Wait() 
    close(output) 
    }() 
} 
+1

Ах, очень хорошее решение, понятно и лаконично. Спасибо! – elpres

+0

Теперь есть пакет с функцией (https://godoc.org/github.com/eapache/channels#Multiplex), который решает проблему, используя отражение вместо нескольких goroutines. – Evan

0

Используя goroutines, я произвел это. Это то, что вы хотите?

package main 

import (
    "fmt" 
) 

func multiplex(cin []chan int, cout chan int) { 
    n := len(cin) 
    for _, ch := range cin { 
     go func(src chan int) { 
      for { 
       v, ok := <-src 
       if ok { 
        cout <- v 
       } else { 
        n-- // a little dangerous. Maybe use a channel to avoid missed decrements 
        if n == 0 { 
         close(cout) 
        } 
        break 
       } 
      } 
     }(ch) 
    } 
} 

// a main to test the multiplex 
func main() { 
    cin := make([]chan int, 3) 
    cin[0] = make(chan int, 2) 
    cin[1] = make(chan int, 2) 
    cin[2] = make(chan int, 2) 
    cout := make(chan int, 2) 
    multiplex(cin, cout) 
    cin[1] <- 1 
    cin[0] <- 2 
    cin[2] <- 3 
    cin[1] <- 4 
    cin[0] <- 5 
    close(cin[1]) 
    close(cin[0]) 
    close(cin[2]) 
    for { 
     v, ok := <-cout 
     if ok { 
      fmt.Println(v) 
     } else { 
      break 
     } 
    } 
} 

EDIT: Ссылки:

http://golang.org/ref/spec#Receive_operator

http://golang.org/ref/spec#Close

+0

Документы говорят, что если вы читаете значение из канала с «, ok», операция не блокируется. Значение 'ok' тогда просто« false », и выполнение продолжается.Если это правильно (я новичок в Go и не могу сказать), то если канал пуст, но еще не закрыт, строка 'if ok' будет оцениваться как' false' и выполнить ветвь 'else'. Но если вы замените «v, ok: = <- src» и «if» с помощью оператора select, это может сработать. Надо проверить это. Спасибо вам за ответ, кстати. – elpres

+1

Где вы узнали, что операция не блокируется? Я не нахожу его, и он, похоже, не соответствует тому, что я наблюдаю. Я прочитал из документа, что он не блокирует *, как только канал закрыт *. –

+1

Это похоже на более раннюю версию спецификации, например. [здесь] (http://go.googlecode.com/hg/doc/go_spec.html?r=c64e293#Communication_operators), посмотрите последний абзац перед «Выражения метода». В текущей версии этот фрагмент немного изменился и говорит, что «нулевое значение возвращается, потому что канал« закрыт »и« empty_ (false) ». Это звучит как 'false', возвращается только после того, как каналы слиты и закрыты, верно? Это будет означать, что я ошибаюсь. – elpres

2

Edit: добавлена ​​парного пример сокращения кода и переупорядочению части ответа.

Предпочтительным решением является отказ от реструктуризации, так что у вас нет куска каналов. В реструктуризации часто может использоваться функция, которую несколько goroutines могут отправлять на один канал. Поэтому вместо того, чтобы каждый из ваших источников отправлял по отдельным каналам, а затем приходилось иметь дело с получением из нескольких каналов, просто создайте один канал и позвольте всем источникам отправлять по этому каналу.

Go не предлагает функцию для приема от куска каналов. Это часто задаваемый вопрос, и хотя предпочтительное решение является предпочтительным, есть способы его программирования. Решение, которое, как я думал, вы предлагаете в своем первоначальном вопросе, говоря, что «сокращение разреза попарно» является решением двоичного разрыва и покорения. Это работает отлично, если у вас есть решение для мультиплексирования двух каналов в одно. Ваш примерный код для этого очень близок к работе.

Вам просто не хватает одного маленького трюка, чтобы ваш примерный код работал. Где вы уменьшаете n, добавьте строку, чтобы установить переменную канала в nil. Например, я сделал код, читаемый

case v, ok := <-cin1: 
     if ok { 
      cout <- v 
     } else { 
      n-- 
      cin1 = nil 
     } 
    case v, ok := <-cin2: 
     if ok { 
      cout <- v 
     } else { 
      n-- 
      cin2 = nil 
     } 
    } 

Это решение делает то, что вы хотите, и не ожидание.

Итак, полный пример включения этого решения в функцию, которая мультиплексирует кусочек:

package main 

import (
    "fmt" 
    "time" 
) 

func multiplex(cin []chan int, cout chan int) { 
    var cin0, cin1 chan int 
    switch len(cin) { 
    case 2: 
     cin1 = cin[1] 
     fallthrough 
    case 1: 
     cin0 = cin[0] 
    case 0: 
    default: 
     cin0 = make(chan int) 
     cin1 = make(chan int) 
     half := len(cin)/2 
     go multiplex(cin[:half], cin0) 
     go multiplex(cin[half:], cin1) 
    } 
    for cin0 != nil || cin1 != nil { 
     select { 
     case v, ok := <-cin0: 
      if ok { 
       cout <- v 
      } else { 
       cin0 = nil 
      } 
     case v, ok := <-cin1: 
      if ok { 
       cout <- v 
      } else { 
       cin1 = nil 
      } 
     } 
    } 
    close(cout) 
} 

func main() { 
    cin := []chan int{ 
     make(chan int), 
     make(chan int), 
     make(chan int), 
    } 
    cout := make(chan int) 
    for i, c := range cin { 
     go func(x int, cx chan int) { 
      for i := 1; i <= 3; i++ { 
       time.Sleep(100 * time.Millisecond) 
       cx <- x*10 + i 
      } 
      close(cx) 
     }(i, c) 
    } 
    go multiplex(cin, cout) 
    for { 
     select { 
     case v, ok := <-cout: 
      if ok { 
       fmt.Println("main gets", v) 
      } else { 
       return 
      } 
     } 
    } 
} 
+1

Нет, не совсем. То, что я ищу как функцию с подписями 'func multiplex (cin [] chan int, cout chan int)', то есть тот, который может работать на произвольном количестве входных каналов вместо того, чтобы быть жестко закодированным до двух. – elpres