如何将csv文件转换为rdd

我是新来的火花。 我想对CSVlogging中的特定数据执行一些操作。

我正在尝试读取CSV文件并将其转换为RDD。 我的进一步操作基于CSV文件中提供的标题。

(从评论)这是我的代码到目前为止:

final JavaRDD<String> File = sc.textFile(Filename).cache(); final JavaRDD<String> lines = File.flatMap(new FlatMapFunction<String, String>() { @Override public Iterable<String> call(String s) { return Arrays.asList(EOL.split(s)); } }); final String heading=lines.first().toString(); 

我可以得到像这样的标题值。 我想把这个映射到CSV文件中的每个logging。

 final String[] header=heading.split(" "); 

我可以得到像这样的标题值。 我想把这个映射到CSV文件中的每个logging。

在Java中我使用CSVReader record.getColumnValue(Column header)来获取特定的值。 我需要在这里做类似的事情。

一个简单的方法是有一个方法来保存标题。

比方说,你有一个file.csv像:

 user, topic, hits om, scala, 120 daniel, spark, 80 3754978, spark, 1 

我们可以定义一个使用第一行parsing版本的头文件:

 class SimpleCSVHeader(header:Array[String]) extends Serializable { val index = header.zipWithIndex.toMap def apply(array:Array[String], key:String):String = array(index(key)) } 

我们可以使用这个标题来进一步解决数据的问题:

 val csv = sc.textFile("file.csv") // original file val data = csv.map(line => line.split(",").map(elem => elem.trim)) //lines in rows val header = new SimpleCSVHeader(data.take(1)(0)) // we build our header with the first line val rows = data.filter(line => header(line,"user") != "user") // filter the header out val users = rows.map(row => header(row,"user") val usersByHits = rows.map(row => header(row,"user") -> header(row,"hits").toInt) ... 

请注意, header并不仅仅是一个数组索引助记符的简单映射。 几乎所有这些都可以在数组元素的序号上完成,比如user = row(0)

PS:欢迎来到Scala 🙂

你可以使用spark-csv库: https : //github.com/databricks/spark-csv

这是直接从文档:

 import org.apache.spark.sql.SQLContext SQLContext sqlContext = new SQLContext(sc); HashMap<String, String> options = new HashMap<String, String>(); options.put("header", "true"); options.put("path", "cars.csv"); DataFrame df = sqlContext.load("com.databricks.spark.csv", options); 

首先,我必须说,如果你把你的头文件放在不同的文件中,那就更简单了 – 这是大数据中的惯例。

无论如何,丹尼尔的回答是相当不错的,但是效率低下,错误率高,所以我会发布自己的。 效率低下,你不需要检查每个logging,看看它是否是头,你只需要检查每个分区的第一个logging。 这个错误是通过使用.split(",")你可能会得到一个exception抛出或获取错误的列当条目是空string,并发生在logging的开始或结束 – 纠正你需要使用.split(",", -1) 。 所以这是完整的代码:

 val header = scala.io.Source.fromInputStream( hadoop.fs.FileSystem.get(new java.net.URI(filename), sc.hadoopConfiguration) .open(new hadoop.fs.Path(path))) .getLines.head val columnIndex = header.split(",").indexOf(columnName) sc.textFile(path).mapPartitions(iterator => { val head = iterator.next() if (head == header) iterator else Iterator(head) ++ iterator }) .map(_.split(",", -1)(columnIndex)) 

最后一点,考虑Parquet如果你只想钓出某些列。 或者至less考虑实施一个懒惰评估拆分function,如果你有宽行。

我们可以使用新的DataFrameRDD读取和写入CSV数据。 DataFrameRDD比NormalRDD有一些优点:

  1. DataFrameRDD比NormalRDD要快一些,因为我们确定了架构,并且有助于在运行时优化很多,并为我们提供了显着的性能提升。
  2. 即使列移位为CSV,它也会自动采用正确的列,因为我们不是将读取数据时出现的列编号硬编码为textFile,然后将其拆分,然后使用列数来获取数据。
  3. 在几行代码中,您可以直接读取CSV文件。

你将被要求有这个库:添加它在build.sbt

 libraryDependencies += "com.databricks" % "spark-csv_2.10" % "1.2.0" 

Spark Scala代码:

 val sc = new SparkContext(conf) val sqlContext = new SQLContext(sc) val csvInPath = "/path/to/csv/abc.csv" val df = sqlContext.read.format("com.databricks.spark.csv").option("header","true").load(csvInPath) //format is for specifying the type of file you are reading //header = true indicates that the first line is header in it 

通过从中获取一些列来转换为正常的RDD

 val rddData = df.map(x=>Row(x.getAs("colA"))) //Do other RDD operation on it 

将RDD保存为CSV格式:

 val aDf = sqlContext.createDataFrame(rddData,StructType(Array(StructField("colANew",StringType,true)))) aDF.write.format("com.databricks.spark.csv").option("header","true").save("/csvOutPath/aCSVOp") 

由于标题设置为true,我们将在所有输出文件中获取标题名称。

这里是另一个使用Spark / Scala 将CSV转换为RDD的例子。 有关更详细的描述,请参阅这篇文章 。

 def main(args: Array[String]): Unit = { val csv = sc.textFile("/path/to/your/file.csv") // split / clean data val headerAndRows = csv.map(line => line.split(",").map(_.trim)) // get header val header = headerAndRows.first // filter out header (eh. just check if the first val matches the first header name) val data = headerAndRows.filter(_(0) != header(0)) // splits to map (header/value pairs) val maps = data.map(splits => header.zip(splits).toMap) // filter out the user "me" val result = maps.filter(map => map("user") != "me") // print result result.foreach(println) } 

我build议直接从驱动程序读取头,而不是通过Spark。 两个原因:1)这是一个单一的行。 分布式方法没有优势。 2)我们在驱动程序中需要这一行,而不是工作节点。

它是这样的:

 // Ridiculous amount of code to read one line. val uri = new java.net.URI(filename) val conf = sc.hadoopConfiguration val fs = hadoop.fs.FileSystem.get(uri, conf) val path = new hadoop.fs.Path(filename) val stream = fs.open(path) val source = scala.io.Source.fromInputStream(stream) val header = source.getLines.head 

现在,当您制作RDD时,您可以放弃标题。

 val csvRDD = sc.textFile(filename).filter(_ != header) 

然后,我们可以从一列中创build一个RDD,例如:

 val idx = header.split(",").indexOf(columnName) val columnRDD = csvRDD.map(_.split(",")(idx)) 

另一种方法是使用mapPartitionsWithIndex方法,因为您将获得分区索引号和该分区中所有行的列表。 分区0和行0将是标题

 val rows = sc.textFile(path) .mapPartitionsWithIndex({ (index: Int, rows: Iterator[String]) => val results = new ArrayBuffer[(String, Int)] var first = true while (rows.hasNext) { // check for first line if (index == 0 && first) { first = false rows.next // skip the first row } else { results += rows.next } } results.toIterator }, true) rows.flatMap { row => row.split(",") } 

这个怎么样?

 val Delimeter = "," val textFile = sc.textFile("data.csv").map(line => line.split(Delimeter)) 

我build议你试试

https://spark.apache.org/docs/latest/sql-programming-guide.html#rdds

 JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map( new Function<String, Person>() { public Person call(String line) throws Exception { String[] parts = line.split(","); Person person = new Person(); person.setName(parts[0]); person.setAge(Integer.parseInt(parts[1].trim())); return person; } }); 

你必须在这个例子中有一个类的人与您的文件标题的规范,并将您的数据关联到模式,并像在mysql中一样应用标准来获得所需的结果

我想你可以尝试加载这个csv到一个RDD,然后从这个RDD创build一个数据框,这里是从rdd创build数据框的文档: http ://spark.apache.org/docs/latest/sql-programming-guide 的.html#互操作与- RDDS

对于火花Scala,我通常使用,当我不能使用spark csv包…

 val sqlContext = new org.apache.spark.sql.SQLContext(sc) val rawdata = sc.textFile("hdfs://example.host:8020/user/example/example.csv") val header = rawdata.first() val tbldata = rawdata.filter(_(0) != header(0)) 

从Spark 2.0开始,可以直接将CSV读取到DataFrame

如果数据文件没有标题行,那么它将是:

 val df = spark.read.csv("file://path/to/data.csv") 

这将加载数据,但给每列像_c0_c1等通用名称

如果有标题,则添加.option("header", "true")将使用第一行来定义DataFrame的列:

 val df = spark.read .option("header", "true") .csv("file://path/to/data.csv") 

举一个具体的例子,假设你有一个文件的内容:

 user,topic,hits om,scala,120 daniel,spark,80 3754978,spark,1 

然后下面将得到按主题分组的总点击数:

 import org.apache.spark.sql.functions._ import spark.implicits._ val rawData = spark.read .option("header", "true") .csv("file://path/to/data.csv") // specifies the query, but does not execute it val grouped = rawData.groupBy($"topic").agg(sum($"hits)) // runs the query, pulling the data to the master node // can fail if the amount of data is too much to fit // into the master node's memory! val collected = grouped.collect // runs the query, writing the result back out // in this case, changing format to Parquet since that can // be nicer to work with in Spark grouped.write.parquet("hdfs://some/output/directory/") // runs the query, writing the result back out // in this case, in CSV format with a header and // coalesced to a single file. This is easier for human // consumption but usually much slower. grouped.coalesce(1) .write .option("header", "true") .csv("hdfs://some/output/directory/")