
I am new to Spark and am using Python to write Spark code.

I am able to read from a Parquet file, store the data in a DataFrame, and register it as a temp table.

However, the results of the executed query are not printed. Please help me debug this.

Code:

import os
os.environ['SPARK_HOME']="/opt/apps/spark-2.0.1-bin-hadoop2.7/"
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sc = SparkContext(master='local')
sqlCtx = SQLContext(sc)
df_tract_alpha = sqlCtx.read.parquet("tract_alpha.parquet")
print (df_tract_alpha.columns)
sqlCtx.registerDataFrameAsTable(df_tract_alpha, "table1")
nt = sqlCtx.sql("SELECT COUNT(*) AS pageCount FROM table1 WHERE pp_count>=500").collect()
n1 = nt[0].pageCount
print n1

Instead of printing the value, this gives:

Column< pageCount['pageCount'] >

Here is the stack trace:

17/06/12 12:54:27 WARN BlockManager: Putting block broadcast_2 failed due to an exception
17/06/12 12:54:27 WARN BlockManager: Block broadcast_2 could not be removed as it was not found on disk or in memory
Traceback (most recent call last):
  File "/home/vn/scripts/g_s_pipe/test_code_here.py", line 66, in 
    nt = sqlContext.sql("SELECT count(*) as pageCount FROM table1 WHERE pp_count>=500").collect()
  File "/opt/apps/spark-2.0.1-bin-hadoop2.7/python/pyspark/sql/dataframe.py", line 310, in collect
    port = self._jdf.collectToPython()
  File "/opt/apps/spark-2.0.1-bin-hadoop2.7/python/lib/py4j-0.10.3-src.zip/py4j/java_gateway.py", line 1133, in __call__
  File "/opt/apps/spark-2.0.1-bin-hadoop2.7/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/opt/apps/spark-2.0.1-bin-hadoop2.7/python/lib/py4j-0.10.3-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o30.collectToPython.
: java.lang.reflect.InaccessibleObjectException: Unable to make field transient java.lang.Object[] java.util.ArrayList.elementData accessible: module java.base does not "opens java.util" to unnamed module @55deb90
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:335)
    at java.base/java.lang.reflect.AccessibleObject.checkCanSetAccessible(AccessibleObject.java:278)
    at java.base/java.lang.reflect.Field.checkCanSetAccessible(Field.java:175)
    at java.base/java.lang.reflect.Field.setAccessible(Field.java:169)
    at org.apache.spark.util.SizeEstimator$$anonfun$getClassInfo$3.apply(SizeEstimator.scala:336)
    at org.apache.spark.util.SizeEstimator$$anonfun$getClassInfo$3.apply(SizeEstimator.scala:330)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at org.apache.spark.util.SizeEstimator$.getClassInfo(SizeEstimator.scala:330)
    at org.apache.spark.util.SizeEstimator$.visitSingleObject(SizeEstimator.scala:222)
    at org.apache.spark.util.SizeEstimator$.org$apache$spark$util$SizeEstimator$$estimate(SizeEstimator.scala:201)
    at org.apache.spark.util.SizeEstimator$.estimate(SizeEstimator.scala:69)
    at org.apache.spark.util.collection.SizeTracker$class.takeSample(SizeTracker.scala:78)
    at org.apache.spark.util.collection.SizeTracker$class.afterUpdate(SizeTracker.scala:70)
    at org.apache.spark.util.collection.SizeTrackingVector.$plus$eq(SizeTrackingVector.scala:31)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:214)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:926)
    at org.apache.spark.storage.BlockManager.putIterator(BlockManager.scala:702)
    at org.apache.spark.storage.BlockManager.putSingle(BlockManager.scala:1234)
    at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:103)
    at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:86)
    at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:34)
    at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:56)
    at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1387)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.buildReader(ParquetFileFormat.scala:329)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat.buildReaderWithPartitionValues(ParquetFileFormat.scala:281)
    at org.apache.spark.sql.execution.datasources.FileSourceStrategy$.apply(FileSourceStrategy.scala:112)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:60)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:60)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:61)
    at org.apache.spark.sql.execution.SparkPlanner.plan(SparkPlanner.scala:47)
    at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:51)
    at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:48)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:300)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:298)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:298)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:321)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:179)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:319)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:298)
    at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48)
    at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:51)
    at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1$$anonfun$apply$1.applyOrElse(SparkPlanner.scala:48)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:300)
    at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48)
    at org.apache.spark.sql.execution.SparkPlanner$$anonfun$plan$1.apply(SparkPlanner.scala:48)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:55)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
    at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2523)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:547)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:237)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.base/java.lang.Thread.run(Thread.java:844)
  • This error is triggered on n1 = ntrac.show, which is not in the code you have shared... Commented Jun 12, 2017 at 7:13
  • Also, show doesn't return a value, so I'm not sure why you are storing it in one... Commented Jun 12, 2017 at 7:15
  • Sorry, I pasted the wrong stack trace; I am posting the right one... I was trying some other options instead of what is pasted in the question. Commented Jun 12, 2017 at 7:19
  • Please check the updated trace now. Commented Jun 12, 2017 at 7:26
  • I have updated my answer, and it can't get any clearer than that. As you can see, I'm using your actual code, and of course I have corrected the error with the collect method. Commented Jun 12, 2017 at 8:39

1 Answer


The collect function takes parentheses ():

nt = sqlCtx.sql("SELECT COUNT(*) AS pageCount FROM table1 WHERE pp_count>=500") \
           .collect()
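That also explains the Column< pageCount['pageCount'] > output: without the collect() call, sql() just returns a lazy DataFrame, and indexing it builds Column expressions instead of fetching values. Here is a minimal sketch of the difference, reusing the sqlCtx and table1 names from the question:

df_result = sqlCtx.sql("SELECT COUNT(*) AS pageCount FROM table1 WHERE pp_count>=500")

# Without collect(), indexing the DataFrame yields an unevaluated Column
# expression, which is why a Column<...> repr gets printed.
print(df_result[0].pageCount)   # Column<pageCount['pageCount']>

# collect() actually runs the query and returns a list of Row objects.
rows = df_result.collect()
print(rows[0].pageCount)        # the numeric count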

Example:

Let's check our parquet data first:

$> parquet-tools head data.parquet/
a = 1
pp_count = 500

a = 2
pp_count = 750

a = 3
pp_count = 400

a = 4
pp_count = 600

a = 5
pp_count = 700
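If you want to reproduce this sample file yourself, here is a minimal sketch (run as its own script) that writes the same five rows with the SQLContext API used in the question:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(master='local')
sqlContext = SQLContext(sc)

# The five (a, pp_count) rows shown by parquet-tools above.
rows = [(1, 500), (2, 750), (3, 400), (4, 600), (5, 700)]
sqlContext.createDataFrame(rows, ["a", "pp_count"]).write.parquet("data.parquet")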

We will run the following code:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(master='local')
sqlContext = SQLContext(sc)

df = sqlContext.read.parquet("data.parquet")
print("data columns : {} ".format(df.columns))

sqlContext.registerDataFrameAsTable(df, "table1")
results = sqlContext.sql("SELECT COUNT(*) AS pageCount FROM table1 WHERE pp_count>=500").collect()
df.show()
print("initial data count : {}".format(df.count()))
page_count = results[0].pageCount
print("page count : {}".format(page_count))

After submitting the application, here is the output:

data columns : ['a', 'pp_count']
+---+--------+
|  a|pp_count|
+---+--------+
|  1|     500|
|  2|     750|
|  3|     400|
|  4|     600|
|  5|     700|
+---+--------+

initial data count : 5
page count : 4
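As an aside, the same count can be computed without registering a temp table, using the DataFrame API directly; a minimal equivalent sketch:

from pyspark.sql.functions import col

# Filter-and-count equivalent of the SQL query above.
page_count = df.filter(col("pp_count") >= 500).count()
print("page count : {}".format(page_count))   # 4 on the sample data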