
I'm using Spark SQL 1.6.2 (Java API) and I have to process the following DataFrame, which has a list of values in each of two columns:

ID  AttributeName AttributeValue
 0  [an1,an2,an3] [av1,av2,av3]
 1  [bn1,bn2]     [bv1,bv2]

The desired table is:

ID  AttributeName AttributeValue
 0  an1           av1
 0  an2           av2
 0  an3           av3
 1  bn1           bv1
 1  bn2           bv2

I think I have to use a combination of the explode function and a custom UDF.

Starting from some resources I found online, I can successfully run an example that reads the two columns and returns the concatenation of the first string of each list in a single column:

    import org.apache.spark.sql.api.java.UDF2;
    import scala.collection.Seq;

    UDF2<Seq<String>, Seq<String>, String> combineUDF = new UDF2<Seq<String>, Seq<String>, String>() {
        public String call(final Seq<String> col1, final Seq<String> col2) throws Exception {
            return col1.apply(0) + col2.apply(0); // concatenate the first element of each list
        }
    };

 context.udf().register("combineUDF", combineUDF, DataTypes.StringType);
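For reference, it can then be invoked like this (a minimal usage sketch, assuming df is the DataFrame above):

    import static org.apache.spark.sql.functions.*;

    df.select(callUDF("combineUDF", col("AttributeName"), col("AttributeValue")).alias("combined")).show();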

The problem is writing the signature of a UDF that returns two columns (in Java). As far as I understand, I must define a new StructType like the one shown below and set it as the return type, but so far I haven't managed to get the final code working:

StructType retSchema = new StructType(new StructField[]{
            new StructField("@AttName", DataTypes.StringType, true, Metadata.empty()),
            new StructField("@AttValue", DataTypes.StringType, true, Metadata.empty()),
        }
    );

context.udf().register("combineUDF", combineUDF, retSchema);
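Roughly, what I have been attempting is to return a Row matching that schema (a sketch; I'm not sure Spark 1.6 accepts a returned Row for a StructType return type, and combinePair is just a placeholder name):

    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;

    UDF2<Seq<String>, Seq<String>, Row> combinePair = new UDF2<Seq<String>, Seq<String>, Row>() {
        public Row call(final Seq<String> col1, final Seq<String> col2) throws Exception {
            // Build a struct row from the first element of each list
            return RowFactory.create(col1.apply(0), col2.apply(0));
        }
    };

    context.udf().register("combineUDF", combinePair, retSchema);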

Any help will be really appreciated.

UPDATE: I'm trying to implement zip(AttributeName, AttributeValue) first, so that I then only need to apply the standard explode function in Spark SQL:

ID  AttName_AttValue
 0  [[an1,av1],[an2,av2],[an3,av3]]
 1  [[bn1,bv1],[bn2,bv2]]

I built the following UDF:

UDF2<Seq<String>, Seq<String>, List<List<String>>> combineColumns =
        new UDF2<Seq<String>, Seq<String>, List<List<String>>>() {
    public List<List<String>> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
        List<List<String>> zipped = new LinkedList<>();

        for (int i = 0, listSize = col1.size(); i < listSize; i++) {
            // Pair up the i-th name with the i-th value
            List<String> subRow = Arrays.asList(col1.apply(i), col2.apply(i));
            zipped.add(subRow);
        }

        return zipped;
    }
};

But when I run the code

myDF.select(callUDF("combineColumns", col("AttributeName"), col("AttributeValue"))).show(10);

I got the following error message:

scala.MatchError: [[an1,av1],[an2,av2],[an3,av3]] (of class java.util.LinkedList)

It looks like the zipping is performed correctly, but the returned java.util.LinkedList is not a type that Spark's Scala side expects.
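My guess (untested) is that both nesting levels have to be handed back as Scala Seqs, with the UDF registered under a matching array-of-array return type, along these lines (combineColumnsSeq is just a placeholder name):

    import java.util.Arrays;
    import java.util.LinkedList;
    import java.util.List;
    import scala.collection.JavaConversions;
    import scala.collection.Seq;

    UDF2<Seq<String>, Seq<String>, Seq<Seq<String>>> combineColumnsSeq =
            new UDF2<Seq<String>, Seq<String>, Seq<Seq<String>>>() {
        public Seq<Seq<String>> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
            List<Seq<String>> zipped = new LinkedList<>();
            for (int i = 0, listSize = col1.size(); i < listSize; i++) {
                // Convert each inner pair to a Scala Seq as well
                zipped.add(JavaConversions.asScalaBuffer(
                        Arrays.asList(col1.apply(i), col2.apply(i))));
            }
            return JavaConversions.asScalaBuffer(zipped);
        }
    };

    context.udf().register("combineColumns", combineColumnsSeq,
            DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.StringType)));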

Any help?

1 Answer


Finally I managed to get the result I was looking for, though probably not in the most efficient way.

Basically there are two steps:

  • Zip the two lists into one
  • Explode the zipped list into rows

For the first step I defined the following UDF:

UDF2<Seq<String>, Seq<String>, Seq<String>> concatItems =
        new UDF2<Seq<String>, Seq<String>, Seq<String>>() {
    public Seq<String> call(final Seq<String> col1, final Seq<String> col2) throws Exception {
        List<String> zipped = new ArrayList<>();

        for (int i = 0, listSize = col1.size(); i < listSize; i++) {
            // Join the i-th name and the i-th value with a ';' separator
            zipped.add(col1.apply(i) + ";" + col2.apply(i));
        }

        return scala.collection.JavaConversions.asScalaBuffer(zipped);
    }
};

Don't forget to register the function; note that the declared return type must be an array of strings, since the UDF returns a Seq<String>:

sparkSession.udf().register("concatItems", concatItems, DataTypes.createArrayType(DataTypes.StringType));

and then I called it with the following code:

DataFrame df2 = df.select(col("ID"), callUDF("concatItems", col("AttributeName"), col("AttributeValue")).alias("AttName_AttValue"));

At this stage df2 looks like this:

ID  AttName_AttValue
 0  [an1;av1, an2;av2, an3;av3]
 1  [bn1;bv1, bn2;bv2]

Then I used the explode function to expand the list into rows:

 DataFrame df3 = df2.select(col("ID"),explode(col("AttName_AttValue")).alias("AttName_AttValue_row"));

At this stage df3 looks like this:

ID  AttName_AttValue_row
 0  an1;av1
 0  an2;av2
 0  an3;av3
 1  bn1;bv1
 1  bn2;bv2

Finally, to split the attribute name and value into two different columns, I converted the DataFrame into a JavaRDD so I could use the map function:

JavaRDD<Row> df3RDD = df3.toJavaRDD().map(
        (Function<Row, Row>) myRow -> {
            // Split the "name;value" string on the same ';' separator used above
            String[] info = String.valueOf(myRow.get(1)).split(";");
            return RowFactory.create(myRow.get(0), info[0], info[1]);
        }).cache();
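To get a DataFrame back from the RDD, a schema can then be applied (a sketch; the LongType for ID is an assumption, match it to your actual column type):

    // Rebuild a DataFrame from the mapped rows with an explicit schema.
    StructType finalSchema = new StructType(new StructField[]{
            new StructField("ID", DataTypes.LongType, true, Metadata.empty()), // assumed type
            new StructField("AttributeName", DataTypes.StringType, true, Metadata.empty()),
            new StructField("AttributeValue", DataTypes.StringType, true, Metadata.empty())
    });
    DataFrame df4 = sparkSession.createDataFrame(df3RDD, finalSchema);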

If anybody has a better solution feel free to comment. I hope it helps.


1 Comment

I have tried your example and got: Undefined function: 'myNewFunc'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.
