
I am trying to run the code below on Dataflow. I have three functions defined inside a class: two of them work, but send_email() neither works nor throws any errors. Please help me solve the issue.

#dataflow pipeline code to run 
import apache_beam as beam
import os
import argparse
import logging
import pandas as  pd
import datetime
import pytz
from oauth2client.client import GoogleCredentials
from datetime import datetime,date,timedelta
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import time
import json
import requests
from google.cloud import pubsub_v1
import google.cloud.dlp_v2
from json import loads
import smtplib
from email.utils import formataddr
#from google.cloud import dlp_v2
from google.protobuf import json_format

class readandwrite(beam.DoFn):

    def send_email(self,content_json):
        smtp_email = "[email protected]"
        smtp_password = "password"
        email_subject_bigquery = "PII Information Detected!"
        email_body_bigquery = f'''
        Dear BigQuery Admin,

        We want to inform you that personally identifiable information (PII) has been detected in a transaction.

        - Duration: {content_json["duration"]} seconds

        - Text: "{content_json["text"]}"

        Appropriate actions have been taken by the DLP team to secure the data.

        Thank you,
        DataLake DLP Admin Team,
        sush Ltd.
        '''
        try:
            with smtplib.SMTP("smtp.gmail.com", 587) as server:
                server.starttls()
                server.login(smtp_email, smtp_password)
                sender_name = "DLP DataLake Admin Team"
                sender_address = "[email protected]"
                formatted_sender = formataddr((sender_name, sender_address))
                email_message_bigquery = f"From: {formatted_sender}\nSubject: {email_subject_bigquery}\nTo: [email protected]\n\n{email_body_bigquery}"
                server.sendmail(smtp_email, "[email protected]", email_message_bigquery)
                
            print("Emails sent successfully.")

        except Exception as e:

            print("Error sending emails:", str(e))


    def deidentify_content_with_dlp(self,content_json):
        import google.cloud.dlp_v2
        # Existing code for DLP de-identification
        dlp_client = google.cloud.dlp_v2.DlpServiceClient()
        item = {"value": json.dumps(content_json)}
        credit_card_info_type = {"name": "CREDIT_CARD_NUMBER"}
        phone_number_info_type = {"name": "PHONE_NUMBER"}

        # The transformation configuration to de-identify the content.
        deidentify_config = {
            "info_type_transformations": {
                "transformations": [
                    {
                        "info_types": [credit_card_info_type],
                        "primitive_transformation": {
                            "character_mask_config": {
                                "masking_character": "#",
                                "number_to_mask": 7,
                                "reverse_order": True,
                            }
                        },
                    },
                    {
                        "info_types": [phone_number_info_type],
                        "primitive_transformation": {
                            "character_mask_config": {
                                "masking_character": "#",
                                "number_to_mask": 7,
                                "reverse_order": True,
                            }
                        }
                    },
                ]
            }
        }

        # Convert the project ID into a full resource ID.
        project_id = "sandeepdev"
        parent = f"projects/{project_id}"

        # Call the API to inspect and de-identify the content.
        try:
            response = dlp_client.deidentify_content(
                request={
                    "parent": parent,
                    "deidentify_config": deidentify_config,
                    "item": item,
                }
            )

            # Check if PII information was found by DLP
            logging.info("Applying DLP de-identification...")
            if response.item.value and response.overview.transformation_summaries:
                print("PII information found. Creating ServiceNow incident.")
                self.send_email(content_json)
                print("Email send to BigQuery Admin")

            return json.loads(response.item.value) if response.item.value else content_json
        except Exception as e:
            print(f"Error: {e}")
            print("Error during de-identification. Inserting original content to BigQuery.")
            return content_json  # In case of an error, insert the original content to BigQuery
            logging.info("DLP de-identification completed...")

    def process(self, context):
        import time
        import json
        import requests
        from google.cloud import pubsub_v1
        import google.cloud.dlp_v2
        from json import loads
        import smtplib
        from email.utils import formataddr
        from google.cloud import bigquery
        project_id = "sandeepdev"
        subscription_id = "audio_msg-sub"
        client_bigquery = bigquery.Client()
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(project_id, subscription_id)
        masked_table_id = "sandeepdev.call_interaction.cleansed_dlp_raw_table"
        raw_table_id="sandeepdev.call_interaction.raw_audio_data"
        dlp_count_table_id="sandeepdev.call_interaction.dlp_count"
        max_messages = 1
        count=0
        rows_to_insert_raw = []  # Clear the list after inserting the rows
        rows_to_insert_masked = []
        rows_to_insert_dlp_count=[]
        logging.info("Starting data processing workflow...")
        while True:
            response = subscriber.pull(request={"subscription": subscription_path, "max_messages": max_messages})

            for received_message in response.received_messages:
                message = received_message.message
                content_json = json.loads(message.data.decode('utf-8'))
                masked_data = self.deidentify_content_with_dlp(content_json)
                logging.info(masked_data)
                audio_file_name=masked_data['audio_file_name']
                print("masked data is :" , masked_data)
                dlp_list= masked_data['text'].split(" ")
                for value in dlp_list:
                    if '#' in value:
                        count += 1
                print("masked data count:", count)
                insert_data = {
                    "audio_file_name": audio_file_name,
                    "PII_count": count
                }
                rows_to_insert_dlp_count.append(insert_data)
                load_PII_count = client_bigquery.insert_rows_json(dlp_count_table_id, rows_to_insert_dlp_count)
                count=0
                ##extract audio file name and the count value and insert to another bigquery table
                subscriber.acknowledge(request={"subscription": subscription_path, "ack_ids": [received_message.ack_id]})
                #inserts original data to a different bigquery table
                rows_to_insert_raw.append(content_json)
                load_raw = client_bigquery.insert_rows_json(raw_table_id, rows_to_insert_raw)

                # Insert the masked data into dlp_masked_data table
                rows_to_insert_masked.append(masked_data)
                load_masked = client_bigquery.insert_rows_json(masked_table_id, rows_to_insert_masked)

                rows_to_insert_raw = []  # Clear the list after inserting the rows
                rows_to_insert_masked = []  # Clear the list after inserting the rows
                rows_to_insert_dlp_count=[]
                logging.info("Data processing workflow completed.")


            time.sleep(2)
    

def run():    
    try: 
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--dfBucket',
            required=True,
            help= ('Bucket where JARS/JDK is present')
            )
        known_args, pipeline_args = parser.parse_known_args()
        global df_Bucket 
        df_Bucket = known_args.dfBucket
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(StandardOptions).streaming = True
        pcoll = beam.Pipeline(options=pipeline_options)
        logging.info("Pipeline Starts")
        dummy= pcoll | 'Initializing..' >> beam.Create(['1'])
        dummy_env = dummy | 'Processing' >>  beam.ParDo(readandwrite())
        p=pcoll.run()
        logging.info('Job Run Successfully!')
        p.wait_until_finish()
    except Exception as e:
        logging.exception('Failed to launch datapipeline')
        logging.exception('Failed to launch datapipeline: %s', str(e))
        raise    
if __name__ == '__main__':
    run()

In the code above, data masking works and the data is loaded into BigQuery, but the email function does not work. When I run it locally with the DirectRunner it works as expected, but not with the DataflowRunner.

Run command:

python3 /home/sandeepdev751/dlp_dataflow.py --runner DataflowRunner --project sandeepdev  --region us-central1 --job_name dlpcheckerv5 --temp_location  gs://sandeepdev/temp  --dfBucket "sandeepdev" --setup_file /home/sandeepdev751/setup.py
  • Is there any reason for not using beam.io.ReadFromPubSub? For the email question, what kind of errors do you get? Can your Dataflow workers access that SMTP server? Commented Aug 8, 2023 at 14:50

1 Answer


It is hard to debug without any logs, but my guess is that your Dataflow service account does not have access to the SMTP server.
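
One way to test that guess is to check from inside the worker whether the SMTP port is reachable at all. The snippet below is only a minimal sketch: can_reach_smtp is a hypothetical helper name (not part of Beam or of the question's code), and the host and port are the ones used in the question.

```python
import logging
import socket


def can_reach_smtp(host: str = "smtp.gmail.com", port: int = 587, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within `timeout` seconds."""
    try:
        # Only opens and closes a TCP connection; no SMTP commands are sent.
        with socket.create_connection((host, port), timeout=timeout):
            logging.info("TCP connection to %s:%d succeeded", host, port)
            return True
    except OSError as exc:
        # Covers refused connections, timeouts, and DNS failures.
        logging.error("Cannot reach %s:%d: %s", host, port, exc)
        return False
```

Calling it once at the start of process() and looking for the message in the worker logs would show whether the connection is blocked before any login is attempted.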

In your current setup, you will never see any errors, since Dataflow does not log any print statements. You see them when using the DirectRunner, but not when using Dataflow. Switch to the logging module and the errors should start showing up in the job logs.
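
As a rough illustration of that switch, here is a trimmed-down version of send_email() that reports through logging instead of print. It is a sketch under assumptions, not a drop-in replacement: the shortened message body, the explicit sender/password/recipient arguments, and the smtp_factory parameter (added only so the function can be exercised without a real server) are not in the original code.

```python
import logging
import smtplib
from email.message import EmailMessage


def send_email(content_json, sender, password, recipient, smtp_factory=smtplib.SMTP):
    """Send the PII alert; log success, or the full traceback on failure."""
    message = EmailMessage()
    message["From"] = sender
    message["To"] = recipient
    message["Subject"] = "PII Information Detected!"
    message.set_content(
        f"Duration: {content_json['duration']} seconds\nText: {content_json['text']}"
    )
    try:
        with smtp_factory("smtp.gmail.com", 587) as server:
            server.starttls()
            server.login(sender, password)
            server.send_message(message)
        logging.info("Email sent successfully.")
        return True
    except Exception:
        # logging.exception records the traceback at ERROR level.
        logging.exception("Error sending email")
        return False
```

With this in place, a failed login or a blocked connection should appear as an ERROR entry with a traceback rather than disappearing silently.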

Besides that, I strongly recommend that you refactor your code. There are several issues which go against the Beam framework.

  1. As @XQHu has mentioned, use the Beam native ReadFromPubSub source (see here). While the while True loop works, you will run into a ton of issues once you want to drain your pipeline. Furthermore, the native PubSub input is optimized by Google if you are using Dataflow.
  2. Once you use ReadFromPubSub, you no longer need beam.Create as a dummy input. Furthermore, you can skip the streaming=True option, since the pipeline will be natively streaming with PubSub as input. On the other hand, your p.wait_until_finish() will not make much sense anymore. Just search for PubSub ingestion pipelines; there are a lot of examples out there, and it is not that hard.
  3. Split this massive DoFn into several smaller ones. This makes debugging much easier later on (not a must).
  4. Lastly, for security reasons I recommend using the GCP native Secret Manager for storing your password instead of writing it in plain text in the code. Keep in mind that you might need to give the Dataflow service account the role roles/secretmanager.secretAccessor (see here).
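
To give an idea of what points 1 to 3 could look like together, here is a minimal sketch. The subscription and table names are taken from the question; the helper names parse_message and count_masked_tokens are made up for illustration, and the DLP masking and email steps are left out and would be additional small steps between parsing and counting.

```python
import json

SUBSCRIPTION = "projects/sandeepdev/subscriptions/audio_msg-sub"
DLP_COUNT_TABLE = "sandeepdev:call_interaction.dlp_count"


def parse_message(data: bytes) -> dict:
    """Decode one Pub/Sub payload into a dict."""
    return json.loads(data.decode("utf-8"))


def count_masked_tokens(record: dict) -> dict:
    """Count the space-separated tokens of 'text' that contain the '#' mask character."""
    count = sum(1 for token in record["text"].split(" ") if "#" in token)
    return {"audio_file_name": record["audio_file_name"], "PII_count": count}


def run(argv=None):
    # Imported here so the helpers above can be unit-tested without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    # Runner, project, region, etc. come from the command line as before;
    # pass --streaming there as well if your SDK version requires it.
    options = PipelineOptions(argv)
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "ReadFromPubSub" >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION)
            | "ParseJson" >> beam.Map(parse_message)
            # A DLP masking step would go here, before counting.
            | "CountMasked" >> beam.Map(count_masked_tokens)
            | "WriteCounts" >> beam.io.WriteToBigQuery(
                DLP_COUNT_TABLE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            )
        )
```

Calling run() from the usual if __name__ == '__main__' block launches the job. Acknowledging messages and batching the BigQuery inserts are then handled by the source and sink instead of by hand, and each small function can be tested on its own. Depending on how the module is packaged, --save_main_session may be needed so that the workers can find the helper functions.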