
This question is part of a complex problem I am working on, and I am stuck at a particular point. To keep the problem statement minimal, let's say I have a DataFrame created from JSON with the following simplified structure.

The raw data looks roughly like this:

{"person":[{"name":"david", "email": "[email protected]"}, {"name":"steve", "email":"[email protected]"}]}

You can save this as person.json and create a Dataset as follows:

Dataset<Row> df = spark.read().json("person.json");

The output of printSchema() is:

root
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- email: string (nullable = true)


df.show(false);

+------------------------------------------------------------+
|       person                                               |
+------------------------------------------------------------+
|[[david, [email protected]],[steve, [email protected]]]         |
+------------------------------------------------------------+

Now the problem: as part of the code I have to do

df.select(array(struct(col("person.name"), reverse(col("person.email")))));

It gives output like

+------------------------------------------------------------+
|       array(named_struct(person.name as `name`, person.e...|
+------------------------------------------------------------+
|[[[david, steve],[[email protected], [email protected]]]]       |
+------------------------------------------------------------+

The schema gets updated to:

root
 |-- array(named_struct(name, person.name as `name`, email, person.email as `email`)): array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- email: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)

I do not want the schema or the data to change. What should I change in the df.select above?
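
For reference, the reason the struct ends up holding arrays: selecting a struct field through an array-of-structs column already yields an array of that field. A minimal Scala sketch against the same person.json (assuming df as above):

import org.apache.spark.sql.functions.col

// Extracting a field through an array column produces array<string>, not string.
df.select(col("person.name")).printSchema()
// root
//  |-- name: array (nullable = true)
//  |    |-- element: string (containsNull = true)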

I am using Spark 2.3.0 (Scala 2.11).

On the suggestion of user Someshwar, I tried using transform, but it is not available in this lower version:

df = df.withColumn("person_processed", expr("transform(person, x -> named_struct( 'email', reverse(x.email), 'name', x.name))"));

Below is the stack trace for the same:

Exception in thread "main" org.apache.spark.sql.catalyst.parser.ParseException: 
extraneous input '>' expecting {'(', 'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT', 'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER', 'HAVING', 'LIMIT', 'AT', 'OR', 'AND', 'IN', NOT, 'NO', 'EXISTS', 'BETWEEN', 'LIKE', RLIKE, 'IS', 'NULL', 'TRUE', 'FALSE', 'NULLS', 'ASC', 'DESC', 'FOR', 'INTERVAL', 'CASE', 'WHEN', 'THEN', 'ELSE', 'END', 'JOIN', 'CROSS', 'OUTER', 'INNER', 'LEFT', 'SEMI', 'RIGHT', 'FULL', 'NATURAL', 'ON', 'LATERAL', 'WINDOW', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'UNBOUNDED', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'FIRST', 'AFTER', 'LAST', 'ROW', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'DIRECTORY', 'VIEW', 'REPLACE', 'INSERT', 'DELETE', 'INTO', 'DESCRIBE', 'EXPLAIN', 'FORMAT', 'LOGICAL', 'CODEGEN', 'COST', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'DROP', 'UNION', 'EXCEPT', 'MINUS', 'INTERSECT', 'TO', 'TABLESAMPLE', 'STRATIFY', 'ALTER', 'RENAME', 'ARRAY', 'MAP', 'STRUCT', 'COMMENT', 'SET', 'RESET', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'MACRO', 'IGNORE', 'BOTH', 'LEADING', 'TRAILING', 'IF', 'POSITION', '+', '-', '*', 'DIV', '~', 'PERCENT', 'BUCKET', 'OUT', 'OF', 'SORT', 'CLUSTER', 'DISTRIBUTE', 'OVERWRITE', 'TRANSFORM', 'REDUCE', 'SERDE', 'SERDEPROPERTIES', 'RECORDREADER', 'RECORDWRITER', 'DELIMITED', 'FIELDS', 'TERMINATED', 'COLLECTION', 'ITEMS', 'KEYS', 'ESCAPED', 'LINES', 'SEPARATED', 'FUNCTION', 'EXTENDED', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'LAZY', 'FORMATTED', 'GLOBAL', TEMPORARY, 'OPTIONS', 'UNSET', 'TBLPROPERTIES', 'DBPROPERTIES', 'BUCKETS', 'SKEWED', 'STORED', 'DIRECTORIES', 'LOCATION', 'EXCHANGE', 'ARCHIVE', 'UNARCHIVE', 'FILEFORMAT', 'TOUCH', 'COMPACT', 'CONCATENATE', 'CHANGE', 'CASCADE', 'RESTRICT', 'CLUSTERED', 'SORTED', 'PURGE', 'INPUTFORMAT', 'OUTPUTFORMAT', DATABASE, DATABASES, 'DFS', 'TRUNCATE', 'ANALYZE', 'COMPUTE', 'LIST', 'STATISTICS', 'PARTITIONED', 'EXTERNAL', 'DEFINED', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'REPAIR', 'RECOVER', 'EXPORT', 'IMPORT', 'LOAD', 'ROLE', 'ROLES', 'COMPACTIONS', 'PRINCIPALS', 'TRANSACTIONS', 'INDEX', 'INDEXES', 'LOCKS', 'OPTION', 'ANTI', 'LOCAL', 'INPATH', STRING, BIGINT_LITERAL, SMALLINT_LITERAL, TINYINT_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, DOUBLE_LITERAL, BIGDECIMAL_LITERAL, IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 21)

== SQL ==
transform(person, x -> named_struct( 'email', reverse(x.email), 'name', x.name))
---------------------^^^

    at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:239)
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:115)
    at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseExpression(ParseDriver.scala:44)
    at org.apache.spark.sql.functions$.expr(functions.scala:1308)
    at org.apache.spark.sql.functions.expr(functions.scala)
    at com.mywork.jspark.JSparkMain1.main(JSparkMain1.java:43)
  • What do you want to get? Commented May 22, 2020 at 19:08
  • It's part of encoding logic, where some fields of the nested DataFrame need to be updated. I have a UDF created for updating values. To change them I am using df.select as shown above, but it is changing the schema too. I want to preserve the schema of the original DataFrame. Commented May 23, 2020 at 8:07
  • Please specify which fields you want to update, or add your expected output; your question is not clear. Commented May 23, 2020 at 8:12
  • Tried to solve it; please upvote and accept if it solves your problem. Commented May 23, 2020 at 9:03
  • I forgot to mention, but I am using Spark 2.3. Commented May 26, 2020 at 12:04

2 Answers


I approached this problem as follows:

  1. Load the data

val spark = sqlContext.sparkSession
val implicits = spark.implicits
import implicits._

val data =
  """
    |{"person":[{"name":"david", "email": "[email protected]"}, {"name":"steve", "email": "[email protected]"}]}
  """.stripMargin

val df = spark.read.json(data.split(System.lineSeparator()).toSeq.toDS())
df.show(false)
df.printSchema()

Result:

+----------------------------------------------------+
|person                                              |
+----------------------------------------------------+
|[[[email protected], david], [[email protected], steve]]|
+----------------------------------------------------+

root
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)

  2. Process the array<struct>

This is tested on Spark 2.4:

val answer1 = df.withColumn("person_processed",
  expr("transform(person, x -> named_struct('email', reverse(x.email), 'name', x.name))"))
answer1.show(false)
answer1.printSchema()

Result:

+----------------------------------------------------+----------------------------------------------------+
|person                                              |person_processed                                    |
+----------------------------------------------------+----------------------------------------------------+
|[[[email protected], david], [[email protected], steve]]|[[moc.liamg@divad, david], [moc.liamg@evets, steve]]|
+----------------------------------------------------+----------------------------------------------------+

root
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- person_processed: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)

Please observe that the input person column and the new person_processed column have exactly the same element type.
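
If the goal is to keep the original schema and column name untouched, the same expression can overwrite person in place instead of adding a new column (a minimal sketch, Spark 2.4+):

import org.apache.spark.sql.functions.expr

// Overwrite the original column; the schema stays array<struct<email:string,name:string>>.
val inPlace = df.withColumn("person",
  expr("transform(person, x -> named_struct('email', reverse(x.email), 'name', x.name))"))
inPlace.printSchema()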

Edit-1 (As per comments, with case class)

The user is on Spark 2.3, where the higher-order functions for maps and arrays are not available. The solution below is for Spark 2.3.

// For Spark < 2.4 (no higher-order functions): emulate transform with a UDF.
import scala.collection.mutable
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.types.{ArrayType, StringType, StructField, StructType}

case class InfoData(name: String, email: String)
val infoDataSchema =
  ArrayType(StructType(Array(StructField("name", StringType), StructField("email", StringType))))

// Zip the name and email arrays back into an array of structs, reversing each email.
val reverseEmailUDF = udf((arr1: mutable.WrappedArray[String], arr2: mutable.WrappedArray[String]) => {
  if (arr1.length != arr2.length) null
  else arr1.zipWithIndex.map(t => InfoData(t._1, arr2(t._2).reverse))
}, infoDataSchema)

val spark2_3Processed = df
  .withColumn("person_processed",
    reverseEmailUDF(
      col("person.name").cast("array<string>"),
      col("person.email").cast("array<string>")
    )
  )

spark2_3Processed.show(false)
spark2_3Processed.printSchema()

Output:

+----------------------------------------------------+----------------------------------------------------+
|person                                              |person_processed                                    |
+----------------------------------------------------+----------------------------------------------------+
|[[[email protected], david], [[email protected], steve]]|[[david, moc.liamg@divad], [steve, moc.liamg@evets]]|
+----------------------------------------------------+----------------------------------------------------+

root
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- person_processed: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- email: string (nullable = true)
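
Note that the processed schema lists name before email, while the source column lists email first. If an exact field-order match matters, a variant with email first would align the two (a sketch; InfoData2 is a hypothetical name, reusing the imports above):

// Hypothetical variant: email first, matching the source schema order.
case class InfoData2(email: String, name: String)
val infoDataSchema2 =
  ArrayType(StructType(Array(StructField("email", StringType), StructField("name", StringType))))

val reverseEmailUDF2 = udf((names: mutable.WrappedArray[String], emails: mutable.WrappedArray[String]) => {
  if (names.length != emails.length) null
  else names.zipWithIndex.map(t => InfoData2(emails(t._2).reverse, t._1))
}, infoDataSchema2)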

Edit-2 (As per comments, without case class)

The user is on Spark 2.3, where the higher-order functions for maps and arrays are not available, and creating a case class is difficult. The solution below is for Spark 2.3.

// Reuse the exact schema of the source column so the result matches it field for field.
val subSchema = df.schema("person").dataType

val reverseEmailUDF_withoutCaseClass =
  udf((nameArray: mutable.WrappedArray[String], emailArray: mutable.WrappedArray[String]) => {
    if (nameArray.length != emailArray.length) null
    // Tuple field order must match subSchema: email first, then name.
    else nameArray.zipWithIndex.map(t => (emailArray(t._2).reverse, t._1))
  }, subSchema)

val withoutCaseClassDF = df
  .withColumn("person_processed",
    reverseEmailUDF_withoutCaseClass(
      col("person.name").cast("array<string>"),
      col("person.email").cast("array<string>")
    )
  )

withoutCaseClassDF.show(false)
withoutCaseClassDF.printSchema()
withoutCaseClassDF.select("person_processed.email").show(false)

Output:

+----------------------------------------------------+----------------------------------------------------+
|person                                              |person_processed                                    |
+----------------------------------------------------+----------------------------------------------------+
|[[[email protected], david], [[email protected], steve]]|[[moc.liamg@divad, david], [moc.liamg@evets, steve]]|
+----------------------------------------------------+----------------------------------------------------+

root
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)
 |-- person_processed: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)

+----------------------------------+
|email                             |
+----------------------------------+
|[moc.liamg@divad, moc.liamg@evets]|
+----------------------------------+
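
A quick way to confirm the round trip preserved the schema (a sketch):

// subSchema was taken verbatim from the source column, so the types should match exactly.
assert(withoutCaseClassDF.schema("person_processed").dataType == df.schema("person").dataType)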



11 Comments

Thanks Somesh. I tried the above solution. It gives an org.apache.spark.sql.catalyst.parser.ParseException at line 1, column 33, which is where the lambda starts. I am using Java 1.8 and Spark 2.3.
There is some mistake in the expression. Can you post the query you tried and the full stack trace?
I think that error occurs because the transform higher-order function is not available in Spark 2.3; it was introduced in Spark 2.4.
Can you put your code in the question as Edit-1, along with the stack trace? It is very difficult for me to trace the reason otherwise.

Try the code below.

scala> df.show(false)
+----------------------------------------------------+
|person                                              |
+----------------------------------------------------+
|[[[email protected], david], [[email protected], steve]]|
+----------------------------------------------------+

scala> df.printSchema
root
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)


scala> val finalDF = df
.select(explode($"person").as("person"))
.groupBy(lit(1).as("id"))
.agg(
    collect_list(
        struct(
            reverse($"person.email").as("email"),
            $"person.name").as("person")
        ).as("person")
    )
.drop("id")

finalDF: org.apache.spark.sql.DataFrame = [person: array<struct<email:string,name:string>>]

scala> finalDF.show(false)
+----------------------------------------------------+
|person                                              |
+----------------------------------------------------+
|[[moc.liamg@divad, david], [moc.liamg@evets, steve]]|
+----------------------------------------------------+

scala> finalDF.printSchema
root
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)

scala>
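
One caveat with this approach: groupBy(lit(1)) collapses the whole DataFrame into a single row, so it only behaves correctly for single-row data. For multi-row data a per-row key is needed, for example (a sketch using a generated id column, not part of the original answer):

import org.apache.spark.sql.functions._

val rebuilt = df
  .withColumn("id", monotonically_increasing_id())  // per-row key so rows are not merged
  .select(col("id"), explode(col("person")).as("p"))
  .groupBy(col("id"))
  .agg(collect_list(struct(reverse(col("p.email")).as("email"), col("p.name"))).as("person"))
  .drop("id")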

5 Comments

That is exactly the problem: in the schema of your answer, name and email have become arrays, while they were originally strings.
Since you have not mentioned what output you expect, I have just updated the answer. Please check now.
No Srinivas, there seems to be some confusion. The schema after read should be exactly the same as the schema after the transform (the reverse of the email, in this case). The two schemas in your answer differ; I would say you have flattened the data.
+----------------------------------------------------+
|person                                              |
+----------------------------------------------------+
|[[moc.liamg@divad, david], [moc.liamg@evets, steve]]|
+----------------------------------------------------+
root
 |-- person: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- email: string (nullable = true)
 |    |    |-- name: string (nullable = true)
Thanks for your help, Srinivas! The solution works when the dataset has exactly one field, but my code is part of a much bigger nested dataset containing another 20-30 fields, some of which are strings and others structs, so it is not suitable for my problem.
