I have a Dataset called outLinks with two columns: a string column and an array column. It looks like this:

+-------+---------------------+
|    url|collect_set(outlinks)|
+-------+---------------------+
| link91|   [link620, link761]|
|link297| [link999, link942...|
|link246| [link623, link605...|
...

I am trying to append several more rows to this table, where each new row contains a string and an empty list. diff is a Dataset with one string column.

outLinks = outLinks.union(
        diff.map(r ->
                 new Tuple2<>(r.getString(0), DataTypes.createArrayType(DataTypes.StringType)),
                 Encoders.tuple(Encoders.STRING(), Encoders.bean(ArrayType.class))).toDF()); 

I've tried to define an empty array/list in every way I can imagine. When I do it as above (using the ArrayType class), I get the following exception:

Exception in thread "main" java.util.NoSuchElementException: head of empty list
    at scala.collection.immutable.Nil$.head(List.scala:420)
    at scala.collection.immutable.Nil$.head(List.scala:417)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$5.apply(ExpressionEncoder.scala:121)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$$anonfun$5.apply(ExpressionEncoder.scala:120)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.immutable.List.map(List.scala:285)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.tuple(ExpressionEncoder.scala:120)
    at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.tuple(ExpressionEncoder.scala:186)
    at org.apache.spark.sql.Encoders$.tuple(Encoders.scala:228)
    at org.apache.spark.sql.Encoders.tuple(Encoders.scala)
    at edu.upenn.cis455.pagerank.PageRankTask.run(PageRankTask.java:96)
    at edu.upenn.cis455.pagerank.PageRankTask.main(PageRankTask.java:30)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

And when I use regular Java collections:

outLinks = outLinks.union(
        diff.map(r ->
                 new Tuple2<>(r.getString(0), Collections.emptyList()),
                 Encoders.tuple(Encoders.STRING(), Encoders.javaSerialization(List.class))).toDF());

I get the following exception:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. binary <> array<string> at the second column of the second table;;
'Union
:- Aggregate [url#18], [url#18, collect_set(outlinks#1, 0, 0) AS collect_set(outlinks)#99]
:  +- Deduplicate [url#18, outlinks#1], false
:     +- TypedFilter edu.upenn.cis455.pagerank.PageRankTask$$Lambda$15/603456365@713e49c3, interface org.apache.spark.sql.Row, [StructField(url,StringType,true), StructField(outlinks,StringType,true)], createexternalrow(url#18.toString, outlinks#1.toString, StructField(url,StringType,true), StructField(outlinks,StringType,true))
:        +- Project [url#18, outlinks#1]
:           +- Join Inner, (id#15 = storagepage_id#0)
:              :- Relation[id#15,body#16,lastaccessed#17L,url#18] JDBCRelation(pages) [numPartitions=1]
:              +- Relation[storagepage_id#0,outlinks#1] JDBCRelation(storagepage_outlinks) [numPartitions=1]
+- SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, scala.Tuple2, true]._1, true) AS value#1771, encodeusingserializer(input[0, scala.Tuple2, true]._2, false) AS _2#1772]
   +- MapElements edu.upenn.cis455.pagerank.PageRankTask$$Lambda$17/2054286321@4bf89d3d, interface org.apache.spark.sql.Row, [StructField(url,StringType,true)], obj#1770: scala.Tuple2
      +- DeserializeToObject createexternalrow(url#18.toString, StructField(url,StringType,true)), obj#1769: org.apache.spark.sql.Row
         +- Except
            :- Project [url#18]
            :  +- Relation[id#15,body#16,lastaccessed#17L,url#18] JDBCRelation(pages) [numPartitions=1]
            +- Project [url#152]
               +- Aggregate [url#152], [url#152, collect_set(outlinks#1, 0, 0) AS collect_set(outlinks)#99]
                  +- Deduplicate [url#152, outlinks#1], false
                     +- TypedFilter edu.upenn.cis455.pagerank.PageRankTask$$Lambda$15/603456365@713e49c3, interface org.apache.spark.sql.Row, [StructField(url,StringType,true), StructField(outlinks,StringType,true)], createexternalrow(url#152.toString, outlinks#1.toString, StructField(url,StringType,true), StructField(outlinks,StringType,true))
                        +- Project [url#152, outlinks#1]
                           +- Join Inner, (id#149 = storagepage_id#0)
                              :- Relation[id#149,body#150,lastaccessed#151L,url#152] JDBCRelation(pages) [numPartitions=1]
                              +- Relation[storagepage_id#0,outlinks#1] JDBCRelation(storagepage_outlinks) [numPartitions=1]

    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:39)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:91)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$13$$anonfun$apply$14.apply(CheckAnalysis.scala:329)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$13$$anonfun$apply$14.apply(CheckAnalysis.scala:326)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$13.apply(CheckAnalysis.scala:326)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$13.apply(CheckAnalysis.scala:315)
    at scala.collection.immutable.List.foreach(List.scala:381)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:315)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:78)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:78)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:91)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:52)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:66)
    at org.apache.spark.sql.Dataset.withSetOperator(Dataset.scala:2884)
    at org.apache.spark.sql.Dataset.union(Dataset.scala:1656)
    at edu.upenn.cis455.pagerank.PageRankTask.run(PageRankTask.java:95)
    at edu.upenn.cis455.pagerank.PageRankTask.main(PageRankTask.java:29)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

1 Answer

You can use spark.implicits().newStringArrayEncoder() for a String array. Here is a sample:

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import scala.Tuple2;

public class SparkSample {
    public static void main(String[] args) {
        SparkSession spark = SparkSession
                .builder()
                .appName("SparkSample")
                .master("local[*]")
                .getOrCreate();

        List<Tuple2<String, String[]>> inputList = new ArrayList<>();
        inputList.add(new Tuple2<>("link91", new String[]{"link620", "link761"}));
        inputList.add(new Tuple2<>("link297", new String[]{"link999", "link942"}));

        // newStringArrayEncoder() encodes String[] as a proper array<string> column.
        Dataset<Row> dataset = spark.createDataset(inputList,
                Encoders.tuple(Encoders.STRING(), spark.implicits().newStringArrayEncoder())).toDF();
        dataset.show(false);
    }
}
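Applied to the question, the same encoder gives the new rows a real array<string> second column, so the union type-checks. (The javaSerialization attempt failed because that encoder stores the value as a single binary blob, hence the analyzer's binary <> array<string> complaint.) A minimal, untested sketch reusing the question's spark, diff, and outLinks names, with an empty String[] standing in for the empty list:

// MapFunction is org.apache.spark.api.java.function.MapFunction;
// the cast disambiguates the Java map(...) overload.
Dataset<Row> emptyOutlinks = diff.map(
        (MapFunction<Row, Tuple2<String, String[]>>) r ->
                new Tuple2<>(r.getString(0), new String[0]),  // empty array value, not an ArrayType
        Encoders.tuple(Encoders.STRING(), spark.implicits().newStringArrayEncoder()))
    .toDF("url", "collect_set(outlinks)");

outLinks = outLinks.union(emptyOutlinks);

Note that union in Spark 2.x matches columns by position, so the toDF(...) renaming is cosmetic; what matters is that both sides now have string and array<string> columns.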