
I have a DataFrame with a DNS hostname column (string) and an IP address column (string). I would like to use a UDF to apply a Python function I created that finds each distinct/unique DNS hostname and correlates it to the unique IPs it matches. Finally, it outputs that information as a list. The end result is a UDF that takes a DataFrame and returns a list.

# creating sample data (assumes a running SparkContext `sc` and `sqlContext`,
# e.g. in a Databricks or PySpark shell session)
from pyspark.sql import Row

l = [('pipe.skype.com', '172.25.132.26'), ('management.azure.com', '172.25.24.57'),
     ('pipe.skype.com', '172.11.128.10'), ('management.azure.com', '172.16.12.22'),
     ('www.google.com', '172.26.51.144'), ('collector.exceptionless.io', '172.22.2.21')]
rdd = sc.parallelize(l)
data = rdd.map(lambda x: Row(dns_host=x[0], src_ipv4=x[1]))
data_df = sqlContext.createDataFrame(data)
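
(As an aside, on Spark 2.x+ the same sample data can be built through a SparkSession instead of sc/sqlContext. A minimal sketch, assuming a session named spark and reusing the list l from above:)

from pyspark.sql import SparkSession

# Minimal sketch, assuming Spark 2.x+ where SparkSession replaces sc/sqlContext;
# `l` is the list of (dns_host, src_ipv4) tuples defined above
spark = SparkSession.builder.appName("beaconing").getOrCreate()
data_df = spark.createDataFrame(l, schema=['dns_host', 'src_ipv4'])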

from pyspark.sql import functions as f

def beaconing_aggreagte(df):
    """Loops through unique hostnames and correlates each one to its unique
    source IPs. If an individual hostname has 5 or fewer unique source IP
    connections, it is kept for the next step."""
    dns_hosts = df.select("dns_host").distinct().rdd.flatMap(lambda x: x).collect()
    HIT_THRESHOLD = 5
    data = []
    for dns in dns_hosts:
        ips = df.where(f.col("dns_host") == dns).select("src_ipv4").distinct().rdd.flatMap(lambda x: x).collect()
        if 0 < len(ips) <= HIT_THRESHOLD:  # must have 5 or fewer unique src IPs for significance
            data.append([ips, [dns]])
            print([ips, [dns]])
    return data

I think it is possible that my return schema is incorrect; a closer match to the return shape is sketched after the snippet below.

# Expected return from the function:
# [[['172.25.24.57', '172.16.12.22'], ['management.azure.com']], ...]
from pyspark.sql.types import StructType, StructField, ArrayType, StringType

array_schema = StructType([
    StructField('ip', ArrayType(StringType()), nullable=False),
    StructField('hostname', ArrayType(StringType()), nullable=False)
])
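
Since the function returns a list of [ips, hostnames] pairs rather than a single pair, the declared return type would need to be an array of structs. A minimal sketch of that shape (note this alone won't fix the error, because a UDF receives column values one row at a time, never a whole DataFrame):

# Minimal sketch: the function returns a *list* of (ip list, hostname list)
# pairs, so the matching return type is an array of structs.
from pyspark.sql.types import StructType, StructField, ArrayType, StringType

pair_struct = StructType([
    StructField('ip', ArrayType(StringType()), nullable=False),
    StructField('hostname', ArrayType(StringType()), nullable=False),
])
array_schema = ArrayType(pair_struct)  # one struct per qualifying hostname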

from pyspark.sql.functions import array, udf

testing_udf_beaconing_aggreagte = udf(lambda z: beaconing_aggreagte(z), array_schema)
df_testing = data_df.select('*', testing_udf_beaconing_aggreagte(array('dns_host', 'src_ipv4')))
df_testing.show()

This errors out with:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1248.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1248.0 (TID 3846823, 10.139.64.23, executor 13): org.apache.spark.api.python.PythonException: Traceback (most recent call last):

My end goal is to take a DataFrame and return a list in the format [[[list of ips], [dns_host]], ...]. I am attempting to use a UDF to parallelize the operation across the cluster instead of running everything on a single executor.

1 Answer

A group by should be able to achieve that (a UDF won't work here, since a UDF receives column values one row at a time and can't operate on the whole DataFrame). Use the collect_list aggregate to gather all IPs per hostname, then compute the size of the resulting list. You can then filter out rows whose size is greater than 5.

from pyspark.sql import functions as F
from pyspark.sql import Row

l = [('pipe.skype.com', '172.25.132.26'), ('management.azure.com', '172.25.24.57'),
     ('pipe.skype.com', '172.11.128.10'), ('management.azure.com', '172.16.12.22'),
     ('www.google.com', '172.26.51.144'), ('collector.exceptionless.io', '172.22.2.21')]
rdd = sc.parallelize(l)
data = rdd.map(lambda x: Row(dns_host=x[0], src_ipv4=x[1]))
data_df = sqlContext.createDataFrame(data)

data_df2 = data_df.groupby("dns_host").agg(F.collect_list("src_ipv4").alias("src_ipv4_list"))\
                  .withColumn("ip_count", F.size("src_ipv4_list"))\
                  .filter(F.col("ip_count") <= 5)\
                  .drop("ip_count")
data_df2.show(20, False)

Output:

+--------------------------+------------------------------+
|dns_host                  |src_ipv4_list                 |
+--------------------------+------------------------------+
|pipe.skype.com            |[172.25.132.26, 172.11.128.10]|
|collector.exceptionless.io|[172.22.2.21]                 |
|www.google.com            |[172.26.51.144]               |
|management.azure.com      |[172.25.24.57, 172.16.12.22]  |
+--------------------------+------------------------------+
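
One caveat: collect_list keeps duplicate IPs, while the original function called .distinct() per hostname. If unique IPs are what you're after, collect_set is the closer match, and you can then collect the grouped result back to the driver to get the [[[ips], [dns_host]], ...] list from the question. A minimal sketch along the same lines:

# Minimal sketch: collect_set gathers *unique* IPs per hostname; the
# grouped rows are then reshaped on the driver into [[[ips], [hostname]], ...].
from pyspark.sql import functions as F

agg_df = data_df.groupby("dns_host")\
                .agg(F.collect_set("src_ipv4").alias("src_ipv4_set"))\
                .filter(F.size("src_ipv4_set") <= 5)

result = [[row["src_ipv4_set"], [row["dns_host"]]] for row in agg_df.collect()]
# e.g. [[['172.25.24.57', '172.16.12.22'], ['management.azure.com']], ...]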

1 Comment

Thank you so much. This way is much, much easier than what I was trying to go for. And I love that it will parallelize naturally.
