2017-01-30 2 views
0

Предположим, что мы имеем медленную функцию для получения данных, а другой медленную функцию для обработки данных следующим образом:данные джулия-Ланг Cache в параллельном потоке с использованием @async

# some slow function 
function prime(i) 
    sleep(2) 
    println("processed $i") 
    i 
end 

function slow_process(x) 
    sleep(2) 
    println("slow processed $x") 
end 

function each(rng) 
    function _iter() 
    for i ∈ rng 
     @time d = prime(i) 
     produce(d) 
    end 
    end 
    return Task(_iter) 
end 

@time for x ∈ each(1000:1002) 
    slow_process(x) 
end 

Выход:

% julia test-task.jl 
processed 1000 
    2.063938 seconds (37.84 k allocations: 1.605 MB) 
slow processed 1000 
processed 1001 
    2.003115 seconds (17 allocations: 800 bytes) 
slow processed 1001 
processed 1002 
    2.001798 seconds (17 allocations: 800 bytes) 
slow processed 1002 
12.166475 seconds (88.08 k allocations: 3.640 MB) 

Есть ли способ получить и кэшировать данные в параллельном потоке с помощью @async и передать функцию slow_process?

Редактировать: Я обновил пример, чтобы прояснить проблему. В идеале, пример должен занимать 2 + 6 секунд вместо 12 секунд.

Edit 2: Это моя попытка использования @sync и @async, но я получил ошибку ERROR (unhandled task failure): no process with id 2 exists

macro swap(x,y) 
    quote 
    local tmp = $(esc(x)) 
    $(esc(x)) = $(esc(y)) 
    $(esc(y)) = tmp 
    end 
end 

# some slow function 
function prime(i) 
    sleep(2) 
    println("processed $i") 
    i 
end 

function slow_process(x) 
    sleep(2) 
    println("slow processed $x") 
end 

function each(rng) 
    @assert length(rng) > 1 
    rng = collect(rng) 
    a = b = nothing 
    function _iter() 
    for i ∈ 1:length(rng) 
     if a == nothing 
     a = @async remotecall_fetch(prime, 2, rng[i]) 
     b = @async remotecall_fetch(prime, 2, rng[i+1]) 
     else 
     if i < length(rng) 
      a = @async remotecall_fetch(prime, 2, rng[i+1]) 
     end 
     @swap(a,b) 
     end 
     @sync d = a 
     produce(d) 
    end 
    end 
    return Task(_iter) 
end 

@time for x ∈ each(1000:1002) 
    slow_process(x) 
end 

ответ

0

ОК, у меня есть рабочий раствор ниже:

macro swap(x,y) 
    quote 
    local tmp = $(esc(x)) 
    $(esc(x)) = $(esc(y)) 
    $(esc(y)) = tmp 
    end 
end 

# some slow function 
@everywhere function prime(i) 
    sleep(2) 
    println("prime $i") 
    i 
end 

function slow_process(x) 
    sleep(2) 
    println("slow_process $x") 
end 

function each(rng) 
    @assert length(rng) > 1 
    rng = collect(rng) 
    a = b = nothing 
    function _iter() 
    for i ∈ 1:length(rng) 
     if a == nothing 
     a = remotecall(prime, 2, rng[i]) 
     b = remotecall(prime, 2, rng[i+1]) 
     else 
     if i < length(rng) 
      a = remotecall(prime, 2, rng[i+1]) 
     end 
     @swap(a,b) 
     end 
     d = fetch(a) 
     produce(d) 
    end 
    end 
    return Task(_iter) 
end 
@time for x ∈ each(1000:1002) 
    slow_process(x) 
end 

И

% julia -p 2 test-task.jl 
8.354102 seconds (148.00 k allocations: 6.204 MB) 
Смежные вопросы