Source code for skl_to_pmml

from __future__ import absolute_import

import sys, os
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
sys.path.append(BASE_DIR)
import numpy as np
import PMML43Ext as pml
import json
from skl import pre_process as pp
from datetime import datetime
from collections import OrderedDict

from pprint import pprint

def skl_to_pmml(pipeline, col_names, target_name, pmml_f_name='from_sklearn.pmml'):
    """
    Exports a scikit-learn pipeline object into PMML.

    Parameters
    ----------
    pipeline :
        Contains an instance of Pipeline with preprocessing steps and a final estimator.
    col_names : List
        Contains the list of feature/column names.
    target_name : String
        Name of the target column.
    pmml_f_name : String
        Name of the PMML file. (Default='from_sklearn.pmml')

    Returns
    -------
    Writes a PMML file.
    """
    try:
        model = pipeline.steps[-1][1]
    except:
        raise TypeError("Exporter expects a pipeline_instance and not an estimator_instance")
    else:
        if isinstance(col_names, np.ndarray):
            col_names = col_names.tolist()
        ppln_sans_predictor = pipeline.steps[:-1]
        trfm_dict_kwargs = dict()
        derived_col_names = col_names
        categoric_values = tuple()
        mining_imp_val = tuple()
        if ppln_sans_predictor:
            pml_pp = pp.get_preprocess_val(ppln_sans_predictor, col_names, model)
            trfm_dict_kwargs['TransformationDictionary'] = pml_pp['trfm_dict']
            derived_col_names = pml_pp['derived_col_names']
            col_names = pml_pp['preprocessed_col_names']
            categoric_values = pml_pp['categorical_feat_values']
            mining_imp_val = pml_pp['mining_imp_values']
        PMML_kwargs = get_PMML_kwargs(model, derived_col_names, col_names, target_name,
                                      mining_imp_val, categoric_values)
        pmml = pml.PMML(
            version=get_version(),
            Header=get_header(),
            DataDictionary=get_data_dictionary(model, col_names, target_name, categoric_values),
            **trfm_dict_kwargs,
            **PMML_kwargs
        )
        pmml.export(outfile=open(pmml_f_name, "w"), level=0)
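# Usage sketch (illustrative, not part of the module): skl_to_pmml expects a
# *fitted* sklearn Pipeline whose last step is the estimator. The file name
# "iris.csv" and the "species" target column below are assumptions made for
# this example only.
#
#     import pandas as pd
#     from sklearn.pipeline import Pipeline
#     from sklearn.preprocessing import StandardScaler
#     from sklearn.tree import DecisionTreeClassifier
#
#     df = pd.read_csv("iris.csv")
#     features = [c for c in df.columns if c != "species"]
#     pipe = Pipeline([("scaler", StandardScaler()),
#                      ("model", DecisionTreeClassifier())])
#     pipe.fit(df[features], df["species"])
#     skl_to_pmml(pipe, features, "species", "decision_tree.pmml")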
def any_in(seq_a, seq_b):
    return any(elem in seq_b for elem in seq_a)
def get_PMML_kwargs(model, derived_col_names, col_names, target_name, mining_imp_val,
                    categoric_values):
    """
    It returns all the pmml elements.

    Parameters
    ----------
    model :
        An instance of Scikit-learn model.
    derived_col_names : List
        Contains column names after preprocessing
    col_names : List
        Contains list of feature/column names.
    target_name : String
        Name of the target column.
    mining_imp_val : tuple
        Contains the mining_attributes, mining_strategy, mining_impute_value
    categoric_values : tuple
        Contains Categorical attribute names and its values

    Returns
    -------
    algo_kwargs : Dictionary
        Get the PMML model argument based on scikit learn model object
    """
    skl_mdl_super_cls_names = get_super_cls_names(model)
    regression_model_names = ('LinearRegression', 'LogisticRegression', 'RidgeClassifier',
                              'SGDClassifier', 'LinearDiscriminantAnalysis')
    tree_model_names = ('BaseDecisionTree',)
    support_vector_model_names = ('SVC', 'SVR')
    naive_bayes_model_names = ('GaussianNB',)
    mining_model_names = ('BaseEnsemble',)
    neurl_netwk_model_names = ('MLPClassifier', 'MLPRegressor')
    nearest_neighbour_names = ('NeighborsBase',)
    if any_in(tree_model_names, skl_mdl_super_cls_names):
        algo_kwargs = {'TreeModel': get_tree_models(model, derived_col_names, col_names,
                                                    target_name, mining_imp_val)}
    elif any_in(regression_model_names, skl_mdl_super_cls_names):
        algo_kwargs = {'RegressionModel': get_regrs_models(model, derived_col_names, col_names,
                                                           target_name, mining_imp_val,
                                                           categoric_values)}
    elif any_in(support_vector_model_names, skl_mdl_super_cls_names):
        algo_kwargs = {'SupportVectorMachineModel': get_supportVectorMachine_models(
            model, derived_col_names, col_names, target_name, mining_imp_val, categoric_values)}
    elif any_in(mining_model_names, skl_mdl_super_cls_names):
        algo_kwargs = {'MiningModel': get_ensemble_models(model, derived_col_names, col_names,
                                                          target_name, mining_imp_val,
                                                          categoric_values)}
    elif any_in(neurl_netwk_model_names, skl_mdl_super_cls_names):
        algo_kwargs = {'NeuralNetwork': get_neural_models(model, derived_col_names, col_names,
                                                          target_name, mining_imp_val)}
    elif any_in(naive_bayes_model_names, skl_mdl_super_cls_names):
        algo_kwargs = {'NaiveBayesModel': get_naiveBayesModel(model, derived_col_names, col_names,
                                                              target_name, mining_imp_val)}
    elif any_in(nearest_neighbour_names, skl_mdl_super_cls_names):
        algo_kwargs = {'NearestNeighborModel': get_nearestNeighbour_model(
            model, derived_col_names, col_names, target_name, mining_imp_val)}
    else:
        algo_kwargs = None
    return algo_kwargs
def get_model_kwargs(model, col_names, target_name, mining_imp_val):
    """
    It returns all the model elements for a specific model.

    Parameters
    ----------
    model :
        An instance of Scikit-learn model.
    col_names : List
        Contains list of feature/column names.
    target_name : String
        Name of the Target column.
    mining_imp_val : tuple
        Contains the mining_attributes, mining_strategy, mining_impute_value

    Returns
    -------
    model_kwargs : Dictionary
        Returns the functionName, MiningSchema and Output of the sk_model object
    """
    model_kwargs = dict()
    model_kwargs['functionName'] = get_mining_func(model)
    model_kwargs['MiningSchema'] = get_mining_schema(model, col_names, target_name, mining_imp_val)
    model_kwargs['Output'] = get_output(model, target_name)
    return model_kwargs
def get_nearestNeighbour_model(model, derived_col_names, col_names, target_name, mining_imp_val):
    """
    It returns the Nearest Neighbour model element.

    Parameters
    ----------
    model :
        An instance of Scikit-learn model.
    derived_col_names : List
        Contains column names after preprocessing
    col_names : List
        Contains list of feature/column names.
    target_name : String
        Name of the Target column.
    mining_imp_val : tuple
        Contains the mining_attributes, mining_strategy, mining_impute_value

    Returns
    -------
    nearest_neighbour_model :
        Returns a nearest neighbour model instance
    """
    model_kwargs = get_model_kwargs(model, col_names, target_name, mining_imp_val)
    nearest_neighbour_model = list()
    nearest_neighbour_model.append(
        pml.NearestNeighborModel(
            modelName="KNNModel",
            continuousScoringMethod='average',
            algorithmName="KNN",
            numberOfNeighbors=model.n_neighbors,
            KNNInputs=get_knn_inputs(derived_col_names),
            ComparisonMeasure=get_comparison_measure(model),
            TrainingInstances=get_training_instances(model, derived_col_names, target_name),
            **model_kwargs
        )
    )
    return nearest_neighbour_model
def get_training_instances(model, derived_col_names, target_name):
    """
    It returns the TrainingInstances element.

    Parameters
    ----------
    model :
        An instance of Scikit-learn model.
    derived_col_names : List
        Contains column names after preprocessing
    target_name : String
        Name of the Target column.

    Returns
    -------
    TrainingInstances :
        Returns a TrainingInstances instance
    """
    return pml.TrainingInstances(
        InstanceFields=get_instance_fields(derived_col_names, target_name),
        InlineTable=get_inline_table(model)
    )
def get_inline_table(model):
    """
    It returns the InlineTable element of the model.

    Parameters
    ----------
    model :
        An instance of Scikit-learn model.

    Returns
    -------
    InlineTable :
        Returns an InlineTable instance.
    """
    rows = []
    x = model._tree.get_arrays()[0].tolist()
    y = model._y.tolist()
    X = []
    for idx in range(len(model._tree.get_arrays()[0][0])):
        X.append("x" + str(idx + 1))
    for idx in range(len(x)):
        row = pml.row()
        row.elementobjs_ = ['y'] + X
        if hasattr(model, 'classes_'):
            row.y = model.classes_[y[idx]]
        else:
            row.y = y[idx]
        for idx_2 in range(len(x[idx])):
            exec("row." + X[idx_2] + "=" + str(x[idx][idx_2]))
        rows.append(row)
    return pml.InlineTable(row=rows)
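# Note (illustrative): the exec() call above assigns each instance value to a
# dynamically named attribute ("x1", "x2", ...) of the generated row object.
# An equivalent sketch without exec would be:
#
#     for idx_2, col in enumerate(X):
#         setattr(row, col, x[idx][idx_2])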
def get_instance_fields(derived_col_names, target_name):
    """
    It returns the InstanceFields element.

    Parameters
    ----------
    derived_col_names : List
        Contains column names after preprocessing.
    target_name : String
        Name of the Target column.

    Returns
    -------
    InstanceFields :
        Returns an InstanceFields instance
    """
    instance_fields = list()
    instance_fields.append(pml.InstanceField(field=target_name, column="y"))
    for (index, name) in enumerate(derived_col_names):
        instance_fields.append(pml.InstanceField(field=str(name), column="x" + str(index + 1)))
    return pml.InstanceFields(InstanceField=instance_fields)
def get_comparison_measure(model):
    """
    It returns the ComparisonMeasure element.

    Parameters
    ----------
    model :
        An instance of Scikit-learn model.

    Returns
    -------
    comp_measure :
        Returns a ComparisonMeasure instance.
    """
    if model.effective_metric_ == 'euclidean':
        comp_measure = pml.ComparisonMeasure(euclidean=pml.euclidean(), kind="distance")
    elif model.effective_metric_ == 'minkowski':
        comp_measure = pml.ComparisonMeasure(minkowski=pml.minkowski(), kind="distance")
    elif model.effective_metric_ == 'manhattan':
        comp_measure = pml.ComparisonMeasure(cityBlock=pml.cityBlock(), kind="distance")
    return comp_measure
def get_knn_inputs(col_names):
    """
    It returns the KNNInputs element.

    Parameters
    ----------
    col_names : List
        Contains list of feature/column names.

    Returns
    -------
    KNNInputs :
        Returns a KNNInputs instance.
    """
    knnInput = list()
    for name in col_names:
        knnInput.append(pml.KNNInput(field=str(name)))
    return pml.KNNInputs(KNNInput=knnInput)
def get_naiveBayesModel(model, derived_col_names, col_names, target_name, mining_imp_val):
    """
    It returns the NaiveBayesModel element of the model.

    Parameters
    ----------
    model :
        An instance of Scikit-learn model.
    derived_col_names : List
        Contains column names after preprocessing.
    col_names : List
        Contains list of feature/column names.
    target_name : String
        Name of the Target column.
    mining_imp_val : tuple
        Contains the mining_attributes, mining_strategy, mining_impute_value

    Returns
    -------
    naive_bayes_model : List
        Returns the NaiveBayesModel
    """
    model_kwargs = get_model_kwargs(model, col_names, target_name, mining_imp_val)
    naive_bayes_model = list()
    naive_bayes_model.append(pml.NaiveBayesModel(
        BayesInputs=get_bayes_inputs(model, derived_col_names),
        BayesOutput=get_bayes_output(model, target_name),
        threshold=get_threshold(),
        **model_kwargs
    ))
    return naive_bayes_model
def get_threshold():
    """
    It returns the Threshold value.

    Returns
    -------
    Returns the Threshold value
    """
    return '0.001'
def get_bayes_output(model, target_name):
    """
    It returns the BayesOutput element of the model.

    Parameters
    ----------
    model :
        An instance of Scikit-learn model.
    target_name : String
        Name of the Target column.

    Returns
    -------
    BayesOutput :
        Returns a BayesOutput instance
    """
    class_counts = model.class_count_
    target_val_counts = pml.TargetValueCounts()
    for name, count in zip(model.classes_, class_counts):
        tr_val = pml.TargetValueCount(value=str(name), count=str(count))
        target_val_counts.add_TargetValueCount(tr_val)
    return pml.BayesOutput(
        fieldName=target_name,
        TargetValueCounts=target_val_counts
    )
def get_bayes_inputs(model, derived_col_names):
    """
    It returns the BayesInputs element of the model.

    Parameters
    ----------
    model :
        An instance of Scikit-learn model.
    derived_col_names : List
        Contains column names after preprocessing.

    Returns
    -------
    bayes_inputs :
        Returns a BayesInputs instance.
    """
    bayes_inputs = pml.BayesInputs()
    for indx, name in enumerate(derived_col_names):
        means = model.theta_[:, indx]
        variances = model.sigma_[:, indx]
        target_val_stats = pml.TargetValueStats()
        for idx, val in enumerate(model.classes_):
            target_val = pml.TargetValueStat(
                val,
                GaussianDistribution=pml.GaussianDistribution(
                    mean='{:.20f}'.format(means[idx]),
                    variance='{:.20f}'.format(variances[idx])))
            target_val_stats.add_TargetValueStat(target_val)
        bayes_inputs.add_BayesInput(pml.BayesInput(fieldName=str(name),
                                                   TargetValueStats=target_val_stats))
    return bayes_inputs
def get_supportVectorMachine_models(model, derived_col_names, col_names, target_names,
                                    mining_imp_val, categoric_values):
    """
    It returns the SupportVectorMachineModel element.

    Parameters
    ----------
    model :
        An instance of Scikit-learn model.
    derived_col_names : List
        Contains column names after preprocessing.
    col_names : List
        Contains list of feature/column names.
    target_names : String
        Name of the Target column.
    mining_imp_val : tuple
        Contains the mining_attributes, mining_strategy, mining_impute_value
    categoric_values : tuple
        Contains Categorical attribute names and its values

    Returns
    -------
    supportVector_models : List
        Returns SupportVectorMachineModel elements which contain classificationMethod,
        VectorDictionary, SupportVectorMachine and kernelType
    """
    model_kwargs = get_model_kwargs(model, col_names, target_names, mining_imp_val)
    supportVector_models = list()
    kernel_type = get_kernel_type(model)
    supportVector_models.append(pml.SupportVectorMachineModel(
        classificationMethod=get_classificationMethod(model),
        VectorDictionary=get_vectorDictionary(model, derived_col_names, categoric_values),
        SupportVectorMachine=get_supportVectorMachine(model),
        **kernel_type,
        **model_kwargs
    ))
    return supportVector_models
def get_ensemble_models(model, derived_col_names, col_names, target_name, mining_imp_val,
                        categoric_values):
    """
    It returns the MiningModel element of the model.

    Parameters
    ----------
    model :
        An instance of Scikit-learn model.
    derived_col_names : List
        Contains column names after preprocessing.
    col_names : List
        Contains list of feature/column names.
    target_name : String
        Name of the Target column.
    mining_imp_val : tuple
        Contains the mining_attributes, mining_strategy, mining_impute_value.
    categoric_values : tuple
        Contains Categorical attribute names and its values

    Returns
    -------
    mining_models : List
        Returns the MiningModel of the respective ensemble model
    """
    model_kwargs = get_model_kwargs(model, col_names, target_name, mining_imp_val)
    if 'GradientBoostingRegressor' in str(model.__class__):
        model_kwargs['Targets'] = get_targets(model, target_name)
    mining_models = list()
    mining_models.append(pml.MiningModel(
        Segmentation=get_outer_segmentation(model, derived_col_names, col_names, target_name,
                                            mining_imp_val, categoric_values),
        **model_kwargs
    ))
    return mining_models
def get_targets(model, target_name):
    """
    It returns the Targets element of the model.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.
    target_name : String
        Name of the Target column.

    Returns
    -------
    targets :
        Returns a Targets instance.
    """
    if 'GradientBoostingRegressor' in str(model.__class__):
        targets = pml.Targets(
            Target=[
                pml.Target(
                    field=target_name,
                    rescaleConstant=str(model.init_.mean),
                    rescaleFactor=str(model.learning_rate)
                )
            ]
        )
    else:
        targets = pml.Targets(
            Target=[
                pml.Target(
                    field=target_name,
                    rescaleConstant=str(model.base_score)
                )
            ]
        )
    return targets
def get_multiple_model_method(model):
    """
    It returns the multiple model method of the MiningModel.

    Parameters
    ----------
    model :
        A Scikit-learn model instance

    Returns
    -------
    The multiple model method for a mining model.
    """
    if 'GradientBoostingClassifier' in str(model.__class__):
        return 'modelChain'
    elif 'GradientBoostingRegressor' in str(model.__class__):
        return 'sum'
    elif 'RandomForestClassifier' in str(model.__class__):
        return 'majorityVote'
    elif 'RandomForestRegressor' in str(model.__class__):
        return 'average'
def get_outer_segmentation(model, derived_col_names, col_names, target_name, mining_imp_val,
                           categoric_values):
    """
    It returns the Segmentation element of the model.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.
    derived_col_names : List
        Contains column names after preprocessing.
    col_names : List
        Contains list of feature/column names.
    target_name : String
        Name of the Target column.
    mining_imp_val : tuple
        Contains the mining_attributes, mining_strategy, mining_impute_value
    categoric_values : tuple
        Contains Categorical attribute names and its values

    Returns
    -------
    segmentation :
        A Segmentation instance.
    """
    segmentation = pml.Segmentation(
        multipleModelMethod=get_multiple_model_method(model),
        Segment=get_segments(model, derived_col_names, col_names, target_name, mining_imp_val,
                             categoric_values)
    )
    return segmentation
def get_segments(model, derived_col_names, col_names, target_name, mining_imp_val,
                 categoric_values):
    """
    It returns the Segment element of the model.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.
    derived_col_names : List
        Contains column names after preprocessing.
    col_names : List
        Contains list of feature/column names.
    target_name : String
        Name of the Target column.
    mining_imp_val : tuple
        Contains the mining_attributes, mining_strategy, mining_impute_value
    categoric_values : tuple
        Contains Categorical attribute names and its values

    Returns
    -------
    segments :
        A list of segment instances.
    """
    segments = None
    if 'GradientBoostingClassifier' in str(model.__class__):
        segments = get_segments_for_gbc(model, derived_col_names, col_names, target_name,
                                        mining_imp_val, categoric_values)
    else:
        segments = get_inner_segments(model, derived_col_names, col_names, 0)
    return segments
def get_segments_for_gbc(model, derived_col_names, col_names, target_name, mining_imp_val,
                         categoric_values):
    """
    It returns a list of Segment elements of the model.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.
    derived_col_names : List
        Contains column names after preprocessing.
    col_names : List
        Contains list of feature/column names.
    target_name : String
        Name of the Target column.
    mining_imp_val : tuple
        Contains the mining_attributes, mining_strategy, mining_impute_value
    categoric_values : tuple
        Contains Categorical attribute names and its values

    Returns
    -------
    segments : List
        Get the Segments for the Segmentation element.
    """
    segments = list()
    out_field_names = list()
    for estm_idx in range(len(model.estimators_[0])):
        mining_fields_for_first = list()
        for name in col_names:
            mining_fields_for_first.append(pml.MiningField(name=name))
        miningschema_for_first = pml.MiningSchema(MiningField=mining_fields_for_first)
        output_fields = list()
        output_fields.append(
            pml.OutputField(
                name='decisionFunction(' + str(estm_idx) + ')',
                feature='predictedValue',
                isFinalResult=False
            )
        )
        output_fields.append(
            pml.OutputField(
                name='expDecisionFunction(' + str(estm_idx) + ')',
                feature='transformedValue',
                isFinalResult=True,
                Apply=pml.Apply(
                    function='exp',
                    FieldRef=[pml.FieldRef(field='decisionFunction(' + str(estm_idx) + ')')]
                )
            )
        )
        out_field_names.append('expDecisionFunction(' + str(estm_idx) + ')')
        segments.append(
            pml.Segment(
                True_=pml.True_(),
                id=str(estm_idx),
                MiningModel=pml.MiningModel(
                    functionName='regression',
                    MiningSchema=miningschema_for_first,
                    Output=pml.Output(OutputField=output_fields),
                    Segmentation=pml.Segmentation(
                        multipleModelMethod=get_multiple_model_method(model),
                        Segment=get_inner_segments(model, derived_col_names, col_names, estm_idx)
                    )
                )
            )
        )
    segments.append(
        pml.Segment(
            id=str(len(model.estimators_[0])),
            True_=pml.True_(),
            RegressionModel=get_regrs_models(model, out_field_names, out_field_names, target_name,
                                             mining_imp_val, categoric_values)[0]
        )
    )
    return segments
def get_inner_segments(model, derived_col_names, col_names, index):
    """
    It returns the inner segments of the model.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.
    derived_col_names : List
        Contains column names after preprocessing.
    col_names : List
        Contains list of feature/column names.
    index : Integer
        The index of the estimator for the model

    Returns
    -------
    segments : List
        Get the Segments for the Segmentation element.
    """
    segments = list()
    for estm_idx in range(model.n_estimators):
        if np.asanyarray(model.estimators_).ndim == 1:
            estm = model.estimators_[estm_idx]
        else:
            estm = model.estimators_[estm_idx][index]
        tree_features = estm.tree_.feature
        features_ = list()
        for feat in tree_features:
            if feat != -2 and feat not in features_:
                features_.append(feat)
        if len(features_) != 0:
            mining_fields = list()
            for feat in col_names:
                mining_fields.append(pml.MiningField(name=feat))
            segments.append(
                pml.Segment(
                    True_=pml.True_(),
                    id=str(estm_idx),
                    TreeModel=pml.TreeModel(
                        modelName="decisionTree_Model",
                        functionName=get_mining_func(estm),
                        splitCharacteristic="multiSplit",
                        MiningSchema=pml.MiningSchema(MiningField=mining_fields),
                        Node=get_node(estm, derived_col_names, model)
                    )
                )
            )
    return segments
def get_classificationMethod(model):
    """
    It returns the classification method of the SVM model.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.

    Returns
    -------
    Returns the classification method of the SVM model
    """
    if 'SVC' in str(model.__class__):
        return 'OneAgainstOne'
    else:
        return 'OneAgainstAll'
def get_vectorDictionary(model, derived_col_names, categoric_values):
    """
    It returns the VectorDictionary element.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.
    derived_col_names : List
        Contains column names after preprocessing.
    categoric_values : tuple
        Contains Categorical attribute names and its values

    Returns
    -------
    VectorDictionary :
        A VectorDictionary instance.
    """
    model_coef = model.C
    fieldref_element = get_vectorfields(model_coef, derived_col_names, categoric_values)
    vectorfields_element = pml.VectorFields(FieldRef=fieldref_element)
    vec_id = list(model.support_)
    vecs = list(model.support_vectors_)
    vecinsts = list()
    for vec_idx in range(len(vecs)):
        vecinsts.append(pml.VectorInstance(
            id=vec_id[vec_idx],
            REAL_SparseArray=pml.REAL_SparseArray(
                n=len(vecs[vec_idx]),
                Indices=[x for x in range(1, len(vecs[vec_idx]) + 1)],
                REAL_Entries=vecs[vec_idx].tolist()
            )
        ))
    return pml.VectorDictionary(VectorFields=vectorfields_element, VectorInstance=vecinsts)
def get_vectorfields(model_coef, feat_names, categoric_values):
    """
    It returns the VectorFields content.

    Parameters
    ----------
    model_coef :
        Coefficient value passed from the model (model.C).
    feat_names : List
        Contains column names after preprocessing.
    categoric_values : tuple
        Contains Categorical attribute names and its values

    Returns
    -------
    Returns the list of FieldRef elements for the VectorFields of a Support Vector model.
    """
    der_fld_len = len(feat_names)
    der_fld_idx = 0
    row_idx = -1
    predictors = list()
    if categoric_values:
        class_lbls = categoric_values[0]
        class_attribute = categoric_values[1]
    while der_fld_idx < der_fld_len:
        if is_labelbinarizer(feat_names[der_fld_idx]):
            if not is_stdscaler(feat_names[der_fld_idx]):
                class_id = get_classid(class_attribute, feat_names[der_fld_idx])
                cat_predictors = get_categoric_pred(row_idx, der_fld_idx, model_coef,
                                                    class_lbls[class_id],
                                                    class_attribute[class_id])
                for predictor in cat_predictors:
                    predictors.append(predictor)
                der_fld_idx = der_fld_idx + len(class_lbls[class_id])
            else:
                vectorfields_element = pml.FieldRef(field=feat_names[der_fld_idx])
                predictors.append(vectorfields_element)
                der_fld_idx += 1
        else:
            vectorfields_element = pml.FieldRef(field=feat_names[der_fld_idx])
            predictors.append(vectorfields_element)
            der_fld_idx += 1
    return predictors
def get_kernel_type(model):
    """
    It returns the kernel type element.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.

    Returns
    -------
    kernel_kwargs : Dictionary
        Get the respective kernel type of the SVM model.
    """
    kernel_kwargs = dict()
    if model.kernel == 'linear':
        kernel_kwargs['LinearKernelType'] = pml.LinearKernelType(description='Linear Kernel Type')
    elif model.kernel == 'poly':
        kernel_kwargs['PolynomialKernelType'] = pml.PolynomialKernelType(
            description='Polynomial Kernel type',
            gamma=model._gamma,
            coef0=model.coef0,
            degree=model.degree)
    elif model.kernel == 'rbf':
        kernel_kwargs['RadialBasisKernelType'] = pml.RadialBasisKernelType(
            description='Radial Basis Kernel Type',
            gamma=model._gamma)
    else:
        kernel_kwargs['SigmoidKernelType'] = pml.SigmoidKernelType(
            description='Sigmoid Kernel Type',
            gamma=model._gamma,
            coef0=model.coef0)
    return kernel_kwargs
def get_supportVectorMachine(model):
    """
    It returns the SupportVectorMachine element.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.

    Returns
    -------
    support_vector_machines : List
        Get the SupportVectorMachine element which contains targetCategory,
        alternateTargetCategory, SupportVectors and Coefficients
    """
    support_vector_machines = list()
    if 'SVR' in str(model.__class__):
        support_vector = list()
        for sv in model.support_:
            support_vector.append(pml.SupportVector(vectorId=sv))
        support_vectors = pml.SupportVectors(SupportVector=support_vector)
        coefficient = list()
        absoValue = model.intercept_[0]
        for coef in model.dual_coef_:
            for num in coef:
                coefficient.append(pml.Coefficient(value=num))
        coeff = pml.Coefficients(absoluteValue=absoValue, Coefficient=coefficient)
        support_vector_machines.append(pml.SupportVectorMachine(SupportVectors=support_vectors,
                                                                Coefficients=coeff))
    else:
        support_vector_locs = np.cumsum(np.hstack([[0], model.n_support_]))
        n_class = model.dual_coef_.shape[0] + 1
        coef_abs_val_index = 0
        for class1 in range(n_class):
            sv1 = model.support_[support_vector_locs[class1]:support_vector_locs[class1 + 1]]
            for class2 in range(class1 + 1, n_class):
                svs = list()
                coefs = list()
                sv2 = model.support_[support_vector_locs[class2]:support_vector_locs[class2 + 1]]
                svs.append(list(sv1) + list(sv2))
                alpha1 = model.dual_coef_[class2 - 1,
                                          support_vector_locs[class1]:support_vector_locs[class1 + 1]]
                alpha2 = model.dual_coef_[class1,
                                          support_vector_locs[class2]:support_vector_locs[class2 + 1]]
                coefs.append(list(alpha1) + list(alpha2))
                all_svs = list()
                for sv in svs[0]:
                    all_svs.append(pml.SupportVector(vectorId=sv))
                all_coefs = list()
                for coef in coefs[0]:
                    all_coefs.append(pml.Coefficient(value=str(coef)))
                coef_abs_value = model.intercept_[coef_abs_val_index]
                coef_abs_val_index += 1
                if len(model.classes_) == 2:
                    support_vector_machines.append(
                        pml.SupportVectorMachine(
                            targetCategory=model.classes_[class1],
                            alternateTargetCategory=model.classes_[class2],
                            SupportVectors=pml.SupportVectors(SupportVector=all_svs),
                            Coefficients=pml.Coefficients(absoluteValue=coef_abs_value,
                                                          Coefficient=all_coefs)
                        )
                    )
                else:
                    support_vector_machines.append(
                        pml.SupportVectorMachine(
                            targetCategory=model.classes_[class2],
                            alternateTargetCategory=model.classes_[class1],
                            SupportVectors=pml.SupportVectors(SupportVector=all_svs),
                            Coefficients=pml.Coefficients(absoluteValue=coef_abs_value,
                                                          Coefficient=all_coefs)
                        )
                    )
    return support_vector_machines
def get_tree_models(model, derived_col_names, col_names, target_name, mining_imp_val):
    """
    It returns the TreeModel element of the model.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.
    derived_col_names : List
        Contains column names after preprocessing.
    col_names : List
        Contains list of feature/column names.
    target_name : String
        Name of the Target column.
    mining_imp_val : tuple
        Contains the mining_attributes, mining_strategy, mining_impute_value

    Returns
    -------
    tree_models : List
        Get the TreeModel element.
    """
    model_kwargs = get_model_kwargs(model, col_names, target_name, mining_imp_val)
    tree_models = list()
    tree_models.append(pml.TreeModel(
        Node=get_node(model, derived_col_names),
        **model_kwargs
    ))
    return tree_models
def get_neural_models(model, derived_col_names, col_names, target_name, mining_imp_val):
    """
    It returns the NeuralNetwork element of the model.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.
    derived_col_names : List
        Contains column names after preprocessing.
    col_names : List
        Contains list of feature/column names.
    target_name : String
        Name of the Target column.
    mining_imp_val : tuple
        Contains the mining_attributes, mining_strategy, mining_impute_value.

    Returns
    -------
    neural_model : List
        Model attributes for the PMML file.
    """
    model_kwargs = get_model_kwargs(model, col_names, target_name, mining_imp_val)
    neural_model = list()
    neural_model.append(pml.NeuralNetwork(
        modelName="Neural_Network",
        threshold='0',
        altitude='1.0',
        activationFunction=get_funct(model),
        NeuralInputs=get_neuron_input(derived_col_names),
        NeuralLayer=get_neural_layer(model, derived_col_names, target_name)[0],
        NeuralOutputs=get_neural_layer(model, derived_col_names, target_name)[1],
        **model_kwargs
    ))
    return neural_model
def get_funct(sk_model):
    """
    It returns the activation function of the model.

    Parameters
    ----------
    sk_model :
        A Scikit-learn model instance.

    Returns
    -------
    a_fn : String
        Returns the activation function.
    """
    a_fn = sk_model.activation
    if a_fn == 'relu':
        a_fn = 'rectifier'
    return a_fn
def get_regrs_models(model, derived_col_names, col_names, target_name, mining_imp_val,
                     categoric_values):
    """
    It returns the RegressionModel element of the model.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.
    derived_col_names : List
        Contains column names after preprocessing.
    col_names : List
        Contains list of feature/column names.
    target_name : String
        Name of the Target column.
    mining_imp_val : tuple
        Contains the mining_attributes, mining_strategy, mining_impute_value
    categoric_values : tuple
        Contains Categorical attribute names and its values

    Returns
    -------
    regrs_models : List
        Returns a regression model of the respective model
    """
    model_kwargs = get_model_kwargs(model, col_names, target_name, mining_imp_val)
    if 'SGDClassifier' in str(model.__class__) or 'RidgeClassifier' in str(model.__class__):
        model_kwargs['normalizationMethod'] = 'logit'
    elif 'LogisticRegression' in str(model.__class__):
        model_kwargs['normalizationMethod'] = 'softmax'
    regrs_models = list()
    regrs_models.append(pml.RegressionModel(
        RegressionTable=get_regrs_tabl(model, derived_col_names, target_name, categoric_values),
        **model_kwargs
    ))
    return regrs_models
def get_regrs_tabl(model, feature_names, target_name, categoric_values):
    """
    It returns the RegressionTable element of the model.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.
    feature_names : List
        Contains column names after preprocessing.
    target_name : String
        Name of the Target column.
    categoric_values : tuple
        Contains Categorical attribute names and its values

    Returns
    -------
    merge : List
        Returns a list of RegressionTable elements.
    """
    merge = list()
    if hasattr(model, 'intercept_'):
        func_name = get_mining_func(model)
        inter = model.intercept_
        model_coef = model.coef_
        merge = list()
        target_classes = target_name
        row_idx = 0
        if not hasattr(inter, '__iter__'):
            inter = np.array([inter])
            target_classes = [target_classes]
            model_coef = model_coef.reshape(1, model_coef.shape[0])
            target_cat = None
        else:
            target_classes = model.classes_
            max_target_index = len(target_classes) - 1
            target_cat = target_classes[max_target_index]
        if len(inter) == 1:
            regr_predictor = get_regr_predictors(model_coef, row_idx, feature_names, categoric_values)
            merge.append(
                pml.RegressionTable(
                    intercept="{:.15f}".format(inter.item()),
                    targetCategory=target_cat,
                    NumericPredictor=regr_predictor
                )
            )
            if func_name != 'regression':
                merge.append(
                    pml.RegressionTable(
                        intercept="0.0",
                        targetCategory=target_classes[0]
                    )
                )
        else:
            for tgname, tg_idx in zip(np.unique(target_classes),
                                      range(len(np.unique(target_classes)))):
                row_idx = tg_idx
                regr_predictors = get_regr_predictors(model_coef, row_idx, feature_names,
                                                      categoric_values)
                merge.append(
                    pml.RegressionTable(
                        intercept=inter[tg_idx],
                        targetCategory=tgname,
                        NumericPredictor=regr_predictors
                    )
                )
    else:
        if len(model.classes_) == 2:
            merge.append(
                pml.RegressionTable(
                    NumericPredictor=[pml.NumericPredictor(coefficient='1.0', name=feature_names[0])],
                    intercept='0.0',
                    targetCategory=str(model.classes_[-1])
                )
            )
            merge.append(
                pml.RegressionTable(intercept='0.0', targetCategory=str(model.classes_[0]))
            )
        else:
            for feat_idx in range(len(feature_names)):
                merge.append(
                    pml.RegressionTable(
                        NumericPredictor=[pml.NumericPredictor(coefficient='1.0',
                                                               name=feature_names[feat_idx])],
                        intercept='0.0',
                        targetCategory=str(model.classes_[feat_idx])
                    )
                )
    return merge
def get_node(model, features_names, main_model=None):
    """
    It returns the Node element of the model.

    Parameters
    ----------
    model :
        An instance of the estimator of the tree object.
    features_names : List
        Contains the list of feature/column names.
    main_model :
        A Scikit-learn model instance.

    Returns
    -------
    _getNode :
        Get all the underlying Nodes.
    """
    tree = model.tree_
    if main_model and 'RandomForestClassifier' in str(main_model.__class__):
        classes = main_model.classes_
    elif hasattr(model, 'classes_'):
        classes = model.classes_
    tree_leaf = -1

    def _getNode(idx, cond=None):
        simple_pred_cond = None
        if cond:
            simple_pred_cond = cond
        node = pml.Node(id=idx, recordCount=float(tree.n_node_samples[idx]))
        if simple_pred_cond:
            node.SimplePredicate = simple_pred_cond
        else:
            node.True_ = pml.True_()
        if tree.children_left[idx] != tree_leaf:
            fieldName = features_names[tree.feature[idx]]
            simplePredicate = pml.SimplePredicate(field=fieldName, operator="lessOrEqual",
                                                  value=str(tree.threshold[idx]))
            left_child = _getNode(tree.children_left[idx], simplePredicate)
            simplePredicate = pml.SimplePredicate(field=fieldName, operator="greaterThan",
                                                  value=str(tree.threshold[idx]))
            right_child = _getNode(tree.children_right[idx], simplePredicate)
            node.add_Node(left_child)
            node.add_Node(right_child)
        else:
            nodeValue = list(tree.value[idx][0])
            lSum = float(sum(nodeValue))
            if 'DecisionTreeClassifier' in str(model.__class__):
                probs = [x / lSum for x in nodeValue]
                score_dst = []
                for i in range(len(probs)):
                    score_dst.append(pml.ScoreDistribution(confidence=probs[i],
                                                           recordCount=float(nodeValue[i]),
                                                           value=classes[i]))
                node.ScoreDistribution = score_dst
                node.score = classes[probs.index(max(probs))]
            else:
                node.score = lSum
        return node

    return _getNode(0)
def get_output(model, target_name):
    """
    It returns the Output element of the model.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.
    target_name : String
        Name of the Target column.

    Returns
    -------
    Output :
        Get the Output element.
    """
    mining_func = get_mining_func(model)
    output_fields = list()
    alt_target_name = 'predicted_' + target_name
    output_fields.append(pml.OutputField(name=alt_target_name))
    if mining_func == 'classification':
        for cls in model.classes_:
            output_fields.append(pml.OutputField(
                name='probability_' + str(cls),
                feature="probability",
                optype="continuous",
                dataType="double",
                value=str(cls)
            ))
        return pml.Output(OutputField=output_fields)
    elif mining_func == 'regression':
        return pml.Output(OutputField=output_fields)
def get_mining_func(model):
    """
    It returns the name of the mining function of the model.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.

    Returns
    -------
    func_name : String
        Returns the function name of the model
    """
    if not hasattr(model, 'classes_'):
        func_name = 'regression'
    else:
        if isinstance(model.classes_, np.ndarray):
            func_name = 'classification'
        else:
            func_name = 'regression'
    return func_name
def get_mining_schema(model, feature_names, target_name, mining_imp_val):
    """
    It returns the MiningSchema of the model.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.
    feature_names : List
        Contains the list of feature/column names.
    target_name : String
        Name of the Target column.
    mining_imp_val : tuple
        Contains the mining_attributes, mining_strategy, mining_impute_value.

    Returns
    -------
    MiningSchema :
        Get the MiningSchema element
    """
    if mining_imp_val:
        mining_attributes = mining_imp_val[0]
        mining_strategy = mining_imp_val[1]
        mining_replacement_val = mining_imp_val[2]
    n_features = len(feature_names)
    features_pmml_optype = ['continuous'] * n_features
    features_pmml_utype = ['active'] * n_features
    target_pmml_utype = 'target'
    mining_func = get_mining_func(model)
    if mining_func == 'classification':
        target_pmml_optype = 'categorical'
    elif mining_func == 'regression':
        target_pmml_optype = 'continuous'
    mining_flds = list()
    mining_name_stored = list()
    # handling impute pre processing
    if mining_imp_val:
        for mining_item, mining_idx in zip(mining_attributes, range(len(mining_attributes))):
            for feat_name, feat_idx in zip(feature_names, range(len(feature_names))):
                if feat_name in mining_item:
                    if feat_name not in mining_name_stored:
                        impute_index = mining_item.index(feat_name)
                        mining_flds.append(pml.MiningField(
                            name=str(feat_name),
                            optype=features_pmml_optype[feat_idx],
                            missingValueReplacement=mining_replacement_val[mining_idx][impute_index],
                            missingValueTreatment=mining_strategy[mining_idx],
                            usageType=features_pmml_utype[feat_idx]))
                        mining_name_stored.append(feat_name)
    for feat_name, feat_idx in zip(feature_names, range(len(feature_names))):
        if feat_name not in mining_name_stored:
            mining_flds.append(pml.MiningField(name=str(feat_name),
                                               optype=features_pmml_optype[feat_idx],
                                               usageType=features_pmml_utype[feat_idx]))
    mining_flds.append(pml.MiningField(name=target_name,
                                       optype=target_pmml_optype,
                                       usageType=target_pmml_utype))
    return pml.MiningSchema(MiningField=mining_flds)
def get_neuron_input(feature_names):
    """
    It returns the NeuralInputs element.

    Parameters
    ----------
    feature_names : List
        Contains the list of feature/column names.

    Returns
    -------
    neural_input_element :
        Returns the NeuralInputs element
    """
    neural_input = list()
    for features in feature_names:
        field_ref = pml.FieldRef(field=str(features))
        derived_flds = pml.DerivedField(optype="continuous", dataType="double", FieldRef=field_ref)
        class_node = pml.NeuralInput(id=str(features), DerivedField=derived_flds)
        neural_input.append(class_node)
    neural_input_element = pml.NeuralInputs(NeuralInput=neural_input,
                                            numberOfInputs=str(len(neural_input)))
    return neural_input_element
def get_neural_layer(model, feature_names, target_name):
    """
    It returns the NeuralLayer and NeuralOutputs elements.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.
    feature_names : List
        Contains the list of feature/column names.
    target_name : String
        Name of the Target column.

    Returns
    -------
    all_neuron_layer : List
        Returns the list of NeuralLayer elements.
    neural_output_element :
        Returns the NeuralOutputs element instance
    """
    weight = model.coefs_
    bias = model.intercepts_
    last_layer = bias[-1]
    hidden_layer_sizes = model.hidden_layer_sizes
    hidden_layers = list(hidden_layer_sizes)
    hidden_layers.append(len(last_layer))
    neuron = list()
    all_neuron_layer = list()
    input_features = feature_names
    neuron_id = list()
    for count in range(len(hidden_layers)):
        for count1 in range(hidden_layers[count]):
            con = list()
            for count2 in range(len(input_features)):
                con.append(pml.Con(from_=input_features[count2],
                                   weight=format(weight[count][count2][count1])))
            neuron.append(pml.Neuron(id=str(count) + str(count1),
                                     bias=format(bias[count][count1]),
                                     Con=con))
            neuron_id.append(str(count) + str(count1))
        all_neuron_layer.append(pml.NeuralLayer(Neuron=neuron))
        input_features = neuron_id
        neuron_id = list()
        neuron = list()
    if hidden_layers[-1] == 1 and 'MLPClassifier' in str(model.__class__):
        bias1 = [1.0, 0.0]
        weight1 = [-1.0, 1.0]
        con = list()
        linear = ['linear/1']
        i_d = ['true', 'false']
        con.append(pml.Con(from_=input_features[0], weight=1.0))
        neuron.append(pml.Neuron(id=linear[0], bias='0.0', Con=con))
        all_neuron_layer.append(pml.NeuralLayer(activationFunction="logistic", Neuron=neuron))
        neuron = list()
        con = list()
        for num in range(2):
            con.append(pml.Con(from_=linear[0], weight=format(weight1[num])))
            neuron.append(pml.Neuron(id=i_d[num], bias=format(bias1[num]), Con=con))
            con = list()
        all_neuron_layer.append(pml.NeuralLayer(activationFunction="identity", Neuron=neuron))
    if 'MLPClassifier' in str(model.__class__):
        neural_output = list()
        for values, count in zip(model.classes_, range(len(model.classes_))):
            norm_discrete = pml.NormDiscrete(field=target_name, value=str(values))
            derived_flds = pml.DerivedField(optype="categorical", dataType='double',
                                            NormDiscrete=norm_discrete)
            if len(input_features) == 1:
                class_node = pml.NeuralOutput(outputNeuron=input_features,
                                              DerivedField=derived_flds)
            else:
                class_node = pml.NeuralOutput(outputNeuron=input_features[count],
                                              DerivedField=derived_flds)
            neural_output.append(class_node)
        neural_output_element = pml.NeuralOutputs(numberOfOutputs=None, Extension=None,
                                                  NeuralOutput=neural_output)
    if 'MLPRegressor' in str(model.__class__):
        neural_output = list()
        fieldRef = pml.FieldRef(field=target_name)
        derived_flds = pml.DerivedField(optype="continuous", dataType="double", FieldRef=fieldRef)
        class_node = pml.NeuralOutput(outputNeuron=input_features, DerivedField=derived_flds)
        neural_output.append(class_node)
        neural_output_element = pml.NeuralOutputs(numberOfOutputs=None, Extension=None,
                                                  NeuralOutput=neural_output)
    return all_neuron_layer, neural_output_element
def get_super_cls_names(model_inst):
    """
    It returns the set of super class names of the model.

    Parameters
    ----------
    model_inst :
        Instance of the scikit-learn model

    Returns
    -------
    parents : Set
        Returns all the parent class names of the model instance.
    """
    def super_cls_names(cls):
        nonlocal parents
        parents.add(cls.__name__)
        for super_cls in cls.__bases__:
            super_cls_names(super_cls)

    cls = model_inst.__class__
    parents = set()
    super_cls_names(cls)
    return parents
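# Illustrative example: the returned set contains the names of every ancestor
# class, which is what get_PMML_kwargs dispatches on. For instance:
#
#     from sklearn.tree import DecisionTreeClassifier
#     "BaseDecisionTree" in get_super_cls_names(DecisionTreeClassifier())   # True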
def get_version():
    """
    It returns the PMML version.

    Returns
    -------
    version : String
        Returns the version of the PMML.
    """
    version = '4.3Ext'
    return version
def get_header():
    """
    It returns the Header element of the PMML.

    Returns
    -------
    header :
        Returns the Header of the PMML.
    """
    copyryt = "Copyright (c) 2018 Software AG"
    description = "Default description"
    timestamp = pml.Timestamp(datetime.now())
    header = pml.Header(copyright=copyryt, description=description, Timestamp=timestamp)
    return header
def get_dtype(feat_value):
    """
    It returns the data type of the value.

    Parameters
    ----------
    feat_value :
        Contains a value for finding its data type.

    Returns
    -------
    Returns the respective data type of that value.
    """
    data_type = str(type(feat_value))
    if 'float' in data_type:
        return 'float'
    if 'int' in data_type:
        return 'integer'
    if 'long' in data_type:
        return 'long'
    if 'complex' in data_type:
        return 'complex'
    if 'str' in data_type:
        return 'string'
def get_data_dictionary(model, feature_names, target_name, categoric_values):
    """
    It returns the DataDictionary element.

    Parameters
    ----------
    model :
        A Scikit-learn model instance.
    feature_names : List
        Contains the list of feature/column names.
    target_name : String
        Name of the Target column.
    categoric_values : tuple
        Contains Categorical attribute names and its values

    Returns
    -------
    data_dict :
        Returns the DataDictionary instance
    """
    categoric_feature_name = list()
    if categoric_values:
        categoric_labels = categoric_values[0]
        categoric_feature_name = categoric_values[1]
    target_attr_values = []
    n_features = len(feature_names)
    features_pmml_optype = ['continuous'] * n_features
    features_pmml_dtype = ['double'] * n_features
    mining_func = get_mining_func(model)
    if mining_func == 'classification':
        target_pmml_optype = 'categorical'
        target_pmml_dtype = get_dtype(model.classes_[0])
        target_attr_values = model.classes_.tolist()
    elif mining_func == 'regression':
        target_pmml_optype = 'continuous'
        target_pmml_dtype = 'double'
    data_fields = list()
    if categoric_values:
        for class_list, attr_for_class in zip(categoric_labels, categoric_feature_name):
            category_flds = pml.DataField(name=str(attr_for_class), optype="categorical",
                                          dataType=get_dtype(class_list[0]) if class_list else 'string')
            if class_list:
                for values in class_list:
                    category_flds.add_Value(pml.Value(value=str(values)))
            data_fields.append(category_flds)
    attr_without_class_attr = [feat_name for feat_name in feature_names
                               if feat_name not in categoric_feature_name]
    for feature_idx, feat_name in enumerate(attr_without_class_attr):
        data_fields.append(pml.DataField(name=str(feat_name),
                                         optype=features_pmml_optype[feature_idx],
                                         dataType=features_pmml_dtype[feature_idx]))
    class_node = pml.DataField(name=str(target_name), optype=target_pmml_optype,
                               dataType=target_pmml_dtype)
    for class_value in target_attr_values:
        class_node.add_Value(pml.Value(value=str(class_value)))
    data_fields.append(class_node)
    data_dict = pml.DataDictionary(numberOfFields=len(data_fields), DataField=data_fields)
    return data_dict
def get_regr_predictors(model_coef, row_idx, feat_names, categoric_values):
    """
    Parameters
    ----------
    model_coef : array
        Contains the estimator's coefficient values
    row_idx : int
        Contains an integer value to differentiate between linear and svm models
    feat_names : list
        Contains the list of feature/column names
    categoric_values : tuple
        Contains Categorical attribute names and its values

    Returns
    -------
    predictors : list
        Returns a list with instances of nyoka numeric/categorical predictor class
    """
    der_fld_len = len(feat_names)
    der_fld_idx = 0
    predictors = list()
    if categoric_values:
        class_lbls = categoric_values[0]
        class_attribute = categoric_values[1]
    while der_fld_idx < der_fld_len:
        if is_labelbinarizer(feat_names[der_fld_idx]):
            if not is_stdscaler(feat_names[der_fld_idx]):
                class_id = get_classid(class_attribute, feat_names[der_fld_idx])
                cat_predictors = get_categoric_pred(row_idx, der_fld_idx, model_coef,
                                                    class_lbls[class_id],
                                                    class_attribute[class_id])
                for predictor in cat_predictors:
                    predictors.append(predictor)
                der_fld_idx = der_fld_idx + len(class_lbls[class_id])
            else:
                num_predictors = get_numeric_pred(row_idx, der_fld_idx, model_coef,
                                                  feat_names[der_fld_idx])
                predictors.append(num_predictors)
                der_fld_idx += 1
        else:
            num_predictors = get_numeric_pred(row_idx, der_fld_idx, model_coef,
                                              feat_names[der_fld_idx])
            predictors.append(num_predictors)
            der_fld_idx += 1
    return predictors
def get_classid(class_attribute, feat_name):
    """
    Parameters
    ----------
    class_attribute :
        Contains the name of the attribute/column that contains categorical values
    feat_name : string
        Contains the name of the attribute/column

    Returns
    -------
    class_idx : int
        Returns an integer value that will represent each categorical value
    """
    for class_idx, class_attr in enumerate(class_attribute):
        if class_attr in feat_name:
            return class_idx


def is_labelbinarizer(feat_name):
    """
    Parameters
    ----------
    feat_name : string
        Contains the name of the attribute

    Returns
    -------
    Returns a boolean value that states whether label binarizer has been applied or not
    """
    if "labelBinarizer" in feat_name:
        return True
    else:
        return False


def is_stdscaler(feat_name):
    """
    Parameters
    ----------
    feat_name : string
        Contains the name of the attribute

    Returns
    -------
    Returns a boolean value that states whether standard scaler has been applied or not
    """
    if "standardScaler" in feat_name:
        return True
    else:
        return False
def get_categoric_pred(row_idx, der_fld_idx, model_coef, class_lbls, class_attribute):
    """
    Parameters
    ----------
    row_idx : int
        Contains an integer value to index attribute/column names
    der_fld_idx : int
        Contains an integer value to differentiate between linear and svm models
    model_coef : array
        Contains the estimator's coefficient values
    class_lbls : list
        Contains the list of categorical values
    class_attribute : tuple
        Contains Categorical attribute name

    Returns
    -------
    categoric_predictor : list
        Returns a list with instances of nyoka categorical predictor class
    """
    categoric_predictor = list()
    classes_len = len(class_lbls)
    if classes_len == 2:
        if row_idx == -1:
            coef = model_coef
        else:
            coef = model_coef[row_idx][der_fld_idx]
        cat_pred = pml.CategoricalPredictor(name=class_attribute, value=class_lbls[-1],
                                            coefficient="{:.15f}".format(coef))
        cat_pred.original_tagname_ = "CategoricalPredictor"
        categoric_predictor.append(cat_pred)
    else:
        for cname, class_idx in zip(class_lbls, range(len(class_lbls))):
            if row_idx == -1:
                coef = model_coef
            else:
                coef = model_coef[row_idx][der_fld_idx + class_idx]
            cat_pred = pml.CategoricalPredictor(name=class_attribute, value=cname, coefficient=coef)
            cat_pred.original_tagname_ = "CategoricalPredictor"
            categoric_predictor.append(cat_pred)
    return categoric_predictor
def get_numeric_pred(row_idx, der_fld_idx, model_coef, der_fld_name):
    """
    Parameters
    ----------
    row_idx : int
        Contains an integer value to index attribute/column names
    der_fld_idx : int
        Contains an integer value to differentiate between linear and svm models
    model_coef : array
        Contains the estimator's coefficient values
    der_fld_name : string
        Contains the name of the attribute

    Returns
    -------
    num_pred :
        Returns an instance of the nyoka numeric predictor class
    """
    num_pred = pml.NumericPredictor(
        name=der_fld_name,
        exponent='1',
        coefficient="{:.15f}".format(model_coef[row_idx][der_fld_idx]))
    num_pred.original_tagname_ = "NumericPredictor"
    return num_pred