
I need to extract from utc_timestamp its date and its hour into two different columns, depending on the time zone. The time zone name is looked up by id in a constant configuration variable.

    Input DF                              Output DF
+-------------+--+                 +-------------+--+----------+----+
|utc_timestamp|id|                 |utc_timestamp|id|date      |hour|
+-------------+--+                 +-------------+--+----------+----+
|1608000000782|1 |                 |1608000000782|1 |2020-12-14|20  |
|1608000240782|2 |                 |1608000240782|2 |2020-12-15|11  |
+-------------+--+                 +-------------+--+----------+----+
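
For a reproducible example, here is a minimal sketch that builds the input DataFrame above (assuming an active SparkSession named spark; utc_timestamp holds epoch milliseconds):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# utc_timestamp is epoch milliseconds; id selects a time zone from the config
df = spark.createDataFrame(
    [(1608000000782, 1), (1608000240782, 2)],
    ['utc_timestamp', 'id'],
)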

I have a pandas_udf that extracts one column at a time, so I have to create it twice:

from datetime import datetime

from pyspark.sql import DataFrame
from pyspark.sql import functions as f
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import DateType, IntegerType
import pandas as pd
import pytz

TIMEZONE_LIST = {1: 'America/Chicago', 2: 'Asia/Tokyo'}


class TimezoneUdfProvider(object):
    def __init__(self):
        self.extract_date_udf = pandas_udf(self._extract_date, DateType(), PandasUDFType.SCALAR)
        self.extract_hour_udf = pandas_udf(self._extract_hour, IntegerType(), PandasUDFType.SCALAR)

    def _extract_date(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
        return pd.Series([extract_date(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])

    def _extract_hour(self, utc_timestamps: pd.Series, ids: pd.Series) -> pd.Series:
        return pd.Series([extract_hour(c1, c2) for c1, c2 in zip(utc_timestamps, ids)])


def extract_date(utc_timestamp: int, id: int):
    timezone_name = TIMEZONE_LIST[id]
    timezone_nw = pytz.timezone(timezone_name)
    return datetime.fromtimestamp(utc_timestamp / 1000, tz=timezone_nw).date()


def extract_hour(utc_timestamp: int, id: int) -> int:
    timezone_name = TIMEZONE_LIST[id]
    timezone_nw = pytz.timezone(timezone_name)
    return datetime.fromtimestamp(utc_timestamp / 1000, tz=timezone_nw).hour


def extract_from_utc(df: DataFrame) -> DataFrame:
    timezone_udf1 = TimezoneUdfProvider()
    df_with_date = df.withColumn('date', timezone_udf1.extract_date_udf(f.col('utc_timestamp'), f.col('id')))
    timezone_udf2 = TimezoneUdfProvider()
    df_with_hour = df_with_date.withColumn('hour', timezone_udf2.extract_hour_udf(f.col('utc_timestamp'), f.col('id')))
    return df_with_hour

Is there a better way to do it, without needing to use the same UDF provider twice?


1 Answer


You can do this without a UDF, using Spark's built-in functions.

We can use create_map to map the dictionary and create a new TimeZone column, then convert with from_unixtime and from_utc_timestamp, using the newly mapped column as the time zone. Once we have the timestamp in the right time zone, we can fetch the date and hour fields.

TIMEZONE_LIST = {1: 'America/Chicago', 2: 'Asia/Tokyo'}

import pyspark.sql.functions as F
from itertools import chain

map_exp = F.create_map([F.lit(i) for i in chain(*TIMEZONE_LIST.items())])


final = (df.withColumn("TimeZone", map_exp.getItem(F.col("id")))
           .withColumn("Timestamp",
                       F.from_utc_timestamp(F.from_unixtime(F.col("utc_timestamp") / 1000), F.col("TimeZone")))
           .withColumn("date", F.to_date("Timestamp"))
           .withColumn("Hour", F.hour("Timestamp"))
           .drop("Timestamp"))

final.show()


+-------------+---+---------------+----------+----+
|utc_timestamp| id|       TimeZone|      date|Hour|
+-------------+---+---------------+----------+----+
|1608000000782|  1|America/Chicago|2020-12-14|  20|
|1608000240782|  2|     Asia/Tokyo|2020-12-15|  11|
+-------------+---+---------------+----------+----+

EDIT: replacing create_map with a UDF:

import pyspark.sql.functions as F
from pyspark.sql.types import StringType

TIMEZONE_LIST = {1: 'America/Chicago', 2: 'Asia/Tokyo'}

def fun(x):
    return TIMEZONE_LIST.get(x, None)

map_udf = F.udf(fun, StringType())


final = (df.withColumn("TimeZone", map_udf("id"))
           .withColumn("Timestamp",
                       F.from_utc_timestamp(F.from_unixtime(F.col("utc_timestamp") / 1000), F.col("TimeZone")))
           .withColumn("date", F.to_date("Timestamp"))
           .withColumn("Hour", F.hour("Timestamp"))
           .drop("Timestamp"))

final.show()

Comments

create_map does not work for me, I receive an error:

    Traceback (most recent call last):
      File "<input>", line 1, in <module>
      File "<input>", line 1, in <listcomp>
      File "/home/ninav/venvs/acutecare-rdb-nlp-processor/lib/python3.6/site-packages/pyspark/sql/functions.py", line 44, in _
        jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col)
    AttributeError: 'NoneType' object has no attribute '_jvm'
I tried on the example TIMEZONE_LIST and received an error
@NinaVolfenzon Updated my answer: I used a UDF instead of create_map. Can you try the second method on the example dataframe you shared?
I did the import the same way you showed in your first example. I'm using python 3.6, could this be the reason for my error?
@NinaVolfenzon Are you still getting the error when you use the second method shown that uses a udf?
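
A likely cause of the 'NoneType' object has no attribute '_jvm' error above (an assumption based on the traceback, not confirmed in the thread): F.lit needs an active SparkContext, so building map_exp at module import time, before any SparkSession exists, fails. A minimal sketch of the fix:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from itertools import chain

TIMEZONE_LIST = {1: 'America/Chicago', 2: 'Asia/Tokyo'}

# Create the session (and its JVM gateway) first; F.lit raises
# "'NoneType' object has no attribute '_jvm'" when no SparkContext is active.
spark = SparkSession.builder.getOrCreate()

# Now the literal map expression can be built safely.
map_exp = F.create_map([F.lit(i) for i in chain(*TIMEZONE_LIST.items())])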
