I am new to PySpark and am experiencing serious performance problems when writing data from a DataFrame to a BigQuery table. I have tried the recommendations I have read (repartition, cache, checkpoints, and so on), but nothing has worked so far.

In the example below, I write only 50,000 rows to a destination table, and it takes about 4 minutes. Normally this should complete in a couple of seconds. That is far too slow, considering that the data I have to load into the table has more than 17 million rows.

Most likely, since I am new to PySpark, I am doing something wrong or misconfiguring the cluster and/or the Spark session. If you can help me solve this problem, I will be very grateful. Below I will place the code, the configuration, and the console output.

Thank you very much in advance, any help is valuable!

Console output:

:: loading settings :: url = jar:file:/usr/lib/spark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-avro_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-2d1f06bd-2136-4034-af8f-166c97149fde;1.0
    confs: [default]
    found org.apache.spark#spark-avro_2.12;3.1.2 in central
    found org.spark-project.spark#unused;1.0.0 in central
downloading https://repo1.maven.org/maven2/org/apache/spark/spark-avro_2.12/3.1.2/spark-avro_2.12-3.1.2.jar ...
    [SUCCESSFUL ] org.apache.spark#spark-avro_2.12;3.1.2!spark-avro_2.12.jar (26ms)
downloading https://repo1.maven.org/maven2/org/spark-project/spark/unused/1.0.0/unused-1.0.0.jar ...
    [SUCCESSFUL ] org.spark-project.spark#unused;1.0.0!unused.jar (13ms)
:: resolution report :: resolve 1369ms :: artifacts dl 44ms
    :: modules in use:
    org.apache.spark#spark-avro_2.12;3.1.2 from central in [default]
    org.spark-project.spark#unused;1.0.0 from central in [default]
    ---------------------------------------------------------------------
    |                  |            modules            ||   artifacts   |
    |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
    ---------------------------------------------------------------------
    |      default     |   2   |   2   |   2   |   0   ||   2   |   2   |
    ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-2d1f06bd-2136-4034-af8f-166c97149fde
    confs: [default]
    2 artifacts copied, 0 already retrieved (171kB/7ms)
24/08/22 23:31:59 INFO org.sparkproject.jetty.util.log: Logging initialized @7535ms to org.sparkproject.jetty.util.log.Slf4jLog
24/08/22 23:32:00 INFO org.sparkproject.jetty.server.Server: jetty-9.4.40.v20210413; built: 2021-04-13T20:42:42.668Z; git: b881a572662e1943a14ae12e7e1207989f218b74; jvm 1.8.0_312-b07
24/08/22 23:32:00 INFO org.sparkproject.jetty.server.Server: Started @7654ms
24/08/22 23:32:00 INFO org.sparkproject.jetty.server.AbstractConnector: Started ServerConnector@4f7fd6eb{HTTP/1.1, (http/1.1)}{0.0.0.0:35025}
24/08/22 23:32:00 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at pipelines-dl-prod-cluster-encryption-m/10.128.0.92:8032
24/08/22 23:32:01 INFO org.apache.hadoop.yarn.client.AHSProxy: Connecting to Application History server at pipelines-dl-prod-cluster-encryption-m/10.128.0.92:10200
24/08/22 23:32:01 INFO org.apache.hadoop.conf.Configuration: resource-types.xml not found
24/08/22 23:32:01 INFO org.apache.hadoop.yarn.util.resource.ResourceUtils: Unable to find 'resource-types.xml'.
24/08/22 23:32:03 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:///root/.ivy2/jars/org.apache.spark_spark-avro_2.12-3.1.2.jar added multiple times to distributed cache.
24/08/22 23:32:03 WARN org.apache.spark.deploy.yarn.Client: Same path resource file:///root/.ivy2/jars/org.spark-project.spark_unused-1.0.0.jar added multiple times to distributed cache.
24/08/22 23:32:03 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1724369481359_0001
24/08/22 23:32:04 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at pipelines-dl-prod-cluster-encryption-m/10.128.0.92:8030
24/08/22 23:32:06 INFO com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageImpl: Ignoring exception of type GoogleJsonResponseException; verified object already exists with desired state.

SQL Query: 

SELECT
    cust_key_id AS cust_key_id_corp
  , contact_point_txt AS contact_point_txt_corp
  , priority
  , sc_exec_dttm
  , DATE('2024-08-22')-1 AS PARTITIONDATE
FROM `mycompany-datalake-prod.svw_bi_corp_cl_cust_contac_prd_acc_fal_cl_contactability_prod.svw_vw_smartcontact_phone_rfm`
WHERE DATE(sc_exec_dttm) = DATE('2024-08-22')
LIMIT 10000

Columns from corp to retail: ['cust_key_id_corp', 'contact_point_txt_corp']
Aliases from corp to retail: ['cust_key_id_retail', 'contact_point_txt_retail']
Columns to clean first characters: {'cust_key_id_corp': '4'}
Output Table: mycompany-datalake-prod.cl_entities_preprod.btd_fal_cl_smartcontact_phone_rfm_retail
Output Table Columns Order: {'1': 'cust_key_id_corp', '2': 'contact_point_txt_corp', '3': 'cust_key_id_retail', '4': 'contact_point_txt_retail', '5': 'priority', '6': 'sc_exec_dttm', '7': 'PARTITIONDATE'}

----------------------------------------------------------------------------------------
Retail Key: Try to access the secret manager
Obtained default credentials for the project mycompany-datalake-prod

----------------------------------------------------------------------------------------
Retail Key: access OK

----------------------------------------------------------------------------------------
Retail Key: Trying to create the udf

----------------------------------------------------------------------------------------
Retail Key: udf OK

----------------------------------------------------------------------------------------
Corp Key: Try to access the secret manager
Obtained default credentials for the project mycompany-datalake-prod

----------------------------------------------------------------------------------------
Corp Key: access OK

----------------------------------------------------------------------------------------
Corp Key: Trying to create the udf

----------------------------------------------------------------------------------------
Corp Key: udf OK

----------------------------------------------------------------------------------------
Start reading the query

24/08/22 23:32:17 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: |Querying table mycompany-datalake-prod.wrk_dataproc_processes._bqc_9180ed867c5a4cc0ab125aed6c815771, parameters sent from Spark:|requiredColumns=[cust_key_id_corp,contact_point_txt_corp,priority,sc_exec_dttm,PARTITIONDATE],|filters=[]
24/08/22 23:32:19 INFO com.google.cloud.bigquery.connector.common.ReadSessionCreator: Read session:{"readSessionName":"projects/mycompany-datalake-prod/locations/us/sessions/CAISDDJmw2NBoCanIaAmpmndzRkNzZKU","readSessionCreationStartTime":"2024-08-22T23:32:17.204Z","readSessionCreationEndTime":"2024-08-22T23:32:19.604Z","readSessionPrepDuration":1218,"readSessionCreationDuration":1182,"readSessionDuration":2400}
24/08/22 23:32:19 INFO com.google.cloud.bigquery.connector.common.ReadSessionCreator: Requested 20000 max partitions, but only received 1 from the BigQuery Storage API for session projects/mycompany-datalake-prod/locations/us/sessions/CAISDDJmw2NBoCanIaAmpmndzRkNzZKU. Notice that the number of streams in actual may be lower than the requested number, depending on the amount parallelism that is reasonable for the table and the maximum amount of parallelism allowed by the system.
24/08/22 23:32:19 INFO com.google.cloud.spark.bigquery.direct.BigQueryRDDFactory: Created read session for table 'mycompany-datalake-prod.wrk_dataproc_processes._bqc_9180ed867c5a4cc0ab125aed6c815771': projects/mycompany-datalake-prod/locations/us/sessions/CAISDDJmw2NBoCanIaAmpmndzRkNzZKU

+------------------------+------------------------+--------+--------------------------+-------------+
|cust_key_id_corp        |contact_point_txt_corp  |priority|sc_exec_dttm              |PARTITIONDATE|
+------------------------+------------------------+--------+--------------------------+-------------+
|uRpSOh/XU5nNv8FTb8PyIQ==|4drYa5U6wKW/m1rJKkEVZA==|1       |2024-08-22T12:35:52.954362|2024-08-21   |
|tbk2mU5xJVhBtlwATHPd+g==|NAWiFY5u+pUxpegY6qQrZw==|1       |2024-08-22T12:35:52.954362|2024-08-21   |
|zINNfvoYV8/oDhgjbooGsA==|G16xHElQz/ATlNP5I3105g==|1       |2024-08-22T12:35:52.954362|2024-08-21   |
|s+l08Eji3Pe8m3mxKIQc6g==|sB/9MfFJTeSfH+ovhpGGJw==|1       |2024-08-22T12:35:52.954362|2024-08-21   |
|rXXoKSqnB1hPo3QWQ+ddzg==|rkoZS+GcXtiXlESGHLkEYw==|1       |2024-08-22T12:35:52.954362|2024-08-21   |
+------------------------+------------------------+--------+--------------------------+-------------+
only showing top 5 rows

24/08/22 23:32:25 INFO com.google.cloud.spark.bigquery.direct.DirectBigQueryRelation: |Querying table mycompany-datalake-prod.wrk_dataproc_processes._bqc_9180ed867c5a4cc0ab125aed6c815771, parameters sent from Spark:|requiredColumns=[cust_key_id_corp,contact_point_txt_corp,priority,sc_exec_dttm,PARTITIONDATE],|filters=[]
24/08/22 23:32:26 INFO com.google.cloud.bigquery.connector.common.ReadSessionCreator: Read session:{"readSessionName":"projects/mycompany-datalake-prod/locations/us/sessions/CAISDHcwMHlKUk5EaTR6ZxoCanIaAmpm","readSessionCreationStartTime":"2024-08-22T23:32:25.768Z","readSessionCreationEndTime":"2024-08-22T23:32:26.125Z","readSessionPrepDuration":89,"readSessionCreationDuration":268,"readSessionDuration":357}
24/08/22 23:32:26 INFO com.google.cloud.bigquery.connector.common.ReadSessionCreator: Requested 20000 max partitions, but only received 1 from the BigQuery Storage API for session projects/mycompany-datalake-prod/locations/us/sessions/CAISDHcwMHlKUk5EaTR6ZxoCanIaAmpm. Notice that the number of streams in actual may be lower than the requested number, depending on the amount parallelism that is reasonable for the table and the maximum amount of parallelism allowed by the system.
24/08/22 23:32:26 INFO com.google.cloud.spark.bigquery.direct.BigQueryRDDFactory: Created read session for table 'mycompany-datalake-prod.wrk_dataproc_processes._bqc_9180ed867c5a4cc0ab125aed6c815771': projects/mycompany-datalake-prod/locations/us/sessions/CAISDHcwMHlKUk5EaTR6ZxoCanIaAmpm

Time to read the query: 19.12794518470764

Start decryption columns corp to retail
Time for decryption columns corp to retail: 0.6136214733123779

Start decryption columns retail to corp
Time for decryption columns retail to corp: 0.22461867332458496

Start column sorting for output table
Time for column sorting for output table: 0.04193377494812012

Total row number: 10000

Start write to output table

24/08/22 23:36:07 INFO com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageFileSystem: Successfully repaired 'gs://dataproc-datalake-a01350f1-0bef-0bef-0bef-20bf078e6d86/.spark-bigquery-application_1748135243699_0001-fd3aa81e-8bb5--e04b-438c59d9a5c52599/' directory.
24/08/22 23:36:08 INFO com.google.cloud.bigquery.connector.common.BigQueryClient: Submitted job LoadJobConfiguration{type=LOAD, destinationTable=GenericData{classInfo=[datasetId, projectId, tableId], {datasetId=cl_entities_preprod, projectId=mycompany-datalake-prod, tableId=btd_fal_cl_smartcontact_phone_rfm_retail}}, decimalTargetTypes=null, destinationEncryptionConfiguration=null, createDisposition=CREATE_IF_NEEDED, writeDisposition=WRITE_TRUNCATE, formatOptions=FormatOptions{format=PARQUET}, nullMarker=null, maxBadRecords=null, schema=Schema{fields=[Field{name=cust_key_id_corp, type=STRING, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=contact_point_txt_corp, type=STRING, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=cust_key_id_retail, type=STRING, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=contact_point_txt_retail, type=STRING, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=priority, type=INTEGER, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=sc_exec_dttm, type=STRING, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}, Field{name=PARTITIONDATE, type=DATE, mode=null, description=null, policyTags=null, maxLength=null, scale=null, precision=null, defaultValueExpression=null, collation=null}]}, ignoreUnknownValue=null, 
sourceUris=[gs://dataproc-datalake-a01350f1-0bef-0bef-0bef-20bf078e6d86/.spark-bigquery-application_1748135243699_0001-fd3aa81e-8bb5--e04b-438c59d9a5c52599/part-0000*-21e9f1c5-8887-4b80-8c75-07e99361b532-c000.snappy.parquet], schemaUpdateOptions=null, autodetect=null, timePartitioning=null, clustering=null, useAvroLogicalTypes=null, labels=null, jobTimeoutMs=null, rangePartitioning=null, hivePartitioningOptions=null, referenceFileSchemaUri=null}. jobId: JobId{project=mycompany-datalake-prod, job=6123dc1e-44fa-bb87-8c77-32e2c099ff59, location=US}
24/08/22 23:36:11 INFO com.google.cloud.bigquery.connector.common.BigQueryClient: Done loading to mycompany-datalake-prod.cl_entities_preprod.btd_fal_cl_smartcontact_phone_rfm_retail. jobId: JobId{project=mycompany-datalake-prod, job=6123dc1e-44fa-bb87-8c77-32e2c099ff59, location=US}

Time to write to output table: 222.45286417007446

----------------------------------------------------------------------------------------

Total script execution time: 242.77862739562988
24/08/22 23:36:11 INFO org.sparkproject.jetty.server.AbstractConnector: Stopped Spark@4f7fd6eb{HTTP/1.1, (http/1.1)}{0.0.0.0:0}
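
A rough breakdown of the write time can be read off the log above. The sketch below only does arithmetic on values copied from that output: the load job was submitted at 23:36:08 and reported done at 23:36:11, while the script measured about 222 s for the whole write step. Log timestamps have one-second resolution, so the figures are approximate. Attributing the remainder to Spark-side work (evaluating the lazily defined UDF columns and staging the Parquet files in GCS) is an interpretation, not something the log states directly.

```python
from datetime import datetime

# Values copied from the console output above.
LOG_TIME_FORMAT = "%y/%m/%d %H:%M:%S"
load_job_submitted = datetime.strptime("24/08/22 23:36:08", LOG_TIME_FORMAT)
load_job_done = datetime.strptime("24/08/22 23:36:11", LOG_TIME_FORMAT)
total_write_seconds = 222.45286417007446  # "Time to write to output table"

# Time BigQuery spent on the load job itself (one-second log resolution).
load_job_seconds = (load_job_done - load_job_submitted).total_seconds()

# Everything else in the write step happened before the load job was submitted.
before_load_seconds = total_write_seconds - load_job_seconds

print(f"BigQuery load job: ~{load_job_seconds:.0f} s")
print(f"Before load job:   ~{before_load_seconds:.0f} s")
```

If this reading is right, the BigQuery load itself is a small part of the total, and the time is mostly spent before any data reaches BigQuery.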

1 Answer

First, I create a Dataproc cluster and submit the job from an Airflow DAG:

CLUSTER_ENCRIPTION_NAME='pipelines-dl-prod-cluster-encryption'
CLUSTER_ENCRIPTION_SCOPES='cloud-platform' 
CLUSTER_ENCRIPTION_NUM_WORKERS=3
CLUSTER_ENCRIPTION_NUM_SECONDARY_WORKERS=1
CLUSTER_ENCRIPTION_REGION='us-central1'
CLUSTER_ENCRIPTION_ZONE='us-central1-a'
CLUSTER_ENCRIPTION_MASTER_MACHINE_TYPE='e2-standard-8'
CLUSTER_ENCRIPTION_WORKER_MACHINE_TYPE='e2-highcpu-16'
CLUSTER_ENCRIPTION_MASTER_DISK_TYPE='pd-ssd'
CLUSTER_ENCRIPTION_MASTER_DISK_SIZE='100'
CLUSTER_ENCRIPTION_WORKER_DISK_TYPE='pd-ssd'
CLUSTER_ENCRIPTION_SECONDARY_WORKER_DISK_TYPE='pd-ssd'
CLUSTER_ENCRIPTION_WORKER_DISK_SIZE='100'
CLUSTER_ENCRIPTION_SERVICE_ACCOUNT_COMPOSER ='theservice_account@mycompany-datalake-prod.iam.gserviceaccount.com'
CLUSTER_ENCRIPTION_SERVICE_ACCOUNT_PIPELINES='theservice_account@mycompany-datalake-prod.iam.gserviceaccount.com'
CLUSTER_ENCRIPTION_IMAGE_VERSION='2.0.29-debian10' 
CLUSTER_ENCRIPTION_MAX_IDLE='60m'
CLUSTER_ENCRIPTION_PROPERTIES="^#^yarn:yarn.log-aggregation-enable=true,#spark:spark.jars.packages=org.apache.spark:spark-avro_2.12:3.1.2,#dataproc:pip.packages=Unidecode==1.3.7,google-cloud-secret-manager==2.11.0,pbkdf2==1.3,pynacl==1.5.0,pyAesCrypt==6.1.1,pycryptodome==3.20.0,cryptography==41.0.5"
CLUSTER_ENCRIPTION_AUTOSCALING_POLICY='projects/mycompany-datalake-prod/regions/us-central1/autoscalingPolicies/scaling_policy_contact'

create_dataproc_cluster_encryption = BashOperator(
    task_id='create_dataproc_cluster_encryption',
    bash_command="""
        gcloud config set project {project} 
        gcloud config set account {service_account_composer}
        cluster_status=$(gcloud dataproc clusters list \\
                            --project {project} \\
                            --region {region} \\
                            --filter='status.state=ACTIVE AND clusterName={cluster_name}')
        if [ -z "$cluster_status" ]
        then
            echo "Creating cluster {cluster_name} --project {project} --region {region}."
            gcloud beta dataproc clusters create {cluster_name} \\
                --enable-component-gateway \\
                --subnet default \\
                --region {region} \\
                --zone {zone} \\
                --master-machine-type {master_machine_type} \\
                --master-boot-disk-type {master_disk_type} \\
                --master-boot-disk-size {master_disk_size} \\
                --num-workers {num_workers} \\
                --worker-machine-type {worker_machine_type} \\
                --worker-boot-disk-type {worker_disk_type} \\
                --worker-boot-disk-size {worker_disk_size} \\
                --secondary-worker-type 'non-preemptible' \\
                --num-secondary-workers {num_secondary_workers} \\
                --secondary-worker-boot-disk-type {secondary_worker_disk_type} \\
                --secondary-worker-boot-disk-size {worker_disk_size} \\
                --image-version {image_version} \\
                --project {project} \\
                --service-account={service_account_pipelines} \\
                --scopes={scopes} \\
                --properties '{properties}' \\
                --max-idle {max_idle} \\
                --autoscaling-policy '{autoscaling_policy}'
        else
            echo "Cluster {cluster_name} already exists in --project {project} --region {region}."
        fi
        """.format(cluster_name=CLUSTER_ENCRIPTION_NAME, 
                   region=CLUSTER_ENCRIPTION_REGION,       
                   zone=CLUSTER_ENCRIPTION_ZONE,    
                   scopes=CLUSTER_ENCRIPTION_SCOPES,     
                   master_machine_type=CLUSTER_ENCRIPTION_MASTER_MACHINE_TYPE, 
                   master_disk_type=CLUSTER_ENCRIPTION_MASTER_DISK_TYPE, 
                   master_disk_size=CLUSTER_ENCRIPTION_MASTER_DISK_SIZE,
                   num_workers=CLUSTER_ENCRIPTION_NUM_WORKERS,
                   num_secondary_workers=CLUSTER_ENCRIPTION_NUM_SECONDARY_WORKERS,
                   worker_machine_type=CLUSTER_ENCRIPTION_WORKER_MACHINE_TYPE, 
                   worker_disk_type=CLUSTER_ENCRIPTION_WORKER_DISK_TYPE, 
                   worker_disk_size=CLUSTER_ENCRIPTION_WORKER_DISK_SIZE,
                   secondary_worker_disk_type= CLUSTER_ENCRIPTION_SECONDARY_WORKER_DISK_TYPE,
                   image_version=CLUSTER_ENCRIPTION_IMAGE_VERSION,
                   project=PROJECT_ID,
                   service_account_composer=CLUSTER_ENCRIPTION_SERVICE_ACCOUNT_COMPOSER,
                   service_account_pipelines=CLUSTER_ENCRIPTION_SERVICE_ACCOUNT_PIPELINES,
                   properties=CLUSTER_ENCRIPTION_PROPERTIES,
                   autoscaling_policy=CLUSTER_ENCRIPTION_AUTOSCALING_POLICY,
                   max_idle=CLUSTER_ENCRIPTION_MAX_IDLE))

params = {
    "sql_query": read_query(f"{ENTITY_PATH}/btd_fal_co_smartcontact_phone_rfm_retail.sql"),
    "columns_and_alias_from_corp_to_retail": {
        'cust_key_id_corp': 'cust_key_id_retail',
        'contact_point_txt_corp': 'contact_point_txt_retail'
    },
    "columns_and_alias_from_retail_to_corp": {},
    "columns_to_clean_first_characters" : {"cust_key_id_corp": "4"},
    "output_table": f'{DESTINATION_PROJECT_ID}.{DESTINATION_DATASET_ID}.{DESTINATION_TABLE_ID}',
    "output_table_columns_order": {
        1: 'cust_key_id_corp',
        2: 'contact_point_txt_corp',
        3: 'cust_key_id_retail',
        4: 'contact_point_txt_retail',
        5: 'priority',
        6: 'sc_exec_dttm',
        7: 'PARTITIONDATE'
    }
}

job_re_encript_corp_to_retail = DataprocSubmitJobOperator(
        task_id=f'job_re_encript_corp_to_retail',
        job={
            "reference": {"project_id": PROJECT_ID, 
                          "job_id": f"job_{COUNTRY}_{INGESTION_KEY}_{PROCESS_DATE}_{uuid.uuid4()}"},
            "placement": {"cluster_name": CLUSTER_ENCRIPTION_NAME},
            "pyspark_job": {"main_python_file_uri": "gs://dataproc-datalake-a01350f1-0bef-0bef-0bef-20bf078e6d86/dataproc-jobs/encrypt-decrypt/spark-scripts/spark_re_encrypt_corp_retail.py",
                            "args": [json.dumps(params)],
                            "jar_file_uris": ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"],
                            },
                            "labels": {"dag": DAG_NAME, "task": "job_re_encript_corp_to_retail",
                                       "bu_project": INGESTION_PROYECT}
        },
        region="us-central1",
        retries=5,
        project_id=PROJECT_ID
    ) 
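
One detail worth noting about passing `params` through `json.dumps`: JSON object keys are always strings, so the integer keys of `output_table_columns_order` arrive in the script as `'1'`, `'2'`, and so on, which matches the console output above. Below is a minimal sketch of that round trip using only the standard library. The names `received` and `ordered_columns` are illustrative and not taken from the script, and how the script actually orders the columns is not shown here.

```python
import json

# Same mapping as in the DAG's params, with integer keys.
output_table_columns_order = {
    1: 'cust_key_id_corp',
    2: 'contact_point_txt_corp',
    3: 'cust_key_id_retail',
    4: 'contact_point_txt_retail',
    5: 'priority',
    6: 'sc_exec_dttm',
    7: 'PARTITIONDATE',
}

# What the DAG sends as the job argument.
args = [json.dumps({"output_table_columns_order": output_table_columns_order})]

# What the PySpark script gets back: the keys are now strings.
received = json.loads(args[0])["output_table_columns_order"]

# Sort numerically so the order stays correct past nine columns.
ordered_columns = [received[key] for key in sorted(received, key=int)]
print(ordered_columns)
```

With seven columns a plain string sort happens to give the same order, but with ten or more, `'10'` would sort before `'2'`, which is why the sketch sorts with `key=int`.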

Second, the PySpark code:

import base64
import binascii
import hashlib
import json
import logging
import os
import subprocess
import sys
import time
import traceback
from time import localtime, strftime

import google.auth
import google.auth.impersonated_credentials
from Crypto import Random
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from google.auth import default
from google.auth.transport.requests import Request
from google.cloud import bigquery, secretmanager, secretmanager_v1
from pbkdf2 import PBKDF2  # pbkdf2 package: its PBKDF2 object provides the .read() used below
from pyspark import SparkContext
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, udf, substring, expr, row_number
from pyspark.sql.window import Window

PROJECT_ID = 'mycompany-datalake-prod'
GCS_WORKING_BUCKET = 'dataproc-datalake-a01350f1-0bef-0bef-0bef-20bf078e6d86'
GCS_TEMPORRARY_BUCKET = 'gs://dataproc-datalake-a01350f1-0bef-0bef-0bef-20bf078e6d86/dataproc-jobs/encrypt-decrypt/staging'
BQ_NATERIALIZATION_DATASET = 'wrk_dataproc_processes'
BQ_TEMPORARY_DATASET = 'wrk_dataproc_processes'

NUM_PARTITIONS = 10

GCS_BUCKET = "entity-datalake-f6639638-b893-44fa-ae66-b8f50729e10d"
GCS_AVRO_PATH = "co/co_entities/product/data/2024/08/04/*.avro"
API_ENDPOINT = "https://api.example.com/endpoint"
CORP_KEY =   "projects/special_corp_project/secrets/encrypt-corp/versions/latest"
RETAIL_KEY = "projects/mycompany-datalake-prd/secrets/encrypt-retail/versions/latest"
SA_FOR_RETAIL_KEY = "sa_for_retail_key@mycompany-datalake-prod.iam.gserviceaccount.com"
SA_FOR_CORP_KEY = "theservice_account@mycompany-datalake-prod.iam.gserviceaccount.com"

spark = SparkSession.builder \
    .appName("Email and Phone Validation") \
    .config('parentProject', PROJECT_ID) \
    .getOrCreate()


spark.conf.set("viewsEnabled","true")
spark.conf.set("materializationDataset",BQ_NATERIALIZATION_DATASET)
spark.conf.set("temporaryGcsBucket",GCS_TEMPORRARY_BUCKET)

sc = spark.sparkContext
sc.setCheckpointDir(GCS_TEMPORRARY_BUCKET)

project_id = PROJECT_ID
retail_secret_id = 'encrypt-retail'
corp_secret_id = 'encrypt-corp'
project_retail_secret = 'mycompany-datalake-encrypt-prd'
project_corp_secret = 'corporate-secrets'

logger = logging.getLogger(__name__)

def delete_temporary_tables(dataset_id):
    client = bigquery.Client()

    # List all tables in the dataset
    tables = client.list_tables(dataset_id)

    for table in tables:
        table_id = f"{dataset_id}.{table.table_id}"
        client.delete_table(table_id)
        print(f"Deleted table {table_id}")

def access_secret_version(secret_id):
    creds, pid = google.auth.default()
    print(f"Obtained default credentials for the project {pid}")
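    if secret_id == RETAIL_KEY:
        SA_FOR_KEY = SA_FOR_RETAIL_KEY
    elif secret_id == CORP_KEY:
        SA_FOR_KEY = SA_FOR_CORP_KEY
    else:
        # Without this branch SA_FOR_KEY would be undefined below
        raise ValueError(f"Unknown secret: {secret_id}")
    tcreds = google.auth.impersonated_credentials.Credentials(
        source_credentials=creds,
        target_principal=SA_FOR_KEY,
        # target_scopes expects a sequence of scope strings
        target_scopes=['https://www.googleapis.com/auth/cloud-platform']
    )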
    
    client = secretmanager_v1.SecretManagerServiceClient(credentials=tcreds)

    # Access the secret version
    response = client.access_secret_version(name=secret_id)

    # Get the payload as a string
    secret_payload = response.payload.data.decode("UTF-8")

    return secret_payload

class AesCrypt256:
    # Based on https://gist.github.com/pfote/5099161
    BLOCK_SIZE = 16

    # To use the null/x00 byte array for the IV
    default_initialization_vector = False

    def __init__(self, default_initialization_vector=False):
        self.default_initialization_vector = default_initialization_vector

    def pkcs7_pad(self, s):
        s = s.encode("utf-8") if isinstance(s, str) else s
        pad_len = self.BLOCK_SIZE - (len(s) % self.BLOCK_SIZE)
        return s + bytes([pad_len]) * pad_len

    def pkcs7_unpad(self, s):
        return s[: -s[-1]]

    def _encrypt(self, key, value, iv):
        cipher = AES.new(key, AES.MODE_CBC, iv)
        value = value.encode("utf-8") if isinstance(value, str) else value
        crypted = cipher.encrypt(self.pkcs7_pad(value))

        # check if empty/null initialization vector, and do not prepend if null
        if all(v == 0 for v in iv):
            return crypted
        else:
            # prepend the initialization vector
            return iv + crypted

    def _decrypt(self, key, value, iv):
        cipher = AES.new(key, AES.MODE_CBC, iv)
        # unpad the bytes, throw away garbage at end
        return self.pkcs7_unpad(cipher.decrypt(value)).decode("utf-8")

    def encrypt(self, key, value):
        if self.default_initialization_vector:
            return self._encrypt(key, value, bytes(bytearray(16)))
        else:
            iv = Random.get_random_bytes(16)
            return self._encrypt(key, value, iv)

    def decrypt(self, key, value):
        if self.default_initialization_vector:
            # we do not have an IV present
            default_iv = bytes(bytearray(16))
            return self._decrypt(key, value, default_iv)
        else:
            iv = value[:16]
            crypted = value[16:]
            return self._decrypt(key, crypted, iv)

    def encryptHex(self, key, value):
        return (binascii.hexlify(self.encrypt(key, value))).decode("utf-8")

    def decryptHex(self, key, value):
        return self.decrypt(key, binascii.unhexlify(value))

class BasicEncript:
    def __init__(self, passp, salt):
        self.passp = passp
        self.salt = binascii.unhexlify(salt)
        iterations = 1024
        self.key = PBKDF2(
            passphrase=self.passp, salt=self.salt, iterations=iterations
        ).read(32)

    @property
    def aes(self):
        return AesCrypt256(default_initialization_vector=True)

    def encrypt(self, data):
        if data is None:
            return
        try:
            crypted = self.aes.encryptHex(self.key, data)
            return crypted
        except Exception as e:
            logging.error(f"error encrypting value: {data}, error: {e}")

    def decrypt(self, data):
        if data is None:
            return
        try:
            crypted = self.aes.decryptHex(self.key, data)
            return crypted
        except Exception as e:
            logging.error(f"error decrypting value: {data}, error: {e}")

class Encrypter:
    def __init__(self, key, iv):
        key_rtl = key
        iv_rtl = iv
        self.crypt_retail = BasicEncript(key_rtl, iv_rtl)

    def get_encrypter(self):
        return self.crypt_retail

    def encrypt(self, msj: str):
        return self.crypt_retail.encrypt(msj)

    def decrypt(self, msj: str):
        return self.crypt_retail.decrypt(msj)

class AwkEncryptation:
  def __init__(self,key,iv,block_size):
    self.key = key
    self.iv = iv
    self.block_size = block_size

  def decrypt(self,message):
    if message is not None:
      try:
        enc = base64.b64decode(message)
        obj = AES.new(self.key, AES.MODE_CBC, self.iv)
        return unpad(obj.decrypt(enc), self.block_size).decode('utf-8')
      except Exception as e:
        return None
    return message

  def encrypt(self,message):
    if message is not None:
      message_pad = pad(str(message).encode('utf-8'),self.block_size)
      obj = AES.new(self.key, AES.MODE_CBC, self.iv)
      msg_encrypt = obj.encrypt(message_pad)
      base = base64.b64encode(msg_encrypt)
      return base.decode('utf-8')
    return message

def call_mode(awk, mode, value):

    if mode == "encrypt": 
        return awk.encrypt(value)

    if mode == "decrypt":
        return awk.decrypt(value)

def process(key_type, key_values, call_mode_type, value):
    key_values = json.loads(key_values)
    key_values = {"key": key_values["passEncrypt"], "iv": key_values["salt"]}
    awk = None
    try:
        if key_type == "RETAIL_KEY":
            awk = Encrypter(key=key_values["key"], iv=key_values["iv"])

        elif key_type == "CORP_KEY":
            awk = AwkEncryptation(base64.b64decode(key_values['key']), base64.b64decode(key_values['iv']), 16)

        return_value = call_mode(awk, call_mode_type, value)

        return return_value
    except Exception as e:
        print(f"process NOK")
        print("------------------------------------------------------------------------------------------")
        print(traceback.format_exc())
        return logging.error(f"errorMessage: {str(e)}")

def decode_columns(df: DataFrame, columns_to_decode: list):
    for column in columns_to_decode:
        df = df.withColumn(column, col(column).cast("string"))  # Example transformation
    return df

def encode_columns(df: DataFrame, columns_to_decode: list):
    for column in columns_to_decode:
        df = df.withColumn(column, col(column).cast("string"))  # Example transformation
    return df

def load_data_from_bigquery_with_query(query: str):
    spark = SparkSession.builder \
        .appName("BQLoader") \
        .config('parentProject', 'mycompany-datalake-prod') \
        .config("spark.jars.packages", "com.google.cloud.spark:spark-bigquery-with-dependencies_2.12:0.31.1") \
        .getOrCreate()
    
    table = 'mycompany-datalake-prod.co_entities.contactability_base'
    columns_to_select = ["CUST_NUM_ID", "EMAIL_DEF", "TELEPHONE_DEF", "EMAIL_CORP_DEF", "TELEPHONE_CORP_DEF"]
    filter = "_partitiondate = CURRENT_DATE('America/Bogota') - 1 AND (EMAIL_DEF IS NOT NULL AND TELEPHONE_DEF IS NOT NULL)"
    df = spark.read.format('bigquery').option('table', table) \
        .option('filter',filter) \
        .load() \
        .select(*columns_to_select) 

    return df

txt = "\n----------------------------------------------------------------------------------------\n"

def get_udfs_for_encryption():
    try:
        try:
            print(f"{txt}Retail Key: Try to access the secret manager")
            latest_retail_key = access_secret_version(RETAIL_KEY)
            print(f"{txt}Retail Key: access OK")
            try:
                print(f"{txt}Retail Key: Trying to create the udf")
                udf_retail_decrypt = udf(lambda value: process("RETAIL_KEY", latest_retail_key, "decrypt", value))
                udf_retail_encrypt = udf(lambda value: process("RETAIL_KEY", latest_retail_key, "encrypt", value))
                print(f"{txt}Retail Key: udf OK")
            except Exception as e:
                print(f"{txt}Retail Key: udf NOK")
                print(traceback.format_exc())
        except Exception as e:
            print(f"{txt}Retail Key: access denied")
            print(traceback.format_exc())
        try:
            print(f"{txt}Corp Key: Try to access the secret manager")
            latest_corp_key = access_secret_version(CORP_KEY)
            print(f"{txt}Corp Key: access OK")
            try:
                print(f"{txt}Corp Key: Trying to create the udf")
                udf_corp_decrypt = udf(lambda value: process("CORP_KEY", latest_corp_key, "decrypt", value))
                udf_corp_encrypt = udf(lambda value: process("CORP_KEY", latest_corp_key, "encrypt", value))
                print(f"{txt}Corp Key: udf OK")
            except Exception as e:
                print(f"{txt}Corp Key: udf NOK")
                print(traceback.format_exc())
        except Exception as e:
            print(f"{txt}Corp Key: access denied")
            print(traceback.format_exc())

        return udf_retail_decrypt, udf_retail_encrypt, udf_corp_decrypt, udf_corp_encrypt

    except Exception as e:
        print(traceback.format_exc())
        raise  # re-raise so the caller does not try to unpack a None return value

def decrypt_encript_query(
        sql_query, output_table, columns_and_alias_from_corp_to_retail, columns_and_alias_from_retail_to_corp, output_table_columns_order,
        columns_to_clean_first_characters):
    
    udf_retail_decrypt, udf_retail_encrypt, udf_corp_decrypt, udf_corp_encrypt = get_udfs_for_encryption()   
    print(f"{txt}Start reading the query")
    t1 = time.time()
    # Read the data from BigQuery
    df = spark.read.format('bigquery') \
            .option('query', sql_query) \
            .option('use_legacy_sql', 'false') \
            .load()
    
    df.show(5, truncate=False)

    df = df.repartition(NUM_PARTITIONS)
    df.cache()
    size_df = df.count() 
    t2 = time.time()
    print(f"Time to read the query: {t2-t1}")

    print(f"Start decryption columns corp to retail")
    # Decrypt the required columns and re-encrypt them with the new key
    for column_corp, alias_retail in columns_and_alias_from_corp_to_retail.items():
        decrypted_col = udf_corp_decrypt(col(column_corp))  # Decrypt the column

        # Check whether the column needs its leading characters removed
        if column_corp in columns_to_clean_first_characters:
            num_chars_to_clean = int(columns_to_clean_first_characters[column_corp])
            truncated_col = substring(decrypted_col, num_chars_to_clean + 1, 256)  # Drop the first characters
            encrypted_col = udf_retail_encrypt(truncated_col)  # Re-encrypt the column after truncating it
        else:
            encrypted_col = udf_retail_encrypt(decrypted_col)

        df = df.withColumn(alias_retail, encrypted_col)  # Add the re-encrypted column to the DataFrame
            
    size_df = df.count()
    t3 = time.time()
    print(f"Time for decryption columns corp to retail: {t3-t2}")

    print(f"Start decryption columns retail to corp")
    for column_retail, alias_corp in columns_and_alias_from_retail_to_corp.items():
        df = df.withColumn(alias_corp, udf_corp_encrypt(udf_retail_decrypt(col(column_retail))))
    
    size_df = df.count()
    t4 = time.time()
    print(f"Time for decryption columns retail to corp: {t4-t3}")

    print(f"Start column sorting for output table")
    # Order the columns according to `output_table_columns_order`
    ordered_columns = [output_table_columns_order[i] for i in sorted(output_table_columns_order.keys())]
    df = df.select(*ordered_columns)
    
    t5 = time.time()
    print(f"Time for column sorting for output table: {t5-t4}")

    size_df = df.count()
    #df.show(5, truncate=False)
    print(f"Total row number: {size_df}")

    print(f"Start write to output table")
    # Save the results to the output table
    df.write.format('bigquery') \
        .option('table', output_table) \
        .option("temporaryGcsBucket", GCS_WORKING_BUCKET) \
        .option('writeDisposition', 'WRITE_TRUNCATE') \
        .mode('overwrite') \
        .save()
    
    t6 = time.time()
    print(f"Time to write to output table: {t6-t5}")
 
params_json = sys.argv[1]
params = json.loads(params_json)

sql_query = params.get('sql_query')
columns_and_alias_from_corp_to_retail = params.get('columns_and_alias_from_corp_to_retail', {})
columns_and_alias_from_retail_to_corp = params.get('columns_and_alias_from_retail_to_corp', {})
output_table = params.get('output_table')
output_table_columns_order = params.get('output_table_columns_order', {})
columns_to_clean_first_characters = params.get('columns_to_clean_first_characters', {})

if __name__ == "__main__":
    try:
        t1 = time.time()
        print(f"SQL Query: {sql_query}")
        print(f"Columns from corp to retail: {list(columns_and_alias_from_corp_to_retail.keys())}")
        print(f"Aliases from corp to retail: {list(columns_and_alias_from_corp_to_retail.values())}")
        print(f"Columns to clean first characters: {columns_to_clean_first_characters}")
        print(f"Output Table: {output_table}")
        print(f"Output Table Columns Order: {output_table_columns_order}")

        decrypt_encript_query(
            sql_query=sql_query,
            output_table=output_table,
            columns_and_alias_from_corp_to_retail=columns_and_alias_from_corp_to_retail,
            columns_and_alias_from_retail_to_corp=columns_and_alias_from_retail_to_corp,
            output_table_columns_order=output_table_columns_order,
            columns_to_clean_first_characters=columns_to_clean_first_characters
        )
        t2 = time.time()
        print(f"{txt}Total script execution time: {t2-t1}")
        spark.stop()
    except Exception as e:
        print("An error occurred:")
        print(traceback.format_exc())
        spark.stop()
        raise
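
One detail of the script that may be worth checking, separate from the write time: `output_table_columns_order` arrives through `json.loads`, so its keys are strings. If those keys are position numbers such as "1", "2", "10" (an assumption; the actual parameter payload is not shown here), sorting them as plain strings orders them lexicographically. A minimal sketch with a hypothetical payload and a hypothetical helper name `ordered_columns`:

```python
import json


def ordered_columns(order_map):
    """Return column names sorted by their numeric position key."""
    # JSON object keys are always strings, so compare them as integers.
    return [order_map[key] for key in sorted(order_map, key=int)]


# Hypothetical payload; the real parameters passed to the job are not shown.
params = json.loads('{"output_table_columns_order": {"1": "A", "2": "B", "10": "C"}}')
order = params["output_table_columns_order"]

# Plain string sort puts "10" before "2".
lexicographic = [order[key] for key in sorted(order)]
numeric = ordered_columns(order)

print(lexicographic)
print(numeric)
```

With fewer than ten columns both orderings agree, so this would only matter for wider output tables.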