我有一个与Apache Spark和PostgreSQL的JDBC连接,我想插入一些数据到我的数据库。 当我使用append模式时,我需要为每个DataFrame.Row指定id 。 有什么办法让Spark创build主键?
我是Spark的新手,我试图用Spark从文件中读取CSV数据。 这是我在做什么: sc.textFile('file.csv') .map(lambda line: (line.split(',')[0], line.split(',')[1])) .collect() 我希望这个调用给我一个我的文件的两个第一列的列表,但我得到这个错误: File "<ipython-input-60-73ea98550983>", line 1, in <lambda> IndexError: list index out of range 虽然我的CSV文件不止一列。
我们如何在数据框中连接2列? 有没有在火花SQL中的任何function,我们可以用来连接一个DF表中的2列。
我是新的apache的火花,显然我在我的MacBook中安装了自制软件的apache-spark: Last login: Fri Jan 8 12:52:04 on console user@MacBook-Pro-de-User-2:~$ pyspark Python 2.7.10 (default, Jul 13 2015, 12:05:58) [GCC 4.2.1 Compatible Apple LLVM 6.1.0 (clang-602.0.53)] on darwin Type "help", "copyright", "credits" or "license" for more information. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 16/01/08 14:46:44 INFO SparkContext: Running Spark version 1.5.1 16/01/08 14:46:46 WARN NativeCodeLoader: Unable […]
有两个独立的pyspark应用程序实例化一个HiveContext代替SQLContext让两个应用程序之一失败,错误: 例外:(“你必须使用Hive构buildSpark,导出'SPARK_HIVE = true'并运行build / sbt assembly”,Py4JJavaError(调用None.org.apache.spark.sql.hive.HiveContext时发生错误。\ n ',JavaObject id = o34039)) 另一个应用程序成功终止。 我使用Python API中的Spark 1.6,并希望使用一些Dataframe函数,这些函数仅支持HiveContext (例如collect_set )。 我在1.5.2及更早的版本中遇到了同样的问题。 这足以重现: import time from pyspark import SparkContext, SparkConf from pyspark.sql import SQLContext conf = SparkConf() sc = SparkContext(conf=conf) sq = HiveContext(sc) data_source = '/tmp/data.parquet' df = sq.read.parquet(data_source) time.sleep(60) sleep只是为了保持脚本运行,而我开始另一个进程。 如果我有两个运行此脚本的实例,则上述错误在读取parquet文件时显示。 当我用HiveContextreplaceHiveContext一切都很好。 有谁知道这是为什么?
从火花2.0.1开始我有一些问题。 我读了很多文档,但到目前为止找不到足够的答案: 有什么区别 df.select("foo") df.select($"foo") 我是否正确理解这一点 myDataSet.map(foo.someVal)是types安全的,不会转换为RDD但保持DataSet表示forms/无额外的开销(2.0.0性能) 所有其他命令,如select,..只是语法糖。 它们不是types安全的,可以使用地图代替。 我怎么能df.select("foo")types安全没有地图声明? 为什么我应该使用UDF / UADF而不是地图(假设地图停留在数据集表示中)?
我有一个Python类,用来加载和处理Spark中的一些数据。 在我需要做的各种事情中,我将生成一个由Spark数据框中各个列派生的虚拟variables列表。 我的问题是,我不知道如何正确定义一个用户定义函数来完成我所需要的。 我现在有一种方法,当映射到基础dataframeRDD时,解决了一半的问题(请记住,这是一个更大的data_processor类中的方法): def build_feature_arr(self,table): # this dict has keys for all the columns for which I need dummy coding categories = {'gender':['1','2'], ..} # there are actually two differnt dataframes that I need to do this for, this just specifies which I'm looking at, and grabs the relevant features from a config file […]
我想在spark中读取CSV并将其转换为DataFrame并使用df.registerTempTable("table_name")将其存储在HDFS中 我努力了: scala> val df = sqlContext.load("hdfs:///csv/file/dir/file.csv") 我得到的错误: java.lang.RuntimeException: hdfs:///csv/file/dir/file.csv is not a Parquet file. expected magic number at tail [80, 65, 82, 49] but found [49, 59, 54, 10] at parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:418) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:277) at org.apache.spark.sql.parquet.ParquetRelation2$MetadataCache$$anonfun$refresh$6.apply(newParquet.scala:276) at scala.collection.parallel.mutable.ParArray$Map.leaf(ParArray.scala:658) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:54) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:53) at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:56) at scala.collection.parallel.mutable.ParArray$Map.tryLeaf(ParArray.scala:650) at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:165) at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:514) at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160) at […]
在PySpark或者至less在Scala中是否有相当于Apache Spark中的Pandas Melt函数? 我在python中运行一个示例数据集,现在我想为整个数据集使用Spark。 提前致谢。
我有以下格式的数据(RDD或Spark DataFrame): from pyspark.sql import SQLContext sqlContext = SQLContext(sc) rdd = sc.parallelize([('X01',41,'US',3), ('X01',41,'UK',1), ('X01',41,'CA',2), ('X02',72,'US',4), ('X02',72,'UK',6), ('X02',72,'CA',7), ('X02',72,'XX',8)]) # convert to a Spark DataFrame schema = StructType([StructField('ID', StringType(), True), StructField('Age', IntegerType(), True), StructField('Country', StringType(), True), StructField('Score', IntegerType(), True)]) df = sqlContext.createDataFrame(rdd, schema) 我想做的是“重塑”数据,将国家(特别是美国,英国和加拿大)的某些行转换为列: ID Age US UK CA 'X01' 41 3 1 2 'X02' 72 […]