
While reading a group of Parquet files written with inconsistent schemas, we ran into a schema-merging issue. After switching to specifying the schema manually, I get the following error. Any pointers would be helpful.

java.lang.UnsupportedOperationException: Unimplemented type: StringType
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readDoubleBatch(VectorizedColumnReader.java:389)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:195)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:230)
    at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:137)
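The trace shows the vectorized Parquet reader calling readDoubleBatch for a column the supplied schema declares as StringType, i.e. at least one file physically stores that column as double. One hedged diagnostic step (assuming Spark 2.x, where this reader applies) is to disable the vectorized reader so decoding falls back to the row-based path, whose error messages tend to name the offending file instead of failing inside a batch read:

```python
# Diagnostic only: this does not make a double column readable as string,
# but the non-vectorized reader usually reports which file it failed on.
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
```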

from pyspark.sql import functions as fun
from pyspark.sql.types import (StructType, StructField,
                               StringType, LongType, DoubleType)

source_location = "{}/{}/{}/dt={}/{}/*_{}_{}.parquet".format(source_initial,
                                                             bucket,
                                                             source_prefix,
                                                             date,
                                                             source_file_pattern,
                                                             date,
                                                             source_file_pattern)
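For reference, the format call above produces a glob of the following shape (the concrete values here are hypothetical, chosen only to illustrate the resulting path):

```python
# Hypothetical values standing in for the variables used above.
source_initial = "s3:/"
bucket = "my-bucket"
source_prefix = "trades"
date = "2017-09-15"
source_file_pattern = "part"

source_location = "{}/{}/{}/dt={}/{}/*_{}_{}.parquet".format(
    source_initial, bucket, source_prefix, date,
    source_file_pattern, date, source_file_pattern)
# -> s3://my-bucket/trades/dt=2017-09-15/part/*_2017-09-15_part.parquet
```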
schema = StructType([
    StructField("Unnamed", StringType(), True),
    StructField("nanos", LongType(), True),
    StructField("book", LongType(), True),
    StructField("X_o", LongType(), True),
    StructField("Y_o", LongType(), True),
    StructField("Z_o", LongType(), True),
    StructField("Total", DoubleType(), True),
    StructField("P_v", DoubleType(), True),
    StructField("R_v", DoubleType(), True),
    StructField("S_v", DoubleType(), True),
    StructField("message_type", StringType(), True),
    StructField("symbol", StringType(), True),
    StructField("date", StringType(), True),
    StructField("__index_level_0__", StringType(), True)])

print("Querying data from source location {}".format(source_location))
# inferSchema is a CSV/JSON option with no effect on Parquet, and mergeSchema
# is ignored when an explicit schema is supplied, so the read reduces to:
df_raw = spark.read.schema(schema).parquet(source_location)
df_raw = df_raw.filter(df_raw.nanos.between(open_nano, close_nano))
df_raw = df_raw.withColumn("timeInWindow_nano", fun.ceil(df_raw.nanos / window_nano).cast("int"))
df_core = df_raw.groupBy("date", "symbol", "timeInWindow_nano").agg(
    fun.sum("Total").alias("Total"),
    fun.sum("P_v").alias("P_v"),
    fun.sum("R_v").alias("R_v"),
    fun.sum("S_v").alias("S_v"))

df_core = df_core.withColumn("P_v", fun.when(df_core.Total < 0, 0).otherwise(df_core.P_v))
df_core = df_core.withColumn("R_v", fun.when(df_core.Total < 0, 0).otherwise(df_core.R_v))
df_core = df_core.withColumn("S_v", fun.when(df_core.Total < 0, 0).otherwise(df_core.S_v))
df_core = df_core.withColumn("P_pct", df_core.P_v * df_core.Total)
df_core = df_core.withColumn("R_pct", df_core.R_v * df_core.Total)
df_core = df_core.withColumn("S_pct", df_core.S_v * df_core.Total)
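The timeInWindow_nano bucketing above is plain ceiling division; a minimal sketch of the same arithmetic in pure Python (the 1-second window_nano here is a hypothetical value):

```python
import math

window_nano = 1_000_000_000  # hypothetical 1-second window

# ceil(nanos / window_nano): timestamps up to and including the window
# boundary share a bucket; the next nanosecond starts a new one.
buckets = [math.ceil(n / window_nano)
           for n in (999_999_999, 1_000_000_000, 1_000_000_001)]
# buckets == [1, 1, 2]
```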
  • What happens if you do not specify the schema manually (but leave the mergeSchema option as it is)? Commented Sep 15, 2017 at 18:02
  • @Mariusz: Some of the files have no content (days with no events), so auto-inferring the schema results in org.apache.spark.SparkException: Failed merging schema of file. Commented Sep 16, 2017 at 7:08
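To illustrate the comment above: Parquet schema merging requires every column name to resolve to a single compatible type across all files. A minimal pure-Python sketch of that check (the column names and types here are hypothetical, not read from the actual files):

```python
# Per-file schemas as simple name -> type maps (hypothetical).
file_a = {"Total": "double", "symbol": "string"}
file_b = {"Total": "string", "symbol": "string"}  # conflicting type for "Total"

def merge_schemas(a, b):
    """Merge two column->type maps, failing on a type conflict,
    roughly analogous to Spark's 'Failed merging schema' error."""
    merged = dict(a)
    for col, typ in b.items():
        if col in merged and merged[col] != typ:
            raise ValueError("Failed merging schema: column '{}' is {} in one "
                             "file and {} in another".format(col, merged[col], typ))
        merged[col] = typ
    return merged
```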

1 Answer


You cannot read Parquet files with incompatible schemas in a single load. My advice would be to separate this into two loads and then union the DataFrames once their schemas are compatible. See the example code:

schema1_df = spark.read.parquet('path/to/files/with/string/field.parquet')
schema2_df = spark.read.parquet('path/to/files/with/double/field.parquet')
# Cast the string column to double first, so both sides of the union match.
df = schema2_df.unionAll(
    schema1_df.withColumn('invalid_col', schema1_df.invalid_col.cast('double')))