Best practices
0 votes
3 replies
63 views

Pushing down filters in RDBMS with Java Spark

I have been working as a Data Engineer and ran into this issue. I came across a use case where I have a view (let's name it inputView) which is created by reading data from some source. Now somewhere ...
Parth Sarthi Roy
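A minimal sketch of the two usual ways to push a filter down to the database from PySpark (the URL, table, and column names here are placeholders, not from the question): either embed the predicate in a dbtable subquery, or filter the DataFrame before any action and let the JDBC source push it down.

    # Variant 1: the database evaluates the predicate inside the subquery.
    jdbc_df = (spark.read.format("jdbc")
        .option("url", "jdbc:postgresql://dbhost:5432/mydb")  # placeholder URL
        .option("dbtable", "(SELECT * FROM events WHERE dt = '2024-01-01') t")
        .option("user", "user").option("password", "pw")
        .load())

    # Variant 2: a filter applied before any action is pushed down by the
    # JDBC source; it shows up as PushedFilters in the physical plan.
    events = (spark.read.format("jdbc")
        .option("url", "jdbc:postgresql://dbhost:5432/mydb")
        .option("dbtable", "events")
        .option("user", "user").option("password", "pw")
        .load()
        .filter("dt = '2024-01-01'"))
    events.explain()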
Advice
0 votes
6 replies
153 views

Pyspark SQL: How to do GROUP BY with specific WHERE condition

So I am doing some SQL aggregation transformations of a dataset and there is a certain condition that I would like to apply, but I'm not sure how. Here is a basic code block: le_test = spark.sql("""...
BeaverFever
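A hedged sketch of one common answer (the table and column names are invented, since the question's code is cut off): a condition that should affect only one aggregate goes inside the aggregate as CASE WHEN (or a FILTER clause in Spark 3+), not into the global WHERE.

    le_test = spark.sql("""
        SELECT country,
               COUNT(*) AS total_rows,
               SUM(CASE WHEN status = 'active' THEN 1 ELSE 0 END) AS active_rows
        FROM my_table
        GROUP BY country
    """)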
0 votes
0 answers
83 views

How to Check if a Query Touches Data Files or just Uses Manifests and Metadata in Iceberg

I created a table as follows: CREATE TABLE IF NOT EXISTS raw_data.civ ( date timestamp, marketplace_id int, ... some more columns ) USING ICEBERG PARTITIONED BY ( marketplace_id, ...
shiva • 2,781
3 votes
1 answer
103 views

How to collect multiple metrics with observe in PySpark without triggering multiple actions

I have a PySpark job that reads data from table a, performs some transformations and filters, and then writes the result to table b. Here’s a simplified version of the code: import pyspark.sql....
עומר אמזלג
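For reference, a sketch of the Observation API (PySpark 3.3+), which collects several metrics as a side effect of the single write action; the table names and the filter are stand-ins for the question's transformations.

    from pyspark.sql import Observation, functions as F

    obs = Observation("metrics")
    df = spark.table("a").filter("amount > 0")          # stand-in transformation
    df = df.observe(obs,
                    F.count(F.lit(1)).alias("rows"),
                    F.sum("amount").alias("total_amount"))
    df.write.mode("append").saveAsTable("b")            # the one and only action
    print(obs.get)  # e.g. {'rows': ..., 'total_amount': ...}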
0 votes
0 answers
63 views

Unexpected Write Behavior when using MERGE INTO/INSERT INTO Iceberg Spark Queries

I am observing different write behaviors when executing queries on EMR Notebook (correct behavior) vs when using spark-submit to submit a spark application to EMR Cluster (incorrect behavior). When I ...
shiva • 2,781
0 votes
0 answers
44 views

Spark: VSAM File read issue with special character

We have a scenario to read a VSAM file directly along with a copybook to understand the column lengths; we were using the COBRIX library as part of the spark read. However, we could see the same is not properly ...
Rocky1989 • 409
0 votes
0 answers
35 views

How to link Spark event log stages to PySpark code or query?

I'm analyzing Spark event logs and have already retrieved the SparkListenerStageSubmitted and SparkListenerTaskEnd events to collect metrics such as spill, skew ratio, memory, and CPU usage. However, ...
Carol C
0 votes
0 answers
59 views

Scala spark: Why does DataFrame.transform calling a transform hang?

I have a job on scala (v. 2.12.15) spark (v. 3.5.1) that works correctly and looks something like this: import org.apache.spark.sql.DataFrame ... val myDataFrame = myReadDataFunction(...) ....
jd_sa • 1
0 votes
0 answers
53 views

Spatial join without Apache Sedona

Currently I'm working with a specific version of Apache Spark (3.1.1) that I cannot upgrade. Because of that I can't use Apache Sedona, and version 1.3.1 is too slow. My problem is the following code that ...
matdlara • 149
1 vote
3 answers
96 views

How to pass array of structure as parameter to udf in spark 4

Does anybody know what I am doing wrong? The following reduced code snippet works in spark-3.x but doesn't work in spark-4.x. In my use case I need to pass a complex data structure to a udf (let's say ...
Jiri Humpolicek
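Not the Scala answer, but a PySpark sketch of the same idea for comparison (all names invented): an array of structs reaches a Python udf as a list of Row objects.

    from pyspark.sql import functions as F
    from pyspark.sql.types import (ArrayType, IntegerType, StringType,
                                   StructField, StructType)

    item = StructType([StructField("name", StringType()),
                       StructField("qty", IntegerType())])

    @F.udf(returnType=IntegerType())
    def total_qty(items):
        # each element arrives as a Row; fields are accessed by name
        return sum(i.qty for i in items) if items else 0

    df = spark.createDataFrame(
        [([("a", 2), ("b", 3)],)],
        StructType([StructField("items", ArrayType(item))]))
    df.select(total_qty("items").alias("total")).show()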
0 votes
1 answer
110 views

Problem reading the _last_checkpoint file from the _delta_log directory of a delta lake table on s3

I am trying to read the _delta_log folder of a delta lake table via spark to export some custom metrics. I have worked out how to get some metrics from history and description, but I have a problem ...
Melika Ghiasi
1 vote
0 answers
135 views

Conversion of a pyspark DataFrame with a Variant column to pandas fails with an error

When I try to convert a pyspark DataFrame with a VariantType column to a pandas DataFrame, the conversion fails with an error 'NoneType' object is not iterable. Am I doing it incorrectly? Sample code: ...
Ghislain Fourny
3 votes
0 answers
72 views

Cannot extend Spark UnaryExpression in Java

I am trying to write a custom decoder function in Java targeting Spark 4.0: public class MyDataToCatalyst extends UnaryExpression implements NonSQLExpression, ExpectsInputTypes, Serializable { //.....
Carsten • 1,288
0 votes
0 answers
69 views

AWS Glue/Spark performance issue

I am new to AWS Glue and I am facing performance issues with the following code: spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN") # Define S3 path with wildcard to match ...
Alberto • 15
1 vote
0 answers
131 views

How to read data from MongoDB collection using SparkSession into a Spark DataFrameReader?

Spark reading data from MongoDB (ver 7.0) and DocumentDB (ver 4.0) and loading into a Spark DataFrame is failing when the DataFrame.isEmpty() method is called. SparkSession and ...
Sandeep Reddy CONT
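A hedged sketch with the MongoDB Spark connector 10.x (URI and names are placeholders): load() returns a DataFrame, and isEmpty() is then called on that DataFrame, so a failure at that point usually means the read itself is failing.

    df = (spark.read.format("mongodb")
          .option("connection.uri", "mongodb://dbhost:27017")
          .option("database", "mydb")
          .option("collection", "mycoll")
          .load())
    print(df.isEmpty())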
2 votes
1 answer
76 views

enforcing nullability on json schema

I want to read in an ndjson file and apply a pre-defined schema to it (not allow spark to infer the schema). In general terms, this works fine. But we are unable to define certain elements as required....
Andrew • 8,913
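A sketch of the usual caveat (file path and field names are invented): Spark's file readers treat a user-supplied schema as nullable on read, so "required" fields are typically enforced with an explicit check after loading.

    from pyspark.sql import functions as F
    from pyspark.sql.types import StringType, StructField, StructType

    schema = StructType([
        StructField("id", StringType(), nullable=False),   # ignored on read
        StructField("name", StringType(), nullable=True),
    ])
    df = spark.read.schema(schema).json("/data/input.ndjson")
    if not df.filter(F.col("id").isNull()).isEmpty():
        raise ValueError("required field 'id' contains nulls")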
0 votes
1 answer
44 views

Azure Synapse SQL Merge is not updating records; instead it inserts matching records using spark.sql

I have the below code where the Id is a 36-character GUID. The code gets executed, but when a matching record is found, instead of updating it inserts the entire record again. What could be the root ...
Sandeep T • 441
2 votes
2 answers
177 views

Regex Expression to avoid space and other character

I am working with transformation logic in Databricks. Basically there is a field called rip_fw which has values like "LC.JO.P051S1-1250" and "LF.030707 23:54-496", and as per ...
sayan nandi
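The exact rule isn't visible above, but a sketch of the usual shape: regexp_extract keeps only a leading run of allowed characters (here letters, digits, dots and hyphens), dropping everything from the first space onward.

    from pyspark.sql import functions as F

    df = df.withColumn(
        "rip_fw_clean",
        F.regexp_extract(F.col("rip_fw"), r"^([A-Za-z0-9.\-]+)", 1))
    # "LC.JO.P051S1-1250" -> unchanged; "LF.030707 23:54-496" -> "LF.030707"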
0 votes
1 answer
104 views

How to dynamically generate SQL to Update/Insert a table in Azure Databricks Notebook

It's a sort of CDC (Change Data Capture) scenario in which I am trying to compare new data (in tblNewData) with old data (in tblOldData), and log the changes into a log table (tblExpectedDataLog) ...
Aza • 27
2 votes
1 answer
64 views

Spark hint changes query results

Should adding a hint to a spark query ever return different results? I'm able to reproduce a production issue with the code below. It's worth noting that the last statement is just a copy/pasted ...
Peter Connolly
3 votes
1 answer
61 views

Why is this task taking so long even for a graph with merely 5-6 nodes and edges?

I am trying to implement the parallelized BFS algorithm using Pyspark, following the material in CS246. What exactly in my implementation is making this take so long? Pardon me, I am just a ...
Frenzy Ripper
0 votes
0 answers
69 views

Parse error when using ALTER TABLE CREATE BRANCH to create iceberg branch in spark

I create a spark client-mode (Spark Connect) session with: def get_spark_session(master_url: str) -> SparkSession: return ( SparkSession.builder.remote(master_url) .config( ...
LuckyGambler
1 vote
1 answer
85 views

Spark streaming failing intermittently with IllegalStateException: Found no SST files

I'm encountering the following error while trying to upload a RocksDB checkpoint in Databricks: java.lang.IllegalStateException: Found no SST files during uploading RocksDB checkpoint version 498 with ...
Susmit Sarkar
1 vote
0 answers
126 views

IllegalThreadStateException in Spark 4.0 when Adaptive Query Execution (AQE) is active and when running many queries in the same Spark instance

Upon upgrading to Spark 4, we get (deterministically) an IllegalThreadStateException in long series of queries including spark.ml or Delta Lake (e.g. in estimator.fit()) in the same long-running Spark ...
Ghislain Fourny
0 votes
1 answer
60 views

How can I calculate the timestamp difference based on status using spark Dataframes?

I am trying to calculate the timestamp difference on cumulative rows based on ID and status columns Example dataframe: ID TIMESTAMP STATUS V1 2023-06-18 13:00:00 1 V1 2023-06-18 13:01:00 1 V1 2023-06-...
RMK • 41
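A hedged sketch of the usual building block (assuming the sample's ID/TIMESTAMP/STATUS columns): lag() over a per-ID window gives the gap to the previous row, which can then be summed per status run.

    from pyspark.sql import functions as F
    from pyspark.sql.window import Window

    w = Window.partitionBy("ID").orderBy("TIMESTAMP")
    df = df.withColumn(
        "secs_since_prev",
        F.col("TIMESTAMP").cast("long")
        - F.lag(F.col("TIMESTAMP").cast("long")).over(w))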
0 votes
3 answers
84 views

Can I Use .cache() in the Middle of a Spark SQL Chain?

The code example here: val results = readFromMyDB("url") .cache() .flatMap(flatMyResults) .repartition() results.write.parquet(outputS3Path) val total = ...
JoeYo • 81
0 votes
0 answers
62 views

Get two different nodes to access and distribute the same SQL table in Apache spark?

I have the following code to test. I created a table on worker 1. Then I tried to read the table on worker 2 and got TABLE_OR_VIEW_NOT_FOUND. Worker 2 is on the same computer as the Master. I ran the ...
Rick C. Ferreira
0 votes
0 answers
249 views

Is Apache Spark 3.5.5 compatible with Hive 4.0.1? If not, which Hive version is fully compatible?

I'm currently using Apache Spark 3.5.5 (Scala 2.12, built for Hadoop 3) and trying to integrate it with Apache Hive 4.0.1 on a YARN-based Hadoop cluster, but have run into a few issues. I'm facing ...
Sheikh Wasiu Al Hasib
3 votes
0 answers
332 views

Overwrite is failing with "pyspark.errors.exceptions.captured.AnalysisException: Table does not support truncate in batch mode"

I upgraded PySpark from 3.5.5 to 3.5.6, and now all unit tests with an overwrite operation are failing with this error: pyspark.errors.exceptions.captured.AnalysisException: Table does not support ...
Nicholas Fiorentini
1 vote
2 answers
42 views

How to merge two dataframes based on matching rows using spark scala

I have two dataframes like below, and I need to merge them based on matching rows. Dataframe 1 ID status V1 Low V2 Low V3 Low Dataframe 2 ID status V1 High V2 High V6 High Expected dataframe like ...
RMK • 41
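The expected output above is cut off, so this is only one plausible reading, shown in PySpark terms (the question itself is Scala): a full outer join on ID that prefers the status from the second dataframe.

    from pyspark.sql import functions as F

    merged = (df1.alias("a")
              .join(df2.alias("b"), on="ID", how="full_outer")
              .select("ID",
                      F.coalesce(F.col("b.status"), F.col("a.status"))
                       .alias("status")))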
1 vote
2 answers
111 views

Consecutive Activity using Analytical Function

I have a table containing the fields: user_ip, datetime, year, month, day, hour, tag_id, country, device_type, brand. I need to check if a given IP was active for a continuous period of 4 or more ...
user16798185
3 votes
2 answers
116 views

Consecutive User Details in Simple Approach

I have a table containing the fields: ip, datetime, year, month, day, country, region, seen_time. A single IP can have multiple records within the same hour. I need to identify IPs that were ...
user16798185
0 votes
0 answers
101 views

Poor performance with PySpark write to parquet

I can make PySpark "work" no problem, but know very little and am very confused by documentation on performance. I have some source data partitioned by date, read it directory by directory (...
mateoc15 • 680
0 votes
1 answer
90 views

sizeInBytes in ShuffleQueryStage node in Spark Query plan

What is sizeInBytes in ShuffleQueryStage node in a Spark execution plan? I have a stage to read from ext. source with Shuffle Write = 1.8 TiB AQE ShuffleQueryStage sizeInBytes shows 7.4 TiB And the ...
a.k • 1
1 vote
1 answer
103 views

How to properly recalculate Spark DataFrame statistics after checkpoint?

Here is minimal example using default data in DataBricks (Spark 3.4): import org.apache.spark.sql.functions.col import org.apache.spark.sql.{Row, SparkSession} import org.apache.spark.sql.types._ sc....
Igor Railean
0 votes
0 answers
97 views

Unable to create Parquet converter for decimal type "decimal(38,18)" whose Parquet type is optional double cost

Unable to create Parquet converter for decimal type "decimal(38,18)" whose Parquet type is optional double cost. Parquet DECIMAL type can only be backed by INT32, INT64, ...
sopL • 61
1 vote
2 answers
148 views

How can I do a join on each array element in a column and replace with something from the join table?

I have a table, base_df, with many columns, one of which is an array column: Id FruitNames Col1 Col2 Col3 ... Col99 1 ["apple", "banana", "orange"] ... ... ... ... ... 2 [...
wkeithvan • 2,215
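A hedged sketch of the standard explode-join-reassemble pattern (lookup_df and its name/replacement columns are invented): each array element is joined against the lookup table, then the array is rebuilt per Id.

    from pyspark.sql import functions as F

    exploded = base_df.select("Id", F.explode("FruitNames").alias("fruit"))
    rebuilt = (exploded
               .join(lookup_df, exploded["fruit"] == lookup_df["name"], "left")
               .groupBy("Id")
               # note: collect_list does not guarantee the original order
               .agg(F.collect_list("replacement").alias("FruitNames")))
    result = base_df.drop("FruitNames").join(rebuilt, "Id", "left")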
0 votes
0 answers
30 views

spark.sql insert overwrite on existing partition not updating hive metastore partition transient_lastddltime and column_stats

I have a partitioned hive external table as below scala> spark.sql("describe extended db1.table1").show(100,false) +----------------------------+-------------------------------------------...
Pradeep
0 votes
0 answers
84 views

Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/serialization/ByteArraySerializer

I am trying to deploy a Scala application which uses Structured Streaming on a standalone distributed Spark cluster using the spark-submit command, and I get the following error: Exception in thread "...
Maria • 1
0 votes
1 answer
193 views

How to Update Identity Column for a Databricks Table

I have a databricks table with the below DDL: CREATE TABLE default.Test ( ID BIGINT GENERATED ALWAYS AS IDENTITY (START WITH 1 INCREMENT BY 1), StopFromDateTime TIMESTAMP, StopToDateTime ...
John Bryan
1 vote
0 answers
63 views

Why does a subquery without matching column names still work in Spark SQL? [duplicate]

I have the following two datasets in Spark SQL: person view: person = spark.createDataFrame([ (0, "Bill Chambers", 0, [100]), (1, "Matei Zaharia", 1, [500, 250, 100]), (2, "...
DumbCoder • 515
0 votes
0 answers
49 views

Disable printing info when running spark-sql

I'm running sql commands with spark-sql. I have put rootLogger.level = off in log4j2.properties file, but I'm still getting some info messages: Spark Web UI available at http://computer:4040 Spark ...
IGRACH • 3,726
1 vote
0 answers
131 views

Pyspark writing dataframe to oracle database table using JDBC

I am new to Pyspark and have a few questions about writing a dataframe to an Oracle database table using JDBC. As part of the requirement I need to read the data from an Oracle table and perform ...
Siva • 11
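A minimal write sketch (connection details are placeholders); the Oracle JDBC driver jar has to be on the Spark classpath, e.g. via --jars.

    (df.write.format("jdbc")
       .option("url", "jdbc:oracle:thin:@//dbhost:1521/SERVICE")
       .option("dbtable", "SCHEMA.TARGET_TABLE")
       .option("user", "user")
       .option("password", "pw")
       .option("driver", "oracle.jdbc.OracleDriver")
       .mode("append")
       .save())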
0 votes
1 answer
50 views

Getting NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT while using Hudi

Getting the error when I try to execute spark sql. Caused by: org.apache.spark.sql.AnalysisException: [NOT_SUPPORTED_COMMAND_WITHOUT_HIVE_SUPPORT] CREATE Hive TABLE (AS SELECT) is not supported, if ...
Albert T. Wong
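That error generally means the session was built without Hive support; a sketch of enabling it (Hudi-specific catalog options are omitted here).

    from pyspark.sql import SparkSession

    spark = (SparkSession.builder
             .appName("hudi-app")
             .enableHiveSupport()
             .getOrCreate())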
0 votes
0 answers
56 views

Spark with availableNow trigger doesn't archive sources

I use Spark to read JSON files that appear in a folder every day with path pattern yyyy/mm/dd and convert them into Iceberg format. Both the JSON and Iceberg folders are in an s3 bucket on different paths. ...
Alex • 1,019
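A hedged sketch (paths are placeholders, schema assumed defined): archiving is a file *source* option, so cleanSource/sourceArchiveDir go on readStream, not on the writer; note too that archiving is best effort and may lag behind the batch that consumed the files.

    stream = (spark.readStream.format("json")
              .schema(schema)
              .option("cleanSource", "archive")
              .option("sourceArchiveDir", "s3://bucket/json-archive/")
              .load("s3://bucket/json/*/*/*/"))

    (stream.writeStream.format("iceberg")
           .trigger(availableNow=True)
           .option("checkpointLocation", "s3://bucket/checkpoints/events/")
           .toTable("db.events"))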
1 vote
0 answers
126 views

Doesn't Delta Table data skipping leverage parquet file metadata?

I noticed that querying for the maximum value in a string timestamp column takes 30s with 30+GB of data scanned while querying an actual timestamp column takes 1s with 310MB scanned. Maybe these ...
taksqth • 73
0 votes
0 answers
62 views

Spark AQE and Skew Join configurations not being applied

I am experiencing data skew issues in spark, specifically during joins and window functions. I have tried many of the spark performance tuning configurations recommended but none appear to be working. ...
ifightfortheuserz
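For reference, a sketch of the usual knobs (values are illustrative): AQE skew handling only rewrites sort-merge/shuffle-hash joins, so it does nothing for skewed window functions, which is a common reason the settings appear to be ignored.

    spark.conf.set("spark.sql.adaptive.enabled", "true")
    spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")
    spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
    spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes",
                   "256MB")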
0 votes
0 answers
148 views

Why does ydata-profiling not detect missing values in PySpark DataFrame when using None?

I'm using ydata-profiling to generate profiling reports from a large PySpark DataFrame without converting it to Pandas (to avoid memory issues on large datasets). Some columns contain the string "...
hexxetexxeh
1 vote
0 answers
131 views

Creating an Iceberg table with a geometry column with Sedona

I'm trying to create an Iceberg table with a geometry column in this example: import org.apache.sedona.sql.utils.SedonaSQLRegistrator SedonaSQLRegistrator.registerAll(spark) val stmt = """...
Stefan Ziegler
0 votes
0 answers
22 views

How to resolve a time parser error when using EMR with a pyspark script as a step

I am having this error when running an EMR notebook passing some dates: An error occurred: An error occurred while calling o236.showString. : org.apache.spark.SparkException: Job aborted due ...
gcj • 298
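A hedged guess at the usual first step (the actual pattern isn't shown above): Spark 3 switched to new DateTimeFormatter-style patterns, and time parser errors often disappear either by fixing the pattern or by reverting to the legacy parser.

    spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")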
