We are trying to filter rows that contain empty arrays in a field using PySpark. Here is the schema of the DF:

root
 |-- created_at: timestamp (nullable = true)
 |-- screen_name: string (nullable = true)
 |-- text: string (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- favorite_count: long (nullable = true)
 |-- in_reply_to_status_id: long (nullable = true)
 |-- in_reply_to_user_id: long (nullable = true)
 |-- in_reply_to_screen_name: string (nullable = true)
 |-- user_mentions: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- id_str: string (nullable = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- screen_name: string (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)

We are trying two approaches.

First, defining a UDF that replaces empty arrays with nulls, like this:

empty_array_to_null = udf(lambda arr: None if len(arr) == 0 else arr, ArrayType(StructType()))

and using it to exclude the rows via df.select(empty_array_to_null(df.user_mentions)).

The other approach is to have the following UDF:

is_empty = udf(lambda x: len(x) == 0, BooleanType())

and using it in df.filter(is_empty(df.user_mentions)).
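(Both snippets assume the usual imports, which are not shown here:)

from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StructType, BooleanType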

Both approaches throw errors. The first approach yields the following:

An error occurred while calling o3061.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1603.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1603.0 (TID 41390, 10.0.0.11): java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 0 fields are required while 5 values are provided.
at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:136)
at org.apache.spark.sql.execution.python.EvaluatePython$$anonfun$fromJava$1.apply(EvaluatePython.scala:122)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)

The second approach throws the following:

Some of types cannot be determined by the first 100 rows, please try again with sampling
Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 57, in toDF
    return sparkSession.createDataFrame(self, schema, sampleRatio)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 522, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 360, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio)
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/session.py", line 347, in _inferSchema
    raise ValueError("Some of types cannot be determined by the "
ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling
...

Update: Added sample data...

+--------------------+--------------+--------------------+-------------+--------------+---------------------+-------------------+-----------------------+-------------+--------------------+
|          created_at|   screen_name|                text|retweet_count|favorite_count|in_reply_to_status_id|in_reply_to_user_id|in_reply_to_screen_name|user_mentions|            hashtags|
+--------------------+--------------+--------------------+-------------+--------------+---------------------+-------------------+-----------------------+-------------+--------------------+
|2017-03-13 23:00:...|  danielmellen|#DevOps understan...|            0|             0|                 null|               null|                   null|           []|            [devops]|
|2017-03-13 23:00:...|     RebacaInc|Automation of ent...|            0|             0|                 null|               null|                   null|           []|[googlecloud, orc...|
|2017-03-13 23:00:...| CMMIAppraiser|Get your Professi...|            0|             0|                 null|               null|                   null|           []|        [broadsword]|
|2017-03-13 23:00:...|       usxtron|and when the syst...|            0|             0|                 null|               null|                   null|           []|             [cloud]|
|2017-03-13 23:00:...|     SearchCRM|.#Automation and ...|            0|             0|                 null|               null|                   null|           []|[automation, chat...|
|2017-03-13 23:00:...|  careers_tech|SummitSync - Juni...|            0|             0|                 null|               null|                   null|           []|[junior, cloud, e...|
|2017-03-13 23:00:...|    roy_lauzon|Both the #DevOps ...|            0|             0|                 null|               null|                   null|           []|[devops, cybersec...|
|2017-03-13 23:00:...|      nosqlgal|Introducing #Couc...|            0|             0|                 null|               null|                   null|           []|  [couchbase, nosql]|
|2017-03-13 23:00:...|  jordanfarrer|Ran into a weird ...|            0|             0|                 null|               null|                   null|           []|            [docker]|
|2017-03-13 23:00:...|    BGrieveSTL|#purestorage + #a...|            0|             0|                 null|               null|                   null|           []|[purestorage, azure]|
|2017-03-13 23:00:...| Hotelbeds_API|"How to Quickly O...|            0|             0|                 null|               null|                   null|           []|       [api, feedly]|
|2017-03-13 23:00:...|  ScalaWilliam|Principles behind...|            0|             0|                 null|               null|                   null|           []|             [agile]|
|2017-03-13 23:00:...|   PRFT_Oracle|[On-Demand Webina...|            0|             0|                 null|               null|                   null|           []|             [cloud]|
|2017-03-13 23:00:...|    PDF_filler|Now you can #secu...|            0|             0|                 null|               null|                   null|           []|[secure, data, ap...|
|2017-03-13 23:00:...|lgoncalves1979|10 Mistakes We Ma...|            0|             0|                 null|               null|                   null|           []|[coaching, scrumm...|
|2017-03-13 23:00:...|       Jelecos|Vanguard CIO: Why...|            0|             0|                 null|               null|                   null|           []|[microservices, cio]|
|2017-03-13 23:00:...|   DJGaryBaldy|Why bother with W...|            0|             0|                 null|               null|                   null|           []|        [automation]|
|2017-03-13 23:00:...|     1codeblog|Apigee Edge Produ...|            0|             0|                 null|               null|                   null|           []|[cloud, next17, g...|
|2017-03-13 23:00:...|     CloudRank|Why and when shou...|            0|             0|                 null|               null|                   null|           []|[machinelearning,...|
|2017-03-13 23:00:...|  forgeaheadio|5 essentials for ...|            0|             0|                 null|               null|                   null|           []|[hybrid, cloud, h...|
+--------------------+--------------+--------------------+-------------+--------------+---------------------+-------------------+-----------------------+-------------+--------------------+
only showing top 20 rows

3 Answers

One way is to first get the size of your array, and then filter out the rows whose array size is 0. I found the solution here: How to convert empty arrays to nulls?

import pyspark.sql.functions as F
# add a column holding the array length, then keep only rows with at least one element
df = df.withColumn("size", F.size(F.col("user_mentions")))
df_filtered = df.filter(F.col("size") >= 1)

Or, in one step: df_filtered = df.filter(F.size('user_mentions') > 0)
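As a follow-up to that one-liner (a sketch, assuming df is the tweets DataFrame from the question, running on Spark 2.x): F.size returns -1 for a null array in that version, so the size-based filter drops null user_mentions as well as empty ones.

import pyspark.sql.functions as F
# size("user_mentions") is -1 for null and 0 for [], so "> 0" keeps only populated arrays
df_filtered = df.filter(F.size("user_mentions") > 0)
df_filtered.select("screen_name", "user_mentions").show(5)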

F.array() creates an empty array literal that the column can be compared against.

import pyspark.sql.functions as F

df = spark.createDataFrame([
  ["ABC", ["a", "b"]],
  ["DEF", []],
  ["GHI", ["c"]],
  ["JKL", []]
], ["name", "user_mentions"])

# compare the array column against an empty array literal
df_with = df.filter(F.col("user_mentions") != F.array())
df_without = df.filter(F.col("user_mentions") == F.array())
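On the toy DataFrame this splits the rows as expected:

df_with.show()     # keeps ABC and GHI (non-empty user_mentions)
df_without.show()  # keeps DEF and JKL (empty user_mentions)

Note that the comparison is straightforward here because user_mentions is an array of strings; for the struct-typed user_mentions in the question's schema, the F.size-based filter above may be the more robust check.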

df[ df.user_mentions != F.array() ]

To see why this works, note that df.user_mentions != F.array() is a Column of booleans, so indexing df with it keeps only the rows whose user_mentions array is non-empty.
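A minimal self-contained sketch (reusing the toy DataFrame from the previous answer, with spark an existing SparkSession):

import pyspark.sql.functions as F

df = spark.createDataFrame(
    [["ABC", ["a", "b"]], ["DEF", []], ["GHI", ["c"]], ["JKL", []]],
    ["name", "user_mentions"])

# a boolean Column; indexing df with it behaves like filter()
non_empty = df.user_mentions != F.array()
df[non_empty].show()  # expect only ABC and GHI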
