2016-02-10 5 views
1

У меня есть несколько файлов . Все .npz создают те же структуры: каждый из них содержит две переменные, всегда с одинаковыми именами переменных. В настоящее время, я просто перебираем все .npz файлы, получить два значения переменных и добавить их в какую-то глобальную переменную:Загрузка нескольких файлов npz в многопоточном режиме

# Let's assume there are 100 npz files 
x_train = [] 
y_train = [] 
for npz_file_number in range(100): 
    data = dict(np.load('{0:04d}.npz'.format(npz_file_number))) 
    x_train.append(data['x']) 
    y_train.append(data['y']) 

Это займет некоторое время, и узким местом является процессор. Порядок, в котором переменные x и y прилагаются к переменным x_train и y_train, не имеет значения.

Есть ли способ загрузить несколько файлов в многопометном режиме?

+0

Sure, но процессор будет по-прежнему является узким местом. Добавление большего количества поваров на кухню не ускоряет приготовление пищи. –

+0

@BrentWashburne Почему бы не уменьшить время загрузки? –

+0

Ваш код пытается обновить два массива из нескольких потоков. Как это произойдет? Каждый поток блокируется, а один поток обновляет массив, а затем следующий будет писать в массив и так далее. То есть, только один поток будет активен из-за общей блокировки. Позвольте мне задать тот же вопрос: почему вы думаете, что несколько потоков сократят время загрузки? –

ответ

2

Я был удивлен комментариями @Brent Washburne и решил попробовать сам. Я думаю, что общая проблема двоякая:

Во-первых, чтение данных часто связано с IO, поэтому запись многопоточного кода часто не дает высокой производительности. Во-вторых, выполнение распараллеливания разделяемой памяти в python по своей сути затруднено из-за дизайна самого языка. Там намного больше накладных расходов по сравнению с родными c.

Но посмотрим, что мы можем сделать.

# some imports 
import numpy as np 
import glob 
from multiprocessing import Pool 
import os 

# creating some temporary data 
tmp_dir = os.path.join('tmp', 'nptest') 
if not os.path.exists(tmp_dir): 
    os.makedirs(tmp_dir) 
    for i in range(100): 
     x = np.random.rand(10000, 50) 
     file_path = os.path.join(tmp_dir, '%05d.npz' % i) 
     np.savez_compressed(file_path, x=x) 

def read_x(path): 
    with np.load(path) as data: 
     return data["x"] 

def serial_read(files): 
    x_list = list(map(read_x, files)) 
    return x_list 

def parallel_read(files): 
    with Pool() as pool: 
     x_list = pool.map(read_x, files) 
    return x_list 

ОК, достаточно материала подготовлено. Давайте получим тайминги.

files = glob.glob(os.path.join(tmp_dir, '*.npz')) 

%timeit x_serial = serial_read(files) 
# 1 loops, best of 3: 7.04 s per loop 

%timeit x_parallel = parallel_read(files) 
# 1 loops, best of 3: 3.56 s per loop 

np.allclose(x_serial, x_parallel) 
# True 

На самом деле это похоже на приличное ускорение. Я использую два реальных и два гиперпотоковых ядра.


Для запуска и время все сразу, вы можете выполнить этот скрипт:

from __future__ import print_function 
from __future__ import division 

# some imports 
import numpy as np 
import glob 
import sys 
import multiprocessing 
import os 
import timeit 

# creating some temporary data 
tmp_dir = os.path.join('tmp', 'nptest') 
if not os.path.exists(tmp_dir): 
    os.makedirs(tmp_dir) 
    for i in range(100): 
     x = np.random.rand(10000, 50) 
     file_path = os.path.join(tmp_dir, '%05d.npz' % i) 
     np.savez_compressed(file_path, x=x) 

def read_x(path): 
    data = dict(np.load(path)) 
    return data['x'] 

def serial_read(files): 
    x_list = list(map(read_x, files)) 
    return x_list 

def parallel_read(files): 
    pool = multiprocessing.Pool(processes=4) 
    x_list = pool.map(read_x, files) 
    return x_list 


files = glob.glob(os.path.join(tmp_dir, '*.npz')) 
#files = files[0:5] # to test on a subset of the npz files 

# Timing: 
timeit_runs = 5 

timer = timeit.Timer(lambda: serial_read(files)) 
print('serial_read: {0:.4f} seconds averaged over {1} runs' 
     .format(timer.timeit(number=timeit_runs)/timeit_runs, 
     timeit_runs)) 
# 1 loops, best of 3: 7.04 s per loop 

timer = timeit.Timer(lambda: parallel_read(files)) 
print('parallel_read: {0:.4f} seconds averaged over {1} runs' 
     .format(timer.timeit(number=timeit_runs)/timeit_runs, 
     timeit_runs)) 
# 1 loops, best of 3: 3.56 s per loop 

# Examples of use: 
x = serial_read(files) 
print('len(x): {0}'.format(len(x))) # len(x): 100 
print('len(x[0]): {0}'.format(len(x[0]))) # len(x[0]): 10000 
print('len(x[0][0]): {0}'.format(len(x[0][0]))) # len(x[0]): 10000 
print('x[0][0]: {0}'.format(x[0][0])) # len(x[0]): 10000 
print('x[0].nbytes: {0} MB'.format(x[0].nbytes/1e6)) # 4.0 MB 
+0

Да, если процесс связан с IO, то многопроцессорность улучшит ситуацию. (Незначительная деталь, ОП запросила многопоточность.) Но OP сказал, что процесс связан с процессором, поэтому накладные расходы на управление несколькими процессами могут нанести ущерб производительности. –

Смежные вопросы