This is the first application, and really the first Scala, I have ever written. So far it functions as I would hope. I just found this community and would love some peer review of possible improvements, or just tell me flat out what I'm doing wrong.
This code is supposed to read a table of table names, then take all of the values from each listed table, transpose the columns into rows, and write them all to a master table with three additional columns. So far it does everything except the looping, which I just haven't gotten to yet. This question is not about how to do the looping; I'm really only hoping for a critique of what I have so far. I did change some variable names and a few things I had to mask.
//how to call
//spark-submit --verbose --deploy-mode cluster --master yarn --class App scala-maven-0.1-SNAPSHOT.jar
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import java.util.Calendar
//this will eventually call CreateStuff for each table_name value in the table
//TODO: add the loop and call CreateStuff with the values from stuff_to_run
object App {
  def main(args: Array[String]): Unit = {
    //do I have to create the SparkSession value twice, or can I pass it into the function?
    val spark = SparkSession.builder().appName("App").enableHiveSupport().getOrCreate()
    val stuff_to_run = spark.sql("select table_name from schema.table").rdd.map(r => r(0)).collect.toList
    println(stuff_to_run)
    CreateStuff("stuff")
  }
  def CreateStuff(stuffName: String): Unit = {
    val tablename = stuffName
    var column_stack = ""
    val spark = SparkSession.builder().appName("App").enableHiveSupport().getOrCreate()
    //is there a better way to get year and month?
    val Year = Calendar.getInstance.get(Calendar.YEAR)
    val Month = Calendar.getInstance.get(Calendar.MONTH) + 1
    val twodigitmonth = "%02d".format(Month)
    val yrmnth = Year.toString + twodigitmonth
    //get all of the data from the base table, tagged with the table name and year-month
    val raw_table = spark.sql("select *, '" + tablename + "' as stuffname, '" + yrmnth + "' as yrmnth from schema." + tablename)
    //cast every column to string so the unpivoted values share a single type
    val string_table = raw_table.select(raw_table.columns.map(c => col(c).cast(StringType)): _*)
    //get all columns besides the 3 defined above; these are the only ones which are not dynamic
    val selectColumns2 = string_table.columns.toSeq.filter(x => x != "val1" && x != "stuffname" && x != "yrmnth")
    val columnCount = selectColumns2.size
    //build "'col', col, " pairs for the stack() expression
    selectColumns2.foreach { e =>
      column_stack += "'" + e + "', " + e + ", "
    }
    //drop the trailing space here; the trailing comma is dropped below
    val collist = column_stack.dropRight(1)
    import spark.implicits._
    //transpose the dynamic columns into (fieldname, fieldvalue) rows
    val unPivotDF = string_table.select($"val1", $"stuffname", $"yrmnth",
      expr("stack(" + columnCount + ", " + collist.dropRight(1) + ") as (fieldname, fieldvalue)"))
    //unPivotDF.show()
    //insert records into the combined table
    unPivotDF.createOrReplaceTempView("stuff_temp")
    spark.sql("insert into schema.combined_stuff select * from stuff_temp")
  }
}
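
On the question in the code about creating the SparkSession twice: getOrCreate() returns the already-active session on the second call, so only one session actually exists, but a common pattern is to build it once in main and pass it down. A minimal sketch of that shape (the parameter order is my own choice, not from the original code):

import org.apache.spark.sql.SparkSession

object App {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("App").enableHiveSupport().getOrCreate()
    CreateStuff(spark, "stuff")
  }

  def CreateStuff(spark: SparkSession, stuffName: String): Unit = {
    // ...same body as above, without the second builder call...
  }
}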
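
On the year/month question in the code: java.time can produce the same "yyyyMM" string in one line, with the zero-padding handled by the formatter. A minimal sketch:

import java.time.YearMonth
import java.time.format.DateTimeFormatter

// formats the current year-month directly, e.g. "202401"
val yrmnth = YearMonth.now().format(DateTimeFormatter.ofPattern("yyyyMM"))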
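
The string-built select in CreateStuff works, but the quoting breaks if a table name ever contains a quote character. Here is a sketch of the same step through the DataFrame API, which sidesteps the string assembly entirely (spark.table, withColumn, and lit are standard Spark SQL calls; the value names reuse the code above):

import org.apache.spark.sql.functions.lit

val raw_table = spark.table(s"schema.$tablename")
  .withColumn("stuffname", lit(tablename))
  .withColumn("yrmnth", lit(yrmnth))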
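
The column_stack accumulation plus the two dropRight(1) calls can be collapsed into a single map/mkString, which produces the 'col', col pairs with no trailing separator to trim. A minimal sketch, assuming the same names and implicits as the code above:

val collist = selectColumns2.map(c => s"'$c', $c").mkString(", ")
val unPivotDF = string_table.select($"val1", $"stuffname", $"yrmnth",
  expr(s"stack(${selectColumns2.size}, $collist) as (fieldname, fieldvalue)"))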