In-framework deployment NeMo 2.0 nemo_export.py test (#11749)

* Trying to support in-framework NeMo 1.0 & 2.0 testing

Signed-off-by: Jan Lasek <[email protected]>

* Expose generate method in MegatronLLMDeployableNemo2 for consistency

Signed-off-by: Jan Lasek <[email protected]>

* Apply isort and black reformatting

Signed-off-by: janekl <[email protected]>

* Create Lambada dataset (5 samples) for CI testing

Signed-off-by: Jan Lasek <[email protected]>

* Parameterize accuracy threshold for Lambada test

Signed-off-by: Jan Lasek <[email protected]>

* Add nemo_export.py in-framework test

Signed-off-by: Jan Lasek <[email protected]>

* Apply isort and black reformatting

Signed-off-by: janekl <[email protected]>

* Fix cicd-main.yml

Signed-off-by: Jan Lasek <[email protected]>

* Address (most) CI hints: long lines, docstrings, unused imports

Signed-off-by: Jan Lasek <[email protected]>

* Include deploy a.k.a. infer requirements in setup.py

Signed-off-by: Jan Lasek <[email protected]>

---------

Signed-off-by: Jan Lasek <[email protected]>
Signed-off-by: janekl <[email protected]>
Co-authored-by: janekl <[email protected]>
janekl authored Jan 10, 2025
1 parent 490964a commit 1ab22d1
Showing 9 changed files with 229 additions and 40 deletions.
30 changes: 30 additions & 0 deletions .github/workflows/cicd-main.yml
@@ -4871,6 +4871,35 @@ jobs:
rm -rf /tmp/nemo2_ckpt
rm -rf /tmp/nemo2_ptq_engine
L2_NeMo_2_Export_In_Framework:
needs: [cicd-test-container-setup]
uses: ./.github/workflows/_test_template.yml
if: contains(fromJSON(needs.cicd-test-container-setup.outputs.test_to_run), 'L2_NeMo_2_Export_In_Framework') || needs.cicd-test-container-setup.outputs.all == 'true'
with:
RUNNER: self-hosted-azure
SCRIPT: |
python tests/collections/llm/test_hf_import.py \
--hf_model /home/TestData/nlp/megatron_llama/llama-ci-hf \
--output_path /tmp/nemo2_ckpt
python tests/setup/data/create_sample_lambada.py \
--output_file /tmp/lambada.json
python tests/export/nemo_export.py \
--model_name test \
--model_type llama \
--checkpoint_dir /tmp/nemo2_ckpt \
--min_tps 1 \
--in_framework True \
--test_deployment True \
--run_accuracy True \
--test_data_path /tmp/lambada.json \
--accuracy_threshold 0.0 \
--debug
AFTER_SCRIPT: |
rm -rf /tmp/nemo2_ckpt /tmp/lambada.json
L2_NeMo_2_LLAVA_NEXT_MOCK_TRAINING:
needs: [cicd-test-container-setup]
uses: ./.github/workflows/_test_template.yml
@@ -5068,6 +5097,7 @@ jobs:
- L2_Megatron_GPT_Reranker
- L2_NeMo_2_NeMo_Mcore_Mixtral_bitexact
- L2_NeMo_2_PTQ_Llama2_FP8
- L2_NeMo_2_Export_In_Framework
- L2_NeMo_2_jit_callback
- L2_NeMo_2_LLAVA_NEXT_MOCK_TRAINING
- L2_HF_Transformer_SFT_FSDP2_2gpu
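The workflow job above first creates a 5-sample Lambada file via tests/setup/data/create_sample_lambada.py and then feeds it to tests/export/nemo_export.py. A minimal sketch of what such a sample-creation script could look like — the HuggingFace datasets dependency, the record field names (text_before_last_word, last_word), and the prompt/target split are assumptions, not the committed implementation:

```python
# Hypothetical sketch: write a tiny Lambada-style JSON file for CI accuracy checks.
# Dataset name, field names, and splitting logic are assumptions.
import argparse
import json

from datasets import load_dataset


def main():
    parser = argparse.ArgumentParser(description="Create a small Lambada sample file for CI.")
    parser.add_argument("--output_file", required=True)
    parser.add_argument("--num_samples", type=int, default=5)
    args = parser.parse_args()

    samples = []
    for row in load_dataset("lambada", split="test").select(range(args.num_samples)):
        text = row["text"].strip()
        prefix, last_word = text.rsplit(" ", 1)  # the model must predict the final word
        samples.append({"text_before_last_word": prefix, "last_word": last_word})

    with open(args.output_file, "w") as f:
        json.dump(samples, f, indent=2)


if __name__ == "__main__":
    main()
```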
91 changes: 71 additions & 20 deletions nemo/deploy/nlp/megatronllm_deployable.py
@@ -15,14 +15,15 @@
import logging
from enum import IntEnum, auto
from pathlib import Path
from typing import List
from typing import List, Optional

import numpy as np
import torch
import torch.distributed
import wrapt
from lightning.pytorch.trainer.trainer import Trainer
from megatron.core.inference.common_inference_params import CommonInferenceParams
from pytorch_lightning.trainer.trainer import Trainer
from megatron.core.inference.inference_request import InferenceRequest

import nemo.lightning as nl
from nemo.collections.llm import inference
@@ -94,7 +95,7 @@ def GetNumpyDtype(pyvalue):


class ServerSync(IntEnum):
"""Enum for synchronization messages using torch.distributed"""
"""Enum for synchronization messages using torch.distributed."""

WAIT = auto()
SIGNAL = auto()
@@ -104,17 +105,35 @@ def to_long_tensor(self):


class MegatronLLMDeploy:
"""
A factory class for creating deployable instances of Megatron LLM models.
This class provides a method to get the appropriate deployable instance
based on the version of the NeMo checkpoint model used.
"""

@staticmethod
def get_deployable(
nemo_checkpoint_filepath: str = None,
nemo_checkpoint_filepath: str,
num_devices: int = 1,
num_nodes: int = 1,
tensor_model_parallel_size: int = 1,
pipeline_model_parallel_size: int = 1,
context_parallel_size: int = 1,
):

"""
Returns the appropriate deployable instance for the given NeMo checkpoint.
Args:
nemo_checkpoint_filepath (str): Path to the .nemo checkpoint file.
num_devices (int): Number of devices to use for deployment.
num_nodes (int): Number of nodes to use for deployment.
tensor_model_parallel_size (int): Size of the tensor model parallelism.
pipeline_model_parallel_size (int): Size of the pipeline model parallelism.
context_parallel_size (int): Size of the context parallelism.
Returns:
ITritonDeployable: An instance of a deployable class compatible with Triton inference server.
"""
if nemo_checkpoint_version(nemo_checkpoint_filepath) == NEMO2:
return MegatronLLMDeployableNemo2(
nemo_checkpoint_filepath=nemo_checkpoint_filepath,
@@ -178,6 +197,39 @@ def __init__(
inference_batch_times_seqlen_threshold=inference_batch_times_seqlen_threshold,
)

def generate(
self,
prompts: List[str],
max_batch_size: int = 4,
inference_params: Optional[CommonInferenceParams] = None,
random_seed: Optional[int] = None,
) -> List[InferenceRequest]:
"""
Generates text based on the provided input prompts.
Args:
prompts (List[str]): A list of input strings.
max_batch_size (int): The maximum batch size used for inference.
inference_params (Optional[CommonInferenceParams]): Parameters for controlling the inference process.
random_seed (Optional[int]): A random seed for reproducibility.
Returns:
List[InferenceRequest]: A list containing the generated results.
"""
# TODO: This function doesn't account for parallelism settings currently

inference_params = inference_params or CommonInferenceParams()

results = inference.generate(
model=self.inference_wrapped_model,
tokenizer=self.mcore_tokenizer,
prompts=prompts,
max_batch_size=max_batch_size,
random_seed=random_seed,
inference_params=inference_params,
)
return list(results)

@property
def get_triton_input(self):
inputs = (
@@ -222,14 +274,7 @@ def triton_infer_fn(self, **inputs: np.ndarray):
return_log_probs=log_probs,
)

results = inference.generate(
model=self.inference_wrapped_model,
tokenizer=self.mcore_tokenizer,
prompts=prompts,
max_batch_size=max_batch_size,
random_seed=random_seed,
inference_params=inference_params,
)
results = self.generate(prompts, max_batch_size, inference_params, random_seed)

output_texts = [r.generated_text if text_only else r for r in results]
output_infer = {"sentences": cast_output(output_texts, np.bytes_)}
@@ -263,11 +308,14 @@ def __init__(
raise IMPORT_ERROR
if nemo_checkpoint_filepath is None and existing_model is None:
raise ValueError(
"MegatronLLMDeployable requires either a .nemo checkpoint filepath or an existing MegatronGPTModel, but both provided were None"
"MegatronLLMDeployable requires either a .nemo checkpoint filepath "
"or an existing MegatronGPTModel, but both provided were None."
)
if num_devices > 1:
LOGGER.warning(
"Creating a MegatronLLMDeployable with num_devices>1 will assume running with a PyTorch Lightning DDP-variant strategy, which will run the main script once per device. Make sure any user code is compatible with multiple executions!"
"Creating a MegatronLLMDeployable with num_devices > 1 will assume running with "
"a PyTorch Lightning DDP-variant strategy, which will run the main script once per device. "
"Make sure any user code is compatible with multiple executions!"
)

# if both existing_model and nemo_checkpoint_filepath are provided, existing_model will take precedence
@@ -292,14 +340,16 @@ def _load_from_nemo_checkpoint(self, nemo_checkpoint_filepath: str, num_devices:
# transformer_engine should always be true according to EricH, but GPT-2B model will fail if it is enabled
if not custom_config.transformer_engine:
LOGGER.warning(
"MegatronLLMDeployable expects model config transformer_engine=True, but this model has it =False. "
"Overriding it to =True, but this may break certain checkpoints converted on older Nemo versions. "
"MegatronLLMDeployable expects model config transformer_engine=True, but this model uses False. "
"Overriding it to True, but this may break certain checkpoints converted on older Nemo versions. "
"If your model breaks, please try re-converting the checkpoint on the current Nemo version."
)
custom_config.transformer_engine = True
# using multi-gpu for tensor parallelism directly for now, could do pipeline parallel instead or a combination
# using multi-gpu for tensor parallelism directly for now,
# could do pipeline parallel instead or a combination
custom_config.tensor_model_parallel_size = num_devices
# had to override these to make Nemotron3-22B work, see sample_sequence_batch() in text_generation_utils.py
# had to override these to make Nemotron3-22B work,
# see sample_sequence_batch() in text_generation_utils.py
custom_config.activations_checkpoint_granularity = None
custom_config.activations_checkpoint_method = None
# Models trained with TE < 1.10 and loaded with TE >= 1.10 require
@@ -398,7 +448,8 @@ def generate(self, inputs: List[str], length_params: LengthParam, sampling_param
distributed_rank = torch.distributed.get_rank()
if distributed_rank != 0:
raise ValueError(
f"Triton inference function should not be called on a thread with torch.distributed rank != 0, but this thread is rank {distributed_rank}"
"Triton inference function should not be called on a thread with "
f"torch.distributed rank != 0, but this thread is rank {distributed_rank}."
)
signal_value = ServerSync.SIGNAL.to_long_tensor()
torch.distributed.broadcast(signal_value, 0)
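Taken together, the factory and the newly exposed generate method can be exercised directly, without going through Triton. A minimal usage sketch, assuming a NeMo 2.0 checkpoint at a placeholder path and a single-GPU deployment; the sampling values are illustrative only:

```python
# Illustrative only: drive MegatronLLMDeploy.get_deployable() and the new generate() method.
# The checkpoint path is a placeholder and the sampling parameters are arbitrary.
from megatron.core.inference.common_inference_params import CommonInferenceParams

from nemo.deploy.nlp.megatronllm_deployable import MegatronLLMDeploy

deployable = MegatronLLMDeploy.get_deployable(
    nemo_checkpoint_filepath="/tmp/nemo2_ckpt",  # NeMo 2.0 checkpoint -> MegatronLLMDeployableNemo2
    num_devices=1,
    num_nodes=1,
)

results = deployable.generate(
    prompts=["The capital of France is"],
    max_batch_size=4,
    inference_params=CommonInferenceParams(temperature=0.1, top_k=1, num_tokens_to_generate=8),
)
# generate() returns a list of InferenceRequest objects; generated_text holds only the completion.
print(results[0].generated_text)
```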
6 changes: 3 additions & 3 deletions nemo/deploy/nlp/query_llm.py
@@ -13,7 +13,7 @@
# limitations under the License.

import time
from abc import ABC, abstractmethod
from abc import ABC

import numpy as np

@@ -141,7 +141,7 @@ def query_llm(
"object": "text_completion",
"created": int(time.time()),
"model": self.model_name,
"choices": [{"text": str(sentences)}],
"choices": [{"text": sentences}],
}
if log_probs_output is not None:
openai_response["log_probs"] = log_probs_output
@@ -297,7 +297,7 @@ def query_llm(
"object": "text_completion",
"created": int(time.time()),
"model": self.model_name,
"choices": [{"text": str(sentences)}],
"choices": [{"text": sentences}],
}
if output_generation_logits:
openai_response["choices"][0]["generation_logits"] = result_dict["generation_logits"]
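With sentences passed through as-is rather than wrapped in str(), the generated text sits in a nested array under choices[0]["text"], mirroring how tests/export/nemo_export.py indexes it. A client-side sketch under assumed values (server address, model name, and sampling settings are placeholders):

```python
# Illustrative client read of the OpenAI-style response returned by query_llm().
# URL, model name, and sampling values are placeholders; indexing assumes batch size 1.
from nemo.deploy.nlp import NemoQueryLLMPyTorch

nq = NemoQueryLLMPyTorch(url="localhost:8000", model_name="test")
response = nq.query_llm(
    prompts=["The capital of France is"],
    max_length=8,
    top_k=1,
    top_p=0.0,
    temperature=0.1,
)

# choices[0]["text"] now holds the raw sentences array instead of its str() representation,
# so a single generation is reached by indexing into the batch dimensions.
generated = response["choices"][0]["text"][0][0]
print(generated)
```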
6 changes: 6 additions & 0 deletions requirements/requirements_deploy.txt
@@ -0,0 +1,6 @@
fastapi
nvidia-pytriton
pydantic-settings
tensorstore==0.1.45
uvicorn
zarr
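These pins presumably back the in-framework serving path: fastapi and uvicorn for the REST endpoint, nvidia-pytriton for the Triton bindings, and pydantic-settings, tensorstore, and zarr for configuration and (distributed) checkpoint loading.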
2 changes: 2 additions & 0 deletions requirements/requirements_infer.txt
@@ -1,3 +1,5 @@
# This is a copy of requirements_deploy.txt for a seamless rename 'infer' -> 'deploy'.
# TODO: Remove this file once it is not used in container build anywhere.
fastapi
nvidia-pytriton
pydantic-settings
3 changes: 2 additions & 1 deletion setup.py
@@ -77,6 +77,7 @@ def req_file(filename, folder="requirements"):
'slu': req_file("requirements_slu.txt"),
'multimodal': req_file("requirements_multimodal.txt"),
'audio': req_file("requirements_audio.txt"),
'deploy': req_file("requirements_deploy.txt"),
}


@@ -257,7 +258,7 @@ def finalize_options(self):
extras_require=extras_require,
# Add in any packaged data.
include_package_data=True,
exclude=['tools', 'tests', 'nemo.deploy', 'nemo.export'],
exclude=['tools', 'tests'],
package_data={'': ['*.tsv', '*.txt', '*.far', '*.fst', '*.cpp', 'Makefile']},
zip_safe=False,
# PyPI package information.
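With the new extras key, the deployment dependencies can be pulled in via pip install "nemo_toolkit[deploy]" (assuming the usual nemo_toolkit distribution name), and dropping nemo.deploy and nemo.export from the exclude list presumably lets those packages ship with the built distribution instead of being left out.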
59 changes: 46 additions & 13 deletions tests/export/nemo_export.py
@@ -40,12 +40,17 @@

in_framework_supported = True
try:
from megatron.core.inference.common_inference_params import CommonInferenceParams
from nemo.deploy.nlp import NemoQueryLLMPyTorch
from nemo.deploy.nlp.megatronllm_deployable import MegatronLLMDeployable
from nemo.deploy.nlp.megatronllm_deployable import (
MegatronLLMDeploy,
MegatronLLMDeployable,
MegatronLLMDeployableNemo2,
)
except Exception as e:
LOGGER.warning(
"Cannot import MegatronLLMDeployable or NemoQueryLLMPyTorch,"
f" in-framework inference will not be available. {type(e).__name__}: {e}"
"Cannot import MegatronLLMDeploy* classes, or NemoQueryLLMPyTorch, or CommonInferenceParams, "
f"in-framework inference will not be available. Reason: {type(e).__name__}: {e}"
)
in_framework_supported = False

@@ -124,6 +129,18 @@ def get_accuracy_with_lambada(model, nq, task_ids, lora_uids, test_data_path):
)
# MegatronLLMDeployable returns prompt + generated output, so need to slice off prompt
model_output = model_output["sentences"][0][len(prompt) :].strip().lower()
elif in_framework_supported and isinstance(model, MegatronLLMDeployableNemo2):
model_output = model.generate(
prompts=[prompt],
inference_params=CommonInferenceParams(
temperature=0.1,
top_k=1,
top_p=0,
num_tokens_to_generate=1,
return_log_probs=False,
),
)
model_output = model_output[0].generated_text # Index [0] as a single prompt is used
else:
model_output = model.forward(
input_texts=[prompt],
@@ -158,8 +175,17 @@ def get_accuracy_with_lambada(model, nq, task_ids, lora_uids, test_data_path):
top_p=0,
temperature=0.1,
)
# MegatronLLMDeployable returns prompt + generated output, so need to slice off prompt
deployed_output = deployed_output["sentences"][0][0][len(prompt) :].decode().strip().lower()
# MegatronLLMDeployable for NeMo 1.0 returns prompt + generated output, so need to slice off prompt.
# MegatronLLMDeployableNemo2 (NeMo 2.0), on the other hand, returns only the generated text.
# TODO: Unify this somewhere else
if isinstance(model, MegatronLLMDeployableNemo2):
prefix_len = 0
else:
prefix_len = len(prompt)

# Accessing [0][0] of "text" is to get a raw string entry from a NumPy array
# for a single prompt (batch size = 1) and stripping prefix if needed:
deployed_output = deployed_output["choices"][0]["text"][0][0][prefix_len:].strip().lower()
else:
deployed_output = nq.query_llm(
prompts=[prompt],
@@ -574,7 +600,7 @@ def run_in_framework_inference(

print("Path: {0} and model: {1} will be tested".format(checkpoint_path, model_name))

deployed_model = MegatronLLMDeployable(checkpoint_path, num_gpus)
deployed_model = MegatronLLMDeploy.get_deployable(checkpoint_path, num_gpus)

nm = DeployPyTriton(
model=deployed_model,
@@ -588,10 +614,12 @@
output_deployed = nq.query_llm(
prompts=prompts, top_k=top_k, top_p=top_p, temperature=temperature, max_length=max_output_len
)
output_deployed = output_deployed["sentences"]
# MegatronLLMDeployable will return the prompt + generated output, so cut off the prompt
for i, output in enumerate(output_deployed):
output_deployed[i, :] = output[0][len(prompts[i]) :]
output_deployed = output_deployed["choices"][0]["text"]
# MegatronLLMDeployable will return the prompt + generated output, so cut off the prompt.
# MegatronLLMDeployableNemo2, on the other hand, returns only the generated text.
if isinstance(deployed_model, MegatronLLMDeployable):
for i, output in enumerate(output_deployed):
output_deployed[i, :] = output[0][len(prompts[i]) :]

# Unwrap the generator if needed
output_deployed = list(output_deployed)
@@ -717,6 +745,11 @@ def get_args():
type=str,
default="False",
)
parser.add_argument(
"--accuracy_threshold",
type=float,
default=0.5,
)
parser.add_argument("--streaming", default=False, action="store_true")
parser.add_argument(
"--test_cpp_runtime",
@@ -980,8 +1013,8 @@ def optional_bool_to_pass_fail(b: Optional[bool]):
print(f"Deployed Model Accuracy: {accuracy_result.deployed_accuracy:.4f}")
print(f"Deployed Relaxed Model Accuracy: {accuracy_result.deployed_accuracy_relaxed:.4f}")
print(f"Evaluation Time [s]: {accuracy_result.evaluation_time:.2f}")
if (deployed_tests_only and accuracy_result.deployed_accuracy_relaxed < 0.5) or (
not deployed_tests_only and accuracy_result.accuracy_relaxed < 0.5
if (deployed_tests_only and accuracy_result.deployed_accuracy_relaxed < args.accuracy_threshold) or (
not deployed_tests_only and accuracy_result.accuracy_relaxed < args.accuracy_threshold
):
accuracy_test_result = "FAIL"

@@ -995,7 +1028,7 @@ def optional_bool_to_pass_fail(b: Optional[bool]):
raise Exception("Functional test failed")

if accuracy_test_result == "FAIL":
raise Exception("Model accuracy is below 0.5")
raise Exception(f"Model accuracy is below {args.accuracy_threshold}")


if __name__ == '__main__':
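For orientation, a condensed sketch of how the Lambada first-token check and the new --accuracy_threshold gate fit together. This is not the committed test code: the record field names follow the hypothetical sample script above, the NeMo 1.0 prompt-prefix handling is omitted, and the relaxed-match logic is reduced to an exact match:

```python
# Simplified sketch of the in-framework Lambada accuracy gate; not the committed test code.
import json


def first_token_accuracy(nq, test_data_path, accuracy_threshold):
    """Query one token per sample and fail if accuracy drops below the threshold."""
    with open(test_data_path) as f:
        records = json.load(f)

    correct = 0
    for record in records:
        prompt = record["text_before_last_word"]
        response = nq.query_llm(prompts=[prompt], max_length=1, top_k=1, top_p=0.0, temperature=0.1)
        # NeMo 2.0 deployables return only the generated text; the NeMo 1.0 path
        # would additionally strip the echoed prompt before comparing.
        predicted = response["choices"][0]["text"][0][0].strip().lower()
        expected = record["last_word"].strip().lower()
        correct += int(predicted == expected)  # the real test also tracks a relaxed prefix match

    accuracy = correct / max(len(records), 1)
    if accuracy < accuracy_threshold:
        raise Exception(f"Model accuracy is below {accuracy_threshold}")
    return accuracy
```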
