Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cpp/src/ray/runtime/native_ray_runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include "./object/object_store.h"
#include "./task/native_task_submitter.h"
#include "ray/common/ray_config.h"
#include "ray/util/network_util.h"

namespace ray {
namespace internal {
Expand All @@ -31,7 +32,7 @@ NativeRayRuntime::NativeRayRuntime() {

auto bootstrap_address = ConfigInternal::Instance().bootstrap_ip;
if (bootstrap_address.empty()) {
bootstrap_address = GetNodeIpAddress();
bootstrap_address = ray::GetNodeIpAddressFromPerspective();
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I renamed the function to GetNodeIpAddressFromPerspective to keep it consistent with the Python function node_ip_address_from_perspective. Additionally, I used CPython bindings to avoid maintaining duplicate implementations in both C++ and Python.

}
global_state_accessor_ = ProcessHelper::GetInstance().CreateGlobalStateAccessor(
bootstrap_address, ConfigInternal::Instance().bootstrap_port);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/ray/test/cluster/cluster_mode_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ TEST(RayClusterModeTest, FullTest) {
auto port = absl::GetFlag<int32_t>(FLAGS_redis_port);
std::string username = absl::GetFlag<std::string>(FLAGS_redis_username);
std::string password = absl::GetFlag<std::string>(FLAGS_redis_password);
std::string local_ip = ray::internal::GetNodeIpAddress();
std::string local_ip = ray::GetNodeIpAddressFromPerspective();
ray::internal::ProcessHelper::GetInstance().StartRayNode(
local_ip, port, username, password);
config.address = ray::BuildAddress(local_ip, port);
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/ray/util/process_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)

if (ConfigInternal::Instance().worker_type == WorkerType::DRIVER &&
bootstrap_ip.empty()) {
bootstrap_ip = GetNodeIpAddress();
bootstrap_ip = ray::GetNodeIpAddressFromPerspective();
StartRayNode(bootstrap_ip,
bootstrap_port,
ConfigInternal::Instance().redis_username,
Expand All @@ -95,9 +95,9 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback)
std::string node_ip = ConfigInternal::Instance().node_ip_address;
if (node_ip.empty()) {
if (!bootstrap_ip.empty()) {
node_ip = GetNodeIpAddress(bootstrap_address);
node_ip = ray::GetNodeIpAddressFromPerspective(bootstrap_address);
} else {
node_ip = GetNodeIpAddress();
node_ip = ray::GetNodeIpAddressFromPerspective();
}
}

Expand Down
21 changes: 0 additions & 21 deletions cpp/src/ray/util/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,6 @@
namespace ray {
namespace internal {

std::string GetNodeIpAddress(const std::string &address) {
auto parts = ParseAddress(address);
RAY_CHECK(parts.has_value());
try {
boost::asio::io_service netService;
boost::asio::ip::udp::resolver resolver(netService);
boost::asio::ip::udp::resolver::query query(
boost::asio::ip::udp::v4(), (*parts)[0], (*parts)[1]);
boost::asio::ip::udp::resolver::iterator endpoints = resolver.resolve(query);
boost::asio::ip::udp::endpoint ep = *endpoints;
boost::asio::ip::udp::socket socket(netService);
socket.connect(ep);
boost::asio::ip::address addr = socket.local_endpoint().address();
return addr.to_string();
} catch (std::exception &e) {
RAY_LOG(FATAL) << "Could not get the node IP address with socket. Exception: "
<< e.what();
return "";
}
}

std::string getLibraryPathEnv() {
auto path_env_p = std::getenv(kLibraryPathEnvName);
if (path_env_p != nullptr && strlen(path_env_p) != 0) {
Expand Down
14 changes: 0 additions & 14 deletions cpp/src/ray/util/util.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,6 @@
namespace ray {
namespace internal {

/// IP address by which the local node can be reached *from* the `address`.
///
/// The behavior should be the same as `node_ip_address_from_perspective` from Ray Python
/// code. See
/// https://stackoverflow.com/questions/2674314/get-local-ip-address-using-boost-asio.
///
/// TODO(kfstorm): Make this function shared code and migrate Python & Java to use this
/// function.
///
/// \param address The IP address and port of any known live service on the network
/// you care about.
/// \return The IP address by which the local node can be reached from the address.
std::string GetNodeIpAddress(const std::string &address = "8.8.8.8:53");

std::string getLibraryPathEnv();

} // namespace internal
Expand Down
3 changes: 1 addition & 2 deletions doc/source/ray-core/examples/lm/ray_train.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
from fairseq_cli.train import main

import ray
from ray._common.network_utils import build_address

_original_save_checkpoint = fairseq.checkpoint_utils.save_checkpoint

Expand Down Expand Up @@ -113,7 +112,7 @@ def run_fault_tolerant_loop():
# fairseq distributed training.
ip = ray.get(workers[0].get_node_ip.remote())
port = ray.get(workers[0].find_free_port.remote())
address = f"tcp://{build_address(ip, port)}"
address = f"tcp://{ip}:{port}"

# Start the remote processes, and check whether their are any process
# fails. If so, restart all the processes.
Expand Down
85 changes: 83 additions & 2 deletions python/ray/_common/network_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
from typing import Optional, Tuple, Union
import socket
from functools import lru_cache

from ray._raylet import build_address as _build_address
from ray._raylet import parse_address as _parse_address
from ray._raylet import (
build_address as _build_address,
is_ipv6_ip as _is_ipv6_ip,
node_ip_address_from_perspective as _node_ip_address_from_perspective,
parse_address as _parse_address,
)


def parse_address(address: str) -> Optional[Tuple[str, str]]:
Expand Down Expand Up @@ -29,6 +35,56 @@ def build_address(host: str, port: Union[int, str]) -> str:
return _build_address(host, port)


def node_ip_address_from_perspective(address: str = "") -> str:
"""IP address by which the local node can be reached *from* the `address`.
If no address is given, defaults to public DNS servers for detection. For
performance, the result is cached when using the default address (empty string).
When a specific address is provided, detection is performed fresh every time.
Args:
address: The IP address and port of any known live service on the
network you care about.
Returns:
The IP address by which the local node can be reached from the address.
"""
return _node_ip_address_from_perspective(address)


def is_ipv6_ip(ip: str) -> bool:
"""Check if an IP string is IPv6 format.
Args:
ip: The IP address string to check (must be pure IP, no port).
Returns:
True if the IP is IPv6, False if IPv4.
"""
return _is_ipv6_ip(ip)


@lru_cache(maxsize=1)
def get_localhost_address() -> str:
Copy link
Contributor Author

@Yicheng-Lu-llll Yicheng-Lu-llll Sep 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally, get_localhost_address should also be moved to C++ and shared with Python. I wrote the Python version for now because this PR does not focus on localhost IPv6 support, and there are currently no usages of get_localhost_address in C++. It can definitely be moved to C++ later.

"""Get localhost loopback address with IPv4/IPv6 support.
Returns:
The localhost loopback IP address.
"""
# Try IPv4 first, then IPv6 localhost resolution
for family in [socket.AF_INET, socket.AF_INET6]:
try:
dns_result = socket.getaddrinfo(
"localhost", None, family, socket.SOCK_STREAM
)
return dns_result[0][4][0]
except socket.gaierror:
continue

# Final fallback to IPv4 loopback
return "127.0.0.1"


def is_localhost(host: str) -> bool:
"""Check if the given host string represents a localhost address.
Expand All @@ -39,3 +95,28 @@ def is_localhost(host: str) -> bool:
True if the host is a localhost address, False otherwise.
"""
return host in ("localhost", "127.0.0.1", "::1")


def create_socket(socket_type: int = socket.SOCK_STREAM) -> socket.socket:
"""Create a Python socket object with the appropriate family based on the node IP.
This function automatically gets the node IP address and creates a socket
with the correct family (AF_INET for IPv4, AF_INET6 for IPv6).
Args:
socket_type: The socket type (socket.SOCK_STREAM, socket.SOCK_DGRAM, etc.).
Returns:
A Python socket.socket object configured for the node's IP family.
Example:
# Create a TCP socket for the current node
sock = create_socket()
# Create a UDP socket for the current node
sock = create_socket(socket.SOCK_DGRAM)
"""
node_ip = node_ip_address_from_perspective()
family = socket.AF_INET6 if is_ipv6_ip(node_ip) else socket.AF_INET

return socket.socket(family, socket_type)
13 changes: 9 additions & 4 deletions python/ray/_private/node.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@
import ray
import ray._private.ray_constants as ray_constants
import ray._private.services
from ray._common.network_utils import build_address, parse_address
from ray._common.network_utils import (
build_address,
create_socket,
get_localhost_address,
parse_address,
)
from ray._common.ray_constants import LOGGING_ROTATE_BACKUP_COUNT, LOGGING_ROTATE_BYTES
from ray._common.utils import try_to_create_directory
from ray._private.resource_and_label_spec import ResourceAndLabelSpec
Expand Down Expand Up @@ -138,7 +143,7 @@ def __init__(
)

self._resource_and_label_spec = None
self._localhost = socket.gethostbyname("localhost")
self._localhost = get_localhost_address()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old way should just work for both ipv4 and ipv6

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

socket.gethostbyname(hostname)
Translate a host name to IPv4 address format. The IPv4 address is returned as a string, such as '100.50.200.5'. If the host name is an IPv4 address itself it is returned unchanged. See gethostbyname_ex() for a more complete interface. gethostbyname() does not support IPv6 name resolution, and getaddrinfo() should be used instead for IPv4/v6 dual stack support.

Raises an auditing event socket.gethostbyname with argument hostname.

Availability: not WASI.

I checked the documentation here, it seems that it does not support it: https://docs.python.org/3/library/socket.html

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use socket.getaddrinfo which works for both ipv4 and ipv6.

Copy link
Contributor Author

@Yicheng-Lu-llll Yicheng-Lu-llll Sep 5, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the new function get_localhost_address uses socket.getaddrinfo underneath.
It first checks the node IP to decide. If no node IP is found, it falls back to socket.getaddrinfo, preferring IPv4.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer to have a consistent way to determine whether to use IPv6 or IPv4. The current approach is:
decide based on the user-specified node IP; if no node IP is provided, prefer IPv4, then IPv6.
The same logic should applied in all places, including localhost and socket-related code.

self._ray_params = ray_params
self._config = ray_params._system_config or {}

Expand Down Expand Up @@ -880,7 +885,7 @@ def _get_unused_port(self, allocated_ports=None):
if allocated_ports is None:
allocated_ports = set()

s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s = create_socket(socket.SOCK_STREAM)
s.bind(("", 0))
port = s.getsockname()[1]

Expand All @@ -893,7 +898,7 @@ def _get_unused_port(self, allocated_ports=None):
# This port is allocated for other usage already,
# so we shouldn't use it even if it's not in use right now.
continue
new_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
new_s = create_socket(socket.SOCK_STREAM)
try:
new_s.bind(("", new_port))
except OSError:
Expand Down
49 changes: 12 additions & 37 deletions python/ray/_private/services.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@
# Ray modules
import ray
import ray._private.ray_constants as ray_constants
from ray._common.network_utils import build_address, parse_address
from ray._common.network_utils import (
build_address,
create_socket,
get_localhost_address,
node_ip_address_from_perspective,
parse_address,
)
from ray._private.ray_constants import RAY_NODE_IP_FILENAME
from ray._private.resource_isolation_config import ResourceIsolationConfig
from ray._raylet import GcsClient, GcsClientOptions
Expand Down Expand Up @@ -615,52 +621,21 @@ def resolve_ip_for_localhost(host: str):
return host


def node_ip_address_from_perspective(address: str):
"""IP address by which the local node can be reached *from* the `address`.

Args:
address: The IP address and port of any known live service on the
network you care about.

Returns:
The IP address by which the local node can be reached from the address.
"""
ip_address, port = parse_address(address)
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# This command will raise an exception if there is no internet
# connection.
s.connect((ip_address, int(port)))
node_ip_address = s.getsockname()[0]
except OSError as e:
node_ip_address = "127.0.0.1"
# [Errno 101] Network is unreachable
if e.errno == errno.ENETUNREACH:
try:
# try get node ip address from host name
host_name = socket.getfqdn(socket.gethostname())
node_ip_address = socket.gethostbyname(host_name)
except Exception:
pass
finally:
s.close()

return node_ip_address


# NOTE: This API should not be used when you obtain the
# IP address when ray.init is not called because
# it cannot find the IP address if it is specified by
# ray start --node-ip-address. You should instead use
# get_cached_node_ip_address.
def get_node_ip_address(address="8.8.8.8:53"):
def get_node_ip_address(address=""):
if ray._private.worker._global_node is not None:
return ray._private.worker._global_node.node_ip_address

if not ray_constants.ENABLE_RAY_CLUSTER:
# Use loopback IP as the local IP address to prevent bothersome
# firewall popups on OSX and Windows.
# https://github.com/ray-project/ray/issues/18730.
return "127.0.0.1"
return get_localhost_address()

return node_ip_address_from_perspective(address)


Expand Down Expand Up @@ -1222,7 +1197,7 @@ def start_api_server(
port = ray_constants.DEFAULT_DASHBOARD_PORT
else:
port_retries = 0
port_test_socket = socket.socket()
port_test_socket = create_socket()
port_test_socket.setsockopt(
socket.SOL_SOCKET,
socket.SO_REUSEADDR,
Expand Down
8 changes: 4 additions & 4 deletions python/ray/_private/test_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import ray._private.services
import ray._private.services as services
import ray._private.utils
from ray._common.network_utils import build_address, parse_address
from ray._common.network_utils import build_address, create_socket, parse_address
from ray._common.test_utils import wait_for_condition
from ray._common.utils import get_or_create_event_loop
from ray._private import (
Expand Down Expand Up @@ -776,7 +776,7 @@ def wait_until_server_available(address, timeout_ms=5000, retry_interval_ms=100)
time_elapsed = 0
start = time.time()
while time_elapsed <= timeout_ms:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s = create_socket(socket.SOCK_STREAM)
s.settimeout(1)
try:
s.connect((ip, port))
Expand Down Expand Up @@ -1753,7 +1753,7 @@ def job_hook(**kwargs):


def find_free_port() -> int:
sock = socket.socket()
sock = create_socket()
sock.bind(("", 0))
port = sock.getsockname()[1]
sock.close()
Expand Down Expand Up @@ -1884,7 +1884,7 @@ def get_current_unused_port():
A port number that is not currently in use. (Note that this port
might become used by the time you try to bind to it.)
"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock = create_socket(socket.SOCK_STREAM)

# Bind the socket to a local address with a random port number
sock.bind(("localhost", 0))
Expand Down
Loading