1

У меня есть 100 файлов npz, содержащих numpy массивы в хранилище google. У меня есть установка dataproc с jupyter, и я пытаюсь прочитать все массивы numpy в искровом RDD. Каков наилучший способ загрузки массивов numpy из хранилища google в pyspark? Есть ли простой способ, например, np.load("gs://path/to/array.npz") загрузить массив numpy, а затем сделать sc.parallelize на нем?чтение массива numy из GCS в искру

ответ

2

Если вы планируете масштабировать в конце концов, вы захотите использовать распределенные методы ввода в SparkContext, а не выполнять локальную загрузку файла из программы драйвера, используя sc.parallelize. Похоже, вы должны прочитать каждый из файлов нетронутых, хотя, так что в вашем случае, если вы хотите:

npz_rdd = sc.binaryFiles("gs://path/to/directory/containing/files/") 

Или вы можете также указать отдельные файлы, если вы хотите, но тогда вы просто РД с одним элементом :

npz_rdd = sc.binaryFiles("gs://path/to/directory/containing/files/arr1.npz") 

Затем каждая запись представляет собой пару <filename>,<str of bytes>. В Dataproc sc.binaryFiles будет автоматически работать напрямую с путями GCS, в отличие от np.load, для которых требуются пути к локальной файловой системе.

Затем в коде рабочего, вам просто нужно использовать StringIO использовать эти строки байтов в качестве файлового объекта вы положили в np.load:

from StringIO import StringIO 
# For example, to create an RDD of the 'arr_0' element of each of the picked objects: 
npz_rdd.map(lambda l: numpy.load(StringIO(l[1]))['arr_0']) 

Во время разработки, если вы действительно хотите, чтобы просто читать файлы в основной драйвер, вы всегда можете свернуть свой RDD с помощью collect(), чтобы получить его локально:

npz_rdd = sc.binaryFiles("gs://path/to/directory/containing/files/arr1.npz") 
local_bytes = npz_rdd.collect()[0][1] 
local_np_obj = np.load(StringIO(local_bytes)) 
Смежные вопросы