splicemachine.mlflow_support package¶
Submodules¶
splicemachine.mlflow_support.mlflow_support module¶
This module contains the entrypoint to the Splice Machine managed mlflow environment
Copyright 2020 Splice Machine, Inc.
Licensed under the Apache License, Version 2.0 (the “License”); you may not use this file except in compliance with the License. You may obtain a copy of the License at
Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
All functions in this module are accessible through the mlflow object and are to be referenced without the leading underscore as
mlflow.function_name()
For example, the function _current_exp_id() is accessible via
mlflow.current_exp_id()
All functions are accessible after running the following import
from splicemachine.mlflow_support import *
Importing anything directly from mlflow before running the above statement will cause problems. After running the above import, you can import additional mlflow submodules as normal
from splicemachine.mlflow_support import *
from mlflow.tensorflow import autolog
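The naming convention described above can be illustrated with a small stand-in. This is only a sketch of the idea, not the package's patching code (which, according to the `_mlflow_patch` entry, relies on gorilla); `fake_mlflow`, `attach_public`, and the placeholder return value are hypothetical.

```python
import types


def _current_exp_id():
    """Stand-in for a module-level function; the return value is a placeholder."""
    return "0"


def attach_public(target, funcs):
    """Attach each function to target under its name minus the leading underscore."""
    for func in funcs:
        setattr(target, func.__name__.lstrip("_"), func)
    return target


# Hypothetical stand-in for the patched mlflow object.
fake_mlflow = attach_public(types.SimpleNamespace(), [_current_exp_id])
print(fake_mlflow.current_exp_id())
```

The underscore-prefixed name exists only in the defining module; callers see `current_exp_id` on the target object.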
-
_deploy_aws(app_name: str, region: str = 'us-east-2', instance_type: str = 'ml.m5.xlarge', run_id: Optional[str] = None, instance_count: int = 1, deployment_mode: str = 'replace')[source]¶ Queue Job to deploy a run to sagemaker with the given run id (found in MLFlow UI or through search API)
- Parameters
run_id – the id of the run to deploy. Will default to the current run id.
app_name – the name of the app in sagemaker once deployed
region – the sagemaker region to deploy to (us-east-2, us-west-1, us-west-2, eu-central-1 supported)
instance_type – the EC2 Sagemaker instance type to deploy on (ml.m4.xlarge supported)
instance_count – the number of instances to load balance predictions on
deployment_mode – the method to deploy; create=application will fail if an app with the name specified already exists; replace=application in sagemaker will be replaced with this one if app already exists; add=add the specified model to a preexisting application (not recommended)
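A minimal sketch of assembling the arguments for `mlflow.deploy_aws`, checking the region and deployment mode against the values listed above before anything is queued. The helper `aws_deploy_kwargs` and the app name are hypothetical; whether the library performs the same checks itself is not stated here.

```python
SUPPORTED_REGIONS = {"us-east-2", "us-west-1", "us-west-2", "eu-central-1"}
DEPLOYMENT_MODES = {"create", "replace", "add"}


def aws_deploy_kwargs(app_name, region="us-east-2", instance_type="ml.m5.xlarge",
                      run_id=None, instance_count=1, deployment_mode="replace"):
    """Validate the documented options and return keyword arguments for deploy_aws."""
    if region not in SUPPORTED_REGIONS:
        raise ValueError(f"unsupported region: {region}")
    if deployment_mode not in DEPLOYMENT_MODES:
        raise ValueError(f"unknown deployment_mode: {deployment_mode}")
    return {
        "app_name": app_name,
        "region": region,
        "instance_type": instance_type,
        "run_id": run_id,  # None falls back to the current run
        "instance_count": instance_count,
        "deployment_mode": deployment_mode,
    }


kwargs = aws_deploy_kwargs("churn-model", deployment_mode="create")
# With the patched mlflow object available:
# mlflow.deploy_aws(**kwargs)
```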
-
_deploy_azure(endpoint_name: str, resource_group: str, workspace: str, run_id: str, region: str = 'East US', cpu_cores: float = 0.1, allocated_ram: float = 0.5, model_name: Optional[str] = None)[source]¶ Deploy a given run to AzureML.
- Parameters
endpoint_name – (str) the name of the endpoint in AzureML when deployed to Azure Container Services. Must be unique.
resource_group – (str) Azure Resource Group for model. Automatically created if it doesn’t exist.
workspace – (str) the AzureML workspace to deploy the model under. Will be created if it doesn’t exist
run_id – (str) if specified, will deploy a previous run (must have a Spark model logged). Otherwise, will default to the active run
region – (str) AzureML Region to deploy to: Can be East US, East US 2, Central US, West US 2, North Europe, West Europe or Japan East
cpu_cores – (float) Number of CPU Cores to allocate to the instance. Can be fractional. Default=0.1
allocated_ram – (float) amount of RAM, in GB, allocated to the container. Default=0.5
model_name – (str) If specified, this will be the name of the model in AzureML. Otherwise, the model name will be randomly generated.
-
_deploy_db(db_schema_name: str, db_table_name: str, run_id: str, reference_table: Optional[str] = None, reference_schema: Optional[str] = None, primary_key: Optional[Dict[str, str]] = None, df: Optional[Union[pyspark.sql.dataframe.DataFrame, pandas.core.frame.DataFrame]] = None, create_model_table: Optional[bool] = True, model_cols: Optional[List[str]] = None, classes: Optional[List[str]] = None, library_specific: Optional[Dict[str, str]] = None, replace: Optional[bool] = False, max_batch_size: Optional[int] = 10000, verbose: bool = False) → None[source]¶ Deploy a trained (currently Spark, Sklearn, Keras or H2O) model to the Database. This either creates a new table or alters an existing table in the database (depending on parameters passed)
- Parameters
db_schema_name – (str) the schema name to deploy to.
db_table_name – (str) the table name to deploy to.
run_id – (str) The run_id to deploy the model on. The model associated with this run will be deployed
reference_table – (str) if creating a new table, an alternative to specifying a dataframe is specifying a reference table. The column schema of the reference table will be used to create the new table (e.g. MYTABLE)
reference_schema – (str) the db schema for the reference table.
primary_key –
(Dict) Dictionary of column + SQL datatype to use for the primary/composite key.
If you are deploying to a table that already exists, it must already have a primary key, and this parameter will be ignored.
If you are creating the table in this function, you MUST pass in a primary key
df –
(Spark or Pandas DF) The dataframe used to train the model
NOTE: The columns in this df are the ones that will be used to create the table unless specified by model_cols
create_model_table – Whether or not to create the table from the dataframe. Default True. This will ONLY be used if the table does not exist and a dataframe is passed in
model_cols – (List[str]) The columns from the table to use for the model. If None, all columns in the table will be passed to the model. If specified, the columns will be passed to the model IN THAT ORDER. The columns passed here must exist in the table.
classes –
(List[str]) The classes (prediction labels) for the model being deployed.
NOTE: If not supplied, the table will have default column names for each class
library_specific –
(dict{str: str}) Prediction options for certain model types:
- Certain model types (specifically Keras and Scikit-learn) support prediction arguments. Here are the options that we support:
- Scikit-learn
predict_call: determines function call for the model. Available: ‘predict’ (default), ‘predict_proba’, ‘transform’
predict_args: passed into the predict call (for Gaussian and Bayesian models). Available: ‘return_std’, ‘return_cov’
- Keras
pred_threshold: prediction threshold for Keras binary classification models. Note: If the model type is Keras, the output layer has 1 node, and pred_threshold is None, you will NOT receive a class prediction, only the output of the final layer (like model.predict()). If you want a class prediction for your binary classification problem, you MUST pass in a threshold.
If the model does not support these parameters, they will be ignored.
- Parameters
max_batch_size – (int) the max size for the database to batch groups of rows for prediction. Default 10,000.
replace – (bool) whether or not to replace a currently existing model. This param is not yet implemented
- Returns
None
This function creates the following IF you are creating a table from the dataframe
The model table where run_id is the run_id passed in. This table will have a column for each feature in the feature vector. It will also contain:
USER which is the current user who made the request
EVAL_TIME which is the CURRENT_TIMESTAMP
the PRIMARY KEY column(s) passed in
PREDICTION. The prediction of the model. If the classes param is not filled in, this will be default values for classification models
A column for each class of the predictor with the value being the probability/confidence of the model if applicable
IF you are deploying to an existing table, the table will be altered to include the columns above.
- NOTE
The columns listed above are default value columns. This means that on a SQL insert into the table, you do not need to reference or insert values into them. They are automatically taken care of. Set verbose=True in the function call for more information
A trigger is also created on the deployment table that runs the model after every insert into that table.
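The sketch below collects arguments for `mlflow.deploy_db` and builds an INSERT statement that names only the key and feature columns, since the default-value columns described above are filled in automatically. The helpers, the `creating_table` flag, the schema, table, and column names, and the `?` placeholder style are all hypothetical.

```python
def db_deploy_kwargs(db_schema_name, db_table_name, run_id, primary_key=None,
                     creating_table=True, model_cols=None, classes=None,
                     library_specific=None):
    """Collect deploy_db arguments, enforcing the documented primary-key rule."""
    # creating_table is a local flag for this sketch; it is not passed to deploy_db.
    if creating_table and not primary_key:
        raise ValueError("primary_key is required when the table is created here")
    return {
        "db_schema_name": db_schema_name,
        "db_table_name": db_table_name,
        "run_id": run_id,
        "primary_key": primary_key,
        "model_cols": model_cols,
        "classes": classes,
        "library_specific": library_specific,
    }


def insert_statement(schema, table, columns):
    """Build a parameterized INSERT that lists only the columns the caller supplies."""
    cols = ", ".join(columns)
    marks = ", ".join("?" for _ in columns)
    return f"INSERT INTO {schema}.{table} ({cols}) VALUES ({marks})"


kwargs = db_deploy_kwargs(
    "RETAIL", "CHURN_PREDICTIONS", run_id="<run id>",
    primary_key={"CUSTOMER_ID": "INT"},
    model_cols=["AGE", "TENURE"],
    classes=["stay", "churn"],
    library_specific={"predict_call": "predict_proba"},
)
# With the patched mlflow object and the training dataframe df:
# mlflow.deploy_db(df=df, **kwargs)

print(insert_statement("RETAIL", "CHURN_PREDICTIONS", ["CUSTOMER_ID", "AGE", "TENURE"]))
```

USER, EVAL_TIME, PREDICTION, and the per-class columns are deliberately absent from the INSERT column list.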
-
_deploy_kubernetes(run_id: str, service_port: int = 80, base_replicas: int = 1, autoscaling_enabled: bool = False, max_replicas: int = 2, target_cpu_utilization: int = 50, disable_nginx: bool = False, gunicorn_workers: int = 1, resource_requests_enabled: bool = False, resource_limits_enabled: bool = False, cpu_request: int = 0.5, cpu_limit: int = 1, memory_request: str = '512Mi', memory_limit: str = '2048Mi', expose_external: bool = False)[source]¶ Deploy model associated with the specified or active run to Kubernetes cluster.
- Creates the Following Resources:
Pod (with your model loaded in via an init container)
ReplicaSet (configured to base replicas specified)
HPA (if autoscaling is enabled)
Service (model-<run id>.<db namespace>.svc.cluster.local:<service port specified>)
Deployment
Ingress (if expose_external is set to True) (on <your cluster url>/<run id>/invocations)
- Parameters
run_id – specified if overriding the active run
service_port – (default 80) the port that the prediction service runs on internally in the cluster
autoscaling_enabled – (default False) whether or not to provision a Horizontal Pod Autoscaler to provision pods dynamically
max_replicas – (default 2) [USED IF AUTOSCALING ENABLED] max number of pods to scale up to
target_cpu_utilization – (default 50) [USED IF AUTOSCALING ENABLED] the cpu utilization to scale up to new pods on
disable_nginx – (default False) disable nginx inside of the pod (recommended)
gunicorn_workers – (default 1) [MUST BE 1 FOR SPARK ML models TO PREVENT OOM] Number of web workers.
resource_requests_enabled – (default False) whether or not to enable Kubernetes resource requests
resource_limits_enabled – (default False) whether or not to enable Kubernetes resource limits
cpu_request – (default 0.5) [USED IF RESOURCE REQUESTS ENABLED] number of CPU to request
cpu_limit – (default 1) [USED IF RESOURCE LIMITS ENABLED] number of CPU to cap at
memory_request – (default 512Mi) [USED IF RESOURCE REQUESTS ENABLED] amount of RAM to request
memory_limit – (default 2048Mi) [USED IF RESOURCE LIMITS ENABLED] amount of RAM to limit at
expose_external –
(default False) whether or not to create Ingress resource to deploy outside of the cluster.
- NOTE
It is not recommended to create an Ingress resource using this parameter, as your model will be deployed with no authorization (and public access). Instead, it is better to deploy your model as an internal service, and deploy an authentication proxy (such as https://github.com/oauth2-proxy/oauth2-proxy) to proxy traffic to your internal service after authenticating.
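The Service and Ingress address patterns listed above can be filled in with a couple of small helpers. This is a sketch only: `service_address` and `ingress_url` are hypothetical names, and the run id, namespace, and cluster URL in the example are made up.

```python
def service_address(run_id, namespace, service_port=80):
    """In-cluster address, following model-<run id>.<db namespace>.svc.cluster.local:<port>."""
    return f"model-{run_id}.{namespace}.svc.cluster.local:{service_port}"


def ingress_url(cluster_url, run_id):
    """External scoring URL, following <your cluster url>/<run id>/invocations."""
    return f"{cluster_url.rstrip('/')}/{run_id}/invocations"


print(service_address("abc123", "splicedb"))
print(ingress_url("https://example.invalid/", "abc123"))
```

The second helper only applies when expose_external is True, which the note above advises against.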
-
_download_artifact(name, local_path=None, run_id=None)[source]¶ Download the artifact at the given run id (active default) + name to the local path
- Parameters
name – (str) artifact name to load (with respect to the run)
local_path – (str) local path to download the model to. If set, this path MUST include the file extension. Will default to the current directory and the name of the saved artifact
run_id – (str) the run id to download the artifact from. Defaults to active run
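Because local_path must include the file extension, it can help to check the path before calling `mlflow.download_artifact`. The helper below is a hypothetical pre-check; its handling of the default (current directory plus artifact name) mirrors the description above but is an assumption about how the library resolves it.

```python
from pathlib import Path


def resolve_local_path(name, local_path=None):
    """Return the download target, rejecting an explicit path without an extension."""
    if local_path is None:
        # Assumed default: current directory plus the artifact name.
        return Path.cwd() / name
    path = Path(local_path)
    if not path.suffix:
        raise ValueError(f"local_path must include a file extension: {local_path}")
    return path


target = resolve_local_path("plot.png", "downloads/plot.png")
# With the patched mlflow object available:
# mlflow.download_artifact("plot.png", local_path=str(target))
```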
-
_end_run(status='FINISHED', save_html=True)[source]¶ End an active MLflow run (if there is one).
Example¶
import mlflow

# Start run and get status
mlflow.start_run()
run = mlflow.active_run()
print("run_id: {}; status: {}".format(run.info.run_id, run.info.status))

# End run and get status
mlflow.end_run()
run = mlflow.get_run(run.info.run_id)
print("run_id: {}; status: {}".format(run.info.run_id, run.info.status))
print("--")

# Check for any active runs
print("Active run: {}".format(mlflow.active_run()))
Output¶
run_id: b47ee4563368419880b44ad8535f6371; status: RUNNING
run_id: b47ee4563368419880b44ad8535f6371; status: FINISHED
--
Active run: None
-
_fetch_logs(job_id: int)[source]¶ Get the logs as an array
- Parameters
job_id – the job to get the logs for
-
_get_current_run_data()[source]¶ Get the data associated with the current run. As of MLflow 1.6, run data cannot be read from the mlflow.active_run object, so it is retrieved via the tracking client.
- Returns
active run data object
-
_get_deployed_models() → pandas.core.frame.DataFrame[source]¶ Get the currently deployed models in the database
- Returns
Pandas df
-
_get_model_name(run_id)[source]¶ Gets the model name associated with a run or None
- Parameters
run_id – (str) the run_id that the model is stored under
- Returns
(str or None) The model name if it exists
-
_get_run_ids_by_name(run_name, experiment_id=None)[source]¶ Gets a run id from the run name. If there are multiple runs with the same name, all run IDs are returned
- Parameters
run_name – (str) The name of the run
experiment_id – (int) The experiment to search in. If None, all experiments are searched. [Default None]
- Returns
(List[str]) List of run ids
-
_lm(key, value, step=None)[source]¶ Add a shortcut for logging metrics in MLFlow.
- Parameters
key – (str) key for the metric
value – (str or int) value for the metric
step – (int) A single integer step at which to log the specified Metrics. If unspecified, each metric is logged at step zero.
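A short sketch of logging one metric per step with the `lm` shortcut. `RecordingMlflow` is a hypothetical stand-in that records calls so the example runs without a tracking server; in a Splice Machine session the patched mlflow object would be passed instead.

```python
class RecordingMlflow:
    """Hypothetical stand-in that records lm() calls instead of contacting a server."""

    def __init__(self):
        self.metrics = []

    def lm(self, key, value, step=None):
        self.metrics.append((key, value, step))


def log_losses(mlflow, losses):
    """Log each loss value under the same key, one step per value."""
    for step, loss in enumerate(losses):
        mlflow.lm("loss", loss, step=step)


recorder = RecordingMlflow()
log_losses(recorder, [0.9, 0.5, 0.3])
print(recorder.metrics)
```

Passing an explicit step keeps the values as a series rather than logging them all at step zero.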
-
_load_model(run_id=None, name=None, as_pyfunc=False)[source]¶ Download and deserialize a serialized model
- Parameters
run_id – (str) the id of the run to get a model from (the run must have an associated model with it named spark_model)
name – (str) the name of the model in the database
as_pyfunc – (bool) load as a model-agnostic pyfunc model (https://www.mlflow.org/docs/latest/models.html#python-function-python-function)
-
_log_artifact(file_name, name=None, run_uuid=None, artifact_path=None)[source]¶ Log an artifact for the active run
- Example
with mlflow.start_run():
    mlflow.log_artifact('my_image.png')
- Parameters
file_name – (str) the name of the file to log
name – (str) the name to store the artifact as. Defaults to the file name. If the name param includes the file extension (or is not passed in) you will be able to preview it in the mlflow UI (image, text, html, geojson files).
run_uuid – (str) the run uuid of a previous run, if none, defaults to current run
artifact_path – If you would like the artifact logged as a subdirectory of a particular folder, you can set this value. If the directory doesn’t exist, it will be created for this run’s artifact path.
- Returns
None
- NOTE
We do not currently support logging directories. If you would like to log a directory, please zip it first and log the zip file
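One way to follow the note above is to zip the directory with the standard library and log the resulting file. In this sketch `zip_directory` is a hypothetical helper and the demonstration directory is a throwaway; the `mlflow.log_artifact` call is shown as a comment because it needs an active run.

```python
import shutil
import tempfile
from pathlib import Path


def zip_directory(directory):
    """Zip a directory into <directory>.zip next to it and return the archive path."""
    directory = Path(directory)
    archive = shutil.make_archive(str(directory), "zip", root_dir=str(directory))
    return Path(archive)


# Demonstration on a throwaway directory.
workdir = Path(tempfile.mkdtemp())
plots = workdir / "plots"
plots.mkdir()
(plots / "notes.txt").write_text("example")

archive_path = zip_directory(plots)
print(archive_path.name)
# With the patched mlflow object and an active run:
# mlflow.log_artifact(str(archive_path))
```

The archive is written beside the directory rather than inside it, so it never includes itself.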
-
_log_feature_transformations(unfit_pipeline)[source]¶ Log feature transformations for an unfit Spark pipeline. Logs the movement of features through the pipeline.
- Parameters
unfit_pipeline – (PipelineModel) unfit spark pipeline to log
- Returns
None
-
_log_model(model, name='model', model_lib=None, **flavor_options)[source]¶ Log a trained machine learning model
- Parameters
model – (Model) the trained Spark/SKlearn/H2O/Keras model to log with the current run
name – (str) the run relative name to store the model under. [Default ‘model’]
model_lib – An optional param specifying the model type of the model to log. Available options match the mlflow built-in model flavors: https://www.mlflow.org/docs/1.8.0/models.html#built-in-model-flavors
flavor_options – (**kwargs) The full set of save options to pass into the save_model function. If this is passed, model_lib must also be provided and the keys of this dictionary must match the params of that function's signature (i.e. mlflow.pyfunc.save_model). An example of pyfunc's signature is here, although each flavor has its own: https://mlflow.org/docs/latest/python_api/mlflow.pyfunc.html#mlflow.pyfunc.save_model
-
_log_model_params(pipeline_or_model)[source]¶ Log the parameters of a fitted spark model or a model stage of a fitted spark pipeline
- Parameters
pipeline_or_model – fitted spark pipeline/fitted spark model
-
_log_pipeline_stages(pipeline)[source]¶ Log the pipeline stages of a Spark Pipeline as params for the run
- Parameters
pipeline – (PipelineModel) fitted/unfitted pipeline
- Returns
None
-
_login_director(username=None, password=None, jwt_token=None)[source]¶ Authenticate into the MLManager Director
- Parameters
username – (str) database username
password – (str) database password
jwt_token – (str) database JWT token authentication
Either (username/password) for basic auth or jwt_token must be provided. Basic authentication takes precedence if set (mlflow default)
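The precedence rule above can be sketched as a small selection function. `choose_auth` is hypothetical and only mirrors the stated rule (basic auth wins when both are supplied); it does not reproduce how `mlflow.login_director` authenticates.

```python
def choose_auth(username=None, password=None, jwt_token=None):
    """Pick the credential type, preferring basic auth when both kinds are given."""
    if username and password:
        return ("basic", username, password)
    if jwt_token:
        return ("jwt", jwt_token)
    raise ValueError("provide username and password, or jwt_token")


print(choose_auth(username="splice", password="secret", jwt_token="token")[0])
```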
-
_lp(key, value)[source]¶ Add a shortcut for logging parameters in MLFlow.
- Parameters
key – (str) key for the parameter
value – (str) value for the parameter
- Returns
None
-
_mlflow_patch(name)[source]¶ Create a MLFlow Patch that applies the default gorilla settings
- Parameters
name – destination name under mlflow package
- Returns
decorator for patched function
-
_register_feature_store(fs: splicemachine.features.feature_store.FeatureStore)[source]¶ Register a feature store for feature tracking of experiments
- Parameters
fs – (FeatureStore) The feature store
- Returns
None
-
_register_splice_context(splice_context)[source]¶ Register a Splice Context for Spark/Database operations (artifact storage, for example)
- Parameters
splice_context – (PySpliceContext) splice context to input
- Returns
None
-
_remove_active_training_set()[source]¶ Removes the active training set from mlflow. This function deletes mlflow's active training set (retrieved from the feature store), which in turn stops the automated logging of features to the active mlflow run. To recreate an active training set, call fs.get_training_set or fs.get_training_set_from_view in the Feature Store.
-
_set_mlflow_uri(uri)[source]¶ Set the tracking uri for mlflow. Only needed if running outside of the Splice Machine K8s Cloud Service
- Parameters
uri – (str) the URL of your mlflow UI.
- Returns
None
-
_start_run(run_id=None, tags=None, experiment_id=None, run_name=None, nested=False)[source]¶ Start a new run
- Example
mlflow.start_run(run_name='my_run')
# or
with mlflow.start_run(run_name='my_run'):
    ...
- Parameters
tags – a dictionary containing metadata about the current run. For example: { ‘team’: ‘pd’, ‘purpose’: ‘r&d’ }
run_name – (str) an optional name for the run to show up in the MLFlow UI. [Default None]
run_id – (str) if you want to reincarnate an existing run, pass in the run id [Default None]
experiment_id – (int) if you would like to create an experiment/use one for this run [Default None]
nested – (bool) Controls whether run is nested in parent run. True creates a nested run [Default False]
- Returns
(ActiveRun) the mlflow active run object
-
_timer(timer_name, param=False)[source]¶ Context manager for timing a block of code and logging the result
- Example
with mlflow.timer('my_timer'):
    ...
- Parameters
timer_name – (str) the name of the timer
param – (bool) whether or not to log the timer as a param (default=False). If False, logs as metric.
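To show the general shape of such a context manager, here is a self-contained sketch that times a block and stores the elapsed seconds as either a param or a metric. It assumes the timer records elapsed wall-clock time; the `store` dictionary and this `timer` function are hypothetical and are not the library's implementation.

```python
import time
from contextlib import contextmanager


@contextmanager
def timer(store, timer_name, param=False):
    """Time the enclosed block and record the elapsed seconds under timer_name."""
    start = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start
        bucket = "params" if param else "metrics"
        store[bucket][timer_name] = elapsed


store = {"params": {}, "metrics": {}}
with timer(store, "my_timer"):
    sum(range(1000))

print(sorted(store["metrics"]))
```

The `finally` clause means the time is recorded even if the timed block raises.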