Предположим, что мы имеем медленную функцию для получения данных, а другой медленную функцию для обработки данных следующим образом:данные джулия-Ланг Cache в параллельном потоке с использованием @async
# some slow function
function prime(i)
sleep(2)
println("processed $i")
i
end
function slow_process(x)
sleep(2)
println("slow processed $x")
end
function each(rng)
function _iter()
for i ∈ rng
@time d = prime(i)
produce(d)
end
end
return Task(_iter)
end
@time for x ∈ each(1000:1002)
slow_process(x)
end
Выход:
% julia test-task.jl
processed 1000
2.063938 seconds (37.84 k allocations: 1.605 MB)
slow processed 1000
processed 1001
2.003115 seconds (17 allocations: 800 bytes)
slow processed 1001
processed 1002
2.001798 seconds (17 allocations: 800 bytes)
slow processed 1002
12.166475 seconds (88.08 k allocations: 3.640 MB)
Есть ли способ получить и кэшировать данные в параллельном потоке с помощью @async и передать функцию slow_process
?
Редактировать: Я обновил пример, чтобы прояснить проблему. В идеале, пример должен занимать 2 + 6 секунд вместо 12 секунд.
Edit 2: Это моя попытка использования @sync и @async, но я получил ошибку ERROR (unhandled task failure): no process with id 2 exists
macro swap(x,y)
quote
local tmp = $(esc(x))
$(esc(x)) = $(esc(y))
$(esc(y)) = tmp
end
end
# some slow function
function prime(i)
sleep(2)
println("processed $i")
i
end
function slow_process(x)
sleep(2)
println("slow processed $x")
end
function each(rng)
@assert length(rng) > 1
rng = collect(rng)
a = b = nothing
function _iter()
for i ∈ 1:length(rng)
if a == nothing
a = @async remotecall_fetch(prime, 2, rng[i])
b = @async remotecall_fetch(prime, 2, rng[i+1])
else
if i < length(rng)
a = @async remotecall_fetch(prime, 2, rng[i+1])
end
@swap(a,b)
end
@sync d = a
produce(d)
end
end
return Task(_iter)
end
@time for x ∈ each(1000:1002)
slow_process(x)
end