В соответствии с this question Я пытаюсь обойти здание списка, проиллюстрированное на примере range(int(1e8))
с использованием генератора xrange(int(1e8))
. Где xrange
является просто примером для процесса, который производит длинную последовательность значений. (Пожалуйста, предположите, что это невозможно легко воспроизвести.) Еще один пример: у меня есть длинный список пар timestamp/value, которые я хочу обработать (сортировка временных рядов). Я стараюсь не вынимать их в память в целом, потому что это много данных.Несколько клиентов для генератора Python?
Я думал, что было бы круто, если бы я мог применить несколько процессоров одновременно к этому потоку данных, созданных моим генератором. Первая идея состояла в том, чтобы использовать itertools.tee()
, например .:
from itertools import tee
g1,g2 = tee(xrange(int(1e8)),2)
sum(g1), sum(g2)
Но потом я обнаружил, что только первый sum()
будет использовать генератор, в то время как tee()
внутренне строит list
снова (который я хотел бы избежать.).
Итак, я думал, что мне нужно асинхронное решение, то есть такое, которое позволит каждому sum()
обновить каждый шаг генератора. Вещи, которые пришли в виду, где
Но мне не будучи ни на самом деле использовали раньше, и отчасти я даже не могу сказать, может ли подходы работать, или быть эффективными/эффективными/эффективными.
С этого момента я с радостью по достоинству оценю любые предложения от аудитории!
Update
Я хотел избежать callback based solution, как это Apparantly снижает производительность значительно (Это, как это в настоящее время реализуется.). Я добавил некоторые профилирование ниже (пожалуйста, добавьте комментарии, если тест не является объективным):
class SinkA:
def __init__(self, src):
for i in src: pass
class SinkB:
def f(self,i):
pass
class Source:
def __iter__(self):
for i in xrange(int(1e4)):
yield i
def t1():
src = Source()
snk = SinkA(src)
def t2():
src = Source()
snk = SinkB()
for i in src: snk.f(i)
if __name__ == "__main__":
from timeit import Timer
n = 1000
t = Timer("t1()", "from __main__ import t1, t2, SinkA, SinkB, Source")
print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 612.11 usec/pass
t = Timer("t2()", "from __main__ import t1, t2, SinkA, SinkB, Source")
print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 1933.39 usec/pass
Update 2
Что я могу еще сказать? У меня есть это решение на основе обратного вызова, которое кажется неэффективным. Подход, основанный на генераторе, кажется многообещающим, но у меня слишком мало опыта программирования такого типа, особенно когда речь идет о более сложных вещах в виде сопрограмм или скрученной библиотеки. Подводя итог, у меня есть несколько потребителей для процесса, который генерирует много данных, и я заметил некоторые потенциальные подходы. Теперь я ищу квалифицированные заявления опытных пользователей, которые раньше, возможно, выполняли подобные задачи. Заявления, которые касаются того, какой подход может быть уместным, как подходы относятся друг к другу. Или какие другие подходы я, возможно, пропустил в конце концов.
Вы действительно не решить эту проблему: вы хотите, чтобы каждый потребитель, чтобы увидеть те же данные или нет? – Marcin
Я предполагаю, что поведение, которое вы видите с помощью 'tee', связано с тем, что вы не запускаете свои две задачи параллельно. Сначала Python выполняет 'sum (g1)', затем 'sum (g2)'. Попробуйте сделать свою сумму вручную, используя цикл, и посмотрите, потребляет ли она память в виде мэша. –
@CharlesBrunet, это правда. Я пытаюсь каким-то образом абстрагироваться от этого ручного цикла. Чтобы получить более удобный код. – moooeeeep