Fix bugs in retriever sdg notebook #522

Open · wants to merge 17 commits into base: main
51 changes: 32 additions & 19 deletions nemo_curator/filters/synthetic.py
@@ -24,6 +24,15 @@

from nemo_curator.filters.doc_filter import DocumentFilter
from nemo_curator.utils.decorators import batched
from nemo_curator.utils.distributed_utils import NoWorkerError, load_object_on_worker


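# Helper passed to load_object_on_worker (see score_document below) so that each
# Dask worker creates its own OpenAI client on the worker instead of having a
# client pickled and shipped along with the filter object.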
def create_client(base_url, api_key):
    openai_client = OpenAI(
        base_url=base_url,
        api_key=api_key,
    )
    return openai_client


# ----------------------------------------------------------------------------80
@@ -52,16 +61,21 @@ def __init__(
        self.percentile = percentile
        if truncate:
            self.truncate = truncate
        try:
            self.client = OpenAI(base_url=self.base_url, api_key=self.api_key)
        except Exception as e:
            print(f"Error accessing NIM model: {e}")
        self.batch_size = batch_size
        self.text_fields = text_fields

    @batched
    def score_document(self, df: pd.DataFrame):

        try:
            self.client = load_object_on_worker(
                attr="openai_client_easiness",
                load_object_function=create_client,
                load_object_kwargs={"base_url": self.base_url, "api_key": self.api_key},
            )
        except NoWorkerError:
            return pd.Series(np.ones(len(df)), dtype=float)

        document_score = self._calc_similarity_nim(
            df[self.text_fields[0]].to_list(), df[self.text_fields[1]].to_list()
        )
@@ -90,7 +104,7 @@ def _get_nim_embedding(self, text, input_type):
print(f"Error: {e}")
response = None

if response:
if response and not isinstance(response, str):
if isinstance(text, list):
embeddings = [r.embedding for r in response.data]
elif isinstance(text, str):
@@ -116,9 +130,6 @@ def _calc_similarity_nim(self, context, question):

        return sim

    def __dask_tokenize__(self):
        return normalize_token(EasinessFilter)


# ----------------------------------------------------------------------------80
# ----------------------- Answerability Filter ---------------------------------
@@ -149,19 +160,24 @@ def __init__(
        self.system_prompt = answerability_system_prompt
        self.user_prompt_template = answerability_user_prompt_template
        self.num_criteria = num_criteria

        try:
            self.client = OpenAI(base_url=self.base_url, api_key=self.api_key)
        except Exception as e:
            print(f"Error accessing NIM model: {e}")

        self.text_fields = text_fields

    @batched
    def score_document(self, df: pd.DataFrame):
        return df.apply(

        try:
            self.client = load_object_on_worker(
                attr="openai_client_answerability",
                load_object_function=create_client,
                load_object_kwargs={"base_url": self.base_url, "api_key": self.api_key},
            )
        except NoWorkerError:
            return pd.Series(["string"] * len(df))

        return df.progress_apply(
            lambda row: self._llm_as_judge(
                row[self.text_fields[0]], row[self.text_fields[1]]
                row[self.text_fields[0]],
                row[self.text_fields[1]],
            ),
            axis=1,
        )
@@ -212,8 +228,5 @@ def _llm_as_judge(self, context: str, question: str):

        return generation

    def __dask_tokenize__(self):
        return normalize_token(AnswerabilityFilter)


# ----------------------------------------------------------------------------80
31 changes: 25 additions & 6 deletions tutorials/nemo-retriever-synthetic-data-generation/README.md
@@ -45,21 +45,40 @@ Navigate to the [quick start notebook](notebooks/quickstart.ipynb) and follow th

### Run Pipeline (CLI)

The pipeline can be run with datasets in rawdoc (only text, title and ids if any) format. To test the pipeline, you can use the provided example data at ```sample_data_rawdoc.jsonl```
The pipeline can be run with datasets in rawdoc format (only text, title, and ids, if any). To test the pipeline, you can use the provided example data at ```sample_data/sample_data_rawdoc.jsonl```

Navigate to the top level of this project directory and run the following command in your command line. It will take roughly 5-10 minutes.

- `Rawdoc format`
- `jsonl format`

To use rawdoc format, provide your data in a `.jsonl` file. The structure of the data should follow this format: `{"text": <document>, "title": <title>}`. Additionally, if the documents already have a document id, the input file can also contain document ids. The same ids will be persisted in the generated data as well. Another accepted format is `{"_id": <document_id>, "text": <document>, "title": <title>}`.
To use jsonl format, provide your data in a `.jsonl` file. The structure of the data should follow this format: `{"text": <document>, "title": <title>}`. Additionally, if the documents already have a document id, the input file can also contain document ids. The same ids will be persisted in the generated data as well. Another accepted format is `{"_id": <document_id>, "text": <document>, "title": <title>}`.
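
For illustration only (the records below are made up, not taken from the sample data), a two-line input file in this format would look like:

```
{"_id": "doc-001", "text": "NeMo Curator provides scalable data curation pipelines.", "title": "NeMo Curator Overview"}
{"_id": "doc-002", "text": "Synthetic data generation produces question-document pairs for retriever evaluation.", "title": "Synthetic Data Generation"}
```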

In order to run the pipeline, use the script ```main.py```
In order to run the full pipeline (generation and filtering), use the script ```main.py```
```
python tutorials/nemo-retriever-synthetic-data-generation/main.py \
--api-key=<API Key> \
--input-file=tutorials/nemo-retriever-synthetic-data-generation/data/sample_data_rawdoc.jsonl \
--input-dir=tutorials/nemo-retriever-synthetic-data-generation/sample_data \
--pipeline-config=tutorials/nemo-retriever-synthetic-data-generation/config/config.yaml \
--input-format=jsonl \
--output-dir=tutorials/nemo-retriever-synthetic-data-generation/outputs/sample_data_rawdoc
```

Alternatively, to just generate data, use the script ```generate.py```
```
python tutorials/nemo-retriever-synthetic-data-generation/generate.py \
--api-key=<API Key> \
--input-dir=tutorials/nemo-retriever-synthetic-data-generation/sample_data \
--pipeline-config=tutorials/nemo-retriever-synthetic-data-generation/config/config.yaml \
--input-format=jsonl \
--output-dir=tutorials/nemo-retriever-synthetic-data-generation/outputs/sample_data_rawdoc
```

To filter pre-generated data, use the script ```filter.py```
```
python tutorials/nemo-retriever-synthetic-data-generation/filter.py \
--api-key=<API Key> \
--input-dir=tutorials/nemo-retriever-synthetic-data-generation/outputs/sample_data_rawdoc/jsonl/all \
--pipeline-config=tutorials/nemo-retriever-synthetic-data-generation/config/config.yaml \
--input-format=rawdoc \
--output-dir=tutorials/nemo-retriever-synthetic-data-generation/outputs/sample_data_rawdoc
```

tutorials/nemo-retriever-synthetic-data-generation/config/config.yaml
@@ -63,7 +63,7 @@ percentile: 70 # Percentile for threshold calculation (float) [0, 100]
batch_size: 1

#Answerability filter (LLM-as-judge)
answerability_filter: "meta/llama3-70b-instruct"
answerability_filter: "meta/llama-3.1-70b-instruct"
num_criteria: 4 # Number of criteria to parse from the response. It must be aligned with the prompt template
answerability_system_prompt: |
You are an evaluator who is rating questions to given context passages based on the given criteria. Assess the given question for clarity and answerability given enough domain knowledge, consider the following evaluation criterion:
205 changes: 205 additions & 0 deletions tutorials/nemo-retriever-synthetic-data-generation/filter.py
Collaborator commented:

What is the difference between this filter.py, generate.py, and main.py? They look nearly identical. They also all look to be CLI scripts but only main.py is mentioned in the README.

@vinay-raman (Contributor, author) replied on Feb 13, 2025:

The README mentions both generate.py and filter.py, please have a look.
They are needed when the user only wants to generate data or to filter pre-generated data.

@ryantwolf (Collaborator) replied on Feb 13, 2025:

Instead of adding two files, can you just add two command line arguments like so?

- `--generate-only`
- `--filter-only`

With two new files, it's very hard to tell if the differences between them are correct or buggy. CLI args in a single file make it much easier to maintain.
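
For illustration, a minimal sketch of how those flags could be wired up in a single entry point (the `run_generation` and `run_filtering` helpers below are placeholders, not functions from this PR):

```
import argparse


def run_generation(args: argparse.Namespace) -> None:
    # Placeholder: the generation pipeline (what generate.py does) would run here.
    print("Running generation ...")


def run_filtering(args: argparse.Namespace) -> None:
    # Placeholder: the filtering pipeline (what filter.py does) would run here.
    print("Running filtering ...")


def main() -> None:
    parser = argparse.ArgumentParser()
    group = parser.add_mutually_exclusive_group()
    group.add_argument("--generate-only", action="store_true",
                       help="Run only the generation step.")
    group.add_argument("--filter-only", action="store_true",
                       help="Run only the filtering step.")
    args = parser.parse_args()

    # Default (no flag): run both steps, generation followed by filtering.
    if not args.filter_only:
        run_generation(args)
    if not args.generate_only:
        run_filtering(args)


if __name__ == "__main__":
    main()
```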

@@ -0,0 +1,205 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import importlib
import os
import shutil
import time
from typing import Any, List

import numpy as np
from dask.diagnostics import ProgressBar
from dask.distributed import progress
from retriever_evalset_generator import RetrieverEvalSetGenerator

from config.config import RetrieverEvalSDGConfig
from nemo_curator import AsyncOpenAIClient, ScoreFilter, Sequential, get_client
from nemo_curator.datasets import DocumentDataset
from nemo_curator.filters import (
AnswerabilityFilter,
EasinessFilter,
NonAlphaNumericFilter,
)
from nemo_curator.modules.filter import Score, ScoreFilter
from nemo_curator.utils.file_utils import get_all_files_paths_under


def get_pipeline(args: Any) -> Any:

    cfg = RetrieverEvalSDGConfig.from_yaml(args.pipeline_config)
    # update api_key from input args
    cfg.api_key = args.api_key

    filters = []
    if cfg.easiness_filter:
        filters.append(
            ScoreFilter(
                EasinessFilter(
                    cfg.base_url,
                    cfg.api_key,
                    cfg.easiness_filter,
                    cfg.percentile,
                    cfg.truncate,
                    cfg.batch_size,
                ),
                text_field=["text", "question"],
                score_field="easiness_scores",
            )
        )
    if cfg.answerability_filter:
        filters.append(
            ScoreFilter(
                AnswerabilityFilter(
                    cfg.base_url,
                    cfg.api_key,
                    cfg.answerability_filter,
                    cfg.answerability_system_prompt,
                    cfg.answerability_user_prompt_template,
                    cfg.num_criteria,
                ),
                text_field=["text", "question"],
                score_field="answerability_scores",
            )
        )

    if filters:
        filtering_pipeline = Sequential(filters)
    else:
        filtering_pipeline = None

    return filtering_pipeline


def write_to_beir(
    args: Any, filtered_dataset: DocumentDataset, input_dataset: DocumentDataset
):

    df = filtered_dataset.df
    df = df.compute()

    save_dir = os.path.join(args.output_dir, "beir", "filtered")
    qrels_save_dir = os.path.join(args.output_dir, "beir", "filtered", "qrels")
    corpus_save_path = os.path.join(args.output_dir, "beir", "filtered", "corpus.jsonl")
    queries_save_path = os.path.join(
        args.output_dir, "beir", "filtered", "queries.jsonl"
    )

    os.makedirs(save_dir)
    os.makedirs(qrels_save_dir)

    df[["question-id", "question"]].rename(
        columns={"question-id": "_id", "question": "text"}
    ).to_json(queries_save_path, lines=True, orient="records")

    corpus_file_path = os.path.join(args.output_dir, "beir", "filtered", "corpus.jsonl")
    input_df = input_dataset.df.compute()  # we need the full corpus of documents
    input_df[["_id", "text"]].to_json(corpus_save_path, lines=True, orient="records")

    df[["question-id", "_id", "score"]].rename(
        columns={"question-id": "query-id", "_id": "corpus-id"}
    ).to_csv(os.path.join(qrels_save_dir, "test.tsv"), sep="\t", index=False)


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--input-dir",
        type=str,
        default="",
        help="Directory of input files containing document chunks for synthetic data generation",
    )
    parser.add_argument(
        "--pipeline-config",
        type=str,
        default="",
        help="Pipeline configuration yaml file path",
    )
    parser.add_argument(
        "--output-dir",
        type=str,
        default="",
        help="Output dir for generated data",
    )
    parser.add_argument(
        "--api-key",
        type=str,
        default=None,
        help="The API key to use for the synthetic data generation LLM client.",
    )
    parser.add_argument(
        "--api-timeout",
        type=int,
        default=120,
        help="The timeout value for API calls in seconds.",
    )
    parser.add_argument(
        "--n-partitions",
        type=int,
        default=1,
        help="Number of partitions for parallel processing of data.",
    )

    args = parser.parse_args()

    if not os.path.exists(args.output_dir):
        os.makedirs(args.output_dir)
    elif not any(os.scandir(args.output_dir)):
        print("Provided directory exists but is empty, using the empty directory")
    else:
        raise ValueError("Output directory exists already, use a new directory!")

    if args.input_dir:
        input_files = get_all_files_paths_under(args.input_dir, keep_extensions="part")
        input_dataset = DocumentDataset.read_json(input_files)
    else:
        raise ValueError(
            "Input directory not provided, should contain files in jsonl format"
        )

    if args.n_partitions:
        ddf = input_dataset.df
        n_data = len(ddf)
        if args.n_partitions < n_data:
            ddf = ddf.repartition(npartitions=args.n_partitions)
            input_dataset = DocumentDataset(ddf)
        else:
            print("Number of partitions greater than data size, using 1 partition")

    filtering_pipeline = get_pipeline(args)

    if filtering_pipeline:
        print("Filtering data ...")
        st_time = time.time()
        filtered_dataset = filtering_pipeline(input_dataset)
        filtered_dataset.persist()
        print("Writing filtered data to disk ...")
        all_save_dir = os.path.join(args.output_dir, "jsonl", "filtered")
        os.makedirs(all_save_dir)
        filtered_dataset.to_json(all_save_dir)
        print("Time taken to filter data = {:.2f} s".format(time.time() - st_time))

        print("Writing filtered data in beir format")
        # saving in beir format
        write_to_beir(args, filtered_dataset, input_dataset)
    else:
        print("Filtering config not correct, filtering pipeline is empty")

    print("Filtering complete!")


if __name__ == "__main__":
    dask_client = get_client()
    main()
    # dask_client.cancel(dask_client.futures, force=True)
vinay-raman marked this conversation as resolved.