2012-03-19 2 views
0

В соответствии с this question Я пытаюсь обойти здание списка, проиллюстрированное на примере range(int(1e8)) с использованием генератора xrange(int(1e8)). Где xrange является просто примером для процесса, который производит длинную последовательность значений. (Пожалуйста, предположите, что это невозможно легко воспроизвести.) Еще один пример: у меня есть длинный список пар timestamp/value, которые я хочу обработать (сортировка временных рядов). Я стараюсь не вынимать их в память в целом, потому что это много данных.Несколько клиентов для генератора Python?

Я думал, что было бы круто, если бы я мог применить несколько процессоров одновременно к этому потоку данных, созданных моим генератором. Первая идея состояла в том, чтобы использовать itertools.tee(), например .:

from itertools import tee 
g1,g2 = tee(xrange(int(1e8)),2) 
sum(g1), sum(g2) 

Но потом я обнаружил, что только первый sum() будет использовать генератор, в то время как tee() внутренне строит list снова (который я хотел бы избежать.).

Итак, я думал, что мне нужно асинхронное решение, то есть такое, которое позволит каждому sum() обновить каждый шаг генератора. Вещи, которые пришли в виду, где

Но мне не будучи ни на самом деле использовали раньше, и отчасти я даже не могу сказать, может ли подходы работать, или быть эффективными/эффективными/эффективными.

С этого момента я с радостью по достоинству оценю любые предложения от аудитории!


Update

Я хотел избежать callback based solution, как это Apparantly снижает производительность значительно (Это, как это в настоящее время реализуется.). Я добавил некоторые профилирование ниже (пожалуйста, добавьте комментарии, если тест не является объективным):

class SinkA: 
    def __init__(self, src): 
    for i in src: pass 

class SinkB: 
    def f(self,i): 
    pass 

class Source: 
    def __iter__(self): 
    for i in xrange(int(1e4)): 
     yield i 

def t1(): 
    src = Source() 
    snk = SinkA(src) 

def t2(): 
    src = Source() 
    snk = SinkB() 
    for i in src: snk.f(i) 

if __name__ == "__main__": 
    from timeit import Timer 
    n = 1000 
    t = Timer("t1()", "from __main__ import t1, t2, SinkA, SinkB, Source") 
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 612.11 usec/pass 
    t = Timer("t2()", "from __main__ import t1, t2, SinkA, SinkB, Source") 
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 1933.39 usec/pass 

Update 2

Что я могу еще сказать? У меня есть это решение на основе обратного вызова, которое кажется неэффективным. Подход, основанный на генераторе, кажется многообещающим, но у меня слишком мало опыта программирования такого типа, особенно когда речь идет о более сложных вещах в виде сопрограмм или скрученной библиотеки. Подводя итог, у меня есть несколько потребителей для процесса, который генерирует много данных, и я заметил некоторые потенциальные подходы. Теперь я ищу квалифицированные заявления опытных пользователей, которые раньше, возможно, выполняли подобные задачи. Заявления, которые касаются того, какой подход может быть уместным, как подходы относятся друг к другу. Или какие другие подходы я, возможно, пропустил в конце концов.

+1

Вы действительно не решить эту проблему: вы хотите, чтобы каждый потребитель, чтобы увидеть те же данные или нет? – Marcin

+0

Я предполагаю, что поведение, которое вы видите с помощью 'tee', связано с тем, что вы не запускаете свои две задачи параллельно. Сначала Python выполняет 'sum (g1)', затем 'sum (g2)'. Попробуйте сделать свою сумму вручную, используя цикл, и посмотрите, потребляет ли она память в виде мэша. –

+0

@CharlesBrunet, это правда. Я пытаюсь каким-то образом абстрагироваться от этого ручного цикла. Чтобы получить более удобный код. – moooeeeep

ответ

5

Как общий подход, я заменил бы тянуть модель генератора с обратными вызовами, и, вероятно, оберните генератор, как это:

def walk(gen, callbacks): 
    for item in gen: 
     for f in callbacks: 
      f(item) 

Если процессоры в отдельных потоках, и вы хотите их блокировать при ожидании, вы можете зарегистрировать Queue.put (или что-то подобное) в качестве обратного вызова для каждого процессора и опросить эти очереди независимо. Это позволит вам использовать как радиовещание, так и модели рабочего пула, если вам это нужно.

Редактировать

Другим решением было бы использовать coroutines:

def source(self, *dests): 
    for i in xrange(int(1e4)): 
     for dest in dests: 
      dest.send(i) 

def sink(): 
    while True: 
     i = yield 

def t3(): 
    snk = sink() 
    snk.next() # activate the coroutine 
    source(snk) 

if __name__ == '__main__': 

    from timeit import Timer 
    n = 1000 
    t = Timer("t3()", "from __main__ import source, sink, t3") 
    print "%.2f usec/pass" % (1000000 * t.timeit(number=n)/n) # 872.99 usec/pass 

Выглядит достаточно быстро. В основном, сопрограммы - это инвертированные генераторы, вы тянете от генератора, нажимаете на сопрограмму.

+0

Что это за архитектура? Учитывая, что модель pull будет блокировать потоки, пока уровень ввода-вывода будет действовать соответствующим образом, это будет простейшая модель для программирования. – Marcin

+0

Если человеку нужно блокировать, ну, обратные вызовы могут быть фактически 'Queue.put' (обновлен ответ). – bereal

+0

см. Мою статью – moooeeeep

1

Вы действительно не обращаетесь к этому, но хотите ли вы, чтобы каждый потребитель видел точные данные (в этом случае tee, вероятно, является лучшим решением), или нет?

Если нет, вы можете просто попросить каждого потребителя прочитать один объект-генератор.

Если вы хотите, чтобы они получили точные данные, попробуйте tee (использует больше памяти) вместо двух генераторов (более IO) и посмотрите, что быстрее.

Что касается ваших таймингов, то, что ваши данные показывают просто, что накладные расходы на несколько вызовов функций и что один из ваших методов позволяет избежать промежуточных вызовов функций.

Если вы хотите повысить производительность, попробуйте запустить это на PyPy, у которого есть оптимизационная точка доступа JIT.

+0

К сожалению, PyPy не поддерживает мои другие зависимости. (Действительно, в этом вопросе не было никакой заметки.) – moooeeeep

+0

@moooeeeep У вас, похоже, много ограничений, которые вы просто не заявляете. Все эти ответы отвечают на ваш вопрос по заданию, но затем вы продолжаете жаловаться, что они не отвечают на ваш вопрос. – Marcin

1

Поскольку генераторы дешевы в памяти, почему бы вам просто не использовать два независимых генератора?

g1 = xrange(int(1e8)) 
g2 = xrange(int(1e8)) 
sum(g1), sum(g2) 
+0

Дважды IO. – Marcin

+0

«xrange» - это всего лишь пример процесса, который создает длинный список значений. Пожалуйста, предположите, что это невозможно легко воспроизвести. – moooeeeep

0

Я предлагаю вам посмотреть, как это реализовать с coroutines, более конкретно этот broadcast example

+0

Возможно, объясните немного больше. – Marcin

+0

Спасибо за ссылку! – moooeeeep

Смежные вопросы