
We need to sum the values per key and then compute each key's ratio of the grand total, as shown below.

Input file: Array(Array("a:25","a:30","b:30"),Array("a:25","a:30","b:30"))

We need the output to be ratios:

step 1:
=======
a:25+30+25+30  ==> a:110
b:30 + 30      ==> b:60

step 2:
=======
a = a/(a+b)  ==> a:110/170
b = b/(a+b)  ==> b:60/170

So far I have tried this:

val a = Array(Array("a:25","a:30","b:30"),Array("a:25","a:30","b:30"))
// Split every "key:value" entry into Array(key, value)
val res = a.flatMap(a => a.map(x => x.split(":")))
// Step 1: per-key totals (.toDouble already returns a Double, so no asInstanceOf casts are needed)
val res1 = res.map(y => (y(0), y(1).toDouble)).groupBy(_._1).map(x => (x._1, x._2.map(_._2).sum)).toArray
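The attempt above already produces the step 1 totals in res1 (an Array[(String, Double)]). A minimal sketch of step 2 on top of it, assuming the same input a and REPL/script-style top-level definitions, divides each key's total by the grand total a + b. The names grandTotal and ratios are illustrative only.

```scala
val a = Array(Array("a:25", "a:30", "b:30"), Array("a:25", "a:30", "b:30"))

// Step 1, as in the attempt but without the redundant casts: per-key totals
val res1: Array[(String, Double)] = a.flatten
  .map(_.split(":"))
  .map(kv => (kv(0), kv(1).toDouble))
  .groupBy(_._1)
  .map { case (k, pairs) => (k, pairs.map(_._2).sum) }
  .toArray

// Step 2: divide each key's total by the grand total (a + b)
val grandTotal = res1.map(_._2).sum
val ratios: Array[(String, Double)] = res1.map { case (k, v) => (k, v / grandTotal) }

ratios.foreach(println)
```

With this input grandTotal is 170.0, so a maps to 110/170 and b to 60/170. The order of the printed pairs follows the Map produced by groupBy and is not guaranteed.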

Input File or Dataframe:

 [54,WrappedArray(
    [WrappedArray(BCD001:10.0, BCD006:20.0),
    WrappedArray(BCD003:10.0, BCD006:30.0)],
    [WrappedArray(BCD005:50.0, BCD006:10.0),
    WrappedArray(BCD003:70.0, BCD006:0.0)])]

Output file or DataFrame, after adding up all the BCD code values and computing the ratio per BCD code:

e.g. in record 1: sum = 10+20+10+30+50+10+70+0 = 200
ratio per BCD code, e.g. BCD001 = 10/200 = 0.05

Output file:

[54,WrappedArray([BCD001:0.05, BCD006:0.3, BCD003:0.4, BCD005:0.25])]
  • What does this have to do with Spark? Commented Apr 11, 2018 at 18:34
  • @RaphaelRoth We actually have a WrappedArray with similar data, and we need to write a UDF to achieve the above requirement. Commented Apr 11, 2018 at 18:50
  • Could you please give an example of what your RDD or DataFrame looks like? Commented Apr 11, 2018 at 18:51
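The comments mention wrapping this logic in a Spark UDF. Below is a minimal plain-Scala sketch of what such a function body could look like, assuming each record's nested arrays have already been flattened into a Seq[Seq[String]] of "CODE:value" strings. The function name bcdRatios is hypothetical, and converting the actual struct column into that shape (and registering the function with org.apache.spark.sql.functions.udf) is not shown because it depends on the real schema.

```scala
// Hypothetical UDF body: sum all "CODE:value" entries of one record, then
// return each code's share of that sum in first-appearance order.
def bcdRatios(arrays: Seq[Seq[String]]): Seq[String] = {
  // Parse "BCD001:10.0" into ("BCD001", 10.0)
  val pairs: Seq[(String, Double)] = arrays.flatten.map { s =>
    val parts = s.split(":", 2)
    (parts(0).trim, parts(1).trim.toDouble)
  }
  val total = pairs.map(_._2).sum // 200.0 for the sample record below
  // distinct keeps the first occurrence of each code, preserving input order
  pairs.map(_._1).distinct.map { code =>
    val codeSum = pairs.filter(_._1 == code).map(_._2).sum
    s"$code:${codeSum / total}"
  }
}

// The sample record from the question, with its nested arrays flattened to one level
val record = Seq(
  Seq("BCD001:10.0", "BCD006:20.0"),
  Seq("BCD003:10.0", "BCD006:30.0"),
  Seq("BCD005:50.0", "BCD006:10.0"),
  Seq("BCD003:70.0", "BCD006:0.0")
)
println(bcdRatios(record))
```

For the sample record the codes come out in the order BCD001, BCD006, BCD003, BCD005 with ratios 10/200, 60/200, 80/200 and 50/200, i.e. 0.05, 0.3, 0.4 and 0.25. Using distinct rather than a Map keeps that order stable; note that a record whose values are all zero would yield NaN ratios.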

2 Answers


You can do the following:

val a = Array(Array("a:25","a:30","b:30"),Array("a:25","a:30","b:30"))

val res = a.flatMap(a=>a.map(x=>{
  val splitted = x.split(":")
  (splitted(0).trim, splitted(1).trim.toInt)
}))
  .groupBy(_._1)
  .map(x => (x._1, x._2.map(_._2).sum))

val total = res.values.sum

// String interpolation avoids the Int + String concatenation deprecated in Scala 2.13
res.map { case (k, v) => (k, s"$v/$total") }
  .foreach(println)

which should give you

(b,60/170)
(a,110/170)

If you want numeric ratios instead of strings, you can do:

res.map(x => (x._1, x._2.toDouble/total))
  .foreach(println)

which should give

(b,0.35294117647058826)
(a,0.6470588235294118)
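On Scala 2.13 or later, the group-then-sum step can be collapsed with groupMapReduce, which groups elements by a key, maps each element to a value and reduces the values within each group in one call. A minimal sketch with the same input, assuming Scala 2.13+ (the method is not available in 2.12, which many older Spark builds use):

```scala
val a = Array(Array("a:25", "a:30", "b:30"), Array("a:25", "a:30", "b:30"))

// Parse every "key:value" entry into a (key, Int) pair
val pairs: Seq[(String, Int)] = a.flatten.toSeq.map { s =>
  val parts = s.split(":")
  (parts(0).trim, parts(1).trim.toInt)
}

// Group by key, keep the value, and add the values within each group (Scala 2.13+)
val sums: Map[String, Int] = pairs.groupMapReduce(_._1)(_._2)(_ + _)
val total = sums.values.sum

// Each key's share of the grand total
val ratios: Map[String, Double] = sums.map { case (k, v) => (k, v.toDouble / total) }
```

This yields the same totals as the answer (a -> 110, b -> 60, total 170) without building the intermediate grouped arrays.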

It seems your data structure is not a sequence of sequences

val a = Array(Array("a:25","a:30","b:30"),Array("a:25","a:30","b:30"))

but rather a sequence of tuples of sequences (you can verify this with printSchema in Spark):

val a = Seq((Seq("BCD001:10.0", "BCD006:20.0"),Seq("BCD003:10.0", "BCD006:30.0")),
  (Seq("BCD005:50.0", "BCD006:10.0"),Seq("BCD003:70.0", "BCD006:0.0")))

in which case you need something like:

def parse(sq: Seq[String]): Seq[(String, Double)] =
  sq.map { x =>
    val y = x.split(":")
    (y.head, y.last.toDouble)
  }

val res = a.flatMap(t => Seq(parse(t._1), parse(t._2))).flatten.groupBy { case (k, _) => k }
  .map { case (k, vs) => (k, vs.foldLeft(0.0) { case (acc, (_, v)) => acc + v }) }
val tot = res.values.sum
res.map { case (k, v) => s"$k:${v / tot}" }.toArray

resulting in:

res0: Array[String] = Array(BCD006:0.3, BCD003:0.4, BCD005:0.25, BCD001:0.05)

3 Comments

It's hard to understand what you mean.
Updated the requirement with the input and output DataFrames.
