0 votes
1 answer
137 views

How to set a new header in a PySpark DataFrame?

I'm loading a CSV into a dataframe of the form: Column A | Column B, then rows Cell 1 | Cell 2 and Cell 3 | Cell 4. The header row is "nonsense"; the actual header is in row 1, but I want to change the header names ...
Mitch • 596
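
A minimal sketch of one common approach, assuming the real header sits in the first data row once the nonsense row has been consumed; the file name is illustrative, and the final filter assumes no data row repeats the header value in its first column:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Let Spark consume the nonsense first row as the "header".
df = spark.read.csv("data.csv", header=True)

# The real header is now the first data row: capture it, rename, drop it.
real_header = df.first()
df = df.toDF(*[str(v) for v in real_header])
df = df.filter(df[df.columns[0]] != real_header[0])
```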
0 votes
0 answers
42 views

Query Variant Data Type with Pyspark (Spark 4.0)

I use Spark 4.0 and have the following code from pyspark.sql import SparkSession from pyspark.sql import functions as F from pyspark.sql.types import StringType json_data = [ ('{"auswahl&...
user3579222 • 1,512
1 vote
1 answer
93 views

Changes to Delta Table Reflect in Existing Spark Dataframe

Platform: Azure Databricks Compute: Serverless I create a Spark data frame by selecting a subset of records from a delta table. I perform several transformations on the data set. Each transformation ...
Adam • 4,236
-4 votes
1 answer
94 views

Exception in thread "main" java.io.FileNotFoundException but I have my file correctly in my S3 bucket

I'm trying to create a cluster on EMR on EC2 Clusters to process my jobs. I have created a Spark cluster with the default VPC. I have checked my IAM and I've allowed "s3:CreateBucket", "s3:...
myts999 • 61
0 votes
1 answer
110 views

Problem reading the _last_checkpoint file from the _delta_log directory of a delta lake table on s3

I am trying to read the _delta_log folder of a delta lake table via Spark to export some custom metrics. I have configured how to get some metrics from history and description, but I have a problem ...
Melika Ghiasi
1 vote
1 answer
290 views

PySpark ArrayType usage in transformWithStateInPandas state causes java.lang.IllegalArgumentException

I have the following python code that uses PySpark to mock a fraud detection system for credit cards: from pyspark.sql import SparkSession from pyspark.sql.functions import from_json, col, ...
Marco Filippozzi
1 vote
2 answers
112 views

Create a unique index for each group in PySpark

I am working with a relatively large dataframe (close to 1 billion rows) in PySpark. This dataframe is in "long" format, and I would like to have a unique index for each group defined by a ...
CopyOfA • 931
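
A minimal sketch of one common approach, assuming a grouping column named group_col (hypothetical) and few enough distinct groups to broadcast; the un-partitioned window is tolerable here because it only ever sees the distinct keys, not the ~1B underlying rows:

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Index the distinct group keys once, then join the index back on.
groups = (
    df.select("group_col").distinct()
      .withColumn("group_idx", F.row_number().over(Window.orderBy("group_col")))
)
df_indexed = df.join(F.broadcast(groups), on="group_col", how="left")
```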
1 vote
0 answers
135 views

Conversion of a pyspark DataFrame with a Variant column to pandas fails with an error

When I try to convert a pyspark DataFrame with a VariantType column to a pandas DataFrame, the conversion fails with an error 'NoneType' object is not iterable. Am I doing it incorrectly? Sample code: ...
Ghislain Fourny
3 votes
0 answers
72 views

Cannot extend Spark UnaryExpression in Java

I am trying to write a custom decoder function in Java targeting Spark 4.0: public class MyDataToCatalyst extends UnaryExpression implements NonSQLExpression, ExpectsInputTypes, Serializable { //.....
Carsten • 1,288
0 votes
0 answers
71 views

How to package a PySpark + Delta Lake script into an EXE with PyInstaller

I’m trying to convert my PySpark script into an executable (.exe) file using PyInstaller. The script runs fine in Python, but after converting it to an EXE and executing it, I get the following error: '...
userr • 11
0 votes
0 answers
69 views

AWS Glue/Spark performance issue

I am new to AWS Glue and I am facing performance issues with the following code: spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN") # Define S3 path with wildcard to match ...
Alberto • 15
0 votes
0 answers
133 views

Spark 4.0 does not support Hive 4.0.0

While I compile Spark 4.0 with Hive 4.0.0, I run into this issue. It looks like Spark 4.0 is not compatible with Hive 4.0.0, but the docs list 4.0.0 as one of the supported versions. So I don't know ...
jcyan • 71
2 votes
1 answer
95 views

PySpark program stuck at adding broadcast piece

I'm trying to write a PySpark program that filters for records in a very large dataframe (1-2B records) that match some conditions on another smaller reference dataframe. This is done using a left ...
Rayne • 15.2k
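
A minimal sketch of one common mitigation, with a hypothetical join key named key: trimming the reference table to its distinct join keys keeps the broadcast payload small, and a left-semi join filters without materializing any reference columns:

```python
from pyspark.sql import functions as F

# Broadcast only the distinct join keys of the small reference table.
ref_keys = ref_df.select("key").distinct()

filtered = big_df.join(
    F.broadcast(ref_keys),
    on="key",
    how="left_semi",  # keep matching big_df rows; add no columns
)
```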
0 votes
1 answer
83 views

Unable to run trino for iceberg table: Invalid value 'hadoop' for type CatalogType

I have generated iceberg table with spark java program. Now I want to access it via trino. My docker compose is: version: '3.8' services: trino: image: trinodb/trino:latest container_name: ...
Mandroid • 7,768
0 votes
0 answers
124 views

Which version of the source delta table is currently being processed by Spark Structured Streaming?

I want to know/monitor which version of the delta table is currently being processed, especially when the stream is started with a startingVersion. My understanding is when that option is chosen, the ...
Saugat Mukherjee
3 votes
0 answers
149 views

INSERT into Iceberg table fails: Spark 3.4 on Kubernetes (Spark Operator) + Iceberg REST (Lakekeeper) + Minio

Hey I have a problem with inserting into an Iceberg table hosted on Minio via a Spark executor, here's more context: Platform: Spark 3.4.0 on Kubernetes via Spark Operator 1.1.28 Catalog: Lakekeeper (...
vojitas • 31
0 votes
0 answers
87 views

Spark Scala: JSON Array Column Not Persisting to BigQuery Table

I'm hoping to get some help with an issue I'm having moving data from a JSON file to a BigQuery table using Spark. The problem, in a nutshell: Goal: Load a JSON file into a Spark DataFrame and persist ...
user3348838
1 vote
0 answers
130 views

How to read data from MongoDB collection using SparkSession into a Spark DataFrameReader?

Spark reading data from MongoDB (ver 7.0) and DocumentDB (ver 4.0) and loading into the Spark DataFrameReader fails when the DataFrameReader.isEmpty() method is called. SparkSession and ...
Sandeep Reddy CONT
0 votes
1 answer
68 views

Apache Spark Error: Cannot cast STRING into a StructType(StructField(subFieldA,StringType,true)) (value: BsonString{value='{}'})

I’m reading documents from DocDB (MongoDB) into Spark using the mongo-spark-connector. One of the fields, fieldA, is a nested object. If fieldA is missing in a document, I replace it with an empty ...
fancybear • 119
1 vote
0 answers
110 views

A user-defined aggregate function fails to shape a Map<K, C> field, where C is an object built from many rows

Edit: As this analysis progresses, managing the dataset's nested values with the tools Spark has offered since 3.1.x seems required. → It's my way of filling nested objects held by ...
Marc Le Bihan
0 votes
0 answers
169 views

How can I mark a Databricks job as successful despite a failed Autoloader pipeline?

Issue: I have a Databricks Workflow/job running a pytest test that is being marked as "Failed" because one of the Autoloader pipelines within it fails, despite the overall job succeeding and ...
r_g_s_ • 306
1 vote
1 answer
57 views

Difference between spark.sql.files.maxPartitionBytes and spark.files.maxPartitionBytes

I see that Spark 2.0.0 introduced a property spark.sql.files.maxPartitionBytes and its subsequent release (2.1.0) introduced spark.files.maxPartitionBytes. The Spark configuration link says in ...
mandar • 125
2 votes
1 answer
76 views

Enforcing nullability on a JSON schema

I want to read in an ndjson file and apply a pre-defined schema to it (not allowing Spark to infer the schema). In general terms this works fine, but we are unable to define certain elements as required....
Andrew • 8,913
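
Worth noting: Spark generally forces JSON fields to nullable on read regardless of what the supplied schema says, so "required" is usually enforced with an explicit post-read check. A minimal sketch with hypothetical field names:

```python
from pyspark.sql import SparkSession, functions as F
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("id", StringType(), nullable=False),  # intent only; read relaxes it
    StructField("name", StringType(), nullable=True),
])

df = spark.read.schema(schema).json("data.ndjson")

# Enforce the "required" contract manually after reading.
if df.filter(F.col("id").isNull()).limit(1).count() > 0:
    raise ValueError("found rows with null 'id'; required field violated")
```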
0 votes
2 answers
105 views

Scala Spark: decompress LZ4 CSV files

Is there any way to decompress LZ4 CSV files with Spark? I tried the following approaches: First: sparkSession.read .option("delimiter", ",") .option("compression", "...
Jelly • 1,434
0 votes
1 answer
102 views

Cannot expire snapshots with the retain_last property

I have 67 snapshots in a single table, but when I use CALL iceberg_catalog.system.expire_snapshots( table => 'iceberg_catalog.default.test_7', retain_last => 5 ); it doesn't delete any snapshots. ...
Sơn Bùi
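
One likely explanation worth checking: expire_snapshots only considers snapshots older than its older_than cutoff (Iceberg's default is 5 days ago), and retain_last is a floor, not a target, so 67 recent snapshots expire nothing. A sketch passing an explicit cutoff; the timestamp is illustrative:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# With a cutoff later than every snapshot, all but the newest
# retain_last snapshots become eligible for expiry.
spark.sql("""
    CALL iceberg_catalog.system.expire_snapshots(
        table       => 'iceberg_catalog.default.test_7',
        older_than  => TIMESTAMP '2099-01-01 00:00:00',
        retain_last => 5
    )
""")
```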
0 votes
0 answers
142 views

Athena is appending UTC to Iceberg timestamp results. Why, and how do I fix it?

I am storing a simple datetime value (e.g. 2025-01-24 13:58:14.000) from SQL to an Iceberg table using the Glue catalog. I don't want anything with timezones. We only work in EST, so all our datetimes don't ...
Chaitanya Kulkarni
0 votes
3 answers
130 views

Apache Spark (3.5-4.0): does df.count() load the data?

Over the past few months, I have been getting inconsistent benchmark results, and I want to confirm if my suspicion is correct. If I perform a series of transformations to create a Spark (3.5+/4) ...
Quiescent • 1,196
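
For what it's worth: count() is an action that executes the full plan, but it may do far less work than a full materialization (column pruning, for instance), and nothing is kept for later actions unless the DataFrame is persisted. A small benchmarking sketch; the noop sink exists in Spark 3.0+:

```python
# Runs the plan, but Spark may prune columns and short-circuit work.
n = df.count()

# Forces every column of every row to be computed without writing
# anywhere; usually a fairer baseline for benchmarking transformations.
df.write.mode("overwrite").format("noop").save()
```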
0 votes
1 answer
44 views

Azure Synapse SQL merge via spark.sql is not updating records; instead it inserts matching records

I have the below code, where the Id is a 36-character GUID. The code gets executed, but when a matching record is found, instead of updating, it inserts the entire record again. What could be the root ...
Sandeep T • 441
3 votes
1 answer
80 views

How to use the emptyValue option in PySpark while reading a CSV file?

According to the docs of CSV options, the emptyValue property (default: empty for reading, "" for writing) "sets the string representation of an empty value". But it doesn't seem to work: with ...
Kashyap • 17.9k
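
A self-contained way to probe the option's read-side behavior, which varies across Spark versions; the file contents, local /tmp path, and marker string are made up for the test:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Tiny CSV with both a quoted-empty field and a missing field.
with open("/tmp/empty_test.csv", "w") as f:
    f.write('a,b\n1,""\n2,\n')

df = (
    spark.read
    .option("header", True)
    .option("emptyValue", "<empty>")  # hypothetical marker
    .csv("/tmp/empty_test.csv")
)
df.show()  # inspect how "" and the absent value each come through
```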
2 votes
1 answer
136 views

How to read empty string as well as NULL values from a csv file in pyspark? [duplicate]

"Read spark csv with empty values without converting to null" doesn't answer this one because that one is Scala and this is PySpark, and the Scala solution .option("nullValue", null) translates to ...
Kashyap • 17.9k
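
One combination that is often suggested for telling the two apart on read, hedged because the behavior differs across Spark versions; the \N marker is an assumption about how the file encodes NULLs:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = (
    spark.read
    .option("header", True)
    .option("nullValue", "\\N")  # only this marker becomes NULL
    .option("emptyValue", "")    # quoted empty strings stay empty
    .csv("/path/to/data.csv")    # assumed path
)
```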
0 votes
1 answer
49 views

How to summarize Spark executor computing time

I have a Spark job running with 260K tasks. I can check an individual task's executor computing time in the Spark UI. To calculate the resource usage of the whole job, how do I summarize all ...
Brian Mo
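
A minimal sketch using the Spark monitoring REST API, whose per-stage records include executorRunTime in milliseconds; the UI address is assumed to be localhost:4040 (use the history server URL for a finished application):

```python
import requests
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

app_id = spark.sparkContext.applicationId
stages = requests.get(
    f"http://localhost:4040/api/v1/applications/{app_id}/stages"
).json()

# executorRunTime already aggregates task run time within each stage.
total_ms = sum(s["executorRunTime"] for s in stages)
print(f"total executor computing time: {total_ms / 1000:.1f} s")
```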
1 vote
2 answers
324 views

Multiple writeStreams to the same delta table error

I am trying to run 2 Spark Structured Streaming jobs in Databricks. Each job is running on its own job cluster and streaming data from a different AWS Kinesis data stream, but both jobs are writing ...
DumbCoder • 515
0 votes
2 answers
96 views

Read files from storage account in Spark - Without using keys - Azure

I am doing local development. I wish to run a Spark job locally on my desktop and access files in a storage account from my Spark job. I don't have the option to use SAS tokens or access keys for my storage ...
Jatin • 31.8k
0 votes
1 answer
50 views

Reduce the number of snapshots created with realtime Kafka streaming

My system runs a realtime data stream from Kafka to HDFS integrated with Iceberg using Spark. How can I reduce the number of snapshots created on each insert while still maintaining realtime capability (...
Sơn Bùi
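
Since each micro-batch commit produces one Iceberg snapshot, the usual first lever is a longer trigger interval (fewer, larger commits) traded against latency, combined with periodic expire_snapshots maintenance. A sketch with assumed table and checkpoint names, where df is the streaming DataFrame from Kafka:

```python
query = (
    df.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(processingTime="5 minutes")  # fewer micro-batches, fewer snapshots
    .option("checkpointLocation", "hdfs:///checkpoints/events")  # assumed path
    .toTable("catalog.db.events")  # assumed table name
)
```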
0 votes
1 answer
58 views

bigtable spark connector not reading cell timestamp with the data

I am using the Bigtable Spark connector to read Bigtable data in Scala code. I want to read the cell timestamp along with the data, but I can't find anywhere how to do it. Can someone help me with this? I ...
anzuman farhana
0 votes
1 answer
148 views

Unable to access ADLS Gen2 through Spark when manually running a notebook in Synapse Analytics Studio

I'm trying to read a csv file from my ADLS Gen2 container through Spark using a linked service but when I run my notebook (interactively) I'm getting java.nio.file.AccessDeniedException: Operation ...
Ashwin • 1
0 votes
3 answers
92 views

PySpark groupBy().applyInPandas() fails with INVALID_PANDAS_UDF despite correct signature and schema for GROUPED_MAP

NOTE: This question has many related questions on Stack Overflow, but I was unable to get my answer from any of them. I'm attempting to parallelize Prophet time-series model training across multiple ...
Arnab Sinha
0 votes
0 answers
52 views

Delta live tables are producing different results

I am trying to perform an aggregation on top of a table. I applied the same aggregation in a DLT pipeline and in a PySpark query, but the results are different. My PySpark query looks like below: agg_df = filter_df....
awesome_sangram
0 votes
1 answer
216 views

Not able to run Spark code due to an issue on my local machine

I am facing the below error while running the given piece of Spark code in my local PyCharm Community Edition, and the Spark session is not getting created. I have set up all my local environment ...
Node98 • 27
0 votes
0 answers
100 views

Spark SQL Fails to Create Table from Volume-Based Data Source in Databricks

In Databricks, I uploaded my source data under a volume folder (e.g., /Volumes/my_catalog/my_schema/landing_source/). Now, I want to create a DataFrame or table using this volume path as the source. 1....
Learn Hadoop • 3,058
0 votes
2 answers
78 views

Why does this PySpark job fail when performing the final checkpoint?

This is tested on Spark 3.5.6: from pyspark.sql import SparkSession spark = ( SparkSession.builder .appName("reproduce_union_checkpoint_example") .config("spark.sql....
Alexey • 1,472
0 votes
1 answer
150 views

Apache Spark Connect is falling back to classic local mode by default

I'm trying to use Apache Spark Connect from PySpark (version 4.0.0) with the following config: spark1 = SparkSession.builder \ .config("spark.api.mode", "connect") \ ....
awesome_sangram
2 votes
1 answer
64 views

Spark hint changes query results

Should adding a hint to a Spark query ever return different results? I'm able to reproduce a production issue with the code below. It's worth noting that the last statement is just a copy/pasted ...
Peter Connolly
0 votes
1 answer
65 views

Apache Spark 2.4.3 Java Job Stuck in "Active" State in Cluster Mode (Java web app with spark java API) — No Errors in Logs

I'm running an Apache Spark application (v2.4.3) in standalone cluster mode, developed using the Java Spark API, and deployed from a web application. My Spark job reads and processes 50+ CSV files, ...
Hitesh Vaghela
0 votes
1 answer
58 views

Why does the Encoder of map(toGroupement(row), Encoders.bean(Groupement.class)) leave some members null when toGroupement(...) has already created them?

With Spark 3.5.6, I have some complex Rows to convert to Java objects: sirenGroupement, nomGroupement, natureJuridique, sirenCommuneSiege, codeCommuneSiege, nomCommuneSiege, codeRegionCommuneSiege ...
Marc Le Bihan
0 votes
0 answers
72 views

SparkRuntimeException Error while displaying Spark DataFrame

When I display my Spark df, I run into a SparkRuntimeException. I am unsure of what it means or what I need to fix. file_path = "/Volumes/filepath/file.xlsx" df = spark.read \ .format("...
Evan • 13
0 votes
1 answer
88 views

How to fail Spark on wrong schema?

The data: {"name": "name1", "id": 1} {"name": "name2", "id": "1"} The code: val schema = """ | name ...
Cherry • 34k
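
A minimal sketch of the usual answer: read with mode=FAILFAST so the first record that does not match the schema raises an error instead of being nulled out (the default PERMISSIVE mode silently nulls the mismatched field):

```python
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType, StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("name", StringType()),
    StructField("id", LongType()),
])

df = (
    spark.read
    .schema(schema)
    .option("mode", "FAILFAST")
    .json("data.json")
)
df.show()  # the record with "id": "1" raises here, at action time
```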
4 votes
2 answers
77 views

Parquet schema for Map type changes after Spark 2 to 3 migration

We are migrating from Spark 2 to 3 in our workflows, and one issue we notice is that the Parquet files have a schema mismatch for MapType. Here's a simple example to illustrate the case ...
Anshul Dubey
1 vote
1 answer
48 views

Limited in defining a Spark time window

I have PySpark streaming code, which is the following: parsed_df = df.selectExpr("CAST(value AS STRING) as message", "timestamp") \ .select( from_json(col("...
M_Gh • 1,172
0 votes
1 answer
39 views

Can't use pyspark.sql.functions with DataprocSparkSession

I have code like this on a Google BigQuery notebook from google.cloud.dataproc_spark_connect import DataprocSparkSession from pyspark.sql.functions import col, unix_timestamp, expr, dayofweek, round ...
bmarkham • 1,700