
I have a PySpark DataFrame with two sets of latitude/longitude coordinates. I am trying to calculate the Haversine distance between the two sets of coordinates in each row. I am using the following haversine() function, which I found online. The problem is that I cannot apply it to columns directly, or at least I do not know the syntax to do so. Can someone share the syntax, or point out a better solution?

from math import radians, cos, sin, asin, sqrt

def haversine(lat1, lon1, lat2, lon2):
    """
    Calculate the great circle distance between two points 
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians 
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])
    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    # Radius of earth in miles is 3,963; 5280 ft in 1 mile
    ft = 3963 * 5280 * c
    return ft

I know the haversine() function above works because I tested it using some lat/lon coordinates from my dataframe and got sensible results:

haversine(-85.8059, 38.250134, 
          -85.805122, 38.250098)
284.1302325439314

When I replace sample coordinates with column names corresponding to lat/lons in my PySpark dataframe, I get an error. I have tried the following code in an attempt to create a new column containing the calculated Haversine distance as measured in feet:

df.select('id', 'p1_longitude', 'p1_latitude', 'p2_lon', 'p2_lat') \
  .withColumn('haversine_dist',
              haversine(df['p1_latitude'],
                        df['p1_longitude'],
                        df['p2_lat'],
                        df['p2_lon'])) \
  .show()

but I get the error:

Traceback (most recent call last):
  File "", line 8, in haversine
TypeError: must be real number, not Column

This indicates to me that I must somehow apply my haversine function to each row of my PySpark DataFrame, but I'm not sure whether that guess is correct, and even if it is, I don't know how to do it. As an aside, my lat/lon columns are float types.

1 Comment

Avoid using UDFs as much as possible: Spark cannot optimize them, and their performance is many times worse than that of built-in functions or SQL. All the trigonometric functions are available in SQL (I don't know about radians), so if your performance tanks, try rewriting this with SQL expressions instead and see what happens. (Commented Feb 6, 2020 at 6:57)

2 Answers


Don't use a UDF when you can use Spark built-in functions, as UDFs are generally less performant.

Here is a solution using only Spark SQL functions that does the same as your function:

from pyspark.sql.functions import col, radians, asin, sin, sqrt, cos

df.withColumn("dlon", radians(col("p2_lon")) - radians(col("p1_longitude"))) \
  .withColumn("dlat", radians(col("p2_lat")) - radians(col("p1_latitude"))) \
  .withColumn("haversine_dist", asin(sqrt(
                                         sin(col("dlat") / 2) ** 2 + cos(radians(col("p1_latitude")))
                                         * cos(radians(col("p2_lat"))) * sin(col("dlon") / 2) ** 2
                                         )
                                    ) * 2 * 3963 * 5280) \
  .drop("dlon", "dlat")\
  .show(truncate=False)

Gives:

+-----------+------------+----------+---------+------------------+
|p1_latitude|p1_longitude|p2_lat    |p2_lon   |haversine_dist    |
+-----------+------------+----------+---------+------------------+
|-85.8059   |38.250134   |-85.805122|38.250098|284.13023254857814|
+-----------+------------+----------+---------+------------------+

You can find the available Spark built-in functions in the pyspark.sql.functions documentation.
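As a quick sanity check, the column expression above mirrors the following plain-Python computation (same constants, same formula), which reproduces the figure from the question:

```python
from math import radians, sin, cos, asin, sqrt

def haversine_ft(lat1, lon1, lat2, lon2):
    # Mirrors the withColumn expression: asin(sqrt(a)) * 2 * 3963 mi * 5280 ft/mi
    dlon = radians(lon2) - radians(lon1)
    dlat = radians(lat2) - radians(lat1)
    a = sin(dlat / 2) ** 2 + cos(radians(lat1)) * cos(radians(lat2)) * sin(dlon / 2) ** 2
    return asin(sqrt(a)) * 2 * 3963 * 5280

print(haversine_ft(-85.8059, 38.250134, -85.805122, 38.250098))  # ~284.13
```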


2 Comments

This looks promising; however, I am getting the error: <console>:33: error: value ** is not a member of org.apache.spark.sql.Column
Hmm, this is weird! The above code runs correctly with Spark 2.4. I'm not sure why you get that error, but alternatively you can use the Spark SQL function power instead of the Python operator ** (a ** b is equivalent to power(a, b)).

Rather than using a UDF, import radians (and the other math functions) from pyspark.sql.functions instead of from math. The math versions expect a real number, which is why you get the TypeError when they are given a Column.

This should resolve the error:

from pyspark.sql.functions import radians, cos, sin, asin, sqrt

