2016-03-31 3 views
4

У меня есть модель, которая работает над набором изображений и вычисляет некоторую статистику по ним, - для простоты говорят, что она выводит только среднее изображение из набора (оно делает больше, чем это на практике). У меня есть несколько каталогов, содержащих изображения, и я хочу получить вывод из каждого каталога. Каждый каталог имеет переменное количество изображений в нем.Tensorflow: пакетные очереди ввода, а затем изменение источника очереди

Я построил график, функцию вывода и функцию потерь один раз для моего скрипта. Входы загружаются с использованием слегка адаптированного code from here. Я адаптировал его, чтобы взять массив путей, которые я использую, используя заполнитель переменного размера. Я получил вдохновение для этого from here.

Тогда я цикл по каталогам и выполните следующие действия:

  1. Инициализация переменных (это сбрасывает предыдущую выходную переменную из результатов, рассчитанных на предыдущий каталог)
  2. Набор пути изображения переменных в Текущие файлы массив из нового каталога: sess.run(image_paths.initializer, feed_dict={image_paths_initializer: image_paths})
  3. Начало очереди запуска: queue_threads = tf.train.start_queue_runners(sess=sess, coord=coord)
  4. Run для ряда эпох, чтобы получить результаты
  5. Закрыть темы coord.request_stop(); coord.join(queue_threads); coord.clear_stop()
  6. возврата результатов, сохранить результаты, перейти на следующий каталог ...

Проблемы в том, что, когда речь идет о втором каталоге в очереди бегун нить отказывается начать (я могу видеть это путем отладки переменной queue_threads). Это дает ошибки, как:

Compute status: Aborted: FIFOQueue '_1_input_producer' is closed. 
Compute status: Aborted: RandomShuffleQueue '_0_shuffle_batch/random_shuffle_queue' is closed. 

Если я не закрываю нити (и не начинать их во второй раз), то они не производят файлы из нового каталога - они игнорируют оп присваивание переменной в (2). Невозможно перезапустить очереди таким образом?

Я попытался настроить очереди в отдельном сеансе и вытащить партии из них, но это дает мне различные ошибки CUDA/памяти. Если я это сделаю и добавлю остановки отладки, я могу заставить его работать довольно далеко до того, как он ударит это, но я не знаю, можно ли добавить зависимости управления между несвязанными сеансами/графиками?

Можно было бы начать с нуля для каждого нового каталога, но это добавляет много накладных расходов к процессу, который я пытаюсь избежать. Я сделал с ним подобные вещи (т. Е. Сброс переменных и повторное использование с разными входами) без очередей, и это экономит много времени, поэтому я знаю, что бит работает.

Может ли кто-нибудь из вас замечательный SO folk думать о выходе из этого?

ответ

6

string_input_producer является FIFOQueue + QueueRunner. Вы получаете больше контроля, если используете FIFOQueue и вручную регистрируете вещи. Что-то вроде этого

filename_queue = tf.FIFOQueue(100, tf.string) 
enqueue_placeholder = tf.placeholder(dtype=tf.string) 
enqueue_op = filename_queue.enqueue(enqueue_placeholder) 

config = tf.ConfigProto() 
config.operation_timeout_in_ms=2000 # for debugging queue hangs 
sess = tf.InteractiveSession(config=config) 
coord = tf.train.Coordinator() 
threads = tf.train.start_queue_runners(coord=coord) 

sess.run([enqueue_op], feed_dict={enqueue_placeholder:"/temp/dir1/0"}) 
sess.run([enqueue_op], feed_dict={enqueue_placeholder:"/temp/dir1/1"}) 

# do stats for /temp/dir1 

sess.run([enqueue_op], feed_dict={enqueue_placeholder:"/temp/dir2/0"}) 
sess.run([enqueue_op], feed_dict={enqueue_placeholder:"/temp/dir2/1"}) 

# do stats for /temp/dir2 

coord.request_stop() 
coord.join(threads) 
+0

Спасибо за предложение! Я пытался заставить его работать, как вы говорите, но он держится, когда я пытаюсь получить партию. Похоже, это вызвано очередью подачи в другую очередь (поскольку 'filename_queue' входит в очередь' shuffle_batch')? – lopsided

+0

Он будет зависать, когда вы пытаетесь установить очередь на полную очередь или удалить из пустой очереди. Попробуйте «sess.run ([filename_queue.size()]), чтобы узнать, сколько элементов уже в очереди –

+0

Также вам нужно запустить бегунов очереди, так как tf.batch также является очередью –

4

Большой благодаря @ ЯРОСЛАВ-Булатов указал мне в правильном направлении по этому вопросу.

Кажется, что моя самая большая проблема была связана с бегунами очереди.Когда я заменил очередь имен файлов FIFOQueue и вручную указал имена файлов, которые работали нормально, но поскольку я также использовал очередь shuffle_batch, это было расстроено, когда я попытался очистить очередь имен файлов для следующего каталога. Я не мог опорожнить эту очередь, поскольку она вызывает блокировки или ломает очередь, поэтому лучше всего я мог бы позволить ей заполнить новые изображения, сохранив остатки из предыдущего каталога - очевидно, ничего хорошего! В конце я заменил его на RandomShuffleQueue и снова вручную размещал элементы вручную так же, как и с именами файлов. Я думаю, что это дает достаточно хорошее смешение изображений и не является излишним для проблемы. Нет нитей, но как только я покончил с этим, все стало намного проще.

Я включил свое окончательное решение, как показано ниже. Любые предложения приветствуются!

import os 
import tensorflow as tf 
import numpy as np 
from itertools import cycle 

output_dir = '/my/output/dir' 

my_dirs = [ 
    [ 
     '/path/to/datasets/blacksquares/black_square_100x100.png', 
     '/path/to/datasets/blacksquares/black_square_200x200.png', 
     '/path/to/datasets/blacksquares/black_square_300x300.png' 
    ], 
    [ 
     '/path/to/datasets/whitesquares/white_square_100x100.png', 
     '/path/to/datasets/whitesquares/white_square_200x200.png', 
     '/path/to/datasets/whitesquares/white_square_300x300.png', 
     '/path/to/datasets/whitesquares/white_square_400x400.png' 
    ], 
    [ 
     '/path/to/datasets/mixedsquares/black_square_200x200.png', 
     '/path/to/datasets/mixedsquares/white_square_200x200.png' 
    ] 
] 

# set vars 
patch_size = (100, 100, 1) 
batch_size = 20 
queue_capacity = 1000 

# setup filename queue 
filename_queue = tf.FIFOQueue(
    capacity=queue_capacity, 
    dtypes=tf.string, 
    shapes=[[]] 
) 
filenames_placeholder = tf.placeholder(dtype='string', shape=(None)) 
filenames_enqueue_op = filename_queue.enqueue_many(filenames_placeholder) 

# read file and preprocess 
image_reader = tf.WholeFileReader() 
key, file = image_reader.read(filename_queue) 
uint8image = tf.image.decode_png(file) 
cropped_image = tf.random_crop(uint8image, patch_size) # take a random 100x100 crop 
float_image = tf.div(tf.cast(cropped_image, tf.float32), 255) # put pixels in the [0,1] range 

# setup shuffle batch queue for training images 
images_queue = tf.RandomShuffleQueue(
    capacity=queue_capacity, 
    min_after_dequeue=0, # allow queue to become completely empty (as we need to empty it) 
    dtypes=tf.float32, 
    shapes=patch_size 
) 
images_enqueue_op = images_queue.enqueue(float_image) 

# setup simple computation - calculate an average image patch 
input = tf.placeholder(shape=(None,) + patch_size, dtype=tf.float32) 
avg_image = tf.Variable(np.random.normal(loc=0.5, scale=0.5, size=patch_size).astype(np.float32)) 
loss = tf.nn.l2_loss(tf.sub(avg_image, input)) 
train_op = tf.train.AdamOptimizer(2.).minimize(loss) 

# start session and initialize variables 
sess = tf.InteractiveSession() 
sess.run(tf.initialize_all_variables()) 

# note - no need to start any queue runners as I've done away with them 

for dir_index, image_paths in enumerate(my_dirs): 
    image_paths_cycle = cycle(image_paths) 

    # reset the optimisation and training vars 
    sess.run(tf.initialize_all_variables()) 

    num_epochs = 1000 
    for i in range(num_epochs): 
     # keep the filename queue at capacity 
     size = sess.run(filename_queue.size()) 
     image_paths = [] 
     while size < queue_capacity: 
      image_paths.append(next(image_paths_cycle)) 
      size += 1 
     sess.run(filenames_enqueue_op, feed_dict={filenames_placeholder: image_paths}) 

     # keep the shuffle batch queue at capacity 
     size = sess.run(images_queue.size()) 
     while size < queue_capacity: 
      sess.run([images_enqueue_op]) 
      size += 1 

     # get next (random) batch of training images 
     batch = images_queue.dequeue_many(batch_size).eval() 

     # run train op 
     _, result, loss_i = sess.run([train_op, avg_image, loss], feed_dict={input: batch}) 
     print('Iteration {:d}. Loss: {:.2f}'.format(i, loss_i)) 

     # early stopping :) 
     if loss_i < 0.05: 
      break 

    # empty filename queue and verify empty 
    size = sess.run(filename_queue.size()) 
    sess.run(filename_queue.dequeue_many(size)) 
    size = sess.run(filename_queue.size()) 
    assert size == 0 

    # empty batch queue and verify empty 
    size = sess.run(images_queue.size()) 
    sess.run(images_queue.dequeue_many(size)) 
    size = sess.run(filename_queue.size()) 
    assert size == 0 

    # save the average image output 
    result_image = np.clip(result * 255, 0, 255).astype(np.uint8) 
    with open(os.path.join(output_dir, 'result_' + str(dir_index)), 'wb') as result_file: 
     result_file.write(tf.image.encode_png(result_image).eval()) 

print('Happy days!') 
exit(0) 

Это result_0 Выходы - черный квадрат, result_1 - белый квадрат и result_2 - (в основном) серый квадрат.

0

Глядя на свой собственный ответ, я бы предложил вам взглянуть на tf.train.match_filenames_once. Он позволяет указать шаблон для ваших файлов и передать его string_input_producer.

q = tf.train.string_input_producer(
    tf.train.match_filenames_once('/path/to/datasets/*.png') 
) 

Подробнее comprehensive example. Делая это, вы не только исправите свою проблему, но и не будете перебирать все ваши каталоги.

Смежные вопросы