Showcasing Databricks Vector Search: A Hands-On Example [2/2]

Part 2: The Hands On! 👨‍💻

Awadelrahman M. A. Ahmed
17 min read · Jan 31, 2024

Now that we have covered the fundamentals in Part 1, we will see all of that in action in this part.

Setting Your Expectations

In this part you will:

  • Set up Databricks Vector Search and its integrated features using the SDK. Yes, you can do most, if not all, of this using the UI.
  • Learn how to configure and create an embedding Model Serving Endpoint.
  • Understand the process of creating vector search endpoints for effective data querying.
  • Create and use a vector search index.
  • Test what we have built with a couple of scenarios.
  • Learn helpful best practices for practical implementations, marked with a thumbs-up emoji 👍.

Get the Dataset and Create our First Delta Table

In this use case, we use a book description dataset available on Kaggle, known as the Book Genre Prediction Dataset.

To kickstart, what I did was download the dataset and then upload it to my Databricks workspace as a CSV file named ‘data.csv’.

The first step is to store the data in a Delta table, which will serve as the source for our vector search service. Yes, we could accomplish this in one step if we were familiar with the data. However, since I’ve just sourced this dataset from the internet and am not yet familiar with its contents, we’ll start with some exploration. This way, we can make informed decisions about how to proceed. I also want to bring you along to see how I made my decisions.

So, in a notebook in my Unity Catalog-enabled workspace and compute, I read the ‘data.csv’ file and display its contents (we will mostly use PySpark for this implementation).

df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/books-data/data.csv")
display(df)

Well, our dataset contains four columns: an index, a book title, a genre, and a summary. It looks like we have some great ‘text’ data, which is perfect for considering a similarity-based use case. How about thinking of a book recommender use case? We can test that later.

👍 I prefer to save my raw data in a Delta table, a practice I’ve adopted from the Databricks best practices around the Medallion architecture. They recommend a ‘bronze layer’ that mirrors the exact source data without any manipulation or cleaning. Cleaning and processing are done in the subsequent ‘silver layer’!

Let’s name our bronze layer table ‘dataset’ and save it in a catalog called ‘ai_showcase’ under the schema ‘books’ that is already created. I’ll set it to overwrite any existing table if necessary. Here’s how the code looks:

# No extra import is needed here: the DataFrame writer handles the Delta format directly.
df.write.format("delta").mode("overwrite").saveAsTable("ai_showcase.books.dataset")

Data Quality Check and Deduplication

Of course, cleaning data is always one of the first tasks we undertake, regardless of how smart or advanced the solution is! We need to ensure data quality.

Let’s check our dataset in the Delta table and print its schema to see our four columns. I read it into a variable called ‘raw_table’.

raw_table_name="ai_showcase.books.dataset"
raw_table = spark.read.table(raw_table_name)
raw_table.printSchema()
Raw Table Schema

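Let us perform some data quality checks on the table. We will create ‘table_clean’ by filtering ‘raw_table’ to keep only the rows that have a non-null, purely numeric index, a non-null title of at most 30 words, and a non-null summary: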

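# Keep rows with a non-null numeric index, a non-null title of at most
# 30 words, and a non-null summary.
table_clean = raw_table.filter(
    "index IS NOT NULL AND "
    "index RLIKE '^[0-9]+$' AND "
    "title IS NOT NULL AND "
    "size(split(title, ' ')) <= 30 AND "
    "summary IS NOT NULL"
)
# Everything that did not pass the filter is treated as low quality.
errors = raw_table.subtract(table_clean)
print("Good Quality Rows:", table_clean.count())
print("Low Quality Rows:", errors.count())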
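
To make the filter conditions concrete, here is a minimal plain-Python sketch of the same per-row checks. The function name `is_good_row` and the sample rows are assumptions made up for illustration; they are not part of the notebook, and the Spark filter remains the real implementation.

```python
import re

def is_good_row(row, max_title_words=30):
    """Return True when a row (a dict) passes the same checks as the Spark filter."""
    index = row.get("index")
    title = row.get("title")
    summary = row.get("summary")
    if index is None or title is None or summary is None:
        return False
    # Same idea as the RLIKE '^[0-9]+$' condition: digits only
    if re.fullmatch(r"[0-9]+", index) is None:
        return False
    # The Spark expression splits on single spaces, so str.split(" ") mirrors it
    return len(title.split(" ")) <= max_title_words

# Invented sample rows, only to exercise the checks
sample_rows = [
    {"index": "12", "title": "A Short Title", "summary": "Some text."},
    {"index": "12a", "title": "Bad Index", "summary": "Some text."},
    {"index": "13", "title": None, "summary": "Some text."},
]
good = [row for row in sample_rows if is_good_row(row)]
errors = [row for row in sample_rows if not is_good_row(row)]
print("Good Quality Rows:", len(good))
print("Low Quality Rows:", len(errors))
```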

👍 It’s a best practice to save these erroneous records to a Delta table so that we can check them later. We will write the errors to an error table.

errors_table_name="ai_showcase.books.books_errors"
errors.write.format("delta").mode("overwrite").saveAsTable(errors_table_name)

An important step is to check for duplicates. We can assume that no two books have the same title and index. While more sophisticated deduplication criteria can be applied, this is sufficient for our use case.

table_clean = table_clean.dropDuplicates(["index", "title"])
print("Clean Table:",table_clean.count(),"out of:", raw_table.count())

So, we ended up with 4,657 unique rows that meet our criteria out of the 9,127 rows in the original dataset! Well, that’s just about half of our data :) Note that we didn’t just delete the rest; we simply moved them to another table for a closer look later.

Combining Relevant Columns

Now, let’s think ahead for a moment! Our use case requires embedding text to enable similarity-based searches later. We could use just the title column, or alternatively the genre. The summary might provide even more diverse information about the book. So which one should we use?

The beauty lies in the possibility of combining all three into one text. We shouldn’t be afraid of it because text is unstructured data, and combining these columns doesn’t make it “more” unstructured; it’s already unstructured :)!

Let’s create a new column for this purpose, called ‘combined_text.’ Here’s the code for generating it, and let’s also print the schema of the new table we create!

from pyspark.sql import functions as F

combined = table_clean.withColumn(
    "combined_text",
    F.concat(
        F.lit("Title: "), "title", F.lit("\n"),
        F.lit("Genre: "), "genre", F.lit("\n"),
        F.lit("Summary: "), "summary", F.lit("\n")
    )
)
combined.printSchema()
Combined Table Schema
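
As a small illustration of the layout this produces, the plain-Python sketch below builds the same string for a single row. The helper name `build_combined_text` and the sample values are assumptions for illustration only. One thing worth knowing is that Spark’s concat returns NULL when any of its inputs is NULL, so a row with a missing genre would end up with a NULL ‘combined_text’; the sketch substitutes an empty string instead, which is just one possible choice.

```python
def build_combined_text(title, genre, summary):
    """Mirror the layout of the combined_text column for one row."""
    # Replace missing values with "" (an assumption; Spark's concat would yield NULL)
    title, genre, summary = (value or "" for value in (title, genre, summary))
    return f"Title: {title}\nGenre: {genre}\nSummary: {summary}\n"

# Invented sample values
example = build_combined_text("Some Title", "fantasy", "A short summary.")
print(example)
```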

Yes, ‘for now,’ this table called “combined” can be our source table, from which we will create our vectors and the index for our vector search. I say ‘for now’ because there are two tiny modifications we’ll make to this table later. I know some of you are curious about what they are, so to satisfy your curiosity: they involve chunking the data to fit the embedding model’s limits and enabling the change data feed for automatic index updates. We’ll get to both later.

So, for now, let us save this “source-ish” table in our database:

combined.write.saveAsTable("ai_showcase.books.combined",mode="overwrite")

Create Model Serving Endpoint

As we mentioned in our previous post, to make this Vector Search work, we need two types of endpoints. One is the model serving endpoint, which we will create in this section, and the other is the vector search endpoint that we will create later!

👍 But wait! We need to understand what this model serving endpoint is supposed to do. This way, we can check if there are any additional requirements for our Delta table “ai_showcase.books.combined”.

I say this because I am sure it has some limitations :) In fact, I did the exercise the harder way. I mean, I did not check for the limitations initially. Then, after using it and facing some issues, I had to backtrack my integration process and discovered there were some limitations I had to consider. So now, I can pretend that I am using proper forward thinking, checking for the limitations first, and doing things together.

So back to our task, we want to create a model serving endpoint. The purpose of this endpoint is to host the machine learning model (transformation, if you have read Part 1) that will smartly transform our raw data into vectors. I say ‘smartly’ because we expect this model to place similar data points close to each other in the vector space!

Well… all this mouthful of a sentence, ‘creating vectors smartly because we are expecting this model to place similar data points close to each other in the vector space,’ can just be called embedding! So, we need an embedding model that we can deploy in a Databricks model serving endpoint and use to calculate the embeddings (our smart vectors).

Now, the “delicious” aspect of this Databricks Vector Search is that you can deploy whatever embedding model you want. It could be a pre-trained open-source model or a custom model you created from scratch! You just have to configure your serving endpoint. Suppose we are generic here (sometimes I feel being generic is synonymous with being lazy :) ), so we can use one of OpenAI’s embedding models!

Now, if we go to the OpenAI embedding model documentation, we see that their two embedding models accept a maximum input of 8191 tokens, and we should take this into account.

From OpenAI Embedding Models Documentation Page

Check Tokens Length (do we need chunking?)

Let’s have a look at the token lengths in our text data to make sure they fit this limit; otherwise, the embedding model will lose some context.

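To make a rough estimate, we can use the rule of thumb from the first link that a Google search returns: 75 words ≈ 100 tokens. We can then check programmatically how many data points exceed the limit. Note that the snippet below applies this ratio to the character length of the text (“F.length”) rather than to a word count, so it overestimates the token count and the check errs on the safe side.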

import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType

# Load the table saved above so that this cell can run on its own.
combined_table = spark.read.table("ai_showcase.books.combined")

# F.length counts characters, not words, so token_length is an overestimate.
df_with_length = (
    combined_table
    .withColumn("text_length", F.length("combined_text"))
    .withColumn("token_length", (F.length("combined_text") / 75 * 100).cast(IntegerType()))
    .select("index", "title", "text_length", "token_length")
)

# OpenAI max input tokens: https://platform.openai.com/docs/guides/embeddings/embedding-models
max_input_tokens = 8191
long_text = df_with_length.filter(df_with_length["token_length"] > max_input_tokens)
print("Number of texts exceeding max token:", long_text.count())

👍 Well, 69 isn’t a large number compared to our dataset size of 4,657. Remember, we are dealing with book summaries, but this number could be significant if your dataset contains documents with longer texts. So, let’s see how to address this.
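
For comparison, here is a minimal sketch of the same rule of thumb applied to word counts rather than character counts. The function names `estimate_tokens` and `exceeds_limit` are assumptions for illustration, and the ratio of 75 words per 100 tokens is only the rough heuristic quoted above, not an exact tokenizer.

```python
import math

MAX_INPUT_TOKENS = 8191  # limit quoted from the OpenAI embedding documentation

def estimate_tokens(text, words_per_100_tokens=75):
    """Roughly estimate the token count from the whitespace-separated word count."""
    word_count = len(text.split())
    return math.ceil(word_count * 100 / words_per_100_tokens)

def exceeds_limit(text, limit=MAX_INPUT_TOKENS):
    """True when the estimated token count is above the limit."""
    return estimate_tokens(text) > limit

# A synthetic text of 75 words, only to exercise the estimate
print(estimate_tokens("word " * 75))
```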

I won’t be inventing a new solution; it’s already outlined in one of the notebooks on the Databricks website, and I’ve just adapted it a little. The solution is chunking! We split these long texts into separate rows based on a chunk length of our choosing. I wouldn’t use 8191 as my chunk length, as I prefer shorter lengths so that the embedding model pays more attention to smaller text segments. Let’s go with 1024 (why not use a power of 2? :) ). We can adapt the chunking function from the source, and we will get to the code in a moment.

Also, we will not rely on the approximate token count. We will use a tokenizer library called ‘tiktoken’ (you should pip install it) and create a tokenizer with the “cl100k_base” encoding.

import tiktoken

tokenizer = tiktoken.get_encoding("cl100k_base")

You need to know two functions, encode and decode. Let us test them:

print(tokenizer.encode("Vector Search is smart!"))
print(tokenizer.encode("Vector Search is brilliant!"))
tokenizer.decode([3866, 7694, 374, 7941, 0])

So, now let’s check our data. If you remember, we just saved our data in the “ai_showcase.books.combined” table. We can load that now and print the schema and row count, just to remind ourselves what it looks like:

combined_table_name = "ai_showcase.books.combined"
print("Reading Table:",combined_table_name)

combined_table= spark.read.table(combined_table_name)
combined_table.printSchema()
print("num of rows in ",combined_table_name ,combined_table.count())

We can write our chunking function “chunk_text”:

def chunk_text(text, max_chunk_tokens=1024):
    # Encode once, then slice the token list into fixed-size windows.
    tokens = tokenizer.encode(text)
    chunked_text = []
    while tokens:
        chunk_tokens = tokens[:max_chunk_tokens]
        # Decode each window back to text (named 'chunk' to avoid shadowing the function).
        chunk = tokenizer.decode(chunk_tokens)
        chunked_text.append(chunk)
        tokens = tokens[max_chunk_tokens:]
    return chunked_text
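
The slicing logic can be tried without installing anything. The sketch below applies the same idea to a plain list of tokens; the helper name `chunk_tokens` is an assumption, and whitespace-separated words stand in for real tokenizer output, so the counts will not match what ‘tiktoken’ produces.

```python
def chunk_tokens(tokens, max_chunk_tokens=1024):
    """Split a list of tokens into consecutive chunks of at most max_chunk_tokens."""
    if max_chunk_tokens <= 0:
        raise ValueError("max_chunk_tokens must be positive")
    return [
        tokens[start:start + max_chunk_tokens]
        for start in range(0, len(tokens), max_chunk_tokens)
    ]

words = ("lorem ipsum " * 5).split()      # 10 stand-in "tokens"
chunks = chunk_tokens(words, max_chunk_tokens=4)
print([len(chunk) for chunk in chunks])   # sizes of the resulting chunks
```

Stepping through the list with a range avoids re-slicing the remaining tokens on every pass, which is the only real difference from the while loop.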

Now we iterate through the data rows and chunk long texts into multiple rows, each with new indexes. Here’s how it’s done:

combined_pandas = combined_table.toPandas()
processed_data = []
for _, row in combined_pandas.iterrows():
    text_chunks = chunk_text(row['combined_text'])
    chunk_no = 0
    for chunk in text_chunks:
        row_data = row.to_dict()
        row_data['index'] = f"{row['index']}:{chunk_no}"
        row_data['combined_text'] = chunk
        processed_data.append(row_data)
        chunk_no += 1
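
The same expansion can be sketched on plain dictionaries, which makes the new ‘index’ format easy to see. The helper `expand_row` and the stand-in splitter are assumptions for illustration; in the notebook the splitting is done by “chunk_text”.

```python
def expand_row(row, split_fn):
    """Return one copy of the row per text chunk, with index rewritten as 'index:chunk_no'."""
    expanded = []
    for chunk_no, chunk in enumerate(split_fn(row["combined_text"])):
        new_row = dict(row)  # copy so the original row is left untouched
        new_row["index"] = f"{row['index']}:{chunk_no}"
        new_row["combined_text"] = chunk
        expanded.append(new_row)
    return expanded

def split_every_10_chars(text):
    """Stand-in splitter: fixed-size character slices instead of token-based chunks."""
    return [text[i:i + 10] for i in range(0, len(text), 10)]

# Invented row, only to show the resulting ids
row = {"index": "42", "title": "Example", "combined_text": "abcdefghijklmnopqrstuvwxy"}
expanded_rows = expand_row(row, split_every_10_chars)
for new_row in expanded_rows:
    print(new_row["index"], repr(new_row["combined_text"]))
```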

We can check how this chunking modified our data; we may need Pandas.

import pandas as pd
chunked_pandas_df = pd.DataFrame(processed_data)
chunked_spark_df = spark.createDataFrame(chunked_pandas_df)
print("combined table length :", combined_table.count())
print("chunked table length :", chunked_spark_df.count())
chunked_spark_df.display()

So, our newly chunked table has 4,839 rows, compared to the 4,657 in our original one. For instance, the book ‘Third Girl’ had a summary longer than 1024 tokens, and it has been split into two rows.

Remember, this chunking is the first of the two tiny modifications we referred to earlier in this post, in the ‘Combining Relevant Columns’ section.

With this new chunked table, we have updated our source data for vector search. However, there’s still another tiny modification we need to make, but we’ll address that later. Let’s create this new table and call it ‘chunked.’

You might wonder if saving these multiple tables is efficient. My short answer is that this post does not focus on optimizing storage. The aim is to make it easy to follow and get it working. You could do much storage optimization, but let’s focus on the service for now. Here’s how to save the new ‘chunked’ table.


chunked_table_name = "ai_showcase.books.chunked_data"
chunked_spark_df.write.format("delta").mode("overwrite").saveAsTable(chunked_table_name)

Create Embedding Model Serving Endpoint

Now, we’ve reached the point where we can create our model serving endpoint. For this, I’m assuming that:

  • You have your OpenAI API key.
  • This key is saved as a secret within a scope in your Databricks workspace. In my case, I saved my secret in an Azure Key Vault and connected the Key Vault to my Databricks workspace. This process might differ if you’re using another cloud provider. However, ultimately, your API key should be stored as a secret within a scope in Databricks.

If you set that up, you can check your scopes and secrets using these commands from within your notebook; they are self-explanatory.

dbutils.secrets.listScopes()
dbutils.secrets.list(scope="YOUR SCOPE")
secret_value = dbutils.secrets.get("YOUR SCOPE","YOUR SECRET NAME")
for char in secret_value[:5]:
    print(char, end=" ... ")

To deploy the embedding model to a serving endpoint, we use MLflow Deployments. We have to tell it the deployment target, since it is designed to support deployment to different targets; ours is simply “databricks”. Then we create the endpoint, configured with an external model. Following this template, we end up with this code:

import mlflow.deployments

# Placeholder: the endpoint name is not shown in this post, so choose your own.
emb_model_endpoint_name = "YOUR EMBEDDING ENDPOINT NAME"

mlflow_deploy_client = mlflow.deployments.get_deploy_client("databricks")  # deployment target

mlflow_deploy_client.create_endpoint(
    name=emb_model_endpoint_name,
    config={
        "served_entities": [{
            "external_model": {
                "name": "text-embedding-ada-002",
                "provider": "openai",
                "task": "llm/v1/embeddings",
                "openai_config": {
                    # Reference to the secret: {{secrets/<scope>/<secret name>}}
                    "openai_api_key": "{{secrets/vector-search/openai-key}}"
                }
            }
        }]
    }
)

If everything goes smoothly, a new endpoint will appear in your Databricks UI under the ‘Serving’ section with Ready state.
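
If you plan to create more than one such endpoint, the configuration dictionary can be built by a small helper. This is only a sketch: the function name `build_embedding_endpoint_config` and its parameters are assumptions, and it simply reproduces the structure used above, including the secret reference, so that the key itself never appears in code.

```python
def build_embedding_endpoint_config(model_name, secret_scope, secret_key):
    """Build an external-model config dict in the same shape as the one used above."""
    # Secret reference in the {{secrets/<scope>/<key>}} form used in this post
    secret_reference = "{{secrets/" + secret_scope + "/" + secret_key + "}}"
    return {
        "served_entities": [{
            "external_model": {
                "name": model_name,
                "provider": "openai",
                "task": "llm/v1/embeddings",
                "openai_config": {"openai_api_key": secret_reference},
            }
        }]
    }

config = build_embedding_endpoint_config(
    "text-embedding-ada-002", "vector-search", "openai-key"
)
external_model = config["served_entities"][0]["external_model"]
print(external_model["openai_config"]["openai_api_key"])
```

The resulting dictionary could then be passed as the config argument of create_endpoint.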

Create a Vector Search Endpoint

Now, this Vector Search endpoint will allow us to do two main things: create our search index and query our search index.

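Let’s start by creating the vector search endpoint; let us call it “books-vs-endpoint” (the code below spells it “books-vs-edpoint”, so use whichever spelling you pick consistently when you look the endpoint up later). For this, we need the “VectorSearchClient” class from the “databricks-vectorsearch” package (you should pip install it). As far as I can see, the only endpoint type you can set is STANDARD.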

from databricks.vector_search.client import VectorSearchClient
vs_client = VectorSearchClient()
vs_endpoint_name="books-vs-edpoint"
vs_client.create_endpoint(
name=vs_endpoint_name,
endpoint_type="STANDARD"
)
vs_client.list_endpoints()

To verify if the endpoint has been created, you can either use the vs_client.list_endpoints() function or simply navigate to the 'Compute' section and then the 'Vector Search' tab in your Databricks UI. And YES, that is a compute resource :) !

Create Search Index

Now it looks like we’re ready to use what we’ve built so far to create the index! It is worth noting here that Databricks offers two main types of indexes. If you want the index to be kept in sync with updates to your source Delta table, that is a Delta Sync Index; if you prefer to update the index manually, that is a Direct Vector Access Index. Another consideration is whether you want to manage the embeddings yourself or let Databricks handle them for you. For our purposes, we will use a Delta Sync Index with embeddings managed by Databricks.

I really appreciated how convenient the “create_delta_sync_index” function is! To use it, you need to specify a few key details:

  • The name of your vector search endpoint.
  • The name of your model serving endpoint.
  • The name of your source Delta table.
  • The name of the column within that table that you want to index.
  • Whether you want the indexing to be continuous or triggered. For our case, we’ve chosen the triggered option.

However, since we will create a Delta Sync Index, there’s a tiny modification we need to make to our source table (yes, this is the second of our two tiny modifications). We need to enable the change data feed to track row-level changes between versions of our source table. This allows the index to be updated whenever a change occurs in the source table. We can do this with the following line of code:

%sql
ALTER TABLE ai_showcase.books.chunked_data SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

To ensure everything is correct regarding the names, I will print the values here:

index_name = "ai_showcase.books.books_openai_index"
source_table_name="ai_showcase.books.chunked_data"

print("endpoint_name :",vs_endpoint_name)
print("source_table_name :", source_table_name)
print("index_name :",index_name)
print("embedding_model_endpoint_name:", emb_model_endpoint_name)

These are our set names:

We can now proceed to create the index. After creating it, you can either print the status right here in the code or check the status in your Databricks UI.

index = vs_client.create_delta_sync_index(
endpoint_name=vs_endpoint_name,
source_table_name=source_table_name,
index_name=index_name,
pipeline_type='TRIGGERED',
primary_key="index",
embedding_source_column="combined_text",
embedding_model_endpoint_name=emb_model_endpoint_name
)
index.describe()['status']['message']

This process will take some time, so patience is key! For my cluster to complete the task, it took about 3,200 seconds, which is roughly 53 minutes. You might want to use this code to monitor the progress and determine when it’s ready to use.

import time
from IPython.display import clear_output

start_time = time.time()

index = vs_client.get_index(endpoint_name=vs_endpoint_name,
index_name=index_name)

elapse=0
while not index.describe().get('status')['ready']:
    clear_output(wait=True)
    print("Waiting for index to be ready...", elapse, "seconds")
    time.sleep(30)
    elapse += 30

elapsed_time = time.time() - start_time
print("Index is ready!")
print(f"Time elapsed: {elapsed_time:.2f} seconds")
index.describe()
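
The loop above waits indefinitely. A minimal sketch of the same polling pattern with a timeout is shown below; `wait_until` and its parameters are hypothetical names, and `is_ready` is whatever zero-argument check you supply.

```python
import time

def wait_until(is_ready, interval_s=30, timeout_s=2 * 60 * 60, sleep=time.sleep):
    """Poll is_ready() until it returns True; return elapsed seconds or raise TimeoutError."""
    start = time.monotonic()
    while not is_ready():
        elapsed = time.monotonic() - start
        if elapsed >= timeout_s:
            raise TimeoutError(f"not ready after {elapsed:.0f} seconds")
        sleep(interval_s)
    return time.monotonic() - start

# Demo with a fake check that becomes ready on the third call
calls = {"count": 0}

def fake_is_ready():
    calls["count"] += 1
    return calls["count"] >= 3

elapsed = wait_until(fake_is_ready, interval_s=0)
print("ready after", calls["count"], "checks")
```

In the notebook, the check could be a lambda that wraps the same index.describe() call used in the loop above.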

Using it Up!

To use it, we need a VectorSearchClient. We instantiated one earlier, but why not instantiate it again? This assumes that the usage is detached from development, perhaps in a separate notebook or app.

from databricks.vector_search.client import VectorSearchClient
vs_client = VectorSearchClient()

We can first obtain our index and check if it’s ready:

vs_endpoint_name="books-vs-edpoint"
index_name = "ai_showcase.books.books_openai_index"

index = vs_client.get_index(endpoint_name=vs_endpoint_name,
index_name=index_name)
index.describe()['status']['message']

Now, let’s try querying with a real example. We’ll pick a random book, ‘An Acquaintance with Darkness,’ and use its summary from our source data for this exercise:

An Acquaintance with Darkness is the story of 14-year-old Emily Pigbush, who lives with her mother in Washington, D.C., in 1865. Emily’s father died during the Civil War while fighting for the Union. Now the Pigbushes’ final servant, Ella May, has left because she was freed, leaving Emily to care for her mother alone. However, Emily sometimes has the help of her close friend, sixteen-year-old Annie Surratt, whose mother runs the boarding house across the street. Emily’s mother is near death…

Let’s imagine a scenario where you vaguely remember a book with ‘dark’ in its title, related to the Civil War, and set in a major American city, possibly in California.

So, to test our vector search, we might use the phrase ‘the dark in california civil war’ as our query. For this purpose, we’ll utilize the convenient function “index.similarity_search”.

query="the dark in california civil war"
results = index.similarity_search(
query_text=query,
columns=["title","summary"],
num_results=5
)

If you are curious about which algorithm Databricks uses to calculate similarity: it uses Hierarchical Navigable Small World (HNSW) for its approximate nearest neighbor searches, and the cosine similarity distance metric to measure embedding vector similarity.
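
For readers who have not met cosine similarity before, here is a minimal plain-Python sketch. It illustrates the metric only: it is not the Databricks implementation, the function name is an assumption, and the score returned by the index need not equal this raw value.

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two equal-length vectors."""
    if len(a) != len(b):
        raise ValueError("vectors must have the same length")
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        raise ValueError("cosine similarity is undefined for a zero vector")
    return dot / (norm_a * norm_b)

same_direction = cosine_similarity([1.0, 2.0, 3.0], [2.0, 4.0, 6.0])
orthogonal = cosine_similarity([1.0, 0.0], [0.0, 1.0])
opposite = cosine_similarity([1.0, 0.0], [-1.0, 0.0])
print(same_direction, orthogonal, opposite)
```

Vectors pointing the same way score close to 1, unrelated (orthogonal) vectors score 0, and opposite vectors score -1, regardless of their lengths.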

Perhaps it’s important to take a moment to understand the structure of the returned response we’ll receive. This will help us interpret the results more effectively.

print(type(results))
print(results.keys())
print(results["result"].keys())
print(len(results["result"]["data_array"]))
print(len(results["result"]["data_array"][0]))

The output is a dictionary containing a ‘result’ key, which holds a list of the top 5 results (more can be requested). Each item in this list consists of three values: the first two are our requested columns, namely the title and summary, and the third is a score indicating the similarity to our query. We can display these results as follows.

rows = results['result']['data_array']
for i, (title, summary, score) in enumerate(rows):
    if len(summary) > 50:
        # trim text output for readability
        summary = summary[0:50] + "..."
    print(f" Title: {title} Score: {score}")
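
Since every row is a plain list, it can be handy to turn the response into dictionaries keyed by column name. The sketch below assumes the response shape described above (the requested columns followed by the score); `to_records` and the mock response are made up for illustration, and the mock scores are not real output.

```python
def to_records(results, columns):
    """Convert similarity_search-style output into a list of dicts, one per hit."""
    keys = list(columns) + ["score"]  # the score is the last element of each row
    return [dict(zip(keys, row)) for row in results["result"]["data_array"]]

# Mock response with the nesting described above (all values are invented)
mock_results = {
    "result": {
        "data_array": [
            ["Book A", "Summary of book A", 0.61],
            ["Book B", "Summary of book B", 0.58],
        ]
    }
}

records = to_records(mock_results, ["title", "summary"])
for record in records:
    print(f"{record['title']:10} {record['score']}")
```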

You can see that the top result is our book! Even though other books in the results explicitly have the exact word ‘dark,’ which was in our query, the search index still returned our book as the first title.

This can show the effectiveness of the vector search index, as it can also return titles that do not contain any specific words from our query, such as ‘The Guns of the South.’ Yes, we used the summary of the book combined with the title in our search, but the key point here is that we don’t need to query the exact name; a degree of similarity in the content or context is sufficient for our search.

Let’s try to be more ambiguous😈. We will randomly grab some readers’ reviews from Amazon for various books and clean the reviews a bit by deleting explicit words, like the book title, for example. Then, we can take that random review and use it as input for our search to see if our book will show up in the list!

Let us take a book titled “The Birthday Boys”. I just grabbed this random review and used it to query our search index:

An historic fiction about the fated trek of the English team to the South Pole, the story is spellbinding and enables the reader to experience the challenges of this journey through the eyes, ears, thoughts and memories of different members of the team. While I personally would never be intrigued enough to make such a dangerous and difficult trip, the writing gave me insight into the kinds of people who are so motivated and instilled in me great respect for their courage, stamina and willingness to help each other. Highly recommend it.

# Book: The Birthday Boys
# Adjacent string literals inside parentheses are joined into one string.
review_book = (
    "An historic fiction about the fated trek of the English team to "
    "the South Pole, the story is spellbinding and enables the reader "
    "to experience the challenges of this journey through the eyes, "
    "ears, thoughts and memories of different members of the team. "
    "While I personally would never be intrigued enough to make "
    "such a dangerous and difficult trip, the writing gave me insight into the kinds of "
    "people who are so motivated and instilled in me great respect "
    "for their courage, stamina and willingness to help each other. "
    "Highly recommend it."
)
results = index.similarity_search(
    query_text=review_book,
    columns=["title", "summary"],
    num_results=10
)
rows = results['result']['data_array']
for (title, summary, score) in rows:
    if len(summary) > 100:
        # trim text output for readability
        summary = summary[0:100] + "..."
    print(f"Title: {title:30} with Score: {score}")

Well, the search returned a list of books, and our book is ranked 9th by similarity to our query (or 8th, if you count the first two results as one because they are the same book split by chunking). Even though you might have expected a better result, just as I did, I am amazed at how fast that was, and at how it found the book although no explicit words in the review lead to it!
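
Because chunking can make one book appear several times in the results, you may want to keep only the best-scoring row per title before ranking. Here is a minimal sketch, assuming rows shaped as [title, summary, score] as above and that a higher score means a closer match; the helper name and the sample rows are invented for illustration.

```python
def best_per_title(rows):
    """Keep the highest-scoring row for each title, ordered by descending score."""
    best = {}
    for title, summary, score in rows:
        if title not in best or score > best[title][2]:
            best[title] = [title, summary, score]
    return sorted(best.values(), key=lambda row: row[2], reverse=True)

# Invented rows: "Book A" appears twice, as it would after chunking
sample_rows = [
    ["Book A", "Summary A", 0.70],
    ["Book A", "Summary A", 0.68],
    ["Book B", "Summary B", 0.65],
]
deduplicated = best_per_title(sample_rows)
for rank, (title, _summary, score) in enumerate(deduplicated, start=1):
    print(rank, title, score)
```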

A Recommender System

Let us try another quick use case. As we have just seen, the search index considers some books more relevant to the query (the reviewer) than others, so maybe we can recommend these books to the reviewer! In other words, we can build a sort of recommender system on top of the index.

In fact, we can select a random book that we assume a person has purchased, and then recommend similar books.

# Assumption: table_rows is not defined in the snippets above; here it is
# taken to be the collected rows of the combined table.
table_rows = spark.read.table("ai_showcase.books.combined").collect()

random_row_id = 25
random_row = table_rows[random_row_id]
bought_book_title = random_row["title"]
bought_book_summary = random_row["summary"]
print("Title:", bought_book_title)
print("Summary:", bought_book_summary[:500], "...")
results = index.similarity_search(
    query_text=bought_book_summary,
    columns=["title", "summary"],
    num_results=5
)

rows = results['result']['data_array']
print('The similar books to: "', bought_book_title, '":')
print("---")

for (title, summary, score) in rows:
    if len(summary) > 100:
        # trim text output for readability
        summary = summary[0:100] + "..."
    print(f"Title: {title:30} with Score: {score}")
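
When the query is a book’s own summary, that book will usually be among the most similar results. Below is a small sketch of filtering it out before showing recommendations; `recommend` and the sample rows are assumptions for illustration, with rows shaped as [title, summary, score].

```python
def recommend(rows, bought_title, k=3):
    """Return up to k result rows whose title differs from the book already bought."""
    wanted = bought_title.strip().lower()
    others = [row for row in rows if row[0].strip().lower() != wanted]
    return others[:k]

# Invented rows, already ordered by score as the search returns them
sample_rows = [
    ["Bought Book", "Summary", 0.99],
    ["Book A", "Summary A", 0.70],
    ["Book B", "Summary B", 0.65],
    ["Book C", "Summary C", 0.60],
]
recommendations = recommend(sample_rows, "bought book", k=2)
for title, _summary, score in recommendations:
    print(f"Title: {title:30} with Score: {score}")
```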

Conclusion

As we conclude, we’ve successfully explored Databricks Vector Search. We’ve covered setting up Databricks Vector Search and its integrated features using the SDK, and we’ve explored the process of creating vector search endpoints, embedding model serving endpoints, and vector search indices. Along the way, we’ve shared valuable best practices to enhance your practical implementations.

Remember that the journey of learning and experimenting never truly ends. Each query, each line of code, and every dataset present a new opportunity to discover something remarkable. So, keep exploring, keep questioning, and keep pushing the boundaries of what’s possible with data science and machine learning!
