2017-01-02 1 views
1

Я написал сценарий для переноса большого количества данных из одной базы данных в другую и получил ее работу отлично, но теперь я хочу попробовать и использовать goroutines для ускорения работы скрипта с помощью одновременных вызовов БД. Так как внесение изменений в вызов go processBatch(offset) вместо processBatch(offset), я вижу, что запускается несколько goroutines, но скрипт заканчивается почти мгновенно, и ничего не делается. Также количество запущенных goroutines меняется каждый раз, когда я вызываю скрипт. Нет ошибок (что я вижу).Вызов DB в goroutine с ошибкой без ошибок

Я по-прежнему новичок в goroutines и Go вообще, поэтому любые указания относительно того, что я могу делать неправильно, очень ценятся. Я удалил всю логику из приведенного ниже кода, который не связан с параллелизмом или доступом к БД, так как он отлично работает без изменений. Я также оставил комментарий, где, как я считаю, он терпит неудачу, поскольку ничего ниже этой строки не выполняется (печать не выводит). Я также попытался использовать sync.WaitGroup для смещения вызовов БД, но он ничего не менял.

var (
    legacyDB  *sql.DB 
    v2DB   *sql.DB 
) 

func main() { 

    var total, loops int 
    var err error 

    legacyDB, err = sql.Open("mysql", "...") 
    if err != nil { 
     panic(err) 
    } 
    defer legacyDB.Close() 

    v2DB, err = sql.Open("mysql", "...") 
    if err != nil { 
     panic(err) 
    } 
    defer v2DB.Close() 

    err = legacyDB.QueryRow("SELECT count(*) FROM users").Scan(&total) 
    checkErr(err) 

    loops = int(math.Ceil(float64(total)/float64(batchsize))) 

    fmt.Println("Total: " + strconv.Itoa(total)) 
    fmt.Println("Loops: " + strconv.Itoa(loops)) 

    for i := 0; i < loops; i++ { 
     offset := i * batchsize 

     go processBatch(offset) 
    } 

    legacyDB.Close() 
    v2DB.Close() 
} 

func processBatch(offset int) { 

    query := namedParameterQuery.NewNamedParameterQuery(` 
     SELECT ... 
     LIMIT :offset,:batchsize 
    `) 
    query.SetValue(...) 

    rows, err := legacyDB.Query(query.GetParsedQuery(), (query.GetParsedParameters())...) 
    // nothing after this line gets done (Println here does not show output) 
    checkErr(err) 
    defer rows.Close() 

    .... 

    var m runtime.MemStats 
    runtime.ReadMemStats(&m) 
    log.Printf("\nAlloc = %v\nTotalAlloc = %v\nSys = %v\nNumGC = %v\n\n", m.Alloc/1024/1024, m.TotalAlloc/1024/1024, m.Sys/1024/1024, m.NumGC) 
} 

func checkErr(err error) { 
    if err != nil { 
     panic(err) 
    } 
} 
+2

Вы нерест 'processBatch()' процедуры, но ваш 'основной() 'не ждет их завершения и просто заканчивается. Вы можете использовать 'sync.WaitGroup', чтобы' main() 'ждать завершения всех goroutines. – Nadh

ответ

2

Как NADH упоминалось в комментариях, что бы потому, что программа завершается, когда функция main заканчивается, независимо от того, или нет, есть еще и другие goroutines работают. Чтобы исправить это, достаточно * sync.WaitGroup. Группа WaitGroup используется для случаев, когда у вас есть несколько параллельных операций, и вы хотите подождать, пока они не будут завершены. Документацию можно найти здесь: https://golang.org/pkg/sync/#WaitGroup.

Пример реализации вашей программы без использования глобальных переменных будет выглядеть замена

fmt.Println("Total: " + strconv.Itoa(total)) 
fmt.Println("Loops: " + strconv.Itoa(loops)) 

for i := 0; i < loops; i++ { 
    offset := i * batchsize 

    go processBatch(offset) 
} 

с

fmt.Println("Total: " + strconv.Itoa(total)) 
fmt.Println("Loops: " + strconv.Itoa(loops)) 

wg := new(sync.WaitGroup) 
wg.Add(loops) 

for i := 0; i < loops; i++ { 
    offset := i * batchsize 

    go func(offset int) { 
     defer wg.Done() 
     processBatch(offset) 
    }(offset) 
} 

wg.Wait() 
+0

Хорошо, похоже, я уже был близок к этому решению раньше, когда я опробовал WaitGroup, но не понимал, как и где его использовать. Это очень помогло, спасибо! – sekl

Смежные вопросы