2013-04-07 1 views
7

У меня есть кусок каналов, которые все получают то же самое сообщение:Убирают ли каналы сохранение при блокировке?

func broadcast(c <-chan string, chans []chan<- string) { 
    for msg := range c { 
     for _, ch := range chans { 
      ch <- msg 
     } 
    } 
} 

Однако, поскольку каждый из каналов в chans потенциально считываются с разной скоростью, я не хочу, чтобы блокировать другие каналы когда я получаю медленного потребителя. Я решил эту проблему с goroutines:

func broadcast(c <-chan string, chans []chan<- string) { 
    for msg := range c { 
     for _, ch := range chans { 
      go func() { ch <- msg }() 
     } 
    } 
} 

Однако порядок сообщений, которые получают передаваемыми в каждый канал имеет важное значение. Я посмотрел на спецификации, чтобы увидеть, если каналы сохранить порядок, когда блокируется, и все, что я нашел, было это:

Если емкость больше нуля, то канал является асинхронным: коммуникационные операции удастся без блокировки, если буфер не полный (отправляет) или не пустой (получает), и элементы принимаются в том порядке, в котором они отправлены.

Для меня, если запись заблокирована, то она не отправляется, а ждет отправки. В этом предположении выше сказанное ничего не говорится о порядке отправки, когда несколько писем блокируются при записи.

Есть ли гарантии относительно порядка отправки после разблокировки канала?

+0

Я бы посоветовал тщательно обернуть (захватить) переменные, используемые внутри goroutine, в параметры: go func (ch2 <-chan string, msg2 string) {ch2 <- msg2} (ch, msg). Вот небольшая демонстрация, которая не захватывает приводящее к неустойчивому поведению. Http://play.golang.org/p/9RTKvmRe5K – Deleplace

+0

Дальнейший комментарий к реальному ответу (который уже был хорошо объяснен). Просто хотел указать, что есть пакет, который реализует канал с буферизацией, поэтому вам не нужно писать свой собственный код для удаления старых сообщений: https://godoc.org/github.com/eapache/channels – Evan

ответ

6

Нет, никаких гарантий.

Даже если канал не заполнен, если два гортана запускаются примерно в одно и то же время, чтобы отправить его, я не думаю, что есть какая-то гарантия того, что сначала запускается goroutine, который был запущен первым. Поэтому вы не можете рассчитывать на поступающие сообщения.

+0

Хм, интересно. Что делать, если я сделал что-то вроде Ask Bjorn Hansen, но в случае по умолчанию уволил goroutine, как в моем примере? Еще никаких гарантий? – tjameson

+0

tjameson: что вы хотите, если канал заполнен? Если вы не хотите отказаться от сообщения, почему бы не просто продлить канал? Если вы не хотите отбрасывать сообщения, но хотите знать, как далеко позади клиента, сделайте канал «другим путем», который возвращает номер сообщения или какой-либо такой, чтобы вы могли определить, какое сообщение было отправлено/отправлено сокет/что бы вы ни делали. –

+0

@ AskBjørnHansen - Я пишу вышибалу IRC, поэтому я думаю, что хочу отказаться от самого старого. Возможно, я просто сделаю 'if len (ch) == cap (ch) {<-ch} ch <- msg', но если заказ будет гарантирован, тогда, возможно, я подожду n секунд, чтобы клиент мог догнать, затем бросить клиента, если он слишком далеко позади. Поскольку порядок не гарантируется, я, вероятно, поеду с первым. – tjameson

4

Вы можете удалить сообщение, если канал заполнен (а затем установить флаг, чтобы приостановить работу клиента и отправить им сообщение о том, что они отбрасывают сообщения или что-то еще).

Что-то вдоль линий (непроверенные):

type Client struct { 
    Name string 
    ch chan<-string 
} 

func broadcast(c <-chan string, chans []*Client) { 
    for msg := range c { 
     for _, ch := range chans { 
      select { 
      case ch.ch <- msg: 
      // all okay 
      default: 
       log.Printf("Channel was full sending '%s' to client %s", msg, ch.Name) 
      } 
     } 
    } 
} 
+0

+1 Это решает мою актуальную проблему, но на самом деле не отвечает на мой вопрос. Спасибо хоть! Я забыл о 'select' и полных каналах, и это заставило меня рассмотреть спецификацию и эффективно пойти и найти кучу других приятных вещей. – tjameson

1

В этом коде, никаких гарантий.

Основная проблема с данным примером кода заключается не в поведении канала, а в многочисленных созданных goroutines. Все goroutines «запускаются» внутри одного и того же неотъемлемого цикла без дальнейшей синхронизации, поэтому даже до того, как они начнут отправлять сообщения, мы просто не знаем, какие из них будут выполняться в первую очередь.

Однако это поднимает законный вопрос в целом: если мы каким-то образом гарантируем порядок нескольких инструкций отправки блокировки, гарантируем ли мы их получать в том же порядке?

Свойство «происшествий» перед отправкой трудно создать.Я боюсь, что это невозможно, потому что:

  1. Все, что может случиться до того отправки инструкции: например, другие goroutines выполняют свои собственные отправки или не
  2. goroutine блокированности посылающий не может одновременно управлять другими видами синхронизация

Например, если у меня есть 10 goroutines с номерами от 1 до 10, у меня нет способа разрешить им отправлять свой номер на канал одновременно в правильном порядке. Все, что я могу сделать, это использовать различные виды последовательных трюков, например, делать сортировку в одном одном goroutine.

+0

Да, этот вопрос - это то, к чему я пытался добраться. Я мог бы придумать пример использования сна, но, похоже, в любом случае нет никакой гарантии.Я предполагаю, что каналы не являются особым случаем для планировщика, что, по-видимому, означало @andybalholm. – tjameson

0

Это дополнение к уже опубликованным ответам.

Как практически все заявили, что проблема порядок исполнения goroutines, вы можете легко координировать goroutine выполнение с использованием каналов, передавая по номеру goroutine вы хотите запустить:

func coordinated(coord chan int, num, max int, work func()) { 
    for { 
     n := <-coord 

     if n == num { 
      work() 
      coord <- (n+1) % max 
     } else { 
      coord <- n 
     } 
    } 
} 

coord := make(chan int) 

go coordinated(coord, 0, 3, func() { println("0"); time.Sleep(1 * time.Second) }) 
go coordinated(coord, 1, 3, func() { println("1"); time.Sleep(1 * time.Second) }) 
go coordinated(coord, 2, 3, func() { println("2"); time.Sleep(1 * time.Second) }) 

coord <- 0 

или с помощью центрального goroutine, который выполняет рабочих в упорядоченным образом:

func executor(funs chan func()) { 
    for { 
     worker := <-funs 
     worker() 
     funs <- worker 
    } 
} 

funs := make(chan func(), 3) 

funs <- func() { println("0"); time.Sleep(1 * time.Second) } 
funs <- func() { println("1"); time.Sleep(1 * time.Second) } 
funs <- func() { println("2"); time.Sleep(1 * time.Second) } 

go executor(funs) 

Эти методы будут, конечно, удалить все параллелизм из-за синхронизации. Тем не менее, остается неизменным.

+0

Я не тестировал, но думаю, вы не можете гарантировать, что горутин с ожидаемым числом будет работать. Я бы добавил «sleep» в «func Coordinated» после координирования <-n (внутри else). – zk82

+0

Да, я не могу быть уверен, что горутин с этим номером будет работать. Тем не менее, я могу заставить выполнение функции, которая выполняет фактическую работу, когда настало время. Первое решение - немного догадок, а второе гарантирует, что функции вызываются по порядку. – nemo

Смежные вопросы