
I'm converting Scala code to Python. The Scala code uses a UDF.

def getVectors(searchTermsToProcessWithTokens: Dataset[Person]): Dataset[Person] = {

    import searchTermsToProcessWithTokens.sparkSession.implicits._

    def addVectors(
      tokensToSearchFor: String,
      tokensToSearchIn: String
    ): Seq[Int] = {
      tokensToSearchFor.map(token => if (tokensToSearchIn.contains(token)) 1 else 0)
    }

    val addVectorsUdf: UserDefinedFunction = udf(addVectors _)

    searchTermsToProcessWithTokens
      .withColumn("search_term_vector", addVectorsUdf($"name", $"age"))
      .withColumn("keyword_text_vector", addVectorsUdf($"name", $"age"))
      .as[Person]
}

My Python conversion:

def getVectors(searchTermsToProcessWithTokens): 

    def addVectors(tokensToSearchFor: str, tokensToSearchIn: str): 
      
      tokensToSearchFor = [1 if (token in tokensToSearchIn) else 0 for token in tokensToSearchIn]
      
      return tokensToSearchFor 
      
    addVectorsUdf= udf(addVectors, ArrayType(StringType()))

    TokenizedSearchTerm = searchTermsToProcessWithTokens \
      .withColumn("search_term_vector", addVectorsUdf(col("name"), col("age"))) \
      .withColumn("keyword_text_vector", addVectorsUdf(col("name"), col("age")))
      
      
    return TokenizedSearchTerm

Defining a simple Dataset in Scala like this:

case class Person(name: String, age: Int)

val personDS = Seq(Person("Max", 33), Person("Adam", 32), Person("Muller", 62)).toDS()
personDS.show()
// +------+---+
// |  name|age|
// +------+---+
// |   Max| 33|
// |  Adam| 32|
// |Muller| 62|
// +------+---+

I'm getting this output from the Scala function:

val x = getVectors(personDS)

x.show()
// +------+---+------------------+-------------------+
// |  name|age|search_term_vector|keyword_text_vector|
// +------+---+------------------+-------------------+
// |   Max| 33|         [0, 0, 0]|          [0, 0, 0]|
// |  Adam| 32|      [0, 0, 0, 0]|       [0, 0, 0, 0]|
// |Muller| 62|[0, 0, 0, 0, 0, 0]| [0, 0, 0, 0, 0, 0]|
// +------+---+------------------+-------------------+
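Note that each output vector has one entry per character of name, because the Scala addVectors maps over the characters of its first argument. A plain-Python sketch of that logic (hypothetical snake_case name):

```python
def add_vectors(tokens_to_search_for: str, tokens_to_search_in: str):
    # One 0/1 entry per character of the first argument:
    # 1 if that character also occurs in the second argument, else 0.
    return [1 if token in tokens_to_search_in else 0
            for token in tokens_to_search_for]

print(add_vectors("Max", "33"))     # [0, 0, 0] - one entry per letter of "Max"
print(add_vectors("Muller", "62"))  # [0, 0, 0, 0, 0, 0]
```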

But for the equivalent PySpark DataFrame

%python
personDF = spark.createDataFrame([["Max", 32], ["Adam", 33], ["Muller", 62]], ['name', 'age'])

+------+---+
|  name|age|
+------+---+
|   Max| 32|
|  Adam| 33|
|Muller| 62|
+------+---+

I'm getting this error from the Python version:

An exception was thrown from a UDF: 'TypeError: 'int' object is not iterable'

What is wrong with this conversion?

  • The only loop you seem to be doing is the for loop in the definition of tokensToSearchFor. tokensToSearchIn is a string, as you hint in the function definition, so this error is hard to understand from my point of view. Does the error specify the line at which it happens? What happens if you do a print(type(tokensToSearchIn)) before that line? Commented Jun 9, 2022 at 10:01
  • print didn't show anything; the line number points to tokensToSearchFor = [1 if (token in tokensToSearchIn) else 0 for token in tokensToSearchIn] Commented Jun 9, 2022 at 10:42

1 Answer

It's because your tokensToSearchIn is an integer (the age column), while the code expects a string. There is a second bug as well: your list comprehension iterates over tokensToSearchIn instead of tokensToSearchFor, which is why the TypeError is raised on that line. The following works:

from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, IntegerType

def getVectors(searchTermsToProcessWithTokens):
    def addVectors(tokensToSearchFor: str, tokensToSearchIn: str):
        # Iterate over the search terms; str() makes the integer age searchable
        return [1 if token in str(tokensToSearchIn) else 0 for token in tokensToSearchFor]

    # IntegerType matches the 0/1 values the function returns
    addVectorsUdf = udf(addVectors, ArrayType(IntegerType()))

    TokenizedSearchTerm = searchTermsToProcessWithTokens \
      .withColumn("search_term_vector", addVectorsUdf(col("name"), col("age"))) \
      .withColumn("keyword_text_vector", addVectorsUdf(col("name"), col("age")))

    return TokenizedSearchTerm
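The failure can be reproduced without Spark at all: Python refuses to iterate over an int, which is exactly what the original comprehension did with the age value. A minimal plain-Python sketch with hypothetical values:

```python
def add_vectors(tokens_to_search_for, tokens_to_search_in):
    # str() makes an integer age iterable/searchable as text
    return [1 if token in str(tokens_to_search_in) else 0
            for token in tokens_to_search_for]

try:
    [t for t in 32]  # iterating the raw age, as the original code did
except TypeError as e:
    print(e)  # 'int' object is not iterable

print(add_vectors("Max", 32))  # [0, 0, 0]
```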

Out of curiosity: you don't need a UDF at all, though the built-in version doesn't look much simpler...

from pyspark.sql import functions as F

def getVectors(searchTermsToProcessWithTokens):
    def addVectors(tokensToSearchFor: str, tokensToSearchIn: str):
        def arr(s: str):
            # '(?!$)' splits the column value into an array of its characters
            return F.split(F.col(s), '(?!$)')
        return F.transform(
            arr(tokensToSearchFor),
            lambda token: F.when(F.array_contains(arr(tokensToSearchIn), token), 1).otherwise(0))

    TokenizedSearchTerm = searchTermsToProcessWithTokens \
      .withColumn("search_term_vector", addVectors("name", "age")) \
      .withColumn("keyword_text_vector", addVectors("name", "age"))

    return TokenizedSearchTerm
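The '(?!$)' pattern is a negative lookahead that matches the empty position before every character but not at end-of-string, which is how F.split turns a string into an array of characters without a trailing empty element. The same regex can be checked in plain Python (note that Python's re.split keeps a leading empty piece, which Java/Spark's split discards):

```python
import re

# Zero-width split before every character; the lookahead (?!$) prevents a
# trailing empty element. Python keeps the leading '' that Java drops,
# hence the filter.
pieces = [p for p in re.split(r"(?!$)", "Max") if p]
print(pieces)  # ['M', 'a', 'x']
```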