
Hello everyone!

I have two DataFrames in Apache Spark (2.3) and I want to join them properly. I will explain below what I mean by 'properly'. First of all, the two DataFrames hold the following information:

nodeDf: ( id, year, title, authors, journal, abstract )
edgeDf: ( srcId, dstId, label )

The label is 1 if node1 is connected with node2, and 0 otherwise.

I want to combine these two DataFrames into one DataFrame with the following information:

JoinedDF: ( id_from, year_from, title_from, journal_from, abstract_from, id_to, year_to, title_to, journal_to, abstract_to, time_dist )

time_dist = abs(year_from - year_to)

By 'properly' I mean that the query must run as fast as possible, and the result must not contain any null rows or cells (null values in a row).
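Conceptually, what I am after is something like the sketch below (this is only to illustrate the intent, not code I have run; it assumes the nodeDf/edgeDf schemas above plus the usual spark.implicits._ import):

import org.apache.spark.sql.functions.abs

// Join each endpoint of edgeDf against nodeDf, then compute time_dist
val fromNodes = nodeDf.select(
  $"id".as("id_from"), $"year".as("year_from"), $"title".as("title_from"),
  $"journal".as("journal_from"), $"abstract".as("abstract_from"))
val toNodes = nodeDf.select(
  $"id".as("id_to"), $"year".as("year_to"), $"title".as("title_to"),
  $"journal".as("journal_to"), $"abstract".as("abstract_to"))
val joinedDf = edgeDf
  .join(fromNodes, $"srcId" === $"id_from")
  .join(toNodes, $"dstId" === $"id_to")
  .withColumn("time_dist", abs($"year_from" - $"year_to"))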

I have tried the following, but it took 500-540 seconds to execute the query, and the final DataFrame contains null values. I don't even know whether the DataFrames were joined correctly.

I want to mention that the node file from which I create nodeDf has 27770 rows, and the edge file (edgeDf) has 615512 rows.

Code:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("Logistic Regression").getOrCreate()
val sc = spark.sparkContext

val data = sc.textFile("resources/data/training_set.txt").map(line =>{
  val fields = line.split(" ")
  (fields(0),fields(1), fields(2).toInt)
})

val data2 = sc.textFile("resources/data/test_set.txt").map(line =>{
  val fields = line.split(" ")
  (fields(0),fields(1))
})

import spark.implicits._
val trainingDF = data.toDF("srcId","dstId", "label")
val testDF = data2.toDF("srcId","dstId")

// node_information.csv is read as a DataFrame (not an RDD), so build it in one go
val infoDF = spark.read
  .option("header", "false")
  .option("inferSchema", "true")
  .format("csv")
  .load("resources/data/node_information.csv")
  .toDF("srcId", "year", "title", "authors", "journal", "abstract")

println("Showing linksDF sample...")
trainingDF.show(5)
println("Rows of linksDF: ",trainingDF.count())

println("Showing infoDF sample...")
infoDF.show(2)
println("Rows of infoDF: ",infoDF.count())

println("Joining linksDF and infoDF...")
var joinedDF = trainingDF.as("a").join(infoDF.as("b"),$"a.srcId" === $"b.srcId")

println(joinedDF.count())

joinedDF = joinedDF.select($"a.srcId",$"a.dstId",$"a.label",$"b.year",$"b.title",$"b.authors",$"b.journal",$"b.abstract")

joinedDF.show(5)


val graphX = new GraphX()
val pageRankDf = graphX.computePageRank(spark, "resources/data/training_set.txt", 0.0001)

println("Joining joinedDF and pageRankDf...")
joinedDF = joinedDF.as("a").join(pageRankDf.as("b"),$"a.srcId" === $"b.nodeId")

var dfWithRanks = joinedDF.select("srcId","dstId","label","year","title","authors","journal","abstract","rank").withColumnRenamed("rank","pgRank")
dfWithRanks.show(5)

println("Renameming joinedDF...")
dfWithRanks = dfWithRanks
  .withColumnRenamed("srcId","id_from")
  .withColumnRenamed("dstId","id_to")
  .withColumnRenamed("year","year_from")
  .withColumnRenamed("title","title_from")
  .withColumnRenamed("authors","authors_from")
  .withColumnRenamed("jurnal","jurnal_from")
  .withColumnRenamed("abstract","abstract_from")

// Note: this DataFrame is built but never used below
var infoDfRenamed = dfWithRanks
  .withColumnRenamed("year_from","year_to")
  .withColumnRenamed("title_from","title_to")
  .withColumnRenamed("authors_from","authors_to")
  .withColumnRenamed("journal_from","journal_to")
  .withColumnRenamed("abstract_from","abstract_to")
  .select("id_to","year_to","title_to","authors_to","journal_to","abstract_to")

var finalDF = dfWithRanks.as("a").join(infoDF.as("b"),$"a.id_to" === $"b.srcId")

finalDF = finalDF
  .withColumnRenamed("year","year_to")
  .withColumnRenamed("title","title_to")
  .withColumnRenamed("authors","authors_to")
  .withColumnRenamed("jurnal","jurnal_to")
  .withColumnRenamed("abstract","abstract_to")

println("Dropping unused columns from joinedDF...")
finalDF = finalDF.drop("srcId")

finalDF.show(5)  

Here are my results!

[screenshot of the resulting DataFrame, which contains null values]

Please ignore all calculations and code related to pgRank. Is there a proper way to make this join work?

2 Answers


You can filter your data first and then join; that way you will avoid the nulls:

df.filter($"ColumnName".isNotNull)
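For example, applied to the DataFrames from the question (just a sketch, under the assumption that the nulls enter through the join keys):

// Drop rows whose join keys are null before joining
val infoClean = infoDF.filter($"srcId".isNotNull)
val trainClean = trainingDF.filter($"srcId".isNotNull && $"dstId".isNotNull)
val joinedDF = trainClean.as("a").join(infoClean.as("b"), $"a.srcId" === $"b.srcId")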



Use the <=> operator in your join column condition:

var joinedDF = trainingDF.as("a").join(infoDF.as("b"),$"a.srcId" <=> $"b.srcId") 

In Spark 2.1 or later there is also an equivalent function, eqNullSafe:

var joinedDF = trainingDF.join(infoDF,trainingDF("srcId").eqNullSafe(infoDF("srcId")))
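Both forms perform a null-safe equality comparison: two nulls compare as equal (whereas with === a null never matches anything), so rows whose key is null on both sides are matched instead of silently dropped.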

