2016-02-18 3 views
2

Я пытаюсь загрузить строки из файлов csv в базу данных Elasticsearch в f # с помощью FileHelpers для чтения csv. Все работает для небольших тестовых файлов с фрагмента кода ниже чтение всех записей сразуИспользование FileHelperAsyncEngine в F #

let readRows<'T>(filePath:string) = 
    let engine = FileHelperEngine(typeof<'T>) 

    engine.ReadFile(filePath) 
    |> Array.map (fun row -> row :?> 'T) 

К сожалению, он должен быть в состоянии прочитать файлы большего размера, из которых многие столбцы выброшенные позже, строка за строкой. Функция FileHelperAsyncEngine.BeginReadFile возвращает IDisposable.

let readRowsAsync<'T>(filePath:string) = 
    let engine = new FileHelperAsyncEngine(typeof<'T>) 

    engine.BeginReadFile(filePath:string) 
    |> ... 

Как можно дополнительно обработать этот объект в массив < «T> s?

ответ

5

Согласно the documentation, после того, как вы называете BeginReadFile, сама engine становится перечислимой последовательностью, по которой вы можете итерацию (что очень странное решение дизайна). Таким образом, вы можете просто создать свою собственную последовательность на нем:

let readRowsAsync<'T>(filePath:string) = 
    seq { 
    let engine = new FileHelperAsyncEngine(typeof<'T>) 
    use disposable = engine.BeginReadFile(filePath) 

    for r in engine do 
     if not (shouldDiscard r) then yield (map r) 
    } 

Обратите внимание, что я использую use связывания, а не let. Это гарантирует, что одноразовое устройство будет расположено после окончания последовательности или потребитель прекратит итерацию над ним.

Обратите внимание, что следующий будет не работы, даже если он будет компилировать:

let readRowsAsync<'T>(filePath:string) = 
    let engine = new FileHelperAsyncEngine(typeof<'T>) 
    use disposable = engine.BeginReadFile(filePath) 

    engine |> Seq.filter (not << shouldDiscard) |> Seq.map map 

Если вы делаете это таким образом, одноразовое будет захоронено после возврата из функции, но до того, как в результате перечисление итерация завершена, тем самым закрывая файл до его времени. Чтобы обеспечить правильное размещение одноразового использования, вы должны приложить все это к выражению seq.

Если вы действительно хотите использовать Seq.filter/Seq.map вместо for/yield, вы все еще можете сделать это, но внутри выражения seq, как это:

let readRowsAsync<'T>(filePath:string) = 
    seq { 
    let engine = new FileHelperAsyncEngine(typeof<'T>) 
    use disposable = engine.BeginReadFile(filePath) 

    yield! engine |> Seq.filter (not << shouldDiscard) |> Seq.map map 
    } 

Вы также можете принести фильтрацию и картографировании из seq выражения (что сделает вашу функцию более многоразовым), но seq выражения самого должно оставаться на месте, так как он контролирует УТИЛИЗАЦИЮ часть:

let readRowsAsync<'T>(filePath:string) = 
    seq { 
    let engine = new FileHelperAsyncEngine(typeof<'T>) 
    use disposable = engine.BeginReadFile(filePath) 

    yield! engine 
    } 

let results = 
    readRowsAsync<SomeType>("someFile.txt") 
    |> Seq.filter (not << shouldDiscard) 
    |> Seq.map map 

Наконец, следует отметить, что вы должны быть осторожны с этой последовательностью, потому что она держится за неуправляемый ресурс (т. открытый файл): не держите его открытым в течение длительного времени, не используйте блокирующие операции при его обработке и т. д.

+1

Хороший ответ. В качестве альтернативы, поскольку 'engine' является' IEnumerable <'T> ', вы можете использовать' Seq.filter' для выполнения теста discard. Это позволяет заменить цикл for на 'yield! Seq.filter shouldKeep engine'. Если вам также нужна часть отображения, тогда это будет 'yield! двигатель |> Seq.filter shouldKeep |> Seq.map mappingFn'. =) – Roujo

+0

Спасибо, Федор и Ружо! Помогите мне очень :) – praiserCS

+1

Если вы используете 'Seq.filter', у вас нет контроля над тем, когда нужно распоряжаться одноразовым. Так что не делай этого! –

Смежные вопросы