
I have a client sending me Snappy-compressed Hadoop sequence files for analysis. What I ultimately want to do is put this data into a pandas DataFrame. The format looks like the following:

>>> body_read

b'SEQ\x06!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable\x01\x01)org.apache.hadoop.io.compress.SnappyCodec\x00\x00\x00\x00\x0b\xabZ\x92f\xceuAf\xa1\x9a\xf0-\x1d2D\xff\xff\xff\xff\x0b\xabZ\x92f\xceuAf\xa1\x9a\xf0-\x1d2D\x8e\x05^N\x00\x00\x05^\x00\x00\x00F\xde\n\x00\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00\xfe\x01\x00r\x01\x00\x04\x00\x00\x00\x00\x8e\x08\x92\x00\x00\x10\x1a\x00\x00\x08\x8a\x9a h\x8e\x02\xd6\x8e\x02\xc6\x8e\x02T\x8e\x02\xd4\x8e\x02\xdb\x8e\x02\xd8\x8e\x02\xdf\x8e\x02\xd9\x8e\x02\xd3\x05\x0c0\xd9\x8e\x02\xcc\x8e\x02\xfc\x8e\x02\xe8\x8e\x02\xd0\x05!\x00\xdb\x05\x06\x0c\xd1\x8e\x02\xd7\x05\'\x04\xde\x8e\x01\x03\x18\xce\x8e\x02\xe7\x8e\x02\xd2\x05<\x00\xd4\x05\x1b\x04\xdc\x8e

I think what I need to do is first decompress the file using python-snappy and then read the sequence files. I'm not sure of the best method for reading Hadoop sequence files in Python. I am also getting an error when trying to decompress this file:

>>> body_decomp = snappy.uncompress(body_read)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/ec2-user/anaconda3/lib/python3.5/site-packages/snappy/snappy.py", line 91, in uncompress
    return _uncompress(data)
snappy.UncompressError: Error while decompressing: invalid input

What do I need to do in order to read these files?

  • Try Pyspark...? Commented Feb 6, 2018 at 1:50
  • Can I use Pyspark even if I don't use Apache Spark? I don't want to put these files into a Hadoop cluster, all I want to do is convert them to a pandas df and possibly load into MySQL Commented Feb 6, 2018 at 20:21
  • You don't need a Hadoop cluster. Spark has nothing to do with HDFS or YARN. It just includes libraries to read sequence files (and convert them to a Pandas dataframe, as well as insert into a database). PySpark requires the Apache Spark libraries, yes. See stackoverflow.com/a/29498104/2308683 Commented Feb 6, 2018 at 21:28
  • Thanks! I was able to follow that guidance and get pyspark working. One weird discrepancy that is still stumping me is that in my spark-shell, my file is automatically decompressed. Commented Feb 6, 2018 at 23:03

1 Answer


Thanks to @cricket_007's helpful comments and some more digging, I was able to solve this. PySpark will accomplish the tasks that I need, and can read Hadoop Sequence Files directly from S3 locations, which is great. The tricky part was setting up PySpark, and I found this guide really helpful once I had downloaded Apache Spark - https://markobigdata.com/2017/04/23/manipulating-files-from-s3-with-apache-spark/.
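
For reference, here is a minimal sketch of the kind of S3 access setup involved. The bucket path is the same placeholder used below, and the credential keys and values are assumptions based on typical s3a usage rather than that guide's exact steps:

from pyspark import SparkConf, SparkContext

# Sketch only: exact configuration may differ depending on your Spark/Hadoop build.
conf = SparkConf().setAppName("read-sequence-files")
sc = SparkContext(conf=conf)

# Point the s3a connector at your AWS credentials (placeholder values).
hadoop_conf = sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
hadoop_conf.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")

rdd = sc.textFile("s3a://bucket/file_path")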

One weird discrepancy, though, is that my spark-shell appears to automatically decompress the file:

scala> val fRDD = sc.textFile("s3a://bucket/file_path")
fRDD: org.apache.spark.rdd.RDD[String] = s3a://bucket/file_path MapPartitionsRDD[5] at textFile at <console>:24

scala> fRDD.first()
res4: String = SEQ?!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable??)org.apache.hadoop.io.compress.SnappyCodec???? �Z�f�uAf���- 2D���� �Z�f�uAf���- 2D�?^N???^???F�

but PySpark does not:

>>> from pyspark import SparkContext, SparkConf
>>> sc = SparkContext()
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/02/06 23:00:55 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

>>> fRDD = sc.textFile("s3a://bucket/file_path")
>>> fRDD.first()
'SEQ\x06!org.apache.hadoop.io.NullWritable"org.apache.hadoop.io.BytesWritable\x01\x01)org.apache.hadoop.io.compress.SnappyCodec\x00\x00\x00\x00\x0b�Z�f�uAf���-\x1d2D����\x0b�Z�f�uAf���-\x1d2D�\x05^N\x00\x00\x05^\x00\x00\x00F�'

Any ideas how I get PySpark to do this?

EDIT: Thanks again to cricket_007, I started using .sequenceFile() instead. This initially gave me the error

    >>> textFile = sc.sequenceFile("s3a://bucket/file_path")
18/02/07 18:13:12 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: native snappy library not available: this version of libhadoop was built without snappy support.

I was able to solve that issue by following this guide: https://community.hortonworks.com/questions/18903/this-version-of-libhadoop-was-built-without-snappy.html. I am now able to read the sequence file and decode the protobuf message:

>>> seqs = sc.sequenceFile("s3a://bucket/file_path").values()
>>> feed = protobuf_message_pb2.feed()
>>> row = bytes(seqs.first())
>>> feed.ParseFromString(row)
>>> feed.user_id_64
3909139888943208259

This is exactly what I needed. What I want to do now is find an efficient way to decode the entire sequence file and turn it into a DataFrame, rather than doing it one record at a time as I have done above.
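
A rough sketch of one way that bulk decoding could look, assuming the same protobuf_message_pb2 module and file path from above (only user_id_64 is pulled out here; the other fields and column names are placeholders to extend as needed):

from pyspark.sql import SparkSession
import protobuf_message_pb2

spark = SparkSession.builder.appName("decode-feed").getOrCreate()
sc = spark.sparkContext

def decode(raw):
    # Parse one BytesWritable value into a feed message and return the fields as a tuple.
    feed = protobuf_message_pb2.feed()
    feed.ParseFromString(bytes(raw))
    return (feed.user_id_64,)

rows = sc.sequenceFile("s3a://bucket/file_path").values().map(decode)
df = spark.createDataFrame(rows, ["user_id_64"])

From there, df.toPandas() would give the pandas DataFrame the question originally asked for, though for very large files it may be better to keep the data in Spark and write it out instead.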


3 Comments

I think that output is just the difference in the encoding of Scala vs Python REPLs. The string itself is not "decompressed"
Also, don't use textFile. There is a method specifically for SequenceFiles spark.apache.org/docs/latest/api/python/…
Thanks cricket_007, that was a good idea. I had to follow this solution to get snappy decompression working community.hortonworks.com/questions/18903/…, but I'm all good now. My last task is to find an efficient way to decompile the protobuf messages and turn the results into a dataframe
