
I am building a Flink application that reads data from Kafka topics, applies some transformations, and writes to an Iceberg table.

I read the data from the Kafka topic (which is in JSON) and use circe to decode it into a Scala case class with Option values in it. All the transformations on the DataStream work fine.

The case class looks like this:

Event(app_name: Option[String], service_name: Option[String], ......)

But when I try to convert the stream to a table in order to write to the Iceberg table, the Option columns are converted to RAW types, as shown below:

table.printSchema()

service_name RAW('scala.Option', '...'),
conversion_id RAW('scala.Option', '...'),
......

And the table write fails with:

Query schema: [app_name: RAW('scala.Option', '...'), .........
Sink schema: [app_name: STRING, .......

Does the Flink Table API support Scala case classes with Option values? I found in the documentation that they are supported in the DataStream API: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/serialization/types_serialization/#special-types

Is there a way to do this in the Table API?


2 Answers


I had the exact same issue of Option being parsed as RAW and found yet another workaround that might interest you.

TL;DR:
Instead of using .get (which returns an Option) and diligently declaring the return type as Types.OPTION(Types.INSTANT), which doesn't work, I use .getOrElse("key", null) and declare the type conventionally. The Table API then recognizes the column type, creates a nullable column, and interprets the null correctly. I can then filter those rows with IS NOT NULL.

Detailed example:

For me it starts with a custom map function in which I unpack data where certain fields could be missing:

import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.types.{Row, RowKind}

class CustomExtractor extends RichMapFunction[MyInitialRecordClass, Row] {
  override def map(in: MyInitialRecordClass): Row = {
    Row.ofKind(
      RowKind.INSERT,
      in._id,
      in.name,
      in.times.getOrElse("time", null) // This here did the trick instead of using .get and returning an Option
    )
  }
}

And then I use it like this, explicitly declaring the return type:

val stream: DataStream[Row] = data
  .map(new CustomExtractor())
  .returns(
    Types.ROW_NAMED(
      Array("id", "name", "time"),
      Types.STRING,
      Types.STRING,
      Types.INSTANT
    )
  )

val table = tableEnv.fromDataStream(stream)
table.printSchema()
// (
//   `id` STRING,
//   `name` STRING,
//   `time` TIMESTAMP_LTZ(9)
// )

tableEnv.createTemporaryView("MyTable", table)

tableEnv
  .executeSql("""
      |SELECT
      | id, name, time
      |FROM MyTable""".stripMargin)
  .print()
// +----+------------------+-------------------+---------------------+
// | op |               id |              name |                time |
// +----+------------------+-------------------+---------------------+
// | +I |              foo |               bar |              <Null> |
// | +I |             spam |               ham | 2022-10-12 11:32:06 |

This was, at least for me, exactly what I wanted. I'm very much new to Flink and would be curious whether the pros here think this is workable or horribly hacky.

Using Scala 2.12.15 and Flink 1.15.2.


The type system of the Table API is more restrictive than that of the DataStream API. Unsupported classes are immediately treated as the black-box type RAW. This allows objects to still pass through the API, but a RAW column might not be supported by every connector.

From the exception, it looks like you declared the sink table with app_name: STRING, so I guess you are fine with a string representation. If so, I would recommend implementing a user-defined function that performs the conversion to string.
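A minimal sketch of that idea in plain Scala (the Event fields and the flatten helper are assumptions for illustration, not part of the Flink API): unwrap each Option to its value or null before the record reaches the Table API, so the planner sees a nullable STRING instead of RAW.

```scala
// Hypothetical example: flatten Option fields to nullable values before
// conversion. In a real job this logic would run inside a MapFunction
// (or a scalar UDF) ahead of tableEnv.fromDataStream.
case class Event(app_name: Option[String], service_name: Option[String])

def flatten(e: Event): (String, String) =
  (e.app_name.orNull, e.service_name.orNull) // None becomes SQL NULL
```

A nullable STRING column then behaves like any other, and missing values can be filtered with IS NOT NULL, as in the other answer.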
