Skip to content

fix: wrap MCP server output as JSON when structured logging enabled #713

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions airbyte/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,26 @@ def _str_to_bool(value: str) -> bool:
directories for permissions reasons.
"""

TEMP_FILE_CLEANUP = _str_to_bool(
os.getenv(
key="AIRBYTE_TEMP_FILE_CLEANUP",
default="true",
)
)

NO_LIVE_PROGRESS = _str_to_bool(
os.getenv(
key="NO_LIVE_PROGRESS",
default="false",
)
)
"""Whether to disable live progress displays.

When enabled, this prevents Rich live progress views from interfering with MCP client communication
or other systems that use Rich simultaneously. This value is read from the `NO_LIVE_PROGRESS`
environment variable. If the variable is not set, the default value is `False`.
"""

TEMP_FILE_CLEANUP = _str_to_bool(
os.getenv(
key="AIRBYTE_TEMP_FILE_CLEANUP",
Expand Down
11 changes: 8 additions & 3 deletions airbyte/mcp/_local_ops.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Local MCP operations."""

import sys
import traceback
from itertools import islice
from pathlib import Path
Expand All @@ -12,7 +11,7 @@

from airbyte import get_source
from airbyte.caches.util import get_default_cache
from airbyte.mcp._util import resolve_config
from airbyte.mcp._util import log_mcp_message, resolve_config
from airbyte.secrets.config import _get_secret_sources
from airbyte.secrets.google_gsm import GoogleGSMSecretManager
from airbyte.sources.registry import get_connector_metadata
Expand Down Expand Up @@ -209,7 +208,13 @@ def read_source_stream_records(
# Next we load a limited number of records from the generator into our list.
records: list[dict[str, Any]] = list(islice(record_generator, max_records))

print(f"Retrieved {len(records)} records from stream '{stream_name}'", sys.stderr)
log_mcp_message(
f"Retrieved {len(records)} records from stream '{stream_name}'",
component="local_ops",
event="records_retrieved",
stream_name=stream_name,
record_count=len(records),
)

except Exception as ex:
tb_str = traceback.format_exc()
Expand Down
67 changes: 67 additions & 0 deletions airbyte/mcp/_util.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,85 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
"""Internal utility functions for MCP."""

import logging
import os
import sys
from functools import lru_cache
from pathlib import Path
from typing import Any

import dotenv
import structlog
import yaml

from airbyte.constants import NO_LIVE_PROGRESS
from airbyte.logs import AIRBYTE_STRUCTURED_LOGGING
from airbyte.secrets import GoogleGSMSecretManager, register_secret_manager
from airbyte.secrets.hydration import deep_update, detect_hardcoded_secrets
from airbyte.secrets.util import get_secret, is_secret_available


if not NO_LIVE_PROGRESS:
os.environ["NO_LIVE_PROGRESS"] = "1"


@lru_cache
def get_mcp_logger() -> logging.Logger | structlog.BoundLogger:
"""Get a logger for MCP server operations that respects structured logging settings."""
if AIRBYTE_STRUCTURED_LOGGING:
structlog.configure(
processors=[
structlog.processors.TimeStamper(fmt="%Y-%m-%d %H:%M:%S"),
structlog.stdlib.add_log_level,
structlog.stdlib.PositionalArgumentsFormatter(),
structlog.processors.StackInfoRenderer(),
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer(),
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)

logger = logging.getLogger("airbyte.mcp")
logger.setLevel(logging.INFO)
logger.propagate = False

for handler in logger.handlers:
logger.removeHandler(handler)

handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(handler)

return structlog.get_logger("airbyte.mcp")
logger = logging.getLogger("airbyte.mcp")
logger.setLevel(logging.INFO)
logger.propagate = False

for handler in logger.handlers:
logger.removeHandler(handler)

handler = logging.StreamHandler(sys.stderr)
handler.setFormatter(logging.Formatter("%(message)s"))
logger.addHandler(handler)

return logger


def log_mcp_message(
message: str, level: str = "info", **kwargs: str | float | bool | None
) -> None:
"""Log a message using the MCP logger with appropriate formatting."""
logger = get_mcp_logger()

if AIRBYTE_STRUCTURED_LOGGING:
getattr(logger, level)(message, **kwargs)
else:
getattr(logger, level)(message)


AIRBYTE_MCP_DOTENV_PATH_ENVVAR = "AIRBYTE_MCP_ENV_FILE"


Expand Down
18 changes: 13 additions & 5 deletions airbyte/mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from airbyte.mcp._cloud_ops import register_cloud_ops_tools
from airbyte.mcp._connector_registry import register_connector_registry_tools
from airbyte.mcp._local_ops import register_local_ops_tools
from airbyte.mcp._util import initialize_secrets
from airbyte.mcp._util import initialize_secrets, log_mcp_message


initialize_secrets()
Expand All @@ -22,16 +22,24 @@

def main() -> None:
"""Main entry point for the MCP server."""
print("Starting Airbyte MCP server.", file=sys.stderr)
log_mcp_message("Starting Airbyte MCP server", component="mcp_server", event="startup")
try:
asyncio.run(app.run_stdio_async())
except KeyboardInterrupt:
print("Airbyte MCP server interrupted by user.", file=sys.stderr)
log_mcp_message(
"Airbyte MCP server interrupted by user", component="mcp_server", event="interrupt"
)
except Exception as ex:
print(f"Error running Airbyte MCP server: {ex}", file=sys.stderr)
log_mcp_message(
f"Error running Airbyte MCP server: {ex}",
level="error",
component="mcp_server",
event="error",
error=str(ex),
)
sys.exit(1)

print("Airbyte MCP server stopped.", file=sys.stderr)
log_mcp_message("Airbyte MCP server stopped", component="mcp_server", event="shutdown")


if __name__ == "__main__":
Expand Down
Loading