2016-12-06 2 views
0

Я нахожусь в самом начале изучения TensorFlow. Я хочу прочитать матрицу 3x3 из файла csv в hdfs и умножить ее на себя.Прочитайте одну (упакованную) матрицу из файла в TensorFlow

файл выглядит следующим образом:

1,2,3 
4,5,6 
7,8,9 

До сих пор я мог придумал следующий код с помощью TensorFlow tutorial:

def read_and_decode(filename_queue): 
    reader = tf.TextLineReader() 
    key, value = reader.read(filename_queue) 

    # Type information and column names based on the decoded CSV. 
    record_defaults = [[0.0], [0.0], [0.0]] 
    f1,f2,f3 = tf.decode_csv(value, record_defaults=record_defaults) 

    # Turn the features back into a tensor. 
    features = tf.pack([ 
    f1, 
    f2, 
    f3]) 

    return features 

def input_pipeline(filename_queue, batch_size, num_threads): 
    example = read_and_decode(filename_queue) 
    min_after_dequeue = 10000 
    capacity = min_after_dequeue + 3 * batch_size 

    example_batch = tf.train.batch(
     [example], batch_size=batch_size, capacity=capacity, 
     num_threads=num_threads, allow_smaller_final_batch=True) 
    return example_batch 


def get_all_records(FILE): 
with tf.Session() as sess: 
    filename_queue = tf.train.string_input_producer([FILE], num_epochs=1, shuffle=False) 
    batch_size = 1 
    num_threads = 4 
    #batch = input_pipeline(filename_queue, batch_size, num_threads) 
    batch = read_and_decode(filename_queue) 
    init_op = tf.local_variables_initializer() 
    sess.run(init_op) 
    coord = tf.train.Coordinator() 
    threads = tf.train.start_queue_runners(coord=coord) 
    try: 
    while True: 
     example = sess.run([batch]) 
     print(example) 
    except tf.errors.OutOfRangeError, e: 
    coord.request_stop(e) 
    finally: 
    coord.request_stop() 

    coord.join(threads) 

get_all_records('hdfs://default/test.csv') 

Это будет печатать каждую строку матрицы в правильный порядок. Однако, когда я использую пакетную обработку, применяя input_pipeline(), результат не будет в правильном порядке.

Мы также можем прочитать файл в Matrix Market format. Это устранит ограничение на порядок.

Итак, мой вопрос заключается в том, как я могу получить результирующие строки (или партии) в матрицу (или пакетную матрицу) масштабируемым образом (т.е. матрица действительно большая), так что я могу применить умножение матрицы как:

result = tf.matmul(Matrix,Matrix) 
result = tf.batch_matmul(batched_Matrix,batched_Matrix) 

И как продолжение вопроса: какое из них является самым быстрым решением, особенно когда речь идет о распределенном исполнении?

Спасибо за вашу помощь, Felix

ответ

0

После некоторых исследований я смог наконец осуществить рабочий прототип:

def read_and_decode(filename_queue): 
    reader = tf.TextLineReader() 
    key, value = reader.read(filename_queue) 

    # Type information and column names based on the decoded CSV. 
    record_defaults = [[0.0], [0.0], [0.0]] 
    f1,f2,f3 = tf.decode_csv(value, record_defaults=record_defaults) 

    return [f1,f2,f3] 

def cond(sequence_len, step): 
    return tf.less(step,sequence_len) 

def body(sequence_len, step, filename_queue): 
    begin = tf.get_variable("begin",tensor_shape.TensorShape([3, 3]),dtype=tf.float32,initializer=tf.constant_initializer(0)) 
    begin = tf.scatter_update(begin, step, read_and_decode(filename_queue), use_locking=None) 
    tf.get_variable_scope().reuse_variables() 

    with tf.control_dependencies([begin]): 
     return (sequence_len, step+1) 

def get_all_records(FILE): 
with tf.Session() as sess: 

    filename_queue = tf.train.string_input_producer([FILE], num_epochs=1, shuffle=False) 

    b = lambda sl, st: body(sl,st,filename_queue) 

    step = tf.constant(0) 
    sequence_len = tf.constant(3) 
    _,step, = tf.while_loop(cond, 
        b, 
        [sequence_len, step], 
        parallel_iterations=10, 
        back_prop=True, 
        swap_memory=False, 
        name=None) 

    begin = tf.get_variable("begin",tensor_shape.TensorShape([3, 3]),dtype=tf.float32) 

    with tf.control_dependencies([step]): 
     product = tf.matmul(begin, begin) 

    init0 = tf.local_variables_initializer() 
    sess.run(init0) 
    init1 = tf.global_variables_initializer() 
    sess.run(init1) 

    coord = tf.train.Coordinator() 
    threads = tf.train.start_queue_runners(coord=coord) 
    try: 
     print(sess.run([product])) 
    except tf.errors.OutOfRangeError, e: 
     coord.request_stop(e) 
    finally: 
     coord.request_stop() 

    coord.join(threads) 

get_all_records('hdfs://default/data.csv') 

Идея пришла от: How does the tf.scatter_update() work inside the while_loop() Я предполагаю, что я могу реализовать пакетное версии в аналогичным образом. Тем не менее, я рад за любой совет, чтобы сделать его более совершенным.