
I have to develop a Spark script in Python that checks some logs and verifies whether a user's IP changed country between two events. I have a CSV file with IP ranges and associated countries saved on HDFS like this:

startIp, endIp, country
0.0.0.0, 10.0.0.0, Italy
10.0.0.1, 20.0.0.0, England
20.0.0.1, 30.0.0.0, Germany

And a log csv file:

userId, timestamp, ip, event
1, 02-01-17 20:45:18, 10.5.10.3, login
24, 02-01-17 20:46:34, 54.23.16.56, login

I load both files into Spark DataFrames, and I've already added a previousIp column to the log DataFrame using a lag window function. The solution I thought of is to substitute ip and previousIp with their associated countries so I can compare them with dataFrame.filter(col("previousIp") != col("ip")). My question is: is there a way to do that in Spark? Something like:

dataFrame = dataFrame.select("userId", udfConvert("ip",countryDataFrame).alias("ip"), udfConvert("previousIp",countryDataFrame).alias("previousIp"),...)

so that I end up with a DataFrame like this:

userId, timestamp, ip, event, previousIp
1, 02-01-17 20:45:18, England, login, Italy

If not, how can I solve my problem? Thank you


1 Answer


It's actually quite easy if you convert the IP addresses to numbers first. You can write your own UDF or use the code from petrabarus and register the function like this:

spark.sql("CREATE TEMPORARY FUNCTION iptolong as 'net.petrabarus.hiveudfs.IPToLong'")
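If you'd rather not depend on that jar, the same conversion is a few lines of plain Python. The sketch below uses only the standard library; the commented-out registration shows how it could be exposed under the same `iptolong` name used in the SQL above (the name and `LongType` return type are assumptions to match that usage):

```python
import socket
import struct

def ip_to_long(ip):
    """Convert a dotted-quad IPv4 string (e.g. '10.5.10.3') to its
    unsigned 32-bit integer value."""
    return struct.unpack('!I', socket.inet_aton(ip))[0]

# Hypothetical registration against an existing SparkSession `spark`:
# from pyspark.sql.types import LongType
# spark.udf.register('iptolong', ip_to_long, LongType())
```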

Then map the countries CSV to a dataframe with numeric range bounds:

>>> from pyspark.sql.functions import expr, broadcast
>>> ipdb = spark.read.csv('ipdb.csv', header=True).select(
             expr('iptolong(startIp)').alias('ip_from'),
             expr('iptolong(endIp)').alias('ip_to'),
             'country')
>>> ipdb.show()
+---------+---------+-------+
|  ip_from|    ip_to|country|
+---------+---------+-------+
|        0|167772160|  Italy|
|167772161|335544320|England|
|335544321|503316480|Germany|
+---------+---------+-------+

Similarly, convert the IP column of your log dataframe to numbers:

>>> log = spark.createDataFrame([('15.0.0.1',)], ['ip']) \
            .withColumn('ip', expr('iptolong(ip)'))
>>> log.show()
+---------+
|       ip|
+---------+
|251658241|
+---------+

Then you can join the two dataframes using a between condition (broadcasting the small ipdb table):

>>> log.join(broadcast(ipdb), log.ip.between(ipdb.ip_from, ipdb.ip_to)).show()
+---------+---------+---------+-------+
|       ip|  ip_from|    ip_to|country|
+---------+---------+---------+-------+
|251658241|167772161|335544320|England|
+---------+---------+---------+-------+