diff --git a/cpp/tensorrt_llm/common/envUtils.cpp b/cpp/tensorrt_llm/common/envUtils.cpp index 4c1c41d5ae..a4fb013b2f 100644 --- a/cpp/tensorrt_llm/common/envUtils.cpp +++ b/cpp/tensorrt_llm/common/envUtils.cpp @@ -284,6 +284,23 @@ std::string getEnvUCXInterface() return ucxInterface; } +std::string getEnvNixlInterface() +{ + static std::once_flag flag; + static std::string nixlInterface; + + std::call_once(flag, + [&]() + { + char const* nixl_interface = std::getenv("TRTLLM_NIXL_INTERFACE"); + if (nixl_interface) + { + nixlInterface = nixl_interface; + } + }); + return nixlInterface; +} + bool getEnvDisaggLayerwise() { static bool const disaggLayerwise = getBoolEnv("TRTLLM_DISAGG_LAYERWISE"); diff --git a/cpp/tensorrt_llm/common/envUtils.h b/cpp/tensorrt_llm/common/envUtils.h index 5a9dc27182..e85ee6468f 100644 --- a/cpp/tensorrt_llm/common/envUtils.h +++ b/cpp/tensorrt_llm/common/envUtils.h @@ -60,6 +60,8 @@ bool getEnvUseNixlKvCache(); std::string getEnvUCXInterface(); +std::string getEnvNixlInterface(); + bool getEnvDisaggLayerwise(); bool getEnvDisableSelectiveCacheTransfer(); diff --git a/cpp/tensorrt_llm/executor/cache_transmission/nixl_utils/transferAgent.cpp b/cpp/tensorrt_llm/executor/cache_transmission/nixl_utils/transferAgent.cpp index 812f2af87a..721cade13a 100644 --- a/cpp/tensorrt_llm/executor/cache_transmission/nixl_utils/transferAgent.cpp +++ b/cpp/tensorrt_llm/executor/cache_transmission/nixl_utils/transferAgent.cpp @@ -22,15 +22,83 @@ #include "tensorrt_llm/runtime/utils/mpiUtils.h" #include +#include +#include #include #include #include #include +#include +#include #include +#include namespace tensorrt_llm::executor::kv_cache { +class FileLock +{ +private: + int fd_; + std::string lockFile_; + bool locked_; + +public: + explicit FileLock(std::string const& lockFile) + : fd_(-1) + , lockFile_(lockFile) + , locked_(false) + { + } + + ~FileLock() + { + unlock(); + } + + bool lock() + { + if (locked_) + return true; + + size_t pos = lockFile_.find_last_of('/'); + if (pos != std::string::npos) + { + std::string dir = lockFile_.substr(0, pos); + mkdir(dir.c_str(), 0755); + } + + fd_ = open(lockFile_.c_str(), O_CREAT | O_WRONLY, 0644); + if (fd_ == -1) + { + TLLM_LOG_ERROR("Failed to open lock file: %s", lockFile_.c_str()); + return false; + } + + if (flock(fd_, LOCK_EX) == -1) + { + TLLM_LOG_ERROR("Failed to acquire file lock: %s", lockFile_.c_str()); + close(fd_); + fd_ = -1; + return false; + } + + locked_ = true; + return true; + } + + void unlock() + { + if (locked_ && fd_ != -1) + { + flock(fd_, LOCK_UN); + close(fd_); + fd_ = -1; + locked_ = false; + } + } +}; + static std::string getAvailableIP() { struct ifaddrs *ifaddr, *ifa; @@ -51,14 +119,14 @@ static std::string getAvailableIP() if (ifa->ifa_addr == nullptr) continue; - std::string ucxInterface = common::getEnvUCXInterface(); - if (!ucxInterface.empty() && strcmp(ifa->ifa_name, ucxInterface.c_str()) != 0) + std::string nixlInterface = common::getEnvNixlInterface(); + if (!nixlInterface.empty() && strcmp(ifa->ifa_name, nixlInterface.c_str()) != 0) { continue; } // Skip the loopback interface - if (ucxInterface.empty() && (strncmp(ifa->ifa_name, "docker", 6) == 0 || strcmp(ifa->ifa_name, "lo") == 0)) + if (nixlInterface.empty() && (strncmp(ifa->ifa_name, "docker", 6) == 0 || strcmp(ifa->ifa_name, "lo") == 0)) { continue; } @@ -71,8 +139,8 @@ static std::string getAvailableIP() char address_buffer[INET_ADDRSTRLEN]; inet_ntop(AF_INET, addr_ptr, address_buffer, sizeof(address_buffer)); - TLLM_LOG_DEBUG(mpi::MpiComm::world().getRank(), " ***** UCX Interface: %s IP Address: %s", ifa->ifa_name, - address_buffer); + TLLM_LOG_DEBUG(mpi::MpiComm::world().getRank(), " ***** NIXL Interface: %s IP Address: %s", + ifa->ifa_name, address_buffer); ip = address_buffer; break; } @@ -80,7 +148,7 @@ static std::string getAvailableIP() if (ifa == nullptr) { TLLM_LOG_ERROR(mpi::MpiComm::world().getRank(), - "UCX No valid IP address found please set correct UCX interface with env variable TRTLLM_UCX_INTERFACE"); + "UCX No valid IP address found please set correct NIXL interface with env variable TRTLLM_UCX_INTERFACE"); } freeifaddrs(ifaddr); @@ -198,11 +266,18 @@ NixlTransferAgent::NixlTransferAgent(BaseAgentConfig const& config) : mName{config.mName} { nixl_status_t status; - auto envPort = common::getEnvNixlPort(); - uint16_t port = envPort > 0 ? getIncrmentPort(envPort) : getAvailablePort(); - nixlAgentConfig nixlConfig{config.useProgThread, true, port}; - mAddress = getAvailableIP() + ":" + std::to_string(port); - mRawAgent = std::make_unique(config.mName, std::move(nixlConfig)); + { + FileLock lock("/tmp/trtllm_nixl_port.lock"); + if (!lock.lock()) + { + TLLM_THROW("Failed to lock /tmp/trtllm_nixl_port.lock"); + } + auto envPort = common::getEnvNixlPort(); + uint16_t port = envPort > 0 ? getIncrmentPort(envPort) : getAvailablePort(); + nixlAgentConfig nixlConfig{config.useProgThread, true, port}; + mAddress = getAvailableIP() + ":" + std::to_string(port); + mRawAgent = std::make_unique(config.mName, std::move(nixlConfig)); + } nixl_b_params_t init1; nixl_mem_list_t mems1; @@ -337,6 +412,9 @@ void NixlTransferAgent::connectRemoteAgent(std::string const& name, ConnectionIn { std::string ip = connectionInfo.substr(0, connectionInfo.find(":")); std::string port = connectionInfo.substr(connectionInfo.find(":") + 1); + TLLM_LOG_DEBUG(mpi::MpiComm::world().getRank(), + "NixlTransferAgent::connectRemoteAgent connectRemoteAgent to %s remoteagent name: %s", connectionInfo.c_str(), + name.c_str()); TLLM_CHECK_WITH_INFO(!ip.empty() && !port.empty(), "connectRemoteAgent get empty ip or port, connectionInfo: %s", connectionInfo.c_str()); nixl_opt_args_t md_extra_params;