Tag: pyspark

如何更改pyspark中的数据框列名?

我来自pandas的背景,习惯于从CSV文件中读取数据到数据框中,然后使用简单的命令简单地将列名更改为有用的东西: df.columns = new_column_name_list 但是,在使用sqlContext创build的pyspark数据框中不起作用。 我可以想出的唯一解决scheme就是: df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', inferschema='true', delimiter='\t').load("data.txt") oldSchema = df.schema for i,k in enumerate(oldSchema.fields): k.name = new_column_name_list[i] df = sqlContext.read.format("com.databricks.spark.csv").options(header='false', delimiter='\t').load("data.txt", schema=oldSchema) 这基本上定义了两次variables,首先推断模式,然后重命名列名,然后再次使用更新的模式加载数据框。 有没有比pandas更好更高效的方法来做到这一点? 我的火花版本是1.5.0

如何将每个DStream保存/插入到永久表中

我一直在面对“Spark Streaming”关于将输出Dstream插入永久性 SQL表的问题。 我想插入每个输出DStream(来自单个批处理,火花进程)到一个独特的表。 我一直在使用Spark版本1.6.2的Python。 在我的代码的这一部分,我有一个Dstream的一个或多个RDD,我想永久插入/存储到SQL表中,而不会丢失每个处理的批处理结果。 rr = feature_and_label.join(result_zipped)\ .map(lambda x: (x[1][0][0], x[1][1]) ) 这里的每个Dstream都被表示为这个元组的forms:( 4.0,0 )。 我不能使用SparkSQL,因为Spark对待'table'的方式就是像临时表一样 ,因此在每个批处理中都会丢失结果。 这是输出的一个例子: 时间:2016-09-23 00:57:00 (0.0,2) 时间:2016-09-23 00:57:01 (4.0,0) 时间:2016-09-23 00:57:02 (4.0,0) … 如上所示,每个批次仅由一个Dstream生成 。 正如我之前所说,我想将这些结果永久存储在某个地方保存的表中,并可能在稍后查询。 所以我的问题是:有没有办法做到这一点? 我很感激有人能帮我一把,但是特别告诉我这是否可能。 谢谢。

Spark:如何将Python与Scala或Java用户定义函数进行映射?

比方说,我们的团队已经selectPython作为用Spark开发的参考语言。 但后来出于性能方面的原因,我们想开发特定的Scala或Java特定的库,以便将它们与我们的Python代码(类似于具有Scala或Java框架的Python存根)进行映射。 难道你不觉得是否有可能通过一些Scala或Java用户定义函数来接口新的自定义Python方法?

是否启动谓词下推使用JDBC?

据此 Catalyst使用逻辑优化(如谓词下推)。 优化器可以将筛选谓词向下推送到数据源中,使物理执行跳过不相关的数据。 Spark支持将谓词按下到数据源。 此function是否也可用于JDBC? (从检查数据库日志,我可以看到它不是现在的默认行为 – 完整的查询被传递给数据库,即使它后来被火花filter限制) 更多细节 使用PostgreSQL 9.4运行Spark 1.5 代码片段: from pyspark import SQLContext, SparkContext, Row, SparkConf from data_access.data_access_db import REMOTE_CONNECTION sc = SparkContext() sqlContext = SQLContext(sc) url = 'jdbc:postgresql://{host}/{database}?user={user}&password={password}'.format(**REMOTE_CONNECTION) sql = "dummy" df = sqlContext.read.jdbc(url=url, table=sql) df = df.limit(1) df.show() SQL跟踪: < 2015-09-15 07:11:37.718 EDT >LOG: execute <unnamed>: SET extra_float_digits = […]

如何使用JDBC源在(Py)Spark中写入和读取数据?

这个问题的目的是logging: 在PySpark中使用JDBC连接读取和写入数据所需的步骤 JDBC源和可能的解决scheme可能存在的问题 只需稍作更改,这些方法就可以与Scala和R等其他支持的语言一起工作。

用Spark加载CSV文件

我是Spark的新手,我试图用Spark从文件中读取CSV数据。 这是我在做什么: sc.textFile('file.csv') .map(lambda line: (line.split(',')[0], line.split(',')[1])) .collect() 我希望这个调用给我一个我的文件的两个第一列的列表,但我得到这个错误: File "<ipython-input-60-73ea98550983>", line 1, in <lambda> IndexError: list index out of range 虽然我的CSV文件不止一列。

如何连接PyCharm和PySpark?

我是新的apache的火花,显然我在我的MacBook中安装了自制软件的apache-spark: Last login: Fri Jan 8 12:52:04 on console user@MacBook-Pro-de-User-2:~$ pyspark Python 2.7.10 (default, Jul 13 2015, 12:05:58) [GCC 4.2.1 Compatible Apple LLVM 6.1.0 (clang-602.0.53)] on darwin Type "help", "copyright", "credits" or "license" for more information. Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 16/01/08 14:46:44 INFO SparkContext: Running Spark version 1.5.1 16/01/08 14:46:46 WARN NativeCodeLoader: Unable […]

带有HiveContext的多个Spark应用程序

有两个独立的pyspark应用程序实例化一个HiveContext代替SQLContext让两个应用程序之一失败,错误: 例外:(“你必须使用Hive构buildSpark,导出'SPARK_HIVE = true'并运行build / sbt assembly”,Py4JJavaError(调用None.org.apache.spark.sql.hive.HiveContext时发生错误。\ n ',JavaObject id = o34039)) 另一个应用程序成功终止。 我使用Python API中的Spark 1.6,并希望使用一些Dataframe函数,这些函数仅支持HiveContext (例如collect_set )。 我在1.5.2及更早的版本中遇到了同样的问题。 这足以重现: import time from pyspark import SparkContext, SparkConf from pyspark.sql import SQLContext conf = SparkConf() sc = SparkContext(conf=conf) sq = HiveContext(sc) data_source = '/tmp/data.parquet' df = sq.read.parquet(data_source) time.sleep(60) sleep只是为了保持脚本运行,而我开始另一个进程。 如果我有两个运行此脚本的实例,则上述错误在读取parquet文件时显示。 当我用HiveContextreplaceHiveContext一切都很好。 有谁知道这是为什么?

Apache Spark中的Pandas Melt函数

在PySpark或者至less在Scala中是否有相当于Apache Spark中的Pandas Melt函数? 我在python中运行一个示例数据集,现在我想为整个数据集使用Spark。 提前致谢。

在Spark RDD和/或Spark DataFrame中重塑/旋转数据

我有以下格式的数据(RDD或Spark DataFrame): from pyspark.sql import SQLContext sqlContext = SQLContext(sc) rdd = sc.parallelize([('X01',41,'US',3), ('X01',41,'UK',1), ('X01',41,'CA',2), ('X02',72,'US',4), ('X02',72,'UK',6), ('X02',72,'CA',7), ('X02',72,'XX',8)]) # convert to a Spark DataFrame schema = StructType([StructField('ID', StringType(), True), StructField('Age', IntegerType(), True), StructField('Country', StringType(), True), StructField('Score', IntegerType(), True)]) df = sqlContext.createDataFrame(rdd, schema) 我想做的是“重塑”数据,将国家(特别是美国,英国和加拿大)的某些行转换为列: ID Age US UK CA 'X01' 41 3 1 2 'X02' 72 […]