2017-02-09 1 views
0

Это был провал моего существования.Внедрение семафора с несколькими производителями (с гортанами)

type ec2Params struct { 
    sess *session.Session 
    region string 
} 

type cloudwatchParams struct { 
    cl cloudwatch.CloudWatch 
    id string 
    metric string 
    region string 
} 

type request struct { 
    ec2Params 
    cloudwatchParams 
} 

// Control concurrency and sync 
var maxRoutines = 128 
var sem chan bool 
var req chan request 

func main() { 
    sem := make(chan bool, maxRoutines) 
    for i := 0; i < maxRoutines; i++ { 
     sem <- true 
    } 
    req := make(chan request) 
    go func() { // This is my the producer 
     for _, arn := range arns { 
      arnCreds := startSession(arn) 
      for _, region := range regions { 
       sess, err := session.NewSession(
        &aws.Config{****}) 
       if err != nil { 
        failOnError(err, "Can't assume role") 
       } 
       req <- request{ec2Params: ec2Params{ **** }} 
      } 
     } 
    }() 
    for f := range(req) { 
     <- sem 
     if (ec2Params{}) != f.ec2Params { 
      go getEC2Metrics(****) 
     } else { 
      // I should be excercising this line of code too, 
      // but I'm not :(
      go getMetricFromCloudwatch(****) 
     } 
     sem <- true 
    } 
} 

getEC2Metrics и getCloudwatchMetrics являются goroutines для выполнения

func getMetricFromCloudwatch(cl cloudwatch.CloudWatch, id, metric, region string) { 
    // Magic 
} 

func getEC2Metrics(sess *session.Session, region string) { 
    ec := ec2.New(sess) 
    var ids []string 
    l, err := ec.DescribeInstances(&ec2.DescribeInstancesInput{}) 
    if err != nil { 
     fmt.Println(err.Error()) 
    } else { 
     for _, rsv := range l.Reservations { 
      for _, inst := range rsv.Instances { 
       ids = append(ids, *inst.InstanceId) 
      } 
     } 
     metrics := cfg.AWSMetric.Metric 
     if len(ids) >= 0 { 
      cl := cloudwatch.New(sess) 
      for _, id := range ids{ 
       for _, metric := range metrics { 
        // For what I can tell, execution get stuck here 
        req <- request{ cloudwatchParams: ***** }} 
       } 
      } 
     } 
    } 
} 

Оба анонимного производителя в main и getEC2Metrics должны публиковать данные в req асинхронно, но до сих пор кажется, что все, что getEC2Metrics публикует на канал никогда не обрабатывается. Похоже, что что-то мешает мне публиковать изнутри горутин, но я ничего не нашел. Я хотел бы знать, как это сделать и выпустить исправленное поведение (это, фактически, рабочий семафор).

Основание реализации можно найти здесь: https://burke.libbey.me/conserving-file-descriptors-in-go/

+1

Вы не используете свой семафор, вы просто вынимаете токен, отправляете goroutine и кладете маркер обратно - вы никогда не блокируете. (BTW, вам не нужно «заполнять» семафор, вы можете отменить его и отправить токен, чтобы взять семафор, и получить токен для его выпуска) – JimB

+0

Вы полностью правы! Я обновил это и нашел другую проблему, которая заключается в том, что мне нужно передать ссылку на канал для моей функции getEC2Metrics; По какой-то причине я не могу использовать глобальное определение. –

ответ

0

Im бесится, комментарий JimB сделал вращение колеса, и теперь я решил это!

// Control concurrency and sync 
var maxRoutines = 128 
var sem chan bool 
var req chan request // Not reachable inside getEC2Metrics 

func getEC2Metrics(sess *session.Session, region string, req chan <- request) { 
    .... 
    .... 
      for _, id := range ids{ 
       for _, metric := range metrics { 
        req <- request{ **** }} // When using the global req, 
              // this would block 
       } 
      } 
    .... 
    .... 
} 

func main() { 
    sem := make(chan bool, maxRoutines) 
    for i := 0; i < maxRoutines; i++ { 
     sem <- true  
    } 
    req := make(chan request) 
    go func() { 
     // Producing tasks 
    }() 
    for f := range(req) { 
     <- sem // checking out tickets outside the goroutine does block 
       //outside of the goroutine 
     go func() { 
      defer func() { sem <- true }() 
      if (ec2Params{}) != f.ec2Params { 
       getEC2Metrics(****, req) // somehow sending the channel makes 
             // possible to publish to it 
      } else { 
       getMetricFromCloudwatch(****) 
      } 
     }() 
    } 
} 

Были два вопроса:

  1. семафора не блокирующие (я думаю, это потому, что я проверял и в жетонах внутри goroutine, так что состояние гонки возможно).
  2. По какой-то причине глобальный канал req не был правильно исправлен getEC2Metrics, поэтому он оставил бы alll goroutines застрявшим, пытаясь опубликовать канал, который, по-видимому, был в области видимости, но это было не так (я действительно не знаю, пока не знаю).

Я честно просто повезло со вторым предметом, до сих пор я не нашел никаких документов относительно этой причуды, но в конце я рад, что он работает.

+0

Не существует «условия гонки» при отправке и приеме с канала, это примитив синхронизации, предназначенный для одновременного использования. Вы затеняете глобальный канал 'req' в основном. – JimB