82,655 questions
0
votes
3
replies
62
views
Pushing down filters to an RDBMS with Java Spark
I have been working as a Data Engineer and ran into this issue.
I came across a use case where I have a view (let's call it inputView) which is created by reading data from some source.
Now somewhere ...
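Since the excerpt breaks off, here is a rough PySpark sketch (the question mentions Java, but the idea is the same) of reading from an RDBMS over JDBC and checking whether a filter is pushed down; the connection URL, table, and column names are placeholders, not the asker's code.

```python
# Hypothetical sketch: read a view from an RDBMS via JDBC and check pushdown.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pushdown-check").getOrCreate()

input_view = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://db-host:5432/mydb")   # placeholder URL
    .option("dbtable", "public.orders")                      # placeholder table
    .option("user", "user").option("password", "password")
    .load()
)
input_view.createOrReplaceTempView("inputView")

# A filter applied on the view; the JDBC source can push simple predicates down.
filtered = spark.sql("SELECT * FROM inputView WHERE order_status = 'OPEN'")

# The physical plan shows "PushedFilters: [...]" when the predicate reaches the database.
filtered.explain(True)
```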
1
vote
0
answers
52
views
Databricks always loads built-in BigQuery connector (0.22.2), can’t override with 0.43.x
I am using Databricks Runtime 15.4 (Spark 3.5 / Scala 2.12) on AWS.
My goal is to use the latest Google BigQuery connector because I need the direct write method (BigQuery Storage Write API):
option(&...
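For context, a minimal sketch of what a direct write with the spark-bigquery connector looks like, assuming the 0.43.x jar is actually the one resolved on the classpath; the table name is a placeholder and df stands for an existing DataFrame.

```python
# Direct write via the BigQuery Storage Write API (sketch; names are placeholders).
(df.write.format("bigquery")
   .option("writeMethod", "direct")                  # Storage Write API
   .option("table", "my_project.my_dataset.my_table")
   .mode("append")
   .save())
```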
0
votes
0
answers
31
views
PySpark 3.5.5 CharType in read.csv schema definition
I'm using a PySpark notebook inside of Azure Synapse.
This is my schema definition
qcew_schema = StructType([
StructField( 'area_fips', dataType = CharType(5), ...
1
vote
0
answers
48
views
PySpark/MongoDB Connector DataException: dataType 'struct' is invalid for 'BsonArray' during ETL
I am running a data ingestion ETL pipeline orchestrated by Airflow using PySpark to read data from MongoDB (using the MongoDB Spark Connector) and load it into a Delta Lake table. The pipeline is ...
0
votes
0
answers
56
views
Trying to read a BigQuery array column and pass it as columns to fetch from a Spark dataframe
I have a BigQuery table with an array column named "column_list"
ALTER TABLE `test-project.TEST_DATASET.TEST_TABLE`
ADD COLUMN column_list ARRAY<STRING>;
update `test-project....
0
votes
1
answer
74
views
col function error: type mismatch: found String, required Int
I am attempting to programmatically remove specific columns/fields from a dataframe (anything that starts with _), whether the field is in the root or in a struct, using the dropFields method.
For ...
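As a point of reference, here is a hedged PySpark sketch of the general pattern (not the asker's code): dropping root columns that start with "_" and using Column.dropFields for fields nested in structs. The example DataFrame and field names are made up.

```python
# Sketch: remove columns/fields starting with "_" at the root and inside structs.
from pyspark.sql import functions as F
from pyspark.sql.types import StructType

df = spark.createDataFrame(
    [(1, ("x", "y"))],
    "id INT, s STRUCT<_tmp: STRING, keep: STRING>",   # hypothetical schema
)

# 1) Root-level columns starting with "_"
df = df.drop(*[c for c in df.columns if c.startswith("_")])

# 2) Fields starting with "_" inside struct columns
for field in df.schema.fields:
    if isinstance(field.dataType, StructType):
        bad = [f.name for f in field.dataType.fields if f.name.startswith("_")]
        if bad:
            df = df.withColumn(field.name, F.col(field.name).dropFields(*bad))

df.printSchema()
```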
-5
votes
0
answers
30
views
How to connect Apache Spark (1.6.0 on Cloudera) with Apache Hive? [closed]
I’m using Cloudera Quickstart VM which includes Apache Spark 1.6.0 and Apache Hive.
I want to connect Spark with Hive so that I can run SQL queries on my Hive tables directly from PySpark.
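In the Spark 1.6 era, Hive access from PySpark typically went through HiveContext; a minimal sketch, assuming hive-site.xml is visible to Spark (e.g. in $SPARK_HOME/conf) and with a placeholder table name:

```python
# Spark 1.6-style sketch: querying Hive tables from PySpark via HiveContext.
from pyspark import SparkContext, SparkConf
from pyspark.sql import HiveContext

conf = SparkConf().setAppName("hive-from-pyspark")
sc = SparkContext(conf=conf)
hive_ctx = HiveContext(sc)

hive_ctx.sql("SHOW DATABASES").show()
df = hive_ctx.sql("SELECT * FROM my_db.my_table LIMIT 10")  # placeholder table
df.show()
```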
0
votes
0
answers
62
views
PySpark - Multithreading in Python
I have a use case like this: I have a list of many queries, and I am running multi-threading with PySpark, with each thread submitting some SQL.
There are some queries that report success but the final ...
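For reference, a common pattern (not necessarily the asker's code) is a ThreadPoolExecutor submitting SQL statements against one SparkSession, with failures surfaced per future rather than only logged as success; the queries below are placeholders.

```python
# Sketch: run several SQL statements concurrently and surface per-query failures.
from concurrent.futures import ThreadPoolExecutor, as_completed

queries = ["INSERT INTO t1 SELECT ...", "INSERT INTO t2 SELECT ..."]  # placeholders

def run(sql):
    spark.sql(sql)      # one shared SparkSession; each thread submits its own job
    return sql

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = {pool.submit(run, q): q for q in queries}
    for fut in as_completed(futures):
        try:
            print("finished:", fut.result())
        except Exception as exc:
            print("failed:", futures[fut], exc)
```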
0
votes
1
answer
44
views
Spark 4.0.1 MongoDB Connector 10.5.0: Missing configuration for: collection and NoClassDefFoundError: com.mongodb.client.MongoClient
I am trying to read data from MongoDB in Spark 4.0.1 using the MongoDB Spark Connector (version 10.5.0). My PySpark code looks like this:
from pyspark.sql import SparkSession
spark = SparkSession....
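Since the excerpt is cut off, here is a hedged sketch of a MongoDB read with the 10.x connector, with the database and collection set explicitly on the reader; the URI and names are placeholders, and the connector coordinates assume Scala 2.13 for Spark 4.x.

```python
# Sketch: MongoDB Spark Connector 10.x read with explicit database/collection.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("mongo-read")
    .config("spark.jars.packages",
            "org.mongodb.spark:mongo-spark-connector_2.13:10.5.0")  # assumption
    .getOrCreate()
)

df = (
    spark.read.format("mongodb")
    .option("connection.uri", "mongodb://localhost:27017")  # placeholder
    .option("database", "my_db")
    .option("collection", "my_collection")
    .load()
)
df.printSchema()
```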
0
votes
1
answer
56
views
How to do bucket logic in partitioning for an Iceberg table using AWS Glue?
# =====================================================
# 🧊 Step 4. Write Data to Iceberg Table (Glue Catalog)
# =====================================================
table_name = "glue_catalog....
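One way to express an Iceberg bucket partition from PySpark is the DataFrameWriterV2 API with the bucket() partition transform; a sketch, assuming the Glue/Iceberg catalog is already configured as "glue_catalog" and that df has event_ts and user_id columns (all names here are placeholders).

```python
# Sketch: create an Iceberg table partitioned by day(event_ts) and bucket(16, user_id).
from pyspark.sql.functions import bucket, col, days

(df.writeTo("glue_catalog.my_db.my_table")     # placeholder catalog/db/table
   .partitionedBy(days(col("event_ts")), bucket(16, col("user_id")))
   .using("iceberg")
   .createOrReplace())
```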
0
votes
1
answer
100
views
Does the bucketBy clause shuffle the data when writing a PySpark dataframe?
I am writing a PySpark dataframe as a table in my Spark warehouse. I am bucketing it on a specific column using the bucketBy clause with 10 buckets. I have read on several blog posts that bucketBy ...
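For context, a small sketch of the write being described, plus one way to inspect the plan for an Exchange (shuffle); the table and column names are placeholders.

```python
# Sketch: bucketed table write and plan inspection.
from pyspark.sql import functions as F

df = spark.range(100).withColumn("customer_id", F.col("id") % 10)  # toy data

(df.write
   .bucketBy(10, "customer_id")
   .sortBy("customer_id")
   .mode("overwrite")
   .saveAsTable("bucketed_table"))           # placeholder table name

# Whether Spark adds an Exchange (shuffle) shows up in the physical plan:
df.explain(mode="formatted")
```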
0
votes
1
answer
62
views
How to reference a CSV column with parentheses and a decimal point in Spark SQL or COALESCE expression?
I’m working on a data ingestion pipeline using Apache Spark (triggered via a Cloud Function on Dataproc).
The input CSV contains column names that include special characters such as parentheses and a ...
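Since the excerpt stops mid-sentence, here is a hedged sketch of the usual way to reference such names: wrap the identifier in backticks in Spark SQL expressions; the column name and file path below are invented for illustration.

```python
# Sketch: referencing a column whose name contains parentheses and a dot.
from pyspark.sql import functions as F

df = spark.read.option("header", True).csv("gs://bucket/path/file.csv")  # placeholder path

# SQL-expression form: back-quote the identifier.
result = df.select(F.expr("coalesce(`Amount (USD) v1.2`, '0') as amount_usd"))

# DataFrame API form: backticks also work inside col() so the dot is not
# treated as struct field access.
result2 = df.select(
    F.coalesce(F.col("`Amount (USD) v1.2`"), F.lit("0")).alias("amount_usd")
)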
0
votes
0
answers
76
views
Java heap space error even though only 1.5 GB of 5.8 GB is used
Why am I getting java.lang.OutOfMemoryError: Java heap space even when I have plenty of memory?
My simple code just creates a dataframe from the input data, so there is no real data processing in Spark - only ...
0
votes
0
answers
44
views
fairscheduler.xml on the classpath
The Spark documentation says that for the scheduler allocation file we can "put a file named fairscheduler.xml on the classpath".
I have included fairscheduler.xml in the root of the jar file that I use with ...
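If the classpath lookup is unreliable, the allocation file can also be pointed to explicitly via spark.scheduler.allocation.file; a sketch, with a placeholder path.

```python
# Sketch: enable the FAIR scheduler and point Spark at the allocation file directly.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.scheduler.mode", "FAIR")
    .config("spark.scheduler.allocation.file", "/opt/spark/conf/fairscheduler.xml")
    .getOrCreate()
)

# Jobs can then be assigned to a pool defined in the XML:
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "high_priority")
```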
2
votes
0
answers
96
views
Loading a large multiline CSV file using pyspark is extremely slow
I've got a multiline CSV file which is about 150GB and I've been trying to load it using the usual code e.g.
df = spark.read.format('csv').option('header', True).option('multiLine', True).load('path/...
0
votes
0
answers
83
views
How to Check if a Query Touches Data Files or just Uses Manifests and Metadata in Iceberg
I created a table as follows:
CREATE TABLE IF NOT EXISTS raw_data.civ (
date timestamp,
marketplace_id int,
... some more columns
)
USING ICEBERG
PARTITIONED BY (
marketplace_id,
...
0
votes
0
answers
82
views
Spark OutOfMemoryError when reading large JSON file (3.5GB) as wholeText due to colon in path
I’m trying to load JSON data into an Iceberg table. The source files are named with timestamps that include colons (:), so I need to read them as plain text first. Additionally, each file is in a ...
3
votes
1
answer
103
views
How to collect multiple metrics with observe in PySpark without triggering multiple actions
I have a PySpark job that reads data from table a, performs some transformations and filters, and then writes the result to table b.
Here’s a simplified version of the code:
import pyspark.sql....
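For reference, a hedged sketch of the Observation API (PySpark 3.3+), which attaches several aggregates that are all collected by the single write action; table and column names are placeholders, not the asker's.

```python
# Sketch: collect several metrics with one action using Observation.
from pyspark.sql import Observation
from pyspark.sql import functions as F

obs = Observation("job_metrics")

df = spark.table("a")                                  # placeholder source table
df = df.filter(F.col("status") == "active")
df = df.observe(
    obs,
    F.count(F.lit(1)).alias("row_count"),
    F.sum("amount").alias("total_amount"),
    F.max("event_ts").alias("latest_event"),
)

df.write.mode("append").saveAsTable("b")               # the one triggering action

print(obs.get)   # dict with row_count, total_amount, latest_event
```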
0
votes
0
answers
63
views
Unexpected Write Behavior when using MERGE INTO/INSERT INTO Iceberg Spark Queries
I am observing different write behaviors when executing queries on EMR Notebook (correct behavior) vs when using spark-submit to submit a spark application to EMR Cluster (incorrect behavior).
When I ...
0
votes
1
answer
79
views
Dataproc Serverless is slow to consume a Kafka topic
I use Dataproc Serverless with the Java API to read a Kafka topic. The topic has only 2 partitions.
The topic receives 200 msg/sec.
After reading messages I repartition to 100, transform the data then write ...
1
vote
1
answer
79
views
Apache Spark TransformWithState operator not working as expected
Hi, I'm trying to implement a state processor for my custom logic. Ideally we are streaming, and I want custom logic that calculates packet loss from the previous row.
I implemented the state processor ...
0
votes
1
answer
104
views
Databricks Community Edition: spark.conf.get('spark.sql.adaptiveExecution.enabled') not available on serverless compute
I’m using Databricks Community Edition (Free tier) with Spark 4.0.0. I noticed that the UI no longer allows creating a standard cluster — only the serverless compute option is available.
I tried the ...
0
votes
2
answers
77
views
SQLSTATE: 42K0I error when connecting Databricks to ADLS
Could someone help me with connecting Databricks to ADLS? I have tried connecting in (I suppose) all the ways I could - SAS token, service principal, or even one more way I don't really remember ...
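Since the excerpt breaks off, here is a hedged sketch of one commonly documented pattern (service principal with OAuth over ABFS); every value below is a placeholder and this is not presented as the fix for the 42K0I error.

```python
# Sketch: ADLS Gen2 access with a service principal (all values are placeholders).
storage_account = "mystorageaccount"

spark.conf.set(f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net", "OAuth")
spark.conf.set(
    f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net",
    "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
)
spark.conf.set(f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net", "<application-id>")
spark.conf.set(f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net", "<client-secret>")
spark.conf.set(
    f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net",
    "https://login.microsoftonline.com/<tenant-id>/oauth2/token",
)

df = spark.read.text(f"abfss://mycontainer@{storage_account}.dfs.core.windows.net/some/path")
```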
-1
votes
0
answers
65
views
Getting a broadcast error on the mapPartitions() line
I am trying to process objects_small_list in parallel using mapPartitions, comparing obj_id with process_object_id (which is in objects_small_list) and pulling the matching folder rows in ...
0
votes
0
answers
74
views
Constantly getting a connection reset error when connecting to ClickHouse from Spark
Following the Spark Native Connector guide in ClickHouse, I downloaded clickhouse-spark-runtime-3.5.2_0.8.0 and clickhouse-jdbc-all-0.9.2, and configured the catalog with:
The correct username and ...
0
votes
0
answers
45
views
PySpark Create Dataframe with List Columns [duplicate]
Why does the following code produce the desired dataframe without issue
data = [
("James,,Smith",["Java","Scala","C++"],["Spark","Java"],&...
0
votes
1
answer
42
views
Spark 3.3.1: decimal(38,10) shows as 0E-10 in DataFrame and inserts as NULL into Hive (Parquet / insertInto)
Environment:
Apache Spark: 3.3.1
Hive: 3.1.3 (metastore + Hive server)
Table storage format: PARQUET
Insertion method: dataframe.format("hive").insertInto("db.table")
Problem ...
0
votes
0
answers
59
views
Scala spark: Why does DataFrame.transform calling a transform hang?
I have a Scala (v2.12.15) Spark (v3.5.1) job that works correctly and looks something like this:
import org.apache.spark.sql.DataFrame
...
val myDataFrame = myReadDataFunction(...)
....
0
votes
0
answers
92
views
Spark 3 writes are slower than Spark 2
Spark 3 vs Spark 2:
I have three Spark dataframes: util, dept, and cntry. All three are created by reading three different Hive tables with a WHERE clause. All three have customer_id as a common column.
I ...
0
votes
0
answers
53
views
Spatial join without Apache Sedona
Currently I'm working with a specific version of Apache Spark (3.1.1) that I cannot upgrade. Because of that I can't use Apache Sedona, and version 1.3.1 is too slow. My problem is the following code, which ...
0
votes
0
answers
73
views
Error when reading .csv.gz files in Databricks
I have small files in .csv.gz compressed format in a GCS bucket; I have mounted it and created external volumes on top of it in Databricks (Unity Catalog enabled). So when I try to read a file with ...
1
vote
1
answer
35
views
DataprocSparkSession package in python error - "RuntimeError: Error while creating Dataproc Session"
I am using below code to create Dataproc Spark Session to run a job
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session
session = Session(...
0
votes
0
answers
59
views
EMR Spark Job Fails to Connect to MSK with IAM Auth - Timeout Waiting for Node Assignment Error
I am running an Apache Spark job on Amazon EMR that needs to connect to an Amazon MSK cluster configured with IAM authentication. The EMR cluster has an IAM role with full MSK permissions, and I can ...
0
votes
1
answer
78
views
How can I change my Spark session in Databricks Community Edition?
I want to change my spark session from 'pyspark.sql.connect.dataframe.DataFrame' to 'pyspark.sql.dataframe.DataFrame' so that I can run StringIndexer and VectorAssembler.
If I run it in pyspark.sql....
0
votes
0
answers
99
views
PySpark error py4j.protocol.Py4JJavaError
I keep running into this issue when running PySpark.
I was able to connect to my database and retrieve data, but whenever I try to do operations like .show() or .count(), or when I try to save a Spark ...
0
votes
1
answer
137
views
Python worker exited unexpectedly due to connection reset in pyspark
I'm trying to run PySpark code, but I'm having problems with it.
versions:
OS: windows 11
apache spark: 4.0.1
java: 17.0.12 2024-07-16 LTS
python version: 3.11.9
hadoop: 3.3.6
Code:
from pyspark.sql ...
0
votes
1
answer
54
views
Pushdown in a Spark DataSource V2 implementation
I have a DataSource V2 implementation where we have implemented the interfaces for filter pushdown, limit pushdown, top-N pushdown, aggregate pushdown, and column pruning.
...
0
votes
2
answers
78
views
Running a notebook through Spark Connect fails to operate with Kafka
I have created this Docker Compose file:
# Command: docker stack deploy streaming-stack --compose-file docker/spark-kstreams-stack.yml
# Gary A. Stafford (2022-09-14)
# Updated: 2022-12-28
version: &...
0
votes
1
answer
101
views
pyspark on Windows - unexpected termination during collect()
I am new to Python and PySpark.
I'm trying to run it on Windows Server 2022. I have these environment variables:
HADOOP_HOME=C:\spark\hadoop
JAVA_HOME=C:\Program Files\Microsoft\jdk-17.0.16.8-hotspot
...
0
votes
0
answers
50
views
What connector can be used for Google Cloud Pub/Sub with Cloud Dataproc (Spark 3.5.x)?
I am using Spark 3.5.x and would like to use the readStream() API for structured streaming in Java.
I don't see any Pub/Sub connector available. I couldn't try Pub/Sub Lite because it is deprecated ...
1
vote
3
answers
96
views
How to pass an array of structs as a parameter to a UDF in Spark 4
Does anybody know what I am doing wrong? The following is a reduced code snippet that works in spark-3.x but doesn't work in spark-4.x.
In my use case I need to pass a complex data structure to a UDF (let's say ...
-1
votes
2
answers
127
views
How to connect to S3 without the large AWS SDK v2 bundle?
I'm trying to read some file from S3 with PySpark 4.0.1 and the S3AFileSystem.
The standard configuration using hadoop-aws 3.4.1 works, but it requires the AWS SDK Bundle. This single dependency is ...
0
votes
0
answers
34
views
Event Hubs based Structured Streaming Spark pipeline
I have little experience with Event Hubs. I am planning to build an end-to-end data pipeline using Event Hubs and Apache Spark Structured Streaming. I want to make my Spark code fail-proof. Event Hub ...
0
votes
1
answer
112
views
Is there a way to eagerly access the datatype of a Column object in (Py)Spark?
Given an arbitrary pyspark.sql.column.Column object (or, similarly, a pyspark.sql.connect.column.Column object), is there a way to get a datatype back -- either as a DDL string or pyspark.sql.types....
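One common workaround (a sketch, not an official eager-typing API) is to select the column expression against a DataFrame and read the type off the resulting schema; the expression and schema below are invented for illustration.

```python
# Sketch: recover the DataType of a Column expression via a select's schema.
from pyspark.sql import functions as F

df = spark.createDataFrame([(2.0, 3)], "price DOUBLE, quantity INT")
expr = F.col("price") * F.col("quantity")            # placeholder expression

dtype = df.select(expr.alias("tmp")).schema["tmp"].dataType
print(dtype)                  # e.g. DoubleType()
print(dtype.simpleString())   # DDL-style string such as 'double'
```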
0
votes
0
answers
41
views
Is there a known issue with Spark version 3 behaving differently and inconsistent to Spark version 2 when rounding decimal points?
My project is having a problem with decimal rounding differences in Spark v3 compared to Spark v2: rounding to the second decimal place works fine in Spark v2, but Spark v3 rounds the values up or down (...
0
votes
0
answers
77
views
How to merge small parquet files in Hudi into larger files
I use Spark + Hudi to write data into S3. I was writing data in bulk_insert mode, which caused many small Parquet files in the Hudi table.
Then I tried to schedule clustering on the Hudi table:
...
0
votes
0
answers
79
views
Kubernetes: driver-py: not found and Missing application resource error
I'm trying to run my pyspark file in a k8s environment using a custom image.
It used to work fine with our company-provided image, but that image has Spark v2.4.4, so I wanted an upgrade.
First I got this error:...
1
vote
2
answers
142
views
DATAPROC - com.google.cloud.spark.bigquery.BigQueryRelationProvider not a subtype
I'm new to Dataproc and am having trouble running a job that accesses a PostgreSQL database (Compute Engine VM) to write data to BigQuery.
I created a cluster with the following configuration:
gcloud ...
0
votes
0
answers
50
views
Job overrun / OOM after changing the partitioning
I have a PySpark job that ingests data into a Delta table originally partitioned by year, month, day, and hour. The job takes 2 hours to complete. The job runs daily, ingesting the previous day's full data. ...
0
votes
1
answer
59
views
manage Z-Order with Predictive Optimization in databricks
I want to understand how to manage Z-Order in Databricks when using Predictive Optimization (PO). According to the documentation:
"OPTIMIZE does not run ZORDER when executed with predictive ...