2016-10-05 2 views
1

Когда я собираю кадр данных из одной строки, мой метод успешно возвращает ожидаемый фрейм данных.Apache Spark 2.0.0 PySpark ручной сборщик данных для создания диаграммы памяти

def build_job_finish_data_frame(sql_context, job_load_id, is_success): 
    job_complete_record_schema = StructType(
     [ 
      StructField("job_load_id", IntegerType(), False), 
      StructField("terminate_datetime", TimestampType(), False), 
      StructField("was_success", BooleanType(), False) 
     ] 
    ) 
    data = [ 
     Row(
      job_load_id=job_load_id, 
      terminate_datetime=datetime.now(), 
      was_success=is_success 
     ) 
    ] 

    return sql_context.createDataFrame(data, job_complete_record_schema) 

Если изменить «terminate_datetime» к «end_datetime» или «finish_datetime», как показано ниже, он выдает ошибку.

def build_job_finish_data_frame(sql_context, job_load_id, is_success): 
    job_complete_record_schema = StructType(
     [ 
      StructField("job_load_id", IntegerType(), False), 
      StructField("end_datetime", TimestampType(), False), 
      StructField("was_success", BooleanType(), False) 
     ] 
    ) 
    data = [ 
     Row(
      job_load_id=job_load_id, 
      end_datetime=datetime.now(), 
      was_success=is_success 
     ) 
    ] 

    return sql_context.createDataFrame(data, job_complete_record_schema) 

Ошибка я получаю это

TypeError: IntegerType can not accept object datetime.datetime(2016, 10, 5, 11, 19, 31, 915745) in type <class 'datetime.datetime'> 

я могу изменить "terminate_datetime" на "start_datetime" и экспериментировали с другими словами.

Я не вижу причин для изменения имени поля, нарушающего этот код, поскольку он делает не что иное, как создание фрейма данных вручную.

Это беспокоит, поскольку я использую фреймы данных для загрузки хранилища данных, где я не могу управлять именами полей.

Я бегу PySpark на Python 3.3.2 на Fedora 20.

ответ

1

Почему название изменяет вещи? Проблема в том, что является tupleотсортировано по номеру по __fields__. Таким образом, первый случай не создает

from pyspark.sql import Row 
from datetime import datetime 

x = Row(job_load_id=1, terminate_datetime=datetime.now(), was_success=True) 
x.__fields__ 
## ['job_load_id', 'terminate_datetime', 'was_success'] 

, а второй один создает:

y = Row(job_load_id=1, end_datetime=datetime.now(), was_success=True) 
y.__fields__ 
## ['end_datetime', 'job_load_id', 'was_success'] 

Это больше не соответствует схеме, которую Вы определили, который ожидает (IntegerType, TimestampType, Boolean).

Поскольку полезна в основном для вывода схемы и вы предоставите схемы непосредственно вы можете обратиться к этому с помощью стандартных tuple:

def build_job_finish_data_frame(sql_context, job_load_id, is_success): 
    job_complete_record_schema = StructType(
     [ 
      StructField("job_load_id", IntegerType(), False), 
      StructField("end_datetime", TimestampType(), False), 
      StructField("was_success", BooleanType(), False) 
     ] 
    ) 
    data = [tuple(job_load_id, datetime.now(), is_success)] 

    return sql_context.createDataFrame(data, job_complete_record_schema) 

хотя создание единого элемента DataFrame выглядит странно, если не бессмысленно.

Смежные вопросы