1

I'm trying to access a key from a map using Flink's SQL API. It fails with the error Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY Please advise how i can fix it. Here is my event class

     public class EventHolder {

        private Map<String,String> event;

        public Map<String, String> getEvent() {
            return event;
        }

        public void setEvent(Map<String, String> event) {
            this.event = event;
        }
    }

Here is the main class which submits the flink job

public class MapTableSource {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<EventHolder> mapEventStream = env.fromCollection(getMaps());

        // register a table and use SQL
        StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
        tableEnv.registerDataStream("mapEvent", mapEventStream); 
        //tableEnv.registerFunction("orderSizeType", new OrderSizeType());

        Table alerts = tableEnv.sql(
                "select event['key'] from mapEvent ");

        DataStream<String> alertStream = tableEnv.toAppendStream(alerts, String.class);

        alertStream.filter(new FilterFunction<String>() {
            private static final long serialVersionUID = -2438621539037257735L;

            @Override
            public boolean filter(String value) throws Exception {
                System.out.println("Key value is:"+value);
                return value!=null;
            }
        });

        env.execute("map-tablsource-job");
    }

    private static List<EventHolder> getMaps(){
        List<EventHolder> list = new ArrayList<>();
        for(int i=0;i<5;i++){
            EventHolder holder = new EventHolder();
            Map<String,String> map = new HashMap<>();
            map.put("key", "value");
            holder.setEvent(map);
            list.add(holder);
        }
        return list;
    }
}

When I run it I'm getting the exception

Exception in thread "main" org.apache.flink.table.api.TableException: Type is not supported: ANY
at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
at org.apache.flink.table.calcite.FlinkTypeFactory$.toTypeInfo(FlinkTypeFactory.scala:341)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:530)
at org.apache.flink.table.plan.logical.LogicalRelNode$$anonfun$12.apply(operators.scala:529)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:245)
at scala.collection.Iterator$class.foreach(Iterator.scala:742)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:245)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.plan.logical.LogicalRelNode.<init>(operators.scala:529)
at org.apache.flink.table.api.TableEnvironment.sql(TableEnvironment.scala:503)
at com.c.p.flink.MapTableSource.main(MapTableSource.java:25)

I'm using flink 1.3.1

2 Answers 2

1

I think the problem lies in fromCollection. Flink is not able to extract the needed type information because of Java limitations (i.e. type erasure). Therefore you map is treated as black box with SQL ANY type. You can verify the types of your table by using tableEnv.scan("mapEvent").printSchema(). You can specify the type information in fromCollection with Types.MAP(Types.STRING, Types.STRING).

Sign up to request clarification or add additional context in comments.

4 Comments

Thanks for the update. In the real program, I'm not using the fromCollection. This is just in the example program. In reality I'm using the FlinkKinesisConsumer to get the events. In the deserializer, I'm doing this Map<String, String> map = gson.fromJson(new String(message), new TypeToken<Map<String, String>>(){}.getType()); EventHolder eventHolder = new EventHolder(); eventHolder.setEvent(map); return eventHolder;
I'm registering the eventstream like this. DataStream<Row> orderEventStream = env.addSource(new FlinkKinesisConsumer<>( params.get("events.stream"), new EventDeserializationSchema(), consumerConfig)); Even with this, I'm getting the same error. So I think the issue is not just with the fromCollection
If you print new TypeHint<Map<String, String>>(){}.getType() you will see that even this is GenericTypeInfo. I agree that this is confusing, but maps where not supported initially. I don't know if we can change this behavior because of backwards compatibility.
I opened an issue for this problem: issues.apache.org/jira/browse/FLINK-7425
0

I solved a similar issue with the following:

//Should probably make MapVal more generic, but works for this example
public class MapVal extends ScalarFunction {
    public String eval(Map<String, String> obj, String key) {
        return obj.get(key);
    }
}

public class Car {
    private String make;
    private String model;
    private int year;
    private Map<String, String> attributes;
    //getters/setters...
}

//After registering Stream and TableEnv etc

tableEnv.registerFunction("mapval", new MapVal());

Table cars = tableEnv
                .scan("Cars")
                .select("make, model, year, attributes.mapval('name')");

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.