Spark – load CSV file as DataFrame?

I would like to read a CSV in Spark, convert it to a DataFrame, and store it in HDFS with df.registerTempTable("table_name")

I have tried:

 scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv") 

The error I got:

 java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10]
     at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418)
     at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277)
     at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276)
     at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658)
     at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54)
     at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
     at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53)
     at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56)
     at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650)
     at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165)
     at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514)
     at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

What is the right command to load a CSV file as a DataFrame in Apache Spark?

Spark SQL provides built-in support for only 3 types of data sources:

  1. Parquet (this is the default)
  2. JSON
  3. JDBC

For CSV, there is a separate library: spark-csv

Its CsvContext class provides the csvFile method, which can be used to load a CSV.

 val cars = sqlContext.csvFile("cars.csv") // uses implicit class CsvContext 
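If, as in the question, you also want that DataFrame registered as a temp table, here is a minimal sketch continuing from the cars value above ("table_name" is just the name used in the question):

 cars.registerTempTable("table_name")                // register the DataFrame so it can be queried with SQL
 sqlContext.sql("SELECT * FROM table_name").show()   // query the registered temp table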

Parsing CSV as DataFrame/DataSet with Spark 2.x

First, initialize the SparkSession object; by default it is available in the shells as spark

 val spark = org.apache.spark.sql.SparkSession.builder
   .master("local")
   .appName("Spark CSV Reader")
   .getOrCreate;

Use any of the following ways to load the CSV as a DataFrame/DataSet:

1. Do it the programmatic way

  val df = spark.read
    .format("csv")
    .option("header", "true")        // reading the headers
    .option("mode", "DROPMALFORMED")
    .load("hdfs:///csv/file/dir/file.csv")

2. You can also do it the SQL way

  val df = spark.sql("SELECT * FROM csv.`csv/file/path/in/hdfs`") 
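Either way, if you also want the temp table from the question, here is a minimal sketch using the df from option 1 (in Spark 2.x, registerTempTable has been superseded by createOrReplaceTempView):

  df.createOrReplaceTempView("table_name")       // session-scoped view, the 2.x replacement for registerTempTable
  spark.sql("SELECT * FROM table_name").show()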

Dependencies

  "org.apache.spark" % "spark-core_2.11" % 2.0.0, "org.apache.spark" % "spark-sql_2.11" % 2.0.0, 


Spark version < 2.0

 val df = sqlContext.read
   .format("com.databricks.spark.csv")
   .option("header", "true")
   .option("mode", "DROPMALFORMED")
   .load("csv/file/path");

Dependencies:

 "org.apache.spark" % "spark-sql_2.10" % 1.6.0, "com.databricks" % "spark-csv_2.10" % 1.6.0, "com.univocity" % "univocity-parsers" % LATEST, 

This is for those on Hadoop 2.6 and Spark 1.6, without the "databricks" package.

 import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
 import org.apache.spark.sql.Row

 val csv = sc.textFile("/path/to/file.csv")
 // split each line on commas and trim the fields
 val rows = csv.map(line => line.split(",").map(_.trim))
 val header = rows.first
 // drop the header row
 val data = rows.filter(_(0) != header(0))
 // build Rows with the expected column types
 val rdd = data.map(row => Row(row(0), row(1).toInt))
 val schema = new StructType()
   .add(StructField("id", StringType, true))
   .add(StructField("val", IntegerType, true))
 val df = sqlContext.createDataFrame(rdd, schema)
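The question also asks about storing the result in HDFS; registerTempTable only registers an in-session view and writes nothing to disk, so if you actually want the data persisted, here is a minimal sketch on top of the df built above (the output path is hypothetical):

 df.registerTempTable("table_name")   // queryable via sqlContext.sql("SELECT * FROM table_name")
 // hypothetical output path; this is what actually persists the data to HDFS (as Parquet here)
 df.write.mode("overwrite").parquet("hdfs:///csv/file/dir/output")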

With Spark 2.0, here is how you can read the CSV

 val conf = new SparkConf().setMaster("local[2]").setAppName("my app")
 val sc = new SparkContext(conf)
 val sparkSession = SparkSession.builder
   .config(conf = conf)
   .appName("spark session example")
   .getOrCreate()
 val path = "/Users/xxx/Downloads/usermsg.csv"
 val base_df = sparkSession.read.option("header", "true").csv(path)

In Java 1.8, this code snippet works perfectly for reading CSV files

pom.xml

 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-core_2.11</artifactId>
     <version>2.0.0</version>
 </dependency>
 <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql_2.10 -->
 <dependency>
     <groupId>org.apache.spark</groupId>
     <artifactId>spark-sql_2.10</artifactId>
     <version>2.0.0</version>
 </dependency>
 <!-- https://mvnrepository.com/artifact/org.scala-lang/scala-library -->
 <dependency>
     <groupId>org.scala-lang</groupId>
     <artifactId>scala-library</artifactId>
     <version>2.11.8</version>
 </dependency>
 <dependency>
     <groupId>com.databricks</groupId>
     <artifactId>spark-csv_2.10</artifactId>
     <version>1.4.0</version>
 </dependency>

Java

 SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
 // create Spark Context
 SparkContext context = new SparkContext(conf);
 // create spark Session
 SparkSession sparkSession = new SparkSession(context);
 Dataset<Row> df = sparkSession.read()
     .format("com.databricks.spark.csv")
     .option("header", true)
     .option("inferSchema", true)
     .load("hdfs://localhost:9000/usr/local/hadoop_data/loan_100.csv");
 System.out.println("========== Print Schema ============");
 df.printSchema();
 System.out.println("========== Print Data ==============");
 df.show();
 System.out.println("========== Print title ==============");
 df.select("title").show();

Penny's Spark 2 example is the way to do it in Spark 2. There is one more trick: have the schema generated for you by doing an initial scan of the data, by setting the option inferSchema to true.

Here, then, assuming that spark is a Spark session you have set up, is the operation to load the CSV index file of all the Landsat imagery that Amazon hosts on S3.

  /*
   * Licensed to the Apache Software Foundation (ASF) under one or more
   * contributor license agreements.  See the NOTICE file distributed with
   * this work for additional information regarding copyright ownership.
   * The ASF licenses this file to You under the Apache License, Version 2.0
   * (the "License"); you may not use this file except in compliance with
   * the License.  You may obtain a copy of the License at
   *
   *    http://www.apache.org/licenses/LICENSE-2.0
   *
   * Unless required by applicable law or agreed to in writing, software
   * distributed under the License is distributed on an "AS IS" BASIS,
   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   * See the License for the specific language governing permissions and
   * limitations under the License.
   */
  val csvdata = spark.read.options(Map(
      "header" -> "true",
      "ignoreLeadingWhiteSpace" -> "true",
      "ignoreTrailingWhiteSpace" -> "true",
      "timestampFormat" -> "yyyy-MM-dd HH:mm:ss.SSSZZZ",
      "inferSchema" -> "true",
      "mode" -> "FAILFAST"))
    .csv("s3a://landsat-pds/scene_list.gz")

The bad news: this triggers a scan through the file; for something large like a 20+ MB compressed CSV file, that can take up to 30 seconds over a long-haul connection. Bear that in mind: once you have the schema, you are better off hand-coding it (see the sketch below).

(Code snippet licensed under the Apache Software License 2.0 to avoid any ambiguity; it is something I have done as a demo/integration test of S3 integration.)
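To skip the inferSchema scan entirely, here is a minimal sketch of hand-coding the schema and passing it to the reader; the column names and types below are made up for illustration, so substitute the ones you see in the actual file:

 import org.apache.spark.sql.types._

 // hypothetical columns; replace with the real ones once you know the schema
 val sceneSchema = new StructType()
   .add("entityId", StringType, true)
   .add("acquisitionDate", TimestampType, true)
   .add("cloudCover", DoubleType, true)

 val csvdata = spark.read
   .option("header", "true")
   .schema(sceneSchema)           // schema supplied up front, so no initial scan is needed
   .csv("s3a://landsat-pds/scene_list.gz")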

I can do it like below:

 val conf = new SparkConf().setAppName("Test Spark").setMaster("local[1]")
 val sc = new SparkContext(conf)
 val sqlContext = new SQLContext(sc)
 val txtDf = sqlContext.read.format("com.databricks.spark.csv")
   .option("header", "true")
   .load("D:\\spark-training\\employee.txt")
 txtDf.registerTempTable("employee")
 val employees = sqlContext.sql("select * from employee")
 employees.printSchema()
 employees.show()

Note: for Spark versions before 1.7

  val dataframe = sqlContext.read.format("com.databricks.spark.csv")
    .option("delimiter", "\t")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("file_name")

For a CSV, use ',' as the delimiter, and change the options such as header and inferSchema as needed. For Python, just take out the val and it works the same way, but you need to pass this package to your spark-shell or spark-submit, like:

 spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
 or
 spark-submit --packages com.databricks:spark-csv_2.10:1.4.0