I have to develop a Spark script in Python that checks some logs and verifies whether a user's IP changed country between two events. I have a CSV file with IP ranges and their associated countries saved on HDFS, like this:
startIp, endIp, country
0.0.0.0, 10.0.0.0, Italy
10.0.0.1, 20.0.0.0, England
20.0.0.1, 30.0.0.0, Germany
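One detail worth noting about these ranges: dotted IP strings don't sort in numeric order (for example "9.0.0.0" > "10.0.0.0" as strings), so any range check first needs each address converted to an integer. A minimal sketch of that conversion (the function name `ip_to_long` is my own; in Spark it could be wrapped in a UDF or expressed with built-in functions):

```python
def ip_to_long(ip):
    """Convert a dotted IPv4 string to its 32-bit integer value."""
    a, b, c, d = (int(p) for p in ip.split("."))
    return (a << 24) | (b << 16) | (c << 8) | d

# Integer comparison now respects range order:
ip_to_long("0.0.0.0")   # 0
ip_to_long("10.0.0.1")  # 167772161
```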
And a log csv file:
userId, timestamp, ip, event
1, 02-01-17 20:45:18, 10.5.10.3, login
24, 02-01-17 20:46:34, 54.23.16.56, login
I load both files into Spark DataFrames, and I've already added a previousIp column to the log DataFrame using a lag window function. My idea is to replace ip and previousIp with their associated countries so I can compare them with dataFrame.filter("previousIp != ip"). My question is: is there a way to do that in Spark? Something like:
dataFrame = dataFrame.select("userId", udfConvert("ip",countryDataFrame).alias("ip"), udfConvert("previousIp",countryDataFrame).alias("previousIp"),...)
In order to have a Dataframe like this:
userId, timestamp, ip, event, previousIp
1, 02-01-17 20:45:18, England, login, Italy
If not, how can I solve my problem? Thank you
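For what it's worth, a UDF can't take a whole DataFrame as an argument the way the hypothetical `udfConvert` above does, so the usual approach (an assumption on my part, not something stated in the question) is a range join: convert both the log IPs and the range bounds to integers, then join on `startNum <= ipNum <= endNum`, broadcasting the small ranges table. The per-row lookup such a join expresses can be sketched in plain Python against the sample data above:

```python
import bisect

def ip_to_long(ip):
    """Convert a dotted IPv4 string to its 32-bit integer value."""
    a, b, c, d = (int(p) for p in ip.split("."))
    return (a << 24) | (b << 16) | (c << 8) | d

# The ranges table from the question, assumed sorted and non-overlapping.
ranges = [
    ("0.0.0.0",  "10.0.0.0", "Italy"),
    ("10.0.0.1", "20.0.0.0", "England"),
    ("20.0.0.1", "30.0.0.0", "Germany"),
]
starts = [ip_to_long(start) for start, _, _ in ranges]

def country_of(ip):
    """Return the country whose [startIp, endIp] range contains ip, else None."""
    n = ip_to_long(ip)
    i = bisect.bisect_right(starts, n) - 1  # last range starting at or before n
    if i >= 0 and n <= ip_to_long(ranges[i][1]):
        return ranges[i][2]
    return None

country_of("10.5.10.3")  # "England"
```

In Spark the same condition would become a non-equi join, roughly `logs.join(broadcast(ranges), (logs.ipNum >= ranges.startNum) & (logs.ipNum <= ranges.endNum), "left")`, applied once for ip and once for previousIp; after that, `filter("previousCountry != country")` finds the users who changed country. This is only a sketch under those assumptions, not a tested Spark job.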