
The following function receives a message from an Azure Service Bus queue and returns its JSON payload.

import json

from azure.servicebus import ServiceBusClient


def myfunc():
    with ServiceBusClient.from_connection_string(CONNECTION_STR) as client:
        # max_wait_time specifies how long the receiver should wait with no
        # incoming messages before stopping receipt. Default is None, i.e.
        # receive forever.
        with client.get_queue_receiver(QUEUE_NAME, session_id=session_id, max_wait_time=5) as receiver:
            for msg in receiver:
                # print("Received: " + str(msg))
                themsg = json.loads(str(msg))
                # Complete the message so that it is removed from the queue.
                receiver.complete_message(msg)
                return themsg

result = myfunc()

The following is a snippet of the JSON output.

Out[65]: {'name': 'dmMapping_DIM_WORK_ORDER',
 'description': 'DIM_WORK_ORDER Azure Foundation to Azure Data Mart Mapping',
 'version': '2.4',
 'updateDttm': '01/02/2022 14:46PM',
 'SCDType': 4,
 'mappings': [{'ELLIPSE': {'method': 'ellipseItem',
    'tables': [{'database': 'foundation',
      'schema': 'AZ_FH_ELLIPSE',
      'table': 'ADS_FND_MSF620',
      'primaryKey': [{'column': 'WORK_ORDER'}]}],
    'columns': [{'column': 'D_WORK_ORDER_KEY',
      'type': 'int',
      'allowNulls': 'No',
      'mapType': 'autoGenerate'},
     {'column': 'SYSTEM_OF_RECORD',
      'type': 'varchar',
      'length': 24,
      'allowNulls': 'No',
      'mapType': 'staticValue',
      'value': 'ELLIPSE'},
     {'column': 'ACTUAL_FINISH_DATE',

When I attempt to save the output with following

result.write.save().json('/mnt/lake/RAW/FormulaClassification/F1Area/')

I get the error:

AttributeError: 'dict' object has no attribute 'write'

Can someone let me know how to overcome this error?
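For reference, the error is reproducible with any plain dict: `json.loads` returns a Python `dict`, which has no `.write` attribute (that belongs to a Spark DataFrame). A minimal sketch, using a made-up payload standing in for the Service Bus message body:

```python
import json

# Hypothetical message body standing in for the Service Bus payload.
raw = '{"name": "dmMapping_DIM_WORK_ORDER", "version": "2.4"}'
result = json.loads(raw)

print(type(result))              # <class 'dict'>
print(hasattr(result, "write"))  # False -> result.write raises AttributeError
```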

1 Answer

The simplest way is to just write the data as JSON, without using Spark:

with open("/dbfs/mnt/lake/RAW/FormulaClassification/F1Area/<file-name>", "w") as file:
    file.write(json.dumps(result))
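A quick way to sanity-check the write is to read the file back with `json.load`. The sketch below uses a temporary file rather than the `/dbfs/...` mount, which is only available on Databricks:

```python
import json
import tempfile

result = {"name": "dmMapping_DIM_WORK_ORDER", "version": "2.4"}

# Write and re-read via a temporary file; on Databricks, swap in the
# /dbfs/mnt/... path instead.
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump(result, f)
    path = f.name

with open(path) as f:
    assert json.load(f) == result
```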

You can still use the Spark API, but for a single message it would be overkill:

rdd = sc.parallelize([json.dumps(result)])
spark.read.json(rdd) \
  .write.mode("append").json('/mnt/lake/RAW/FormulaClassification/F1Area/')

3 Comments

Perfect. Thanks again Alex
Quick question, what is sc in sc.parallelize?
`sc` is the default name for the SparkContext instance in a notebook/shell.
