splicemachine.mlflow_support package

Submodules

splicemachine.mlflow_support.mlflow_support module

This module contains the entry point to the Splice Machine managed MLflow environment.

Copyright 2020 Splice Machine, Inc.

Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.


All functions in this module are accessible through the mlflow object and should be referenced without the leading underscore, as in:

mlflow.function_name()

For example, the function _current_exp_id() is accessible via

mlflow.current_exp_id()

All functions are accessible after running the following import:

from splicemachine.mlflow_support import *

Importing anything directly from mlflow before running the above statement will cause problems. After running it, you can import additional mlflow submodules as normal:

from splicemachine.mlflow_support import *
from mlflow.tensorflow import autolog

_current_exp_id()

Retrieve the current experiment ID.

Returns

(int) the current experiment id

_current_run_id()

Retrieve the current run ID.

Returns

(str) the current run id

_deploy_aws(app_name: str, region: str = 'us-east-2', instance_type: str = 'ml.m5.xlarge', run_id: Optional[str] = None, instance_count: int = 1, deployment_mode: str = 'replace')

Queue a job to deploy a run to SageMaker with the given run ID (found in the MLflow UI or through the search API).

Parameters
  • run_id – the ID of the run to deploy. Defaults to the current run ID.

  • app_name – the name of the app in SageMaker once deployed

  • region – the SageMaker region to deploy to (us-east-2, us-west-1, us-west-2, eu-central-1 supported)

  • instance_type – the EC2 SageMaker instance type to deploy on (ml.m4.xlarge supported)

  • instance_count – the number of instances to load balance predictions on

  • deployment_mode – the method to deploy: create = the deployment fails if an app with the specified name already exists; replace = if the app already exists, the application in SageMaker is replaced with this one; add = add the specified model to a preexisting application (not recommended)
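The documented constraints on region and deployment_mode can be checked on the client before a job is queued. The sketch below is a hypothetical helper (build_aws_deploy_kwargs is not part of splicemachine); it only validates against the values listed above and assembles keyword arguments for mlflow.deploy_aws.

```python
from typing import Any, Dict, Optional

# Supported values as listed in the _deploy_aws documentation.
SUPPORTED_REGIONS = {"us-east-2", "us-west-1", "us-west-2", "eu-central-1"}
DEPLOYMENT_MODES = {"create", "replace", "add"}


def build_aws_deploy_kwargs(app_name: str,
                            region: str = "us-east-2",
                            instance_type: str = "ml.m5.xlarge",
                            run_id: Optional[str] = None,
                            instance_count: int = 1,
                            deployment_mode: str = "replace") -> Dict[str, Any]:
    """Hypothetical helper: validate arguments before calling mlflow.deploy_aws."""
    if region not in SUPPORTED_REGIONS:
        raise ValueError(f"unsupported region: {region}")
    if deployment_mode not in DEPLOYMENT_MODES:
        raise ValueError(f"deployment_mode must be one of {sorted(DEPLOYMENT_MODES)}")
    return dict(app_name=app_name, region=region, instance_type=instance_type,
                run_id=run_id, instance_count=instance_count,
                deployment_mode=deployment_mode)


kwargs = build_aws_deploy_kwargs("my-app", deployment_mode="create")
# With the Splice Machine environment imported, these would be passed as:
# mlflow.deploy_aws(**kwargs)
print(kwargs)
```

Choosing 'create' here makes an accidental overwrite of an existing app fail loudly instead of replacing it.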

_deploy_azure(endpoint_name: str, resource_group: str, workspace: str, run_id: str, region: str = 'East US', cpu_cores: float = 0.1, allocated_ram: float = 0.5, model_name: Optional[str] = None)

Deploy a given run to AzureML.

Parameters
  • endpoint_name – (str) the name of the endpoint in AzureML when deployed to Azure Container Services. Must be unique.

  • resource_group – (str) Azure Resource Group for model. Automatically created if it doesn’t exist.

  • workspace – (str) the AzureML workspace to deploy the model under. Will be created if it doesn’t exist.

  • run_id – (str) if specified, will deploy a previous run (must have a Spark model logged). Otherwise, defaults to the active run.

  • region – (str) AzureML Region to deploy to: Can be East US, East US 2, Central US, West US 2, North Europe, West Europe or Japan East

  • cpu_cores – (float) Number of CPU Cores to allocate to the instance. Can be fractional. Default=0.1

  • allocated_ram – (float) amount of RAM, in GB, allocated to the container. Default=0.5

  • model_name – (str) If specified, this will be the name of the model in AzureML. Otherwise, the model name will be randomly generated.

_deploy_db(db_schema_name: str, db_table_name: str, run_id: str, reference_table: Optional[str] = None, reference_schema: Optional[str] = None, primary_key: Optional[Dict[str, str]] = None, df: Optional[Union[pyspark.sql.dataframe.DataFrame, pandas.core.frame.DataFrame]] = None, create_model_table: Optional[bool] = True, model_cols: Optional[List[str]] = None, classes: Optional[List[str]] = None, library_specific: Optional[Dict[str, str]] = None, replace: Optional[bool] = False, max_batch_size: Optional[int] = 10000, verbose: bool = False) → None

Deploy a trained model (currently Spark, scikit-learn, Keras, or H2O) to the database. Depending on the parameters passed, this either creates a new table or alters an existing table in the database.

Parameters
  • db_schema_name – (str) the schema name to deploy to.

  • db_table_name – (str) the table name to deploy to.

  • run_id – (str) The run_id to deploy the model on. The model associated with this run will be deployed

  • reference_table – (str) if creating a new table, an alternative to specifying a dataframe is specifying a reference table. The column schema of the reference table will be used to create the new table (e.g. MYTABLE)

  • reference_schema – (str) the db schema for the reference table.

  • primary_key

    (Dict) Dictionary of column + SQL datatype to use for the primary/composite key.

    • If you are deploying to a table that already exists, it must already have a primary key, and this parameter will be ignored.

    • If you are creating the table in this function, you MUST pass in a primary key

  • df

    (Spark or Pandas DF) The dataframe used to train the model

    NOTE: The columns in this df are the ones that will be used to create the table unless specified by model_cols

  • create_model_table – (bool) Whether or not to create the table from the dataframe. Default True. This will ONLY be used if the table does not exist and a dataframe is passed in.

  • model_cols – (List[str]) The columns from the table to use for the model. If None, all columns in the table will be passed to the model. If specified, the columns will be passed to the model IN THAT ORDER. The columns passed here must exist in the table.

  • classes

    (List[str]) The classes (prediction labels) for the model being deployed.

    NOTE: If not supplied, the table will have default column names for each class

  • library_specific

    (dict{str: str}) Prediction options for certain model types:

    • Certain model types (specifically Keras and Scikit-learn) support prediction arguments. Here are the options that we support:
      • Scikit-learn
        • predict_call: determines function call for the model. Available: ‘predict’ (default), ‘predict_proba’, ‘transform’

        • predict_args: passed into the predict call (for Gaussian and Bayesian models). Available: ‘return_std’, ‘return_cov’

      • Keras
        • pred_threshold: prediction threshold for Keras binary classification models. Note: If the model type is Keras, the output layer has 1 node, and pred_threshold is None, you will NOT receive a class prediction, only the output of the final layer (like model.predict()). If you want a class prediction for your binary classification problem, you MUST pass in a threshold.

If the model does not support these parameters, they will be ignored.
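To illustrate what pred_threshold changes for a Keras binary classifier with a single output node: without a threshold you only get the raw output of the final layer, and with one the output is mapped to a class. This is a minimal sketch of that behavior, not the database's implementation; it assumes scores at or above the threshold map to class 1, since the exact comparison is not documented here.

```python
from typing import List, Optional, Union


def keras_binary_output(raw_outputs: List[float],
                        pred_threshold: Optional[float] = None) -> List[Union[float, int]]:
    """Sketch: raw scores when no threshold is given, class labels otherwise."""
    if pred_threshold is None:
        # Like model.predict(): only the output of the final layer.
        return list(raw_outputs)
    # Assumption: a score equal to the threshold counts as class 1.
    return [1 if score >= pred_threshold else 0 for score in raw_outputs]


scores = [0.12, 0.5, 0.93]
print(keras_binary_output(scores))       # [0.12, 0.5, 0.93]
print(keras_binary_output(scores, 0.5))  # [0, 1, 1]
```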

Parameters
  • max_batch_size – (int) the max size for the database to batch groups of rows for prediction. Default 10,000.

  • replace – (bool) whether or not to replace a currently existing model. This param is not yet implemented.
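The documented rules for primary_key, df, and reference_table can be summarized as a pre-flight check before calling mlflow.deploy_db. This is a hypothetical helper that encodes only those rules; the table_exists flag is an assumed input standing in for a real catalog lookup, and raising when neither a dataframe nor a reference table is given is an assumption based on reference_table being described as the alternative to a dataframe.

```python
from typing import Dict, Optional


def check_deploy_db_args(table_exists: bool,
                         primary_key: Optional[Dict[str, str]] = None,
                         df=None,
                         reference_table: Optional[str] = None) -> str:
    """Hypothetical pre-flight check mirroring the documented deploy_db rules."""
    if table_exists:
        # The existing table must already have a primary key; primary_key is ignored.
        return "alter existing table"
    if not primary_key:
        raise ValueError("primary_key is required when creating a new table")
    if df is not None:
        return "create table from dataframe"
    if reference_table is not None:
        return "create table from reference table"
    # Assumption: one of the two schema sources is needed to create a table.
    raise ValueError("pass a dataframe or a reference table to create a new table")


print(check_deploy_db_args(False, primary_key={"ID": "INT"}, reference_table="MYTABLE"))
```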

Returns

None

This function creates the following IF you are creating a table from the dataframe:

  • The model table where run_id is the run_id passed in. This table will have a column for each feature in the feature vector. It will also contain:

    • USER, the current user who made the request

    • EVAL_TIME, the CURRENT_TIMESTAMP

    • the PRIMARY KEY column(s) passed in

    • PREDICTION, the prediction of the model. If the classes param is not filled in, this will contain default values for classification models

    • A column for each class of the predictor with the value being the probability/confidence of the model if applicable

IF you are deploying to an existing table, the table will be altered to include the columns above.

NOTE
The columns listed above are default value columns. This means that on a SQL insert into the table, you do not need to reference or insert values into them; they are filled in automatically.

Set verbose=True in the function call for more information.

A trigger is also created on the deployment table that runs the model after every insert into that table.
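The combination of default-value columns and an insert trigger can be illustrated with an analogy in SQLite using Python's built-in sqlite3 module. This is not the DDL or trigger that Splice Machine generates; the table, its columns, and the toy predict function are hypothetical stand-ins for a deployed model.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Allow the application-defined function to be called from a trigger on
# SQLite builds that default trusted_schema to OFF.
conn.execute("PRAGMA trusted_schema = ON")

# Toy stand-in for a deployed model: predicts 1 when the feature sum exceeds 1.0.
conn.create_function("predict", 2, lambda x1, x2: 1 if x1 + x2 > 1.0 else 0)

conn.executescript("""
CREATE TABLE model_table (
    id         INTEGER PRIMARY KEY,
    x1         REAL,
    x2         REAL,
    eval_time  TIMESTAMP DEFAULT CURRENT_TIMESTAMP,  -- filled in by default
    prediction INTEGER                               -- filled in by the trigger
);

-- Run the model on every inserted row.
CREATE TRIGGER run_model AFTER INSERT ON model_table
BEGIN
    UPDATE model_table SET prediction = predict(NEW.x1, NEW.x2) WHERE id = NEW.id;
END;
""")

# The insert only references the primary key and feature columns.
conn.executemany("INSERT INTO model_table (id, x1, x2) VALUES (?, ?, ?)",
                 [(1, 0.2, 0.3), (2, 0.9, 0.4)])
rows = conn.execute(
    "SELECT id, prediction, eval_time IS NOT NULL FROM model_table ORDER BY id"
).fetchall()
print(rows)  # [(1, 0, 1), (2, 1, 1)]
```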

_deploy_kubernetes(run_id: str, service_port: int = 80, base_replicas: int = 1, autoscaling_enabled: bool = False, max_replicas: int = 2, target_cpu_utilization: int = 50, disable_nginx: bool = False, gunicorn_workers: int = 1, resource_requests_enabled: bool = False, resource_limits_enabled: bool = False, cpu_request: int = 0.5, cpu_limit: int = 1, memory_request: str = '512Mi', memory_limit: str = '2048Mi', expose_external: bool = False)

Deploy the model associated with the specified or active run to the Kubernetes cluster.

Creates the following resources:
  • Pod (with your model loaded in via an init container)

  • ReplicaSet (configured to base replicas specified)

  • HPA (if autoscaling is enabled)

  • Service (model-<run id>.<db namespace>.svc.cluster.local:<service port specified>)

  • Deployment

  • Ingress (if expose_external is set to True) (on <your cluster url>/<run id>/invocations)

Parameters
  • run_id – specified if overriding the active run

  • service_port – (default 80) the port that the prediction service runs on internally in the cluster

  • base_replicas – (default 1) the number of replicas the ReplicaSet is configured with

  • autoscaling_enabled – (default False) whether or not to provision a Horizontal Pod Autoscaler to provision pods dynamically

  • max_replicas – (default 2) [USED IF AUTOSCALING ENABLED] max number of pods to scale up to

  • target_cpu_utilization – (default 50) [USED IF AUTOSCALING ENABLED] the cpu utilization to scale up to new pods on

  • disable_nginx – (default False) disable nginx inside of the pod (recommended)

  • gunicorn_workers – (default 1) [MUST BE 1 FOR SPARK ML models TO PREVENT OOM] Number of web workers.

  • resource_requests_enabled – (default False) whether or not to enable Kubernetes resource requests

  • resource_limits_enabled – (default False) whether or not to enable Kubernetes resource limits

  • cpu_request – (default 0.5) [USED IF RESOURCE REQUESTS ENABLED] number of CPU to request

  • cpu_limit – (default 1) [USED IF RESOURCE LIMITS ENABLED] number of CPU to cap at

  • memory_request – (default 512Mi) [USED IF RESOURCE REQUESTS ENABLED] amount of RAM to request

  • memory_limit – (default 2048Mi) [USED IF RESOURCE LIMITS ENABLED] amount of RAM to limit at

  • expose_external

    (default False) whether or not to create Ingress resource to deploy outside of the cluster.

    NOTE
    It is not recommended to create an Ingress resource using this parameter, as your model will be
    deployed with no authorization (and public access). Instead, it is better to deploy your model
    as an internal service, and deploy an authentication proxy (such as https://github.com/oauth2-proxy/oauth2-proxy)
    to proxy traffic to your internal service after authenticating.
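The internal Service address follows the pattern listed above, which is the address an authentication proxy would forward traffic to. A small sketch that formats it and enforces the single-worker requirement for Spark ML models; both helpers are hypothetical, and the namespace value and the is_spark_model flag are assumed inputs.

```python
def internal_service_url(run_id: str, db_namespace: str, service_port: int = 80) -> str:
    """Format the in-cluster address documented for the model Service."""
    return f"model-{run_id}.{db_namespace}.svc.cluster.local:{service_port}"


def check_gunicorn_workers(gunicorn_workers: int, is_spark_model: bool) -> None:
    """Spark ML models must use exactly one web worker to prevent OOM."""
    if is_spark_model and gunicorn_workers != 1:
        raise ValueError("gunicorn_workers must be 1 for Spark ML models")


url = internal_service_url("b47ee4563368419880b44ad8535f6371", "splice")
print(url)  # model-b47ee4563368419880b44ad8535f6371.splice.svc.cluster.local:80
check_gunicorn_workers(1, is_spark_model=True)
```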
    

_download_artifact(name, local_path=None, run_id=None)

Download the artifact with the given name from the given run (the active run by default) to the local path.

Parameters
  • name – (str) artifact name to load (with respect to the run)

  • local_path – (str) local path to download the artifact to. If set, this path MUST include the file extension. Defaults to the current directory and the name of the saved artifact

  • run_id – (str) the run id to download the artifact from. Defaults to active run
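The local_path rules above can be expressed as a small resolver. This sketch covers only the documented behavior; resolve_local_path is a hypothetical helper, not part of the package.

```python
import os
from typing import Optional


def resolve_local_path(name: str, local_path: Optional[str] = None) -> str:
    """Default to <current directory>/<artifact name>; otherwise require an extension."""
    if local_path is None:
        return os.path.join(os.getcwd(), name)
    _, ext = os.path.splitext(local_path)
    if not ext:
        raise ValueError(f"local_path must include a file extension: {local_path!r}")
    return local_path


print(resolve_local_path("model_summary.txt"))
print(resolve_local_path("plot.png", "/tmp/plots/plot.png"))
```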

_end_run(status='FINISHED', save_html=True)

End an active MLflow run (if there is one).

Example
from splicemachine.mlflow_support import *

# Start run and get status
mlflow.start_run()
run = mlflow.active_run()
print("run_id: {}; status: {}".format(run.info.run_id, run.info.status))

# End run and get status
mlflow.end_run()
run = mlflow.get_run(run.info.run_id)
print("run_id: {}; status: {}".format(run.info.run_id, run.info.status))
print("--")

# Check for any active runs
print("Active run: {}".format(mlflow.active_run()))
Output
run_id: b47ee4563368419880b44ad8535f6371; status: RUNNING
run_id: b47ee4563368419880b44ad8535f6371; status: FINISHED
--
Active run: None

_fetch_logs(job_id: int)

Get the logs of a job as an array.

Parameters

job_id – (int) the job to get the logs for

_get_current_run_data()

Get the data associated with the current run. As of MLflow 1.6, run info cannot be retrieved from the mlflow.active_run object, so it is retrieved via the tracking client.

Returns

active run data object

_get_deployed_models() → pandas.core.frame.DataFrame

Get the currently deployed models in the database.

Returns

Pandas DataFrame

_get_model_name(run_id)

Get the model name associated with a run, or None if there is none.

Parameters

run_id – (str) the run_id that the model is stored under

Returns

(str or None) The model name if it exists

_get_run_ids_by_name(run_name, experiment_id=None)

Get the run IDs associated with a run name. If multiple runs share the same name, all of their run IDs are returned.

Parameters
  • run_name – (str) The name of the run

  • experiment_id – (int) The experiment to search in. If None, all experiments are searched. [Default None]

Returns

(List[str]) List of run ids

_lm(key, value, step=None)

Shortcut for logging metrics in MLflow.

Parameters
  • key – (str) key for the metric

  • value – (int or float) value for the metric

  • step – (int) a single integer step at which to log the metric. If unspecified, the metric is logged at step zero.

_load_model(run_id=None, name=None, as_pyfunc=False)

Download and deserialize a serialized model

Parameters
_log_artifact(file_name, name=None, run_uuid=None, artifact_path=None)

Log an artifact for the active run.

Example
with mlflow.start_run():
    mlflow.log_artifact('my_image.png')

Parameters
  • file_name – (str) the name of the file to log

  • name – (str) the name to store the artifact as. Defaults to the file name. If the name param includes the file extension (or is not passed in) you will be able to preview it in the mlflow UI (image, text, html, geojson files).

  • run_uuid – (str) the run UUID of a previous run. If None, defaults to the current run

  • artifact_path – If you would like the artifact logged under a subdirectory of a particular folder, you can set this value. If the directory doesn’t exist, it will be created for this run’s artifact path.

Returns

None

NOTE

We do not currently support logging directories. If you would like to log a directory, please zip it first and log the zip file
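One way to follow the note above is to archive the directory with the standard library's shutil.make_archive and log the resulting file. The directory and file names are hypothetical placeholders, and the mlflow.log_artifact call is left as a comment because it needs the Splice Machine environment to be imported and registered.

```python
import os
import shutil
import tempfile
import zipfile

# Build a throwaway directory with one file to stand in for real outputs.
work_dir = tempfile.mkdtemp()
outputs = os.path.join(work_dir, "outputs")
os.makedirs(outputs)
with open(os.path.join(outputs, "metrics.txt"), "w") as f:
    f.write("placeholder contents\n")

# Zip the directory; make_archive appends the ".zip" extension itself.
zip_path = shutil.make_archive(os.path.join(work_dir, "outputs"), "zip", root_dir=outputs)
print(os.path.basename(zip_path))  # outputs.zip

with zipfile.ZipFile(zip_path) as zf:
    print(zf.namelist())

# With the Splice Machine environment imported, log the archive:
# mlflow.log_artifact(zip_path, name='outputs.zip')
```

Keeping the .zip extension in name follows the guidance above about preserving the file extension.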

_log_feature_transformations(unfit_pipeline)

Log feature transformations for an unfit Spark pipeline, recording how features move through the pipeline.

Parameters

unfit_pipeline – (Pipeline) unfit Spark pipeline to log

Returns

None

_log_model(model, name='model', model_lib=None, **flavor_options)

Log a trained machine learning model.

Parameters
  • model – (Model) the trained Spark/scikit-learn/H2O/Keras model to log with the current run

  • name – (str) the run-relative name to store the model under. [Default ‘model’]

  • model_lib – An optional param specifying the model type of the model to log. Available options match the mlflow built-in model flavors https://www.mlflow.org/docs/1.8.0/models.html#built-in-model-flavors

  • flavor_options – (**kwargs) The full set of save options to pass into the save_model function. If this is passed, model_lib must also be provided, and the keys of this dictionary must match the params of that function’s signature (e.g. mlflow.pyfunc.save_model). An example of pyfunc’s signature is here, although each flavor has its own. https://mlflow.org/docs/latest/python_api/mlflow.pyfunc.html#mlflow.pyfunc.save_model
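The requirement that the keys of flavor_options match the parameters of the flavor's save_model signature can be checked with the standard inspect module before logging. In this sketch, fake_save_model is a hypothetical stand-in using a few pyfunc-style parameter names so the example runs without MLflow installed; in practice you would inspect the real flavor function, for example mlflow.pyfunc.save_model.

```python
import inspect
from typing import Any, Callable, Dict, Set


def fake_save_model(path, loader_module=None, data_path=None, code_path=None,
                    conda_env=None, python_model=None, artifacts=None):
    """Hypothetical stand-in for a flavor's save_model function."""


def unknown_flavor_options(save_model: Callable[..., Any],
                           flavor_options: Dict[str, Any]) -> Set[str]:
    """Return the option keys that are not parameters of save_model."""
    params = set(inspect.signature(save_model).parameters)
    return set(flavor_options) - params


print(unknown_flavor_options(fake_save_model, {"conda_env": {}, "artifacts": {}}))  # set()
print(unknown_flavor_options(fake_save_model, {"conda_envs": {}}))  # {'conda_envs'}
```

An empty result means every key can be forwarded to save_model; a non-empty one usually points to a typo in an option name.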

_log_model_params(pipeline_or_model)

Log the parameters of a fitted spark model or a model stage of a fitted spark pipeline

Parameters

pipeline_or_model – fitted spark pipeline/fitted spark model

_log_pipeline_stages(pipeline)

Log the pipeline stages of a Spark Pipeline as params for the run.

Parameters

pipeline – (Pipeline or PipelineModel) fitted/unfitted pipeline

Returns

None

_login_director(username=None, password=None, jwt_token=None)

Authenticate into the MLManager Director

Parameters
  • username – (str) database username

  • password – (str) database password

  • jwt_token – (str) database JWT token authentication

Either username and password (for basic auth) or jwt_token must be provided. Basic authentication takes precedence if set (mlflow default).
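The precedence rule can be sketched as a function that picks one credential. How the MLManager Director actually receives credentials is not documented here; the HTTP Basic and Bearer header formats below are assumptions used only to make the choice concrete, and director_auth_header is a hypothetical name.

```python
import base64
from typing import Dict, Optional


def director_auth_header(username: Optional[str] = None,
                         password: Optional[str] = None,
                         jwt_token: Optional[str] = None) -> Dict[str, str]:
    """Pick basic auth when username/password are set, else the JWT token."""
    if username and password:
        # Basic authentication takes precedence if set.
        encoded = base64.b64encode(f"{username}:{password}".encode()).decode()
        return {"Authorization": f"Basic {encoded}"}
    if jwt_token:
        # Assumption: the token is sent as a bearer token.
        return {"Authorization": f"Bearer {jwt_token}"}
    raise ValueError("Provide username and password, or jwt_token")


print(director_auth_header("splice", "admin", jwt_token="abc"))
# {'Authorization': 'Basic c3BsaWNlOmFkbWlu'}
```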

_lp(key, value)

Shortcut for logging parameters in MLflow.

Parameters
  • key – (str) key for the parameter

  • value – (str) value for the parameter

Returns

None

_mlflow_patch(name)

Create an MLflow patch that applies the default gorilla settings.

Parameters

name – destination name under mlflow package

Returns

decorator for patched function

_register_feature_store(fs: splicemachine.features.feature_store.FeatureStore)

Register a feature store for feature tracking of experiments.

Parameters

fs – (FeatureStore) the feature store

Returns

None

_register_splice_context(splice_context)

Register a Splice Context for Spark/Database operations (artifact storage, for example)

Parameters

splice_context – (PySpliceContext) splice context to input

Returns

None

_remove_active_training_set()

Removes the active training set from MLflow. This function deletes MLflow’s active training set (retrieved from the Feature Store), which in turn stops the automated logging of features to the active MLflow run. To recreate an active training set, call fs.get_training_set or fs.get_training_set_from_view in the Feature Store.

_set_mlflow_uri(uri)

Set the tracking URI for MLflow. Only needed if running outside of the Splice Machine K8s Cloud Service.

Parameters

uri – (str) the URL of your mlflow UI.

Returns

None

_start_run(run_id=None, tags=None, experiment_id=None, run_name=None, nested=False)

Start a new run.

Example
mlflow.start_run(run_name='my_run')

# or

with mlflow.start_run(run_name='my_run'):
    ...

Parameters
  • tags – a dictionary containing metadata about the current run. For example: {'team': 'pd', 'purpose': 'r&d'}

  • run_name – (str) an optional name for the run to show up in the MLflow UI. [Default None]

  • run_id – (str) if you want to reincarnate an existing run, pass in the run ID [Default None]

  • experiment_id – (int) if you would like to create an experiment/use one for this run [Default None]

  • nested – (bool) Controls whether the run is nested in a parent run. True creates a nested run [Default False]

Returns

(ActiveRun) the mlflow active run object

_timer(timer_name, param=False)

Context manager for timing a block of code and logging the elapsed time.

Example
with mlflow.timer('my_timer'):
    ...

Parameters
  • timer_name – (str) the name of the timer

  • param – (bool) whether or not to log the timer as a param (default False). If False, logs it as a metric.
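A minimal sketch of the timer's behavior using contextlib and time.perf_counter. The log_param and log_metric callbacks are hypothetical stand-ins for MLflow's logging calls so the example runs on its own; measuring in seconds and logging even when the block raises are assumptions of this sketch.

```python
import time
from contextlib import contextmanager
from typing import Callable, Dict


@contextmanager
def timer(timer_name: str,
          log_param: Callable[[str, float], None],
          log_metric: Callable[[str, float], None],
          param: bool = False):
    """Time the enclosed block and log the elapsed seconds as a param or a metric."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        (log_param if param else log_metric)(timer_name, elapsed)


params: Dict[str, float] = {}
metrics: Dict[str, float] = {}
with timer("my_timer", params.__setitem__, metrics.__setitem__):
    sum(range(100_000))
print(sorted(metrics), sorted(params))  # ['my_timer'] []
```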

_undeploy_kubernetes(run_id: str)

Remove a model deployment from Kubernetes. This deletes the Kubernetes deployment and records the event.

Parameters

run_id – specified if overriding the active run

_watch_job(job_id: int)

Stream the logs of a job to standard output in real time.

Parameters

job_id – the job id to watch (returned after executing an operation)

Raises

SpliceMachineException – If the job being watched fails
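A sketch of the polling pattern that _watch_job describes: print new log lines as they arrive and raise once the job reports failure. fetch_logs and get_status are hypothetical callables (the real function talks to the MLManager Director), the SUCCESS/FAILURE status strings are assumptions, and a plain RuntimeError stands in for SpliceMachineException.

```python
import time
from typing import Callable, List


def watch_job(fetch_logs: Callable[[], List[str]],
              get_status: Callable[[], str],
              poll_seconds: float = 1.0) -> None:
    """Print unseen log lines until the job finishes; raise if it failed."""
    printed = 0
    while True:
        # Read the status first so the final log lines are flushed before returning.
        status = get_status()
        logs = fetch_logs()
        for line in logs[printed:]:
            print(line)
        printed = len(logs)
        if status == "FAILURE":
            raise RuntimeError("job failed")
        if status == "SUCCESS":
            return
        time.sleep(poll_seconds)


# Simulated job: the log grows over three polls, then the job succeeds.
states = iter(["RUNNING", "RUNNING", "SUCCESS"])
batches = iter([["starting"], ["starting", "deploying"], ["starting", "deploying", "done"]])
watch_job(lambda: next(batches), lambda: next(states), poll_seconds=0)
```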