
I've been implementing a model with Spark via a Python class. I had some headaches calling class methods on an RDD defined in the class (see this question for details), but have finally made some progress. Here is an example of a class method I'm working with:

@staticmethod
def alpha_sampler(model):

    # all the variables in this block are numpy arrays or floats
    var_alpha = model.params.var_alpha
    var_rating = model.params.var_rating
    b = model.params.b
    beta = model.params.beta
    S = model.params.S
    Z = model.params.Z
    x_user_g0_inner_over_var = model.x_user_g0_inner_over_var

    def _alpha_sampler(row):
        feature_arr = row[2]
        var_alpha_given_rest = 1/((1/var_alpha) + feature_arr.shape[0]*(1/var_rating))
        i = row[0]
        items = row[1]
        O = row[3] - np.inner(feature_arr,b) - beta[items] - np.inner(S[i],Z[items])
        E_alpha_given_rest = var_alpha_given_rest * (x_user_g0_inner_over_var[i] + O.sum()/var_rating)
        return np.random.normal(E_alpha_given_rest,np.sqrt(var_alpha_given_rest))
    return _alpha_sampler

As you can see, to avoid serialization errors, I define a static method that returns a function that is in turn applied to each row of an RDD (model is the parent class here, and this is called from within another method of model):

# self.grp_user is the RDD
self.params.alpha = np.array(self.grp_user.map(model.alpha_sampler(self)).collect())
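The reason this factory trick sidesteps serialization errors is that copying the attributes into locals *before* defining the inner function means the returned closure captures only those locals, not the parent object. A minimal sketch in plain Python (the `Model` class and `make_adder` here are hypothetical stand-ins, not the actual model):

```python
class Model:
    """Hypothetical stand-in for the real model class."""
    def __init__(self):
        self.offset = 10.0  # stands in for model.params.*

    @staticmethod
    def make_adder(model):
        # Copy the attribute into a local *before* defining the closure:
        # the closure then captures only this float, not the Model instance.
        offset = model.offset
        def _adder(x):
            return x + offset
        return _adder

f = Model.make_adder(Model())
captured = [c.cell_contents for c in f.__closure__]
# The closure's captured cells hold only the float, so serializing the
# closure never tries to pickle the (possibly unpicklable) Model instance.
assert captured == [10.0]
assert f(5) == 15.0
```

Had `_adder` referenced `model.offset` directly, the closure would have captured `model` itself, and Spark would have tried to ship the whole object to the workers.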

Now, this all works fine, but it isn't leveraging Spark's broadcast variables at all. Ideally, all the variables used in this function (var_alpha, beta, S, etc.) would first be broadcast to the workers, so that I wasn't redundantly shipping them as part of every map closure. But I'm not sure how to do this.

My question, then, is the following: How/where should I make these into broadcast variables such that they are available to the alpha_sampler function that I map to grp_user? One thing I believe would work is to make them globals, e.g.

global var_alpha
var_alpha = sc.broadcast(model.params.var_alpha)
# and similarly for the other variables...

Then the alpha_sampler could be much simplified:

@staticmethod
def _alpha_sampler(row):
    feature_arr = row[2]
    var_alpha_given_rest = 1/((1/var_alpha.value) + feature_arr.shape[0]*(1/var_rating.value))
    i = row[0]
    items = row[1]
    O = row[3] - np.inner(feature_arr,b.value) - beta.value[items] - np.inner(S.value[i],Z.value[items])
    E_alpha_given_rest = var_alpha_given_rest * (x_user_g0_inner_over_var.value[i] + O.sum()/var_rating.value)
    return np.random.normal(E_alpha_given_rest,np.sqrt(var_alpha_given_rest))

But of course this is a really dangerous use of globals that I would like to avoid. Is there a better way that lets me leverage broadcast variables?

1 Answer
Assuming that the variables you use here are simply scalars, there is probably nothing to gain from a performance perspective, and using broadcast variables will make your code less readable. Still, you can either pass a broadcast variable as an argument to the static method:

class model(object):
    @staticmethod
    def foobar(a_model, mu):
        y = a_model.y
        def _foobar(x):
            return x - mu.value + y 
        return _foobar

    def __init__(self, sc):
        self.sc = sc
        self.y = -1
        self.rdd = self.sc.parallelize([1, 2, 3])

    def get_mean(self):
        return self.rdd.mean()

    def run_foobar(self):
        mu = self.sc.broadcast(self.get_mean())
        self.data = self.rdd.map(model.foobar(self, mu))

or initialize it there:

class model(object):
    @staticmethod
    def foobar(a_model):
        mu = a_model.sc.broadcast(a_model.get_mean())
        y = a_model.y
        def _foobar(x):
            return x - mu.value + y 
        return _foobar

    def __init__(self, sc):
        self.sc = sc
        self.y = -1
        self.rdd = self.sc.parallelize([1, 2, 3])

    def get_mean(self):
        return self.rdd.mean()

    def run_foobar(self):
        self.data = self.rdd.map(model.foobar(self))
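To make the mechanics concrete without a cluster, here is a hedged sketch of the pass-as-argument variant, with a minimal `FakeBroadcast` standing in for pyspark's `Broadcast` (it only mimics the `.value` attribute; the names `toy_model` and `FakeBroadcast` are illustrative, not Spark API):

```python
class FakeBroadcast:
    """Stand-in for pyspark.Broadcast: exposes .value like the real one."""
    def __init__(self, value):
        self.value = value

class toy_model(object):
    @staticmethod
    def foobar(a_model, mu):
        # Same pattern as above: pull what the closure needs into locals
        # (or broadcast handles) so `self` is never captured.
        y = a_model.y
        def _foobar(x):
            return x - mu.value + y
        return _foobar

    def __init__(self):
        self.y = -1
        self.data = [1, 2, 3]  # stands in for the RDD

    def get_mean(self):
        return sum(self.data) / len(self.data)

    def run_foobar(self):
        mu = FakeBroadcast(self.get_mean())  # sc.broadcast(...) in Spark
        self.result = list(map(toy_model.foobar(self, mu), self.data))

m = toy_model()
m.run_foobar()
# mean is 2.0, so each x maps to x - 2.0 + (-1)
assert m.result == [-2.0, -1.0, 0.0]
```

Either way, the closure ends up holding a broadcast handle plus a few plain locals, which is exactly what you want shipped to the workers.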