Given this dummy code:

 1 case class MyObject(values:mutable.LinkedHashMap[String, String])

...

 2    implicit val typeInfoString:TypeInformation[String] = TypeInformation.of(classOf[String])
 3    implicit val typeInfoMyObject:TypeInformation[MyObject] = TypeInformation.of(classOf[MyObject])
 4
 5    val env = StreamExecutionEnvironment.getExecutionEnvironment
 6
 7    env
 8      .fromElements("one")
 9      .map(str =>
10      {
11        val obj = MyObject(mutable.LinkedHashMap("key" -> str))
12        val filteredMap1:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
13
14        obj
15      })
16      .map(obj =>
17      {
18        val filteredMap2:mutable.LinkedHashMap[String, String] = obj.values.filter(!_._2.contains("bla"))
19
20        obj
21      })

The application crashes in line 18 with the exception:

Caused by: java.lang.ClassCastException: scala.collection.mutable.HashMap cannot be cast to scala.collection.mutable.LinkedHashMap

The issue seems to be that serialization/deserialization between the two map operators changes the runtime type of the values member: the LinkedHashMap comes back as a HashMap.

Note that the same code as in line 18 works perfectly in line 12.

With a breakpoint at line 12, the debugger (IntelliJ) shows obj.values as a LinkedHashMap; with a breakpoint at line 18, it shows obj.values as a HashMap.

What is going on here? How can I fix this? After all, LinkedHashMap implements Serializable?!

1 Answer

The default Kryo Chill serializer for LinkedHashMap does not preserve the map type and instead deserializes the data into a HashMap. In order to avoid this, one needs to create a serializer for the LinkedHashMap type:

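import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import scala.collection.mutable

class LinkedHashMapSerializer[K, V] extends Serializer[mutable.LinkedHashMap[K, V]] with Serializable {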
  override def write(kryo: Kryo, output: Output, `object`: mutable.LinkedHashMap[K, V]): Unit = {
    kryo.writeObject(output, `object`.size)

    for (elem <- `object`.iterator) {
      kryo.writeClassAndObject(output, elem._1)
      kryo.writeClassAndObject(output, elem._2)
    }
  }

  override def read(kryo: Kryo, input: Input, `type`: Class[mutable.LinkedHashMap[K, V]]): mutable.LinkedHashMap[K, V] = {
    val result = new mutable.LinkedHashMap[K, V]()
    val size = kryo.readObject(input, classOf[Int])
    for (_ <- 1 to size) {
      val key = kryo.readClassAndObject(input).asInstanceOf[K]
      val value = kryo.readClassAndObject(input).asInstanceOf[V]
      result.put(key, value)
    }

    result
  }
}

And then register it as a Kryo Serializer:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.registerTypeWithKryoSerializer(classOf[mutable.LinkedHashMap[String, String]], new LinkedHashMapSerializer())
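
To check a serializer like this outside of Flink, a plain Kryo round trip can be enough. The following is only a sketch under assumptions: it targets the Kryo 2.x API (the line that Flink 1.x bundles), and OrderedMapSerializer and RoundTripCheck are hypothetical names for a condensed variant of the serializer above that writes the size as a plain int. It is not taken from Flink itself.

```scala
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import scala.collection.mutable

// Hypothetical condensed variant of the serializer above (assumes Kryo 2.x signatures).
class OrderedMapSerializer[K, V] extends Serializer[mutable.LinkedHashMap[K, V]] {
  override def write(kryo: Kryo, output: Output, map: mutable.LinkedHashMap[K, V]): Unit = {
    output.writeInt(map.size) // entry count first, as a fixed 4-byte int
    map.foreach { case (key, value) =>
      // Store the class with each object so nested and non-String types can be resolved on read.
      kryo.writeClassAndObject(output, key)
      kryo.writeClassAndObject(output, value)
    }
  }

  override def read(kryo: Kryo, input: Input,
                    tpe: Class[mutable.LinkedHashMap[K, V]]): mutable.LinkedHashMap[K, V] = {
    val result = new mutable.LinkedHashMap[K, V]()
    val size = input.readInt()
    for (_ <- 0 until size) {
      val key = kryo.readClassAndObject(input).asInstanceOf[K]
      val value = kryo.readClassAndObject(input).asInstanceOf[V]
      result.put(key, value) // entries are re-inserted in their original order
    }
    result
  }
}

object RoundTripCheck {
  // Serialize to a byte array and read the object back with the same Kryo instance.
  def roundTrip(kryo: Kryo, value: AnyRef): AnyRef = {
    val output = new Output(1024, -1) // buffer that may grow without limit
    kryo.writeClassAndObject(output, value)
    kryo.readClassAndObject(new Input(output.toBytes))
  }

  def main(args: Array[String]): Unit = {
    val kryo = new Kryo()
    kryo.register(classOf[mutable.LinkedHashMap[_, _]], new OrderedMapSerializer[Any, Any])

    val original = mutable.LinkedHashMap("b" -> "2", "a" -> "1", "c" -> "3")
    val copy = roundTrip(kryo, original)

    // Both the concrete map type and the insertion order should survive the round trip.
    assert(copy.isInstanceOf[mutable.LinkedHashMap[_, _]])
    assert(copy.asInstanceOf[mutable.LinkedHashMap[String, String]].toList == original.toList)
  }
}
```

If both assertions pass, the serializer keeps the map type and the insertion order for this simple case. Nested value types would need their own checks.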

5 Comments

Thanks for your reply. With the custom serializer it seems to work. However, I am not sure what val serializer = new TraversableSerializer ... is good for? It does not seem to be used. Also, when I import org.apache.flink.api.scala.typeutils.TraversableSerializer, it complains about the false parameter (it's expecting a parameter of type TypeSerializer).
Sorry that was not meant to be there. I've corrected the second snippet.
Okay, it does work for my minimal example, but it does not work with my real code. In the actual implementation, the LinkedHashMap is buried within a more complex case class structure, and running with your changes now throws an ExceptionInChainedOperatorException caused by a KryoException: it complains about "Unable to find class" (for my involved case classes). Does this mean I now have to supply custom serializers for all my involved case classes and intermediate classes? It worked before the changes.
I ran some simpler tests, and it seems the custom serializer cannot handle nested types within the LinkedHashMap. It throws e.g. com.esotericsoftware.kryo.KryoException: Unable to find class: Xscala.None$, seemingly for any non-POJO type involved.
I've updated the Serializer implementation once more to use kryo.writeClassAndObject instead of kryo.writeObject. I hope this works now.
