31 changes: 16 additions & 15 deletions python/ray/serve/_private/logging_utils.py
@@ -93,6 +93,7 @@ class ServeContextFilter(logging.Filter):
    def filter(self, record):
        if should_skip_context_filter(record):
            return True

        request_context = ray.serve.context._get_serve_request_context()
        if request_context.route:
            setattr(record, SERVE_LOG_ROUTE, request_context.route)
@@ -369,44 +370,44 @@ def configure_component_logger(
        maxBytes=max_bytes,
        backupCount=backup_count,
    )
    # Create a memory handler that buffers log records and flushes to file handler
    # Buffer capacity: buffer_size records
    # Flush triggers: buffer full, ERROR messages, or explicit flush
    memory_handler = logging.handlers.MemoryHandler(
        capacity=buffer_size,
        target=file_handler,
        flushLevel=logging.ERROR,  # Auto-flush on ERROR/CRITICAL
    )
    if RAY_SERVE_ENABLE_JSON_LOGGING:
        logger.warning(
            "'RAY_SERVE_ENABLE_JSON_LOGGING' is deprecated, please use "
            "'LoggingConfig' to enable json format."
        )
    # Add filters directly to the memory handler so they take effect in both
    # buffered and non-buffered cases.
    if RAY_SERVE_ENABLE_JSON_LOGGING or logging_config.encoding == EncodingType.JSON:
        file_handler.addFilter(ServeCoreContextFilter())
        file_handler.addFilter(ServeContextFilter())
        file_handler.addFilter(
        memory_handler.addFilter(ServeCoreContextFilter())
        memory_handler.addFilter(ServeContextFilter())
        memory_handler.addFilter(
            ServeComponentFilter(component_name, component_id, component_type)
        )
        file_handler.setFormatter(json_formatter)
    else:
        file_handler.setFormatter(serve_formatter)

    if logging_config.enable_access_log is False:
        file_handler.addFilter(log_access_log_filter)
        memory_handler.addFilter(log_access_log_filter)
    else:
        file_handler.addFilter(ServeContextFilter())
        memory_handler.addFilter(ServeContextFilter())

    # Remove unwanted attributes from the log record.
    file_handler.addFilter(ServeLogAttributeRemovalFilter())
    memory_handler.addFilter(ServeLogAttributeRemovalFilter())

    # Redirect print, stdout, and stderr to Serve logger, only when it's on the replica.
    if not RAY_SERVE_LOG_TO_STDERR and component_type == ServeComponentType.REPLICA:
        builtins.print = redirected_print
        sys.stdout = StreamToLogger(logger, logging.INFO, sys.stdout)
        sys.stderr = StreamToLogger(logger, logging.INFO, sys.stderr)
    # Create a memory handler that buffers log records and flushes to file handler
    # Buffer capacity: buffer_size records
    # Flush triggers: buffer full, ERROR messages, or explicit flush
    memory_handler = logging.handlers.MemoryHandler(
        capacity=buffer_size,
        target=file_handler,
        flushLevel=logging.ERROR,  # Auto-flush on ERROR/CRITICAL
    )

    # Add the memory handler instead of the file handler directly
    logger.addHandler(memory_handler)

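To see why the filters move from the file handler to the memory handler, here is a minimal, self-contained sketch of the failure mode. RequestIdFilter, current_request_id, and the handler names are illustrative stand-ins, not Serve's API. Filters attached to a handler run when that handler handles a record, so a filter on the MemoryHandler runs at emit time, while the originating request is still active; a filter on the target file handler runs only at flush time and would stamp every buffered record with whichever request is active then.

import io
import logging
import logging.handlers

current_request_id = "req-A"  # illustrative stand-in for Serve's request context


class RequestIdFilter(logging.Filter):
    def filter(self, record):
        record.request_id = current_request_id  # annotate, never drop
        return True


stream = io.StringIO()
target = logging.StreamHandler(stream)
target.setFormatter(logging.Formatter("%(request_id)s %(message)s"))

memory_handler = logging.handlers.MemoryHandler(
    capacity=100, target=target, flushLevel=logging.ERROR
)
# Correct placement: the filter runs as each record is emitted.
memory_handler.addFilter(RequestIdFilter())
# Buggy placement would be target.addFilter(RequestIdFilter()): the target's
# filters run only when the buffer flushes, so every buffered record would be
# stamped with the request id that is current at flush time.

logger = logging.getLogger("buffering-demo")
logger.addHandler(memory_handler)
logger.setLevel(logging.INFO)

logger.info("handled during request A")
current_request_id = "req-B"  # a new request arrives before any flush
logger.info("handled during request B")
memory_handler.flush()

print(stream.getvalue())  # "req-A ..." then "req-B ..."; with the buggy
# placement, both lines would read "req-B".

The same reasoning applies to the access-log and attribute-removal filters, which the diff moves in the same way.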
38 changes: 38 additions & 0 deletions python/ray/serve/tests/test_logging.py
@@ -7,6 +7,7 @@
import sys
import time
import uuid
from collections import Counter
from contextlib import redirect_stderr
from pathlib import Path
from typing import List, Tuple
@@ -1360,5 +1361,42 @@ def test_configure_default_serve_logger_with_stderr_redirect(
    assert not isinstance(sys.stderr, StreamToLogger)


@pytest.mark.parametrize("buffer_size", [1, 100])
def test_request_id_uniqueness_with_buffering(buffer_size, monkeypatch):
    """Test request IDs are unique when buffering is enabled."""

    monkeypatch.setenv("RAY_SERVE_REQUEST_PATH_LOG_BUFFER_SIZE", str(buffer_size))
    logger = logging.getLogger("ray.serve")

    @serve.deployment(logging_config={"encoding": "JSON"})
    class TestApp:
        async def __call__(self):
            logger.info("Processing request")
            logger.info("Additional log entry")
            return "OK"

    serve.run(TestApp.bind())

    for _ in range(150):
        httpx.get("http://127.0.0.1:8000/")
    logs_dir = get_serve_logs_dir()

    for log_file in os.listdir(logs_dir):
        if log_file.startswith("replica"):
            with open(os.path.join(logs_dir, log_file)) as f:
                log_request_ids = []
                for line in f:
                    log_entry = json.loads(line)
                    request_id = log_entry.get("request_id", None)
                    if request_id:
                        log_request_ids.append(request_id)
                # Verify no excessive duplication.
                request_id_counts = Counter(log_request_ids)
                for request_id, count in request_id_counts.items():
                    assert count <= 5, (
                        f"Request id {request_id} appeared {count} times "
                        f"with buffer size {buffer_size}"
                    )


if __name__ == "__main__":
    sys.exit(pytest.main(["-v", "-s", __file__]))
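
As a usage note, here is a small sketch of the MemoryHandler flush triggers that the comments in logging_utils.py describe and that the test above relies on: with 150 requests, even a 100-record buffer drains to disk several times before the files are read. Handler and logger names here are illustrative, assuming only the stdlib logging module.

import logging
import logging.handlers

target = logging.StreamHandler()  # stands in for the rotating file handler
memory_handler = logging.handlers.MemoryHandler(
    capacity=3,  # flush when 3 records are buffered
    target=target,
    flushLevel=logging.ERROR,  # flush as soon as an ERROR/CRITICAL arrives
)
demo_logger = logging.getLogger("flush-demo")
demo_logger.addHandler(memory_handler)
demo_logger.propagate = False  # keep the demo output on this handler only
demo_logger.setLevel(logging.DEBUG)

demo_logger.info("buffered 1")  # held in the buffer
demo_logger.info("buffered 2")  # held in the buffer
demo_logger.info("buffered 3")  # buffer reaches capacity -> flush to target
demo_logger.error("boom")       # >= flushLevel -> immediate flush
memory_handler.flush()          # explicit flush drains anything left
logging.shutdown()              # close() also flushes (flushOnClose=True)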