如何*实际*在TensorFlow中读取CSV数据?

我对TensorFlow的世界比较陌生,对于如何将CSV数据读入TensorFlow中可用的示例/标签张量中感到相当困惑。 阅读CSV数据的TensorFlow教程中的示例是相当分散的,只能让您在CSV数据上进行培训。

下面是我编写的基于CSV教程的代码:

from __future__ import print_function import tensorflow as tf def file_len(fname): with open(fname) as f: for i, l in enumerate(f): pass return i + 1 filename = "csv_test_data.csv" # setup text reader file_length = file_len(filename) filename_queue = tf.train.string_input_producer([filename]) reader = tf.TextLineReader(skip_header_lines=1) _, csv_row = reader.read(filename_queue) # setup CSV decoding record_defaults = [[0],[0],[0],[0],[0]] col1,col2,col3,col4,col5 = tf.decode_csv(csv_row, record_defaults=record_defaults) # turn features back into a tensor features = tf.pack([col1,col2,col3,col4]) print("loading, " + str(file_length) + " line(s)\n") with tf.Session() as sess: tf.initialize_all_variables().run() # start populating filename queue coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(coord=coord) for i in range(file_length): # retrieve a single instance example, label = sess.run([features, col5]) print(example, label) coord.request_stop() coord.join(threads) print("\ndone loading") 

以下是我正在加载的CSV文件的简要示例 – 相当基本的数据 – 4个特征列和1个标签列:

 0,0,0,0,0 0,15,0,0,0 0,30,0,0,0 0,45,0,0,0 

上面所有的代码都是从CSV文件中逐一打印每个例子 ,虽然很好,但对于培训来说却是无用的。

我在这里挣扎的是如何将这些逐个加载的单个示例转换为训练数据集。 例如, 这是我在Udacity深度学习课程中正在研究的一个笔记本 。 我基本上想要加载我正在加载的CSV数据,并将其放入类似train_datasettrain_labels

 def reformat(dataset, labels): dataset = dataset.reshape((-1, image_size * image_size)).astype(np.float32) # Map 2 to [0.0, 1.0, 0.0 ...], 3 to [0.0, 0.0, 1.0 ...] labels = (np.arange(num_labels) == labels[:,None]).astype(np.float32) return dataset, labels train_dataset, train_labels = reformat(train_dataset, train_labels) valid_dataset, valid_labels = reformat(valid_dataset, valid_labels) test_dataset, test_labels = reformat(test_dataset, test_labels) print('Training set', train_dataset.shape, train_labels.shape) print('Validation set', valid_dataset.shape, valid_labels.shape) print('Test set', test_dataset.shape, test_labels.shape) 

我试过使用tf.train.shuffle_batch ,就像这样,但它只是莫名其妙地挂起:

  for i in range(file_length): # retrieve a single instance example, label = sess.run([features, colRelevant]) example_batch, label_batch = tf.train.shuffle_batch([example, label], batch_size=file_length, capacity=file_length, min_after_dequeue=10000) print(example, label) 

所以总结一下,这里是我的问题:

  • 我错过了这个过程?
    • 感觉就像我错过了一些关于如何正确构buildinputpipe道的关键直觉。
  • 有没有办法避免必须知道CSV文件的长度?
    • 要知道你想要处理的行数( for i in range(file_length)上面的代码行),感觉很for i in range(file_length)

编辑:只要雅罗斯拉夫指出,我可能在这里混合命令和graphics构build部分,它开始变得更清晰。 我能够将以下代码放在一起,我认为这更接近于从CSV培训模型(不包括任何模型培训代码)时通常会做的事情:

 from __future__ import print_function import numpy as np import tensorflow as tf import math as math import argparse parser = argparse.ArgumentParser() parser.add_argument('dataset') args = parser.parse_args() def file_len(fname): with open(fname) as f: for i, l in enumerate(f): pass return i + 1 def read_from_csv(filename_queue): reader = tf.TextLineReader(skip_header_lines=1) _, csv_row = reader.read(filename_queue) record_defaults = [[0],[0],[0],[0],[0]] colHour,colQuarter,colAction,colUser,colLabel = tf.decode_csv(csv_row, record_defaults=record_defaults) features = tf.pack([colHour,colQuarter,colAction,colUser]) label = tf.pack([colLabel]) return features, label def input_pipeline(batch_size, num_epochs=None): filename_queue = tf.train.string_input_producer([args.dataset], num_epochs=num_epochs, shuffle=True) example, label = read_from_csv(filename_queue) min_after_dequeue = 10000 capacity = min_after_dequeue + 3 * batch_size example_batch, label_batch = tf.train.shuffle_batch( [example, label], batch_size=batch_size, capacity=capacity, min_after_dequeue=min_after_dequeue) return example_batch, label_batch file_length = file_len(args.dataset) - 1 examples, labels = input_pipeline(file_length, 1) with tf.Session() as sess: tf.initialize_all_variables().run() # start populating filename queue coord = tf.train.Coordinator() threads = tf.train.start_queue_runners(coord=coord) try: while not coord.should_stop(): example_batch, label_batch = sess.run([examples, labels]) print(example_batch) except tf.errors.OutOfRangeError: print('Done training, epoch reached') finally: coord.request_stop() coord.join(threads) 

我认为你在这里混合了命令和graphics构build部分。 操作tf.train.shuffle_batch创build一个新的队列节点,并且可以使用一个节点来处理整个数据集。 所以我认为你挂了,因为你在for循环中创build了一堆shuffle_batch队列,并没有启动它们的队列跑步者。

正常inputstream水线的使用如下所示:

  1. shuffle_batch这样的节点添加到inputpipe道
  2. (可选,防止无意图修改)最终化图

—图结构的结束,命令式编程的开始 –

  1. tf.start_queue_runners
  2. while(True): session.run()

为了更具可扩展性(避免使用Python GIL),您可以使用TensorFlowpipe道生成所有数据。 但是,如果性能不重要,则可以使用slice_input_producer.将numpy数组连接到inputpipe道slice_input_producer. 下面是一些Print节点的例子,看看发生了什么( Print消息在运行节点时转到stdout)

 tf.reset_default_graph() num_examples = 5 num_features = 2 data = np.reshape(np.arange(num_examples*num_features), (num_examples, num_features)) print data (data_node,) = tf.slice_input_producer([tf.constant(data)], num_epochs=1, shuffle=False) data_node_debug = tf.Print(data_node, [data_node], "Dequeueing from data_node ") data_batch = tf.batch([data_node_debug], batch_size=2) data_batch_debug = tf.Print(data_batch, [data_batch], "Dequeueing from data_batch ") sess = tf.InteractiveSession() sess.run(tf.initialize_all_variables()) tf.get_default_graph().finalize() tf.start_queue_runners() try: while True: print sess.run(data_batch_debug) except tf.errors.OutOfRangeError as e: print "No more inputs." 

你应该看到这样的东西

 [[0 1] [2 3] [4 5] [6 7] [8 9]] [[0 1] [2 3]] [[4 5] [6 7]] No more inputs. 

“8,9”号码没有填满整个批次,所以没有生产。 另外tf.Print打印到sys.stdout,所以它们分别出现在terminal里。

PS:最小连接batch到手动初始化队列在github问题2193

此外,为了进行debugging,您可能需要在会话上设置timeout ,以便您的IPython笔记本不挂在空的队列出队列上。 我为会话使用了这个辅助函数

 def create_session(): config = tf.ConfigProto(log_device_placement=True) config.gpu_options.per_process_gpu_memory_fraction=0.3 # don't hog all vRAM config.operation_timeout_in_ms=60000 # terminate on long hangs # create interactive session to register a default session sess = tf.InteractiveSession("", config=config) return sess 

可伸缩性注意事项:

  1. tf.constant将您的数据复制到Graph中。 graphics定义的大小有2GB的基本限制,所以这是数据大小的上限
  2. 你可以通过使用v=tf.Variable来绕过这个限制,并通过在右边的tf.placeholder运行v.assign_op并将numpy数组提供给占位符( feed_dict )来保存数据到那里。
  3. 这仍然会创build两个数据副本,所以为了节省内存,您可以创build自己的slice_input_producer版本,该版本在numpy数组上运行,并使用feed_dict一次一个上传行

或者你可以试试,代码使用pandas和numpy将Iris数据集加载到张量stream中,并在会话中打印一个简单的神经元输出。 希望它有助于一个基本的了解…. [我没有添加一个热解码标签的方式]。

 import tensorflow as tf import numpy import pandas as pd df=pd.read_csv('/home/nagarjun/Desktop/Iris.csv',usecols = [0,1,2,3,4],skiprows = [0],header=None) d = df.values l = pd.read_csv('/home/nagarjun/Desktop/Iris.csv',usecols = [5] ,header=None) labels = l.values data = numpy.float32(d) labels = numpy.array(l,'str') #print data, labels #tensorflow x = tf.placeholder(tf.float32,shape=(150,5)) x = data w = tf.random_normal([100,150],mean=0.0, stddev=1.0, dtype=tf.float32) y = tf.nn.softmax(tf.matmul(w,x)) with tf.Session() as sess: print sess.run(y)