2016-02-15 3 views
2

Я пытаюсь использовать несколько очередей для чтения и пакетной обработки, но это вызывает временную блокировку TF. Вот некоторые примеры кода:Несколько очередей, вызывающих блокировку TF

import tensorflow as tf 

coordinator = tf.train.Coordinator() 

file_queue = tf.train.string_input_producer(tf.train.match_filenames_once(...)) 
reader = tf.TextLineReader() 
key, serialized_example = reader.read(file_queue) 
keys, serialized_examples = tf.train.batch([key, serialized_example], 10) 

# repeat the code snippet below multiple times, in my case 4 
file_queue_i = tf.train.string_input_producer(tf.train.match_filenames_once(...)) 
reader_i = tf.TextLineReader() 
key_i, serialized_example_i = reader.read(file_queue_i) 

initializer = tf.initialize_all_variables() 

session = tf.Session(config=tf.ConfigProto(inter_op_parallelism_threads=1, intra_op_parallelism_threads=1)) 
session.run(initializer) 

threads = tf.train.start_queue_runners(sess=session, coord=coordinator) 

session.run(keys) 

TensorFlow иногда зависает на последней строке, когда я на самом деле пытаются запустить что-то. Однако это поведение довольно сложно воспроизвести, используя приведенный выше код. В 1000+ работает, я мог только заставить его повесить один раз. В моем реальном коде реальный читатель более сложный, и он использует TFRecords, но в остальном все одно и то же. Там он вешает 2/3 времени с 3 очередями в общей сложности. С 5 очередями он, похоже, никогда не запускается, и с 1 очередью он, кажется, никогда не висит. Это на Mac с 0,6. У меня другая система Ubuntu, также с 0,6, и я получаю ту же проблему (хотя частота блокировки намного выше в системе Ubuntu).

ОБНОВЛЕНИЕ. Более точную оценку того, как часто код блокируется, составляет 1 из 5000 испытаний.

+0

Нити будет висеть, если он читает из пустой очереди, или нажав на полные очереди. Я не понимаю ваш фрагмент, хотя вы читаете только 2 очереди, а reader_i нигде не используется –

+0

Правильно, проблема возникает, когда я создаю другие очереди, даже если я их активно не использую. Я использую один, но остальные только что созданы и не оцениваются (хотя, естественно, они будут заполнены бегуном очереди). Разумеется, дело не в коде, а в том, что нет необходимости оценивать других читателей, чтобы они действительно наблюдали эту проблему. Что касается подвешивания при чтении из пустой очереди - не нужно ли это на мгновение висеть, пока очередь не заполнится? Видения, которые я вижу, постоянны. Я должен закончить. –

+0

oh, interesting .... число нитей имеет значение на частоте блокировки? –

ответ

2

Возможно, это связано с недостаточным количеством рабочих потоков. Если у вас есть очередь очереди 1 в зависимости от работы очереди 2, и вы запускаете их асинхронно, вам понадобятся по крайней мере два потока op, заданных через inter_op_parallelism_threads, чтобы гарантировать, что прогресс будет выполнен.

В вашем случае у вас есть очередь бегуна, заполняющая batch нить в зависимости от string_input_producer очередь не пустая. Если очередь, связанная с очередью string_input_producer, запускается первой, тогда все в порядке. Но если batch очередь будет назначена первой, она застрянет в string_input_producer.dequeue op, ожидая очереди string_input_producer, чтобы получить некоторые имена файлов. Поскольку есть только одна нить в TensorFlow оп пул потоки, то enqueue оп string_input_producer никогда не будет выделена нитью для завершения (то есть, чтобы выполнить свой метод Compute)

Простейшее решения иметь по крайней мере столько же операцию тему, как вам имеют одновременные вызовы run (т. е. количество очередей + 1). Если вы действительно хотите ограничить себя одним потоком, вы можете предварительно загружать имена файлов файлов имен файлов синхронно с помощью основного потока.

coordinator = tf.train.Coordinator() 

    import glob 
    files = glob.glob('/temp/pipeline/*') 
    if FLAGS.preload_filenames: 
    file_queue = tf.FIFOQueue(capacity=len(files), dtypes=tf.string) 
    enqueue_val = tf.placeholder(dtype=tf.string) 
    enqueue_op = file_queue.enqueue(enqueue_val) 
    else: 
    file_queue = tf.train.string_input_producer(files) 

    reader = tf.TextLineReader() 
    key, serialized_example = reader.read(file_queue) 
    keys, serialized_examples = tf.train.batch([key, serialized_example], 5, 
              capacity=10) 

    initializer = tf.initialize_all_variables() 

    session = tf.Session(config=tf.ConfigProto(inter_op_parallelism_threads=1, 
              intra_op_parallelism_threads=1)) 
    print 'running initializer' 
    session.run(initializer) 

    if FLAGS.preload_filenames: 
    print 'preloading filenames' 
    for fn in files: 
     session.run([enqueue_op], feed_dict={enqueue_val: fn}) 
     print 'size - ', session.run([file_queue.size()]) 
    session.run([file_queue.close()]) 

    print 'starting queue runners' 
    threads = tf.train.start_queue_runners(sess=session, coord=coordinator) 
    print 'about to run session' 
    print session.run(keys) 

Приведенный выше код требует инкапсуляции, если у вас более одной очереди имен файлов. В качестве альтернативы вот Hacky обходного, который должен работать, если есть точно prebuffer_amount имена файлов для всех input_producer очередей

queue_runners=tf.get_collection(tf.GraphKeys.QUEUE_RUNNERS) 
filename_queue_runners=[qr for qr in queue_runners if 'input_producer' in qr.name] 
for qr in filename_queue_runners: 
    for k in prebuffer_amount: 
    sess.run(qr._enqueue_ops[0]) 
+0

Спасибо, я тестирую, если увеличение количества потоков устраняет проблему. Согласно вышеприведенной логике, не должно быть даже одной пары цепочек 'string_input_producer' и' batch' очереди блокировки? Если это случайная, какая из очередей начинается сначала, тогда она должна блокироваться примерно в 50% случаев, нет? Это, кажется, никогда не случается. Я пробовал до ~ 10 000 попыток и не видел ни одного экземпляра. Но как только у меня есть две пары очередей (2 'string_input_producer' и 2' batch' очереди), он блокируется в ~ 100 испытаниях. –

+0

Приведенный выше пример блокирует 50% времени для меня. Таким образом, в планировщике потоков с открытым исходным кодом может быть что-то, что заставляет его блокировать реже, чем ожидалось. –

+0

Хорошо, это будет иметь больший смысл. Есть ли способ включить несколько потоков только для операций очереди? У меня есть код, который будет работать в общей среде, и, хотя я могу выполнить 11 потоков в очереди, я не могу сделать то же самое для процессора с интенсивным вычислением. –

Смежные вопросы