Пожалуйста, смотрите пример ниже:
>>> from pyspark.sql.functions import col
>>> df = (sc.textFile('data.txt')
.map(lambda line: line.split(","))
.toDF(['name','age','height'])
.select(col('name'), col('age').cast('int'), col('height').cast('int')))
+-----+---+------+
| name|age|height|
+-----+---+------+
|Alice| 5| 80|
| Bob| 5| 80|
|Alice| 10| 80|
+-----+---+------+
>>> list_persons = map(lambda row: row.asDict(), df.collect())
>>> list_persons
[
{'age': 5, 'name': u'Alice', 'height': 80},
{'age': 5, 'name': u'Bob', 'height': 80},
{'age': 10, 'name': u'Alice', 'height': 80}
]
>>> dict_persons = {person['name']: person for person in list_persons}
>>> dict_persons
{u'Bob': {'age': 5, 'name': u'Bob', 'height': 80}, u'Alice': {'age': 10, 'name': u'Alice', 'height': 80}}
вход, который я использую для тестирования data.txt
:
Alice,5,80
Bob,5,80
Alice,10,80
Сначала мы выполняем загрузку с помощью pyspark, читая строки. Затем мы преобразуем строки в столбцы путем разбиения на запятую. Затем мы преобразуем собственный RDD в DF и добавим имена в colume. Наконец, мы конвертируем в столбцы соответствующий формат.
Затем мы собираем все, чтобы драйвер, и используя некоторое понимание списка python, мы преобразуем данные в форму как предпочтительные. Мы преобразуем объект в словарь, используя метод asDict()
. На выходе мы можем заметить, что Алиса появляется только один раз, но это, конечно, потому, что ключ от Алисы перезаписывается.
Пожалуйста, имейте в виду, что вы хотите выполнить всю обработку и фильтрацию внутри pypspark, прежде чем возвращать результат драйверу.
Надеюсь, это поможет, ура.
Но ваш результат неправильный? Я хочу, чтобы этот вывод выглядел так: {name: [age, height]} ' –
, поэтому выход должен быть {Алиса: [5,80]} без 'u' –
Я бы отговорил от использования Panda здесь. Panda является большой зависимостью и не требуется для такой простой операции. –