The trick is to transform your data from dumb string columns into a more useable data structure. Once col1 and col2 are rebuilt as arrays (or as a map, as your desired output suggests they should be), you can use Spark's built-in functions rather than a messy UDF as suggested by @baitmbarek.
To start, use trim and split to convert col1 and col2 to arrays:
scala> val df = List(
     |   // Sample rows: col1 and col2 hold array-looking text stored as plain strings.
     |   ("1", """["x","y","z"]""", """[123,"null","null"]"""),
     |   ("2", """["a","y","z"]""", """[123,"null","null"]""")
     | ).toDF("id", "col1", "col2")
df: org.apache.spark.sql.DataFrame = [id: string, col1: string ... 1 more field]
scala> val df_array = Seq("col1", "col2").foldLeft(df) { (acc, name) =>
     |   // Trim the leading/trailing [ ] and " characters, then split on commas
     |   // that may be wrapped in double quotes, leaving an array of bare strings.
     |   acc.withColumn(name, split(trim(col(name), "[\"]"), "\"?,\"?"))
     | }
df_array: org.apache.spark.sql.DataFrame = [id: string, col1: array<string> ... 1 more field]
scala> df_array.show(truncate = false)
+---+---------+-----------------+
|id |col1 |col2 |
+---+---------+-----------------+
|1 |[x, y, z]|[123, null, null]|
|2 |[a, y, z]|[123, null, null]|
+---+---------+-----------------+
scala> df_array.printSchema()  // col1 and col2 should now be array<string>
root
|-- id: string (nullable = true)
|-- col1: array (nullable = true)
| |-- element: string (containsNull = true)
|-- col2: array (nullable = true)
| |-- element: string (containsNull = true)
From here, you should be able to achieve what you want using array_position to find the index of 'x' (if any) in col1 and retrieve the matching data from col2. However, converting the two arrays into a map first, and then looking up 'x' with element_at, should make it clearer what your code is doing:
scala> val df_map = df_array.select(
     |   col("id"),
     |   // Pair col1[i] with col2[i], then read each pair as a key -> value map entry.
     |   map_from_entries(arrays_zip(col("col1"), col("col2"))).alias("col_map")
     | )
df_map: org.apache.spark.sql.DataFrame = [id: string, col_map: map<string,string>]
scala> df_map.show(truncate = false)
+---+--------------------------------+
|id |col_map |
+---+--------------------------------+
|1 |[x -> 123, y -> null, z -> null]|
|2 |[a -> 123, y -> null, z -> null]|
+---+--------------------------------+
scala> val df_final = {
     |   // Value stored under key "x" in each row's map; null when there is no such entry.
     |   val xValue = element_at($"col_map", "x")
     |   // Fallback used for both output columns: a one-element array holding the text "null".
     |   val fallback = array(lit("null"))
     |   df_map.select(
     |     $"id",
     |     when(xValue.isNull, fallback).otherwise(array(lit("x"))).as("col1"),
     |     when(xValue.isNull, fallback).otherwise(array(xValue)).as("col2")
     |   )
     | }
df_final: org.apache.spark.sql.DataFrame = [id: string, col1: array<string> ... 1 more field]
scala> df_final.show()  // print the final one-element arrays per id
+---+------+------+
| id| col1| col2|
+---+------+------+
| 1| [x]| [123]|
| 2|[null]|[null]|
+---+------+------+
scala> df_final.printSchema()  // inspect nullability of the resulting array columns
root
|-- id: string (nullable = true)
|-- col1: array (nullable = false)
| |-- element: string (containsNull = false)
|-- col2: array (nullable = false)
| |-- element: string (containsNull = true)