
I am new to Spark.

Here is my code:

val Data = sc.parallelize(List(
      ("I", "India"), 
      ("U", "USA"), 
      ("W", "West"))) 

val DataArray = sc.broadcast(Data.collect)

val FinalData = DataArray.value

Here FinalData is of type Array[(String, String)], but I want the data to be of type RDD[(String, String)].

Can I convert FinalData to the RDD[(String, String)] type?

More detail:

I want to join two RDDs. To optimize the join (from a performance point of view), I am broadcasting the smaller RDD to all cluster nodes so that less data gets shuffled (indirectly improving performance). For all this I am writing something like:

//Big Data
val FirstRDD = sc.parallelize(List(****Data of first table****))

//Small Data
val SecondRDD = sc.parallelize(List(****Data of Second table****)) 

So definitely I will broadcast the small data set (meaning SecondRDD):

val DataArray = sc.broadcast(SecondRDD.collect)

val FinalData = DataArray.value

//Here the next line gives an error
val Join = FirstRDD.leftOuterJoin(FinalData)

//found: Array, required: RDD

That's why I am looking for Array to RDD conversion.

2 Answers


The real problem is that you are creating a Broadcast variable by collecting the RDD (notice that this action converts the RDD into an Array). So, what I'm saying is that you already have an RDD, which is Data, and this variable has exactly the same values as FinalData, but in the form you want: RDD[(String, String)].

You can check this in the following output.

Data: org.apache.spark.rdd.RDD[(String, String)] = ParallelCollectionRDD[2] at parallelize at <console>:32
DataArray: org.apache.spark.broadcast.Broadcast[Array[(String, String)]] = Broadcast(1)
FinalData: Array[(String, String)] = Array((I,India), (U,USA), (W,West))

Although I don't understand your approach, you just need to parallelize the Broadcast's value:

// You already have this data stored in `Data`, so it's useless to repeat this process.
val DataCopy = sc.parallelize(DataArray.value)

EDIT

After reading your question again, I believe the problem is almost the same. You are trying to join an RDD with a Broadcast and that's not allowed. However, if you read the documentation you may notice that it's possible to join both RDDs (see code below).

val joinRDD = FirstRDD.keyBy(_._1).join(SecondRDD.keyBy(_._1))
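Note that since both RDDs in the question are already pair RDDs, the keyBy step isn't strictly required; leftOuterJoin can be called on them directly. A minimal, self-contained sketch (the sample data is assumed from the question; in spark-shell, `sc` is already defined and the first two lines can be skipped):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Created here only to make the sketch self-contained; spark-shell provides `sc`.
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("join-sketch"))

// Both RDDs are (key, value) pairs, so they can be joined as-is.
val FirstRDD  = sc.parallelize(List(("I", "India"), ("U", "USA"), ("W", "West")))
val SecondRDD = sc.parallelize(List(("I", 34), ("U", 45)))

// RDD[(String, (String, Option[Int]))] -- keys missing from SecondRDD map to None.
val Join = FirstRDD.leftOuterJoin(SecondRDD)
```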

8 Comments

But after broadcast I have to convert FinalData back to an RDD, as FinalData is of Array type.
@Darshan perhaps you're not noticing the fact that Data has type RDD[(String, String)] - exactly what you seem to be looking for...
There is one function that I want to use, but that function is on RDD, not Array. That's why.
@Tzach Zohar Yes, Data has type RDD[(String, String)]
So you intentionally want to shift the data back and forth 4 times between driver application and cluster nodes, just to end up with an identical value?

Broadcasts are indeed useful for improving the performance of a join between a large RDD and a smaller one. When you do that, the broadcast (along with map or mapPartitions) replaces the join; it is not used in a join, so there is no way you'll need to "transform a broadcast into an RDD".

Here's how it would look:

val largeRDD = sc.parallelize(List(
  ("I", "India"),
  ("U", "USA"),
  ("W", "West")))

val smallRDD = sc.parallelize(List(
  ("I", 34),
  ("U", 45)))

val smaller = sc.broadcast(smallRDD.collectAsMap())

// using "smaller.value" inside the function passed to RDD.map ->
// on executor side. Broadcast made sure it's copied to each executor (once!)
val joinResult = largeRDD.map { case (k, v) => (k, v, smaller.value.get(k)) }

joinResult.foreach(println)
// prints:
// (I,India,Some(34))
// (W,West,None)
// (U,USA,Some(45))

See a similar solution (using mapPartitions) which might be more efficient here.
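The mapPartitions variant mentioned above could look something like the sketch below (setup repeated so it is self-contained; in spark-shell the `sc` creation can be skipped). The gain over plain map is that the broadcast value is fetched from the local reference once per partition rather than once per record:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Created here only to make the sketch self-contained; spark-shell provides `sc`.
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("bcast-join"))

val largeRDD = sc.parallelize(List(("I", "India"), ("U", "USA"), ("W", "West")))
val smaller  = sc.broadcast(sc.parallelize(List(("I", 34), ("U", 45))).collectAsMap())

val joinResult = largeRDD.mapPartitions { iter =>
  // Resolve the broadcast value once per partition...
  val lookup = smaller.value
  // ...then stream through the partition's records, looking each key up locally.
  iter.map { case (k, v) => (k, v, lookup.get(k)) }
}
```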

