
Is it possible to do aggregation on a partial dataframe? Or is it possible to efficiently split a dataframe given conditions?

Say that I have a dataframe like the below:

+-------------+-----------------+-----------+----------------+-----------+
|     epoch_ms|ID               | state     | value 1        | value 2   |
+-------------+-----------------+-----------+----------------+-----------+
|1588119659000| 3489692692      |        3.0|   0.239999     |   11.2699 |   
|1587497991000| 3489692692      |        2.0|   0.159999     |   21.6999 | 
|1587864812000| 3489692692      |        2.0|   0.959999     |   359.649 |  
|1587581329000| 3489692692      |        1.0|   1.039999     |   336.209 |  
|1587581329000| 3489692692      |        3.0|   1.039999     |   336.299 |   
|1587581329000| 3489692692      |        1.0|   2.799999     |   336.209 |   

Is there an efficient way to split by 'event'? Assuming an event starts with state=3 and ends with state=1, I would like smaller dataframes of everything contained between those states; in this small case:

+-------------+-----------------+-----------+----------------+-----------+
|     epoch_ms|ID               | state     | value 1        | value 2   |
+-------------+-----------------+-----------+----------------+-----------+
|1588119659000| 3489692692      |        3.0|   0.239999     |   11.2699 |   
|1587497991000| 3489692692      |        2.0|   0.159999     |   21.6999 | 
|1587864812000| 3489692692      |        2.0|   0.959999     |   359.649 |  
|1587581329000| 3489692692      |        1.0|   1.039999     |   336.209 | 

and

+-------------+-----------------+-----------+----------------+-----------+
|     epoch_ms|ID               | state     | value 1        | value 2   |
+-------------+-----------------+-----------+----------------+-----------+
|1587581329000| 3489692692      |        3.0|   1.039999     |   336.299 |   
|1587581329000| 3489692692      |        1.0|   2.799999     |   336.209 |  

My end goal is to have another dataframe with one row per event, holding the start and end epoch and aggregations of the values, something like:

+-------------+---------------+-------------+--------------+-------------+
|  ID         |start epoch    |end_epoch    | max(value 1) | max(value 2)|
+-------------+---------------+-------------+--------------+-------------+
|3489692692   |1588119659000  |1587581329000|1.039999      |359.649      |
|3489692692   |1587581329000  |1587581329000|2.799999      |336.299      |

Previously, when I was not handling much data, I used pandas to iterate over the dataframe and construct the new dataframe row by row, but that is not very efficient. Any hint to point me in the right direction would be appreciated.
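
For context, the pandas loop I have been using looks roughly like the sketch below (simplified and only illustrative; the helper name and exact column handling are not my real code):

import pandas as pd

# Rough sketch of the row-by-row approach: open an event on state 3, close it on
# state 1, then aggregate the collected rows. Illustrative only.
def aggregate_events(df: pd.DataFrame) -> pd.DataFrame:
    events, current = [], None
    for _, row in df.iterrows():
        if current is None:
            if row["state"] == 3.0:           # event starts on state 3
                current = [row]
        else:
            current.append(row)
            if row["state"] == 1.0:           # event ends on state 1
                ev = pd.DataFrame(current)
                events.append({
                    "ID": ev["ID"].iloc[0],
                    "start_epoch": ev["epoch_ms"].iloc[0],
                    "end_epoch": ev["epoch_ms"].iloc[-1],
                    "max_value1": ev["value 1"].max(),
                    "max_value2": ev["value 2"].max(),
                })
                current = None
    return pd.DataFrame(events)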

-------###UPDATE###----------

I guess the below is a better sample of the data I'm working with:

+-------------+-----------------+-----------+----------------+-----------+
|     epoch_ms|ID               | state     | value 1        | value 2   |
+-------------+-----------------+-----------+----------------+-----------+
|1585766054000| 3489692692      |        3.0|   0.159999     |   7.58996 |
|1585766055000| 3489692692      |        3.0|   0.239999     |   11.2699 |  
|1585766058000| 3489692692      |        3.0|   0.135489     |   13.8790 |
|1587497991000| 3489692692      |        2.0|   0.159999     |   21.6999 | 
|1587864812000| 3489692692      |        2.0|   0.959999     |   359.649 |  
|1587581329000| 3489692692      |        1.0|   1.039999     |   336.209 |  
|1587581339000| 3489692692      |        3.0|   1.039999     |   336.299 | 
|1587581329000| 3489692692      |        1.0|   2.799999     |   336.209 |
|1588088096000| 3489692670      |        3.0|   2.869564     |   285.963 |
|1588088099000| 3489692670      |        2.0|   0.758753     |   299.578 |
|1588088199000| 3489692670      |        1.0|   3.965424     |   5.89677 |

Things to consider:

  • An event starts with state 3 and ends with state 1.
  • States can repeat: for example, state 3 or 2 can show up multiple times after the start, but the event must include them all until state 1 shows up.
  • Other states can occur after state 1 (state 1 multiple times, or state 2), but the next event won't start until the state is 3 again; anything between state 1 and 3 (end of the previous event and start of the new one) should be ignored.
  • If the dataframe ends with a state other than 3, it should be assumed that a 3 happens at the end.
  • Multiple IDs are possible, and the dataframe is ordered by both epoch and ID.

The results for the above sample should be something like:

+-------------+---------------+-------------+--------------+-------------+
|  ID         |start epoch    |end_epoch    | max(value 1) | max(value 2)|
+-------------+---------------+-------------+--------------+-------------+
|3489692692   |1585766054000  |1587581329000|1.039999      |359.649      |
|3489692692   |1587581339000  |1587581329000|2.799999      |336.299      |
|3489692670   |1588088096000  |1588088199000|3.965424      |299.578      |
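
To spell out how those rules map rows to events, here is a rough plain-Python sketch over the sample above (illustrative only; what I'm after is an efficient way to express this in Spark):

# Assign an event number to each sample row following the rules above: an event
# opens on a state-3 row (when no event is open) and closes on the next state-1
# row; rows between a close and the next open are ignored. Illustrative only.
rows = [  # (epoch_ms, ID, state) from the sample above
    (1585766054000, "3489692692", 3.0), (1585766055000, "3489692692", 3.0),
    (1585766058000, "3489692692", 3.0), (1587497991000, "3489692692", 2.0),
    (1587864812000, "3489692692", 2.0), (1587581329000, "3489692692", 1.0),
    (1587581339000, "3489692692", 3.0), (1587581329000, "3489692692", 1.0),
    (1588088096000, "3489692670", 3.0), (1588088099000, "3489692670", 2.0),
    (1588088199000, "3489692670", 1.0),
]

event_of_row, event_no, in_event = [], 0, False
for epoch, _id, state in rows:
    if state == 3.0 and not in_event:         # first 3 after the previous event closed
        event_no += 1
        in_event = True
    event_of_row.append(event_no if in_event else None)  # None = ignored row
    if state == 1.0 and in_event:             # state 1 closes the event
        in_event = False

print(event_of_row)  # [1, 1, 1, 1, 1, 1, 2, 2, 3, 3, 3]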

1 Answer

Splitting will be counter-intuitive; you should express your logic using PySpark's built-in aggregation functions (window + groupBy). As long as the data stays ordered the way you presented it, the code will work fine (ordering cannot be derived from the data itself, because some rows with the same state have epoch_ms values that are out of sequence, e.g. rows 2 and 3). The logic is to use an incremental sum with a condition on state to fish out your grouping for start/end. Try it and let me know.

df.show() #sampledata
#+-------------+----------+-----+--------+-------+
#|     epoch_ms|        ID|state| value 1|value 2|
#+-------------+----------+-----+--------+-------+
#|1588119659000|3489692692|  3.0|0.239999|11.2699|
#|1587497991000|3489692692|  2.0|0.159999|21.6999|
#|1587864812000|3489692692|  2.0|0.959999|359.649|
#|1587581329000|3489692692|  1.0|1.039999|336.209|
#|1587581329000|3489692692|  3.0|1.039999|336.299|
#|1587581329000|3489692692|  1.0|2.799999|336.209|
#+-------------+----------+-----+--------+-------+

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# w numbers the rows in the order they arrive (per ID); w2 runs the cumulative
# sum over that row number.
w=Window().partitionBy("ID").orderBy(F.lit(1))
w2=Window().partitionBy("ID").orderBy("rowNum")

# Flag every state==3 row with 1 and take a running sum of the flag: the sum is
# the event number, so grouping on it yields one row per event.
df.withColumn("rowNum", F.row_number().over(w))\
  .withColumn("inc_sum", F.sum(F.when(F.col("state")==3,F.lit(1)).otherwise(F.lit(0))).over(w2))\
  .groupBy("inc_sum").agg(F.first("ID").alias("ID"),
                          F.max("epoch_ms").alias("start_epoch"),
                          F.min("epoch_ms").alias("end_epoch"),
                          F.max("value 1").alias("max_value1"),
                          F.max("value 2").alias("max_value2"))\
  .drop("inc_sum").show()

#+----------+-------------+-------------+----------+----------+
#|        ID|  start_epoch|    end_epoch|max_value1|max_value2|
#+----------+-------------+-------------+----------+----------+
#|3489692692|1588119659000|1587497991000|  1.039999|   359.649|
#|3489692692|1587581329000|1587581329000|  2.799999|   336.299|
#+----------+-------------+-------------+----------+----------+
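
If you actually need each event as a separate dataframe (the splitting you asked about), you can keep inc_sum instead of dropping it and filter on it. A rough sketch reusing df, w and w2 from above (it collects the distinct group ids to the driver, which is fine as long as the number of events is small):

# Keep the dataframe before the groupBy (it still has the inc_sum column) and
# carve out one small dataframe per event.
df_with_groups = df.withColumn("rowNum", F.row_number().over(w))\
                   .withColumn("inc_sum", F.sum(F.when(F.col("state")==3,F.lit(1))
                                                 .otherwise(F.lit(0))).over(w2))

group_ids = [r["inc_sum"] for r in df_with_groups.select("inc_sum").distinct().collect()]
event_dfs = {g: df_with_groups.filter(F.col("inc_sum")==g).drop("rowNum","inc_sum")
             for g in group_ids}

event_dfs[1].show()  # rows of the first event (inc_sum == 1)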

UPDATE:

Try this. I use a lag condition (previous state != 3) together with the state == 3 condition to single out the start of an event, and then an incremental sum over that flag to get the groups.

df.show() #sampledata
#+-------------+----------+-----+--------+-------+
#|     epoch_ms|        ID|state| value 1|value 2|
#+-------------+----------+-----+--------+-------+
#|1585766054000|3489692692|  3.0|0.159999|7.58996|
#|1585766055000|3489692692|  3.0|0.239999|11.2699|
#|1585766058000|3489692692|  3.0|0.135489| 13.879|
#|1587497991000|3489692692|  2.0|0.159999|21.6999|
#|1587864812000|3489692692|  2.0|0.959999|359.649|
#|1587581329000|3489692692|  1.0|1.039999|336.209|
#|1587581339000|3489692692|  3.0|1.039999|336.299|
#|1587581329000|3489692692|  1.0|2.799999|336.209|
#|1588088096000|3489692670|  3.0|2.869564|285.963|
#|1588088099000|3489692670|  2.0|0.758753|299.578|
#|1588088199000|3489692670|  1.0|3.965424|5.89677|
#+-------------+----------+-----+--------+-------+

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Global window over the original row order (rowNum preserves the input order).
w=Window().orderBy("rowNum")

# Flag a row with 1 only when it is a state 3 that follows a non-3 row (the start
# of an event); the running sum of that flag labels every row with its event number.
df.withColumn("rowNum", F.monotonically_increasing_id())\
  .withColumn("inc_sum", F.sum(F.when((F.col("state")==3) & (F.lag("state").over(w)!=3),
                                      F.lit(1)).otherwise(F.lit(0))).over(w))\
  .groupBy("inc_sum").agg(F.first("ID").alias("ID"),
                          F.first("epoch_ms").alias("start_epoch"),
                          F.last("epoch_ms").alias("end_epoch"),
                          F.max("value 1").alias("max_value1"),
                          F.max("value 2").alias("max_value2"))\
  .drop("inc_sum").show()

#+----------+-------------+-------------+----------+----------+
#|        ID|  start_epoch|    end_epoch|max_value1|max_value2|
#+----------+-------------+-------------+----------+----------+
#|3489692692|1585766054000|1587581329000|  1.039999|   359.649|
#|3489692692|1587581339000|1587581329000|  2.799999|   336.299|
#|3489692670|1588088096000|1588088199000|  3.965424|   299.578|
#+----------+-------------+-------------+----------+----------+
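
One caveat: the window above has no partitioning, so Spark sorts and scans the whole frame in a single task. If that becomes a bottleneck, a possible variant (same lag + incremental-sum logic; my assumption, not re-run against your full data) partitions the window by ID and groups on (ID, inc_sum) instead:

# Variant sketch: rowNum (monotonically increasing) still preserves the original
# order within each ID, so the per-ID lag sees the same sequence of states. The
# first event of every ID gets inc_sum 0, hence the grouping on (ID, inc_sum).
w_id=Window().partitionBy("ID").orderBy("rowNum")

df.withColumn("rowNum", F.monotonically_increasing_id())\
  .withColumn("inc_sum", F.sum(F.when((F.col("state")==3) & (F.lag("state").over(w_id)!=3),
                                      F.lit(1)).otherwise(F.lit(0))).over(w_id))\
  .groupBy("ID","inc_sum").agg(F.first("epoch_ms").alias("start_epoch"),
                               F.last("epoch_ms").alias("end_epoch"),
                               F.max("value 1").alias("max_value1"),
                               F.max("value 2").alias("max_value2"))\
  .drop("inc_sum").show()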

6 Comments

This is definitely a step in the right direction, but I'm getting some unexpected results, and I think that is because the state can be repeated multiple times when events last for a long time. Can you think of a way to account for this?
So the change of states can happen multiple times, and the same state can be logged on multiple rows for events that last longer.
The logic depends on the start state, which is 3. So you are saying that 3 can appear multiple times in consecutive rows? Let me know if that's the case and I will update the solution.
Also, it would help if you could provide a few rows of the events you are referring to.
Thanks so much for your time here, friend. I will update the question itself above ^^