Using the pickle library to save a model and use it in Incorta
In the previous blog, I used time series analysis with Prophet: https://suziepyspark.blogspot.com/2021/03/using-time-series-analysis-by-prophet.html
In this blog, I will use pickle to save the trained model and then load and use it in Incorta.
In machine learning, we often need to store the trained model so that we can directly read the model when making a decision without retraining the model, which greatly saves time. The pickle module provided by Python solves this problem well. It can serialize objects and save them to disk and read them out when needed. Any object can be serialized.
# Below is how to use the model:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
from fbprophet import Prophet
import pickle
from sklearn.metrics import mean_absolute_error

# Read historical product demand data from Incorta.
prod_demand_df = read("TimeSeriesNotebooks.HISTORICAL_PRODUCT_DEMAND")
prod_demand_df = prod_demand_df.withColumnRenamed("Date", "Order_Date")
prod_demand_df = prod_demand_df.select(
    date_format(col("Order_Date"), 'yyyy-MM-dd').alias("Order_Date_Str"),
    col("Order_Demand"),
    col("Product_Category"),
    col("Warehouse"),
)
pdf = prod_demand_df.toPandas()

# Filter to a single product category and warehouse.
# .copy() avoids pandas' SettingWithCopyWarning on the column assignments below.
npdf = pdf.loc[(pdf.Product_Category == 'Category_028') & (pdf.Warehouse == "Whse_J")].copy()

# Add a datetime column and use it as the index so we can resample by month.
npdf['pd_Datetime'] = pd.to_datetime(npdf['Order_Date_Str'] + ' 00:00:00')
npdf = npdf.set_index(pd.DatetimeIndex(npdf['pd_Datetime']))

# Aggregate the order demand by month ('MS' = month start).
monthly_npdf = pd.DataFrame()
monthly_npdf['Order_Demand'] = npdf['Order_Demand'].resample('MS').sum()
monthly_npdf['Order_Date'] = list(monthly_npdf.index)

# Keep only data up to the end of 2016.
monthly_npdf = monthly_npdf.loc[monthly_npdf.Order_Date <= pd.to_datetime('2016-12-31 00:00:00')]

# Prophet expects the columns to be named 'y' (value) and 'ds' (date).
monthly_npdf.columns = ['y', 'ds']

# Load the previously trained and pickled Prophet model.
# NOTE: pickle.load can execute arbitrary code; only load trusted model files.
ml_model_path = "/home/incorta/IncortaAnalytics/Tenants/demo/data/ml_model/" + "Order_Demand_Model.pckl"
with open(ml_model_path, 'rb') as fin:
    prophet = pickle.load(fin)

# Build the future dataframe: the first day of each month of 2017.
future = pd.DataFrame({'ds': pd.to_datetime(['2017-%02d' % i for i in range(1, 13)])})

# Use the loaded model (prophet) to make a forecast.
forecast = prophet.predict(future)

# Keep the predicted values and tag them with the product category.
product_result = forecast[['ds', 'yhat']].copy()
product_result['Product_Category'] = 'Category_028'

# Combine the original (historical) series with the prediction.
# pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
monthly_npdf['Product_Category'] = 'Category_028'
product_result = pd.concat([product_result, monthly_npdf])

# Write the combined result back to Incorta.
result_df = spark.createDataFrame(product_result)
save(result_df)
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd
from fbprophet import Prophet
import pickle
from sklearn.metrics import mean_absolute_error

# Read historical product demand data from Incorta.
prod_demand_df = read("TimeSeriesNotebooks.HISTORICAL_PRODUCT_DEMAND")
prod_demand_df = prod_demand_df.withColumnRenamed("Date", "Order_Date")
prod_demand_df = prod_demand_df.select(
    date_format(col("Order_Date"), 'yyyy-MM-dd').alias("Order_Date_Str"),
    col("Order_Demand"),
    col("Product_Category"),
    col("Warehouse"),
)
pdf = prod_demand_df.toPandas()

# Filter to a single product category and warehouse.
# .copy() avoids pandas' SettingWithCopyWarning on the column assignments below.
npdf = pdf.loc[(pdf.Product_Category == 'Category_028') & (pdf.Warehouse == "Whse_J")].copy()

# Add a datetime column and use it as the index so we can resample by month.
npdf['pd_Datetime'] = pd.to_datetime(npdf['Order_Date_Str'] + ' 00:00:00')
npdf = npdf.set_index(pd.DatetimeIndex(npdf['pd_Datetime']))

# Aggregate the order demand by month ('MS' = month start).
monthly_npdf = pd.DataFrame()
monthly_npdf['Order_Demand'] = npdf['Order_Demand'].resample('MS').sum()
monthly_npdf['Order_Date'] = list(monthly_npdf.index)

# Keep only data up to the end of 2016.
monthly_npdf = monthly_npdf.loc[monthly_npdf.Order_Date <= pd.to_datetime('2016-12-31 00:00:00')]

# Prophet expects the columns to be named 'y' (value) and 'ds' (date).
monthly_npdf.columns = ['y', 'ds']

# Create and fit the model.
prophet = Prophet(changepoint_prior_scale=0.15, daily_seasonality=False)
prophet.fit(monthly_npdf)

# Serialize the trained model to disk so later notebooks can reuse it
# without retraining.
ml_model_path = "/home/incorta/IncortaAnalytics/Tenants/demo/data/ml_model/" + "Order_Demand_Model.pckl"
with open(ml_model_path, 'wb') as fout:
    pickle.dump(prophet, fout)

# Build the future dataframe: the first day of each month of 2017.
future = pd.DataFrame({'ds': pd.to_datetime(['2017-%02d' % i for i in range(1, 13)])})

# Use the model (prophet) to make a forecast.
forecast = prophet.predict(future)

# Keep the predicted values and tag them with the product category.
product_result = forecast[['ds', 'yhat']].copy()
product_result['Product_Category'] = 'Category_028'

# Combine the original (historical) series with the prediction.
# pd.concat replaces DataFrame.append, which was removed in pandas 2.0.
monthly_npdf['Product_Category'] = 'Category_028'
product_result = pd.concat([product_result, monthly_npdf])

# Evaluate in-sample fit: compare the last 12 months of actual demand with the
# model's predictions for those SAME months. (The original code compared 2016
# actuals against the 2017 forecast, i.e. two different time periods, which
# makes the MAE meaningless.)
last_12 = monthly_npdf[['ds']].tail(12)
insample = prophet.predict(last_12)
y_true = monthly_npdf['y'].tail(12).values
y_pred = insample['yhat'].values
mae = mean_absolute_error(y_true, y_pred)
print('MAE: %.3f' % mae)

# Write the combined result back to Incorta.
result_df = spark.createDataFrame(product_result)
save(result_df)
Reference:
"Pickle Serialization Study Notes - Programmer Sought". Programmersought.Com, 2021, https://www.programmersought.com/article/15805994125/.
"Save Model For Python · Issue #725 · Facebook/Prophet". Github, 2021, https://github.com/facebook/prophet/issues/725.
Comments
Post a Comment