Remove getArgsNumAfterSegmentRuns #4122

Merged: wujingyue merged 12 commits into main from wjy/alloc on Mar 25, 2025

Conversation

@wujingyue (Collaborator) commented Mar 21, 2025

Instead of counting live arguments after segment runs, check peak memory directly. The new utilities will also be used for verifying deallocation in host IR.

https://testing.googleblog.com/2013/08/testing-on-toilet-test-behavior-not.html
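
In the spirit of that post, the shift is roughly the following (a sketch, not verbatim: the "before" assertion paraphrases the removed API, and expected_args / expected_peak_bytes are placeholders for the values the real test computes):

// Before: assert on an implementation detail, i.e. how many arguments the
// runtime keeps alive after each segment runs (paraphrased, not the exact
// removed code).
// EXPECT_EQ(runtime->getArgsNumAfterSegmentRuns(run_order_id), expected_args);

// After: assert on observable behavior, i.e. peak GPU memory during execution.
resetPeakMemoryStats(0);
auto outputs = executor_cache.runFusionWithInputs({at_x});
EXPECT_EQ(maxMemoryAllocated(0), expected_peak_bytes);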

github-actions bot commented Mar 21, 2025

Review updated until commit bc197eb

Description

  • Removed getArgsNumAfterSegmentRuns method

  • Moved memory leak check to the test

  • Added memory allocation tracking utilities

  • Updated test to verify memory deallocation


Changes walkthrough 📝

Relevant files

Enhancement (5 files)
  • global_allocator.cpp: Reset tensor and allocated bytes in Arena (+3/-2)
  • fusion_kernel_runtime.cpp: Remove tracking of live arguments after segment runs (+0/-4)
  • utils.cpp: Add memory allocation tracking utilities (+53/-5)
  • fusion_kernel_runtime.h: Remove `getArgsNumAfterSegmentRuns` method (+0/-10)
  • utils.h: Declare memory allocation tracking utilities (+10/-0)

Formatting (3 files)
  • test_multidevice_overlap.cpp: Reorder member variable declaration (+2/-1)
  • validator.cpp: Remove unused include (+0/-1)
  • CMakeLists.txt: Fix formatting in `add_test_without_main` function (+2/-1)

Tests (1 file)
  • test_runtime.cpp: Add memory allocation checks in test (+31/-22)

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

🧪 PR contains tests
⚡ Recommended focus areas for review

Performance Impact

The removal of getArgsNumAfterSegmentRuns and the associated checks in the tests might impact the ability to verify that intermediate tensors are being properly deallocated. Ensure that the new method of checking peak memory directly is sufficient to catch memory leaks or excessive memory usage.

  ArgumentManager args_manager(
      args, runtime_workspace_, segmented_fusion_->inputs());

  // group should share cache id.
  auto group_cache_id = args.getCacheId();

  const int64_t num_groups = (int64_t)runtime_workspace_.group_run_order.size();
  if (isProfilerEnabled()) {
    FusionProfiler::startCompile();
  }

  // host ir
  std::unique_ptr<hir::HostIrContainer> hic;
  if (isOptionEnabled(EnableOption::HostIrLowering)) {
    hic = std::make_unique<hir::HostIrContainer>(
        num_groups); // Some indices will be empty
  }

  std::atomic<bool> detect_exception_in_thread_pool{false};
  std::string thread_pool_error_message;
  std::mutex thread_pool_error_message_mutex;
  for (int64_t run_order_id = 0; run_order_id < num_groups; ++run_order_id) {
    auto group_to_run = runtime_workspace_.group_run_order.at(run_order_id);

    if (isDebugDumpEnabled(DebugDumpOption::PythonDefinitionSegments)) {
      debug() << "Python definition for segmented group "
              << group_to_run->groupId() << ":" << std::endl;
      python_frontend::FusionDefinition fd(/*id=*/std::nullopt);
      python_frontend::translate(group_to_run->getFusion(), &fd);
      fd.print(debug());
    }

    // TODO: index mode should be updated per segmented kernel
    // Prepare input vector
    auto group_runtime_inputs =
        args_manager.translateValsToArgs(group_to_run->inputs());
    group_runtime_inputs.setDeviceIndex(args.getDeviceIndex());
    if (group_cache_id.has_value()) {
      group_runtime_inputs.setCacheId(group_cache_id.value());
    }

    if (num_groups == 1 || isOptionDisabled(DisableOption::ParallelCompile)) {
      FUSER_PERF_SCOPE("FusionKernelRuntime::compileFusionParallel");
      c10::cuda::CUDAGuard dg(args.getDeviceIndex());
      c10::Device device(c10::DeviceType::CUDA, args.getDeviceIndex());
      compileKernel(group_runtime_inputs, group_to_run, hic.get());
    } else {
      hir::HostIrContainer* hic_ptr = hic.get();
      // launch compileKernel thread here
      getThreadPool()->run([this,
                            args,
                            group_runtime_inputs,
                            group_to_run,
                            &detect_exception_in_thread_pool,
                            &thread_pool_error_message,
                            &thread_pool_error_message_mutex,
                            hic_ptr]() {
        FUSER_PERF_SCOPE("FusionKernelRuntime::compileFusionParallel");
        try {
          c10::cuda::CUDAGuard dg(args.getDeviceIndex());
          c10::Device device(c10::DeviceType::CUDA, args.getDeviceIndex());
          compileKernel(group_runtime_inputs, group_to_run, hic_ptr);
        } catch (const std::exception& e) {
          // Set flag inside lambda so we can throw an exception after thread
          // pool completes its work.
          detect_exception_in_thread_pool.store(true);
          const std::lock_guard<std::mutex> lock(
              thread_pool_error_message_mutex);
          std::stringstream ss;
          ss << thread_pool_error_message << "\nError from segmentation group "
             << group_to_run->groupId() << ": " << e.what() << "\n";
          thread_pool_error_message = ss.str();
        }
      });
    }

    auto fusion_to_run = segmented_fusion_->makeFusion(group_to_run).second;
    auto group_runtime_outputs =
        inferOutputSizes(fusion_to_run.get(), group_runtime_inputs);

    // map output args to tensor map
    args_manager.updateWithSegmentOutputs(
        group_to_run->outputs(), group_runtime_outputs, run_order_id);
  }

  // add all expressions and compiled kernels to the host ir container
  if (hic != nullptr) {
    IrCloner ir_cloner(hic.get());
    FusionGuard::setCurFusion(hic.get());
    for (int64_t run_order_id = 0; run_order_id < num_groups; ++run_order_id) {
      auto group_to_run = runtime_workspace_.group_run_order.at(run_order_id);
      if (hic->hasKernelExecutor(run_order_id)) {
        auto in_clone = ir_cloner.clone(group_to_run->inputs());
        auto out_clone = ir_cloner.clone(group_to_run->outputs());
        auto heuristic_params = schedulers().at(run_order_id).get();
        auto launch_kernel = IrBuilder::create<hir::LaunchKernel>(
            run_order_id,
            heuristic_params->lparams,
            heuristic_params->cparams,
            std::vector<Val*>{in_clone},
            std::vector<Val*>{out_clone});
        hic->pushBackTopLevelExprs(launch_kernel);
      } else {
        // push back segment's exprs into the container as top level expressions
        for (auto* expr : group_to_run->exprs()) {
          auto cloned_expr = ir_cloner.clone(expr);
          hic->pushBackTopLevelExprs(cloned_expr);
        }
      }
    }
    for (const Val* in : segmented_fusion_->inputs()) {
      hic->addInput(ir_cloner.clone(in));
    }
    for (const Val* out : segmented_fusion_->outputs()) {
      hic->addOutput(ir_cloner.clone(out));
    }
  }

  if (num_groups != 1 && !isOptionDisabled(DisableOption::ParallelCompile)) {
    // Wait until all segments finish compiling
    getThreadPool()->waitWorkComplete();
    NVF_ERROR(
        !detect_exception_in_thread_pool.load(),
        "Detected exception while compiling fusion segments in parallel. ",
        "Error messages from all threads are printed below.\n",
        thread_pool_error_message,
        "\nUse NVFUSER_DISABLE=parallel_compile to simplify error message.");
  }

  if (hic != nullptr) {
    hie_ = std::make_unique<hir::HostIrEvaluator>(
        hir::HostIrEvaluator(std::move(hic)));
  }

  if (isProfilerEnabled()) {
    FusionProfiler::stopCompile();
  }
}

void FusionKernelRuntime::disableKernelLaunch() {
  NVF_CHECK(
      isCompiled(),
      "Tried to set parameters of executors before they were initialized.");
  for (auto& executor : executors_) {
    if (auto ke = dynamic_cast<KernelExecutor*>(executor.get())) {
      ke->setExecuteKernelFlag(false);
    }
  }
}

SegmentedFusion* FusionKernelRuntime::fusionSegments() const {
  return segmented_fusion_.get();
}

HeuristicParamsList* FusionKernelRuntime::schedulerHeuristics() const {
  return heuristics_.get();
}

const ExecutorLog& FusionKernelRuntime::getMostRecentExecutorLog() const {
  NVF_ERROR(profiling_, "Executor log is only produced in profiling mode");
  return most_recent_executor_log_;
}

std::optional<std::unique_ptr<HeuristicParamsList>> FusionKernelRuntime::
    getMaybeHeuristicsFor(
        const KernelArgumentHolder& args,
        std::optional<PrimDataType> forced_index_type) {
  FUSER_PERF_SCOPE("FusionKernelRuntime::getMaybeHeuristicsFor");

  // The runtime group run order is different from the segmented_fusion group
  // order. Instead of using HeuristicParamsList::emplaceBack, we initialize
  // HeuristicParamsList with the desired number of groups.
  const int64_t num_groups = (int64_t)runtime_workspace_.group_run_order.size();
  std::unique_ptr<HeuristicParamsList> heuristics =
      std::make_unique<HeuristicParamsList>(num_groups);

  // We make a mutable copy of args so that we can use it in an
  // ArgumentManager
  ArgumentManager args_manager(
      args, runtime_workspace_, segmented_fusion_->inputs());
  // Follow group run order
  for (int64_t group_id : c10::irange(num_groups)) {
    auto group_to_run = runtime_workspace_.group_run_order.at(group_id);

    // Create fusion for this segmented group
    Fusion* fusion_to_run = group_to_run->getFusion();
    NVF_ERROR(fusion_to_run != nullptr);
    FusionGuard fg(fusion_to_run);

    // Get input arguments for SchedulerRuntimeInfo
    KernelArgumentHolder group_runtime_inputs =
        args_manager.translateValsToArgs(group_to_run->inputs());
    group_runtime_inputs.setDeviceIndex(args.getDeviceIndex());

    // Create PrecomputedValues for fusion segment
    std::unique_ptr<PrecomputedValues> evaluator_precomputed_values;
    {
      FUSER_PERF_SCOPE(
          "FusionKernelRuntime::getMaybeHeuristicsFor::PrecomputedValues");
      evaluator_precomputed_values =
          std::make_unique<PrecomputedValues>(fusion_to_run);
      evaluator_precomputed_values->bindInputs(group_runtime_inputs);
      // TODO Remove binding the original fusion inputs when creating
      // heuristics for fusion segment.
      evaluator_precomputed_values->bindValues(
          group_to_run->getCompleteFusionInputs(), args);
      evaluator_precomputed_values->evaluate();
    }

    // Get all tensorviews for segmented fusion
    std::vector<TensorView*> all_tvs_for_fusion_to_run =
        fusion_to_run->allTvs();

    SchedulerRuntimeInfo fusion_to_run_info(
        fusion_to_run,
        group_runtime_inputs,
        evaluator_precomputed_values.get(),
        all_tvs_for_fusion_to_run,
        forced_index_type);

    if (heuristics_ == nullptr) {
      // Add new scheduler entry for this segmented group
      heuristics->at(group_to_run->groupId()) =
          segmented_fusion_->makeInitialHeuristicParams(
              group_to_run, fusion_to_run_info);
    } else {
      // Try to get scheduler entry
      // NOTE: we are able to skip compile time checks here since the fusion
      // has already been segmented. During segmentation, each segment must
      // pass both canScheduleCompileTime and canScheduleRuntime for the
      // scheduler that accepts the segment. Since we might have different
      // runtime info than was used during segmentation, we cannot skip
      // canScheduleRuntime, but it is safe to skip canScheduleCompileTime. We
      // skip it here to avoid performing expensive fusion traversals on the
      // dynamic shape path.
      auto maybe_heuristic_params =
          group_to_run->getMaybeHeuristicParams(fusion_to_run_info);
      // If unavailable, then return std::nullopt
      if (!maybe_heuristic_params.has_value()) {
        return std::nullopt;
      }
      // Check if this scheduler entry matches the previous entry for this
      // segmented group. If no match, then return std::nullopt
      auto heuristic_params = std::move(maybe_heuristic_params.value());
      if (!heuristic_params->sameAs(
              heuristics_->at(group_to_run->groupId()).get())) {
        return std::nullopt;
      }
      // Add new scheduler entry for this segmented group
      heuristics->at(group_to_run->groupId()) = std::move(heuristic_params);
    }

    // Generate metadata for the fusion's outputs
    auto group_runtime_outputs = inferOutputSizes(
        fusion_to_run,
        group_runtime_inputs,
        evaluator_precomputed_values.get());

    args_manager.updateWithSegmentOutputs(
        group_to_run->outputs(), group_runtime_outputs, group_id);
  }
  return heuristics;
}

void FusionKernelRuntime::updateHeuristicsLaunchParams(
    HeuristicParamsList* update_heuristics) {
  auto scheduler_list_length = heuristics_->heuristicsList().size();
  NVF_ERROR(
      update_heuristics->heuristicsList().size() == scheduler_list_length);
  for (const auto i : c10::irange(scheduler_list_length)) {
    auto& heuristic_params = heuristics_->heuristicsList()[i];
    heuristic_params->lparams = update_heuristics->heuristicsList()[i]->lparams;
  }
}

const std::vector<std::unique_ptr<ExecutorAbstract>>& FusionKernelRuntime::
    executors() const {
  return executors_;
}

std::unordered_map<Val*, PolymorphicValue> FusionKernelRuntime::
    runSegmentsWithInputs(KernelArgumentHolder& args) {
  FUSER_PERF_SCOPE("FusionKernelRuntime::runSegmentsWithInputs");
  NVF_ERROR(
      args.size() == segmented_fusion_->inputs().size(),
      "Inputs were not set up correctly, received ",
      args.size(),
      " inputs but expected ",
      segmented_fusion_->inputs().size());

  ArgumentManager args_manager(
      args, runtime_workspace_, segmented_fusion_->inputs());

  // group should share cache id.
  auto group_cache_id = args.getCacheId();
  const int64_t num_groups = (int64_t)runtime_workspace_.group_run_order.size();
  kernel_time_ms_ = 0;
  for (auto run_order_id : c10::irange(num_groups)) {
    // TODO: index mode should be updated per segmented kernel
    // Prepare input vector
    auto group_to_run = runtime_workspace_.group_run_order.at(run_order_id);
    KernelArgumentHolder group_runtime_inputs =
        args_manager.translateValsToArgs(group_to_run->inputs());
    group_runtime_inputs.setDeviceIndex(args.getDeviceIndex());
    if (group_cache_id.has_value()) {
      group_runtime_inputs.setCacheId(group_cache_id.value());
    }

    // TODO: currently we are still outputting PyTorch tensors, instead of
    // something abstract. This is quite unsatisfying.

    // Run graph segment
    KernelArgumentHolder group_runtime_outputs =
        runKernelWithInput(group_runtime_inputs, group_to_run);

    args_manager.updateWithSegmentOutputs(
        group_to_run->outputs(), group_runtime_outputs, run_order_id);
  }

  if (isProfilerEnabled()) {
    int64_t input_bytes = 0;
    for (auto* inp : fusionSegments()->inputs()) {
      if (inp->isA<TensorView>()) {

Test Coverage

The new test ClearGmemBetweenSegments should be thoroughly reviewed to ensure it covers all scenarios that the old test FusionClearGmemBetweenSegments_CUDA covered. Verify that the new test accurately checks for memory deallocation and peak memory usage.

TEST_F(RuntimeTest, ClearGmemBetweenSegments) {
  at::cuda::clearCublasWorkspaces();
  releaseZeroedMemory();
  ASSERT_EQ(memoryAllocated(0), 0) << "Previous tests leaked memory.";

  auto fusion = std::make_unique<Fusion>();
  FusionGuard fg(fusion.get());
  std::vector<int64_t> input_shape{32, 64, 8, 128};
  auto tv0 = TensorViewBuilder()
                 .ndims(input_shape.size())
                 .dtype(DataType::Double)
                 .build();
  fusion->addInput(tv0);
  auto tv1 = add(tv0, IrBuilder::create<Val>(1.0)); // Group 0
  auto tv2 = sum(tv1, {0}); // Group 0
  auto tv3 = sum(tv2, {-1}); // Group 1
  auto output = sum(tv3, {0}); // Group 2
  fusion->addOutput(output);

  resetPeakMemoryStats(0);
  ASSERT_EQ(maxMemoryAllocated(0), 0) << "No tensors are allocated so far.";

  auto options = at::TensorOptions().dtype(at::kDouble).device(at::kCUDA, 0);
  at::Tensor at_x = at::randn(input_shape, options);
  FusionExecutorCache executor_cache(std::move(fusion));
  auto outputs = executor_cache.runFusionWithInputs({at_x});
  const int64_t max_memory_allocated = maxMemoryAllocated(0);

  auto runtime = executor_cache.getMostRecentKernelRuntime();
  EXPECT_EQ(runtime->fusionSegments()->groups().size(), 3)
      << "segmentation didn't happen as expected";
  testValidate(executor_cache.fusion(), outputs, {at_x}, __LINE__, __FILE__);

  if (c10::utils::check_env("PYTORCH_NO_CUDA_MEMORY_CACHING") == true) {
    GTEST_SKIP() << "Skipped because PYTORCH_NO_CUDA_MEMORY_CACHING is on. "
                    "This usually happens when running with compute-sanitizer. "
                    "maxMemoryAllocated can only collect peak memory "
                    "from a caching allocator.";
  }

  EXPECT_EQ(
      max_memory_allocated,
      (32 * 64 * 8 * 128 + 64 * 8 * 128 + 64 * 8) * sizeof(double))
      << "tv0 (32 * 64 * 8 * 128) outlived the execution, so it contributes "
         "to the peak memory. tv1 was never allocated because it's internal "
         "to group 0. tv2 (64 * 8 * 128) and tv3 (64 * 8) were both alive "
         "when executing group 1.";
}

} // namespace nvfuser
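
For reference, the expected peak in that EXPECT_EQ works out as follows (sizeof(double) is 8 bytes):

  tv0: 32 * 64 * 8 * 128 = 2,097,152 doubles = 16,777,216 bytes
  tv2:       64 * 8 * 128 =   65,536 doubles =    524,288 bytes
  tv3:             64 * 8 =      512 doubles =      4,096 bytes
  total peak                                 = 17,305,600 bytes (~16.5 MiB)
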
Memory Management

The new functions resetPeakMemoryStats, maxMemoryAllocated, and memoryAllocated should be reviewed for correctness and efficiency. Ensure they accurately reflect the memory usage and peak memory allocation on the specified device.

void resetPeakMemoryStats(const c10::DeviceIndex device) {
  c10::cuda::CUDACachingAllocator::CUDAAllocator* allocator =
      c10::cuda::CUDACachingAllocator::get();
  NVF_CHECK(allocator != nullptr);

  allocator->resetPeakStats(device);
}

namespace {
// Stats like allocated_bytes come as a size-3 array (cf.
// https://github.com/pytorch/pytorch/blob/feb503c1df78afd46962ed04e446d6e88ac0522d/c10/core/Allocator.h#L365-L370).
// The 0-th element is an aggregation of both the small pool and the large.
constexpr auto kAggregateStatsIndex = static_cast<uint64_t>(
#if NVF_TORCH_VERSION_NO_LESS(2, 7, 0)
    c10::CachingAllocator::StatType::AGGREGATE
#else
    c10::CachingDeviceAllocator::StatType::AGGREGATE
#endif
);
} // namespace

int64_t maxMemoryAllocated(const c10::DeviceIndex device) {
  c10::cuda::CUDACachingAllocator::CUDAAllocator* allocator =
      c10::cuda::CUDACachingAllocator::get();
  NVF_CHECK(allocator != nullptr);

  c10::CachingDeviceAllocator::DeviceStats device_stats =
      allocator->getDeviceStats(device);

  return device_stats.allocated_bytes.at(kAggregateStatsIndex).peak;
}

int64_t memoryAllocated(const c10::DeviceIndex device) {
  c10::cuda::CUDACachingAllocator::CUDAAllocator* allocator =
      c10::cuda::CUDACachingAllocator::get();
  NVF_CHECK(allocator != nullptr);

  c10::CachingDeviceAllocator::DeviceStats device_stats =
      allocator->getDeviceStats(device);

  return device_stats.allocated_bytes.at(kAggregateStatsIndex).current;
}

} // namespace nvfuser
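
Taken together, a minimal usage sketch of these helpers (device 0 assumed; runSomething() is a hypothetical stand-in for whatever work is being measured):

resetPeakMemoryStats(0);               // zero the peak counter for device 0
runSomething();                        // hypothetical work that allocates GPU memory
int64_t peak = maxMemoryAllocated(0);  // high-water mark, in bytes, since the reset
int64_t live = memoryAllocated(0);     // bytes currently allocated
EXPECT_LE(live, peak);                 // current usage cannot exceed the recorded peak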

@wujingyue (Collaborator, Author): !test

@wujingyue requested a review from nsarka on March 21, 2025 at 21:00
@wujingyue marked this pull request as ready for review on March 21, 2025 at 21:00
@wujingyue (Collaborator, Author): !test

@wujingyue (Collaborator, Author): !test

Comment on lines 25 to 27
at::cuda::clearCublasWorkspaces();
releaseZeroedMemory();
ASSERT_EQ(memoryAllocated(0), 0) << "Memory leak detected";

@wujingyue (Collaborator, Author) commented:

We can also do this in TearDown of all tests so we can verify none of them leak GPU memory. I'm happy to pursue that in a separate PR because the blast radius is larger.
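
A minimal sketch of what that fixture-level check could look like (hypothetical, not part of this PR; LeakCheckedTest is an invented name, and in practice the check would live in the project's shared base fixture):

class LeakCheckedTest : public ::testing::Test {
 protected:
  void TearDown() override {
    // Uses the helpers exercised in this PR's test.
    at::cuda::clearCublasWorkspaces();  // drop cached cuBLAS workspaces
    releaseZeroedMemory();              // drop nvFuser's zeroed-memory arena
    EXPECT_EQ(memoryAllocated(0), 0) << "This test leaked GPU memory.";
  }
};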

@wujingyue requested a review from naoyam on March 22, 2025 at 00:17
@wujingyue (Collaborator, Author): !test

@wujingyue (Collaborator, Author): !test

@wujingyue requested a review from jjsjann123 on March 25, 2025 at 15:06
@@ -29,7 +29,8 @@ class Arena {
debug() << "[global zeroed memory] Resetting allocated bytes to 0"
<< std::endl;
}
-  allocated_bytes_ = 0LL;
+  allocated_bytes_ = 0;
+  tensor_.reset();

A reviewing collaborator commented:

👏

@@ -210,6 +209,8 @@ class CollectiveBasedOverlapTest : public OverlapTest {
params.N});
return tc_unsharded_expected_reshaped.select(1, my_device_index_);
}

at::Tensor tc_locally_reduced_;

A reviewing collaborator commented:

👏

@wujingyue (Collaborator, Author): !build

@wujingyue requested a review from jjsjann123 on March 25, 2025 at 22:20
@wujingyue merged commit a0ce36a into main on Mar 25, 2025
16 checks passed
@wujingyue deleted the wjy/alloc branch on March 25, 2025 at 22:57