I am building a Flink application that reads data from Kafka topics, applies some transformations, and writes to an Iceberg table.
I read the data from a Kafka topic (which is in JSON) and use circe to decode it into a Scala case class with Scala Option values in it. All the transformations on the DataStream work fine.
The case class looks like this:
case class Event(app_name: Option[String], service_name: Option[String], ......)
But when I try to convert the stream to a table in order to write to the Iceberg table, the Option-typed columns are converted to RAW types, as shown below.
table.printSchema()
service_name RAW('scala.Option', '...'),
conversion_id RAW('scala.Option', '...'),
......
And the table write fails with a schema mismatch:
Query schema: [app_name: RAW('scala.Option', '...'), .........
Sink schema: [app_name: STRING, .......
Does the Flink Table API support Scala case classes with Option values? I found in this documentation that they are supported in the DataStream API: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#special-types
Is there a way to do this in Table API?
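To make the setup concrete, here is a minimal sketch (plain Scala, outside Flink) of the kind of flattening I assume would be needed before handing the stream to the Table API: mapping each Option field to a nullable value so the columns could arrive as STRING instead of RAW('scala.Option', '...'). The case class fields and the flatten function are illustrative, not my actual job code.

```scala
// Illustrative case class mirroring the shape of my Event (fields shortened).
case class Event(app_name: Option[String], service_name: Option[String])

// Flatten Option fields to plain nullable values. In the Flink job this would
// run as `stream.map(flatten)` before converting the stream to a table, so the
// Table API sees STRING columns rather than RAW('scala.Option', '...').
def flatten(e: Event): (String, String) =
  (e.app_name.orNull, e.service_name.orNull)

val e = Event(Some("my-app"), None)
println(flatten(e)) // (my-app,null)
```

This drops the type-level distinction between "absent" and "present", though, so I'd prefer a way to keep the case class with Options intact if the Table API supports it.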