82,655 questions
0 votes · 1 answer · 137 views
How to set a new header in pyspark dataframe?
I'm loading a CSV into a dataframe that looks like this:
Column A | Column B
Cell 1 | Cell 2
Cell 3 | Cell 4
The header row is "nonsense", the actual header is in row 1, but I want to change the header names ...
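One common fix (a minimal sketch, assuming a hypothetical file path; the renamed columns are illustrative): let the read consume the bogus header, promote the real header from row 1, and rename every column positionally with toDF().
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# header=true consumes the nonsense header row; the real names sit in row 1.
df = spark.read.option("header", "true").csv("/tmp/input.csv")  # hypothetical path

new_names = [str(v) for v in df.first()]             # real header lives in the first row
df = df.toDF(*new_names)                             # rename all columns positionally
df = df.filter(F.col(new_names[0]) != new_names[0])  # drop the header row itself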
0 votes · 0 answers · 42 views
Query Variant Data Type with Pyspark (Spark 4.0)
I use Spark 4.0 and have the following code:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
json_data = [
('{"auswahl&...
1 vote · 1 answer · 93 views
Changes to Delta Table Reflect in Existing Spark Dataframe
Platform: Azure Databricks
Compute: Serverless
I create a Spark data frame by selecting a subset of records from a delta table. I perform several transformations on the data set. Each transformation ...
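Spark DataFrames are lazy, so re-evaluated transformations can see later commits to the underlying Delta table. A minimal sketch of pinning a snapshot (table path and version are hypothetical):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Reading a fixed version keeps later writes to the Delta table from
# leaking into re-computed transformations; caching is another option.
df = (spark.read.format("delta")
      .option("versionAsOf", 42)        # hypothetical version
      .load("/mnt/tables/my_table"))    # hypothetical path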
-4 votes · 1 answer · 94 views
Exception in thread "main" java.io.FileNotFoundException but I have my file correctly in my S3 bucket
I'm trying to create a cluster on EMR on EC2 to process my jobs. I have created a Spark cluster with the default VPC.
I have checked my IAM and I've allowed "s3:CreateBucket", "s3:...
0 votes · 1 answer · 110 views
Problem reading the _last_checkpoint file from the _delta_log directory of a delta lake table on s3
I am trying to read the _delta_log folder of a delta lake table via spark to export some custom metrics. I have configured how to get some metrics from history and description, but I have a problem ...
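_last_checkpoint is a single-line JSON document, so a hedged sketch (bucket and table path hypothetical) is to read it directly as JSON:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# The file is one JSON object; read it like any other JSON source.
meta = spark.read.json("s3a://my-bucket/my-table/_delta_log/_last_checkpoint")
meta.show(truncate=False)  # fields such as the checkpoint's version and size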
1 vote · 1 answer · 290 views
PySpark ArrayType usage in transformWithStateInPandas state causes java.lang.IllegalArgumentException
I have the following python code that uses PySpark to mock a fraud detection system for credit cards:
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col, ...
1 vote · 2 answers · 112 views
Create unique index for each group PySpark
I am working with a relatively large dataframe (close to 1 billion rows) in PySpark. This dataframe is in "long" format, and I would like to have a unique index for each group defined by a ...
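A minimal sketch of one common approach (the grouping column name is hypothetical): dense_rank over the group key assigns one consecutive index per group. Note that the unpartitioned window forces all data through a single partition, which matters at a billion rows.
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a",), ("a",), ("b",)], ["group_col"])

# dense_rank gives every row of the same group the same consecutive index.
w = Window.orderBy("group_col")  # no partitionBy: beware the single-partition shuffle
df.withColumn("group_id", F.dense_rank().over(w)).show()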
1 vote · 0 answers · 135 views
Conversion of a pyspark DataFrame with a Variant column to pandas fails with an error
When I try to convert a pyspark DataFrame with a VariantType column to a pandas DataFrame, the conversion fails with an error 'NoneType' object is not iterable. Am I doing it incorrectly?
Sample code:
...
3 votes · 0 answers · 72 views
Cannot extend Spark UnaryExpression in Java
I am trying to write a custom decoder function in Java targeting Spark 4.0:
public class MyDataToCatalyst extends UnaryExpression implements NonSQLExpression, ExpectsInputTypes, Serializable {
//.....
0 votes · 0 answers · 71 views
How to package a PySpark + Delta Lake script into an EXE with PyInstaller
I’m trying to convert my PySpark script into an executable (.exe) file using PyInstaller.
The script runs fine in Python, but after converting to an EXE and executing it, I get the following error:
'...
0 votes · 0 answers · 69 views
AWS Glue/Spark performance issue
I am new to AWS Glue and I am facing performance issues with the following code:
spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN")
# Define S3 path with wildcard to match ...
0 votes · 0 answers · 133 views
Spark 4.0 does not support Hive 4.0.0
When I compile Spark 4.0 with Hive 4.0.0, I run into this issue. It looks like Spark 4.0 is not compatible with Hive 4.0.0, but the docs suggest 4.0.0 as one of the supported versions. So I don't know ...
2 votes · 1 answer · 95 views
PySpark program stuck at adding broadcast piece
I'm trying to write a PySpark program that filters for records in a very large dataframe (1-2B records) that matches some conditions on another smaller reference dataframe. This is done using a left ...
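One hedged experiment when a job stalls at "adding broadcast piece": stop Spark from broadcasting the reference side and force a shuffle join instead.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Disable automatic broadcast joins; the smaller side is then shuffled
# rather than broadcast to every executor.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# Or steer a single join with a hint, e.g.:
# big_df.join(small_df.hint("shuffle_hash"), "key", "left")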
0 votes · 1 answer · 83 views
Unable to run trino for iceberg table: Invalid value 'hadoop' for type CatalogType
I have generated an Iceberg table with a Spark Java program. Now I want to access it via Trino.
My docker compose is:
version: '3.8'
services:
  trino:
    image: trinodb/trino:latest
    container_name: ...
0 votes · 0 answers · 124 views
Which version of the source delta table is currently being processed by Spark Structured Streaming?
I want to know/monitor which version of the delta table is currently being processed, especially when the stream is started with a startingVersion.
My understanding is that when that option is chosen, the ...
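One hedged way to observe this from the driver: the Delta source's offsets, which include the table version being read, surface in each micro-batch's progress report. A sketch, assuming an active StreamingQuery bound to the name query:
# Assumes `query` is a running StreamingQuery over a Delta source.
progress = query.lastProgress
if progress and progress["sources"]:
    # startOffset/endOffset are JSON strings from the Delta source; they
    # carry the table version covered by this micro-batch.
    print(progress["sources"][0]["startOffset"])
    print(progress["sources"][0]["endOffset"])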
3 votes · 0 answers · 149 views
INSERT into Iceberg table fails: Spark 3.4 on Kubernetes (Spark Operator) + Iceberg REST (Lakekeeper) + Minio
Hey, I have a problem with inserting into an Iceberg table hosted on Minio via a Spark executor; here's more context:
Platform: Spark 3.4.0 on Kubernetes via Spark Operator 1.1.28
Catalog: Lakekeeper (...
0 votes · 0 answers · 87 views
Spark Scala: JSON Array Column Not Persisting to BigQuery Table
I'm hoping to get some help with an issue I'm having moving data from a JSON file to a BigQuery table using Spark.
The problem, in a nutshell:
Goal: Load a JSON file into a Spark DataFrame and persist ...
1 vote · 0 answers · 130 views
How to read data from a MongoDB collection using SparkSession into a Spark DataFrame?
Reading data from MongoDB (ver 7.0) and DocumentDB (ver 4.0) into a Spark DataFrame fails when the DataFrame.isEmpty() method is called. SparkSession and ...
0 votes · 1 answer · 68 views
Apache Spark Error: Cannot cast STRING into a StructType(StructField(subFieldA,StringType,true)) (value: BsonString{value='{}'})
I’m reading documents from DocDB (MongoDB) into Spark using the mongo-spark-connector.
One of the fields, fieldA, is a nested object. If fieldA is missing in a document, I replace it with an empty ...
1 vote · 0 answers · 110 views
A User Defined Aggregate Function fails to shape a Map<K, C> field, where C is an object coming from many rows
Edit: as this problem analysis progresses, it seems necessary to manage the dataset's nested values with the tools that Spark has offered since 3.1.x.
→ It's my way of filling nested objects held by ...
0 votes · 0 answers · 169 views
How can I mark a Databricks job as Success despite a failed Autoloader pipeline
Issue:
I have a Databricks Workflow/job running a pytest test that is being marked as "Failed" because one of the Autoloader pipelines within it fails, despite the overall job succeeding and ...
1 vote · 1 answer · 57 views
Difference between spark.sql.files.maxPartitionBytes and spark.files.maxPartitionBytes
I see that Spark 2.0.0 introduced a property spark.sql.files.maxPartitionBytes and a subsequent release (2.1.0) introduced spark.files.maxPartitionBytes.
The Spark configuration link says in ...
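As I read the docs, the spark.sql.* knob governs DataFrame/SQL file scans while the spark.files.* one applies to RDD-level file reads; a sketch of setting each (sizes hypothetical):
from pyspark.sql import SparkSession

# spark.files.* settings must be fixed before the context starts.
spark = (SparkSession.builder
         .config("spark.files.maxPartitionBytes", 128 * 1024 * 1024)
         .getOrCreate())

# The SQL variant can be changed at runtime and drives file-scan splits
# for DataFrame readers (parquet, csv, json, ...).
spark.conf.set("spark.sql.files.maxPartitionBytes", str(128 * 1024 * 1024))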
2 votes · 1 answer · 76 views
enforcing nullability on json schema
I want to read in an ndjson file and apply a pre-defined schema to it (not allow spark to infer the schema). In general terms, this works fine. But we are unable to define certain elements as required....
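Spark is documented not to enforce user-supplied nullability on read, so nullable=False in a read schema is best treated as advisory. A hedged sketch of enforcing "required" manually after the read (path and field names hypothetical):
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("id", StringType(), nullable=False),  # not enforced on read
    StructField("name", StringType(), nullable=True),
])

df = spark.read.schema(schema).json("/tmp/data.ndjson")  # hypothetical path

# Treat "required" as a post-read validation: fail if the field is missing.
if df.filter(F.col("id").isNull()).limit(1).count() > 0:
    raise ValueError("required field 'id' is null in some records")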
0 votes · 2 answers · 105 views
Scala Spark decompress lz4 csv
Is there any way to decompress csv lz4 files by Spark?
I tried following approaches:
First:
sparkSession.read
.option("delimiter", ",")
.option("compression", "...
0 votes · 1 answer · 102 views
Cannot expire snapshots with the retain_last property
I have 67 snapshots in a single table, but when I use
CALL iceberg_catalog.system.expire_snapshots(
  table => 'iceberg_catalog.default.test_7',
  retain_last => 5
);
it doesn't delete any snapshots. ...
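A likely explanation, hedged: expire_snapshots only removes snapshots older than the older_than cutoff (default: 5 days ago); retain_last is a floor, not a target. Passing an explicit cutoff, sketched with a hypothetical timestamp:
spark.sql("""
  CALL iceberg_catalog.system.expire_snapshots(
    table       => 'iceberg_catalog.default.test_7',
    older_than  => TIMESTAMP '2025-06-01 00:00:00',  -- hypothetical cutoff
    retain_last => 5
  )
""").show()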
0 votes · 0 answers · 142 views
Athena is appending UTC to Iceberg timestamp results. Why, and how can I fix it?
I am storing a simple datetime value (e.g. 2025-01-24 13:58:14.000) from SQL in an Iceberg table using the Glue catalog. I don't want anything with timezones. We only work in EST, so all our datetimes don't ...
0 votes · 3 answers · 130 views
Apache Spark (3.5-4.0) Does df.count() load the data?
Over the past few months, I have been getting inconsistent benchmark results, and I want to confirm if my suspicion is correct. If I perform a series of transformations to create a Spark (3.5+/4) ...
0 votes · 1 answer · 44 views
Azure Synapse SQL MERGE is not updating records; instead it inserts matching records, using spark.sql
I have the below code, where the Id is a 36-character GUID. The code gets executed, but when a matching record is found, instead of updating, it inserts the entire record again. What could be the root ...
3 votes · 1 answer · 80 views
How to use emptyValue option in pyspark while reading a csv file?
According to the docs for CSV options:
Property Name | Default | Meaning
emptyValue | (for reading), "" (for writing) | Sets the string representation of an empty value.
But it doesn't seem to work:
with ...
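A self-contained sketch for experimenting with the option (inline data, so no file is needed); on read, emptyValue only controls what a quoted empty string becomes, while unquoted empties fall under nullValue, which is easy to conflate:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

rows = ['a,b', '1,""', '2,']  # row 1: quoted empty string; row 2: missing value
rdd = spark.sparkContext.parallelize(rows)

df = (spark.read
      .option("header", "true")
      .option("emptyValue", "EMPTY")  # applied to the quoted empty string only
      .csv(rdd))
df.show()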
2 votes · 1 answer · 136 views
How to read empty string as well as NULL values from a csv file in pyspark? [duplicate]
Read spark csv with empty values without converting to null doesn't answer this one because:
That one's scala and this is pyspark.
Scala solution .option("nullValue", null) translates to ...
0 votes · 1 answer · 49 views
How to summarize Spark executor computing time
I have a spark job running with 260K tasks. I can check an individual task's executor computing time in the Spark UI.
To calculate the resource usage of the whole job, how do I summarize all ...
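A hedged sketch using Spark's monitoring REST API (the endpoints are standard; the UI address is hypothetical): summing executorRunTime over stages approximates total task compute time.
import requests

base = "http://localhost:4040/api/v1"  # hypothetical Spark UI address
app_id = requests.get(f"{base}/applications").json()[0]["id"]

# Each stage record carries aggregate task metrics, including executorRunTime.
stages = requests.get(f"{base}/applications/{app_id}/stages").json()
total_ms = sum(s.get("executorRunTime", 0) for s in stages)
print(f"total executor computing time: {total_ms / 1000.0:.1f} s")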
1 vote · 2 answers · 324 views
Multiple writestream to same delta table error
I am trying to run 2 Spark Structured Streaming jobs in Databricks. Each job is running on its own job cluster and streaming data from a different AWS Kinesis data stream, but both jobs are writing ...
0 votes · 2 answers · 96 views
Read files from storage account in Spark - Without using keys - Azure
I am doing local development. I wish to run a Spark job locally on my desktop, and access files in a storage account from my Spark job.
I don't have an option to use SAS tokens or access-keys for my storage ...
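One keyless route, sketched under the assumption that an Entra ID service principal is acceptable (account, container, and principal values are placeholders; managed identity is another option): the ABFS connector's OAuth settings.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
acct = "mystorageacct.dfs.core.windows.net"  # placeholder account

# hadoop-azure ABFS OAuth via a service principal, no account keys or SAS.
spark.conf.set(f"fs.azure.account.auth.type.{acct}", "OAuth")
spark.conf.set(f"fs.azure.account.oauth.provider.type.{acct}",
               "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set(f"fs.azure.account.oauth2.client.id.{acct}", "<app-id>")
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{acct}", "<secret>")
spark.conf.set(f"fs.azure.account.oauth2.client.endpoint.{acct}",
               "https://login.microsoftonline.com/<tenant-id>/oauth2/token")

df = spark.read.csv(f"abfss://mycontainer@{acct}/path/data.csv")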
0 votes · 1 answer · 50 views
Reduce the number of snapshots created with realtime Kafka streaming
My system runs a realtime data stream from Kafka to HDFS integrated with Iceberg using Spark. How can I reduce the number of snapshots created on each insert while still maintaining realtime capability (...
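Each micro-batch commit is one Iceberg snapshot, so a longer trigger interval is the usual first lever (fewer, larger commits), with snapshot expiry as cleanup. A sketch with hypothetical names, assuming df is the streaming DataFrame from Kafka:
# Fewer micro-batches means fewer Iceberg snapshots per hour.
query = (df.writeStream
         .format("iceberg")
         .trigger(processingTime="5 minutes")        # commit every 5 minutes
         .option("checkpointLocation", "/tmp/ckpt")  # hypothetical path
         .toTable("catalog.db.events"))              # hypothetical table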
0 votes · 1 answer · 58 views
bigtable spark connector not reading cell timestamp with the data
I am using the Bigtable Spark connector to read Bigtable data in Scala code. I want to read the cell timestamp along with the data, but I can't find anywhere how to do it. Can someone help me with this?
I ...
0 votes · 1 answer · 148 views
Unable to access ADLS Gen2 through Spark when manually running a notebook in Synapse Analytics Studio
I'm trying to read a csv file from my ADLS Gen2 container through Spark using a linked service but when I run my notebook (interactively) I'm getting java.nio.file.AccessDeniedException: Operation ...
0 votes · 3 answers · 92 views
PySpark groupBy().applyInPandas() fails with INVALID_PANDAS_UDF despite correct signature and schema for GROUPED_MAP
NOTE: This question has many related questions on Stack Overflow, but I was unable to get my answer from any of them.
I'm attempting to parallelize Prophet time series model training across multiple ...
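INVALID_PANDAS_UDF for GROUPED_MAP usually traces back to the function shape: it must take one pandas.DataFrame (or a key tuple plus a DataFrame) and return a pandas.DataFrame matching the declared schema. A minimal self-contained sketch (names hypothetical):
import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("a", 1.0), ("a", 2.0), ("b", 3.0)], ["key", "y"])

def fit_group(pdf: pd.DataFrame) -> pd.DataFrame:
    # One pandas DataFrame in, one out; columns must match the schema below.
    return pd.DataFrame({"key": [pdf["key"].iloc[0]], "mean_y": [pdf["y"].mean()]})

df.groupBy("key").applyInPandas(fit_group, schema="key string, mean_y double").show()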
0 votes · 0 answers · 52 views
Delta live tables are producing different results
I am trying to perform an aggregation on top of a table. I applied the same aggregation in a DLT pipeline and in a PySpark query, but the results are different.
My PySpark query looks like this:
agg_df = filter_df....
0 votes · 1 answer · 216 views
Not able to run Spark code due to an issue on my local machine
I am facing the below error while running the given piece of Spark code in my local PyCharm Community Edition, and the Spark session is not getting created.
I have set up all my local environment ...
0 votes · 0 answers · 100 views
Spark SQL Fails to Create Table from Volume-Based Data Source in Databricks
In Databricks, I uploaded my source data under a volume folder (e.g., /Volumes/my_catalog/my_schema/landing_source/). Now, I want to create a DataFrame or table using this volume path as the source.
1. ...
0 votes · 2 answers · 78 views
Why does this PySpark job fail when performing the final checkpoint?
This is tested on Spark 3.5.6:
from pyspark.sql import SparkSession
spark = (
SparkSession.builder
.appName("reproduce_union_checkpoint_example")
.config("spark.sql....
0 votes · 1 answer · 150 views
Apache Spark Connect is falling back to classic local mode by default
I'm trying to use Apache Spark Connect from PySpark (version 4.0.0) with the following config:
spark1 = SparkSession.builder \
.config("spark.api.mode", "connect") \
....
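A hedged alternative to spark.api.mode: point the builder at a Connect server explicitly with remote(), which fails fast instead of silently falling back to classic mode (the URL assumes a locally started server on the default port):
from pyspark.sql import SparkSession

# Assumes a Connect server is running, e.g. started via
#   ./sbin/start-connect-server.sh
spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()
print(spark.version)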
2 votes · 1 answer · 64 views
Spark hint changes query results
Should adding a hint to a spark query ever return different results? I'm able to reproduce a production issue with the code below. It's worth noting that the last statement is just a copy/pasted ...
0 votes · 1 answer · 65 views
Apache Spark 2.4.3 Java Job Stuck in "Active" State in Cluster Mode (Java web app with spark java API) — No Errors in Logs
I'm running an Apache Spark application (v2.4.3) in standalone cluster mode, developed using the Java Spark API, and deployed from a web application. My Spark job reads and processes 50+ CSV files, ...
0 votes · 0 answers · 58 views
Why does the Encoder of map(toGroupement(row), Encoders.bean(Groupement.class)) leave some members null when toGroupement(...) has already created them?
With Spark 3.5.6, I have some complex Row to convert to Java objects.
sirenGroupement
nomGroupement
natureJuridique
sirenCommuneSiege
codeCommuneSiege
nomCommuneSiege
codeRegionCommuneSiege
...
0 votes · 0 answers · 72 views
SparkRuntimeException Error while displaying Spark DataFrame
When I display my Spark df, I run into a SparkRuntimeException. I am unsure of what it means or what I need to fix.
file_path = "/Volumes/filepath/file.xlsx"
df = spark.read \
.format(&...
0 votes · 1 answer · 88 views
How to fail Spark on wrong schema?
The data:
{"name": "name1", "id": 1}
{"name": "name2", "id": "1"}
The code:
val schema =
"""
| name ...
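The usual switch here is read mode FAILFAST, which throws on the first record that doesn't match the schema instead of silently nulling the field; sketched below in PySpark (the question's code is Scala) with the question's two records inline:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

data = ['{"name": "name1", "id": 1}', '{"name": "name2", "id": "1"}']
rdd = spark.sparkContext.parallelize(data)

# FAILFAST raises instead of coercing or nulling mismatched fields.
df = (spark.read
      .schema("name STRING, id BIGINT")
      .option("mode", "FAILFAST")
      .json(rdd))
df.show()  # fails on the second record, whose id is a string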
4 votes · 2 answers · 77 views
Parquet schema for Map type changes after spark2 to 3 migration
We are migrating our workflows from Spark 2 to 3, and one issue we've noticed is that the Parquet files have a schema mismatch for MapType. Here's a simple example to illustrate the case
...
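If the goal is Parquet schemas byte-compatible with the Spark 2 output, one hedged knob is the legacy writer format, which restores the older group naming for complex types such as maps (output path hypothetical):
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Write Parquet in the Spark 1.x/2.x-compatible layout, which changes how
# map/array/decimal types are represented in the Parquet schema.
spark.conf.set("spark.sql.parquet.writeLegacyFormat", "true")

df = spark.createDataFrame([({"k": 1},)], ["m"])
df.write.mode("overwrite").parquet("/tmp/legacy_map_parquet")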
1 vote · 1 answer · 48 views
Limited in defining Spark time Window
I have some PySpark streaming code, shown below:
parsed_df = df.selectExpr("CAST(value AS STRING) as message", "timestamp") \
.select(
from_json(col("...
0 votes · 1 answer · 39 views
Can't use pyspark.sql.functions with DataprocSparkSession
I have code like this in a Google BigQuery notebook:
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from pyspark.sql.functions import col, unix_timestamp, expr, dayofweek, round
...