2017-02-21 6 views
1

Мой DASK dataframe около 120 мм строк и 4 столбца:Ошибка при экспорте DASK dataframe в CSV

df_final.dtypes 

cust_id  int64 
score   float64 
total_qty  float64 
update_score float64 
dtype: object 

и я делаю эту операцию на jupyter ноутбуков, подключенных к Linux машине:

%time df_final.to_csv('/path/claritin-files-*.csv') 

и он бросает эту ошибку:

--------------------------------------------------------------------------- 
ValueError        Traceback (most recent call last) 
<ipython-input-24-46468ae45023> in <module>() 
----> 1 get_ipython().magic(u"time df_final.to_csv('path/claritin-files-*.csv')") 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in magic(self, arg_s) 
    2334   magic_name, _, magic_arg_s = arg_s.partition(' ') 
    2335   magic_name = magic_name.lstrip(prefilter.ESC_MAGIC) 
-> 2336   return self.run_line_magic(magic_name, magic_arg_s) 
    2337 
    2338  #------------------------------------------------------------------------- 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in run_line_magic(self, magic_name, line) 
    2255     kwargs['local_ns'] = sys._getframe(stack_depth).f_locals 
    2256    with self.builtin_trap: 
-> 2257     result = fn(*args,**kwargs) 
    2258    return result 
    2259 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns) 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magic.pyc in <lambda>(f, *a, **k) 
    191  **# but it's overkill for just that one bit of state.** 
    192  def magic_deco(arg): 
--> 193   call = lambda f, *a, **k: f(*a, **k) 
    194 
    195   if callable(arg): 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns) 
    1161   if mode=='eval': 
    1162    st = clock2() 
-> 1163    out = eval(code, glob, local_ns) 
    1164    end = clock2() 
    1165   else: 

<timed eval> in <module>() 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/core.pyc in to_csv(self, filename, **kwargs) 
    936   """ See dd.to_csv docstring for more information """ 
    937   from .io import to_csv 
--> 938   return to_csv(self, filename, **kwargs) 
    939 
    940  def to_delayed(self): 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.pyc in to_csv(df, filename, name_function, compression, compute, get, **kwargs) 
    411  if compute: 
    412   from dask import compute 
--> 413   compute(*values, get=get) 
    414  else: 
    415   return values 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs) 
    177   dsk = merge(var.dask for var in variables) 
    178  keys = [var._keys() for var in variables] 
--> 179  results = get(dsk, keys, **kwargs) 
    180 
    181  results_iter = iter(results) 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/threaded.pyc in get(dsk, result, cache, num_workers, **kwargs) 
    74  results = get_async(pool.apply_async, len(pool._pool), dsk, result, 
    75       cache=cache, get_id=_thread_get_id, 
---> 76       **kwargs) 
    77 
    78  # Cleanup pools associated to dead threads 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.pyc in get_async(apply_async, num_workers, dsk, result, cache, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, dumps, loads, **kwargs) 
    491      _execute_task(task, data) # Re-execute locally 
    492     else: 
--> 493      raise(remote_exception(res, tb)) 
    494    state['cache'][key] = res 
    495    finish_task(dsk, key, state, results, keyorder.get) 

**ValueError: invalid literal for long() with base 10: 'total_qty'** 

Traceback 
--------- 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.py", line 268, in execute_task 
    result = _execute_task(task, data) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.py", line 249, in _execute_task 
    return func(*args2) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 55, in pandas_read_text 
    coerce_dtypes(df, dtypes) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 83, in coerce_dtypes 
    df[c] = df[c].astype(dtypes[c]) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/generic.py", line 3054, in astype 
    raise_on_error=raise_on_error, **kwargs) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3189, in astype 
    return self.apply('astype', dtype=dtype, **kwargs) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3056, in apply 
    applied = getattr(b, f)(**kwargs) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 461, in astype 
    values=values, **kwargs) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 504, in _astype 
    values = _astype_nansafe(values.ravel(), dtype, copy=True) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/types/cast.py", line 534, in _astype_nansafe 
    return lib.astype_intsafe(arr.ravel(), dtype).reshape(arr.shape) 
    File "pandas/lib.pyx", line 980, in pandas.lib.astype_intsafe (pandas/lib.c:17409) 
    File "pandas/src/util.pxd", line 93, in util.set_value_at_unsafe (pandas/lib.c:72777) 

У меня есть несколько вопросов:

1) В первую очередь этот экспорт работал нормально в пятницу, он выплескивает 100 CSV-файлов (так как он имеет 100 разделов), которые я позже агрегировал. Итак, что сегодня не так - что-нибудь из журнала ошибок?

2) Может быть, этот вопрос предназначен для создателей этого пакета, что является наиболее эффективным с точки зрения времени способом получения csv-экстракта из dask dataframe такого размера, поскольку он занимал от 1,5 до 2 часов, в последний раз он работал.

Я не использую распределенный пакет, и это находится на одном ядре кластера Linux.

ответ

1

Эта ошибка, вероятно, имеет мало общего с to_csv и больше связана с чем-то еще в ваших вычислениях. Вызов df.to_csv был только в первый раз, когда вы заставили вычислить прокрутку всех данных.

Учитывая ошибку, я действительно подозреваю, что это не работает в read_csv. Dask.dataframe прочитал первые несколько сотен килобайт вашего первого файла, чтобы догадаться о типах данных, но он, кажется, неправильно догадался. Возможно, вы захотите попробовать явно указать типы dtypes в вызове read_csv.

Что касается второго вопроса о написании в CSV быстро, моим первым ответом будет «использовать паркет или HDF5 вместо». Они намного быстрее и точнее почти во всех отношениях.

+0

Спасибо !!, да, предположил, что раньше, так как я читаю dataframe из формата csv. Не уверен, почему он не читает его правильно. относительно вашего предложения по второму вопросу, это для чтения и письма в паркетном формате (я знаком с паркетным домом). –

+0

Общей причиной является то, что целочисленный столбец имеет некоторые отсутствующие значения, поэтому панда решает, что ему нужно использовать float partway through. Я не понимаю вашего комментария о паркете. – MRocklin

+0

Я имел в виду, когда вы сказали использовать паркет или HDF5, вы имели в виду чтение паркетных файлов для преобразования в dask dataframes, а затем запись в формат паркета вместо формата csv? могут ли файлы csv экспортироваться быстрее (мой фреймворк составляет 130 мм x 4 столбца), если я использую dask, распределенный по кластеру машин? –

Смежные вопросы