如何在数据集中存储自定义对象?

据介绍Spark数据集 :

当我们期待Spark 2.0时,我们计划对数据集进行一些令人振奋的改进,特别是:…自定义编码器 – 当我们目前自动生成各种类型的编码器时,我们希望打开自定义对象的API。

并尝试将自定义类型存储在Dataset集中导致出现以下错误:

无法找到存储在数据集中的类型的编码器。 通过导入sqlContext.implicits._支持原始类型(Int,String等)和产品类型(case类)。支持序列化其他类型将在未来版本中添加

要么:

Java.lang.UnsupportedOperationException:没有找到编码器….

是否有任何现有的解决方法?


请注意,此问题仅作为社区Wiki答案的入口点存在。 随时更新/改善问题和答案。

更新

这个答案仍然有效和信息丰富,尽管现在2.2 / 2.3版本已经更好了,它增加了对SetSeqMapDateTimestampBigDecimal内置编码器支持。 如果你坚持仅仅使用case类和通常的Scala类型进行类型化,那么你应该只用SQLImplicits的隐式SQLImplicits


不幸的是,几乎没有添加任何帮助。 在Encoders.scalaSQLImplicits.scala搜索@since 2.0.0主要是为了处理基本类型(以及一些case类的调整)。 所以,首先要说的是,目前对定制类编码器没有真正的好的支持 。 这样一来,接下来就是一些我们可以期待的工作,因为我们现在掌握了这些工作。 作为一个前面的免责声明:这不会完美的工作,我会尽我所能,使所有的限制清晰和前期。

究竟是什么问题

当你想创建一个数据集时,Spark需要一个编码器(将T类型的JVM对象转换为内部的Spark SQL表示形式),这通常是通过SparkSession implicits自动创建的,或者可以通过调用static Encoders方法“(取自createDataset上的文档 )。 编码器将采用Encoder[T]的形式,其中T是您正在编码的类型。 第一个建议是添加import spark.implicits._ (给你这些隐式编码器),第二个建议是使用这组编码器相关函数明确地传入隐式编码器。

没有编码器可用于常规课程,所以

 import spark.implicits._ class MyObj(val i: Int) // ... val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) 

会给你下面隐含的相关编译时错误:

无法找到存储在数据集中的类型的编码器。 通过导入sqlContext.implicits._支持原始类型(Int,String等)和产品类型(case类)。支持序列化其他类型将在未来版本中添加

但是,如果您将任何仅用于在扩展Product类中获得上述错误的类型进行封装,则会将错误混淆地延迟到运行时,所以

 import spark.implicits._ case class Wrap[T](unwrap: T) class MyObj(val i: Int) // ... val d = spark.createDataset(Seq(Wrap(new MyObj(1)),Wrap(new MyObj(2)),Wrap(new MyObj(3)))) 

编译得很好,但在运行时失败

java.lang.UnsupportedOperationException:没有找到MyObj的编码器

其原因是Spark创建的编码器实际上只是在运行时(通过scala重新获得)创建的。 在这种情况下,所有的Spark检查在编译时是最外面的类扩展Product (所有的case类),并且只在运行时意识到它仍然不知道如何处理MyObj (如果我尝试了同样的问题创建一个Dataset[(Int,MyObj)] – Spark等待直到运行时MyObj上的MyObj )。 这些都是亟需解决的核心问题:

  • 一些扩展了Product编译的类,尽管在运行时总是崩溃
  • 没有办法传递嵌套类型的自定义编码器(我没有办法给Spark一个编码器只为MyObj这样它就知道如何编码Wrap[MyObj](Int,MyObj) )。

只要用kryo

大家建议的解决方案是使用kryo编码器。

 import spark.implicits._ class MyObj(val i: Int) implicit val myObjEncoder = org.apache.spark.sql.Encoders.kryo[MyObj] // ... val d = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) 

这虽然很快,但非常繁琐。 特别是如果你的代码正在操纵各种数据集,加入,分组等等,你最终会得到一堆额外的暗示。 那么,为什么不只是自动地做一个隐含的事呢?

 import scala.reflect.ClassTag implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](ct) 

现在,似乎我几乎可以做任何我想要的东西(下面的例子不会在spark.implicits._被自动导入的spark-shell中工作)

 class MyObj(val i: Int) val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) val d2 = d1.map(d => (d.i+1,d)).alias("d2") // mapping works fine and .. val d3 = d1.map(d => (di, d)).alias("d3") // .. deals with the new type val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") // Boom! 

或几乎。 问题是使用kryo导致Spark只是将数据集中的每一行都存储为一个扁平的二进制对象。 对于mapfilterforeach就足够了,但对于像join这样的操作,Spark确实需要将这些分隔成列。 检查d2d3的模式,你会发现只有一个二进制列:

 d2.printSchema // root // |-- value: binary (nullable = true) 

元组的部分解决方案

所以,利用Scala中蕴含的魔力(更多的是在6.26.3重载解析中 ),我可以给自己一系列的含义 ,尽可能地做好工作,至少对于元组来说,并且能够很好地处理现有的含义:

 import org.apache.spark.sql.{Encoder,Encoders} import scala.reflect.ClassTag import spark.implicits._ // we can still take advantage of all the old implicits implicit def single[A](implicit c: ClassTag[A]): Encoder[A] = Encoders.kryo[A](c) implicit def tuple2[A1, A2]( implicit e1: Encoder[A1], e2: Encoder[A2] ): Encoder[(A1,A2)] = Encoders.tuple[A1,A2](e1, e2) implicit def tuple3[A1, A2, A3]( implicit e1: Encoder[A1], e2: Encoder[A2], e3: Encoder[A3] ): Encoder[(A1,A2,A3)] = Encoders.tuple[A1,A2,A3](e1, e2, e3) // ... you can keep making these 

然后,带着这些暗示,我可以让我的例子工作,虽然有一些专栏更名

 class MyObj(val i: Int) val d1 = spark.createDataset(Seq(new MyObj(1),new MyObj(2),new MyObj(3))) val d2 = d1.map(d => (d.i+1,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d2") val d3 = d1.map(d => (di ,d)).toDF("_1","_2").as[(Int,MyObj)].alias("d3") val d4 = d2.joinWith(d3, $"d2._1" === $"d3._1") 

我还没有想出如何获得预期的元组名( _1_2 ,…)默认情况下不重命名 – 如果别人想要玩这个, 这是名称"value"得到引入和这是通常添加元组名称的地方。 但关键是我现在有一个很好的结构化模式:

 d4.printSchema // root // |-- _1: struct (nullable = false) // | |-- _1: integer (nullable = true) // | |-- _2: binary (nullable = true) // |-- _2: struct (nullable = false) // | |-- _1: integer (nullable = true) // | |-- _2: binary (nullable = true) 

所以,总之,这个解决方法:

  • 允许我们为元组获得单独的列(所以我们可以再次加入元组,)
  • 我们可以再次依靠这些暗示(所以不需要kryo所有的地方)
  • 几乎完全向后兼容import spark.implicits._ (涉及一些重命名)
  • 不让我们加入kyro序列化的二进制列,更不用说那些可能有的字段
  • 有一些令人不愉快的副作用,将某些元组列重命名为“value”(如果有必要,可以通过转换.toDF ,指定新的列名并转换回数据集来取消这个操作),并且模式名似乎被保留通过连接,他们最需要的地方)。

一般的班级部分解决方案

这一个是不愉快的,没有好的解决方案。 然而,现在我们已经有了上面的元组解决方案,我从另一个答案中得到了一个隐式的转换解决方案,因为你可以把更复杂的类转换成元组,所以也不会那么痛苦。 然后,在创建数据集之后,您可能会使用数据框方法重命名这些列。 如果一切顺利的话,这是一个真正的改进,因为我现在可以在我的类的领域进行连接。 如果我刚刚使用了一个kryo平面二进制kryo序列化器。

下面是一个例子,它做了一些事情:我有一个MyObj类,它有Intjava.util.UUIDSet[String]类型的字段。 第一个照顾自己。 第二,虽然我可以使用kryo序列化将更有用,如果存储为一个String (因为UUID通常是我想要加入)。 第三个真的只属于一个二元列。

 class MyObj(val i: Int, val u: java.util.UUID, val s: Set[String]) // alias for the type to convert to and from type MyObjEncoded = (Int, String, Set[String]) // implicit conversions implicit def toEncoded(o: MyObj): MyObjEncoded = (oi, outoString, os) implicit def fromEncoded(e: MyObjEncoded): MyObj = new MyObj(e._1, java.util.UUID.fromString(e._2), e._3) 

现在,我可以使用这个机制创建一个具有良好模式的数据集:

 val d = spark.createDataset(Seq[MyObjEncoded]( new MyObj(1, java.util.UUID.randomUUID, Set("foo")), new MyObj(2, java.util.UUID.randomUUID, Set("bar")) )).toDF("i","u","s").as[MyObjEncoded] 

模式向我展示了我有正确名字的专栏和前两个我可以加入的东西。

 d.printSchema // root // |-- i: integer (nullable = false) // |-- u: string (nullable = true) // |-- s: binary (nullable = true) 
  1. 使用通用编码器。

    现在有两个通用的编码器可用于kryojavaSerialization ,其中后者被明确描述为:

    效率极低,只能作为最后的手段。

    假设下面的课

     class Bar(i: Int) { override def toString = s"bar $i" def bar = i } 

    您可以通过添加隐式编码器来使用这些编码器:

     object BarEncoders { implicit def barEncoder: org.apache.spark.sql.Encoder[Bar] = org.apache.spark.sql.Encoders.kryo[Bar] } 

    它们可以一起使用如下:

     object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarEncoders._ val ds = Seq(new Bar(1)).toDS ds.show sc.stop() } } 

    它将对象存储为binary列,因此在转换为DataFrame您将获得以下模式:

     root |-- value: binary (nullable = true) 

    也可以使用kryo编码器对特定字段进行元组编码:

     val longBarEncoder = Encoders.tuple(Encoders.scalaLong, Encoders.kryo[Bar]) spark.createDataset(Seq((1L, new Bar(1))))(longBarEncoder) // org.apache.spark.sql.Dataset[(Long, Bar)] = [_1: bigint, _2: binary] 

    请注意,我们在这里不依赖于隐式编码器,而是明确地传递编码器,所以这很可能不适用于toDS方法。

  2. 使用隐式转换:

    在可以被编码的表示和自定义类之间提供隐式转换,例如:

     object BarConversions { implicit def toInt(bar: Bar): Int = bar.bar implicit def toBar(i: Int): Bar = new Bar(i) } object Main { def main(args: Array[String]) { val sc = new SparkContext("local", "test", new SparkConf()) val sqlContext = new SQLContext(sc) import sqlContext.implicits._ import BarConversions._ type EncodedBar = Int val bars: RDD[EncodedBar] = sc.parallelize(Seq(new Bar(1))) val barsDS = bars.toDS barsDS.show barsDS.map(_.bar).show sc.stop() } } 

相关问题:

  • 如何为选项类型构造函数创建编码器,例如Option [Int]?

编码器在Spark2.0工作或多或少相同。 而Kryo仍然是推荐的serialization选择。

你可以看下面的例子与火花外壳

 scala> import spark.implicits._ import spark.implicits._ scala> import org.apache.spark.sql.Encoders import org.apache.spark.sql.Encoders scala> case class NormalPerson(name: String, age: Int) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class NormalPerson scala> case class ReversePerson(name: Int, age: String) { | def aboutMe = s"I am ${name}. I am ${age} years old." | } defined class ReversePerson scala> val normalPersons = Seq( | NormalPerson("Superman", 25), | NormalPerson("Spiderman", 17), | NormalPerson("Ironman", 29) | ) normalPersons: Seq[NormalPerson] = List(NormalPerson(Superman,25), NormalPerson(Spiderman,17), NormalPerson(Ironman,29)) scala> val ds1 = sc.parallelize(normalPersons).toDS ds1: org.apache.spark.sql.Dataset[NormalPerson] = [name: string, age: int] scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds1.show() +---------+---+ | name|age| +---------+---+ | Superman| 25| |Spiderman| 17| | Ironman| 29| +---------+---+ scala> ds2.show() +----+---------+ |name| age| +----+---------+ | 25| Superman| | 17|Spiderman| | 29| Ironman| +----+---------+ scala> ds1.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Superman. I am 25 years old. I am Spiderman. I am 17 years old. scala> val ds2 = ds1.map(np => ReversePerson(np.age, np.name)) ds2: org.apache.spark.sql.Dataset[ReversePerson] = [name: int, age: string] scala> ds2.foreach(p => println(p.aboutMe)) I am 17. I am Spiderman years old. I am 25. I am Superman years old. I am 29. I am Ironman years old. 

直到现在]在当前范围内没有appropriate encoders ,所以我们的人员不被编码为binary值。 但是一旦我们提供了一些使用Kryo序列化的implicit编码器,这将会改变。

 // Provide Encoders scala> implicit val normalPersonKryoEncoder = Encoders.kryo[NormalPerson] normalPersonKryoEncoder: org.apache.spark.sql.Encoder[NormalPerson] = class[value[0]: binary] scala> implicit val reversePersonKryoEncoder = Encoders.kryo[ReversePerson] reversePersonKryoEncoder: org.apache.spark.sql.Encoder[ReversePerson] = class[value[0]: binary] // Ecoders will be used since they are now present in Scope scala> val ds3 = sc.parallelize(normalPersons).toDS ds3: org.apache.spark.sql.Dataset[NormalPerson] = [value: binary] scala> val ds4 = ds3.map(np => ReversePerson(np.age, np.name)) ds4: org.apache.spark.sql.Dataset[ReversePerson] = [value: binary] // now all our persons show up as binary values scala> ds3.show() +--------------------+ | value| +--------------------+ |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| +--------------------+ scala> ds4.show() +--------------------+ | value| +--------------------+ |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| |[01 00 24 6C 69 6...| +--------------------+ // Our instances still work as expected scala> ds3.foreach(p => println(p.aboutMe)) I am Ironman. I am 29 years old. I am Spiderman. I am 17 years old. I am Superman. I am 25 years old. scala> ds4.foreach(p => println(p.aboutMe)) I am 25. I am Superman years old. I am 29. I am Ironman years old. I am 17. I am Spiderman years old. 

在Java Bean类的情况下,这可能是有用的

 import spark.sqlContext.implicits._ import org.apache.spark.sql.Encoders implicit val encoder = Encoders.bean[MyClasss](classOf[MyClass]) 

现在,您可以简单地将dataFrame读取为自定义的DataFrame

 dataFrame.as[MyClass] 

这将创建一个自定义类编码器,而不是一个二进制。

我的例子将在Java中,但我不认为适应Scala很困难。

使用spark.createDataset和Encoders.bean ,我相当成功地将RDD<Fruit>转换为Dataset<Fruit> ,只要Fruit是一个简单的Java Bean 。

第1步:创建简单的Java Bean。

 public class Fruit implements Serializable { private String name = "default-fruit"; private String color = "default-color"; // AllArgsConstructor public Fruit(String name, String color) { this.name = name; this.color = color; } // NoArgsConstructor public Fruit() { this("default-fruit", "default-color"); } // ...create getters and setters for above fields // you figure it out } 

在DataBricks人们加强编码器之前,我会坚持使用原始类型和字符串作为字段。 如果您有一个带有嵌套对象的类,则可以创建另一个简单的Java Bean,并将其所有字段放平,以便可以使用RDD转换将复杂类型映射到更简单的类型。 当然这是一个额外的工作,但是我认为这对于使用平面模式的性能有很大的帮助。

第2步:从RDD获取数据集

 SparkSession spark = SparkSession.builder().getOrCreate(); JavaSparkContext jsc = new JavaSparkContext(); List<Fruit> fruitList = ImmutableList.of( new Fruit("apple", "red"), new Fruit("orange", "orange"), new Fruit("grape", "purple")); JavaRDD<Fruit> fruitJavaRDD = jsc.parallelize(fruitList); RDD<Fruit> fruitRDD = fruitJavaRDD.rdd(); Encoder<Fruit> fruitBean = Encoders.bean(Fruit.class); Dataset<Fruit> fruitDataset = spark.createDataset(rdd, bean); 

瞧! 泡,冲洗,重复。