[docs] Pinecone tutorial #26775 (Closed)

311 changes: 311 additions & 0 deletions docs/docs-beta/docs/tutorial/pinecone.md

@@ -0,0 +1,311 @@
> Reviewer comment (Contributor): Should we standardize the project naming convention? Modal and Bluesky are named examples/project_, whereas this is examples/tutorial_.

---
title: Pinecone
description: Use Dagster to upload data into Pinecone
last_update:
  date: 2024-12-30
  author: Dennis Hume
---

## Overview

Many AI applications are data applications. Organizations want to leverage existing LLMs rather than build their own, but to take full advantage of those models, you need to supplement them with your own data so they produce more accurate and contextually aware results. This tutorial demonstrates how to use Dagster to extract data, generate embeddings, and store the results in a vector database ([Pinecone](https://www.pinecone.io/)), which can then power AI models to craft far more detailed answers.

### Dagster Concepts
- [resources]()
- [run configurations]()

### Services
- [DuckDB]()
- [Pinecone]()

## Code
![Pinecone asset graph](/images/tutorials/pinecone/pinecone_dag.png)

### Setup
All the code for this tutorial can be found at [project_dagster_pinecone]().

Install the project dependencies:
```
pip install -e ".[dev]"
```

Run Dagster:
```
dagster dev
```

Open [http://localhost:3000](http://localhost:3000) in your browser.

### Loading Data
We will be working with review data from Goodreads. These reviews exist as a collection of JSON files categorized by genre. To limit the amount of data we process, we will focus on the files for graphic novels: `goodreads_books_comics_graphic.json.gz` and `goodreads_reviews_comics_graphic.json.gz`. Since the data is normalized across these two files, we will want to combine their information before feeding it into our vector database.

One way to handle preprocessing of the data is with [DuckDB](https://duckdb.org/). DuckDB is an in-process database, similar to SQLite, optimized for analytical workloads. We will start by creating two Dagster assets to load in the data. Each will load one of the files and create a DuckDB table (`graphic_novels` and `reviews`):
> Reviewer comment (Contributor): Should we use this tutorial as an example of embedding the code with CodeExample? Right now docs_beta_snippets is hard-coded, but we could improve the component to take in a prop of the snippet location, and default to docs_beta_snippets if not provided:
>
>     import(`!!raw-loader!/../../examples/docs_beta_snippets/docs_beta_snippets/${filePath}`)
>
> https://github.com/dagster-io/dagster/blob/master/docs/docs-beta/src/components/CodeExample.tsx#L44

```python
# assets.py
import dagster as dg
import dagster_duckdb as dg_duckdb


@dg.asset(
    kinds={"duckdb"},
    group_name="ingestion",
    deps=[goodreads],  # upstream source asset defined elsewhere in the project
)
def graphic_novels(duckdb_resource: dg_duckdb.DuckDBResource):
    url = "https://datarepo.eng.ucsd.edu/mcauley_group/gdrive/goodreads/byGenre/goodreads_books_comics_graphic.json.gz"
    query = f"""
        create table if not exists graphic_novels as (
            select *
            from read_json(
                '{url}',
                ignore_errors = true
            )
        );
    """
    with duckdb_resource.get_connection() as conn:
        conn.execute(query)


@dg.asset(
    kinds={"duckdb"},
    group_name="ingestion",
    deps=[goodreads],
)
def reviews(duckdb_resource: dg_duckdb.DuckDBResource):
    url = "https://datarepo.eng.ucsd.edu/mcauley_group/gdrive/goodreads/byGenre/goodreads_reviews_comics_graphic.json.gz"
    query = f"""
        create table if not exists reviews as (
            select *
            from read_json(
                '{url}',
                ignore_errors = true
            )
        );
    """
    with duckdb_resource.get_connection() as conn:
        conn.execute(query)
```

With our DuckDB tables created, we can now query them like any other SQL table. Our third asset will join and filter the data and then return a DataFrame (we will also `LIMIT` the results to 500):
```python
# assets.py
import pandas as pd


@dg.asset(
    kinds={"duckdb"},
    group_name="ingestion",
)
def enriched_reviews(
    duckdb_resource: dg_duckdb.DuckDBResource,
    graphic_novels,
    reviews,
) -> pd.DataFrame:
    query = """
        select
            reviews.review_id as review_id,
            graphic_novels.title as title,
            reviews.rating as rating,
            reviews.review_text as text
        from graphic_novels
        left join reviews
            on graphic_novels.book_id = reviews.book_id
        where graphic_novels.language_code = 'eng'
        limit 500
    """
    with duckdb_resource.get_connection() as conn:
        return conn.execute(query).fetch_df()
```

Now that the data has been prepared, we are ready to work with our vector database.

### Vector Database
We can begin by creating the index within our vector database. A vector database is a specialized database designed to store, manage, and retrieve high-dimensional vector embeddings, enabling efficient similarity search for machine learning tasks. In Pinecone, an index is the top-level container that holds vectors of a fixed dimension and serves similarity queries against them using a chosen distance metric. There are many vector databases available; for this demo we will use Pinecone, a cloud-based vector database with a [free tier](https://app.pinecone.io/) that can help us get started.
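
For intuition about the similarity search an index performs: under the cosine metric we configure below, two embeddings score close to 1.0 when they point in the same direction and near 0.0 when they are unrelated. A small standalone illustration (not part of the tutorial project; it uses NumPy and toy two-dimensional vectors, whereas our real embeddings have 1024 dimensions):

```python
import numpy as np


def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    # Dot product normalized by the vectors' magnitudes.
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))


print(cosine_similarity(np.array([1.0, 0.0]), np.array([1.0, 1.0])))  # ~0.707
print(cosine_similarity(np.array([1.0, 0.0]), np.array([0.0, 1.0])))  # 0.0
```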

> Reviewer comment (Contributor): At this point in the tutorial I had the question, "Why do we need both DuckDB and Pinecone databases?" I understand that they serve different purposes, but we could maybe clarify their usage more upfront.


:::note
After you have your API key from Pinecone, set it as the environment variable `PINECONE_API_KEY`:

```
export PINECONE_API_KEY={YOUR PINECONE API KEY}
```
:::

Now that we have a Pinecone account, we can incorporate Pinecone into our code. To keep everything consistent, we will create a Pinecone resource similar to the Dagster-maintained DuckDB resource we used to ingest the Goodreads data. Creating a custom resource is fairly simple; the only information we need to authenticate with the Pinecone client is the API key:

```python
# resources.py
from dagster import ConfigurableResource, InitResourceContext
from pinecone import Pinecone
from pydantic import Field, PrivateAttr


class PineconeResource(ConfigurableResource):
    api_key: str = Field(
        description=(
            "Pinecone API key. See https://docs.pinecone.io/reference/api/introduction"
        )
    )

    _client: Pinecone = PrivateAttr()

    def setup_for_execution(self, context: InitResourceContext) -> None:
        self._client = Pinecone(
            api_key=self.api_key,
        )

    ...  # additional methods shown below
```

> Reviewer comment (Contributor): This might be a silly suggestion, but can we explain what an index is in the context of vector databases?

For our methods, we will want the ability to create an index, retrieve an index so we can upsert records, and embed inputs. You can see all the details in the repo, but this is what the `create_index` method looks like:

```python
# resources.py (a method of PineconeResource; requires `import time` and
# `from pinecone import ServerlessSpec` at the top of the module)
def create_index(
    self,
    index_name: str,
    dimension: int,
    metric: str,
    serverless_spec: ServerlessSpec,
):
    existing_indexes = [
        index_info["name"] for index_info in self._client.list_indexes()
    ]

    if index_name not in existing_indexes:
        self._client.create_index(
            index_name,
            dimension=dimension,
            metric=metric,
            spec=serverless_spec,
        )
        # Block until the new index is ready to accept requests.
        while not self._client.describe_index(index_name).status["ready"]:
            time.sleep(1)
```
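
The resource's `index` and `embed` methods are elided above. Based on how the assets below call them, a minimal sketch might look like the following; the model name is an assumption on our part (Pinecone's hosted `multilingual-e5-large` produces 1024-dimension vectors, matching the index dimension used in this tutorial), so check the repo for the actual implementation:

```python
# resources.py (hypothetical sketch of the remaining PineconeResource methods)
def index(self, index_name: str):
    # Return a handle to an existing index for upserts and queries.
    return self._client.Index(index_name)

def embed(self, inputs: list[str], input_type: str):
    # Generate embeddings with a Pinecone-hosted model. The model choice
    # here is an assumption, not taken from the tutorial repo.
    return self._client.inference.embed(
        model="multilingual-e5-large",
        inputs=inputs,
        parameters={"input_type": input_type},
    )
```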

With our Pinecone resource complete, we can initialize it using the API key stored in the environment variable we set earlier:
```python
# resources.py
pinecone_resource = PineconeResource(api_key=dg.EnvVar("PINECONE_API_KEY"))
```

Using our Pinecone resource, it is easy enough to create an asset to manage the index where we will upload our embeddings. `ServerlessSpec` is a Pinecone concept that describes where a serverless index is hosted, namely the cloud provider and region:
```python
# assets.py
@dg.asset(
    kinds={"Pinecone"},
    group_name="processing",
)
def pinecone_index(
    pinecone_resource: PineconeResource,
):
    spec = ServerlessSpec(cloud="aws", region="us-east-1")
    pinecone_resource.create_index(
        INDEX_NAME,
        dimension=1024,
        metric="cosine",
        serverless_spec=spec,
    )
```

> Reviewer comment (Contributor): We typically use lowercase for kinds. Suggested change: `kinds={"pinecone"}`.

> Reviewer comment (Contributor): Would "embeddings" be a more accurate group name for the vector db related assets?

> Reviewer comment (Contributor): At this point I was confused by ServerlessSpec. A short blurb explaining that this is a Pinecone concept for compute (I think...?) might be helpful.

### Generate Embeddings
Between our data and the index, we are ready to create and upload our embeddings. One final asset brings this all together: we iterate through the DataFrame from `enriched_reviews` in chunks, generate embeddings for each review, and attach additional metadata to form the final vectors we upload to Pinecone using our Pinecone resource (the `_chunks` batching helper is sketched after the code).

> Reviewer comment (Contributor): Suggested change: `kinds={"pinecone"}` (lowercase).

> Reviewer comment (Contributor): I think it would be good to return MaterializeResult from these assets to include meaningful metadata.

```python
# assets.py
@dg.asset(
    kinds={"Pinecone"},
    group_name="processing",
)
def pinecone_embeddings(
    pinecone_resource: PineconeResource,
    enriched_reviews: pd.DataFrame,
    pinecone_index,
):
    data = enriched_reviews.to_dict("records")

    vectors = []
    for chunk in _chunks(data, 50):
        embeddings = pinecone_resource.embed(
            inputs=[r["text"] for r in chunk],
            input_type="passage",
        )

        for d, e in zip(chunk, embeddings):
            vectors.append(
                {
                    "id": str(d["review_id"]),
                    "values": e["values"],
                    "metadata": {
                        "source": "goodreads",
                        "rating": d["rating"],
                        "title": d["title"],
                        "text": d["text"],
                    },
                }
            )

    pinecone_resource.index(INDEX_NAME).upsert(
        vectors=vectors,
        namespace=NAMESPACE,
    )
```
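
The `_chunks` helper batches records so we embed 50 reviews per request. Its implementation is not shown in the tutorial; here is a minimal sketch of one way it could work:

```python
# assets.py (hypothetical helper; the tutorial repo may implement this differently)
from collections.abc import Iterator


def _chunks(items: list, size: int) -> Iterator[list]:
    # Yield successive slices of at most `size` records.
    for i in range(0, len(items), size):
        yield items[i : i + size]
```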

Those five assets are all we need to create and populate our index within Pinecone. We can combine these assets into a job and execute them together:
```python
# jobs.py
goodreads_pinecone = dg.define_asset_job(
    name="goodreads_pinecone",
    selection=[
        assets.graphic_novels,
        assets.reviews,
        assets.enriched_reviews,
        assets.pinecone_index,
        assets.pinecone_embeddings,
    ],
)
```
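
For the job and assets to be loadable by Dagster, they are combined into a `Definitions` object; the project's `pyproject.toml` points at `tutorial_pinecone.definitions`. A sketch of what that wiring might look like (the module layout and the DuckDB database path here are assumptions, so consult the repo for the real file):

```python
# definitions.py (hypothetical sketch; names assumed from the project layout)
import dagster as dg
from dagster_duckdb import DuckDBResource

from tutorial_pinecone import assets, jobs
from tutorial_pinecone.resources import pinecone_resource

defs = dg.Definitions(
    assets=dg.load_assets_from_modules([assets]),
    jobs=[jobs.goodreads_pinecone],
    resources={
        "duckdb_resource": DuckDBResource(database="data/goodreads.duckdb"),
        "pinecone_resource": pinecone_resource,
    },
)
```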

![Pinecone job](/images/tutorials/pinecone/pinecone_job.png)

In the Dagster UI, go to **Jobs**, select **goodreads_pinecone**, and click the **Materialize all** button. If this is the first time running the job, it may take a minute or so to process the data.

### Building AI Assets
Populating our vector database is only one part of our workflow; we can also build assets that query the index. We will look at one final asset, separate from the `goodreads_pinecone` job, called `graphic_novel_search`. It lets us run queries and get back the most similar results based on the reviews. Because we might want to ask many different questions with this asset, we will include a run configuration so each run can pose a different question:
```python
# assets.py
class GoodreadsQuestion(dg.Config):
    question: str
    top_k: int


@dg.asset(
    kinds={"Pinecone"},
    group_name="retrieval",
)
def graphic_novel_search(
    context: dg.AssetExecutionContext,
    config: GoodreadsQuestion,
    pinecone_resource: PineconeResource,
    pinecone_embeddings,
):
    query_embedding = pinecone_resource.embed(
        inputs=[config.question],
        input_type="query",
    )
    index = pinecone_resource.index(INDEX_NAME)
    results = index.query(
        namespace=NAMESPACE,
        vector=query_embedding[0].values,
        top_k=config.top_k,
        include_values=False,
        include_metadata=True,
    )
    for result in results["matches"]:
        context.log.info(result)
```

Again we are using the Pinecone resource, though this time we are querying rather than uploading results. Within the Dagster UI we can execute this asset by itself; since it takes a run configuration, we need to supply one before launching the run. Feel free to experiment, but here we will ask for a scary graphic novel:
```
ops:
  graphic_novel_search:
    config:
      question: A scary graphic novel
      top_k: 1
```
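
The same run configuration can be expressed in Python with `dg.RunConfig`, which is useful when launching the asset from a script or test instead of the UI (a small sketch, assuming the classes defined above are importable):

```python
import dagster as dg

# Mirrors the YAML above: one entry keyed by the asset/op name, holding a
# GoodreadsQuestion config object.
run_config = dg.RunConfig(
    ops={
        "graphic_novel_search": GoodreadsQuestion(
            question="A scary graphic novel",
            top_k=1,
        )
    }
)
```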

After materializing the asset, you can view the logs to see the best match:
```
{'id': '37b6d106-2223-f7b9-c9b7-c4bfc4867dc1',
 'metadata': {'rating': 4.0,
              'source': 'goodreads',
              'text': 'Dark and disturbing, yet many of the panels are '
                      'gorgeously illustrated. A promising start!',
              'title': 'Wytches, Volume 1'},
 'score': 0.852409363,
 'values': []}
```

## Going Forward
This was a relatively simple example, but it shows the amount of coordination needed to power real-world AI applications. As you add more sources of unstructured data, each updating on its own cadence and powering multiple downstream tasks, you will want to think through the operational details of building around AI.

> Reviewer comment (Contributor): We probably don't need to call it out as being simple.

1 change: 1 addition & 0 deletions examples/tutorial_pinecone/.env.example
@@ -0,0 +1 @@
PINECONE_API_KEY=
9 changes: 9 additions & 0 deletions examples/tutorial_pinecone/.gitignore
@@ -0,0 +1,9 @@
*.egg-info/*
*.mp3
*.pyc
.DS_Store
.direnv/
.env
.envrc
__pycache__/
tmp*/
9 changes: 9 additions & 0 deletions examples/tutorial_pinecone/Makefile
@@ -0,0 +1,9 @@
ruff:
	ruff check --select I --fix .

clean:
	find . -name "__pycache__" -exec rm -rf {} +
	find . -name "*.pyc" -exec rm -f {} +

install:
	uv pip install -e .[dev]
1 change: 1 addition & 0 deletions examples/tutorial_pinecone/README.md
@@ -0,0 +1 @@
## Dagster × Pinecone Demo
7 changes: 7 additions & 0 deletions examples/tutorial_pinecone/pyproject.toml
@@ -0,0 +1,7 @@
[build-system]
requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.dagster]
module_name = "tutorial_pinecone.definitions"
code_location_name = "tutorial_pinecone"
2 changes: 2 additions & 0 deletions examples/tutorial_pinecone/setup.cfg
@@ -0,0 +1,2 @@
[metadata]
name = tutorial_pinecone
15 changes: 15 additions & 0 deletions examples/tutorial_pinecone/setup.py
@@ -0,0 +1,15 @@
from setuptools import find_packages, setup

setup(
    name="tutorial_pinecone",
    packages=find_packages(exclude=["tutorial_pinecone_tests"]),
    install_requires=[
        "dagster",
        "dagster-duckdb",
        "dagster-duckdb-pandas",
        "pinecone-client",
        "pydantic",
        "pandas",
    ],
    extras_require={"dev": ["dagster-webserver", "pytest", "ruff"]},
)
1 change: 1 addition & 0 deletions examples/tutorial_pinecone/tutorial_pinecone/__init__.py
@@ -0,0 +1 @@
