Showcasing Databricks Vector Search: A Hands-On Example [2/2]
Part 2: The Hands On! 👨‍💻
Now that we have covered the fundamentals in Part 1, we will see them in action in this part.
Setting Your Expectations
In this part, you will:
- Set up Databricks Vector Search and its integrated features using the SDK. (Yes, you can do most, if not all, of this using the UI.)
- Learn how to configure and create an embedding Model Serving Endpoint.
- Understand the process of creating vector search endpoints for effective data querying.
- Create and use a vector search index.
- Test what we have built with a couple of scenarios.
- Learn helpful best practices for practical implementations, marked with a thumbs-up emoji 👍.
Get the Dataset and Create our First Delta Table
In this use case, we use a book description dataset available on Kaggle, known as the Book Genre Prediction Dataset.
To kick things off, I downloaded the dataset and uploaded it to my Databricks workspace as a CSV file named "data.csv".
The first step is to store the data in a Delta table, which will serve as the source for our vector search service. Yes, we could do this in one step if we were familiar with the data. However, since I've just sourced this dataset from the internet and am not yet familiar with its contents, we'll start with some exploration. This way, we can make informed decisions about how to proceed, and I can show you how I made those decisions along the way.
So, in a notebook in my Unity Catalog-enabled workspace and compute, I read the "data.csv" file and display its contents (we will mostly use PySpark for this implementation).
df = spark.read.format("csv").option("header", "true").load("dbfs:/FileStore/books-data/data.csv")
display(df)
Our dataset contains four columns: an index, a book title, a genre, and a summary. It looks like we have some great "text" data, which is perfect for a similarity-based use case. How about a book recommender? We will test that later.
👍 I prefer to save my raw data in a Delta table, a practice I've adopted from the Databricks Medallion architecture. It recommends a "bronze layer" that mirrors the source data exactly, without any manipulation or cleaning. Cleaning and processing happen in the subsequent "silver layer"!
Let's name our bronze layer table "dataset" and save it in the already existing "books" schema of a catalog called "ai_showcase". I'll set it to overwrite any existing table. Here's the code:
from delta import *
df.write.format("delta").mode("overwrite").saveAsTable("ai_showcase.books.dataset")
Data Quality Check and Deduplication
Of course, cleaning data is always one of the first tasks we undertake, regardless of how smart or advanced the solution is! We need to ensure data quality.
Let's read the Delta table back into a variable called "raw_table" and print its schema to confirm our four columns.
raw_table_name="ai_showcase.books.dataset"
raw_table = spark.read.table(raw_table_name)
raw_table.printSchema()
Let's perform data quality checks on the table. We will create "table_clean" by filtering "raw_table" to:
- Exclude null values in the 'index', 'title', and 'summary' columns.
- Ensure 'index' contains only numbers.
- Limit the 'title' to 30 words or fewer. Fun fact: the Guinness World Record for the longest book title is held by a title of 4,558 words whose main header begins "Stock Price Prediction ..."; we don't expect anything like that here :)
# keep only rows that pass all quality rules (no trailing AND in the condition)
table_clean = raw_table.filter(
    "index IS NOT NULL AND "
    "index RLIKE '^[0-9]+$' AND "
    "title IS NOT NULL AND "
    "size(split(title, ' ')) <= 30 AND "
    "summary IS NOT NULL"
)
errors=raw_table.subtract(table_clean)
print("Good Quality Rows:",table_clean.count())
print("Low Quality Rows:" ,errors.count())
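To unit-test the quality rules without a cluster, the same conditions can be mirrored in plain Python. This is a minimal sketch: `is_good_row` is a hypothetical helper, and it assumes each row is a dict of string values, as the CSV reader produces without schema inference.

```python
import re


def is_good_row(row: dict, max_title_words: int = 30) -> bool:
    """Plain-Python mirror of the Spark quality filter (hypothetical helper)."""
    index, title, summary = row.get("index"), row.get("title"), row.get("summary")
    # non-null index, title, and summary
    if index is None or title is None or summary is None:
        return False
    # index must consist of digits only, like RLIKE '^[0-9]+$'
    if re.fullmatch(r"[0-9]+", index) is None:
        return False
    # split on single spaces, like split(title, ' ') in Spark
    return len(title.split(" ")) <= max_title_words


rows = [
    {"index": "1", "title": "Third Girl", "genre": "crime", "summary": "..."},
    {"index": "x2", "title": "Bad Index", "genre": "crime", "summary": "..."},
    {"index": "3", "title": None, "genre": "crime", "summary": "..."},
]
good = [r for r in rows if is_good_row(r)]
print(len(good), "good,", len(rows) - len(good), "low quality")
```

Keeping the rules in one small function like this makes it easy to check edge cases (such as a 31-word title) before running the filter on the full table.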
👍 It's a best practice to save these erroneous records to a Delta table so that we can inspect them later. We will write them to an error table.
errors_table_name="ai_showcase.books.books_errors"
errors.write.format("delta").mode("overwrite").saveAsTable(errors_table_name)
An important next step is to check for duplicates. We can assume that no two books share the same title and index. More sophisticated deduplication criteria could be applied, but this is sufficient for our use case.
table_clean = table_clean.dropDuplicates(["index", "title"])
print("Clean Table:",table_clean.count(),"out of:", raw_table.count())
So, we ended up with 4,657 unique rows that meet our criteria out of the 9,127 rows in the original dataset. That's about half of our data :) Note that we didn't just delete the rows that failed the quality checks; we moved them to another table for a closer look later (the dropped duplicates, however, were not saved).
Combining Relevant Columns
Now, let's think ahead for a moment. Our use case requires embedding text to enable similarity-based searches later. We could use just the title column, or the genre. The summary, though, might provide richer and more diverse information about the book. So which one should we use?
The beauty is that we can combine all three into one text. We shouldn't be afraid of this: text is unstructured data, and combining these columns doesn't make it "more" unstructured; it's already unstructured :)!
Let's create a new column for this purpose called "combined_text". Here's the code to generate it, followed by a printout of the new DataFrame's schema:
from pyspark.sql import functions as F
combined = table_clean.withColumn(
"combined_text",
F.concat(
F.lit("Title: "), "title", F.lit("\n"),
F.lit("Genre: "), "genre", F.lit("\n"),
F.lit("Summary: "), "summary", F.lit("\n")
)
)
combined.printSchema()
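One subtlety worth knowing: Spark's `concat` returns null if any of its inputs is null, and `genre` was not part of our null checks. A null-safe variant in Spark would wrap each column in `F.coalesce(col, F.lit(""))`. The plain-Python sketch below (the `build_combined_text` name is hypothetical) shows the intended layout and that empty-string fallback.

```python
from typing import Optional


def build_combined_text(title: Optional[str], genre: Optional[str],
                        summary: Optional[str]) -> str:
    """Mirror the 'Title/Genre/Summary' layout, treating missing values as ''."""
    # Spark's concat would return NULL if any part were NULL;
    # substituting "" mimics wrapping each column in F.coalesce(col, F.lit("")).
    parts = [("Title: ", title), ("Genre: ", genre), ("Summary: ", summary)]
    return "".join(f"{label}{value or ''}\n" for label, value in parts)


print(build_combined_text("Third Girl", None, "Hercule Poirot investigates..."))
```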
Yes, "for now," this table, "combined", can be the source from which we create the vectors and the search index for our vector search. I say "for now" because we'll make two tiny modifications to it later. To satisfy your curiosity: we will chunk the data to fit the embedding model's input limit, and we will enable the change data feed for automatic index updates.
So, for now, let us save this "source-ish" table in our schema:
combined.write.saveAsTable("ai_showcase.books.combined",mode="overwrite")
Create Model Serving Endpoint
As we mentioned in our previous post, to make this Vector Search work, we need two types of endpoints. One is the model serving endpoint, which we will create in this section, and the other is the vector search endpoint that we will create later!
👍 But wait! We need to understand what this model serving endpoint is supposed to do. That way, we can check whether there are any additional requirements for our Delta table "ai_showcase.books.combined".
I say this because I am sure it has some limitations :) In fact, I did this exercise the hard way: I did not check for the limitations initially. After using the endpoint and running into issues, I had to backtrack through my integration and discovered some limitations I needed to account for. So now I can pretend to be properly forward-thinking, checking for the limitations first and walking through the steps together with you.
Back to our task: we want to create a model serving endpoint. Its purpose is to host the machine learning model (the transformation, if you have read Part 1) that will smartly transform our raw data into vectors. I say "smartly" because we expect this model to place similar data points close to each other in the vector space!
Well... this whole mouthful, "creating vectors smartly because we expect this model to place similar data points close to each other in the vector space," can simply be called embedding! So we need an embedding model that we can deploy in a Databricks model serving endpoint and use to calculate the embeddings (our smart vectors).
Now, the "delicious" aspect of Databricks Vector Search is that you can deploy whatever embedding model you want: a pre-trained open-source model or a custom model you created from scratch! You just have to configure your serving endpoint. Here we'll keep things generic (sometimes I feel being generic is synonymous with being lazy :) ) and use one of OpenAI's embedding models!
If we check the OpenAI embedding model documentation, we see that their two embedding models accept a maximum input of 8191 tokens, so we need to take this limit into account.
Check Token Length (Do We Need Chunking?)
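Let's look at the token lengths of our text data to make sure they fit this limit; otherwise, the embedding model may reject the input or lose some of its context.
For a rough estimate, we can use a common rule of thumb (the first result a Google search returns): 75 words ≈ 100 tokens. We can then programmatically compute an approximate token count to see how many data points exceed the limit. Note that the code below applies this ratio to the character count (F.length) rather than the word count, so it overestimates the number of tokens and acts as a conservative check.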
import pyspark.sql.functions as F
from pyspark.sql.types import IntegerType
# "combined" is the DataFrame created in the previous section
df_with_length = (
    combined
    .withColumn("text_length", F.length("combined_text"))
    # applies the 75-words ≈ 100-tokens ratio to the character count (a conservative overestimate)
    .withColumn("token_length", (F.length("combined_text") / 75 * 100).cast(IntegerType()))
    .select("index", "title", "text_length", "token_length")
)
max_input_tokens = 8191  # OpenAI max input tokens: https://platform.openai.com/docs/guides/embeddings/embedding-models
long_text = df_with_length.filter(df_with_length["token_length"] > max_input_tokens)
print("Number of texts exceeding max tokens:", long_text.count())
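As a quick sanity check on the rule of thumb itself: at 100 tokens per 75 words, the 8,191-token limit corresponds to roughly 6,143 words. The sketch below (both helper names are hypothetical) applies the ratio to word counts, which is what the heuristic actually describes; it is still only an estimate, not a real tokenizer.

```python
TOKENS_PER_75_WORDS = 100  # rule of thumb: 75 words ≈ 100 tokens


def estimate_tokens(text: str) -> int:
    """Rough token estimate from the word count (not a real tokenizer)."""
    words = len(text.split())
    return int(words / 75 * TOKENS_PER_75_WORDS)


def max_words_for(limit_tokens: int) -> int:
    """Approximate word budget for a given token limit."""
    return int(limit_tokens * 75 / TOKENS_PER_75_WORDS)


print(estimate_tokens("word " * 750))  # 750 words -> 1000
print(max_words_for(8191))             # -> 6143
```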
👍 Well, 69 isn't a large number compared to our dataset size of 4,657; remember, we are dealing with book summaries. But this number could be significant if your dataset contains documents with longer texts. So, let's see how to address this.
I won't be inventing a new solution; it's already outlined in one of the notebooks on the Databricks website, and I've just adapted it a little. The solution is chunking! We split long texts into separate rows based on a chunk length of our choosing. I wouldn't use 8191 as the chunk length, as I prefer shorter chunks so that the embedding model focuses on smaller text segments. Let's go with 1024 (why not a power of 2? :) ).
Also, instead of relying on the token-length approximation, we will use a tokenizer library called "tiktoken" (install it with pip) and create a tokenizer with the "cl100k_base" encoding.
import tiktoken
tokenizer = tiktoken.get_encoding("cl100k_base")
There are two functions you need to know, encode and decode. Let's test them:
print(tokenizer.encode("Vector Search is smart!"))
print(tokenizer.encode("Vector Search is brilliant!"))
tokenizer.decode([3866, 7694, 374, 7941, 0])
Now let's check our data. If you remember, we saved it in the "ai_showcase.books.combined" table. We can load it and print its schema and row count to remind ourselves what it looks like:
combined_table_name = "ai_showcase.books.combined"
print("Reading Table:",combined_table_name)
combined_table= spark.read.table(combined_table_name)
combined_table.printSchema()
print("num of rows in ",combined_table_name ,combined_table.count())
Now we can write our chunking function, "chunk_text":
def chunk_text(text, max_chunk_tokens=1024):
    # Encode the text, then decode consecutive slices of at most max_chunk_tokens tokens
    tokens = tokenizer.encode(text)
    chunked_text = []
    while tokens:
        chunk_tokens = tokens[:max_chunk_tokens]
        chunk = tokenizer.decode(chunk_tokens)  # renamed so it doesn't shadow the function name
        chunked_text.append(chunk)
        tokens = tokens[max_chunk_tokens:]
    return chunked_text
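To see the chunking logic in isolation, here is the same idea with the tokenizer passed in as a pair of functions, so it can be tried with a toy whitespace "tokenizer" instead of tiktoken. The `chunk_tokens` name and the toy tokenizer are hypothetical. With any tokenizer, each chunk holds at most `max_chunk_tokens` tokens, and a text of n tokens yields ceil(n / max_chunk_tokens) chunks.

```python
import math
from typing import Callable, List


def chunk_tokens(text: str,
                 encode: Callable[[str], List],
                 decode: Callable[[List], str],
                 max_chunk_tokens: int = 1024) -> List[str]:
    """Split text into consecutive, non-overlapping chunks of <= max_chunk_tokens tokens."""
    tokens = encode(text)
    return [decode(tokens[i:i + max_chunk_tokens])
            for i in range(0, len(tokens), max_chunk_tokens)]


# Toy tokenizer: one "token" per whitespace-separated word.
toy_encode = str.split
toy_decode = " ".join

text = " ".join(f"w{i}" for i in range(2500))
chunks = chunk_tokens(text, toy_encode, toy_decode, max_chunk_tokens=1024)
print(len(chunks), math.ceil(2500 / 1024))  # 3 3
```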
Now we iterate through the rows and split long texts into multiple rows, each with a new index. Here's how it's done:
combined_pandas = combined_table.toPandas()
processed_data = []
for _, row in combined_pandas.iterrows():
    text_chunks = chunk_text(row["combined_text"])
    # one output row per chunk; the new index is "<original index>:<chunk number>"
    for chunk_no, chunk in enumerate(text_chunks):
        row_data = row.to_dict()
        row_data["index"] = f"{row['index']}:{chunk_no}"
        row_data["combined_text"] = chunk
        processed_data.append(row_data)
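The row-expansion step can also be checked without Spark or pandas. In this sketch (the `expand_rows` and `toy_chunker` names are hypothetical), every input row produces one output row per chunk, and the new key is the original index plus the chunk number, e.g. "25:0" and "25:1". Because the new key later serves as the index's primary key, this scheme assumes the original `index` values are already unique; deduplicating on ["index", "title"] alone does not strictly guarantee that, so the check at the end is a cheap safeguard.

```python
from typing import Callable, Dict, List


def expand_rows(rows: List[Dict], chunker: Callable[[str], List[str]]) -> List[Dict]:
    """Return one row per chunk with keys like '<index>:<chunk_no>'."""
    out = []
    for row in rows:
        for chunk_no, chunk in enumerate(chunker(row["combined_text"])):
            new_row = dict(row)  # copy so the input row is not mutated
            new_row["index"] = f"{row['index']}:{chunk_no}"
            new_row["combined_text"] = chunk
            out.append(new_row)
    return out


def toy_chunker(text: str, size: int = 10) -> List[str]:
    """Toy chunker: pieces of at most `size` characters."""
    return [text[i:i + size] for i in range(0, len(text), size)]


rows = [{"index": "25", "title": "Third Girl", "combined_text": "a" * 15},
        {"index": "26", "title": "Short", "combined_text": "short"}]
expanded = expand_rows(rows, toy_chunker)
keys = [r["index"] for r in expanded]
print(keys)  # ['25:0', '25:1', '26:0']
assert len(keys) == len(set(keys)), "chunk keys must be unique for the primary key"
```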
To check how the chunking changed our data, we turn the processed rows into a pandas DataFrame and then into a Spark DataFrame:
import pandas as pd
chunked_pandas_df = pd.DataFrame(processed_data)
chunked_spark_df = spark.createDataFrame(chunked_pandas_df)
print("combined table length :", combined_table.count())
print("chunked table length :", chunked_spark_df.count())
chunked_spark_df.display()
Our newly chunked table has 4,839 rows, compared to the 4,657 in the original one. For instance, the book "Third Girl" had a text longer than 1024 tokens, so it has been split into two rows.
Remember, this chunking is the first of the two tiny modifications we mentioned earlier in the "Combining Relevant Columns" section.
With this new chunked table, we have updated the source data for our vector search. There's still one more tiny modification to make, but we'll address that later. Let's save this new table as "chunked_data".
You might wonder whether saving all these tables is efficient. The short answer is that this post does not focus on optimizing storage; the aim is to make things easy to follow and get them working. There is plenty of room for storage optimization, but let's focus on the service for now. Here's how to save the new table:
chunked_table_name = "ai_showcase.books.chunked_data"
chunked_spark_df.write.format("delta").mode("overwrite").saveAsTable(chunked_table_name)
Create Embedding Model Serving Endpoint
We've now reached the point where we can create our model serving endpoint. For this, I'm assuming that:
- You have your OpenAI API key.
- This key is saved as a secret within a scope in your Databricks workspace. In my case, I saved the secret in an Azure Key Vault and connected the Key Vault to my Databricks workspace. The process may differ if you're using another cloud provider, but ultimately your API key should be stored as a secret within a scope in Databricks.
Once that is set up, you can check your scopes and secrets from within your notebook using these self-explanatory commands:
dbutils.secrets.listScopes()
dbutils.secrets.list(scope="YOUR SCOPE")
secret_value = dbutils.secrets.get("YOUR SCOPE","YOUR SECRET NAME")
# print only the first 5 characters to confirm the secret can be read
for char in secret_value[:5]:
    print(char, end=" ... ")
To deploy the embedding model to a serving endpoint, we use MLflow Deployments. Since MLflow supports deploying to different targets, we first specify the deployment target, which in our case is simply "databricks". We then create the endpoint configured with an external model, following the external model configuration template, which gives us this code:
import mlflow.deployments
emb_model_endpoint_name = "openai-embedding-endpoint"  # hypothetical name: choose your own
mlflow_deploy_client = mlflow.deployments.get_deploy_client("databricks")  # deployment target
mlflow_deploy_client.create_endpoint(
name=emb_model_endpoint_name,
config={
"served_entities": [{
"external_model": {
"name": "text-embedding-ada-002",
"provider": "openai",
"task": "llm/v1/embeddings",
"openai_config": {
"openai_api_key": "{{secrets/vector-search/openai-key}}"
}
}
}]
}
)
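The `openai_api_key` value above is a secret reference rather than the key itself, so the plaintext key never appears in the endpoint configuration. If you build the configuration programmatically, a small helper keeps the `{{secrets/<scope>/<key>}}` format consistent. This is a sketch: `secret_ref` and `openai_embedding_config` are hypothetical helpers, and the scope and key names are the ones used above.

```python
import json


def secret_ref(scope: str, key: str) -> str:
    """Format a Databricks secret reference string: {{secrets/<scope>/<key>}}."""
    return "{{secrets/" + scope + "/" + key + "}}"


def openai_embedding_config(model: str, scope: str, key: str) -> dict:
    """Build the same served_entities config passed to create_endpoint above."""
    return {
        "served_entities": [{
            "external_model": {
                "name": model,
                "provider": "openai",
                "task": "llm/v1/embeddings",
                "openai_config": {"openai_api_key": secret_ref(scope, key)},
            }
        }]
    }


config = openai_embedding_config("text-embedding-ada-002", "vector-search", "openai-key")
print(json.dumps(config, indent=2))
```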
If everything goes smoothly, a new endpoint will appear in your Databricks UI under the "Serving" section in the Ready state.
Create a Vector Search Endpoint
This Vector Search endpoint will allow us to do two main things: create our search index and query it.
Let's start by creating the vector search endpoint; let us call it "books-vs-endpoint". For this, we need the "VectorSearchClient" class from the "databricks-vectorsearch" package (install it with pip). At the time of writing, STANDARD was the only endpoint type I could set.
from databricks.vector_search.client import VectorSearchClient
vs_client = VectorSearchClient()
vs_endpoint_name="books-vs-edpoint"
vs_client.create_endpoint(
name=vs_endpoint_name,
endpoint_type="STANDARD"
)
vs_client.list_endpoints()
To verify that the endpoint has been created, you can either call vs_client.list_endpoints() or navigate to the 'Compute' section and open the 'Vector Search' tab in your Databricks UI. And YES, that is a compute resource :) !
Create Search Index
Now it looks like we're ready to use what we've built so far to create the index! Databricks offers two main types of indexes, depending on how you want the index to be updated: a Delta Sync Index, which stays synchronized with updates to your source Delta table, or a Direct Vector Access Index, which you update manually. Another consideration is whether you want to manage the embeddings yourself or let Databricks handle them for you. For our purposes, we will use a Delta Sync Index with Databricks-managed embeddings.
I really appreciated how convenient the "create_delta_sync_index" function is! To use it, you need to specify a few key details:
- The name of your vector search endpoint.
- The name of your model serving endpoint.
- The name of your source Delta table.
- The name of the column within that table that you want to embed, along with the table's primary key column.
- The name of the new index.
- Whether you want the sync pipeline to be continuous or triggered. For our case, we've chosen the triggered option.
However, since we are creating a Delta Sync Index, there's one more tiny modification we need to make to our source table (yes, this is the second one). We need to enable the change data feed, which tracks row-level changes between versions of the source table. This allows the index to be updated whenever the source table changes. We can do this with the following command:
%sql
ALTER TABLE ai_showcase.books.chunked_data SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
To ensure everything is correct regarding the names, I will print the values here:
index_name = "ai_showcase.books.books_openai_index"
source_table_name="ai_showcase.books.chunked_data"
print("endpoint_name :",vs_endpoint_name)
print("source_table_name :", source_table_name)
print("index_name :",index_name)
print("embedding_model_endpoint_name:", emb_model_endpoint_name)
We can now proceed to create the index. After creating it, you can either print the status right here in the code or check the status in your Databricks UI.
index = vs_client.create_delta_sync_index(
endpoint_name=vs_endpoint_name,
source_table_name=source_table_name,
index_name=index_name,
pipeline_type='TRIGGERED',
primary_key="index",
embedding_source_column="combined_text",
embedding_model_endpoint_name=emb_model_endpoint_name
)
index.describe()['status']['message']
This process takes some time, so patience is key! On my cluster, it took about 3,200 seconds (roughly 53 minutes) to complete. You can use the following code to monitor progress and determine when the index is ready to use.
import time
from IPython.display import clear_output
start_time = time.time()
index = vs_client.get_index(endpoint_name=vs_endpoint_name,
                            index_name=index_name)
elapse = 0
# poll every 30 seconds until the index reports it is ready
while not index.describe().get('status')['ready']:
    clear_output(wait=True)
    print("Waiting for index to be ready...", elapse, "seconds")
    time.sleep(30)
    elapse += 30
elapsed_time = time.time() - start_time
print("Index is ready!")
print(f"Time elapsed: {elapsed_time:.2f} seconds")
index.describe()
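The loop above waits indefinitely if the index never becomes ready (for example, if the sync pipeline fails). A variant with a timeout is sketched below. `wait_until_ready` is a hypothetical helper that takes the status check as a function, so it can be called as `wait_until_ready(lambda: index.describe().get('status')['ready'])` or tested with a stub.

```python
import time
from typing import Callable


def wait_until_ready(is_ready: Callable[[], bool],
                     timeout_s: float = 3600,
                     poll_s: float = 30,
                     sleep: Callable[[float], None] = time.sleep) -> float:
    """Poll is_ready() until it returns True; return the nominal seconds waited.

    Raises TimeoutError if the condition is not met within timeout_s.
    """
    waited = 0.0
    while not is_ready():
        if waited >= timeout_s:
            raise TimeoutError(f"not ready after {waited:.0f} seconds")
        sleep(poll_s)
        waited += poll_s
    return waited


# Stub status that becomes ready on the third check; no real sleeping.
checks = iter([False, False, True])
print(wait_until_ready(lambda: next(checks), poll_s=30, sleep=lambda s: None))  # 60.0
```

Injecting `sleep` is only there to make the helper testable; in a notebook you would keep the default `time.sleep`.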
Using it Up!
To use the index, we need a VectorSearchClient. We instantiated one earlier, but why not instantiate it again? This assumes that usage is detached from development, perhaps happening in a separate notebook or app.
from databricks.vector_search.client import VectorSearchClient
vs_client = VectorSearchClient()
First, we can get our index and check whether it's ready:
vs_endpoint_name="books-vs-edpoint"
index_name = "ai_showcase.books.books_openai_index"
index = vs_client.get_index(endpoint_name=vs_endpoint_name,
index_name=index_name)
index.describe()['status']['message']
Now, let's try querying with a real example. We'll pick a random book, "An Acquaintance with Darkness," and use its summary from our source data for this exercise:
An Acquaintance with Darkness is the story of 14-year-old Emily Pigbush, who lives with her mother in Washington, D.C., in 1865. Emily's father died during the Civil War while fighting for the Union. Now the Pigbushes' final servant, Ella May, has left because she was freed, leaving Emily to care for her mother alone. However, Emily sometimes has the help of her close friend, sixteen-year-old Annie Surratt, whose mother runs the boarding house across the street. Emily's mother is near death…
Let's imagine a scenario where you vaguely remember a book with "dark" in its title, related to the Civil War and set somewhere in America, possibly California (the book is actually set in Washington, D.C.).
To test our vector search, we might use the phrase "the dark in california civil war" as our query. For this purpose, we'll use the convenient "index.similarity_search" method.
query="the dark in california civil war"
results = index.similarity_search(
query_text=query,
columns=["title","summary"],
num_results=5
)
If you are curious about which algorithm Databricks uses to calculate similarity: it relies on Hierarchical Navigable Small World (HNSW) for its approximate nearest neighbor searches and on the cosine similarity distance metric to measure the similarity between embedding vectors.
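For intuition, cosine similarity is the dot product of two vectors divided by the product of their lengths. For unit-length vectors (OpenAI's embeddings are normalized to length 1), the squared L2 distance equals 2 - 2 × cosine, so ranking by smallest L2 distance gives the same order as ranking by highest cosine similarity. Here is a minimal pure-Python sketch with made-up 3-dimensional vectors:

```python
import math
from typing import List, Sequence


def cosine(a: Sequence[float], b: Sequence[float]) -> float:
    """Dot product divided by the product of the vector lengths."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))


def normalize(v: Sequence[float]) -> List[float]:
    """Scale a vector to unit length."""
    n = math.sqrt(sum(x * x for x in v))
    return [x / n for x in v]


def l2_squared(a: Sequence[float], b: Sequence[float]) -> float:
    return sum((x - y) ** 2 for x, y in zip(a, b))


query = normalize([1.0, 0.2, 0.0])
docs = {"civil war novel": normalize([0.9, 0.3, 0.1]),
        "space opera": normalize([0.0, 0.1, 1.0])}

for name, vec in docs.items():
    c = cosine(query, vec)
    # for unit vectors: ||a - b||^2 = 2 - 2 * cos(a, b)
    print(f"{name}: cosine={c:.3f} l2^2={l2_squared(query, vec):.3f} 2-2cos={2 - 2 * c:.3f}")
```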
It's worth taking a moment to understand the structure of the response we receive; this will help us interpret the results.
print(type(results))
print(results.keys())
print(results["result"].keys())
print(len(results["result"]["data_array"]))
print(len(results["result"]["data_array"][0]))
The output is a dictionary containing a "result" key whose "data_array" holds the top 5 results (you can request more). Each item in this list has three values: the first two are the columns we requested, title and summary, and the third is a score indicating similarity to our query. We can display the results as follows:
rows = results['result']['data_array']
for i, (title, summary, score) in enumerate(rows):
    if len(summary) > 50:
        # trim text output for readability
        summary = summary[0:50] + "..."
    print(f" Title: {title} Score: {score}")
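Since each row in `data_array` holds the requested columns in order followed by the score, a small helper can turn the response into a list of dicts, which is easier to work with than positional unpacking. `to_records` is hypothetical, and the response below is a hand-made stub with the same shape, not real output.

```python
from typing import Dict, List


def to_records(results: Dict, columns: List[str]) -> List[Dict]:
    """Zip each data_array row with the requested column names plus 'score'."""
    names = columns + ["score"]
    return [dict(zip(names, row)) for row in results["result"]["data_array"]]


# Stub with the same shape as a similarity_search response (values made up).
stub = {"result": {"data_array": [
    ["Book A", "Summary of A", 0.91],
    ["Book B", "Summary of B", 0.87],
]}}
for rec in to_records(stub, ["title", "summary"]):
    print(f"{rec['title']:10} score={rec['score']}")
```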
You can see that the top result is our book! Even though other books in the results explicitly contain the word "dark" from our query, the search index still ranked our book first.
This shows the effectiveness of the vector search index: it can also return titles that contain none of the words in our query, such as "The Guns of the South." Yes, we embedded the summary combined with the title, but the key point is that we don't need to query the exact name; some similarity in content or context is enough.
Let's try to be more ambiguous. We will grab some random readers' reviews from Amazon for various books and clean them a bit by removing explicit words, such as the book title. Then we can use a review as the input for our search and see whether its book shows up in the results!
Let's take a book titled "The Birthday Boys", grab a random review, and query our search index with it:
An historic fiction about the fated trek of the English team to the South Pole, the story is spellbinding and enables the reader to experience the challenges of this journey through the eyes, ears, thoughts and memories of different members of the team. While I personally would never be intrigued enough to make such a dangerous and difficult trip, the writing gave me insight into the kinds of people who are so motivated and instilled in me great respect for their courage, stamina and willingness to help each other. Highly recommend it.
# adjacent string literals inside parentheses form one string
review_book = (
    "An historic fiction about the fated trek of the English team to "
    "the South Pole, the story is spellbinding and enables the reader "
    "to experience the challenges of this journey through the eyes, "
    "ears, thoughts and memories of different members of the team. "
    "While I personally would never be intrigued enough to make "
    "such a dangerous and difficult trip, the writing gave me insight into the kinds of "
    "people who are so motivated and instilled in me great respect "
    "for their courage, stamina and willingness to help each other. "
    "Highly recommend it."
)
# Book: The Birthday Boys
results = index.similarity_search(
    query_text=review_book,
    columns=["title", "summary"],
    num_results=10
)
rows = results['result']['data_array']
for (title, summary, score) in rows:
    if len(summary) > 100:
        # trim text output for readability
        summary = summary[0:100] + "..."
    print(f"Title: {title:30} with Score: {score}")
The search returned a list of books, and our book ranks 9th by similarity to the query (or 8th, if you count the first two results, which are the same book due to chunking, as one). You might have expected a better result, as I did, but I'm amazed at how fast that was, given that the review contains no explicit words pointing to that book!
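Because the index is built on chunks, the same book can appear more than once in the results, as happened here. One simple way to handle this, sketched below with the hypothetical `best_per_title` helper, is to keep only the highest-scoring chunk per title before ranking. It assumes rows shaped like `data_array`, i.e. `[title, summary, score]`; the scores in the example are made up.

```python
from typing import List, Tuple


def best_per_title(rows: List[list]) -> List[Tuple[str, float]]:
    """Keep the best score for each title; return (title, score) sorted by score, best first."""
    best = {}
    for title, _summary, score in rows:
        if title not in best or score > best[title]:
            best[title] = score
    return sorted(best.items(), key=lambda item: item[1], reverse=True)


rows = [["Third Girl", "part 1", 0.80], ["Third Girl", "part 2", 0.78],
        ["The Birthday Boys", "...", 0.75]]
print(best_per_title(rows))  # [('Third Girl', 0.8), ('The Birthday Boys', 0.75)]
```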
A Recommender System
Let us try another quick use case. As we have just seen, the search index considers some books more relevant to the query (the review) than others, so we could recommend those books to the reviewer. In other words, we can build a simple recommender system!
In fact, we can select a random book that we assume a person has purchased, and then recommend similar books.
# assumption: table_rows holds the rows of the index's source table
table_rows = spark.read.table("ai_showcase.books.chunked_data").collect()
random_row_id = 25
random_row = table_rows[random_row_id]
bought_book_title = random_row["title"]
bought_book_summary = random_row["summary"]
print("Title:", bought_book_title)
print("Summary:", bought_book_summary[:500], "...")
results = index.similarity_search(
    query_text=bought_book_summary,
    columns=["title", "summary"],
    num_results=5
)
rows = results['result']['data_array']
print('The similar books to: "', bought_book_title, '":')
print("---")
for (title, summary, score) in rows:
    if len(summary) > 100:
        # trim text output for readability
        summary = summary[0:100] + "..."
    print(f"Title: {title:30} with Score: {score}")
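Since we query with the bought book's own summary, that book will typically come back as the top hit. For recommendations, you would usually drop it (and any of its other chunks) and keep the next best distinct titles. `recommend` is a hypothetical post-processing helper over `data_array`-shaped rows, and it assumes the rows are already sorted best first; requesting a few extra results via `num_results` leaves room for the dropped entries.

```python
from typing import List


def recommend(rows: List[list], bought_title: str, k: int = 3) -> List[str]:
    """Return up to k distinct titles, excluding the purchased book.

    Assumes rows are [title, summary, score] sorted by score, best first.
    """
    picks: List[str] = []
    for title, _summary, _score in rows:
        if title == bought_title or title in picks:
            continue  # skip the seed book and repeated chunks of the same book
        picks.append(title)
        if len(picks) == k:
            break
    return picks


rows = [["Seed Book", "...", 0.99], ["Book X", "...", 0.85],
        ["Book X", "...", 0.84], ["Book Y", "...", 0.80]]
print(recommend(rows, "Seed Book", k=2))  # ['Book X', 'Book Y']
```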
Conclusion
As we conclude, we've successfully explored Databricks Vector Search. We've covered setting up Databricks Vector Search and its integrated features using the SDK, and we've walked through creating embedding model serving endpoints, vector search endpoints, and vector search indexes. Along the way, we've shared best practices to help with your practical implementations.
Remember that the journey of learning and experimenting never truly ends. Each query, each line of code, and every dataset presents a new opportunity to discover something remarkable. So, keep exploring, keep questioning, and keep pushing the boundaries of what's possible with data science and machine learning!