Automating ML Workflows: Webhooks in Databricks with MLflow

A Use Case:
Machine Learning (ML) is a dynamic field where models are continuously improved and updated. Consider an ML engineer at a tech company that deploys models for image recognition. Every time the engineer updates or improves a model, they must ensure it meets the required accuracy and performance metrics before it’s deployed into production.
Manually testing these models every time they’re updated can be tedious and error-prone. Moreover, waiting for scheduled tests may delay the deployment of an improved model, which could mean missing out on enhanced performance or user experience.
Webhooks to the Rescue:
This is where webhooks come in handy. Instead of manually initiating tests or waiting for a scheduled job, webhooks can be set up to automatically trigger the testing process the moment a new model version arrives in the model registry. This automation streamlines the workflow, ensuring that models are tested promptly, and if they meet the criteria, they can be deployed without unnecessary delays.
So, in essence, webhooks play a pivotal role in the CI/CD process for ML jobs, ensuring faster feedback loops and more efficient deployment cycles!
Webhooks with Databricks and MLflow:
Within the realm of ML automation, MLflow’s Model Registry in Databricks offers “Webhooks with Job triggers.” Though other platforms offer similar functionality, our focus here is on Databricks.
This feature enables an immediate reaction within the Databricks workspace whenever there’s a model update or stage shift. Simply put, as soon as a new model version appears, a corresponding job — be it validation, testing, or deployment — can be triggered instantly, ensuring a streamlined process for every model update within Databricks.
Prerequisites:
- Databricks Knowledge: You should be acquainted with Databricks and its interface.
- Databricks Workspace: Ensure you’ve set up a workspace.
- Active Cluster: A cluster should be created and active in your workspace for executing code.
Step 1: Train and Register a Model
In the following code, we emulate a genuine scenario faced by data scientists. Using a dataset named `my_data.csv`, we use LightGBM to build a predictive model.
Then, by leveraging MLflow within the Databricks ecosystem, we systematically log its details and performance metrics. We also register the model at the end of the code.
This step is pivotal in our journey, laying the groundwork for the automation that Databricks offers through webhooks.
Create a new notebook, name it whatever you like, and paste the following code into a cell.
import mlflow
from mlflow.models.signature import infer_signature
import pandas as pd
import lightgbm as lgb
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error

with mlflow.start_run(run_name="Webhook LGBM Experiment") as run:
    # Assumption: the dataset 'my_data.csv' is stored at a predefined path
    # and the target column is named "target".
    df = pd.read_csv("/path_to_your_data/my_data.csv")

    # Split the dataset into training and test sets.
    X_train, X_test, y_train, y_test = train_test_split(
        df.drop(["target"], axis=1), df["target"], random_state=42
    )

    # Generate a signature for the model's input and output
    # using the training data.
    signature = infer_signature(X_train, pd.DataFrame(y_train))

    # Keep a few example rows from the training data for model documentation.
    example = X_train.head(3)

    # Initialize and train the LightGBM regressor.
    lgbm = lgb.LGBMRegressor(random_state=42)
    lgbm.fit(X_train, y_train)

    # Log the trained LightGBM model to MLflow along with the signature
    # and example input.
    mlflow.lightgbm.log_model(lgbm, "lightgbm-model",
                              signature=signature,
                              input_example=example)

    # Calculate the mean squared error (MSE) on the test data.
    mse = mean_squared_error(y_test, lgbm.predict(X_test))

    # Log the calculated MSE to MLflow.
    mlflow.log_metric("mse", mse)

    # Retrieve the run and experiment IDs for future reference.
    run_id = run.info.run_id
    experiment_id = run.info.experiment_id

# Register the model under a name that the webhook will later monitor.
model_uri = f"runs:/{run_id}/lightgbm-model"
name = "my_model"
model_details = mlflow.register_model(model_uri=model_uri, name=name)
Step 2: Creating a Job in Databricks Using the UI
A simple notebook task
To easily visualize the automatic triggering of your job through the webhook, you can create a straightforward task that just waits for 5 minutes. This should give you ample time to observe that the job was initiated automatically.
Here’s what you can do:
- Create a new notebook in Databricks and name it whatever you prefer.
- Paste the following code into that notebook:
import time
# This will make the job pause for 5 minutes (5 minutes * 60 seconds/minute)
time.sleep(5 * 60)
Save the notebook. If everything is set up correctly, once your model transitions between stages, the job will run this notebook automatically, pausing for the specified 5 minutes.
Create the job itself
To set up a job that executes the notebook we’ve crafted, follow these steps in the Databricks UI:
- Access Sidebar: On the left of the Databricks UI, hover over the sidebar.
- Initiate Job Creation: Click on “Create Job”.
- Naming: Provide a suitable name for your job.
- Select Notebook: Choose the notebook you just created (the one that pauses for 5 minutes).
- Choose Cluster: Ensure you select an active cluster for job execution.
- Record Job ID: After setting up the job, make sure to copy and save the Job ID — it’s crucial for later steps.
Note: you can also create the job programmatically; we use the UI here to make our lives easier. A rough sketch of the API route is shown below.
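For reference, here’s what the programmatic route could look like with the Databricks Jobs API (2.1). This is only a sketch: the workspace URL, access token (we’ll cover tokens in Step 3), notebook path, and cluster ID are all placeholders you’d swap for your own values.
import requests

# Placeholder values - substitute your own workspace URL, personal access
# token, notebook path, and cluster ID.
host = "https://<your-workspace>.cloud.databricks.com"
token = "<your-personal-access-token>"

payload = {
    "name": "webhook-triggered-job",
    "tasks": [
        {
            "task_key": "run_sleep_notebook",
            "notebook_task": {"notebook_path": "/Users/<you>/sleep_notebook"},
            "existing_cluster_id": "<your-cluster-id>",
        }
    ],
}

# Create the job and keep its ID - the webhook's job_spec needs it later.
response = requests.post(
    f"{host}/api/2.1/jobs/create",
    headers={"Authorization": f"Bearer {token}"},
    json=payload,
)
response.raise_for_status()
job_id = response.json()["job_id"]
print(f"Created job with ID: {job_id}")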
Step 3: Create a Job Webhook
You need a token!
In Databricks, a token serves as a secure method for authentication and authorization. It confirms the identity of the user making API requests and ensures they have the appropriate permissions to carry out specific actions. Essentially, it’s a safeguard to prevent unauthorized access and actions in your Databricks environment.
How to Create a User Access Token in Databricks:
- Access Settings: Click on the Settings icon in the Databricks UI.
- Navigate to User Settings: Select “User Settings”.
- Access Tokens: Go to the “Access Tokens” tab.
- Generate New Token: Click the “Generate New Token” button.
- Set Details: Optionally, provide a description or comment for the token, and specify its expiration period.
- Finalize Token Creation: Click the “Generate” button.
- Copy & Store Safely: Once the token is generated, make sure to copy it immediately and store it in a secure location, as you won’t be able to view it again.
Create the Webhook:
Within the context of MLflow and the Databricks Model Registry, several events can potentially trigger a webhook. These events correspond to different transitions or changes in the life cycle of a model version.
In this example, we will use the most natural choice: triggering a webhook when a model version transitions between stages.
The code sets up a webhook on Databricks using MLflow. It specifies that the webhook should be triggered when a model version changes its stage. To achieve this, it constructs an HTTP request to the appropriate Databricks API endpoint, including relevant details like the model name, the type of event to watch for, and other specifications like the job to trigger.
import json
from mlflow.utils.rest_utils import http_request
from mlflow.utils.databricks_utils import get_databricks_host_creds

# Fetch the Databricks instance's webapp URL.
instance = mlflow.utils.databricks_utils.get_webapp_url()

# Specify the API endpoint for webhook creation.
endpoint = "/api/2.0/mlflow/registry-webhooks/create"

# Get the Databricks host credentials used for authenticating the HTTP request.
host_creds = get_databricks_host_creds("databricks")

# Construct the JSON payload for the webhook.
job_json = {
    "model_name": name,  # Name of the model to monitor
    "events": ["MODEL_VERSION_TRANSITIONED_STAGE"],  # Type of event to watch for
    "description": "A webhook trigger",  # A description of the webhook
    "status": "Active",  # Status of the webhook (can be Active or Inactive)
    "job_spec": {
        "job_id": job_id,  # The ID of the Databricks job to run
        "workspace_url": instance,  # Databricks workspace URL
        "access_token": token  # Access token for authentication
    }
}

# Make an HTTP POST request to create the webhook.
response = http_request(
    host_creds=host_creds,  # Databricks host credentials
    endpoint=endpoint,      # API endpoint for webhook creation
    method="POST",          # HTTP method
    json=job_json           # JSON payload containing webhook details
)

# Check that the HTTP request was successful.
assert response.status_code == 200, \
    f"Expected HTTP 200, received {response.status_code}"
Test the Webhook:
After registering the webhook, transition the model from “None” to “Staging” in the MLflow UI, then check the “Jobs” tab to confirm that the job has run. Here’s how:
- Open Databricks and click the MLflow icon.
- Find and select your model.
- Under “Model Versions”, choose your model version.
- Change its stage to “Staging”.
- Go back to the main Databricks UI.
- Click the “Jobs” tab to see your job status.
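If you’d rather trigger the transition from code instead of clicking through the UI, here’s a minimal sketch using the MLflow client, assuming the `name` and `model_details` variables from Step 1 are still in scope:
from mlflow.tracking import MlflowClient

client = MlflowClient()

# Move the freshly registered version to "Staging"; this stage transition
# should fire the webhook and start the job, just like the UI steps above.
client.transition_model_version_stage(
    name=name,
    version=model_details.version,
    stage="Staging",
)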
Delete the Webhook
Now we can delete our webhook, which essentially means telling the MLflow API that we want to remove the webhook we created earlier.
# Define the webhook ID to delete.
delete_wh_json = {"id": your_webhook_id}

# Specify the API endpoint for webhook deletion.
endpoint = "/api/2.0/mlflow/registry-webhooks/delete"

# Send an HTTP DELETE request to remove the webhook.
response = http_request(
    host_creds=host_creds,
    endpoint=endpoint,
    method="DELETE",
    json=delete_wh_json
)

# Ensure the request was successful.
assert response.status_code == 200, \
    f"Expected HTTP 200, received {response.status_code}"
Conclusion
I hope this walkthrough on utilizing webhooks to automate ML tasks in Databricks using MLflow has been insightful. Automating workflows is a significant step towards efficient and seamless ML operations.
If this post has eased even a small part of your ML journey, then it has served its purpose. Always keep exploring and optimizing, and remember, every little automation goes a long way in the grand scheme of things. Happy coding!