
I have the following code that tries to put the close prices of several symbols into separate columns of a single dataframe.

small_list = ["INFY","TCS", "SBIN", "ICICIBANK"]
frame = spark_frame.where(col("symbol") == small_list[0]).select('close')
## spark frame is a pyspark.sql.dataframe.DataFrame

for single_stock in small_list[1:]:
    print(single_stock)
    current_stock = spark_frame.where(col("symbol") == single_stock).select(['close'])
    current_stock.collect()
    frame.collect()
    frame = frame.withColumn(single_stock, current_stock.close)

But when I do frame.collect(), I get:

[Row(close=736.85, TCS=736.85, SBIN=736.85, ICICIBANK=736.85),
 Row(close=734.7, TCS=734.7, SBIN=734.7, ICICIBANK=734.7),
 Row(close=746.0, TCS=746.0, SBIN=746.0, ICICIBANK=746.0),
 Row(close=738.85, TCS=738.85, SBIN=738.85, ICICIBANK=738.85)]

This is wrong: every column just repeats the close values of the first symbol. What am I doing wrong, and what is the best way to solve this?

Edit: The spark_frame looks like this

[Row(SYMBOL='LINC', SERIES='  EQ', TIMESTAMP=datetime.datetime(2021, 12, 20, 0, 0), PREVCLOSE=235.6, OPEN=233.95, HIGH=234.0, LOW=222.15, LAST=222.15, CLOSE=224.2, AVG_PRICE=226.63, TOTTRDQTY=6447, TOTTRDVAL=14.61, TOTALTRADES=206, DELIVQTY=5507, DELIVPER=85.42),
 Row(SYMBOL='LINC', SERIES='  EQ', TIMESTAMP=datetime.datetime(2021, 12, 21, 0, 0), PREVCLOSE=224.2, OPEN=243.85, HIGH=243.85, LOW=222.85, LAST=226.0, CLOSE=225.6, AVG_PRICE=227.0, TOTTRDQTY=8447, TOTTRDVAL=19.17, TOTALTRADES=266, DELIVQTY=3401, DELIVPER=40.26),
 Row(SYMBOL='SCHAEFFLER', SERIES='  EQ', TIMESTAMP=datetime.datetime(2020, 8, 6, 0, 0), PREVCLOSE=3593.9, OPEN=3611.85, HIGH=3618.35, LOW=3542.5, LAST=3594.95, CLOSE=3573.1, AVG_PRICE=3580.73, TOTTRDQTY=12851, TOTTRDVAL=460.16, TOTALTRADES=1886, DELIVQTY=9649, DELIVPER=75.08),
 Row(SYMBOL='SCHAEFFLER', SERIES='  EQ', TIMESTAMP=datetime.datetime(2020, 8, 7, 0, 0), PREVCLOSE=3573.1, OPEN=3591.0, HIGH=3591.0, LOW=3520.0, LAST=3548.95, CLOSE=3543.85, AVG_PRICE=3554.6, TOTTRDQTY=2406, TOTTRDVAL=85.52, TOTALTRADES=688, DELIVQTY=1452, DELIVPER=60.35)]

Expected results should look like this:

[Row(LINC=224.2, SCHAEFFLER=3573.1),
 Row(LINC=225.6, SCHAEFFLER=3543.85)]
  • Can you give some extra details? How does the data in spark_frame look? Commented Dec 27, 2022 at 15:24
  • @gamezone25 Thank you for your response. I added a sample for spark_frame. Commented Dec 27, 2022 at 15:34
  • Could you add your expected result as well? Also, when you want to peek at the data, you can use spark_frame.show(), which shows only a subset of the data, while collect() collects and shows all the data, which could result in OOM with larger data. Commented Dec 27, 2022 at 15:49
  • @Emma Thank you for your comment, added the expected result. Thanks Commented Dec 27, 2022 at 16:32

2 Answers


I have completely rewritten this answer based on the new details in the question.

To do what you described in the comments, you want to pivot the table on the stock symbol, grouped by the closing price. Here is one way to do it.

Input data (slightly modified for testing purposes):

+------+------+-------------------+---------+------+------+------+------+-----+---------+---------+---------+-----------+--------+--------+
|SYMBOL|SERIES|          TIMESTAMP|PREVCLOSE|  OPEN|  HIGH|   LOW|  LAST|CLOSE|AVG_PRICE|TOTTRDQTY|TOTTRDVAL|TOTALTRADES|DELIVQTY|DELIVPER|
+------+------+-------------------+---------+------+------+------+------+-----+---------+---------+---------+-----------+--------+--------+
|  INFY|    EQ|2021-12-20 00:00:00|    235.6|233.95| 234.0|222.15|222.15|224.2|   226.63|     6447|    14.61|        206|    5507|   85.42|
|  LINC|    EQ|2021-12-21 00:00:00|    224.2|243.85|243.85|222.85| 226.0|225.6|    227.0|     8447|    19.17|        266|    3401|   40.26|
|  LINC|    EQ|2021-12-21 00:00:00|    224.2|243.85|243.85|222.85| 226.0|224.2|    227.0|     8447|    19.17|        266|    3401|   40.26|
+------+------+-------------------+---------+------+------+------+------+-----+---------+---------+---------+-----------+--------+--------+

Here is the code:

import datetime

from pyspark.sql import Row
from pyspark.sql.functions import first, col

data = [Row(SYMBOL='INFY', SERIES='  EQ', TIMESTAMP=datetime.datetime(2021, 12, 20, 0, 0), PREVCLOSE=235.6, OPEN=233.95,
            HIGH=234.0, LOW=222.15, LAST=222.15, CLOSE=224.2, AVG_PRICE=226.63, TOTTRDQTY=6447, TOTTRDVAL=14.61,
            TOTALTRADES=206, DELIVQTY=5507, DELIVPER=85.42),
        Row(SYMBOL='LINC', SERIES='  EQ', TIMESTAMP=datetime.datetime(2021, 12, 21, 0, 0), PREVCLOSE=224.2, OPEN=243.85,
            HIGH=243.85, LOW=222.85, LAST=226.0, CLOSE=225.6, AVG_PRICE=227.0, TOTTRDQTY=8447, TOTTRDVAL=19.17,
            TOTALTRADES=266, DELIVQTY=3401, DELIVPER=40.26),
        Row(SYMBOL='LINC', SERIES='  EQ', TIMESTAMP=datetime.datetime(2021, 12, 21, 0, 0), PREVCLOSE=224.2, OPEN=243.85,
            HIGH=243.85, LOW=222.85, LAST=226.0, CLOSE=224.2, AVG_PRICE=227.0, TOTTRDQTY=8447, TOTTRDVAL=19.17,
            TOTALTRADES=266, DELIVQTY=3401, DELIVPER=40.26)]

small_list = ['INFY', 'TCS', 'SBIN', 'LINC']

spark_frame = spark.createDataFrame(data)

# Initial data
spark_frame.show()

pivoted_df = spark_frame.groupBy('close').pivot('symbol').agg(first('avg_price'))

select_columns = [single_stock for single_stock in small_list if single_stock in pivoted_df.columns]

pivoted_df = pivoted_df.select('close', *select_columns)

# Output data
pivoted_df.show()
print(pivoted_df.collect())  # Don't use this on production data, you could get OOM on the driver node.

Output example:

+-----+------+-----+
|close|  INFY| LINC|
+-----+------+-----+
|224.2|226.63|227.0|
|225.6|  null|227.0|
+-----+------+-----+


[Row(close=224.2, INFY=226.63, LINC=227.0), 
 Row(close=225.6, INFY=None, LINC=227.0)]

You may need to tweak the aggregate a bit to compute exactly what you need.
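For instance, if the goal is one row per trading day with each symbol's closing price in its own column (like the expected output in the question), you could group by the timestamp and pivot on the symbol instead. This is only a sketch, assuming spark_frame has the columns shown above and reusing small_list and the imports from the snippet:

# One row per timestamp, one close column per symbol
per_day = (spark_frame
           .groupBy('timestamp')
           .pivot('symbol', small_list)  # passing small_list limits which pivot columns are created
           .agg(first('close')))

per_day.show()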

Do not use collect in production because it collects all the data on the driver, which will probably result in an OOM exception.
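If you only need to inspect the result, show() or a bounded limit() keeps most of the data off the driver; both are standard DataFrame methods:

# Preview without pulling the whole result to the driver
pivoted_df.show(20, truncate=False)

# Or materialize only a bounded number of rows locally
preview_rows = pivoted_df.limit(20).collect()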


9 Comments

Thank you for your answer. I actually need to find correlated stocks, which is why I want their close prices side by side. I attempted to run your code but wasn't able to get it to run.
Just saw your edit to the question. First, my earlier suggestion didn't work because it used where instead of when from pyspark.sql.functions. Second, I didn't fully know how the data looked, but it is now present in your edit. Please provide the preferred output you are trying to achieve, and I can try to answer your question.
I have added the expected results, thanks
I have added an "edit" (completely new answer) based on your example output.
Thank you for updating the answer; unfortunately I get this in response: ``` +-----+----+----+----+---------+ |close|INFY| TCS|SBIN|ICICIBANK| +-----+----+----+----+---------+ |10.65|null|null|null| null| | 13.4|null|null|null| null| | 14.9|null|null|null| null| | 15.5|null|null|null| null| ``` which is not the desired result.

I was finally able to work around this problem with something like the following. I still feel a more optimal solution is possible, but this also works:

small_list = ["INFY","TCS", "SBIN", "ICICIBANK"]
frame = spark_frame.filter(col('symbol')==small_list[0]).select([col('close').alias(single_stock), 'timestamp'])
# frame.withColumnRenamed('close', small_list[0])

for single_stock in small_list[1:]:
    print(single_stock)
    current_stock = spark_frame.filter(col('symbol')==single_stock).select(['close', 'timestamp'])
    
    frame = frame.join(current_stock, "timestamp", "inner")
    

The outcome looks something like:

+-------------------+------+-------+------+---------+
|          timestamp| close|    TCS|  SBIN|ICICIBANK|
+-------------------+------+-------+------+---------+
|2020-01-01 00:00:00|736.85| 2167.6|334.45|   536.75|
|2020-01-02 00:00:00| 734.7|2157.65| 339.3|    540.6|
|2020-01-03 00:00:00| 746.0|2200.65| 333.7|   538.85|
+-------------------+------+-------+------+---------+
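Since the comments above mention that the end goal is finding correlated stocks, once the close prices are side by side you can compute pairwise correlations directly with DataFrame.stat.corr. This is a sketch assuming the joined frame built above, where the first symbol's prices are still in the close column:

# Pearson correlation between the first symbol's close and every other symbol
for single_stock in small_list[1:]:
    corr_value = frame.stat.corr('close', single_stock)
    print(single_stock, corr_value)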

1 Comment

@ms12 This is what I did for now; do suggest if there is a better way.
