I created a BucketedRandomProjectionLSHModel in order to find the approximate nearest neighbours of every row in my dataset. The signature of its approxNearestNeighbors method is:
def approxNearestNeighbors(
    dataset: Dataset[_],
    key: Vector,
    numNearestNeighbors: Int): Dataset[_]
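For a single query vector the method can be called directly. Here is a minimal sketch, assuming `model` and `inputDF` are defined as in the question and that the model's input column is `features`. The helper name `neighboursOf` is hypothetical. The 3-argument overload adds a distance column named `distCol` and returns at most k rows.

```scala
import org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.DataFrame

// Hypothetical helper: the approximate nearest neighbours of ONE query vector.
// The result holds at most k rows of `df` plus a "distCol" column with each
// row's distance to `key` (the default column name used by this overload).
def neighboursOf(model: BucketedRandomProjectionLSHModel,
                 df: DataFrame,
                 key: Vector,
                 k: Int): DataFrame =
  model.approxNearestNeighbors(df, key, k).toDF()

// Usage, assuming `model` and `inputDF` from the question:
// val key = inputDF.select("features").head.getAs[Vector](0)
// neighboursOf(model, inputDF, key, 5).show()
```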
To run it on every row of the DataFrame, my idea is to create a UDF that calls this method and converts the resulting Dataset into a column of type ArrayType(StructType(...)).
Suppose my initial schema is
root
|-- genderIndex: double (nullable = false)
|-- genderIndexVec: vector (nullable = true)
|-- categoryIndex: double (nullable = false)
|-- categoryIndexVec: vector (nullable = true)
|-- features: vector (nullable = true)
|-- featureStdDev: vector (nullable = true)
My target schema (after calling .withColumn("neighbours", udf...)) is:
root
|-- genderIndex: double (nullable = false)
|-- genderIndexVec: vector (nullable = true)
|-- categoryIndex: double (nullable = false)
|-- categoryIndexVec: vector (nullable = true)
|-- features: vector (nullable = true)
|-- featureStdDev: vector (nullable = true)
|-- neighbours: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- genderIndex: double (nullable = false)
|    |    |-- genderIndexVec: vector (nullable = true)
|    |    |-- categoryIndex: double (nullable = false)
|    |    |-- categoryIndexVec: vector (nullable = true)
|    |    |-- features: vector (nullable = true)
|    |    |-- featureStdDev: vector (nullable = true)
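The `neighbours` column type above can be written out as a Spark `DataType`, which is what a UDF returning this structure would have to produce. Here is a sketch that assumes the column names and nullability from the printout. `neighbourStruct` and `neighboursType` are illustrative names. If the input DataFrame has exactly these columns, `ArrayType(inputDF.schema)` gives the same element fields.

```scala
import org.apache.spark.ml.linalg.SQLDataTypes.VectorType
import org.apache.spark.sql.types._

// Element of the "neighbours" array: one struct per neighbour with the same
// fields as the input rows (names and nullability copied from the printout).
val neighbourStruct: StructType = StructType(Seq(
  StructField("genderIndex", DoubleType, nullable = false),
  StructField("genderIndexVec", VectorType, nullable = true),
  StructField("categoryIndex", DoubleType, nullable = false),
  StructField("categoryIndexVec", VectorType, nullable = true),
  StructField("features", VectorType, nullable = true),
  StructField("featureStdDev", VectorType, nullable = true)
))

// Type of the whole "neighbours" column: an array of those structs.
val neighboursType: ArrayType = ArrayType(neighbourStruct, containsNull = true)
```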
Please help with my UDF as I am not sure how to make it work.
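import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.functions.udf

val model = // BucketedRandomProjectionLSHModel definition
val inputDF = // Input definition

// Intended: for each row's feature vector, return its k nearest rows
val nn = udf { (featureVector: Vector, k: Int) =>
  model.approxNearestNeighbors(inputDF, featureVector, k)
  // What now...
}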
approxNearestNeighbors currently only supports a single query vector, which is why you want to use a UDF. The problem with the UDF approach is that you can't refer to a DataFrame inside a UDF: the UDF runs on the executors, where the DataFrame and its SparkSession are not available. See e.g. stackoverflow.com/questions/47509249/…
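A commonly used workaround, which the comment above does not spell out, is to avoid the UDF entirely. You self-join the data with `approxSimilarityJoin`, keep the k closest matches per row with a window function, and collect them into an array column. Here is a minimal sketch. It assumes a unique `id` column (hypothetical; `monotonically_increasing_id()` can add one) and a distance `threshold` that you pick yourself. `allNearestNeighbours` is a hypothetical helper name. Pairs at distance `threshold` or more are never joined, so a row can end up with fewer than k neighbours, or `null` if it has none.

```scala
import org.apache.spark.ml.feature.BucketedRandomProjectionLSHModel
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, collect_list, row_number}

// Hypothetical helper: up to k approximate nearest neighbours for every row
// of `df`, collected into an array column "neighbours".
// Assumes `df` has a unique "id" column.
def allNearestNeighbours(model: BucketedRandomProjectionLSHModel,
                         df: DataFrame,
                         k: Int,
                         threshold: Double): DataFrame = {
  // Self-join: each result row pairs a row of df (struct "datasetA") with a
  // candidate neighbour (struct "datasetB") closer than `threshold`.
  val pairs = model
    .approxSimilarityJoin(df, df, threshold, "distCol")
    .toDF()
    .filter(col("datasetA.id") =!= col("datasetB.id")) // drop self-matches

  // Rank the candidates of each row by distance and keep the k closest.
  val byDistance = Window.partitionBy(col("datasetA.id")).orderBy(col("distCol"))
  val topK = pairs
    .withColumn("rank", row_number().over(byDistance))
    .filter(col("rank") <= k)

  // One array of neighbour structs per row id.
  val neighbours = topK
    .groupBy(col("datasetA.id").as("id"))
    .agg(collect_list(col("datasetB")).as("neighbours"))

  // Left join keeps rows that found no neighbour (neighbours = null).
  df.join(neighbours, Seq("id"), "left")
}
```

Each `datasetB` struct may also carry the model's hash column. In addition, `collect_list` does not guarantee that the array is ordered by distance, so include `distCol` in the collected struct if the order matters. Here is a usage example with an illustrative threshold: `allNearestNeighbours(model, inputDF.withColumn("id", monotonically_increasing_id()), 5, 2.0)`.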