From af1cc3ee7eb07509f0a30d21c3854860dc42a331 Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Thu, 4 Sep 2025 07:04:20 +0000 Subject: [PATCH 1/4] add IPv6 support for sockets Signed-off-by: Yicheng-Lu-llll --- cpp/src/ray/runtime/native_ray_runtime.cc | 3 +- cpp/src/ray/test/cluster/cluster_mode_test.cc | 2 +- cpp/src/ray/util/process_helper.cc | 6 +- cpp/src/ray/util/util.cc | 21 --- cpp/src/ray/util/util.h | 14 -- doc/source/ray-core/examples/lm/ray_train.py | 3 +- python/ray/_common/network_utils.py | 95 +++++++++++++- python/ray/_private/node.py | 13 +- python/ray/_private/services.py | 49 ++----- python/ray/_private/test_utils.py | 8 +- python/ray/_private/tls_utils.py | 19 ++- python/ray/air/_internal/util.py | 3 +- python/ray/dashboard/tests/test_dashboard.py | 3 +- python/ray/experimental/collective/util.py | 3 +- python/ray/includes/network_util.pxd | 7 +- python/ray/includes/network_util.pxi | 35 +++++ python/ray/serve/_private/http_util.py | 3 +- python/ray/serve/tests/test_standalone.py | 5 +- python/ray/tests/conftest.py | 9 +- python/ray/tests/test_advanced_3.py | 3 +- python/ray/util/client/server/proxier.py | 3 +- python/ray/util/rpdb.py | 7 +- python/ray/util/spark/utils.py | 3 +- .../workloads/benchmark_util.py | 3 +- ...nv_runner_server_for_external_inference.py | 3 +- .../classes/utils/dummy_external_client.py | 3 +- src/ray/common/BUILD.bazel | 5 + src/ray/common/node_config.h | 43 ++++++ .../tests/gcs_client_reconnection_test.cc | 6 +- src/ray/gcs/gcs_server/gcs_server_main.cc | 3 + .../tests/gcs_health_check_manager_test.cc | 14 +- .../gcs/store_client/redis_async_context.cc | 24 +++- src/ray/raylet/main.cc | 3 + src/ray/raylet/tests/BUILD.bazel | 1 + .../tests/runtime_env_agent_client_test.cc | 15 +-- src/ray/util/BUILD.bazel | 2 + src/ray/util/network_util.cc | 124 +++++++++++++++++- src/ray/util/network_util.h | 24 ++++ 38 files changed, 440 insertions(+), 150 deletions(-) create mode 100644 src/ray/common/node_config.h diff --git a/cpp/src/ray/runtime/native_ray_runtime.cc b/cpp/src/ray/runtime/native_ray_runtime.cc index e494d0004550..335bb5c91bc9 100644 --- a/cpp/src/ray/runtime/native_ray_runtime.cc +++ b/cpp/src/ray/runtime/native_ray_runtime.cc @@ -20,6 +20,7 @@ #include "./object/object_store.h" #include "./task/native_task_submitter.h" #include "ray/common/ray_config.h" +#include "ray/util/network_util.h" namespace ray { namespace internal { @@ -31,7 +32,7 @@ NativeRayRuntime::NativeRayRuntime() { auto bootstrap_address = ConfigInternal::Instance().bootstrap_ip; if (bootstrap_address.empty()) { - bootstrap_address = GetNodeIpAddress(); + bootstrap_address = ray::GetNodeIpAddressFromPerspective(); } global_state_accessor_ = ProcessHelper::GetInstance().CreateGlobalStateAccessor( bootstrap_address, ConfigInternal::Instance().bootstrap_port); diff --git a/cpp/src/ray/test/cluster/cluster_mode_test.cc b/cpp/src/ray/test/cluster/cluster_mode_test.cc index 3bbd0809c393..2bfdaa200aba 100644 --- a/cpp/src/ray/test/cluster/cluster_mode_test.cc +++ b/cpp/src/ray/test/cluster/cluster_mode_test.cc @@ -71,7 +71,7 @@ TEST(RayClusterModeTest, FullTest) { auto port = absl::GetFlag(FLAGS_redis_port); std::string username = absl::GetFlag(FLAGS_redis_username); std::string password = absl::GetFlag(FLAGS_redis_password); - std::string local_ip = ray::internal::GetNodeIpAddress(); + std::string local_ip = ray::GetNodeIpAddressFromPerspective(); ray::internal::ProcessHelper::GetInstance().StartRayNode( local_ip, port, username, password); config.address = ray::BuildAddress(local_ip, port); diff --git a/cpp/src/ray/util/process_helper.cc b/cpp/src/ray/util/process_helper.cc index fbddae45f7b9..56c5c59acefe 100644 --- a/cpp/src/ray/util/process_helper.cc +++ b/cpp/src/ray/util/process_helper.cc @@ -83,7 +83,7 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback) if (ConfigInternal::Instance().worker_type == WorkerType::DRIVER && bootstrap_ip.empty()) { - bootstrap_ip = GetNodeIpAddress(); + bootstrap_ip = ray::GetNodeIpAddressFromPerspective(); StartRayNode(bootstrap_ip, bootstrap_port, ConfigInternal::Instance().redis_username, @@ -95,9 +95,9 @@ void ProcessHelper::RayStart(CoreWorkerOptions::TaskExecutionCallback callback) std::string node_ip = ConfigInternal::Instance().node_ip_address; if (node_ip.empty()) { if (!bootstrap_ip.empty()) { - node_ip = GetNodeIpAddress(bootstrap_address); + node_ip = ray::GetNodeIpAddressFromPerspective(bootstrap_address); } else { - node_ip = GetNodeIpAddress(); + node_ip = ray::GetNodeIpAddressFromPerspective(); } } diff --git a/cpp/src/ray/util/util.cc b/cpp/src/ray/util/util.cc index 003451b79ffd..e4f0e1113469 100644 --- a/cpp/src/ray/util/util.cc +++ b/cpp/src/ray/util/util.cc @@ -24,27 +24,6 @@ namespace ray { namespace internal { -std::string GetNodeIpAddress(const std::string &address) { - auto parts = ParseAddress(address); - RAY_CHECK(parts.has_value()); - try { - boost::asio::io_service netService; - boost::asio::ip::udp::resolver resolver(netService); - boost::asio::ip::udp::resolver::query query( - boost::asio::ip::udp::v4(), (*parts)[0], (*parts)[1]); - boost::asio::ip::udp::resolver::iterator endpoints = resolver.resolve(query); - boost::asio::ip::udp::endpoint ep = *endpoints; - boost::asio::ip::udp::socket socket(netService); - socket.connect(ep); - boost::asio::ip::address addr = socket.local_endpoint().address(); - return addr.to_string(); - } catch (std::exception &e) { - RAY_LOG(FATAL) << "Could not get the node IP address with socket. Exception: " - << e.what(); - return ""; - } -} - std::string getLibraryPathEnv() { auto path_env_p = std::getenv(kLibraryPathEnvName); if (path_env_p != nullptr && strlen(path_env_p) != 0) { diff --git a/cpp/src/ray/util/util.h b/cpp/src/ray/util/util.h index 7ca9bc2b17e4..0af45808d608 100644 --- a/cpp/src/ray/util/util.h +++ b/cpp/src/ray/util/util.h @@ -18,20 +18,6 @@ namespace ray { namespace internal { -/// IP address by which the local node can be reached *from* the `address`. -/// -/// The behavior should be the same as `node_ip_address_from_perspective` from Ray Python -/// code. See -/// https://stackoverflow.com/questions/2674314/get-local-ip-address-using-boost-asio. -/// -/// TODO(kfstorm): Make this function shared code and migrate Python & Java to use this -/// function. -/// -/// \param address The IP address and port of any known live service on the network -/// you care about. -/// \return The IP address by which the local node can be reached from the address. -std::string GetNodeIpAddress(const std::string &address = "8.8.8.8:53"); - std::string getLibraryPathEnv(); } // namespace internal diff --git a/doc/source/ray-core/examples/lm/ray_train.py b/doc/source/ray-core/examples/lm/ray_train.py index 537680711546..9a670f983309 100644 --- a/doc/source/ray-core/examples/lm/ray_train.py +++ b/doc/source/ray-core/examples/lm/ray_train.py @@ -11,7 +11,6 @@ from fairseq_cli.train import main import ray -from ray._common.network_utils import build_address _original_save_checkpoint = fairseq.checkpoint_utils.save_checkpoint @@ -113,7 +112,7 @@ def run_fault_tolerant_loop(): # fairseq distributed training. ip = ray.get(workers[0].get_node_ip.remote()) port = ray.get(workers[0].find_free_port.remote()) - address = f"tcp://{build_address(ip, port)}" + address = f"tcp://{ip}:{port}" # Start the remote processes, and check whether their are any process # fails. If so, restart all the processes. diff --git a/python/ray/_common/network_utils.py b/python/ray/_common/network_utils.py index 9664c53a94a2..6ead83373b3a 100644 --- a/python/ray/_common/network_utils.py +++ b/python/ray/_common/network_utils.py @@ -1,7 +1,13 @@ from typing import Optional, Tuple, Union +import socket +from functools import lru_cache -from ray._raylet import build_address as _build_address -from ray._raylet import parse_address as _parse_address +from ray._raylet import ( + build_address as _build_address, + is_ipv6_ip as _is_ipv6_ip, + node_ip_address_from_perspective as _node_ip_address_from_perspective, + parse_address as _parse_address, +) def parse_address(address: str) -> Optional[Tuple[str, str]]: @@ -29,6 +35,65 @@ def build_address(host: str, port: Union[int, str]) -> str: return _build_address(host, port) +def node_ip_address_from_perspective(address: str = "") -> str: + """IP address by which the local node can be reached *from* the `address`. + + If no address is given, defaults to public DNS servers for detection. For + performance, the result is cached when using the default address (empty string). + When a specific address is provided, detection is performed fresh every time. + + Args: + address: The IP address and port of any known live service on the + network you care about. + + Returns: + The IP address by which the local node can be reached from the address. + """ + return _node_ip_address_from_perspective(address) + + +def is_ipv6_ip(ip: str) -> bool: + """Check if an IP string is IPv6 format. + + Args: + ip: The IP address string to check (must be pure IP, no port). + + Returns: + True if the IP is IPv6, False if IPv4. + """ + return _is_ipv6_ip(ip) + + +@lru_cache(maxsize=1) +def get_localhost_address() -> str: + """Get localhost loopback address with IPv4/IPv6 support. + + Returns: + The localhost loopback IP address (matching node IP family or auto-detected). + """ + import ray._private.worker + + if ( + ray._private.worker._global_node is not None + and ray._private.worker._global_node.node_ip_address + ): + node_ip = ray._private.worker._global_node.node_ip_address + return "::1" if is_ipv6_ip(node_ip) else "127.0.0.1" + + # Try IPv4 first, then IPv6 localhost resolution + for family in [socket.AF_INET, socket.AF_INET6]: + try: + dns_result = socket.getaddrinfo( + "localhost", None, family, socket.SOCK_STREAM + ) + return dns_result[0][4][0] + except socket.gaierror: + continue + + # Final fallback to IPv4 loopback + return "127.0.0.1" + + def is_localhost(host: str) -> bool: """Check if the given host string represents a localhost address. @@ -39,3 +104,29 @@ def is_localhost(host: str) -> bool: True if the host is a localhost address, False otherwise. """ return host in ("localhost", "127.0.0.1", "::1") + + +def create_socket(socket_type: int = socket.SOCK_STREAM) -> socket.socket: + """Create a Python socket object with the appropriate family based on the node IP. + + This function automatically gets the node IP address and creates a socket + with the correct family (AF_INET for IPv4, AF_INET6 for IPv6). + + Args: + socket_type: The socket type (socket.SOCK_STREAM, socket.SOCK_DGRAM, etc.). + + Returns: + A Python socket.socket object configured for the node's IP family. + + Example: + # Create a TCP socket for the current node + sock = create_socket() + + # Create a UDP socket for the current node + sock = create_socket(socket.SOCK_DGRAM) + """ + node_ip = node_ip_address_from_perspective() + family = socket.AF_INET6 if is_ipv6_ip(node_ip) else socket.AF_INET + + # Create socket directly with Python socket API + return socket.socket(family, socket_type) diff --git a/python/ray/_private/node.py b/python/ray/_private/node.py index 74e13ae883f3..6738e51cbb12 100644 --- a/python/ray/_private/node.py +++ b/python/ray/_private/node.py @@ -22,7 +22,12 @@ import ray import ray._private.ray_constants as ray_constants import ray._private.services -from ray._common.network_utils import build_address, parse_address +from ray._common.network_utils import ( + build_address, + create_socket, + get_localhost_address, + parse_address, +) from ray._common.ray_constants import LOGGING_ROTATE_BACKUP_COUNT, LOGGING_ROTATE_BYTES from ray._common.utils import try_to_create_directory from ray._private.resource_and_label_spec import ResourceAndLabelSpec @@ -138,7 +143,7 @@ def __init__( ) self._resource_and_label_spec = None - self._localhost = socket.gethostbyname("localhost") + self._localhost = get_localhost_address() self._ray_params = ray_params self._config = ray_params._system_config or {} @@ -880,7 +885,7 @@ def _get_unused_port(self, allocated_ports=None): if allocated_ports is None: allocated_ports = set() - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s = create_socket(socket.SOCK_STREAM) s.bind(("", 0)) port = s.getsockname()[1] @@ -893,7 +898,7 @@ def _get_unused_port(self, allocated_ports=None): # This port is allocated for other usage already, # so we shouldn't use it even if it's not in use right now. continue - new_s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + new_s = create_socket(socket.SOCK_STREAM) try: new_s.bind(("", new_port)) except OSError: diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 94040866676d..a2f46cb1fc9d 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -21,7 +21,13 @@ # Ray modules import ray import ray._private.ray_constants as ray_constants -from ray._common.network_utils import build_address, parse_address +from ray._common.network_utils import ( + build_address, + create_socket, + get_localhost_address, + node_ip_address_from_perspective, + parse_address, +) from ray._private.ray_constants import RAY_NODE_IP_FILENAME from ray._private.resource_isolation_config import ResourceIsolationConfig from ray._raylet import GcsClient, GcsClientOptions @@ -615,52 +621,21 @@ def resolve_ip_for_localhost(host: str): return host -def node_ip_address_from_perspective(address: str): - """IP address by which the local node can be reached *from* the `address`. - - Args: - address: The IP address and port of any known live service on the - network you care about. - - Returns: - The IP address by which the local node can be reached from the address. - """ - ip_address, port = parse_address(address) - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - try: - # This command will raise an exception if there is no internet - # connection. - s.connect((ip_address, int(port))) - node_ip_address = s.getsockname()[0] - except OSError as e: - node_ip_address = "127.0.0.1" - # [Errno 101] Network is unreachable - if e.errno == errno.ENETUNREACH: - try: - # try get node ip address from host name - host_name = socket.getfqdn(socket.gethostname()) - node_ip_address = socket.gethostbyname(host_name) - except Exception: - pass - finally: - s.close() - - return node_ip_address - - # NOTE: This API should not be used when you obtain the # IP address when ray.init is not called because # it cannot find the IP address if it is specified by # ray start --node-ip-address. You should instead use # get_cached_node_ip_address. -def get_node_ip_address(address="8.8.8.8:53"): +def get_node_ip_address(address=""): if ray._private.worker._global_node is not None: return ray._private.worker._global_node.node_ip_address + if not ray_constants.ENABLE_RAY_CLUSTER: # Use loopback IP as the local IP address to prevent bothersome # firewall popups on OSX and Windows. # https://github.com/ray-project/ray/issues/18730. - return "127.0.0.1" + return get_localhost_address() + return node_ip_address_from_perspective(address) @@ -1222,7 +1197,7 @@ def start_api_server( port = ray_constants.DEFAULT_DASHBOARD_PORT else: port_retries = 0 - port_test_socket = socket.socket() + port_test_socket = create_socket() port_test_socket.setsockopt( socket.SOL_SOCKET, socket.SO_REUSEADDR, diff --git a/python/ray/_private/test_utils.py b/python/ray/_private/test_utils.py index dc33017aea0d..66562ac4a016 100644 --- a/python/ray/_private/test_utils.py +++ b/python/ray/_private/test_utils.py @@ -30,7 +30,7 @@ import ray._private.services import ray._private.services as services import ray._private.utils -from ray._common.network_utils import build_address, parse_address +from ray._common.network_utils import build_address, create_socket, parse_address from ray._common.test_utils import wait_for_condition from ray._common.utils import get_or_create_event_loop from ray._private import ( @@ -776,7 +776,7 @@ def wait_until_server_available(address, timeout_ms=5000, retry_interval_ms=100) time_elapsed = 0 start = time.time() while time_elapsed <= timeout_ms: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s = create_socket(socket.SOCK_STREAM) s.settimeout(1) try: s.connect((ip, port)) @@ -1753,7 +1753,7 @@ def job_hook(**kwargs): def find_free_port() -> int: - sock = socket.socket() + sock = create_socket() sock.bind(("", 0)) port = sock.getsockname()[1] sock.close() @@ -1884,7 +1884,7 @@ def get_current_unused_port(): A port number that is not currently in use. (Note that this port might become used by the time you try to bind to it.) """ - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock = create_socket(socket.SOCK_STREAM) # Bind the socket to a local address with a random port number sock.bind(("localhost", 0)) diff --git a/python/ray/_private/tls_utils.py b/python/ray/_private/tls_utils.py index 22b6f050ee60..ad19d2f4861d 100644 --- a/python/ray/_private/tls_utils.py +++ b/python/ray/_private/tls_utils.py @@ -2,6 +2,11 @@ import os import socket +from ray._common.network_utils import ( + get_localhost_address, + node_ip_address_from_perspective, +) + def generate_self_signed_tls_certs(): """Create self-signed key/cert pair for testing. @@ -29,21 +34,13 @@ def generate_self_signed_tls_certs(): ).decode() ray_interal = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, "ray-internal")]) - # This is the same logic used by the GCS server to acquire a - # private/interal IP address to listen on. If we just use localhost + - # 127.0.0.1 then we won't be able to connect to the GCS and will get - # an error like "No match found for server name: 192.168.X.Y" - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - s.connect(("8.8.8.8", 80)) - private_ip_address = s.getsockname()[0] - s.close() altnames = x509.SubjectAlternativeName( [ x509.DNSName( socket.gethostbyname(socket.gethostname()) - ), # Probably 127.0.0.1 - x509.DNSName("127.0.0.1"), - x509.DNSName(private_ip_address), # 192.168.*.* + ), # Probably 127.0.0.1 or ::1 + x509.DNSName(get_localhost_address()), + x509.DNSName(node_ip_address_from_perspective()), x509.DNSName("localhost"), ] ) diff --git a/python/ray/air/_internal/util.py b/python/ray/air/_internal/util.py index ddceba726ee4..c3a10e8df70b 100644 --- a/python/ray/air/_internal/util.py +++ b/python/ray/air/_internal/util.py @@ -10,12 +10,13 @@ import numpy as np from ray.air.constants import _ERROR_REPORT_TIMEOUT +from ray._common.network_utils import create_socket logger = logging.getLogger(__name__) def find_free_port(): - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + with closing(create_socket(socket.SOCK_STREAM)) as s: s.bind(("", 0)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) return s.getsockname()[1] diff --git a/python/ray/dashboard/tests/test_dashboard.py b/python/ray/dashboard/tests/test_dashboard.py index 1f7036651465..d96f2a0c330e 100644 --- a/python/ray/dashboard/tests/test_dashboard.py +++ b/python/ray/dashboard/tests/test_dashboard.py @@ -15,6 +15,7 @@ import pytest from ray._common.test_utils import wait_for_condition +from ray._common.network_utils import create_socket import requests from click.testing import CliRunner from requests.exceptions import ConnectionError, HTTPError @@ -1079,7 +1080,7 @@ def test_agent_port_conflict(shutdown_only): ray.shutdown() # ocuppy the port with a socket. - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s = create_socket(socket.SOCK_STREAM) wait_for_condition( lambda: s.connect_ex( diff --git a/python/ray/experimental/collective/util.py b/python/ray/experimental/collective/util.py index 3539e5780766..d656f650fc1a 100644 --- a/python/ray/experimental/collective/util.py +++ b/python/ray/experimental/collective/util.py @@ -3,6 +3,7 @@ import socket import ray +from ray._common.network_utils import create_socket from ray.util.collective.types import Backend from ray.experimental.collective.tensor_transport_manager import TensorTransportManager @@ -63,7 +64,7 @@ def device_match_transport(device: "torch.device", tensor_transport: Backend) -> def find_free_port() -> int: - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + with closing(create_socket(socket.SOCK_STREAM)) as s: s.bind(("", 0)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) return s.getsockname()[1] diff --git a/python/ray/includes/network_util.pxd b/python/ray/includes/network_util.pxd index df4f8cb9d18d..351bd5b65a70 100644 --- a/python/ray/includes/network_util.pxd +++ b/python/ray/includes/network_util.pxd @@ -1,9 +1,12 @@ from libc.stddef cimport size_t from libcpp.string cimport string +from libcpp cimport bool from ray.includes.array cimport array_string_2 from ray.includes.optional cimport optional cdef extern from "ray/util/network_util.h" namespace "ray": - optional[array_string_2] ParseAddress(const string &address) - string BuildAddress(const string &host, const string &port) string BuildAddress(const string &host, int port) + string BuildAddress(const string &host, const string &port) + optional[array_string_2] ParseAddress(const string &address) + string GetNodeIpAddressFromPerspective(const string &address) + bool IsIPv6IP(const string &ip) diff --git a/python/ray/includes/network_util.pxi b/python/ray/includes/network_util.pxi index 27e330eeace4..c5d7a5ea4625 100644 --- a/python/ray/includes/network_util.pxi +++ b/python/ray/includes/network_util.pxi @@ -1,11 +1,14 @@ from ray.includes.network_util cimport ( BuildAddress, ParseAddress, + GetNodeIpAddressFromPerspective, + IsIPv6IP, array_string_2, optional, ) from libcpp.string cimport string from typing import Optional, Tuple, Union +import socket def parse_address(address: str) -> Optional[Tuple[str, str]]: """Parse a network address string into host and port. @@ -45,3 +48,35 @@ def build_address(host: str, port: Union[int, str]) -> str: result = BuildAddress(host_c, port_c) return result.decode('utf-8') + + +def node_ip_address_from_perspective(address: str = "") -> str: + """IP address by which the local node can be reached *from* the `address`. + + If no address is given, defaults to public DNS servers for detection. For + performance, the result is cached when using the default address (empty string). + When a specific address is provided, detection is performed fresh every time. + + Args: + address: The IP address and port of any known live service on the + network you care about. + + Returns: + The IP address by which the local node can be reached from the address. + """ + cdef string address_c = address.encode('utf-8') + cdef string result = GetNodeIpAddressFromPerspective(address_c) + return result.decode('utf-8') + + +def is_ipv6_ip(ip: str) -> bool: + """Check if an IP string is IPv6 format. + + Args: + ip: The IP address string to check (must be pure IP, no port). + + Returns: + True if the IP is IPv6, False if IPv4. + """ + cdef string ip_c = ip.encode('utf-8') + return IsIPv6IP(ip_c) diff --git a/python/ray/serve/_private/http_util.py b/python/ray/serve/_private/http_util.py index 93880b65f77b..691c1ed485ce 100644 --- a/python/ray/serve/_private/http_util.py +++ b/python/ray/serve/_private/http_util.py @@ -30,6 +30,7 @@ from uvicorn.config import Config from uvicorn.lifespan.on import LifespanOn +from ray._common.network_utils import create_socket from ray._common.pydantic_compat import IS_PYDANTIC_2 from ray.exceptions import RayActorError, RayTaskError from ray.serve._private.common import RequestMetadata @@ -698,7 +699,7 @@ async def start_asgi_http_server( """ app = _apply_middlewares(app, http_options.middlewares) - sock = socket.socket() + sock = create_socket() if enable_so_reuseport: set_socket_reuse_port(sock) diff --git a/python/ray/serve/tests/test_standalone.py b/python/ray/serve/tests/test_standalone.py index 2ddb36190acc..72031ef8d4b8 100644 --- a/python/ray/serve/tests/test_standalone.py +++ b/python/ray/serve/tests/test_standalone.py @@ -14,6 +14,7 @@ import ray from ray import serve +from ray._common.network_utils import create_socket from ray._common.test_utils import wait_for_condition from ray._private.test_utils import ( run_string_as_driver, @@ -393,7 +394,7 @@ def connect_in_deployment(*args): def test_set_socket_reuse_port(): - sock = socket.socket() + sock = create_socket() if hasattr(socket, "SO_REUSEPORT"): # If the flag exists, we should be able to to use it assert set_socket_reuse_port(sock) @@ -407,7 +408,7 @@ def test_set_socket_reuse_port(): def _reuse_port_is_available(): - sock = socket.socket() + sock = create_socket() return set_socket_reuse_port(sock) diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index 386f9f63baa0..f0e0dfbec519 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -22,9 +22,8 @@ import ray import ray._private.ray_constants as ray_constants -from ray._common.network_utils import build_address +from ray._common.network_utils import build_address, create_socket from ray._common.test_utils import wait_for_condition -from ray._private.conftest_utils import set_override_dashboard_url # noqa: F401 from ray._private.runtime_env import virtualenv_utils from ray._private.test_utils import ( RayletKiller, @@ -259,7 +258,7 @@ def _find_available_ports(start: int, end: int, *, num: int = 1) -> List[int]: ports = [] for _ in range(num): random_port = 0 - with socket.socket() as s: + with create_socket() as s: s.bind(("", 0)) random_port = s.getsockname()[1] if random_port >= start and random_port <= end and random_port not in ports: @@ -270,7 +269,7 @@ def _find_available_ports(start: int, end: int, *, num: int = 1) -> List[int]: if port in ports: continue try: - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + with create_socket(socket.SOCK_STREAM) as s: s.bind(("", port)) ports.append(port) break @@ -1220,7 +1219,7 @@ def set_runtime_env_retry_times(request): def listen_port(request): port = getattr(request, "param", 0) try: - sock = socket.socket() + sock = create_socket() if hasattr(socket, "SO_REUSEPORT"): sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 0) diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index 09d0743b6a0d..1b73dd507fd2 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -13,6 +13,7 @@ import ray import ray._private.ray_constants import ray._private.utils +from ray._common.network_utils import create_socket from ray._private.test_utils import check_call_ray, wait_for_num_actors import psutil @@ -299,7 +300,7 @@ def test_raylet_is_robust_to_random_messages(ray_start_regular): assert node_manager_address assert node_manager_port # Try to bring down the node manager: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s = create_socket(socket.SOCK_STREAM) s.connect((node_manager_address, node_manager_port)) s.send(1000 * b"asdf") diff --git a/python/ray/util/client/server/proxier.py b/python/ray/util/client/server/proxier.py index 8be870c566d7..ebd8365d0742 100644 --- a/python/ray/util/client/server/proxier.py +++ b/python/ray/util/client/server/proxier.py @@ -18,6 +18,7 @@ import psutil import ray +from ray._common.network_utils import create_socket import ray.core.generated.ray_client_pb2 as ray_client_pb2 import ray.core.generated.ray_client_pb2_grpc as ray_client_pb2_grpc import ray.core.generated.runtime_env_agent_pb2 as runtime_env_agent_pb2 @@ -144,7 +145,7 @@ def _get_unused_port(self) -> int: num_ports = len(self._free_ports) for _ in range(num_ports): port = self._free_ports.pop(0) - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s = create_socket(socket.SOCK_STREAM) try: s.bind(("", port)) except OSError: diff --git a/python/ray/util/rpdb.py b/python/ray/util/rpdb.py index ae102c96120b..084c03d9cecd 100644 --- a/python/ray/util/rpdb.py +++ b/python/ray/util/rpdb.py @@ -3,7 +3,7 @@ # (BSD 2-Clause "Simplified" License) import errno -from ray._common.network_utils import build_address +from ray._common.network_utils import build_address, create_socket import inspect import json import logging @@ -103,7 +103,7 @@ def __init__( self._breakpoint_uuid = breakpoint_uuid self._quiet = quiet self._patch_stdstreams = patch_stdstreams - self._listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self._listen_socket = create_socket(socket.SOCK_STREAM) self._listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True) self._listen_socket.bind((host, port)) self._ip_address = ip_address @@ -345,7 +345,8 @@ def _post_mortem(): def _connect_pdb_client(host, port): if sys.platform == "win32": import msvcrt - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + s = create_socket(socket.SOCK_STREAM) s.connect((host, port)) while True: diff --git a/python/ray/util/spark/utils.py b/python/ray/util/spark/utils.py index 65bfa4a52f2b..0214fbecf268 100644 --- a/python/ray/util/spark/utils.py +++ b/python/ray/util/spark/utils.py @@ -8,6 +8,7 @@ import shutil import time +from ray._common.network_utils import create_socket _logger = logging.getLogger("ray.util.spark.utils") @@ -100,7 +101,7 @@ def is_port_in_use(host, port): import socket from contextlib import closing - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock: + with closing(create_socket(socket.SOCK_STREAM)) as sock: return sock.connect_ex((host, port)) == 0 diff --git a/release/air_tests/air_benchmarks/workloads/benchmark_util.py b/release/air_tests/air_benchmarks/workloads/benchmark_util.py index 5fbaaf8c285a..a6e33f0b4721 100644 --- a/release/air_tests/air_benchmarks/workloads/benchmark_util.py +++ b/release/air_tests/air_benchmarks/workloads/benchmark_util.py @@ -5,6 +5,7 @@ from contextlib import closing from pathlib import Path from ray.air.util.node import _force_on_node +from ray._common.network_utils import create_socket import ray from typing import List, Dict, Union, Callable @@ -108,7 +109,7 @@ def get_ip_port_actors(actors: List[ray.actor.ActorHandle]) -> List[str]: def get_ip_port(): ip = ray.util.get_node_ip_address() - with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s: + with closing(create_socket(socket.SOCK_STREAM)) as s: s.bind(("localhost", 0)) s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) port = s.getsockname()[1] diff --git a/rllib/env/external/env_runner_server_for_external_inference.py b/rllib/env/external/env_runner_server_for_external_inference.py index 36bb2723c27b..fc6c7894c2e0 100644 --- a/rllib/env/external/env_runner_server_for_external_inference.py +++ b/rllib/env/external/env_runner_server_for_external_inference.py @@ -5,6 +5,7 @@ import time from typing import Collection, DefaultDict, List, Optional, Union +from ray._common.network_utils import create_socket from ray.rllib.core import ( COMPONENT_RL_MODULE, DEFAULT_AGENT_ID, @@ -285,7 +286,7 @@ def _recycle_sockets(self, sleep: float = 0.0): time.sleep(sleep) # Start listening on the configured port. - self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.server_socket = create_socket(socket.SOCK_STREAM) # Allow reuse of the address. self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.server_socket.bind((self.host, self.port)) diff --git a/rllib/examples/envs/classes/utils/dummy_external_client.py b/rllib/examples/envs/classes/utils/dummy_external_client.py index 8cc1bf0af6f7..79f6ea451736 100644 --- a/rllib/examples/envs/classes/utils/dummy_external_client.py +++ b/rllib/examples/envs/classes/utils/dummy_external_client.py @@ -5,6 +5,7 @@ import gymnasium as gym import numpy as np +from ray._common.network_utils import create_socket from ray.rllib.core import ( Columns, COMPONENT_RL_MODULE, @@ -32,7 +33,7 @@ def _set_state(msg_body, rl_module): while True: try: print(f"Trying to connect to localhost:{port} ...") - sock_ = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock_ = create_socket(socket.SOCK_STREAM) sock_.connect(("localhost", port)) break except ConnectionRefusedError: diff --git a/src/ray/common/BUILD.bazel b/src/ray/common/BUILD.bazel index 63c10cc49e43..eaf4555e2f12 100644 --- a/src/ray/common/BUILD.bazel +++ b/src/ray/common/BUILD.bazel @@ -10,6 +10,11 @@ ray_cc_library( hdrs = ["constants.h"], ) +ray_cc_library( + name = "node_config", + hdrs = ["node_config.h"], +) + ray_cc_library( name = "test_utils", srcs = ["test_utils.cc"], diff --git a/src/ray/common/node_config.h b/src/ray/common/node_config.h new file mode 100644 index 000000000000..2c3598c16e10 --- /dev/null +++ b/src/ray/common/node_config.h @@ -0,0 +1,43 @@ +// Copyright 2025 The Ray Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace ray { + +// Global node configuration that can be accessed from anywhere in the Ray core. +class NodeConfig { + public: + static NodeConfig &Instance() { + static NodeConfig instance; + return instance; + } + + void SetNodeIpAddress(const std::string &ip) { node_ip_address_ = ip; } + + const std::string &GetNodeIpAddress() const { return node_ip_address_; } + + bool HasNodeIpAddress() const { return !node_ip_address_.empty(); } + + private: + NodeConfig() = default; + NodeConfig(const NodeConfig &) = delete; + NodeConfig &operator=(const NodeConfig &) = delete; + + std::string node_ip_address_; +}; + +} // namespace ray diff --git a/src/ray/gcs/gcs_client/tests/gcs_client_reconnection_test.cc b/src/ray/gcs/gcs_client/tests/gcs_client_reconnection_test.cc index 1167df2f0c53..df2e5df1bc7c 100644 --- a/src/ray/gcs/gcs_client/tests/gcs_client_reconnection_test.cc +++ b/src/ray/gcs/gcs_client/tests/gcs_client_reconnection_test.cc @@ -133,8 +133,10 @@ class GcsClientReconnectionTest : public ::testing::Test { unsigned short GetFreePort() { using namespace boost::asio; // NOLINT io_service service; - ip::tcp::acceptor acceptor(service, ip::tcp::endpoint(ip::tcp::v4(), 0)); - unsigned short port = acceptor.local_endpoint().port(); + + auto socket = ray::CreateTcpSocket(service); + socket->bind(boost::asio::ip::tcp::endpoint(socket->local_endpoint().protocol(), 0)); + unsigned short port = socket->local_endpoint().port(); return port; } diff --git a/src/ray/gcs/gcs_server/gcs_server_main.cc b/src/ray/gcs/gcs_server/gcs_server_main.cc index 9cf428dbbae7..f904ae6354db 100644 --- a/src/ray/gcs/gcs_server/gcs_server_main.cc +++ b/src/ray/gcs/gcs_server/gcs_server_main.cc @@ -19,6 +19,7 @@ #include #include "gflags/gflags.h" +#include "ray/common/node_config.h" #include "ray/common/ray_config.h" #include "ray/gcs/gcs_server/gcs_server.h" #include "ray/gcs/store_client/redis_store_client.h" @@ -107,6 +108,8 @@ int main(int argc, char *argv[]) { ray::asio::testing::Init(); ray::rpc::testing::Init(); + ray::NodeConfig::Instance().SetNodeIpAddress(node_ip_address); + // IO Service for main loop. SetThreadName("gcs_server"); instrumented_io_context main_service( diff --git a/src/ray/gcs/gcs_server/tests/gcs_health_check_manager_test.cc b/src/ray/gcs/gcs_server/tests/gcs_health_check_manager_test.cc index 8ca66a12522f..0298075374e1 100644 --- a/src/ray/gcs/gcs_server/tests/gcs_health_check_manager_test.cc +++ b/src/ray/gcs/gcs_server/tests/gcs_health_check_manager_test.cc @@ -39,15 +39,11 @@ using namespace boost::asio::ip; // NOLINT int GetFreePort() { io_context io_service; - tcp::acceptor acceptor(io_service); - tcp::endpoint endpoint; - - // try to bind to port 0 to find a free port - acceptor.open(tcp::v4()); - acceptor.bind(tcp::endpoint(tcp::v4(), 0)); - endpoint = acceptor.local_endpoint(); - auto port = endpoint.port(); - acceptor.close(); + + auto socket = ray::CreateTcpSocket(io_service); + socket->bind(tcp::endpoint(socket->local_endpoint().protocol(), 0)); + auto port = socket->local_endpoint().port(); + socket->close(); return port; } diff --git a/src/ray/gcs/store_client/redis_async_context.cc b/src/ray/gcs/store_client/redis_async_context.cc index d022009b9a10..25830f225411 100644 --- a/src/ray/gcs/store_client/redis_async_context.cc +++ b/src/ray/gcs/store_client/redis_async_context.cc @@ -18,6 +18,11 @@ #include #include +#ifndef _WIN32 +#include +#include +#endif + extern "C" { #include "hiredis/async.h" #include "hiredis/hiredis.h" @@ -50,7 +55,24 @@ RedisAsyncContext::RedisAsyncContext( // hiredis is already connected // use the existing native socket - socket_.assign(boost::asio::ip::tcp::v4(), handle); +#ifdef _WIN32 + boost::asio::ip::tcp protocol = (pi.iAddressFamily == AF_INET6) + ? boost::asio::ip::tcp::v6() + : boost::asio::ip::tcp::v4(); + socket_.assign(protocol, handle); +#else + struct sockaddr_storage addr; + socklen_t addr_len = sizeof(addr); + if (getsockname(c->fd, reinterpret_cast(&addr), &addr_len) == 0) { + boost::asio::ip::tcp protocol = (addr.ss_family == AF_INET6) + ? boost::asio::ip::tcp::v6() + : boost::asio::ip::tcp::v4(); + socket_.assign(protocol, handle); + } else { + // Fallback to IPv4 + socket_.assign(boost::asio::ip::tcp::v4(), handle); + } +#endif // register hooks with the hiredis async context redis_async_context_->ev.addRead = CallbackAddRead; diff --git a/src/ray/raylet/main.cc b/src/ray/raylet/main.cc index 85f29663e916..3d8287da77d9 100644 --- a/src/ray/raylet/main.cc +++ b/src/ray/raylet/main.cc @@ -28,6 +28,7 @@ #include "ray/common/constants.h" #include "ray/common/id.h" #include "ray/common/lease/lease.h" +#include "ray/common/node_config.h" #include "ray/common/ray_config.h" #include "ray/common/status.h" #include "ray/gcs/gcs_client/gcs_client.h" @@ -228,6 +229,8 @@ int main(int argc, char *argv[]) { RAY_CHECK_NE(FLAGS_cluster_id, "") << "Expected cluster ID."; ray::ClusterID cluster_id = ray::ClusterID::FromHex(FLAGS_cluster_id); RAY_LOG(INFO) << "Setting cluster ID to: " << cluster_id; + + ray::NodeConfig::Instance().SetNodeIpAddress(node_ip_address); gflags::ShutDownCommandLineFlags(); // Get cgroup setup instance and perform necessary resource setup. diff --git a/src/ray/raylet/tests/BUILD.bazel b/src/ray/raylet/tests/BUILD.bazel index 25321d7d3fab..c70b7ef3e2d0 100644 --- a/src/ray/raylet/tests/BUILD.bazel +++ b/src/ray/raylet/tests/BUILD.bazel @@ -86,6 +86,7 @@ ray_cc_test( "//src/ray/common:id", "//src/ray/protobuf:runtime_env_agent_cc_proto", "//src/ray/raylet:runtime_env_agent_client", + "//src/ray/util:network_util", "@boost//:asio", "@boost//:beast", "@boost//:thread", diff --git a/src/ray/raylet/tests/runtime_env_agent_client_test.cc b/src/ray/raylet/tests/runtime_env_agent_client_test.cc index e51d1d6c757f..6ed895d207d2 100644 --- a/src/ray/raylet/tests/runtime_env_agent_client_test.cc +++ b/src/ray/raylet/tests/runtime_env_agent_client_test.cc @@ -30,6 +30,7 @@ #include "gtest/gtest.h" #include "ray/common/asio/asio_util.h" #include "ray/common/id.h" +#include "ray/util/network_util.h" #include "src/ray/protobuf/runtime_env_agent.pb.h" namespace ray { @@ -42,15 +43,11 @@ using boost::asio::ip::port_type; port_type GetFreePort() { boost::asio::io_context io_service; - boost::asio::ip::tcp::acceptor acceptor(io_service); - boost::asio::ip::tcp::endpoint endpoint; - - // try to bind to port 0 to find a free port - acceptor.open(tcp::v4()); - acceptor.bind(tcp::endpoint(tcp::v4(), 0)); - endpoint = acceptor.local_endpoint(); - auto port = endpoint.port(); - acceptor.close(); + + auto socket = ray::CreateTcpSocket(io_service); + socket->bind(tcp::endpoint(socket->local_endpoint().protocol(), 0)); + auto port = socket->local_endpoint().port(); + socket->close(); return port; } diff --git a/src/ray/util/BUILD.bazel b/src/ray/util/BUILD.bazel index 98e0c7a74860..5d0a2706d11d 100644 --- a/src/ray/util/BUILD.bazel +++ b/src/ray/util/BUILD.bazel @@ -216,7 +216,9 @@ ray_cc_library( deps = [ ":filesystem", ":string_utils", + "//src/ray/common:node_config", "@boost//:asio", + "@com_github_gflags_gflags//:gflags", "@com_google_absl//absl/container:flat_hash_map", "@com_google_absl//absl/strings", "@com_google_absl//absl/strings:str_format", diff --git a/src/ray/util/network_util.cc b/src/ray/util/network_util.cc index 2364a42265c4..004ad248a60e 100644 --- a/src/ray/util/network_util.cc +++ b/src/ray/util/network_util.cc @@ -18,17 +18,28 @@ #include #include #ifndef _WIN32 +#include +#include +#include + #include +#else +#include +#include #endif #include +#include #include #include #include #include +#include #include "absl/strings/match.h" #include "absl/strings/str_format.h" +#include "ray/common/node_config.h" #include "ray/util/filesystem.h" +#include "ray/util/logging.h" #include "ray/util/string_utils.h" using boost::asio::io_context; @@ -36,8 +47,10 @@ using boost::asio::ip::tcp; namespace ray { +bool IsIPv6IP(const std::string &ip) { return ip.find(':') != std::string::npos; } + std::string BuildAddress(const std::string &host, const std::string &port) { - if (host.find(':') != std::string::npos) { + if (IsIPv6IP(host)) { // IPv6 address return absl::StrFormat("[%s]:%s", host, port); } else { @@ -59,7 +72,7 @@ std::optional> ParseAddress(const std::string &addres std::string host = address.substr(0, pos); std::string port = address.substr(pos + 1); - if (host.find(':') != std::string::npos) { + if (IsIPv6IP(host)) { if (host.size() >= 2 && host.front() == '[' && host.back() == ']') { host = host.substr(1, host.size() - 2); } else { @@ -74,14 +87,24 @@ std::optional> ParseAddress(const std::string &addres bool CheckPortFree(int port) { io_context io_service; - tcp::socket socket(io_service); - socket.open(tcp::v4()); + auto socket = CreateTcpSocket(io_service); boost::system::error_code ec; - socket.bind(tcp::endpoint(tcp::v4(), port), ec); - socket.close(); + + bool is_ipv4 = (socket->local_endpoint(ec).protocol() == tcp::v4()); + if (ec) { + // If we can't get the protocol info, fallback to IPv4 + is_ipv4 = true; + } + + if (is_ipv4) { + socket->bind(tcp::endpoint(tcp::v4(), port), ec); + } else { + socket->bind(tcp::endpoint(tcp::v6(), port), ec); + } + + socket->close(); return !ec.failed(); } - std::string EndpointToUrl( const boost::asio::generic::basic_endpoint &ep, bool include_scheme) { @@ -203,4 +226,91 @@ std::shared_ptr> ParseURL(std::str return result; } +std::string GetNodeIpAddressFromPerspective(const std::string &address) { + static const auto detect_ip = [](const std::string &target_address) -> std::string { + std::vector> test_addresses; + if (!target_address.empty()) { + test_addresses = {{target_address, boost::asio::ip::udp::v4()}, + {target_address, boost::asio::ip::udp::v6()}}; + } else { + test_addresses = {{"8.8.8.8:53", boost::asio::ip::udp::v4()}, + {"[2001:4860:4860::8888]:53", boost::asio::ip::udp::v6()}}; + } + + // Try socket-based detection with IPv4/IPv6 + for (const auto &[addr_str, protocol] : test_addresses) { + auto parts = ParseAddress(addr_str); + if (!parts.has_value()) continue; + + try { + boost::asio::io_service net_service; + boost::asio::ip::udp::resolver resolver(net_service); + boost::asio::ip::udp::resolver::query query(protocol, (*parts)[0], (*parts)[1]); + auto endpoints = resolver.resolve(query); + boost::asio::ip::udp::endpoint ep = *endpoints; + boost::asio::ip::udp::socket socket(net_service, protocol); + socket.connect(ep); + boost::asio::ip::address local_addr = socket.local_endpoint().address(); + return local_addr.to_string(); + } catch (const std::exception &) { + // Continue to next address/protocol combination + continue; + } + } + + // Fallback to hostname resolution + try { + boost::asio::io_service net_service; + boost::asio::ip::tcp::resolver resolver(net_service); + boost::asio::ip::tcp::resolver::query query(boost::asio::ip::host_name(), ""); + auto endpoints = resolver.resolve(query); + + std::string ipv6_candidate; + for (const auto &endpoint : endpoints) { + if (endpoint.endpoint().address().is_v4()) { + return endpoint.endpoint().address().to_string(); + } else if (endpoint.endpoint().address().is_v6() && ipv6_candidate.empty()) { + ipv6_candidate = endpoint.endpoint().address().to_string(); + } + } + + if (!ipv6_candidate.empty()) { + return ipv6_candidate; + } + } catch (const std::exception &) { + // Hostname resolution failed + } + + // Final fallback + return "127.0.0.1"; + }; + + // Only use cache for default address (empty string) + if (address.empty()) { + static const std::string cached_ip = detect_ip(""); + return cached_ip; + } else { + return detect_ip(address); + } +} + +std::unique_ptr CreateTcpSocket( + boost::asio::io_context &io_context) { + std::string node_ip; + + if (ray::NodeConfig::Instance().HasNodeIpAddress()) { + node_ip = ray::NodeConfig::Instance().GetNodeIpAddress(); + } else { + node_ip = GetNodeIpAddressFromPerspective(); + } + + if (IsIPv6IP(node_ip)) { + return std::make_unique(io_context, + boost::asio::ip::tcp::v6()); + } else { + return std::make_unique(io_context, + boost::asio::ip::tcp::v4()); + } +} + } // namespace ray diff --git a/src/ray/util/network_util.h b/src/ray/util/network_util.h index c10a062b6089..2cf70a6cc470 100644 --- a/src/ray/util/network_util.h +++ b/src/ray/util/network_util.h @@ -15,6 +15,8 @@ #pragma once #include +#include +#include #include #include #include @@ -49,6 +51,28 @@ std::string BuildAddress(const std::string &host, int port); /// \return Optional array with [host, port] if port found, nullopt if no colon separator. std::optional> ParseAddress(const std::string &address); +/// IP address by which the local node can be reached *from* the `address`. +/// If no address is given, defaults to public DNS servers for detection. For +/// performance, the result is cached when using the default address (empty string). +/// When a specific address is provided, detection is performed fresh every time. +/// \param address The IP address and port of any known live service on the +/// network you care about. +/// \return The IP address by which the local node can be reached from the address. +std::string GetNodeIpAddressFromPerspective(const std::string &address = ""); + +/// Check if an IP string is IPv6 format (contains colons). +/// \param ip The IP address string to check (must be pure IP, no port). +/// \return true if the IP is IPv6, false if IPv4. +bool IsIPv6IP(const std::string &ip); + +/// Create a TCP socket with the appropriate family (IPv4 or IPv6) based on the node IP +/// address. This function automatically detects the node IP using +/// GetNodeIpAddressFromPerspective and creates the corresponding TCP socket. +/// \param io_context The IO context to use for the socket. +/// \return A TCP socket configured for the node's IP family. +std::unique_ptr CreateTcpSocket( + boost::asio::io_context &io_context); + /// Check whether the given port is available, via attempt to bind a socket to the port. /// Notice, the check could be non-authentic if there're concurrent port assignments. /// \param port The port number to check. From 7be19c03dae10e5860c8e710860bebe2eaa92fd2 Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Thu, 4 Sep 2025 16:13:23 +0000 Subject: [PATCH 2/4] clea up comment Signed-off-by: Yicheng-Lu-llll --- python/ray/_common/network_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/ray/_common/network_utils.py b/python/ray/_common/network_utils.py index 6ead83373b3a..51ecf177d9e6 100644 --- a/python/ray/_common/network_utils.py +++ b/python/ray/_common/network_utils.py @@ -128,5 +128,4 @@ def create_socket(socket_type: int = socket.SOCK_STREAM) -> socket.socket: node_ip = node_ip_address_from_perspective() family = socket.AF_INET6 if is_ipv6_ip(node_ip) else socket.AF_INET - # Create socket directly with Python socket API return socket.socket(family, socket_type) From f0472e2dc5935f23e476a8d63ac5695321a3b74a Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Fri, 5 Sep 2025 02:06:20 +0000 Subject: [PATCH 3/4] Fix CI test failures that are not related to this PR Signed-off-by: Yicheng-Lu-llll --- python/ray/tests/conftest.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/ray/tests/conftest.py b/python/ray/tests/conftest.py index f0e0dfbec519..5c9ed59a3d4c 100644 --- a/python/ray/tests/conftest.py +++ b/python/ray/tests/conftest.py @@ -24,6 +24,7 @@ import ray._private.ray_constants as ray_constants from ray._common.network_utils import build_address, create_socket from ray._common.test_utils import wait_for_condition +from ray._private.conftest_utils import set_override_dashboard_url # noqa: F401 from ray._private.runtime_env import virtualenv_utils from ray._private.test_utils import ( RayletKiller, From ec48d95bde8a694b44203ced2a2d93bf9f24680b Mon Sep 17 00:00:00 2001 From: Yicheng-Lu-llll Date: Fri, 5 Sep 2025 05:44:23 +0000 Subject: [PATCH 4/4] remove choose ip family based on node ip for get_localhost_address Signed-off-by: Yicheng-Lu-llll --- python/ray/_common/network_utils.py | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) diff --git a/python/ray/_common/network_utils.py b/python/ray/_common/network_utils.py index 51ecf177d9e6..43d0579cc0d3 100644 --- a/python/ray/_common/network_utils.py +++ b/python/ray/_common/network_utils.py @@ -69,17 +69,8 @@ def get_localhost_address() -> str: """Get localhost loopback address with IPv4/IPv6 support. Returns: - The localhost loopback IP address (matching node IP family or auto-detected). + The localhost loopback IP address. """ - import ray._private.worker - - if ( - ray._private.worker._global_node is not None - and ray._private.worker._global_node.node_ip_address - ): - node_ip = ray._private.worker._global_node.node_ip_address - return "::1" if is_ipv6_ip(node_ip) else "127.0.0.1" - # Try IPv4 first, then IPv6 localhost resolution for family in [socket.AF_INET, socket.AF_INET6]: try: