34

I have a timestamp dataset in the format shown below.

I have written a UDF in PySpark to process this dataset and return a map of key/value pairs, but I am getting the error message below.

Dataset: df_ts_list

+--------------------+
|             ts_list|
+--------------------+
|[1477411200, 1477...|
|[1477238400, 1477...|
|[1477022400, 1477...|
|[1477224000, 1477...|
|[1477256400, 1477...|
|[1477346400, 1476...|
|[1476986400, 1477...|
|[1477321200, 1477...|
|[1477306800, 1477...|
|[1477062000, 1477...|
|[1477249200, 1477...|
|[1477040400, 1477...|
|[1477090800, 1477...|
+--------------------+

PySpark UDF:

>>> def on_time(ts_list):
...     import sys
...     import os
...     sys.path.append('/usr/lib/python2.7/dist-packages')
...     os.system("sudo apt-get install python-numpy -y")
...     import numpy as np
...     import datetime
...     import time
...     from datetime import timedelta
...     ts = np.array(ts_list)
...     if ts.size == 0:
...             count = 0
...             duration = 0
...             st = time.mktime(datetime.now())
...             ymd = str(datetime.fromtimestamp(st).date())
...     else:
...             ts.sort()
...             one_tag = []
...             start = float(ts[0])
...             for i in range(len(ts)):
...                     if i == (len(ts)) - 1:
...                             end = float(ts[i])
...                             a_round = [start, end]
...                             one_tag.append(a_round)
...                     else:
...                             diff = (datetime.datetime.fromtimestamp(float(ts[i+1])) - datetime.datetime.fromtimestamp(float(ts[i])))
...                             if abs(diff.total_seconds()) > 3600:
...                                     end = float(ts[i])
...                                     a_round = [start, end]
...                                     one_tag.append(a_round)
...                                     start = float(ts[i+1])
...             one_tag = [u for u in one_tag if u[1] - u[0] > 300]
...             count = int(len(one_tag))
...             duration = int(np.diff(one_tag).sum())
...             ymd = str(datetime.datetime.fromtimestamp(time.time()).date())
...     return {'count':count,'duration':duration, 'ymd':ymd}

PySpark code:

>>> on_time=udf(on_time, MapType(StringType(),StringType()))
>>> df_ts_list.withColumn("one_tag", on_time("ts_list")).select("one_tag").show()

Error:

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/pyspark/worker.py", line 172, in main
    process()
  File "/usr/lib/spark/python/pyspark/worker.py", line 167, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/worker.py", line 106, in <lambda>
    func = lambda _, it: map(mapper, it)
  File "/usr/lib/spark/python/pyspark/worker.py", line 92, in <lambda>
    mapper = lambda a: udf(*a)
  File "/usr/lib/spark/python/pyspark/worker.py", line 70, in <lambda>
    return lambda *a: f(*a)
  File "<stdin>", line 27, in on_time
  File "/usr/lib/spark/python/pyspark/sql/functions.py", line 39, in _
    jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
AttributeError: 'NoneType' object has no attribute '_jvm'

Any help would be appreciated!

8 Answers

73

Mariusz's answer didn't really help me. So if, like me, you found this because it's the only result on Google and you're new to PySpark (and Spark in general), here's what worked for me.

In my case, I was getting that error because I was trying to execute PySpark code before the PySpark environment had been set up.

Making sure that PySpark was available and set up before making any calls that depend on pyspark.sql.functions fixed the issue for me.
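
For example, a rough sketch of that ordering (reusing on_time and df_ts_list from the question; the app name is just a placeholder):

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from pyspark.sql.types import MapType, StringType

# 1. Get the Spark session up first, so pyspark.sql.functions has a live context behind it.
spark = SparkSession.builder.appName("ts-example").getOrCreate()

# 2. Only then wrap the Python function as a UDF and call it on the DataFrame.
on_time_udf = udf(on_time, MapType(StringType(), StringType()))
df_ts_list.withColumn("one_tag", on_time_udf("ts_list")).select("one_tag").show()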


7 Comments

As an addition for others: I hit this error when my Spark session had not been set up and I had defined a PySpark UDF using a decorator to add the schema. I normally set up the Spark session in my main, but in this case, when passing a complex schema, I needed to set it up at the top of the script. Thanks for the quick hint!
To add on to this, I got this error when using a Spark function as a default value for a function parameter, since those are evaluated at import time, not call time, e.g. def func(is_test=lit(False)).
Or, for others as stupid as me, you can encounter this error if you write PySpark code inside a pandas_udf (which is supposed to receive pandas code...).
@Mari all I can advise is that you cannot use PySpark functions before the Spark context is initialized. In my case I was using them as a default argument value, but those are evaluated at import time, not runtime, so the Spark context is not yet initialized. So I just changed the default to None and checked inside the function.
@Mari I ran into this recently. If you want to keep this construction, instead of assigning it to a variable, return it via a function, e.g. def is_not_empty(): return (col('var') != lit('')). Then use it as a function instead of a variable. This allows it to be instantiated when called (after the Spark context is initialized) rather than when the module is loaded; see the sketch just below these comments.
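
A small sketch of the default-argument pitfall and the lazy fix described in the last two comments (the names is_not_empty and var are made up):

from pyspark.sql.functions import col, lit

# Pitfall: a default value is evaluated at import time, typically before any Spark
# context exists, so merely importing the module raises the '_jvm' AttributeError:
# def filter_rows(condition=(col("var") != lit(""))): ...

# Fix: build the Column lazily inside a function, so it is only created when called,
# after the Spark context has been initialized.
def is_not_empty():
    return col("var") != lit("")
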
25

The error message says that in the 27th line of the UDF you are calling some PySpark SQL function. It is the line with abs(), so I suppose that somewhere above you call from pyspark.sql.functions import * and it overrides Python's built-in abs() function.
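
For illustration, a minimal sketch of that shadowing (not the OP's exact session) and the two ways around it mentioned in the comments below:

abs(-5)                               # 5 -- Python's built-in abs

from pyspark.sql.functions import *   # rebinds abs to pyspark.sql.functions.abs, which needs a
                                      # live SparkContext -- absent on a UDF worker, hence the error

import builtins                       # __builtin__ on Python 2
builtins.abs(-5)                      # 5 -- explicitly the Python built-in

import pyspark.sql.functions as F     # or avoid the star import entirely
# F.abs(df["ts"])                     # PySpark's abs, clearly namespaced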

7 Comments

Is there a way to use the original abs() function without deleting the line from pyspark.sql.functions import *?
@mufmuf Sure, you can use __builtin__.abs as a pointer to Python's function.
Or you can import pyspark.sql.functions as F and use F.function_name to call PySpark functions.
Thank you so much; instead of abs, I had round.
I used math.fabs() instead.
12

Just to be clear, the problem a lot of people are having stems from a single bad programming style: from blah import *.

When you do

from pyspark.sql.functions import *

you overwrite a lot of Python built-in functions. I strongly recommend importing functions like this:

import pyspark.sql.functions as f
# or 
import pyspark.sql.functions as pyf
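
For example, with an aliased import the PySpark function and the Python built-in stay clearly separated (a small sketch, assuming an existing SparkSession named spark and a made-up column x):

import pyspark.sql.functions as f

df = spark.createDataFrame([(-2,), (3,)], ["x"])
df.select(f.abs(f.col("x")).alias("abs_x")).show()   # PySpark's column-wise abs
print(abs(-2))                                       # Python's built-in abs is untouched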

1 Comment

This advice helped me correct my bad habit of using '*' when importing. Hope others correct this too.
4

Make sure that you are initializing the Spark context. For example:

spark = SparkSession \
    .builder \
    .appName("myApp") \
    .config("...") \
    .getOrCreate()
sqlContext = SQLContext(spark)
productData = sqlContext.read.format("com.mongodb.spark.sql").load()

Or as in

spark = SparkSession.builder.appName('company').getOrCreate()
sqlContext = SQLContext(spark)
productData = sqlContext.read.format("csv").option("delimiter", ",") \
    .option("quote", "\"").option("escape", "\"") \
    .option("header", "true").option("inferSchema", "true") \
    .load("/path/thecsv.csv")

1 Comment

You can use the SparkSession to get a DataFrame reader; you don't need the SQLContext.
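
For instance, a sketch of the same CSV read using only the SparkSession (options copied from the answer above; the path is still a placeholder):

productData = spark.read \
    .option("delimiter", ",") \
    .option("quote", "\"") \
    .option("escape", "\"") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv("/path/thecsv.csv")
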
4

This exception also arises when the UDF cannot handle None values. For example, the following code results in the same exception:

from pyspark.sql.functions import udf, to_timestamp
from pyspark.sql.types import DateType
get_datetime = udf(lambda ts: to_timestamp(ts), DateType())
df = df.withColumn("datetime", get_datetime("ts"))

However this one does not:

get_datetime = udf(lambda ts: to_timestamp(ts) if ts is not None else None, DateType())
df = df.withColumn("datetime", get_datetime("ts"))

Comments

0

I faced the same issue when I had Python's round() function in my code, and as @Mariusz said, Python's round() function got overridden.

The workaround for this was to use __builtin__.round() instead of round(), as @Mariusz mentions in the comments on his answer.
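
In Python 3 the equivalent module is builtins; a tiny sketch of the workaround, with the star import shown only to reproduce the shadowing:

from pyspark.sql.functions import *   # shadows Python's round(), abs(), sum(), ...

import builtins                       # use __builtin__ instead on Python 2
builtins.round(2.675, 2)              # explicitly Python's own round()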

1 Comment

Or you rename whatever other round function you've defined or imported.
0

I found this error in my Jupyter notebook. I added the commands below

import findspark
findspark.init()
import pyspark
sc = pyspark.SparkContext(appName="<add-your-name-here>")

and it worked.

It's the same problem of the Spark context not being ready or having been stopped.

1 Comment

You should be using a SparkSession, though. You can get the context from that, if needed.
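
For example, a minimal sketch of the SparkSession-first approach this comment suggests (the app name is a placeholder):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("my-app").getOrCreate()
sc = spark.sparkContext   # the underlying SparkContext, if you still need one
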
0

In all probability, this error occurs due to the absence of a Spark session. So, a Spark session should be created:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master('yarn') \
    .appName('a') \
    .getOrCreate()

This should resolve the issue.

Comments
