Consider a Spark DataFrame named employees such as this one:

+----------+-----+
|   name   | age |
+----------+-----+
|   John   | 32  |
| Elizabeth| 28  |
|   Eric   | 41  |
+----------+-----+

and an array of strings state = ["LA", "AZ", "OH"]. I want to append this array to the DataFrame as a new column, so that it looks like:

+----------+-----+-------+
|   name   | age | state |
+----------+-----+-------+
|   John   | 32  |   LA  |
| Elizabeth| 28  |   AZ  |
|   Eric   | 41  |   OH  |
+----------+-----+-------+

How can I achieve this in Scala (or Java, it's almost the same)? I have only seen how to add the same value to all rows, and here I want a different value for each one.

Thank you! :)

3 Comments

  • How do you know that John is LA, Elizabeth is AZ, and Eric is OH? The order? Commented Oct 20, 2021 at 14:06
  • @jgp I know/admit that the array values are in the same order as the rows in the dataframe Commented Oct 20, 2021 at 14:12
  • Can you add a row number and join on them? Commented Oct 20, 2021 at 15:45

2 Answers


Since Spark runs in distributed mode, you cannot add column values taken from an array by index. Suppose Spark runs with two workers, and John and Elizabeth land on worker A while Eric lands on worker B: the rows are split across partitions when the DataFrame is created, so the workers don't know the index of John, Elizabeth, or Eric. In a plain single-JVM Java program you could do what you want trivially.

In your example you would need to convert your array to a DataFrame and use a join to merge the two DataFrames on a column with matching values. Alternatively, you can use crossJoin to build a Cartesian product of the two tables:

Dataset<Row> ndf = df.crossJoin(df2); // every row of df paired with every row of df2
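
If row order is the only thing tying the two sides together, one way to build that join column is to index both sides explicitly. Below is a minimal Scala sketch of this idea, assuming a SparkSession named spark and the employees DataFrame from the question (the names indexedRows, employeesIdx, and stateDf are just illustrative):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{LongType, StructField}
    import spark.implicits._

    // Attach a positional index to the existing rows.
    val indexedRows = employees.rdd.zipWithIndex.map { case (row, idx) =>
      Row.fromSeq(row.toSeq :+ idx)
    }
    val employeesIdx = spark.createDataFrame(
      indexedRows, employees.schema.add(StructField("idx", LongType, false)))

    // Give the array the same index and join the two sides on it.
    val stateDf = Seq("LA", "AZ", "OH").zipWithIndex
      .map { case (s, i) => (s, i.toLong) }
      .toDF("state", "idx")

    val result = employeesIdx.join(stateDf, "idx").drop("idx")
    result.show()

zipWithIndex assigns indices by partition order, which matches the original row order here, and unlike a shared counter it also behaves correctly on a real cluster.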

If you just need to add a column with a constant value, or a value derived from another column of the same DataFrame, use withColumn as below:

Dataset<Row> ndf = df.withColumn("city", functions.lit(1));      // constant value for every row
Dataset<Row> ndf = df.withColumn("city", functions.rand());      // random value per row
Dataset<Row> ndf = df.withColumn("city", functions.col("name")); // value copied from another column
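
Since the question asks for Scala, the same three calls translate directly (a straight translation of the Java lines above):

    import org.apache.spark.sql.functions.{col, lit, rand}

    val ndf1 = df.withColumn("city", lit(1))      // constant value for every row
    val ndf2 = df.withColumn("city", rand())      // random value per row
    val ndf3 = df.withColumn("city", col("name")) // value copied from another column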

Finally, you can use an AtomicInteger like this to get what you want. I tested it in Spark local mode.

    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.spark.api.java.function.MapFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.catalyst.encoders.RowEncoder;
    import org.apache.spark.sql.functions;

    public static void main(String[] args) {
        System.setProperty("hadoop.home.dir", "H:\\work\\HadoopWinUtils\\");
        SparkSession spark = SparkSession
                .builder()
                .master("local[*]")
                .getOrCreate();

        Dataset<Row> df = spark.read().json("H:\\work\\HadoopWinUtils\\people.json");

        List<String> city_array = Arrays.asList("LA", "AZ", "OH");
        // Display the content of the DataFrame to stdout.
        df.show();

        // Add a placeholder column so the schema already contains "city".
        df = df.withColumn("city", functions.col("name"));

        // Shared counter; only deterministic because everything runs in one JVM.
        AtomicInteger i = new AtomicInteger();

        // Overwrite the placeholder with the array value at the running index.
        Dataset<Row> df3 = df.map((MapFunction<Row, Row>) value ->
                RowFactory.create(value.get(0), value.get(1),
                        city_array.get(i.getAndIncrement())),
                RowEncoder.apply(df.schema()));

        df3.show();
    }

The people.json input is

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+

and the result is

+----+-------+----+
| age|   name|city|
+----+-------+----+
|null|Michael|  LA|
|  30|   Andy|  AZ|
|  19| Justin|  OH|
+----+-------+----+
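
A rough Scala translation of the same counter trick, for anyone who needs it (a sketch; like the Java version above, it only behaves as intended when everything runs in a single JVM with the data in one partition):

    import java.util.concurrent.atomic.AtomicInteger

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.encoders.RowEncoder
    import org.apache.spark.sql.functions.lit

    val cityArray = Seq("LA", "AZ", "OH")
    val i = new AtomicInteger()

    // Placeholder column so the target schema already contains "city".
    val withCity = df.withColumn("city", lit(""))

    val df3 = withCity.map { row =>
      Row(row.get(0), row.get(1), cityArray(i.getAndIncrement()))
    }(RowEncoder(withCity.schema))

    df3.show()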

1 Comment

I had a similar problem and your approach worked. Thank you! One issue: given the order of the columns in the row, is there any way to take any Row with any number of columns and update a column's value there?

You can try something like this in PySpark.

>>> from pyspark.sql.types import StructType, StructField, StringType, IntegerType
>>> _TRANSFORMED_DF_SCHEMA = StructType([
...     StructField('name', StringType(), False),
...     StructField('age', IntegerType(), False),
...     StructField('id', IntegerType(), False),
...     StructField('state', StringType(), False),
... ])
>>> 
>>> state = ['LA', 'AZ', 'OH']
>>> data = (['John', 32], ['Eli', 28], ['Eric', 41])
>>> df = spark.createDataFrame(data, schema=['name', 'age'])
>>> rdd1 = df.rdd.zipWithIndex()
>>> df1 = rdd1.toDF()
>>> df1.show()
+----------+---+
|        _1| _2|
+----------+---+
|[John, 32]|  0|
| [Eli, 28]|  1|
|[Eric, 41]|  2|
+----------+---+

>>> df_final = df1.select(df1['_1']['name'].alias('name'), df1['_1']['age'].alias('age'), df1['_2'].alias('id'))
>>> df_final.show()
+----+---+---+
|name|age| id|
+----+---+---+
|John| 32|  0|
| Eli| 28|  1|
|Eric| 41|  2|
+----+---+---+

>>> def add_state(row_dict):
...     new_dict = dict()
...     new_dict['name'] = row_dict['name']
...     new_dict['age'] = row_dict['age']
...     new_dict['id'] = row_dict['id']
...     new_dict['state'] = state[row_dict['id']]
...     return new_dict
... 
>>> df_rdd = df_final.rdd.map(add_state)
>>> df_final = spark.createDataFrame(df_rdd, schema=_TRANSFORMED_DF_SCHEMA)
>>> df_final.show()
+----+---+---+-----+
|name|age| id|state|
+----+---+---+-----+
|John| 32|  0|   LA|
| Eli| 28|  1|   AZ|
|Eric| 41|  2|   OH|
+----+---+---+-----+

3 Comments

Thank you, I'll look at it, but does it work the same in Scala? I can only use Scala for that project.
It will work in Scala, but you will have to make some syntactic changes.
Try something like this: val df3 = df_final.map(row => { val state_col = state(row.getInt(2)); ... }) followed by val df3Map = df3.toDF("state_col", "name", "age", "id") (note that Scala indexes arrays with parentheses, and the id column is numeric). A fuller sketch follows below.
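
Fleshing out that comment, a rough Scala version of this answer's zipWithIndex approach might look like the sketch below (assuming a SparkSession named spark; the sample data is rebuilt inline):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{IntegerType, StringType, StructField}
    import spark.implicits._

    val state = Array("LA", "AZ", "OH")
    val df = Seq(("John", 32), ("Eli", 28), ("Eric", 41)).toDF("name", "age")

    // Pair each row with its positional index, then look the state up by index.
    val withState = df.rdd.zipWithIndex.map { case (row, idx) =>
      Row.fromSeq(row.toSeq :+ idx.toInt :+ state(idx.toInt))
    }
    val schema = df.schema
      .add(StructField("id", IntegerType, false))
      .add(StructField("state", StringType, false))

    spark.createDataFrame(withState, schema).show()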
