
I have a Dataset<Row> inputDS which has 4 columns: Id, List<Long> time, List<String> value, and aggregateType. I want to add one more column, value_new, to the Dataset using a map function. That map function should take the columns time, value, and aggregateType, pass them to a function getAggregate(String aggregateType, List<Long> time, List<String> value), and return a Double computed from those parameters. The Double value returned by getAggregate will be the value of the new column value_new.

Dataset inputDS

 +------+---------------+---------------------------------------------+---------------+
 |    Id| value         |     time                                    |aggregateType  |
 +------+---------------+---------------------------------------------+---------------+
 |0001  |  [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum           |
 +------+---------------+---------------------------------------------+---------------+

Expected Dataset outputDS

 +------+---------------+---------------------------------------------+---------------+-----------+
 |    Id| value         |     time                                    |aggregateType  | value_new |
 +------+---------------+---------------------------------------------+---------------+-----------+
 |0001  |  [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum           |   9.4     |
 +------+---------------+---------------------------------------------+---------------+-----------+

Code I tried.

 inputDS.withColumn("value_new", functions.lit(inputDS.map(new MapFunction<Row, Double>() {
     @Override
     public Double call(Row row) {
         String aggregateType = row.getAs("aggregateType");
         List<Long> timeList = row.getList(row.fieldIndex("time"));
         List<String> valueList = row.getList(row.fieldIndex("value"));
         return getAggregate(aggregateType, timeList, valueList);
     }
 }, Encoders.DOUBLE())));

ERROR

 Unsupported literal type class org.apache.spark.sql.Dataset [value:double]

Note: Sorry if I have used the map function wrongly; please suggest a workaround if there is one.

Thank you!

1 Answer


You get the error because you are trying to create a literal column (lit()) out of the result of Dataset.map(), which, as you can see in the docs, is a Dataset, not a value. And as the API for Dataset.withColumn() shows, its second argument must be a Column.
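For illustration, here is a minimal sketch of the type distinction, using the inputDS from the question (the variable name constant is made up):

 import org.apache.spark.sql.Column;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import static org.apache.spark.sql.functions.lit;

 // lit() wraps a plain Java value in a Column, so this is a valid
 // argument for withColumn():
 Column constant = lit(9.4);
 Dataset<Row> ok = inputDS.withColumn("value_new", constant);

 // inputDS.map(...) produces a Dataset<Double>, which lit() cannot wrap --
 // hence "Unsupported literal type class org.apache.spark.sql.Dataset".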

It seems like you need to create a user-defined function. Take a look at How do I call a UDF on a Spark DataFrame using JAVA?
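For example, here is a minimal sketch of the UDF approach, assuming Spark 2.x, a SparkSession named spark, and that your getAggregate() method is in scope; the UDF name "getAggregateUDF" is made up. Note that Spark hands array columns to a Java UDF as scala.collection.Seq, so they need converting back to java.util.List before calling your method:

 import static org.apache.spark.sql.functions.callUDF;
 import static org.apache.spark.sql.functions.col;

 import java.util.List;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.api.java.UDF3;
 import org.apache.spark.sql.types.DataTypes;
 import scala.collection.JavaConverters;
 import scala.collection.Seq;

 // Register a 3-argument UDF that wraps getAggregate() and returns a double.
 spark.udf().register("getAggregateUDF",
     (UDF3<String, Seq<Long>, Seq<String>, Double>) (aggregateType, time, value) -> {
         List<Long> timeList = JavaConverters.seqAsJavaListConverter(time).asJava();
         List<String> valueList = JavaConverters.seqAsJavaListConverter(value).asJava();
         return getAggregate(aggregateType, timeList, valueList);
     },
     DataTypes.DoubleType);

 // Call the registered UDF on the three input columns to build value_new.
 Dataset<Row> outputDS = inputDS.withColumn("value_new",
     callUDF("getAggregateUDF", col("aggregateType"), col("time"), col("value")));

The register-by-name plus callUDF() route is used here because it works across Spark 2.x from Java; just make sure the column order in the callUDF() call matches the UDF's parameter order.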
