Hellow everyone!
I have two DataFrames in apache spark (2.3) and I want to join them properly. I will explain below what I mean with 'properly'. First of all the two dataframes holds the following information:
nodeDf: ( id, year, title, authors, journal, abstract )
edgeDf: ( srcId, dstId, label )
The label could be 0 or 1 in case node1 is connected with node2 or not.
I want to combine this two dataframes to get one dataframe withe the following information:
JoinedDF: ( id_from, year_from, title_from, journal_from, abstract_from, id_to, year_to, title_to, journal_to, abstract_to, time_dist )
time_dist = abs(year_from - year_to)
When I said 'properly' I meant that the query must be as fast as it could be and I don't want to contain null rows or cels ( value on a row ).
I have tried the following but I took me 500 -540 sec to execute the query and the final dataframe contains null values. I don't even know if the dataframes ware joined correctly.
I want to mention that the node file from which I create the nodeDF has 27770 rows and the edge file (edgeDf) has 615512 rows.
Code:
val spark = SparkSession.builder().master("local[*]").appName("Logistic Regression").getOrCreate()
val sc = spark.sparkContext
val data = sc.textFile("resources/data/training_set.txt").map(line =>{
val fields = line.split(" ")
(fields(0),fields(1), fields(2).toInt)
})
val data2 = sc.textFile("resources/data/test_set.txt").map(line =>{
val fields = line.split(" ")
(fields(0),fields(1))
})
import spark.implicits._
val trainingDF = data.toDF("srcId","dstId", "label")
val testDF = data2.toDF("srcId","dstId")
val infoRDD = spark.read.option("header","false").option("inferSchema","true").format("csv").load("resources/data/node_information.csv")
val infoDF = infoRDD.toDF("srcId","year","title","authors","jurnal","abstract")
println("Showing linksDF sample...")
trainingDF.show(5)
println("Rows of linksDF: ",trainingDF.count())
println("Showing infoDF sample...")
infoDF.show(2)
println("Rows of infoDF: ",infoDF.count())
println("Joining linksDF and infoDF...")
var joinedDF = trainingDF.as("a").join(infoDF.as("b"),$"a.srcId" === $"b.srcId")
println(joinedDF.count())
joinedDF = joinedDF.select($"a.srcId",$"a.dstId",$"a.label",$"b.year",$"b.title",$"b.authors",$"b.jurnal",$"b.abstract")
joinedDF.show(5)
val graphX = new GraphX()
val pageRankDf =graphX.computePageRank(spark,"resources/data/training_set.txt",0.0001)
println("Joining joinedDF and pageRankDf...")
joinedDF = joinedDF.as("a").join(pageRankDf.as("b"),$"a.srcId" === $"b.nodeId")
var dfWithRanks = joinedDF.select("srcId","dstId","label","year","title","authors","jurnal","abstract","rank").withColumnRenamed("rank","pgRank")
dfWithRanks.show(5)
println("Renameming joinedDF...")
dfWithRanks = dfWithRanks
.withColumnRenamed("srcId","id_from")
.withColumnRenamed("dstId","id_to")
.withColumnRenamed("year","year_from")
.withColumnRenamed("title","title_from")
.withColumnRenamed("authors","authors_from")
.withColumnRenamed("jurnal","jurnal_from")
.withColumnRenamed("abstract","abstract_from")
var infoDfRenamed = dfWithRanks
.withColumnRenamed("id_from","id_from")
.withColumnRenamed("id_to","id_to")
.withColumnRenamed("year_from","year_to")
.withColumnRenamed("title_from","title_to")
.withColumnRenamed("authors_from","authors_to")
.withColumnRenamed("jurnal_from","jurnal_to")
.withColumnRenamed("abstract_from","abstract_to").select("id_to","year_to","title_to","authors_to","jurnal_to","jurnal_to")
var finalDF = dfWithRanks.as("a").join(infoDF.as("b"),$"a.id_to" === $"b.srcId")
finalDF = finalDF
.withColumnRenamed("year","year_to")
.withColumnRenamed("title","title_to")
.withColumnRenamed("authors","authors_to")
.withColumnRenamed("jurnal","jurnal_to")
.withColumnRenamed("abstract","abstract_to")
println("Dropping unused columns from joinedDF...")
finalDF = finalDF.drop("srcId")
finalDF.show(5)
Here are my results!
Avoid all calculations and code related to pgRank! Is there any proper way to do this join works?
