26,927 questions
Best practices
0
votes
3
replies
63
views
Pushing down filters in RDBMS with Java Spark
I have been working as a Data Engineer and ran into this issue.
I came across a use case where I have a view (let's call it inputView) which is created by reading data from some source.
Now somewhere ...
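One common way to guarantee the filter runs inside the RDBMS rather than in Spark, sketched here in PySpark (the connection URL, credentials, and the orders query are all placeholders), is to hand the JDBC source a subquery via the query option instead of a bare table name:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc-pushdown-sketch").getOrCreate()

# The WHERE clause is part of the query the database itself executes, so
# the filter is pushed down by construction instead of relying on Spark's
# optimizer to push it.
input_view = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://host:5432/db")                # hypothetical URL
    .option("query", "SELECT * FROM orders WHERE status = 'OPEN'")  # hypothetical query
    .option("user", "user")
    .option("password", "password")
    .load()
)
input_view.createOrReplaceTempView("inputView")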
Advice
0
votes
6
replies
153
views
Pyspark SQL: How to do GROUP BY with specific WHERE condition
So I am doing some SQL aggregation transformations of a dataset and there is a certain condition that I would like to do, but not sure how.
Here is a basic code block:
le_test = spark.sql("""...
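For a condition that should apply to only one aggregate rather than the whole query, conditional aggregation is the usual answer. A minimal, self-contained sketch (table and column names are invented):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark.createDataFrame(
    [(1, "completed", 10.0), (1, "returned", 5.0), (2, "completed", 7.5)],
    ["customer_id", "status", "amount"],
).createOrReplaceTempView("orders")

# CASE WHEN (or FILTER, Spark 3.0+) scopes the condition to a single
# aggregate while the GROUP BY still sees every row.
le_test = spark.sql("""
    SELECT customer_id,
           COUNT(*)                                        AS total_orders,
           COUNT(CASE WHEN status = 'returned' THEN 1 END) AS returned_orders,
           SUM(amount) FILTER (WHERE status = 'completed') AS completed_amount
    FROM orders
    GROUP BY customer_id
""")
le_test.show()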
0
votes
0
answers
83
views
How to Check if a Query Touches Data Files or just Uses Manifests and Metadata in Iceberg
I created a table as follows:
CREATE TABLE IF NOT EXISTS raw_data.civ (
date timestamp,
marketplace_id int,
... some more columns
)
USING ICEBERG
PARTITIONED BY (
marketplace_id,
...
3
votes
1
answer
103
views
How to collect multiple metrics with observe in PySpark without triggering multiple actions
I have a PySpark job that reads data from table a, performs some transformations and filters, and then writes the result to table b.
Here’s a simplified version of the code:
import pyspark.sql....
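The observe API with a pyspark.sql.Observation (Spark 3.3+) collects any number of metrics as a side effect of the one action that is already running. A minimal sketch; the toy DataFrame and the noop sink stand in for tables a and b:
from pyspark.sql import SparkSession, Observation
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([(1, 10.0), (2, -1.0), (3, 4.0)], ["id", "value"])

obs = Observation("metrics")
observed = (
    df.filter(F.col("value") > 0)
      .observe(obs,
               F.count(F.lit(1)).alias("rows_out"),
               F.sum("value").alias("value_sum"))
)
# The single write action fills in every observed metric; no extra
# count()/collect() jobs are triggered.
observed.write.format("noop").mode("overwrite").save()
print(obs.get)  # e.g. {'rows_out': 2, 'value_sum': 14.0}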
0
votes
0
answers
63
views
Unexpected Write Behavior when using MERGE INTO/INSERT INTO Iceberg Spark Queries
I am observing different write behavior when executing queries in an EMR Notebook (correct behavior) vs. when using spark-submit to submit a Spark application to an EMR cluster (incorrect behavior).
When I ...
0
votes
0
answers
44
views
Spark: VSAM File read issue with special character
We have a scenario where we read a VSAM file directly, along with a copybook to understand the column lengths; we were using the COBRIX library as part of the Spark read.
However, we found the same is not properly ...
0
votes
0
answers
35
views
How to link Spark event log stages to PySpark code or query?
I'm analyzing Spark event logs and have already retrieved the SparkListenerStageSubmitted and SparkListenerTaskEnd events to collect metrics such as spill, skew ratio, memory, and CPU usage.
However, ...
0
votes
0
answers
59
views
Scala spark: Why does DataFrame.transform calling a transform hang?
I have a Scala (v. 2.12.15) Spark (v. 3.5.1) job that works correctly and looks something like this:
import org.apache.spark.sql.DataFrame
...
val myDataFrame = myReadDataFunction(...)
....
0
votes
0
answers
53
views
Spatial join without Apache Sedona
Currently I'm working with a specific version of Apache Spark (3.1.1) that I cannot upgrade. Because of that I can't use Apache Sedona, and version 1.3.1 is too slow. My problem is the following code that ...
1
vote
3
answers
96
views
How to pass an array of structs as a parameter to a UDF in Spark 4
Does anybody know what I am doing wrong? The following reduced code snippet works in spark-3.x but doesn't work in spark-4.x.
In my use case I need to pass a complex data structure to the udf (let's say ...
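For reference, a minimal sketch of passing an array of structs into a Python UDF (the field names and schema are invented; each struct arrives in the UDF as a Row):
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (ArrayType, IntegerType, StringType,
                               StructField, StructType)

spark = SparkSession.builder.getOrCreate()

item = StructType([
    StructField("name", StringType()),
    StructField("qty", IntegerType()),
])
df = spark.createDataFrame(
    [([("a", 1), ("b", 2)],)],
    StructType([StructField("items", ArrayType(item))]),
)

@F.udf(returnType=IntegerType())
def total_qty(items):
    # items is a Python list of Row objects, one per struct
    return sum(i.qty for i in items)

df.select(total_qty("items").alias("total")).show()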
0
votes
1
answer
110
views
Problem reading the _last_checkpoint file from the _delta_log directory of a delta lake table on s3
I am trying to read the _delta_log folder of a delta lake table via spark to export some custom metrics. I have figured out how to get some metrics from history and description, but I have a problem ...
1
vote
0
answers
135
views
Conversion of a pyspark DataFrame with a Variant column to pandas fails with an error
When I try to convert a pyspark DataFrame with a VariantType column to a pandas DataFrame, the conversion fails with the error 'NoneType' object is not iterable. Am I doing it incorrectly?
Sample code:
...
3
votes
0
answers
72
views
Cannot extend Spark UnaryExpression in Java
I am trying to write a custom decoder function in Java targeting Spark 4.0:
public class MyDataToCatalyst extends UnaryExpression implements NonSQLExpression, ExpectsInputTypes, Serializable {
//.....
0
votes
0
answers
69
views
AWS Glue/Spark performance issue
I am new to AWS Glue and I am facing performance issues with the following code:
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
# Define S3 path with wildcard to match ...
1
vote
0
answers
131
views
How to read data from MongoDB collection using SparkSession into a Spark DataFrameReader?
Spark reading data from MongoDB (ver 7.0) and DocumentDB (ver 4.0) and loading it into the Spark DataFrameReader fails when the DataFrameReader.isEmpty() method is called. SparkSession and ...
2
votes
1
answer
76
views
enforcing nullability on json schema
I want to read in an ndjson file and apply a pre-defined schema to it (not allow spark to infer the schema). In general terms, this works fine. But we are unable to define certain elements as required....
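Worth noting: for file sources Spark treats schema nullability as advisory, so nullable=False in a read schema is generally not enforced against ndjson input. A minimal sketch of the validate-after-read workaround (the path and field names are placeholders):
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("id", StringType(), nullable=False),  # not enforced on read
    StructField("name", StringType(), nullable=True),
])
df = spark.read.schema(schema).json("/path/to/file.ndjson")

# Enforce the "required" contract explicitly after reading.
bad = df.filter(F.col("id").isNull())
if not bad.isEmpty():
    raise ValueError("required field 'id' contains nulls")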
0
votes
1
answer
44
views
Azure Synapse SQL MERGE is not updating records; instead it inserts matching records using spark.sql
I have the code below, where the Id is a 36-character GUID. The code gets executed, but when a matching record is found, instead of updating it inserts the entire record again. What could be the root ...
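For reference, a minimal sketch of the usual MERGE shape via spark.sql (it assumes existing Delta-backed target and updates tables; the names and the normalization are illustrative). When matched rows get inserted instead of updated, the first thing to check is the ON condition; for example, GUIDs that differ in case or trailing whitespace will never match:
spark.sql("""
    MERGE INTO target t
    USING updates s
    ON lower(trim(t.Id)) = lower(trim(s.Id))  -- normalize GUIDs before comparing
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")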
2
votes
2
answers
177
views
Regex Expression to avoid space and other character
I am working with transformation logic in Databricks. Basically there is a field called rip_fw which has values like "LC.JO.P051S1-1250" and "LF.030707 23:54-496"; as per ...
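The exact requirement is cut off above, but on one hedged reading of "avoid space and other character" (keep the leading token up to the first space or hyphen), a sketch with regexp_extract looks like this; the pattern is a guess at that intent:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("LC.JO.P051S1-1250",), ("LF.030707 23:54-496",)], ["rip_fw"]
)

# ^([^\s-]+) captures everything before the first whitespace or hyphen:
# "LC.JO.P051S1-1250" -> "LC.JO.P051S1", "LF.030707 23:54-496" -> "LF.030707"
df.withColumn("code", F.regexp_extract("rip_fw", r"^([^\s-]+)", 1)).show(truncate=False)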
0
votes
1
answer
104
views
How to dynamically generate SQL to Update/Insert a table in Azure Databricks Notebook
It's a sort of CDC (Change Data Capture) scenario in which I am trying to compare new data (in tblNewData) with old data (in tblOldData), and log the changes into a log table (tblExpectedDataLog) ...
2
votes
1
answer
64
views
Spark hint changes query results
Should adding a hint to a Spark query ever return different results? I'm able to reproduce a production issue with the code below. It's worth noting that the last statement is just a copy/pasted ...
3
votes
1
answer
61
views
Why is this task taking so long even for a graph with merely 5-6 nodes and edges?
I am trying to implement the parallelized BFS algorithm using PySpark.
I am following the material in CS246.
What exactly in my implementation is making this take so long?
Pardon me, I am just a ...
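For comparison, a minimal sketch of one BFS level per iteration over an RDD edge list (the (node, distance) layout and the toy graph are assumptions). A classic reason such jobs crawl even on tiny graphs is the RDD lineage growing every iteration, hence the cache() below:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

edges = sc.parallelize([(1, 2), (1, 3), (2, 4), (3, 4), (4, 5)])  # toy graph
distances = sc.parallelize([(1, 0)])  # source node at distance 0

for level in range(5):  # at most |V| - 1 levels
    frontier = distances.filter(lambda kv: kv[1] == level)
    if frontier.isEmpty():
        break
    # neighbours of the frontier, tentatively one level further out
    candidates = frontier.join(edges).map(lambda kv: (kv[1][1], level + 1))
    # keep the smallest distance per node; cache() stops the lineage
    # (and per-iteration recomputation) from snowballing
    distances = distances.union(candidates).reduceByKey(min).cache()

print(sorted(distances.collect()))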
0
votes
0
answers
69
views
Parse error when using ALTER TABLE CREATE BRANCH to create iceberg branch in spark
I create a spark client-mode (Spark Connect) session with:
def get_spark_session(master_url: str) -> SparkSession:
return (
SparkSession.builder.remote(master_url)
.config(
...
1
vote
1
answer
85
views
Spark streaming failing intermittently with IllegalStateException: Found no SST files
I'm encountering the following error while trying to upload a RocksDB checkpoint in Databricks:
java.lang.IllegalStateException: Found no SST files during uploading RocksDB checkpoint version 498 with ...
1
vote
0
answers
126
views
IllegalThreadStateException in Spark 4.0 when Adaptive Query Execution (AQE) is active and when running many queries in the same Spark instance
Upon upgrading to Spark 4, we get (deterministically) an IllegalThreadStateException in long series of queries including spark.ml or Delta Lake (e.g. in estimator.fit()) in the same long-running Spark ...
0
votes
1
answer
60
views
How can I calculate the timestamp difference based on status using spark Dataframes?
I am trying to calculate the timestamp difference across cumulative rows based on the ID and STATUS columns.
Example dataframe:
ID | TIMESTAMP           | STATUS
V1 | 2023-06-18 13:00:00 | 1
V1 | 2023-06-18 13:01:00 | 1
V1 | 2023-06-...
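One common reading of this, time spent per (ID, STATUS), uses lag() over a window and sums the per-row gaps. A minimal sketch built on the rows above (how each gap is attributed is an assumption):
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [("V1", "2023-06-18 13:00:00", 1),
     ("V1", "2023-06-18 13:01:00", 1)],
    ["ID", "TIMESTAMP", "STATUS"],
).withColumn("TIMESTAMP", F.to_timestamp("TIMESTAMP"))

w = Window.partitionBy("ID").orderBy("TIMESTAMP")
gaps = (
    df.withColumn("prev_ts", F.lag("TIMESTAMP").over(w))
      .withColumn("gap_s",
                  F.col("TIMESTAMP").cast("long") - F.col("prev_ts").cast("long"))
)
gaps.groupBy("ID", "STATUS").agg(F.sum("gap_s").alias("seconds_in_status")).show()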
0
votes
3
answers
84
views
Can I Use .cache() in the Middle of a Spark SQL Chain?
The code example here:
val results = readFromMyDB("url")
.cache()
.flatMap(flatMyResults)
.repartition()
results.write.parquet(outputS3Path)
val total = ...
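Short answer to the title: yes, cache() can sit mid-chain; it marks the plan at that point, and the first action materializes it. Rendered in PySpark as a minimal sketch (source and output paths are placeholders), caching right after the expensive read lets both actions reuse it, while everything downstream of the cache point is still recomputed per action:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

raw = spark.read.parquet("/path/to/source").cache()  # cache at the expensive step

results = raw.filter(F.col("value").isNotNull()).repartition(200)
results.write.mode("overwrite").parquet("/path/to/output")  # action 1 populates the cache

total = raw.count()  # action 2 reuses the cached read instead of re-reading
print(total)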
0
votes
0
answers
62
views
Get two different nodes to access and distribute the same SQL table in Apache spark?
I have the following code to test. I created a table on worker 1. Then I tried to read the table on worker 2 and got TABLE_OR_VIEW_NOT_FOUND. Worker 2 is on the same computer as the Master.
I ran the ...
0
votes
0
answers
249
views
Is Apache Spark 3.5.5 compatible with Hive 4.0.1? If not, which Hive version is fully compatible?
I'm currently using Apache Spark 3.5.5 (Scala 2.12, built for Hadoop 3) and trying to integrate it with Apache Hive 4.0.1 on a YARN-based Hadoop cluster, but ran into a few issues. However, I'm facing ...
3
votes
0
answers
332
views
Overwrite is failing with "pyspark.errors.exceptions.captured.AnalysisException: Table does not support truncate in batch mode"
I upgraded PySpark from 3.5.5 to 3.5.6, and now all unit tests with an overwrite operation are failing with this error:
pyspark.errors.exceptions.captured.AnalysisException: Table does not support ...
1
vote
2
answers
42
views
How to merge two dataframes based on matching rows using spark scala
I have two dataframes like below, and I need to merge them based on matching rows.
Dataframe 1:
ID | status
V1 | Low
V2 | Low
V3 | Low
Dataframe 2:
ID | status
V1 | High
V2 | High
V6 | High
Expected dataframe like ...
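A full outer join on ID is the usual shape here; rendered in PySpark as a minimal sketch (how the two status columns should combine is an assumption, here preferring Dataframe 2's value when both exist; the same join reads almost identically in Scala):
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df1 = spark.createDataFrame([("V1", "Low"), ("V2", "Low"), ("V3", "Low")], ["ID", "status"])
df2 = spark.createDataFrame([("V1", "High"), ("V2", "High"), ("V6", "High")], ["ID", "status"])

merged = (
    df1.alias("a")
    .join(df2.alias("b"), on="ID", how="full_outer")
    .select("ID", F.coalesce(F.col("b.status"), F.col("a.status")).alias("status"))
)
merged.orderBy("ID").show()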
1
vote
2
answers
111
views
Consecutive Activity using Analytical Function
I have a table containing the fields: user_ip, datetime, year, month, day, hour, tag_id, country, device_type, brand. I need to check whether a given IP was active for a continuous period of 4 or more ...
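This is the classic gaps-and-islands pattern: consecutive hours for an IP share the same (hour - row_number) anchor, so grouping on that anchor finds runs. A minimal sketch, assuming the data is first reduced to one row per (user_ip, active hour):
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
hourly = spark.createDataFrame(
    [("1.2.3.4", h) for h in [1, 2, 3, 4, 9]], ["user_ip", "hour"]
)

w = Window.partitionBy("user_ip").orderBy("hour")
runs = (
    hourly.withColumn("grp", F.col("hour") - F.row_number().over(w))
    .groupBy("user_ip", "grp")
    .agg(F.count("*").alias("run_len"), F.min("hour"), F.max("hour"))
    .filter(F.col("run_len") >= 4)  # 4+ consecutive hours
)
runs.show()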
3
votes
2
answers
116
views
Consecutive User Details in Simple Approach
I have a table containing the fields: ip, datetime, year, month, day, country, region, seen_time. A single IP can have multiple records within the same hour. I need to identify IPs that were ...
0
votes
0
answers
101
views
Poor performance with PySpark write to parquet
I can make PySpark "work" no problem, but I know very little and am very confused by the documentation on performance. I have some source data partitioned by date; I read it directory by directory (...
0
votes
1
answer
90
views
sizeInBytes in ShuffleQueryStage node in Spark Query plan
What is sizeInBytes in the ShuffleQueryStage node of a Spark execution plan?
I have a stage reading from an external source with Shuffle Write = 1.8 TiB.
The AQE ShuffleQueryStage sizeInBytes shows 7.4 TiB.
And the ...
1
vote
1
answer
103
views
How to properly recalculate Spark DataFrame statistics after checkpoint?
Here is minimal example using default data in DataBricks (Spark 3.4):
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types._
sc....
0
votes
0
answers
97
views
Unable to create Parquet converter for decimal type "decimal(38,18)" whose Parquet type is optional double cost
Unable to create Parquet converter for decimal type "decimal(38,18)" whose Parquet type is optional double cost. Parquet DECIMAL type can only be backed by INT32, INT64, ...
1
vote
2
answers
148
views
How can I do a join on each array element in a column and replace with something from the join table?
I have a table, base_df, with many columns, one of which is an array column:
Id | FruitNames                    | Col1 | Col2 | Col3 | ... | Col99
1  | ["apple", "banana", "orange"] | ...  | ...  | ...  | ... | ...
2  | [...
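The standard shape for this is explode, join, re-aggregate. A minimal sketch (the lookup table and its columns are invented; note that collect_list after a shuffle does not strictly guarantee the original element order):
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
base_df = spark.createDataFrame([(1, ["apple", "banana", "orange"])], ["Id", "FruitNames"])
lookup = spark.createDataFrame(
    [("apple", "Apple (Malus)"), ("banana", "Banana (Musa)")], ["name", "display_name"]
)

replaced = (
    base_df.withColumn("fruit", F.explode("FruitNames"))
    .join(lookup, F.col("fruit") == F.col("name"), "left")
    .withColumn("new_name", F.coalesce("display_name", "fruit"))  # keep unmatched as-is
    .groupBy("Id")  # in the real table, join the result back on Id to keep Col1..Col99
    .agg(F.collect_list("new_name").alias("FruitNames"))
)
replaced.show(truncate=False)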
0
votes
0
answers
30
views
spark.sql insert overwrite on existing partition not updating hive metastore partition transient_lastddltime and column_stats
I have a partitioned hive external table as below
scala> spark.sql("describe extended db1.table1").show(100,false)
+----------------------------+-------------------------------------------...
0
votes
0
answers
84
views
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer
I am trying to deploy a scala application which uses structures streaming on a standalone distributed Spark cluster using the spark-submit command and I get the following error:
Exception in thread "...
0
votes
1
answer
193
views
How to Update Identity Column for a Databricks Table
I have a databricks table with the below DDL:
CREATE TABLE default.Test (
ID BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1),
StopFromDateTime TIMESTAMP,
StopToDateTime ...
1
vote
0
answers
63
views
Why does a subquery without matching column names still work in Spark SQL? [duplicate]
I have the following two datasets in Spark SQL:
person view:
person = spark.createDataFrame([
(0, "Bill Chambers", 0, [100]),
(1, "Matei Zaharia", 1, [500, 250, 100]),
(2, "...
0
votes
0
answers
49
views
Disable printing info when running spark-sql
I'm running sql commands with spark-sql. I have put rootLogger.level = off in the log4j2.properties file, but I'm still getting some info messages:
Spark Web UI available at http://computer:4040
Spark ...
1
vote
0
answers
131
views
Pyspark writing dataframe to oracle database table using JDBC
I am new to PySpark and need a few clarifications on writing a dataframe to an Oracle database table using JDBC.
As part of the requirement I need to read the data from an Oracle table and perform ...
0
votes
1
answer
50
views
Getting NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT while using Hudi
Getting the error when I try to execute spark sql.
Caused by: org.apache.spark.sql.AnalysisException: [NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT] CREATE Hive TABLE (AS SELECT)
is not supported, if ...
0
votes
0
answers
56
views
Spark with availableNow trigger doesn't archive sources
I use Spark to read JSON files that appear in a folder every day with the path pattern yyyy/mm/dd, converting them into Iceberg format. Both the JSON and Iceberg folders are in an S3 bucket on different paths.
...
1
vote
0
answers
126
views
Doesn't Delta Table data skipping leverage parquet file metadata?
I noticed that querying for the maximum value in a string timestamp column takes 30s with 30+ GB of data scanned, while querying an actual timestamp column takes 1s with 310 MB scanned. Maybe these ...
0
votes
0
answers
62
views
Spark AQE and Skew Join configurations not being applied
I am experiencing data skew issues in spark, specifically during joins and window functions. I have tried many of the spark performance tuning configurations recommended but none appear to be working. ...
0
votes
0
answers
148
views
Why does ydata-profiling not detect missing values in PySpark DataFrame when using None?
I'm using ydata-profiling to generate profiling reports from a large PySpark DataFrame without converting it to Pandas (to avoid memory issues on large datasets).
Some columns contain the string "...
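ydata-profiling counts true nulls, not the literal string "None", so one common fix is to normalize those strings to real nulls before profiling. A minimal sketch (which columns need this treatment is an assumption):
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a",), ("None",), (None,)], ["col1"])

# Rewrite every occurrence of the string "None" as a genuine null.
cleaned = df.select(
    *[F.when(F.col(c) == "None", None).otherwise(F.col(c)).alias(c)
      for c in df.columns]
)
cleaned.show()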
1
vote
0
answers
131
views
Creating an Iceberg table with a geometry column with Sedona
I'm trying to create an Iceberg table with a geometry column in this example:
import org.apache.sedona.sql.utils.SedonaSQLRegistrator
SedonaSQLRegistrator.registerAll(spark)
val stmt = """...
0
votes
0
answers
22
views
How to resolve a time parser error when using EMR with a PySpark script as a step
I am getting this error when running an EMR notebook passing some dates:
An error occurred: An error occurred while calling o236.showString.
: org.apache.spark.SparkException: Job aborted due ...