Я был удивлен комментариями @Brent Washburne и решил попробовать сам. Я думаю, что общая проблема двоякая:
Во-первых, чтение данных часто связано с IO, поэтому запись многопоточного кода часто не дает высокой производительности. Во-вторых, выполнение распараллеливания разделяемой памяти в python по своей сути затруднено из-за дизайна самого языка. Там намного больше накладных расходов по сравнению с родными c.
Но посмотрим, что мы можем сделать.
# some imports
import numpy as np
import glob
from multiprocessing import Pool
import os
# creating some temporary data
tmp_dir = os.path.join('tmp', 'nptest')
if not os.path.exists(tmp_dir):
os.makedirs(tmp_dir)
for i in range(100):
x = np.random.rand(10000, 50)
file_path = os.path.join(tmp_dir, '%05d.npz' % i)
np.savez_compressed(file_path, x=x)
def read_x(path):
with np.load(path) as data:
return data["x"]
def serial_read(files):
x_list = list(map(read_x, files))
return x_list
def parallel_read(files):
with Pool() as pool:
x_list = pool.map(read_x, files)
return x_list
ОК, достаточно материала подготовлено. Давайте получим тайминги.
files = glob.glob(os.path.join(tmp_dir, '*.npz'))
%timeit x_serial = serial_read(files)
# 1 loops, best of 3: 7.04 s per loop
%timeit x_parallel = parallel_read(files)
# 1 loops, best of 3: 3.56 s per loop
np.allclose(x_serial, x_parallel)
# True
На самом деле это похоже на приличное ускорение. Я использую два реальных и два гиперпотоковых ядра.
Для запуска и время все сразу, вы можете выполнить этот скрипт:
from __future__ import print_function
from __future__ import division
# some imports
import numpy as np
import glob
import sys
import multiprocessing
import os
import timeit
# creating some temporary data
tmp_dir = os.path.join('tmp', 'nptest')
if not os.path.exists(tmp_dir):
os.makedirs(tmp_dir)
for i in range(100):
x = np.random.rand(10000, 50)
file_path = os.path.join(tmp_dir, '%05d.npz' % i)
np.savez_compressed(file_path, x=x)
def read_x(path):
data = dict(np.load(path))
return data['x']
def serial_read(files):
x_list = list(map(read_x, files))
return x_list
def parallel_read(files):
pool = multiprocessing.Pool(processes=4)
x_list = pool.map(read_x, files)
return x_list
files = glob.glob(os.path.join(tmp_dir, '*.npz'))
#files = files[0:5] # to test on a subset of the npz files
# Timing:
timeit_runs = 5
timer = timeit.Timer(lambda: serial_read(files))
print('serial_read: {0:.4f} seconds averaged over {1} runs'
.format(timer.timeit(number=timeit_runs)/timeit_runs,
timeit_runs))
# 1 loops, best of 3: 7.04 s per loop
timer = timeit.Timer(lambda: parallel_read(files))
print('parallel_read: {0:.4f} seconds averaged over {1} runs'
.format(timer.timeit(number=timeit_runs)/timeit_runs,
timeit_runs))
# 1 loops, best of 3: 3.56 s per loop
# Examples of use:
x = serial_read(files)
print('len(x): {0}'.format(len(x))) # len(x): 100
print('len(x[0]): {0}'.format(len(x[0]))) # len(x[0]): 10000
print('len(x[0][0]): {0}'.format(len(x[0][0]))) # len(x[0]): 10000
print('x[0][0]: {0}'.format(x[0][0])) # len(x[0]): 10000
print('x[0].nbytes: {0} MB'.format(x[0].nbytes/1e6)) # 4.0 MB
Sure, но процессор будет по-прежнему является узким местом. Добавление большего количества поваров на кухню не ускоряет приготовление пищи. –
@BrentWashburne Почему бы не уменьшить время загрузки? –
Ваш код пытается обновить два массива из нескольких потоков. Как это произойдет? Каждый поток блокируется, а один поток обновляет массив, а затем следующий будет писать в массив и так далее. То есть, только один поток будет активен из-за общей блокировки. Позвольте мне задать тот же вопрос: почему вы думаете, что несколько потоков сократят время загрузки? –