I am new to the world of PySpark and I am running into serious performance problems when writing data from a DataFrame to a BigQuery table. I have tried everything I have read: repartitioning, caching, checkpoints, and other recommendations, but nothing has worked so far.
In the example below, writing only 10,000 rows to a destination table takes about 4 minutes. In a normal context this should finish in a couple of seconds. That time is very bad considering that the table I actually have to load has more than 17 million rows.
Most likely, since I am new to PySpark, I am doing something wrong, or I am misconfiguring the cluster and/or the Spark session. If you can help me solve this problem, I will be very grateful. Below I include the code, the configuration, and the console output.
Thank you very much in advance; any help is appreciated!
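For context, the write in question has roughly the following shape (a simplified sketch, not my exact code: the output table name matches the log below, while the query, bucket name and partition count are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bq-write-example").getOrCreate()

# Read via the spark-bigquery connector; the log shows the query being
# materialized into the wrk_dataproc_processes dataset.
df = (
    spark.read.format("bigquery")
    .option("viewsEnabled", "true")
    .option("materializationDataset", "wrk_dataproc_processes")
    .option("query", "SELECT ...")  # placeholder for the query shown in the log
    .load()
)

# The slow part: the connector's indirect write method stages Parquet files
# in a temporary GCS bucket and then submits a BigQuery load job, which is
# what the LoadJobConfiguration entry in the log reflects.
(
    df.repartition(10)  # one of the tuning attempts mentioned above
    .write.format("bigquery")
    .option("table", "mycompany-datalake-prod.cl_entities_preprod.btd_fal_cl_smartcontact_phone_rfm_retail")
    .option("temporaryGcsBucket", "my-temp-bucket")  # placeholder bucket name
    .option("writeMethod", "indirect")
    .mode("overwrite")  # corresponds to writeDisposition=WRITE_TRUNCATE in the log
    .save()
)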
And here is the console output:
:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2d1f06bd-2136-4034-af8f-166c97149fde;1.0
confs: [default]
found org.apache.spark#spark-avro_2.12;3.1.2 in central
found org.spark-project.spark#unused;1.0.0 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.2/spark-avro_2.12-3.1.2.jar ...
[SUCCESSFUL ] org.apache.spark#spark-avro_2.12;3.1.2!spark-avro_2.12.jar (26ms)
downloading https://repo1.maven.org/maven2/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar ...
[SUCCESSFUL ] org.spark-project.spark#unused;1.0.0!unused.jar (13ms)
:: resolution report :: resolve 1369ms :: artifacts dl 44ms
:: modules in use:
org.apache.spark#spark-avro_2.12;3.1.2 from central in [default]
org.spark-project.spark#unused;1.0.0 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 2 | 2 | 2 | 0 || 2 | 2 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-2d1f06bd-2136-4034-af8f-166c97149fde
confs: [default]
2 artifacts copied, 0 already retrieved (171kB/7ms)
24/08/22 23:31:59 INFO org.sparkproject.jetty.util.log: Logging initialized @7535ms to org.sparkproject.jetty.util.log.Slf4jLog
24/08/22 23:32:00 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_312-b07
24/08/22 23:32:00 INFO org.sparkproject.jetty.server.Server: Started @7654ms
24/08/22 23:32:00 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@4f7fd6eb{HTTP/1.1, (http/1.1)}{0.0.0.0:35025}
24/08/22 23:32:00 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at pipelines-dl-prod-cluster-encryption-m/10.128.0.92:8032
24/08/22 23:32:01 INFO org.apache.hadoop.yarn.client.AHSProxy: Connecting to Application History server at pipelines-dl-prod-cluster-encryption-m/10.128.0.92:10200
24/08/22 23:32:01 INFO org.apache.hadoop.conf.Configuration: resource-types.xml not found
24/08/22 23:32:01 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils: Unable to find 'resource-types.xml'.
24/08/22 23:32:03 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:///root/.ivy2/jars/org.apache.spark_spark-avro_2.12-3.1.2.jar added multiple times to distributed cache.
24/08/22 23:32:03 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:///root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar added multiple times to distributed cache.
24/08/22 23:32:03 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1724369481359_0001
24/08/22 23:32:04 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at pipelines-dl-prod-cluster-encryption-m/10.128.0.92:8030
24/08/22 23:32:06 INFO com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl: Ignoring exception of type GoogleJsonResponseException; verified object already exists with desired state.
SQL Query:
SELECT
cust_key_id AS cust_key_id_corp
, contact_point_txt AS contact_point_txt_corp
, priority
, sc_exec_dttm
, DATE('2024-08-22')-1 AS PARTITIONDATE
FROM `mycompany-datalake-prod.svw_bi_corp_cl_cust_contac_prd_acc_fal_cl_contactability_prod.svw_vw_smartcontact_phone_rfm`
WHERE DATE(sc_exec_dttm) = DATE('2024-08-22')
LIMIT 10000
Columns from corp to retail: ['cust_key_id_corp', 'contact_point_txt_corp']
Aliases from corp to retail: ['cust_key_id_retail', 'contact_point_txt_retail']
Columns to clean first characters: {'cust_key_id_corp': '4'}
Output Table: mycompany-datalake-prod.cl_entities_preprod.btd_fal_cl_smartcontact_phone_rfm_retail
Output Table Columns Order: {'1': 'cust_key_id_corp', '2': 'contact_point_txt_corp', '3': 'cust_key_id_retail', '4': 'contact_point_txt_retail', '5': 'priority', '6': 'sc_exec_dttm', '7': 'PARTITIONDATE'}
----------------------------------------------------------------------------------------
Retail Key: Try to access the secret manager
Obtained default credentials for the project mycompany-datalake-prod
----------------------------------------------------------------------------------------
Retail Key: access OK
----------------------------------------------------------------------------------------
Retail Key: Trying to create the udf
----------------------------------------------------------------------------------------
Retail Key: udf OK
----------------------------------------------------------------------------------------
Corp Key: Try to access the secret manager
Obtained default credentials for the project mycompany-datalake-prod
----------------------------------------------------------------------------------------
Corp Key: access OK
----------------------------------------------------------------------------------------
Corp Key: Trying to create the udf
----------------------------------------------------------------------------------------
Corp Key: udf OK
----------------------------------------------------------------------------------------
Start reading the query
24/08/22 23:32:17 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: |Querying table mycompany-datalake-prod.wrk_dataproc_processes._bqc_9180ed867c5a4cc0ab125aed6c815771, parameters sent from Spark:|requiredColumns=[cust_key_id_corp,contact_point_txt_corp,priority,sc_exec_dttm,PARTITIONDATE],|filters=[]
24/08/22 23:32:19 INFO com.google.cloud.bigquery.connector.common.ReadSessionCreator: Read session:{"readSessionName":"projects/mycompany-datalake-prod/locations/us/sessions/CAISDDJmw2NBoCanIaAmpmndzRkNzZKU","readSessionCreationStartTime":"2024-08-22T23:32:17.204Z","readSessionCreationEndTime":"2024-08-22T23:32:19.604Z","readSessionPrepDuration":1218,"readSessionCreationDuration":1182,"readSessionDuration":2400}
24/08/22 23:32:19 INFO com.google.cloud.bigquery.connector.common.ReadSessionCreator: Requested 20000 max partitions, but only received 1 from the BigQuery Storage API for session projects/mycompany-datalake-prod/locations/us/sessions/CAISDDJmw2NBoCanIaAmpmndzRkNzZKU. Notice that the number of streams in actual may be lower than the requested number, depending on the amount parallelism that is reasonable for the table and the maximum amount of parallelism allowed by the system.
24/08/22 23:32:19 INFO com.google.cloud.spark.bigquery.direct.BigQueryRDDFactory: Created read session for table 'mycompany-datalake-prod.wrk_dataproc_processes._bqc_9180ed867c5a4cc0ab125aed6c815771': projects/mycompany-datalake-prod/locations/us/sessions/CAISDDJmw2NBoCanIaAmpmndzRkNzZKU
+------------------------+------------------------+--------+--------------------------+-------------+
|cust_key_id_corp |contact_point_txt_corp |priority|sc_exec_dttm |PARTITIONDATE|
+------------------------+------------------------+--------+--------------------------+-------------+
|uRpSOh/XU5nNv8FTb8PyIQ==|4drYa5U6wKW/m1rJKkEVZA==|1 |2024-08-22T12:35:52.954362|2024-08-21 |
|tbk2mU5xJVhBtlwATHPd+g==|NAWiFY5u+pUxpegY6qQrZw==|1 |2024-08-22T12:35:52.954362|2024-08-21 |
|zINNfvoYV8/oDhgjbooGsA==|G16xHElQz/ATlNP5I3105g==|1 |2024-08-22T12:35:52.954362|2024-08-21 |
|s+l08Eji3Pe8m3mxKIQc6g==|sB/9MfFJTeSfH+ovhpGGJw==|1 |2024-08-22T12:35:52.954362|2024-08-21 |
|rXXoKSqnB1hPo3QWQ+ddzg==|rkoZS+GcXtiXlESGHLkEYw==|1 |2024-08-22T12:35:52.954362|2024-08-21 |
+------------------------+------------------------+--------+--------------------------+-------------+
only showing top 5 rows
24/08/22 23:32:25 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: |Querying table mycompany-datalake-prod.wrk_dataproc_processes._bqc_9180ed867c5a4cc0ab125aed6c815771, parameters sent from Spark:|requiredColumns=[cust_key_id_corp,contact_point_txt_corp,priority,sc_exec_dttm,PARTITIONDATE],|filters=[]
24/08/22 23:32:26 INFO com.google.cloud.bigquery.connector.common.ReadSessionCreator: Read session:{"readSessionName":"projects/mycompany-datalake-prod/locations/us/sessions/CAISDHcwMHlKUk5EaTR6ZxoCanIaAmpm","readSessionCreationStartTime":"2024-08-22T23:32:25.768Z","readSessionCreationEndTime":"2024-08-22T23:32:26.125Z","readSessionPrepDuration":89,"readSessionCreationDuration":268,"readSessionDuration":357}
24/08/22 23:32:26 INFO com.google.cloud.bigquery.connector.common.ReadSessionCreator: Requested 20000 max partitions, but only received 1 from the BigQuery Storage API for session projects/mycompany-datalake-prod/locations/us/sessions/CAISDHcwMHlKUk5EaTR6ZxoCanIaAmpm. Notice that the number of streams in actual may be lower than the requested number, depending on the amount parallelism that is reasonable for the table and the maximum amount of parallelism allowed by the system.
24/08/22 23:32:26 INFO com.google.cloud.spark.bigquery.direct.BigQueryRDDFactory: Created read session for table 'mycompany-datalake-prod.wrk_dataproc_processes._bqc_9180ed867c5a4cc0ab125aed6c815771': projects/mycompany-datalake-prod/locations/us/sessions/CAISDHcwMHlKUk5EaTR6ZxoCanIaAmpm
Time to read the query: 19.12794518470764
Start decryption columns corp to retail
Time for decryption columns corp to retail: 0.6136214733123779
Start decryption columns retail to corp
Time for decryption columns retail to corp: 0.22461867332458496
Start column sorting for output table
Time for column sorting for output table: 0.04193377494812012
Total row number: 10000
Start write to output table
24/08/22 23:36:07 INFO com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem: Successfully repaired 'gs://dataproc-datalake-a01350f1-0bef-0bef-0bef-20bf078e6d86/.spark-bigquery-application_1748135243699_0001-fd3aa81e-8bb5--e04b-438c59d9a5c52599/' directory.
24/08/22 23:36:08 INFO com.google.cloud.bigquery.connector.common.BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=cl_entities_preprod, projectId=mycompany-datalake-prod, tableId=btd_fal_cl_smartcontact_phone_rfm_retail}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createDisposition=CREATE_IF_NEEDED, writeDisposition=WRITE_TRUNCATE, formatOptions=FormatOptions{format=PARQUET}, nullMarker=null, maxBadRecords=null, schema=Schema{fields=[Field{name=cust_key_id_corp, type=STRING, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=contact_point_txt_corp, type=STRING, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=cust_key_id_retail, type=STRING, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=contact_point_txt_retail, type=STRING, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=priority, type=INTEGER, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=sc_exec_dttm, type=STRING, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=PARTITIONDATE, type=DATE, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}]}, ignoreUnknownValue=null, sourceUris=[gs://dataproc-datalake-a01350f1-0bef-0bef-0bef-20bf078e6d86/.spark-bigquery-application_1748135243699_0001-fd3aa81e-8bb5--e04b-438c59d9a5c52599/part-0000*-21e9f1c5-8887-4b80-8c75-07e99361b532-c000.snappy.parquet], schemaUpdateOptions=null, autodetect=null, timePartitioning=null, clustering=null, useAvroLogicalTypes=null, labels=null, jobTimeoutMs=null, rangePartitioning=null, hivePartitioningOptions=null, referenceFileSchemaUri=null}. jobId: JobId{project=mycompany-datalake-prod, job=6123dc1e-44fa-bb87-8c77-32e2c099ff59, location=US}
24/08/22 23:36:11 INFO com.google.cloud.bigquery.connector.common.BigQueryClient: Done loading to mycompany-datalake-prod.cl_entities_preprod.btd_fal_cl_smartcontact_phone_rfm_retail. jobId: JobId{project=mycompany-datalake-prod, job=6123dc1e-44fa-bb87-8c77-32e2c099ff59, location=US}
Time to write to output table: 222.45286417007446
----------------------------------------------------------------------------------------
Total script execution time: 242.77862739562988
24/08/22 23:36:11 INFO org.sparkproject.jetty.server.AbstractConnector: Stopped Spark@4f7fd6eb{HTTP/1.1, (http/1.1)}{0.0.0.0:0}