Using PySpark to Build a Date Table in Incorta
A date dimension exists in almost all analytics solutions; it allows navigation of the fact table through familiar dates, months, fiscal periods, and special days on the calendar. The calendar date dimension typically has many columns that describe characteristics such as date, day number, week of the month, week of the year, etc.
I wanted to build this date dimension in Incorta, and I was able to generate the data with a PySpark program.
Below is the code I used.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyspark.sql import * | |
from pyspark.sql.types import * | |
from datetime import date,timedelta | |
import holidays | |
from pyspark.sql.functions import * | |
# Inclusive calendar range covered by the date dimension.
date_start = date(1980, 1, 1)
date_end = date(2060, 12, 31)
delta = date_end - date_start

# One single-column row per day offset, 0 .. delta.days inclusive, so the
# last offset lands exactly on date_end.
cData = [[offset] for offset in range(delta.days + 1)]
cSchema = StructType([StructField("idx", IntegerType())])
df_idx = spark.createDataFrame(cData, schema=cSchema)

# registerTempTable() has been deprecated since Spark 2.0;
# createOrReplaceTempView() is its replacement. The view name "TBL" is what
# the calendar query reads the offsets from.
df_idx.createOrReplaceTempView("TBL")
# Anchor values for the sequence columns, derived from date_start so the
# calendar range is configured in one place. With date_start = 1980-01-01
# they reproduce the literals that were previously hard-coded in the query:
# '1980-01-01', '1979-12-31' and 1980.
start_iso = date_start.isoformat()
epoch_iso = (date_start - timedelta(days=1)).isoformat()
start_year = date_start.year

# Expand every day offset in TBL into one calendar row with its descriptive
# attributes (names, week/month/quarter/year boundaries, running sequence
# numbers and "ago" dates).
#
# Notes on individual columns:
# - Month_Name uses 'MMMM' (full month name). The former five-letter
#   'MMMMM' is rejected by the Spark 3 datetime formatter.
# - NOTE(review): the week-based pattern letters 'W' and 'w' are only
#   accepted by Spark 3 when spark.sql.legacy.timeParserPolicy=LEGACY is
#   set; confirm the target Spark version before upgrading.
# - Week_Start_Date subtracts dayofweek(date) - 1 days; dayofweek() returns
#   1 for Sunday, so weeks start on Sunday.
# - Quarter_End_date and Year_End_date are built with concat() and are
#   therefore strings ('3/31/80', '12/31/1980'), unlike the *_Start_date
#   columns, which are dates.
# - Day_Number is 1-based (counted from the day before date_start), while
#   day_seq is 0-based (counted from date_start).
date_range = spark.sql(
    '''
    SELECT date Date1,
           datediff(date, '{epoch_iso}') Day_Number,
           date_format(date, 'EEEE') Day_Name,
           concat('Week-', date_format(date, 'W')) week_of_month,
           concat('Week-', date_format(date, 'w')) week_of_year,
           date_sub(date, dayofweek(date) - 1) Week_Start_Date,
           date_format(date, 'MMMM') Month_Name,
           month(date) Month_Number,
           ((year(date) * 12) + month(date) - ({start_year} * 12)) Month_Seq,
           date_format(date, 'MMM-yy') Month_Year,
           date_format(date, 'yyyyMM') Month_Year_Number,
           date_add(last_day(add_months(date, -1)), 1) Month_Start_date,
           last_day(date) Month_End_date,
           concat('Q', quarter(date), '/', year(date)) QuarterName,
           concat('Q', quarter(date)) Quarter_Number,
           CAST(concat(year(date), quarter(date)) AS INT) AS Quarter_Year_Number,
           add_months(trunc(date, 'YEAR'), ((quarter(date) - 1) * 3)) Quarter_Start_date,
           case when quarter(date) = 1 then concat('3/31/', date_format(date, 'yy'))
                when quarter(date) = 2 then concat('6/30/', date_format(date, 'yy'))
                when quarter(date) = 3 then concat('9/30/', date_format(date, 'yy'))
                when quarter(date) = 4 then concat('12/31/', date_format(date, 'yy'))
                else null
           end Quarter_End_date,
           year(date) Year,
           trunc(date, 'YEAR') Year_Start_date,
           concat('12/31/', year(date)) Year_End_date,
           datediff(date, '{start_iso}') day_seq,
           floor((datediff(date, '{epoch_iso}') + 1) / 7) week_seq,
           (year(trunc(date, 'YEAR')) - {start_year}) year_seq,
           date_add(date, -7) week_ago_date,
           date_add(date, -1) day_ago_date,
           add_months(date, -1) month_ago_date,
           add_months(date, -3) quarter_ago_date,
           add_months(date, -12) year_ago_date
    FROM ( SELECT date_add(to_date('{start_iso}'), idx) date
           FROM TBL
         )
    '''.format(start_iso=start_iso, epoch_iso=epoch_iso, start_year=start_year)
)

# registerTempTable() has been deprecated since Spark 2.0;
# createOrReplaceTempView() is its replacement. "Range_TBL" is the name the
# final holiday join selects from.
date_range.createOrReplaceTempView("Range_TBL")
# US public holidays for every year of the calendar range. The years follow
# date_start/date_end (1980-2060 with the current settings) so the holiday
# lookup cannot drift out of step with the generated dates.
us_holidays = holidays.UnitedStates(
    years=range(date_start.year, date_end.year + 1)
)

# One (date, name) row per holiday, created directly with the column names
# the final join expects: "date2" is the join key and "holliday_name"
# (spelling kept as used by the downstream query) is the holiday label.
df_hoildays = spark.createDataFrame(
    list(us_holidays.items()),
    ["date2", "holliday_name"],
)

# registerTempTable() has been deprecated since Spark 2.0;
# createOrReplaceTempView() is its replacement.
df_hoildays.createOrReplaceTempView("Holiday_TBL")
# Attach the holiday name (if any) to every calendar row and derive:
# - day_type:    'HOLIDAY' when the date is in Holiday_TBL, otherwise
#                'WEEKEND' for Saturday/Sunday and 'WEEKDAY' for the rest.
#                Day_Name is produced by date_format(date, 'EEEE') in
#                Range_TBL, so every non-weekend value is a weekday name.
# - workday_seq: running count, ordered by Date1, of the days that are
#                neither a holiday nor a weekend.
# The left join keeps every calendar date; holliday_name is null for
# non-holidays.
df = spark.sql(
    '''
    select Range_TBL.*,
           Holiday_TBL.holliday_name,
           case when holliday_name is not null then 'HOLIDAY'
                when Day_Name in ('Saturday', 'Sunday') then 'WEEKEND'
                else 'WEEKDAY'
           end day_type,
           SUM(case when holliday_name is not null then 0
                    when Day_Name in ('Saturday', 'Sunday') then 0
                    else 1
               end
              )
           over ( ORDER BY Date1 ROWS BETWEEN unbounded preceding AND CURRENT ROW ) workday_seq
    from Range_TBL
    left join Holiday_TBL
      on Range_TBL.Date1 = Holiday_TBL.date2
    '''
)

# The column is called Date1 inside the queries and only gets its final
# name, "Date", here.
df = df.withColumnRenamed('Date1', 'Date')

# save() is not defined in this script; presumably it is provided by the
# Incorta PySpark runtime to persist the result - verify in the target
# environment.
save(df)
Note that the date range used here is 1980–2060. If you need a different range, be sure to update every place in the code where a year is hard-coded.
Explore Data in Incorta.
Here is a screenshot of the dashboard.
Comments
Post a Comment