I am trying to run the code below on Dataflow. I have a few functions defined inside a class; two of them work, but send_email() is neither working nor throwing any errors. Please help me figure out the issue.
#dataflow pipeline code to run
import apache_beam as beam
import os
import argparse
import logging
import pandas as pd
import datetime
import pytz
from oauth2client.client import GoogleCredentials
from datetime import datetime,date,timedelta
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
import time
import json
import requests
from google.cloud import pubsub_v1
import google.cloud.dlp_v2
from json import loads
import smtplib
from email.utils import formataddr
#from google.cloud import dlp_v2
from google.protobuf import json_format

class readandwrite(beam.DoFn):

    def send_email(self, content_json):
        smtp_email = "[email protected]"
        smtp_password = "password"
        email_subject_bigquery = "PII Information Detected!"
        email_body_bigquery = f'''
Dear BigQuery Admin,
We want to inform you that personally identifiable information (PII) has been detected in a transaction.
- Duration: {content_json["duration"]} seconds
- Text: "{content_json["text"]}"
Appropriate actions have been taken by the DLP team to secure the data.
Thank you,
DataLake DLP Admin Team,
sush Ltd.
'''
        try:
            with smtplib.SMTP("smtp.gmail.com", 587) as server:
                server.starttls()
                server.login(smtp_email, smtp_password)
                sender_name = "DLP DataLake Admin Team"
                sender_address = "[email protected]"
                formatted_sender = formataddr((sender_name, sender_address))
                email_message_bigquery = f"From: {formatted_sender}\nSubject: {email_subject_bigquery}\nTo: [email protected]\n\n{email_body_bigquery}"
                server.sendmail(smtp_email, "[email protected]", email_message_bigquery)
                print("Emails sent successfully.")
        except Exception as e:
            print("Error sending emails:", str(e))

    def deidentify_content_with_dlp(self, content_json):
        import google.cloud.dlp_v2
        # Existing code for DLP de-identification
        dlp_client = google.cloud.dlp_v2.DlpServiceClient()
        item = {"value": json.dumps(content_json)}
        credit_card_info_type = {"name": "CREDIT_CARD_NUMBER"}
        phone_number_info_type = {"name": "PHONE_NUMBER"}
        # The transformation configuration to de-identify the content.
        deidentify_config = {
            "info_type_transformations": {
                "transformations": [
                    {
                        "info_types": [credit_card_info_type],
                        "primitive_transformation": {
                            "character_mask_config": {
                                "masking_character": "#",
                                "number_to_mask": 7,
                                "reverse_order": True,
                            }
                        },
                    },
                    {
                        "info_types": [phone_number_info_type],
                        "primitive_transformation": {
                            "character_mask_config": {
                                "masking_character": "#",
                                "number_to_mask": 7,
                                "reverse_order": True,
                            }
                        },
                    },
                ]
            }
        }
        # Convert the project ID into a full resource ID.
        project_id = "sandeepdev"
        parent = f"projects/{project_id}"
        # Call the API to inspect and de-identify the content.
        try:
            response = dlp_client.deidentify_content(
                request={
                    "parent": parent,
                    "deidentify_config": deidentify_config,
                    "item": item,
                }
            )
            # Check if PII information was found by DLP
            logging.info("Applying DLP de-identification...")
            if response.item.value and response.overview.transformation_summaries:
                print("PII information found. Creating ServiceNow incident.")
                self.send_email(content_json)
                print("Email sent to BigQuery Admin")
            return json.loads(response.item.value) if response.item.value else content_json
        except Exception as e:
            print(f"Error: {e}")
            print("Error during de-identification. Inserting original content to BigQuery.")
            return content_json  # In case of an error, insert the original content to BigQuery
        logging.info("DLP de-identification completed...")

    def process(self, context):
        import time
        import json
        import requests
        from google.cloud import pubsub_v1
        import google.cloud.dlp_v2
        from json import loads
        import smtplib
        from email.utils import formataddr
        from google.cloud import bigquery

        project_id = "sandeepdev"
        subscription_id = "audio_msg-sub"
        client_bigquery = bigquery.Client()
        subscriber = pubsub_v1.SubscriberClient()
        subscription_path = subscriber.subscription_path(project_id, subscription_id)
        masked_table_id = "sandeepdev.call_interaction.cleansed_dlp_raw_table"
        raw_table_id = "sandeepdev.call_interaction.raw_audio_data"
        dlp_count_table_id = "sandeepdev.call_interaction.dlp_count"
        max_messages = 1
        count = 0
        rows_to_insert_raw = []
        rows_to_insert_masked = []
        rows_to_insert_dlp_count = []
        logging.info("Starting data processing workflow...")
        while True:
            response = subscriber.pull(request={"subscription": subscription_path, "max_messages": max_messages})
            for received_message in response.received_messages:
                message = received_message.message
                content_json = json.loads(message.data.decode('utf-8'))
                masked_data = self.deidentify_content_with_dlp(content_json)
                logging.info(masked_data)
                audio_file_name = masked_data['audio_file_name']
                print("masked data is :", masked_data)
                dlp_list = masked_data['text'].split(" ")
                for value in dlp_list:
                    if '#' in value:
                        count += 1
                print("masked data count:", count)
                # Extract the audio file name and the count value and insert them into another BigQuery table
                insert_data = {
                    "audio_file_name": audio_file_name,
                    "PII_count": count
                }
                rows_to_insert_dlp_count.append(insert_data)
                load_PII_count = client_bigquery.insert_rows_json(dlp_count_table_id, rows_to_insert_dlp_count)
                count = 0
                subscriber.acknowledge(request={"subscription": subscription_path, "ack_ids": [received_message.ack_id]})
                # Insert the original data into a different BigQuery table
                rows_to_insert_raw.append(content_json)
                load_raw = client_bigquery.insert_rows_json(raw_table_id, rows_to_insert_raw)
                # Insert the masked data into the dlp_masked_data table
                rows_to_insert_masked.append(masked_data)
                load_masked = client_bigquery.insert_rows_json(masked_table_id, rows_to_insert_masked)
                rows_to_insert_raw = []  # Clear the lists after inserting the rows
                rows_to_insert_masked = []
                rows_to_insert_dlp_count = []
            logging.info("Data processing workflow completed.")
            time.sleep(2)

def run():
    try:
        parser = argparse.ArgumentParser()
        parser.add_argument(
            '--dfBucket',
            required=True,
            help=('Bucket where JARS/JDK is present')
        )
        known_args, pipeline_args = parser.parse_known_args()
        global df_Bucket
        df_Bucket = known_args.dfBucket
        logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
        pipeline_options = PipelineOptions(pipeline_args)
        pipeline_options.view_as(StandardOptions).streaming = True
        pcoll = beam.Pipeline(options=pipeline_options)
        logging.info("Pipeline Starts")
        dummy = pcoll | 'Initializing..' >> beam.Create(['1'])
        dummy_env = dummy | 'Processing' >> beam.ParDo(readandwrite())
        p = pcoll.run()
        logging.info('Job Run Successfully!')
        p.wait_until_finish()
    except Exception as e:
        logging.exception('Failed to launch datapipeline: %s', str(e))
        raise


if __name__ == '__main__':
    run()
In the code above, the data masking happens and the data is loaded into BigQuery, but the email function does not work. When I run it locally with DirectRunner it works as expected, just not with the Dataflow runner.
Run command:
python3 /home/sandeepdev751/dlp_dataflow.py --runner DataflowRunner --project sandeepdev --region us-central1 --job_name dlpcheckerv5 --temp_location gs://sandeepdev/temp --dfBucket "sandeepdev" --setup_file /home/sandeepdev751/setup.py
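
Since print() output from a DoFn running on Dataflow goes to the worker logs rather than to the console that launched the job, it is easy for the send_email() success/failure messages to go unnoticed. Below is a minimal sketch (not the pipeline's actual code) of how the email step could report through logging instead, reusing the SMTP host, port, and placeholder credentials from the code above; a short socket timeout is added so a blocked outbound connection fails visibly instead of hanging.

# Sketch only: same endpoint and placeholder credentials as send_email() above.
import logging
import smtplib
import socket

def send_email_debug(content_json):
    smtp_email = "[email protected]"        # placeholder from the question
    smtp_password = "password"             # placeholder from the question
    body = f"PII detected in: {content_json.get('audio_file_name', 'unknown')}"
    try:
        # A short timeout makes a blocked outbound connection fail quickly
        # instead of hanging the worker without any visible error.
        with smtplib.SMTP("smtp.gmail.com", 587, timeout=30) as server:
            server.starttls()
            server.login(smtp_email, smtp_password)
            server.sendmail(smtp_email, "[email protected]",
                            f"Subject: PII Information Detected!\n\n{body}")
        logging.info("send_email: message delivered")
    except (smtplib.SMTPException, socket.error) as exc:
        # logging.exception writes the full stack trace to the worker logs
        # in Cloud Logging, where plain print() output is easy to miss.
        logging.exception("send_email failed: %s", exc)

If the workers were started without external IP addresses and the subnetwork has no Cloud NAT, they have no outbound route to smtp.gmail.com at all, which would match the symptom of DirectRunner working while the Dataflow job stays silent.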
Why not use beam.io.ReadFromPubSub? For the email question, what kind of errors do you get? Can your Dataflow workers access that SMTP server?
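
For reference, a minimal sketch of the beam.io.ReadFromPubSub approach the comment points at, assuming the same project and subscription IDs as in the question. The unbounded source replaces the manual subscriber.pull()/acknowledge() loop, and the MaskAndWrite DoFn shown here is hypothetical; in the real pipeline it would call deidentify_content_with_dlp() and do the BigQuery inserts per element.

# Sketch only: subscription path assumed from the project/subscription IDs in the question.
import json
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


class MaskAndWrite(beam.DoFn):
    # Hypothetical DoFn: would wrap the existing DLP masking, email, and
    # BigQuery insert logic, applied to one Pub/Sub message at a time.
    def process(self, message_bytes):
        content_json = json.loads(message_bytes.decode("utf-8"))
        logging.info("received element: %s", content_json)
        yield content_json


def run_with_pubsub_source():
    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as p:
        (p
         | "ReadFromPubSub" >> beam.io.ReadFromPubSub(
               subscription="projects/sandeepdev/subscriptions/audio_msg-sub")
         | "MaskAndWrite" >> beam.ParDo(MaskAndWrite()))

This keeps each Pub/Sub message as a pipeline element, so the runner handles acknowledgement, retries, and autoscaling instead of a single worker looping forever inside process().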