2015-10-25 2 views
1

Я новичок Пойти и пытаюсь реализовать простую балансировку нагрузки, как показано в этом соскальзывает: http://concur.rspace.googlecode.com/hg/talk/concur.html#slide-42Go Golang выберите оператор не может получить отослано значение

Полный код:

package main 

import (
    "fmt" 
    "time" 
    "container/heap" 
) 

type Request struct { 
    fn func(*Worker) int 
    c chan int 
} 

func requester(work chan <-Request) { 
    c := make(chan int) 
    work <- Request{workFn, c} 
    result := <-c 
    furtherProcess(result) 
} 

func workFn(w *Worker) int { 
    time.Sleep(1000 * time.Millisecond) 
    return w.index 
} 

func furtherProcess(result int) { 
    fmt.Println(result) 
} 

type Worker struct { 
    request chan Request 
    pending int 
    index int 
} 

func (w *Worker) work(done chan *Worker) { 
    for req := range w.request { 
     req.c <- req.fn(w) 
     fmt.Println("sending to done:", done) 
     done <- w 
     fmt.Println("sended to done") 
    } 
} 

type Pool []*Worker 

type Balancer struct { 
    pool Pool 
    done chan *Worker 
} 

func (b *Balancer) balance(work chan Request) { 
    for { 
     fmt.Println("selecting, done:", b.done) 
     select { 
     case req := <-work: 
      b.dispatch(req) 
     case w := <-b.done: 
      fmt.Println("completed") 
      b.completed(w) 
     } 
    } 
} 

func (p Pool) Len() int { 
    return len(p) 
} 

func (p Pool) Less(i, j int) bool { 
    return p[i].pending < p[j].pending 
} 

func (p Pool) Swap(i, j int) { 
    p[i], p[j] = p[j], p[i] 
} 

func (p *Pool) Push(x interface{}) { 
    *p = append(*p, x.(*Worker)) 
} 

func (p *Pool) Pop() interface{} { 
    old := *p 
    n := len(old) 
    x := old[n - 1] 
    *p = old[0 : n - 1] 
    return x 
} 

func (b *Balancer) dispatch(req Request) { 
    w := heap.Pop(&b.pool).(*Worker) 
    w.request <- req 
    w.pending++ 
    heap.Push(&b.pool, w) 
    fmt.Println("dispatched to worker", w.index) 
} 

func (b *Balancer) completed(w *Worker) { 
    w.pending-- 
    heap.Remove(&b.pool, w.index) 
    heap.Push(&b.pool, w) 
} 

func Run() { 
    NumWorkers := 4 
    req := make(chan Request) 
    done := make(chan *Worker) 
    b := Balancer{make([]*Worker, NumWorkers), done} 
    for i := 0; i < NumWorkers; i++ { 
     w := Worker{make(chan Request), 0, i} 
     b.pool[i] = &w 
     go w.work(done) 
    } 
    go b.balance(req) 
    for i := 0; i < NumWorkers * 4; i++ { 
     go requester(req) 
    } 
    time.Sleep(200000 * time.Millisecond) 
} 

func main() { 
    Run() 
} 

Когда я побежал, я получил следующие результаты:

selecting, done: 0xc0820082a0 
dispatched to worker 0 
selecting, done: 0xc0820082a0 
dispatched to worker 3 
selecting, done: 0xc0820082a0 
dispatched to worker 2 
selecting, done: 0xc0820082a0 
dispatched to worker 1 
selecting, done: 0xc0820082a0 
sending to done: 0xc0820082a0 
sending to done: 0xc0820082a0 
3 
sending to done: 0xc0820082a0 
2 
1 
0 
sending to done: 0xc0820082a0 

Как вы можете видеть, это был выбор, и отсылка к одной трубе (сделано: 0xc0820082a0), но выбор не recei вешенное значение и блокировалось навсегда. Как такое могло произойти? В чем проблема с вышеуказанным кодом? Благодаря!

ответ

0

Использование kill -ABRT <PID> вы можете увидеть, что все рабочие будут заблокированы на done <- w, пока ваш Balancer заблокирован на w.request <- req, создавая затор (рабочие не могут идти дальше, пока балансир не получает их «сделали» сигналы, и балансир может» t идти дальше, пока выбранный работник не примет запрос).

Если вы замените done <- w на go func() { done <- w }(), вы увидите, что ваша программа будет обрабатывать 16 запросов без вешания.

Сторона примечания: вместо time.Sleep(200000 * time.Millisecond), посмотреть sync.WaitGroup