
Let's say I have a Spark DataFrame that includes the categorical columns (School, Type, Group):

------------------------------------------------------------
StudentID  |  School |   Type        |  Group               
------------------------------------------------------------
1          |  ABC    |   Elementary  |  Music-Arts          
2          |  ABC    |   Elementary  |  Football            
3          |  DEF    |   Secondary   |  Basketball-Cricket  
4          |  DEF    |   Secondary   |  Cricket             
------------------------------------------------------------

I need to add one more column to the dataframe as below:

--------------------------------------------------------------------------------------
StudentID  |  School |   Type        |  Group               |  Combined Array
---------------------------------------------------------------------------------------
1          |  ABC    |   Elementary  |  Music-Arts          | ["School: ABC", "Type: Elementary", "Group: Music", "Group: Arts"]
2          |  ABC    |   Elementary  |  Football            | ["School: ABC", "Type: Elementary", "Group: Football"]
3          |  DEF    |   Secondary   |  Basketball-Cricket  | ["School: DEF", "Type: Secondary", "Group: Basketball", "Group: Cricket"]
4          |  DEF    |   Secondary   |  Cricket             | ["School: DEF", "Type: Secondary", "Group: Cricket"]
----------------------------------------------------------------------------------------

The extra column is a combination of all the categorical columns, but it requires different processing on the 'Group' column: its values need to be split on '-'.

All the categorical columns, including 'Group', are contained in a list. The 'Group' column is also passed in as a String naming the column to be split. The DataFrame has other columns which are not used.

I am looking for the best performance solution.

If it were a simple array, it could be done with a single 'withColumn' transformation:

import org.apache.spark.sql.functions.array

val columns = List("School", "Type", "Group")
val df2 = df1.withColumn("CombinedArray", array(columns.map(df1(_)): _*))

However, because of the additional processing on the 'Group' column, the solution doesn't seem as straightforward.
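To make the target semantics explicit, here is the per-row transformation being asked for, sketched in plain Scala (no Spark session needed); the `combined` helper and the sample `Map` row are illustrative names, not part of the original code:

```scala
// Plain-Scala sketch of the per-row logic the new column should apply.
// Only the designated split column ("Group") is broken on '-'; every
// value gets a "ColumnName: " prefix.
val columns = List("School", "Type", "Group")
val splitCol = "Group"

def combined(row: Map[String, String]): Seq[String] =
  columns.flatMap { c =>
    if (c == splitCol) row(c).split("-").toSeq.map(v => s"$c: $v")
    else Seq(s"$c: ${row(c)}")
  }

// "Music-Arts" expands into two prefixed entries:
println(combined(Map("School" -> "ABC", "Type" -> "Elementary", "Group" -> "Music-Arts")))
// → List(School: ABC, Type: Elementary, Group: Music, Group: Arts)
```

Whatever Spark expression is used, it should reproduce this mapping row by row.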

  • Just to be sure: why do you want redundant information in the combined column? I get why you want an array containing the "-"-split of the group, but I am less sure about the other values. I suggest df.withColumn("combined", split($"Group", "-")). Commented Nov 13, 2018 at 16:33
  • The column will be fed to countVectorizer, so each entry of the array (category: value) will be identified differently. For instance the same value may be present across different categories. Commented Nov 13, 2018 at 16:39
  • Ah I see, well stack0114106 got the correct answer if you add the splitting of the group to it. ;) Commented Nov 13, 2018 at 16:49
  • In case you do not want to wrangle so much with String concats in order to attach identifying prefixes to the different types of information (which might be a little annoying for the Group category), you could also just do: df.withColumn("combined", split($"Group", "-")).withColumn("SchoolArray", array($"School")).withColumn("TypeArray", array($"Type")) and apply three CountVectorizers, one for each of the "XYZArray" columns, and a final VectorAssembler to put it all together. This version has the benefit that you can define different minimum frequencies for each of the CountVectorizers. Commented Nov 13, 2018 at 17:04

3 Answers


Using spark.sql(), check this out:

Seq(("ABC","Elementary","Music-Arts"),("ABC","Elementary","Football"),("DEF","Secondary","Basketball-Cricket"),("DEF","Secondary","Cricket"))
  .toDF("School","Type","Group").createOrReplaceTempView("taba")
spark.sql( """ select school, type, group, array(concat('School:',school),concat('type:',type),concat('group:',group)) as combined_array from taba """).show(false)

Output:

+------+----------+------------------+------------------------------------------------------+
|school|type      |group             |combined_array                                        |
+------+----------+------------------+------------------------------------------------------+
|ABC   |Elementary|Music-Arts        |[School:ABC, type:Elementary, group:Music-Arts]       |
|ABC   |Elementary|Football          |[School:ABC, type:Elementary, group:Football]         |
|DEF   |Secondary |Basketball-Cricket|[School:DEF, type:Secondary, group:Basketball-Cricket]|
|DEF   |Secondary |Cricket           |[School:DEF, type:Secondary, group:Cricket]           |
+------+----------+------------------+------------------------------------------------------+

If you need it as a DataFrame, then:

val df = spark.sql( """ select school, type, group, array(concat('School:',school),concat('type:',type),concat('group:',group)) as combined_array from taba """)
df.printSchema()

root
 |-- school: string (nullable = true)
 |-- type: string (nullable = true)
 |-- group: string (nullable = true)
 |-- combined_array: array (nullable = false)
 |    |-- element: string (containsNull = true)

Update:

Dynamically constructing the SQL columns:

scala> val df = Seq(("ABC","Elementary","Music-Arts"),("ABC","Elementary","Football"),("DEF","Secondary","Basketball-Cricket"),("DEF","Secondary","Cricket")).toDF("School","Type","Group")
df: org.apache.spark.sql.DataFrame = [School: string, Type: string ... 1 more field]

scala> val columns = df.columns.mkString("select ", ",", "")
columns: String = select School,Type,Group

scala> val arr = df.columns.map( x=> s"concat('"+x+"',"+x+")" ).mkString("array(",",",") as combined_array ")
arr: String = "array(concat('School',School),concat('Type',Type),concat('Group',Group)) as combined_array "

scala> val sql_string = columns + " , " + arr + " from taba "
sql_string: String = "select School,Type,Group , array(concat('School',School),concat('Type',Type),concat('Group',Group)) as combined_array  from taba "

scala> df.createOrReplaceTempView("taba")

scala> spark.sql(sql_string).show(false)
+------+----------+------------------+---------------------------------------------------+
|School|Type      |Group             |combined_array                                     |
+------+----------+------------------+---------------------------------------------------+
|ABC   |Elementary|Music-Arts        |[SchoolABC, TypeElementary, GroupMusic-Arts]       |
|ABC   |Elementary|Football          |[SchoolABC, TypeElementary, GroupFootball]         |
|DEF   |Secondary |Basketball-Cricket|[SchoolDEF, TypeSecondary, GroupBasketball-Cricket]|
|DEF   |Secondary |Cricket           |[SchoolDEF, TypeSecondary, GroupCricket]           |
+------+----------+------------------+---------------------------------------------------+
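Note that the dynamically built concat above drops the ': ' separator, which is why this output shows SchoolABC rather than School: ABC. A small tweak to the string builder (plain Scala, so it can be checked without a Spark session) restores it:

```scala
// Build "select ... array(concat('Col: ', Col), ...) as combined_array from taba"
// with the "Col: " prefix included in each concat.
val cols = Seq("School", "Type", "Group")
val selectList = cols.mkString("select ", ",", "")
val arrExpr = cols.map(c => s"concat('$c: ', $c)").mkString("array(", ",", ") as combined_array")
val sqlString = s"$selectList , $arrExpr from taba"

println(sqlString)
// select School,Type,Group , array(concat('School: ', School),concat('Type: ', Type),concat('Group: ', Group)) as combined_array from taba
```

Passing sqlString to spark.sql() would then yield prefixed entries such as School: ABC.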



Update2:

scala>  val df = Seq((1,"ABC","Elementary","Music-Arts"),(2,"ABC","Elementary","Football"),(3,"DEF","Secondary","Basketball-Cricket"),(4,"DEF","Secondary","Cricket")).toDF("StudentID","School","Type","Group")
df: org.apache.spark.sql.DataFrame = [StudentID: int, School: string ... 2 more fields]

scala> df.createOrReplaceTempView("student")

scala>  val df2 = spark.sql(""" select studentid, collect_list(concat('Group:', t.sp1)) as sp2 from (select StudentID,School,Type,explode((split(group,'-'))) as sp1 from student where size(split(group,'-')) > 1 ) t group by studentid """)
df2: org.apache.spark.sql.DataFrame = [studentid: int, sp2: array<string>]

scala> val df3 = df.alias("t1").join(df2.alias("t2"),Seq("studentid"),"LeftOuter")
df3: org.apache.spark.sql.DataFrame = [StudentID: int, School: string ... 3 more fields]

scala> df3.createOrReplaceTempView("student2")

scala> spark.sql(""" select studentid, school,group, type, array(concat('School:',school),concat('type:',type),concat_ws(',',temp_arr)) from (select studentid,school,group,type, case when sp2 is null then array(concat("Group:",group)) else sp2 end as temp_arr from student2) t """).show(false)
+---------+------+------------------+----------+---------------------------------------------------------------------------+
|studentid|school|group             |type      |array(concat(School:, school), concat(type:, type), concat_ws(,, temp_arr))|
+---------+------+------------------+----------+---------------------------------------------------------------------------+
|1        |ABC   |Music-Arts        |Elementary|[School:ABC, type:Elementary, Group:Music,Group:Arts]                      |
|2        |ABC   |Football          |Elementary|[School:ABC, type:Elementary, Group:Football]                              |
|3        |DEF   |Basketball-Cricket|Secondary |[School:DEF, type:Secondary, Group:Basketball,Group:Cricket]               |
|4        |DEF   |Cricket           |Secondary |[School:DEF, type:Secondary, Group:Cricket]                                |
+---------+------+------------------+----------+---------------------------------------------------------------------------+



3 Comments

This solution doesn't address the core issue where the 'Group' values need to be split dynamically
Thanks, but the update still doesn't address the core issue here: If you look at my output, the first and 3rd row has an array size of 4. We need to split the 'Group' column based on '-' and add multiple elements to array, one for each split.
@John Subas: could you please check Update2?

Using regex replacement to add a prefix at the start of each field and at each "-" in between:

import org.apache.spark.sql.functions.{array, regexp_replace}

val df1 = spark.read.option("header", "true").csv(filePath)
val columns = List("School", "Type", "Group")
val df2 = df1.withColumn("CombinedArray", array(columns.map {
  colName => regexp_replace(regexp_replace(df1(colName), "(^)", s"$colName: "), "(-)", s", $colName: ")
}: _*))
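The regex logic can be checked on a plain String first (the tag helper below is just for illustration). Note that for 'Group' it produces one string containing both entries separated by a comma, not two separate array elements:

```scala
// Mirror of the two regexp_replace calls above, applied to a plain String:
// 1. the "^" anchor inserts the "Col: " prefix,
// 2. each "-" is replaced with ", Col: ".
def tag(colName: String, value: String): String =
  value.replaceAll("^", s"$colName: ").replaceAll("-", s", $colName: ")

println(tag("Group", "Music-Arts"))   // Group: Music, Group: Arts
println(tag("School", "ABC"))         // School: ABC
```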

5 Comments

This would work probably. I will need to modify a little to include splits only for selected columns among the category columns. Will try to work it out and post the answer here.
Reason for unaccepting answer? Works as expected output you mentioned.
I will accept it once I am able to work on your code to get the exact solution. The split need to be done for one column only: 'Group', not for all the columns
The below code would be the accurate answer:
var df2 = df.withColumn("CombinedArray", array(columns.map { colName =>
  colName match {
    case "Group" => regexp_replace(regexp_replace(df(colName), "(^)", s"$colName: "), "(-)", s", $colName: ")
    case _       => regexp_replace(df(colName), "(^)", s"$colName: ")
  }
}: _*))
what would be the conversion to pyspark?

You need to first add an empty column and then map each row, like so (in Java). Note that Dataset.map takes an Encoder as its second argument, so the new schema is wrapped in a RowEncoder:

StructType newSchema = df1.schema().add("Combined Array", DataTypes.StringType);

df1 = df1.withColumn("Combined Array", lit(null).cast(DataTypes.StringType))
        .map((MapFunction<Row, Row>) row ->
            RowFactory.create(...values...) // add the existing values and the new value here
        , RowEncoder.apply(newSchema));

It should be fairly similar in Scala.

Comments
