My problem is the following: I am parsing user interactions, and each time an interaction is detected I emit ((user1,user2),((date1,0),(0,1))). The zeros are there to indicate the direction of the interaction.

I cannot figure out why I cannot reduce this output with the following reduce function:

def myFunc2(x1,x2):
    return (min(x1[0][0],x2[0][0]),max(x1[0][0],x2[0][0]),min(x1[0][1],x2[0][1]),max(x1[0][1],x2[0][1]),x1[1][0]+x2[1][0],x1[1][1]+x2[1][1])

The output of my mapper (flatMap(myFunc)) is correct:

((7401899, 5678002), ((1403185440.0, 0), (1, 0)))
((82628194, 22251869), ((0, 1403185452.0), (0, 1)))
((2162276, 98056200), ((1403185451.0, 0), (1, 0)))
((0509420, 4827510), ((1403185449.0, 0), (1, 0)))
((7974923, 9235930), ((1403185450.0, 0), (1, 0)))
((250259, 6876774), ((0, 1403185450.0), (0, 1)))
((642369, 6876774), ((0, 1403185450.0), (0, 1)))
((82628194, 22251869), ((0, 1403185452.0), (0, 1)))
((2162276, 98056200), ((1403185451.0, 0), (1, 0)))

But running

lines.flatMap(myFunc) \
              .map(lambda x: (x[0], x[1])) \
              .reduceByKey(myFunc2)

Gives me the error

return (min(x1[0][0],x2[0][0]),max(x1[0][0],x2[0][0]),min(x1[0][1],x2[0][1]),max(x1[0][1],x2[0][1]),x1[1][0]+x2[1][0],x1[1][1]+x2[1][1])

TypeError: 'int' object has no attribute '__getitem__'

I guess I am messing something up in my keys but I don't know why (I tried to recast the key to tuple as said here but same error)

Any ideas? Thanks a lot.

1 Answer

Okay, I think the problem here is that you are indexing too deep in items that don't go as deep as you think.

Let's examine myFunc2

def myFunc2(x1,x2):
    return (min(x1[0][0],x2[0][0]),max(x1[0][0],x2[0][0]),min(x1[0][1],x2[0][1]),max(x1[0][1],x2[0][1]),x1[1][0]+x2[1][0],x1[1][1]+x2[1][1])

Given your question above, the input data will look like this:

((467401899, 485678002), ((1403185440.0, 0), (1, 0)))

Let's go ahead and assign that data row equal to a variable.

x = ((467401899, 485678002), ((1403185440.0, 0), (1, 0)))

What happens when we run x[0]? We get (467401899, 485678002). When we run x[1]? We get ((1403185440.0, 0), (1, 0)). That's what your map statement is doing, I believe.

Okay. That's clear.

In your function myFunc2, you have two parameters, x1 and x2. Those correspond to the variables above: x1 = x[0] = (467401899, 485678002) and x2 = x[1] = ((1403185440.0, 0), (1, 0))

Now let's examine just the first part of your return statement in your function.

min(x1[0][0], x2[0][0])

So, x1 = (467401899, 485678002). Cool. Now, what's x1[0]? Well, that's 467401899. Obviously. But wait! What's x1[0][0]? You're trying to get the zeroth index of the item at x1[0], but the item at x1[0] isn't a list or a tuple, it's just an int. And objects of <type 'int'> don't have a method called __getitem__.
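The indexing failure described above can be reproduced in plain Python without Spark. This is only a minimal sketch using the sample tuple from this answer; note that Python 3 words the message as "'int' object is not subscriptable", while Python 2 reports the missing __getitem__ attribute.

```python
# Sample key tuple taken from the answer text.
x1 = (467401899, 485678002)

first = x1[0]  # one level of indexing is fine: this is a plain int

try:
    x1[0][0]  # a second level tries to index into that int
    error_name = None
    message = ""
except TypeError as exc:
    error_name = type(exc).__name__
    message = str(exc)

print(first, error_name, message)
```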

To summarize: you're digging too deep into objects that are not nested that deeply. Think carefully about what you are passing into myFunc2, and how deep your objects are.

I think the first part of the return statement for myFunc2 should look like:

return min(x1[0], x2[0][0]). You can index deeper on x2 because x2 has more deeply nested tuples!


When I run the following, it works just fine:

a = sc.parallelize([((7401899, 5678002), ((1403185440.0, 0), (1, 0))),
((82628194, 22251869), ((0, 1403185452.0), (0, 1))),
((2162276, 98056200), ((1403185451.0, 0), (1, 0))),
((1509420, 4827510), ((1403185449.0, 0), (1, 0))),
((7974923, 9235930), ((1403185450.0, 0), (1, 0))),
((250259, 6876774), ((0, 1403185450.0), (0, 1))),
((642369, 6876774), ((0, 1403185450.0), (0, 1))),
((82628194, 22251869), ((0, 1403185452.0), (0, 1))),
((2162276, 98056200), ((1403185451.0, 0), (1, 0)))])

b = a.map(lambda x: (x[0], x[1])).reduceByKey(myFunc2)

b.collect()

[((1509420, 4827510), ((1403185449.0, 0), (1, 0))),
 ((2162276, 98056200), (1403185451.0, 1403185451.0, 0, 0, 2, 0)),
 ((7974923, 9235930), ((1403185450.0, 0), (1, 0))), 
 ((7401899, 5678002), ((1403185440.0, 0), (1, 0))), 
 ((642369, 6876774), ((0, 1403185450.0), (0, 1))), 
 ((82628194, 22251869), (0, 0, 1403185452.0, 1403185452.0, 0, 2)),
 ((250259, 6876774), ((0, 1403185450.0), (0, 1)))]
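
One possible reading of the output above, offered as a hedged sketch rather than a confirmed diagnosis: myFunc2 reads nested values of the form ((date1, date2), (count1, count2)) but returns a flat 6-tuple. reduceByKey expects a function whose result has the same shape as its inputs, because the result is fed back in whenever a key has more than two values. With exactly two records per key, as in the sample data, the mismatch never shows. The code below imitates the reduction with functools.reduce in plain Python; to_acc and merge_acc are hypothetical helper names, and converting each value to the flat shape before reducing is an assumption about one way to keep shapes consistent, not the asker's code.

```python
from functools import reduce

def myFunc2(x1, x2):
    # Reducer from the question: reads nested values, returns a flat 6-tuple.
    return (min(x1[0][0], x2[0][0]), max(x1[0][0], x2[0][0]),
            min(x1[0][1], x2[0][1]), max(x1[0][1], x2[0][1]),
            x1[1][0] + x2[1][0], x1[1][1] + x2[1][1])

def to_acc(v):
    # Hypothetical helper: turn ((d1, d2), (c1, c2)) into the flat shape
    # (min_d1, max_d1, min_d2, max_d2, c1, c2).
    (d1, d2), (c1, c2) = v
    return (d1, d1, d2, d2, c1, c2)

def merge_acc(a, b):
    # Hypothetical helper: merge two flat accumulators; output shape == input shape.
    return (min(a[0], b[0]), max(a[1], b[1]),
            min(a[2], b[2]), max(a[3], b[3]),
            a[4] + b[4], a[5] + b[5])

# Three values for the same key, instead of the two in the sample data.
values = [((0, 1403185452.0), (0, 1))] * 3

try:
    reduce(myFunc2, values)  # second call receives the flat tuple as x1
    failed = False
except TypeError:
    failed = True

merged = reduce(merge_acc, map(to_acc, values))
print(failed, merged)
```

In Spark, the same idea would presumably look like calling .mapValues(to_acc) before .reduceByKey(merge_acc), so that every value already has the accumulator shape when the reducer sees it.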

4 Comments

I am not sure I follow you. Isn't myFunc2 the function passed to the reducer? x1 = x[0] is my key, (467401899, 485678002). I am probably missing something here, but myFunc2 takes two inputs, which are two values with the same key. So x1 and x2 should both have the format ((1403185440.0, 0), (1, 0)). I may be missing a point, but I do not see why the key would be passed as input to the reducer.
I think I may also be struggling to understand your example. Could you add more data like ((467401899, 485678002), ((1403185440.0, 0), (1, 0))) in your question? That way I could actually run the code and try to replicate your error.
Thanks again for your answer. I thought the issue might come from the (unnecessary) map after the flatMap, but I get the same output before and after the map, and the same error. I think there is an error in my code for grouping records by the key (user_id_1,user_id_2), but I cannot figure out what I am doing wrong.
@HorusH I've added new code above given your sample data.
