Skip to content

use file lock to avoid port conflict #5123

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions cpp/tensorrt_llm/common/envUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,23 @@ std::string getEnvUCXInterface()
return ucxInterface;
}

/// Returns the network interface name requested through the
/// TRTLLM_NIXL_INTERFACE environment variable.
///
/// The environment is consulted only once, on the first call; every later
/// call returns the value cached at that time. An empty string is returned
/// when the variable is not set.
std::string getEnvNixlInterface()
{
    // Function-local static: initialised exactly once, on first use.
    static std::string const cachedInterface = []() -> std::string
    {
        char const* const rawValue = std::getenv("TRTLLM_NIXL_INTERFACE");
        return rawValue != nullptr ? std::string{rawValue} : std::string{};
    }();
    return cachedInterface;
}

bool getEnvDisaggLayerwise()
{
static bool const disaggLayerwise = getBoolEnv("TRTLLM_DISAGG_LAYERWISE");
Expand Down
2 changes: 2 additions & 0 deletions cpp/tensorrt_llm/common/envUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ bool getEnvUseNixlKvCache();

std::string getEnvUCXInterface();

// Returns the value of the TRTLLM_NIXL_INTERFACE environment variable, or an
// empty string when it is not set. The variable is read once, on the first
// call, and the cached value is returned afterwards.
std::string getEnvNixlInterface();

bool getEnvDisaggLayerwise();

bool getEnvDisableSelectiveCacheTransfer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,83 @@
#include "tensorrt_llm/runtime/utils/mpiUtils.h"

#include <arpa/inet.h>
#include <dirent.h>
#include <fcntl.h>
#include <ifaddrs.h>
#include <netdb.h>
#include <netinet/in.h>
#include <nixl_types.h>
#include <sys/file.h>
#include <sys/stat.h>
#include <unistd.h>
#include <vector>

namespace tensorrt_llm::executor::kv_cache
{

/// Exclusive advisory lock between processes, implemented with flock(2) on a
/// lock file.
///
/// lock() blocks until the exclusive lock is obtained; the lock is released
/// by unlock() or, at the latest, by the destructor. The lock file itself is
/// never removed.
///
/// The object owns a file descriptor and is therefore neither copyable nor
/// movable.
class FileLock
{
private:
    int fd_;               ///< Descriptor of the open lock file, -1 when not open.
    std::string lockFile_; ///< Path of the lock file.
    bool locked_;          ///< True while this object holds the lock.

public:
    /// @param lockFile Path of the file used as the lock. Nothing is opened
    ///                 or created until lock() is called.
    explicit FileLock(std::string const& lockFile)
        : fd_(-1)
        , lockFile_(lockFile)
        , locked_(false)
    {
    }

    /// Releases the lock if it is still held.
    ~FileLock()
    {
        unlock();
    }

    // A copy would share fd_ with the original, so both destructors would
    // unlock and close the same descriptor. Deleting the copy operations also
    // suppresses the implicit move operations.
    FileLock(FileLock const&) = delete;
    FileLock& operator=(FileLock const&) = delete;

    /// Acquires the exclusive lock, blocking until it becomes available.
    ///
    /// @return true when the lock is held on return (including the case where
    ///         it was already held by this object), false when the lock file
    ///         could not be opened or flock() failed.
    bool lock()
    {
        if (locked_)
        {
            return true;
        }

        // Best effort: create the directory that contains the lock file (one
        // level only). The result is deliberately ignored - the directory
        // usually exists already, and a real problem is reported by open().
        size_t const pos = lockFile_.find_last_of('/');
        if (pos != std::string::npos)
        {
            std::string const dir = lockFile_.substr(0, pos);
            (void) mkdir(dir.c_str(), 0755);
        }

        // O_CLOEXEC: a flock() lock belongs to the open file description, so
        // a descriptor inherited by an exec'd child would keep the lock held
        // after this object has released it.
        fd_ = open(lockFile_.c_str(), O_CREAT | O_WRONLY | O_CLOEXEC, 0644);
        if (fd_ == -1)
        {
            TLLM_LOG_ERROR("Failed to open lock file: %s", lockFile_.c_str());
            return false;
        }

        if (flock(fd_, LOCK_EX) == -1)
        {
            TLLM_LOG_ERROR("Failed to acquire file lock: %s", lockFile_.c_str());
            close(fd_);
            fd_ = -1;
            return false;
        }

        locked_ = true;
        return true;
    }

    /// Releases the lock and closes the lock file. Does nothing when the lock
    /// is not held.
    void unlock()
    {
        if (locked_ && fd_ != -1)
        {
            flock(fd_, LOCK_UN);
            close(fd_);
            fd_ = -1;
            locked_ = false;
        }
    }
};

static std::string getAvailableIP()
{
struct ifaddrs *ifaddr, *ifa;
Expand All @@ -51,14 +119,14 @@ static std::string getAvailableIP()
if (ifa->ifa_addr == nullptr)
continue;

std::string ucxInterface = common::getEnvUCXInterface();
if (!ucxInterface.empty() && strcmp(ifa->ifa_name, ucxInterface.c_str()) != 0)
std::string nixlInterface = common::getEnvNixlInterface();
if (!nixlInterface.empty() && strcmp(ifa->ifa_name, nixlInterface.c_str()) != 0)
{
continue;
}

// Skip the loopback interface
if (ucxInterface.empty() && (strncmp(ifa->ifa_name, "docker", 6) == 0 || strcmp(ifa->ifa_name, "lo") == 0))
if (nixlInterface.empty() && (strncmp(ifa->ifa_name, "docker", 6) == 0 || strcmp(ifa->ifa_name, "lo") == 0))
{
continue;
}
Expand All @@ -71,16 +139,16 @@ static std::string getAvailableIP()
char address_buffer[INET_ADDRSTRLEN];
inet_ntop(AF_INET, addr_ptr, address_buffer, sizeof(address_buffer));

TLLM_LOG_DEBUG(mpi::MpiComm::world().getRank(), " ***** UCX Interface: %s IP Address: %s", ifa->ifa_name,
address_buffer);
TLLM_LOG_DEBUG(mpi::MpiComm::world().getRank(), " ***** NIXL Interface: %s IP Address: %s",
ifa->ifa_name, address_buffer);
ip = address_buffer;
break;
}
}
if (ifa == nullptr)
{
TLLM_LOG_ERROR(mpi::MpiComm::world().getRank(),
"UCX No valid IP address found please set correct UCX interface with env variable TRTLLM_UCX_INTERFACE");
"UCX No valid IP address found please set correct NIXL interface with env variable TRTLLM_UCX_INTERFACE");
}

freeifaddrs(ifaddr);
Expand Down Expand Up @@ -198,11 +266,18 @@ NixlTransferAgent::NixlTransferAgent(BaseAgentConfig const& config)
: mName{config.mName}
{
nixl_status_t status;
auto envPort = common::getEnvNixlPort();
uint16_t port = envPort > 0 ? getIncrmentPort(envPort) : getAvailablePort();
nixlAgentConfig nixlConfig{config.useProgThread, true, port};
mAddress = getAvailableIP() + ":" + std::to_string(port);
mRawAgent = std::make_unique<nixlAgent>(config.mName, std::move(nixlConfig));
{
FileLock lock("/tmp/trtllm_nixl_port.lock");
if (!lock.lock())
{
TLLM_THROW("Failed to lock /tmp/trtllm_nixl_port.lock");
}
auto envPort = common::getEnvNixlPort();
uint16_t port = envPort > 0 ? getIncrmentPort(envPort) : getAvailablePort();
nixlAgentConfig nixlConfig{config.useProgThread, true, port};
mAddress = getAvailableIP() + ":" + std::to_string(port);
mRawAgent = std::make_unique<nixlAgent>(config.mName, std::move(nixlConfig));
}

nixl_b_params_t init1;
nixl_mem_list_t mems1;
Expand Down Expand Up @@ -337,6 +412,9 @@ void NixlTransferAgent::connectRemoteAgent(std::string const& name, ConnectionIn
{
std::string ip = connectionInfo.substr(0, connectionInfo.find(":"));
std::string port = connectionInfo.substr(connectionInfo.find(":") + 1);
TLLM_LOG_DEBUG(mpi::MpiComm::world().getRank(),
"NixlTransferAgent::connectRemoteAgent connectRemoteAgent to %s remoteagent name: %s", connectionInfo.c_str(),
name.c_str());
TLLM_CHECK_WITH_INFO(!ip.empty() && !port.empty(), "connectRemoteAgent get empty ip or port, connectionInfo: %s",
connectionInfo.c_str());
nixl_opt_args_t md_extra_params;
Expand Down