
Process an array column using a UDF and return another array

Below is my input:

docID  Shingles
D1     [23, 25, 39, 59]
D2     [34, 45, 65]

I want to generate a new column called hashes by processing the Shingles array column. For example, I want to extract the min and max (this is just an example to show that I want a fixed-length array column; I don't actually want to find the min or max):

docID  Shingles          Hashes
D1     [23, 25, 39, 59]  [23, 59]
D2     [34, 45, 65]      [34, 65]

I created a UDF as follows:

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, IntegerType

# nextPrime, coeffA and coeffB are defined elsewhere in the script
def generate_minhash_signatures(shingles, coeffA, coeffB):
    minHashCode = nextPrime + 1
    maxHashCode = 0
    for shingleID in shingles:
        if shingleID < minHashCode:
            minHashCode = shingleID
        if shingleID > maxHashCode:
            maxHashCode = shingleID
    return [minHashCode, maxHashCode]

minhash_udf = udf(generate_minhash_signatures, ArrayType(IntegerType()))
# coeffA and coeffB are plain Python lists here, which is what triggers the error below
df_with_minhash = df.withColumn('min_max_hash', minhash_udf("shingles", coeffA, coeffB))
df_with_minhash.show()

But it gives the following error:

TypeError: Invalid argument, not a string or column: [2856022824, 2966132496, 947839218, 1658426276, 1862779421, 3729685802, 1710806966, 2696513050, 3630333076, 2555745391] of type <class 'list'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.

Here is the actual UDF:

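def generate_minhash_signatures(shingles, coeffA, coeffB, numHashes):
    # nextPrime is a global defined elsewhere in the script
    signature = []
    for i in range(0, numHashes):
        # Start above any possible hash value, then keep the minimum
        minHashCode = nextPrime + 1
        for shingleID in shingles:
            # i-th hash function: (coeffA[i] * x + coeffB[i]) mod nextPrime
            hashCode = (coeffA[i] * shingleID + coeffB[i]) % nextPrime

            if hashCode < minHashCode:
                minHashCode = hashCode

        signature.append(minHashCode)
    return signature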
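For reference, this loop computes a MinHash signature: each pair (coeffA[i], coeffB[i]) defines one hash function, and entry i of the signature is that function's minimum over the row's shingles, which is why the output always has numHashes entries. Written out with p standing for nextPrime, plus a tiny worked example using made-up values (a = (1, 2), b = (0, 1), p = 7, S = {3, 5}) purely to show the arithmetic:

```latex
h_i(x) = (a_i x + b_i) \bmod p, \qquad
\mathrm{sig}_i(S) = \min_{x \in S} h_i(x), \qquad i = 0, \dots, \mathtt{numHashes} - 1

h_0(3) = 3,\; h_0(5) = 5 \;\Rightarrow\; \mathrm{sig}_0 = 3
h_1(3) = 7 \bmod 7 = 0,\; h_1(5) = 11 \bmod 7 = 4 \;\Rightarrow\; \mathrm{sig}_1 = 0
```

So the signature for S = {3, 5} under these example coefficients is (3, 0).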

2 Answers


When you call a UDF, every argument must be a Column (or a column name), so coeffA and coeffB cannot be passed as plain Python values. If they are scalars, convert them to Column objects using lit:

import pyspark.sql.functions as f
df.withColumn('min_max_hash', minhash_udf(f.col("shingles"), f.lit(coeffA), f.lit(coeffB)))

If coeffA and coeffB are lists, use f.array to create the literals as follows:

df.withColumn('min_max_hash',
    minhash_udf(f.col("shingles"),
                f.array(*map(f.lit, coeffA)),
                f.array(*map(f.lit, coeffB))))
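Applied to the four-argument UDF from the question (which also takes numHashes), the array-literal approach might look like the sketch below. It assumes a hypothetical NEXT_PRIME of 4294967311 standing in for the question's nextPrime, and a DataFrame with a shingles column; add_minhash_column is a hypothetical helper name. The return type is declared as ArrayType(LongType()) because hash codes taken modulo a prime above 2**31 do not fit in IntegerType. pyspark is imported inside the helper only so the row function can be tried without a Spark session.

```python
from typing import List, Optional, Sequence

# Hypothetical stand-in for the question's nextPrime (smallest prime above 2**32).
NEXT_PRIME = 4294967311


def generate_minhash_signatures(shingles: Optional[Sequence[int]],
                                coeffA: Sequence[int],
                                coeffB: Sequence[int],
                                numHashes: int) -> Optional[List[int]]:
    # A Python UDF receives an array column as a list, and a null array as None.
    if shingles is None:
        return None
    signature = []
    for i in range(numHashes):
        min_hash = NEXT_PRIME + 1
        for shingle_id in shingles:
            min_hash = min(min_hash, (coeffA[i] * shingle_id + coeffB[i]) % NEXT_PRIME)
        signature.append(min_hash)
    return signature


def add_minhash_column(df, coeffA, coeffB, numHashes):
    # Spark imports kept local so the row function above runs without pyspark.
    import pyspark.sql.functions as f
    from pyspark.sql.types import ArrayType, LongType

    minhash_udf = f.udf(generate_minhash_signatures, ArrayType(LongType()))
    return df.withColumn(
        "min_max_hash",
        minhash_udf(
            f.col("shingles"),
            f.array(*[f.lit(a) for a in coeffA]),  # Python list -> array literal column
            f.array(*[f.lit(b) for b in coeffB]),
            f.lit(numHashes),                       # Python int -> literal column
        ),
    )
```

Every non-column value, including the scalar numHashes, goes through lit, so the UDF call only ever sees Column arguments.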

Alternatively, keep the column arguments separate from the non-column arguments by building the UDF inside a closure:

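import pyspark.sql.functions as f
from pyspark.sql.types import ArrayType, IntegerType

def generate_minhash_signatures(coeffA, coeffB, numHashes):
    # coeffA, coeffB and numHashes are captured by the closure, so only the
    # column argument (shingles) has to be passed when the UDF is called.
    # nextPrime is a global defined elsewhere in the script.
    def generate_minhash_signatures_inner(shingles):
        signature = []
        for i in range(0, numHashes):
            minHashCode = nextPrime + 1
            for shingleID in shingles:
                hashCode = (coeffA[i] * shingleID + coeffB[i]) % nextPrime

                if hashCode < minHashCode:
                    minHashCode = hashCode

            signature.append(minHashCode)
        return signature
    return f.udf(generate_minhash_signatures_inner, ArrayType(IntegerType()))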

Then call the function like this:

df.withColumn('min_max_hash', generate_minhash_signatures(coeffA, coeffB, numHashes)("shingles"))
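The closure approach can also be split so the per-row logic is a plain Python function you can test on its own, with the Spark wrapping as a thin final step. The sketch below is one way to do that; make_minhash_fn and make_minhash_udf are hypothetical names, next_prime is passed in explicitly instead of being read from a global, and the number of hashes is taken as len(coeff_a) on the assumption that coeffA and coeffB always have the same fixed length. LongType is used for the same overflow reason as above.

```python
from typing import Callable, List, Optional, Sequence


def make_minhash_fn(coeff_a: Sequence[int], coeff_b: Sequence[int],
                    next_prime: int) -> Callable[[Optional[Sequence[int]]], Optional[List[int]]]:
    """Build a one-argument row function; the coefficients live in the closure."""
    # Copy to tuples so later changes to the caller's lists don't leak into the UDF.
    coeff_a, coeff_b = tuple(coeff_a), tuple(coeff_b)

    def signature(shingles):
        if shingles is None:  # a null array arrives as None
            return None
        # One entry per (a, b) pair; an empty row yields next_prime + 1,
        # matching the loop version's starting value.
        return [min(((a * s + b) % next_prime for s in shingles), default=next_prime + 1)
                for a, b in zip(coeff_a, coeff_b)]

    return signature


def make_minhash_udf(coeff_a, coeff_b, next_prime):
    # Spark imports kept local so make_minhash_fn can be tested without pyspark.
    import pyspark.sql.functions as f
    from pyspark.sql.types import ArrayType, LongType

    return f.udf(make_minhash_fn(coeff_a, coeff_b, next_prime), ArrayType(LongType()))
```

Usage would mirror the call above, for example df.withColumn('min_max_hash', make_minhash_udf(coeffA, coeffB, nextPrime)("shingles")).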

1 Comment

coeffA and coeffB are lists of fixed length. I updated the question to include the actual UDF.

My problem is not exactly the same, but it is similar: I had to send three array-type columns as input and get an array-type column (of strings) as output.

I was returning a list and tried many other approaches, but none of them worked.

def func_req(oldlist, newlist, pvector):
    # Keys present in the old list but missing from the new one
    deleted_stores = set(oldlist) - set(newlist)
    # Map each old key to its value from pvector
    old_map = dict(zip(oldlist, pvector))
    for key in deleted_stores:
        old_map.pop(key)
    # Keys that only appear in the new list get the placeholder 'PTest'
    for key in newlist:
        if key not in old_map:
            old_map[key] = 'PTest'
    return list(old_map.values())
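func_req returns the values of the retained keys in oldlist order, followed by the placeholders for new keys, so its output is not necessarily aligned with presentlist. If position-by-position alignment with presentlist is what you need (an assumption about the intent), a shorter sketch is one dictionary lookup per key. func_req_aligned is a hypothetical name, and the None checks cover null arrays, which a Python UDF receives as None.

```python
from typing import List, Optional, Sequence


def func_req_aligned(oldlist: Optional[Sequence[str]],
                     newlist: Optional[Sequence[str]],
                     pvector: Optional[Sequence[str]],
                     placeholder: str = "PTest") -> Optional[List[str]]:
    # A null array column arrives in a Python UDF as None.
    if oldlist is None or newlist is None or pvector is None:
        return None
    old_map = dict(zip(oldlist, pvector))
    # Deleted keys are simply never looked up; keys new in `newlist` get the placeholder.
    return [old_map.get(key, placeholder) for key in newlist]
```

For oldlist ['a', 'b', 'c'], presentlist ['c', 'd', 'a'] and pvec ['p1', 'p2', 'p3'], this gives ['p3', 'PTest', 'p1'], whereas the set-and-pop version gives ['p1', 'p3', 'PTest']. It is wrapped with f.udf(func_req_aligned, ArrayType(StringType())) in the same way as func_req.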

I called it like this:

df_diff = df3.withColumn(
    'updatedp',
    func_req(f.col('oldlist'), f.col('presentlist'), f.col('pvec'))
)

It gave me this error:

AssertionError: col should be Column

Solution

Then I came across this post and wrapped the function in a UDF:

import pyspark.sql.functions as f
from pyspark.sql.types import ArrayType, StringType

func_req_wrapper = f.udf(func_req, ArrayType(StringType()))

and called it like this:

df_diff = df3.withColumn(
    'updatedp', 
    func_req_wrapper('oldlist',  'presentlist', 'pvec')
)
