
I'm attempting to create broadcast variables from within Python methods (trying to abstract some utility methods I'm creating that rely on distributed operations). However, I can't seem to access the broadcast variables from within the Spark workers.

Let's say I have this setup:

def main():
    sc = SparkContext()
    SomeMethod(sc)

def SomeMethod(sc):
    someValue = rand()
    V = sc.broadcast(someValue)
    A = sc.parallelize().map(worker)

def worker(element):
    element *= V.value  ### NameError: global name 'V' is not defined ###

However, if I instead eliminate the SomeMethod() middleman, it works fine.

def main():
    sc = SparkContext()
    someValue = rand()
    V = sc.broadcast(someValue)
    A = sc.parallelize().map(worker)

def worker(element):
    element *= V.value   # works just fine

I'd rather not have to put all my Spark logic in the main method, if I can. Is there any way to broadcast variables from within local functions and have them be globally visible to the Spark workers?

Alternatively, what would be a good design pattern for this kind of situation? For example, I want to write a self-contained method specifically for Spark that performs a specific function I'd like to reuse.

1 Answer


I am not sure I completely understood the question, but if you need the V object inside the worker function then you should definitely pass it as a parameter; otherwise the method is not really self-contained:

def worker(V, element):
    element *= V.value

Now, in order to use it in map, you need to use functools.partial so that map only sees a one-parameter function:

from functools import partial

def SomeMethod(sc):
    someValue = rand()
    V = sc.broadcast(someValue)
    A = sc.parallelize().map(partial(worker, V=V))
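
For reference, here is a minimal end-to-end sketch of this pattern that runs in local mode. The toy data passed to parallelize, the use of random.random in place of rand(), and the master/app-name settings are assumptions added only to make the example self-contained. Note that, as pointed out in the comments below, element is the first parameter of worker here, so the value map passes in positionally does not collide with the V bound by partial:

from functools import partial
from random import random

from pyspark import SparkContext

def worker(element, V):
    # 'element' comes first: map supplies it as the single positional
    # argument, while the broadcast variable V is pre-bound via partial.
    return element * V.value

def SomeMethod(sc):
    someValue = random()                # stand-in for rand()
    V = sc.broadcast(someValue)         # broadcast once from the driver
    A = sc.parallelize([1, 2, 3, 4])    # toy data, for illustration only
    return A.map(partial(worker, V=V)).collect()

def main():
    sc = SparkContext("local[*]", "broadcast-demo")
    print(SomeMethod(sc))
    sc.stop()

if __name__ == "__main__":
    main()

Binding V by keyword means map's positional argument always lands on element; with the (V, element) parameter order shown in the answer above, the same call would raise a TypeError about getting multiple values for argument 'V'.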

3 Comments

Are there any performance implications to passing broadcast variables around like this? Say, for instance, I was relying on a broadcast variable in a map() function over tens of thousands (or more) of rows, something like def transform(row): return broadcast_variable.value[row[0]], which is then used in a map() function like rdd.map(transform).
Thanks, this solution helped me avoid the use of global for the broadcast variable. Please note that you should swap the order of the worker method parameters so that the 'element' parameter (which is populated by the Spark framework) comes first. Otherwise it will not work.
@elyase thank you so much. You really saved my day. I was having a different issue with Spark, but your suggestion worked like a charm in my scenario. Big THANKS!!
