This is my function for applying a rule. The columns mdp_codcat, mdp_idregl and usedRef should change according to the data in the array bRef.

    def withMdpCodcat(bRef: Broadcast[Array[RefRglSDC]])(dataFrame: DataFrame): DataFrame = {
      var matchRule = false
      var i = 0
      while (i < bRef.value.size && !matchRule) {
        if ((bRef.value(i).sensop.isEmpty || bRef.value(i).sensop.equals(col("signe")))
          && (bRef.value(i).cdopcz.isEmpty || Lib.matchCdopcz(strTail(col("cdopcz")).toString(), bRef.value(i).cdopcz))
          && (bRef.value(i).libope.isEmpty || Lib.matchRule(col("lib_ope").toString(), bRef.value(i).libope))
          && (bRef.value(i).qualib.isEmpty || Lib.matchRule(col("qualif_lib_ope").toString(), bRef.value(i).qualib))) {
          matchRule = true
          dataFrame.withColumn("mdp_codcat", lit(bRef.value(i).codcat))
          dataFrame.withColumn("mdp_idregl", lit(bRef.value(i).idregl))
          dataFrame.withColumn("usedRef", lit("SDC"))
        } else {
          dataFrame.withColumn("mdp_codcat", lit("NOT_CATEGORIZED"))
          dataFrame.withColumn("mdp_idregl", lit("-1"))
          dataFrame.withColumn("usedRef", lit(""))
        }
        i += 1
      }

      dataFrame
    }


The dataFrame has the columns "cdenjp", "cdguic", "numcpt", "mdp_codcat", "mdp_idregl". If a rule matches, the columns "mdp_codcat", "mdp_idregl" and "usedRef" should be added with the values from bRef.

Example, my DataFrame:

val DF = Seq(("tt", "aa", "bb", "cc"), ("tt1", "aa1", "bb2", "cc3")).toDF("t", "a", "b", "c")
+---+---+---+---+
|  t|  a|  b|  c|
+---+---+---+---+
| tt| aa| bb| cc|
|tt1|aa1|bb2|cc3|
+---+---+---+---+

Content of file.text:

 ,aa,bb,cc
 ,aa1,bb2,cc3
tt4,aa4,bb4,cc4
tt1,aa1,,cc6


case class TOTO(a: String, b:String, c: String, d:String)


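val text = sc.textFile("file:///home/X176616/file")
// Keep trailing empty fields (limit -1), build one TOTO per line, then sort by field a
val rules = text.map(row => row.split(",", -1))
  .map(c => TOTO(c(0), c(1), c(2), c(3))).collect().sortBy(_.a)
// Broadcast the collected rules so that bRef.value is available on the executors
val bRef = sc.broadcast(rules)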



def withMdpCodcat(bRef: Broadcast[Array[RefRglSDC]])(dataFrame: DataFrame): DataFrame = {
  // First initialise to NOT_FOUND; change it inside the while loop if a rule matches
  dataFrame.withColumn("mdp_codcat_new", lit("NOT_FOUND"))

  var matchRule = false
  var i = 0

  while (i < bRef.value.size && !matchRule) {
    if ((bRef.value(i).a.isEmpty || bRef.value(i).a.equals(signe))
      && (bRef.value(i).b.isEmpty || Lib.matchCdopcz(col("b"), bRef.value(i).b))
      && (bRef.value(i).c.isEmpty || Lib.matchRule(col("c"), bRef.value(i).c))) {
      matchRule = true
      dataFrame.withColumn("mdp_codcat_new", lit(bRef.value(i).d))
      dataFrame.withColumn("mdp_idregl_new", lit(bRef.value(i).e))
    }
    i += 1
  }

  dataFrame
}

Finally, this is the DataFrame I expect, depending on whether this condition is true:

((bRef.value(i).a.isEmpty || bRef.value(i).a.equals(signe))
            && (bRef.value(i).b.isEmpty || Lib.matchCdopcz(b.substring(1).toInt.toString, bRef.value(i).b))
            && (bRef.value(i).c.isEmpty || Lib.matchRule(c, bRef.value(i).c)))

+---+---+---+---+----------+----------+
|  t|  a|  b|  c|mdp_codcat|mdp_idregl|
+---+---+---+---+----------+----------+
| tt| aa| bb| cc|cc        |other     |
| ab|aa1|bb2|cc3|cc4       |toto      |  (from bRef, if the condition is true in the while loop)
| cd|aa1|bb2|cc3|cc4       |titi      |
|  b| a1| b2| c3|NOT_FOUND |NOT_FOUND |  (NOT_FOUND if the condition is false)
+---+---+---+---+----------+----------+
  • I need help, please. Thank you in advance. Commented Oct 16, 2018 at 8:48
  • Are your column names always the same, these three names? Use a udf to update the field values. Commented Oct 16, 2018 at 9:26
  • You should write what you want to achieve and provide a simple example of input and output. There is no reason to use a while loop over a DataFrame in an environment like Spark. Commented Oct 16, 2018 at 9:29
  • Thanks. The column names are different, e.g. the columns in dataFrame are "cdenjp", "cdguic", "numcpt", "mdp_codcat", "mdp_idregl", and the new ones are "mdp_codcat", "mdp_idregl", "usedRef". Commented Oct 16, 2018 at 9:34
  • Thanks. The column names are different, e.g. the columns in dataFrame are "cdenjp", "cdguic", "numcpt", "mdp_codcat", "mdp_idregl". If a rule matches, just 3 columns are added with the new values ("mdp_codcat", "mdp_idregl", "usedRef"); otherwise the 3 columns are added with other values. Commented Oct 16, 2018 at 9:43

1 Answer

You cannot create a DataFrame schema that depends on a runtime value. I would try to keep it simpler. First I'd create the three columns with a default value:

// withColumn returns a new DataFrame, so chain the calls and keep the result
val withDefaults = dataFrame
  .withColumn("mdp_codcat", lit(""))
  .withColumn("mdp_idregl", lit(""))
  .withColumn("usedRef", lit(""))

Then you can use a udf with your broadcasted value:

def mdp_codcat(bRef: Broadcast[Array[RefRglSDC]]) = udf { (field: String) =>
{
      // Your while and if stuff
      // return your update data
}}
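
The "while and if stuff" inside such a udf could be written as a first-match search over the rule array. The following is only a minimal sketch in plain Scala (no Spark needed to try it out). `Rule`, `RuleMatcher`, `matches` and `categorize` are hypothetical names, and plain string equality stands in for `Lib.matchCdopcz` and `Lib.matchRule`, whose behaviour is not shown in the question. The default values are the ones from the question's `else` branch.

```scala
// Hypothetical rule type, modelled on a subset of the question's RefRglSDC fields.
final case class Rule(sensop: String, cdopcz: String, libope: String,
                      codcat: String, idregl: String)

object RuleMatcher {
  // Assumption: plain equality replaces Lib.matchCdopcz / Lib.matchRule.
  def matches(value: String, pattern: String): Boolean = value == pattern

  // An empty rule field acts as a wildcard, as in the original condition.
  private def fieldOk(ruleField: String, value: String): Boolean =
    ruleField.isEmpty || matches(value, ruleField)

  // Returns (mdp_codcat, mdp_idregl, usedRef) for one row.
  // `find` stops at the first matching rule, like the while loop with matchRule.
  def categorize(rules: Seq[Rule], signe: String, cdopcz: String,
                 libOpe: String): (String, String, String) =
    rules
      .find(r => fieldOk(r.sensop, signe) &&
                 fieldOk(r.cdopcz, cdopcz) &&
                 fieldOk(r.libope, libOpe))
      .map(r => (r.codcat, r.idregl, "SDC"))
      .getOrElse(("NOT_CATEGORIZED", "-1", ""))
}
```

Because `categorize` works on plain row values rather than on `Column` objects, it can be called from inside the udf body with `bRef.value` as the rule list, which is the step the `while` loop in the question cannot do on the driver.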

And apply each udf to each field:

// A udf is applied to Column arguments, so wrap the column name in col(...)
dataFrame.withColumn("mdp_codcat_new", mdp_codcat(bRef)(col("mdp_codcat")))

Maybe this helps.

1 Comment

Thanks for the answer @EmiCareOfCell44, I added new details with an example.
