Source code for Statsmodels Exporter

from __future__ import absolute_import

import sys, os
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
sys.path.append(BASE_DIR)


from pprint import pprint
from PMML44 import *
from datetime import datetime
import metadata
import warnings
import math
from enums import *

class StatsmodelsToPmml:
    """
    Exports time-series models from the statsmodels library into PMML.

    Parameters
    ----------
    results_obj :
        Instance of AR(I)MAResultsWrapper / (SARI/VAR)MAXResultsWrapper from statsmodels
    pmml_file_name : string
        Name of the PMML file
    conf_int : list (optional)
        Confidence interval. A list of percentage values for the confidence intervals.
        e.g., conf_int = [80, 95] will create OutputFields for the lower and upper bounds
        of the 80% and 95% confidence intervals.
    model_name : string (optional)
        Name of the model
    description : string (optional)
        Description of the model

    Returns
    -------
    Generates a PMML object and exports it to `pmml_file_name`
    """

    def __init__(self, results_obj=None, pmml_file_name="from_statsmodels.pmml", conf_int=None,
                 model_name=None, description=None):
        self.results_obj = results_obj
        self.pmml_file_name = pmml_file_name
        self.conf_int = conf_int
        self.model_name = model_name
        self.description = description
        self.pmml = None
        self.construct_pmml()
        self.export_pmml()
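
    # The exported document has, roughly, the following shape (illustrative sketch derived
    # from the construction below; element details depend on the fitted model):
    #
    #   <PMML version="4.4">
    #     <Header>...</Header>
    #     <DataDictionary>...</DataDictionary>
    #     <TimeSeriesModel functionName="timeSeries" bestFit="ARIMA" or "StateSpaceModel">
    #       <MiningSchema/> <Output/> <TimeSeries/>
    #       <ARIMA/> or <StateSpaceModel/>
    #     </TimeSeriesModel>
    #   </PMML>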

    def export_pmml(self):
        """
        Writes the generated PMML object to the given `pmml_file_name`
        """
        pmml = PMML(
            version=PMML_SCHEMA.VERSION.value,
            Header=Header(
                copyright="Copyright (c) 2018 Software AG",
                description=self.description,
                Timestamp=Timestamp(datetime.now()),
                Application=Application(name="Nyoka", version=metadata.__version__)
            ),
            DataDictionary=self.data_dictionary,
            TimeSeriesModel=[self.ts_model]
        )
        pmml.export(open(self.pmml_file_name, 'w'), 0)

    def generate_data_dictionary(self):
        """
        Generates the DataDictionary object. The number of DataFields is one more than the dimension
        of the data. The extra DataField is supplementary; it holds the value of `h` (horizon) for forecasting.
        """
        data_fields = []
        for val in self.y:
            data_fields.append(
                DataField(
                    name=val,
                    optype=OPTYPE.CONTINUOUS.value,
                    dataType=DATATYPE.DOUBLE.value
                )
            )
        data_fields.append(
            DataField(name="h", optype=OPTYPE.CONTINUOUS.value, dataType=DATATYPE.INTEGER.value)
        )
        self.data_dictionary = DataDictionary(
            numberOfFields=len(data_fields),
            DataField=data_fields
        )

    def construct_pmml(self):
        """
        Constructs the actual model object (ARIMA / TimeSeriesModel)
        """
        if 'int' in str(self.results_obj.model.endog.dtype):
            self.results_obj.model.endog = self.results_obj.model.endog.astype('float64')
            self.results_obj.model.data.endog = self.results_obj.model.data.endog.astype('float64')
        self.data_obj = self.results_obj.data
        self.model = self.results_obj.model
        self.y = self.results_obj.data.ynames
        if self.y.__class__.__name__ == "str":
            self.y = [self.y.split(".")[-1]]
        self.generate_data_dictionary()
        output = self.generate_output()
        mining_schema = self.generate_mining_schema()
        time_series_list = self.generate_time_series()
        arima_model = None
        state_space_model = None
        if self.model.__class__.__name__ in ['ARMA', 'ARIMA']:
            self.model_name = self.model_name if self.model_name else "ArimaModel"
            self.description = self.description if self.description else "Non-Seasonal Arima Model"
            if hasattr(self.results_obj, "fit_details"):
                best_fit = TIMESERIES_ALGORITHM.STATE_SPACE_MODEL.value
                state_space_model = self.generate_state_space_model()
            else:
                best_fit = TIMESERIES_ALGORITHM.ARIMA.value
                arima_model = self.generate_arima_model()
        elif self.model.__class__.__name__ in ['VARMAX', 'SARIMAX']:
            best_fit = TIMESERIES_ALGORITHM.STATE_SPACE_MODEL.value
            self.model_name = self.model_name if self.model_name else self.model.__class__.__name__
            self.description = self.description if self.description else "State Space Model"
            state_space_model = self.generate_state_space_model()
        else:
            raise NotImplementedError("Not Implemented. Currently we support only ARMA, ARIMA, SARIMAX and VARMAX.")
        self.ts_model = TimeSeriesModel(
            modelName=self.model_name,
            functionName=MINING_FUNCTION.TIMESERIES.value,
            bestFit=best_fit,
            MiningSchema=mining_schema,
            Output=output,
            TimeSeries=time_series_list,
            ARIMA=arima_model,
            StateSpaceModel=state_space_model
        )
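
    # Note (illustrative, not normative): the StateSpaceModel element built below encodes a
    # linear state-space representation that a PMML consumer can roll forward recursively,
    # roughly as
    #     x(t+1) = TransitionMatrix . x(t) + InterceptVector   (starting from the FinalStateVector)
    #     y(t)   = MeasurementMatrix . x(t)
    # with the SelectedStateCovarianceMatrix and PredictedStateCovarianceMatrix supplying the
    # covariance recursion needed for the requested confidence intervals.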

    def generate_state_space_model(self):
        """
        Constructs the StateSpaceModel object for the following models:

        - `statsmodels.tsa.statespace.sarimax.SARIMAX`
        - `statsmodels.tsa.statespace.varmax.VARMAX`
        - `statsmodels.tsa.arima.model.ARIMA`
        """
        import numpy as np
        np.set_printoptions(precision=12)
        selected_state_cov_matrix = None
        predicted_state_cov_matrix = None
        smoother_results = self.results_obj.smoother_results
        S_t0 = self.results_obj.filtered_state[..., -1]
        if self.model.__class__.__name__ in ["ARMA", "ARIMA"]:
            intercept = smoother_results.obs_intercept[..., -1]
        else:
            # state_intercept might contain `nan` values. So here it is generated manually.
            intercept = np.zeros(S_t0.shape)
            if self.model.k_trend:
                if self.model.__class__.__name__ == 'VARMAX':
                    mu = self.results_obj.params[self.model._params_trend]
                    if mu.__class__.__name__ == 'Series':
                        mu = mu.values
                    intercept[:len(mu)] += mu
                else:
                    mu = self.results_obj._params_trend[0]
                    if mu.__class__.__name__ == 'Series':
                        mu = mu.values
                    spec = self.results_obj.specification
                    k_state = spec['k_diff'] + spec['seasonal_periods'] * spec['k_seasonal_diff']
                    intercept[k_state] += mu
        F_matrix = smoother_results.transition[..., -1]  # transition_matrix
        G = smoother_results.design[..., -1]  # measurement_matrix
        if len(intercept) == len(S_t0):
            S_t1 = np.dot(F_matrix, S_t0) + intercept  # finalStateVector
        else:
            S_t1 = np.dot(F_matrix, S_t0)
        t_mat = Matrix(nbRows=F_matrix.shape[0], nbCols=F_matrix.shape[1])
        for row in F_matrix:
            array_content = " ".join([str(val) for val in row])
            t_mat.add_Array(ArrayType(content=array_content, type_=ARRAY_TYPE.REAL.value))
        transition_matrix = TransitionMatrix(Matrix=t_mat)
        m_mat = Matrix(nbRows=G.shape[0], nbCols=G.shape[1])
        for row in G:
            array_content = " ".join([str(val) for val in row])
            m_mat.add_Array(ArrayType(content=array_content, type_=ARRAY_TYPE.REAL.value))
        measurement_matrix = MeasurementMatrix(Matrix=m_mat)
        arr_content = " ".join([str(val) for val in S_t1])
        arr = ArrayType(type_=ARRAY_TYPE.REAL.value, content=arr_content, n=len(S_t1))
        final_state_vector = FinalStateVector(Array=arr)
        # InterceptVector will always be there
        arr_content = " ".join([str(val) for val in intercept])
        arr = ArrayType(type_=ARRAY_TYPE.REAL.value, content=arr_content, n=len(intercept))
        intercept_vector = InterceptVector(Array=arr)
        # For confidence interval
        if self.conf_int is not None:
            R = smoother_results.selection[..., -1]  # selection matrix
            Q = smoother_results.state_cov[..., -1]  # state_covariance matrix
            R_Q_R_prime = np.dot(R, np.dot(Q, R.T))  # selected_state_cov
            P_t0 = smoother_results.predicted_state_cov[..., -1]  # predicted_state_cov
            RQR_mat = Matrix(nbRows=R_Q_R_prime.shape[0], nbCols=R_Q_R_prime.shape[1])
            for row in R_Q_R_prime:
                array_content = " ".join([str(val) for val in row])
                RQR_mat.add_Array(ArrayType(content=array_content, type_=ARRAY_TYPE.REAL.value))
            selected_state_cov_matrix = SelectedStateCovarianceMatrix(Matrix=RQR_mat)
            p_mat = Matrix(nbRows=P_t0.shape[0], nbCols=P_t0.shape[1])
            for row in P_t0:
                array_content = " ".join([str(val) for val in row])
                p_mat.add_Array(ArrayType(content=array_content, type_=ARRAY_TYPE.REAL.value))
            predicted_state_cov_matrix = PredictedStateCovarianceMatrix(Matrix=p_mat)
        state_space_model = StateSpaceModel(
            StateVector=final_state_vector,
            TransitionMatrix=transition_matrix,
            MeasurementMatrix=measurement_matrix,
            InterceptVector=intercept_vector,
            SelectedStateCovarianceMatrix=selected_state_cov_matrix,
            PredictedStateCovarianceMatrix=predicted_state_cov_matrix
        )
        return state_space_model
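
    # Note (illustrative): the ARIMA element built below stores the AR parameters, the MA
    # coefficients together with the last `q` residuals, the differencing order `d`, the RMSE
    # and a constant term, so a consumer can evaluate, roughly,
    #     y_hat(t+1) = constantTerm + sum_i ar_i * y(t+1-i) + sum_j ma_j * resid(t+1-j)
    # applied to the d-times differenced series when d > 0.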

    def generate_arima_model(self):
        """
        Constructs the ARIMA object. Only for the `statsmodels.tsa.arima_model.ARIMA` class.
        """
        p = self.results_obj.k_ar
        q = self.results_obj.k_ma
        d = getattr(self.results_obj, 'k_diff', 0)
        ar = None
        ma = None
        if p > 0:
            ar_content = ' '.join([str(i) for i in self.results_obj.arparams])
            ar_params_array = ArrayType(content=ar_content, n=p, type_=ARRAY_TYPE.REAL.value)
            ar = AR(Array=ar_params_array)
        if q > 0:
            ma_content = ' '.join([str(coeff) for coeff in self.results_obj.maparams])
            ma_coeff_array = ArrayType(content=ma_content, n=q, type_=ARRAY_TYPE.REAL.value)
            ny_maCoef_obj = MACoefficients(Array=ma_coeff_array)
            residuals = self.results_obj.resid[-q:]
            resid_content = ' '.join([str(res) for res in residuals])
            resid_array = ArrayType(content=resid_content, n=q, type_=ARRAY_TYPE.REAL.value)
            residual_obj = Residuals(Array=resid_array)
            ma = MA(MACoefficients=ny_maCoef_obj, Residuals=residual_obj)
        const_term = 0
        if self.results_obj.k_trend:
            const_term = self.results_obj.params[0]
        non_seasonal_comp = NonseasonalComponent(p=p, d=d, q=q, AR=ar, MA=ma)
        rmse = math.sqrt(self.model.sigma2)
        arima_obj = ARIMA(
            constantTerm=const_term,
            predictionMethod=ARIMA_PREDICTION_METHOD.CSS.value,
            RMSE=rmse,
            NonseasonalComponent=non_seasonal_comp
        )
        return arima_obj

    def generate_time_value_object(self, data):
        """
        Generates TimeValue objects. If the data has an index, the index values are stored in Timestamp objects.
        """
        time_values = []
        indices = self.data_obj.dates
        for data_idx in range(len(data)):
            tv = TimeValue(
                index=data_idx,
                value=data[data_idx],
                Timestamp=Timestamp(str(indices[data_idx])) if indices is not None else None
            )
            time_values.append(tv)
        return time_values

    def generate_time_series(self):
        """
        Generates TimeSeries objects. The number of TimeSeries objects is equal to the dimension of the data.
        """
        time_series_list = []
        if self.data_obj.endog.ndim == 1:
            ts = TimeSeries(
                usage=TIMESERIES_USAGE.ORIGINAL.value,
                field=self.y[0],
                startTime=0,
                endTime=len(self.data_obj.endog) - 1,
                TimeValue=self.generate_time_value_object(self.data_obj.endog)
            )
            time_series_list.append(ts)
        else:
            for i in range(self.data_obj.endog.shape[-1]):
                ts = TimeSeries(
                    usage=TIMESERIES_USAGE.ORIGINAL.value,
                    field=self.y[i],
                    startTime=0,
                    endTime=len(self.data_obj.endog) - 1,
                    TimeValue=self.generate_time_value_object(self.data_obj.endog[..., i])
                )
                time_series_list.append(ts)
        return time_series_list

    def generate_output(self):
        """
        Generates the Output object. If the user provides values in the `conf_int` parameter, two OutputFields
        are created for each value: one with `feature=confidenceIntervalLower` and another with
        `feature=confidenceIntervalUpper`.
        """
        out_flds = []
        for y_ in self.y:
            out_flds.append(
                OutputField(
                    name="predicted_" + y_,
                    optype=OPTYPE.CONTINUOUS.value,
                    dataType=DATATYPE.STRING.value,
                    feature=RESULT_FEATURE.PREDICTED_VALUE.value,
                    Extension=[Extension(extender="ADAPA", name="dataType", value="json")]
                )
            )
        if self.conf_int is not None:
            lower = []
            upper = []
            for percent in self.conf_int:
                for y_ in self.y:
                    lower.append(
                        OutputField(
                            name=f"conf_int_{percent}_lower_{y_}",
                            optype=OPTYPE.CONTINUOUS.value,
                            dataType=DATATYPE.STRING.value,
                            targetField=y_,
                            feature=RESULT_FEATURE.CONFIDENCE_INTERVAL_LOWER.value,
                            value=percent,
                            Extension=[Extension(extender="ADAPA", name="dataType", value="json")]
                        )
                    )
                    upper.append(
                        OutputField(
                            name=f"conf_int_{percent}_upper_{y_}",
                            optype=OPTYPE.CONTINUOUS.value,
                            dataType=DATATYPE.STRING.value,
                            targetField=y_,
                            feature=RESULT_FEATURE.CONFIDENCE_INTERVAL_UPPER.value,
                            value=percent,
                            Extension=[Extension(extender="ADAPA", name="dataType", value="json")]
                        )
                    )
            out_flds.extend(lower + upper)
        return Output(OutputField=out_flds)

    def generate_mining_schema(self):
        """
        Generates the MiningSchema object.
        """
        mining_fields = []
        for y_ in self.y:
            mining_fields.append(MiningField(name=y_, usageType=FIELD_USAGE_TYPE.TARGET.value))
        mining_fields.append(MiningField(name='h', usageType=FIELD_USAGE_TYPE.SUPPLEMENTARY.value))
        return MiningSchema(MiningField=mining_fields)
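

# Usage sketch (illustrative only): assuming statsmodels, numpy and pandas are installed, a
# fitted SARIMAX result can be exported as shown below. The synthetic series name "sales" and
# the output file name "sales_arima.pmml" are hypothetical, not part of the exporter itself.
if __name__ == "__main__":
    import numpy as np
    import pandas as pd
    from statsmodels.tsa.statespace.sarimax import SARIMAX

    # Build a small synthetic monthly series with a datetime index.
    index = pd.date_range("2020-01-01", periods=60, freq="MS")
    data = pd.Series(np.random.normal(100, 10, 60).cumsum(), index=index, name="sales")

    # Fit a simple SARIMAX(1,1,1) model and export it, requesting a 95% confidence interval.
    results = SARIMAX(data, order=(1, 1, 1)).fit(disp=False)
    StatsmodelsToPmml(results, "sales_arima.pmml", conf_int=[95])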