2015-10-01 5 views
0

Я новичок в Go, и я учусь работать с goroutines.загрузка файлов с goroutines?

У меня есть функция, которая загружает изображения:

func imageDownloader(uri string, filename string) { 
    fmt.Println("starting download for ", uri) 

    outFile, err := os.Create(filename) 
    defer outFile.Close() 
    if err != nil { 
     os.Exit(1) 
    } 

    client := &http.Client{} 

    req, err := http.NewRequest("GET", uri, nil) 

    resp, err := client.Do(req) 
    defer resp.Body.Close() 

    if err != nil { 
     panic(err) 
    } 

    header := resp.ContentLength 
    bar := pb.New(int(header)) 
    rd := bar.NewProxyReader(resp.Body) 
    // and copy from reader 
    io.Copy(outFile, rd) 
} 

Когда я звоню сам по себе как часть другой функции, она загружает изображения полностью и там нет усеченных данных.

Однако, когда я пытаюсь изменить его, чтобы сделать его goroutine, изображения часто усекаются или файлы с нулевой длиной.

func imageDownloader(uri string, filename string, wg *sync.WaitGroup) { 
    ... 
    io.Copy(outFile, rd) 
    wg.Done() 
} 

func main() { 
var wg sync.WaitGroup 
wg.Add(1) 
go imageDownloader(url, file, &wg) 
wg.Wait() 
} 

Я неправильно использую WaitGroups? Что может быть причиной этого и как я могу его исправить?

Обновление:

Решено. Я разместил функцию wg.add() вне цикла. :(

ответ

3

Хотя я не уверен, что именно вызывает у вас проблемы, вот два варианта, как получить его обратно в рабочее состояние.

Во-первых, глядя на example of how to use waitgroups от синхронизации библиотеки, попробуйте позвонить defer wg.Done() в начало вашей функции, чтобы гарантировать, что даже если goroutine неожиданно закончится, чтобы ожидающая группа была правильно уменьшена.

Во-вторых, io.Copy возвращает ошибку, которую вы не проверяете. Это не очень хорошая практика, но в вашем конкретном случае это мешает вам увидеть, действительно ли есть ошибка в процедуре копирования. Проверьте ее и соответствующим образом обработайте. lso возвращает количество записанных байтов, что также может помочь вам.

+0

Выполнено. Однако ошибок нет, и это возвращает только длину файла, хотя остальные записываются на диск. Это странно. – mxplusb

3

Ваш пример не имеет ничего явно неправильного в использовании WaitGroups. Пока вы вызываете wg.Add() с тем же номером, что и количество запущенных goroutines, или увеличивая его на 1 при каждом запуске нового goroutine, это должно быть правильно.

Однако вы называете os.Exit и panic для некоторых ошибок условий в goroutine, поэтому если у вас есть более чем один из них работает, сбой в одном из них будет прекратить все из них, независимо от использования WaitGroups. Если он не работает без панического сообщения, я бы посмотрел на линию os.Exit(1).

Было бы хорошо, если вы начнете использовать defer wg.Done() в начале вашей функции, так что, даже если произошла ошибка, goroutine все еще уменьшает счетчик. Таким образом, ваш основной поток не будет завершаться, если один из goroutines возвращает ошибку.

+0

Я изменил на 'defer wg.Done()' в начале и удалил 'os.Exit()' и 'panic()'. Тем не менее, он по-прежнему усекает изображения. В текущем наборе данных было 41 отдельный goroutines, как вы думаете, это ограничение моей файловой системы, пытающейся сразу написать 41 отдельный файл? – mxplusb

0

Одно изменение, которое я сделал бы в вашем примере, это плечо defer, когда вы находитесь Done. Я думаю, что это defer ws.Done() должно быть первым утверждением в вашей функции.

Мне нравится WaitGroup простота. Однако мне не нравится, что нам нужно передать ссылку на goroutine, потому что это будет означать, что логика параллелизма будет смешана с вашей бизнес-логикой.

Так что я пришел с этой общей функции, чтобы решить эту проблему для меня:

// Parallelize parallelizes the function calls 
func Parallelize(functions ...func()) { 
    var waitGroup sync.WaitGroup 
    waitGroup.Add(len(functions)) 

    defer waitGroup.Wait() 

    for _, function := range functions { 
     go func(copy func()) { 
      defer waitGroup.Done() 
      copy() 
     }(function) 
    } 
} 

Так что ваш пример может быть решена следующим образом:

func imageDownloader(uri string, filename string) { 
    ... 
    io.Copy(outFile, rd) 
} 

func main() { 
    functions := []func(){} 
    list := make([]Object, 5) 
    for _, object := range list { 
     function := func(obj Object){ 
      imageDownloader(object.uri, object.filename) 
     }(object) 
     functions = append(functions, function) 
    } 

    Parallelize(functions...)   

    fmt.Println("Done") 
} 

Если вы хотели бы использовать его, вы можете найти его здесь https://github.com/shomali11/util

Смежные вопросы