
In PySpark, how do I convert a DataFrame to a plain string?

Background:

I'm using PySpark with Kafka. Instead of hard-coding the broker name, I have parameterized the Kafka broker name in PySpark.

A JSON file holds the broker details; Spark reads this JSON input and assigns the values to variables. These variables end up as DataFrames containing strings, not plain strings.

I run into an issue when I pass these DataFrames into the PySpark-Kafka connection details to substitute the values.

Error:

TypeError: can only concatenate str (not "DataFrame") to str

JSON parameter file:

{
    "broker": "https://at.com:8082",
    "topicname": "dev_hello"
}

PySpark code:

parameter = spark.read.option("multiline", "true").json("/at/dev_parameter.json")

kserver = parameter.select("broker")     # this is a DataFrame, not a string
ktopic = parameter.select("topicname")   # this is a DataFrame, not a string

df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
    .write \
    .format("kafka") \
    .outputMode("append") \
    .option("kafka.bootstrap.servers", "f" + kserver) \
    .option("topic", "josn_data_topic", ktopic) \
    .save()

Please advise.

My second question is: how do I pass these Python-based variables to another Scala-based Spark notebook?

  • You can't pass Python variables to a Scala notebook, but you can write records to Kafka and then consume them from Scala.
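A minimal sketch of that Kafka-based handoff on the Python side, assuming kserver and ktopic have already been extracted as plain strings (see the answer below) and using a hypothetical dev_parameters topic; a Scala notebook could then consume the same topic via spark.read.format("kafka"):

# Hypothetical sketch: publish the parameter values to Kafka so a
# Scala-based notebook can read them from the same topic.
params_df = spark.createDataFrame([(kserver, ktopic)], ["broker", "topicname"])

(params_df
    .selectExpr("to_json(struct(*)) AS value")
    .write
    .format("kafka")
    .option("kafka.bootstrap.servers", kserver)
    .option("topic", "dev_parameters")   # hypothetical topic name
    .save())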

1 Answer


Use json.load instead of the Spark JSON reader:

import json

# Read the parameter file with plain Python so the values come back as strings
with open("/at/dev_parameter.json") as f:
    parameter = json.load(f)

kserver = parameter["broker"]      # plain str
ktopic = parameter["topicname"]    # plain str
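Since kserver and ktopic are now plain Python strings, string operations such as concatenation work as expected. A quick check (assuming the same JSON file as above):

print(type(kserver))              # <class 'str'>
final_broker = "dev" + kserver    # str + str, no TypeError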

df.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
  .write \
  .format("kafka") \
  .mode("append") \
  .option("kafka.bootstrap.servers", kserver) \
  .option("topic", ktopic) \
  .save()

If you prefer using the Spark JSON reader, you can do:

parameter = spark.read.option("multiline", "true").json("/at/dev_parameter.json")
kserver = parameter.select("broker").head()[0]
ktopic = parameter.select("topicname").head()[0]
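head() returns the first Row, and indexing it with [0] extracts the underlying Python value, so kserver here should also be a plain str rather than a DataFrame. A quick sanity check (assuming the same parameter file):

row = parameter.select("broker").head()   # Row(broker='https://at.com:8082')
kserver = row[0]                          # the plain Python value inside the Row
print(type(kserver))                      # <class 'str'>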

4 Comments

Thanks for your comment. Reading the JSON through Spark is easy, since there are some nested elements too which I'm fetching, and that's straightforward with a Spark DataFrame. There must be a way to pass a DataFrame to a string.
@Abhi I have added a way to do that. See if that helps?
Thanks for your time, mck. I tried the option you proposed; however, the datatype for kserver is a DataFrame, and a string cannot be concatenated with a DataFrame. If you try the code below in PySpark, it will fail: parameter = spark.read.option("multiline", "true").json("/at/dev_parameter.json"); kserver = parameter.select("broker").head()[0]; ktopic = parameter.select("topicname").head()[0]; server = "dev"; final_broker = server + kserver
kserver should be a string, because .head()[0] is called on it. Could you run kserver = parameter.select("broker").head()[0] and then print(type(kserver))?
