diff --git a/.ruff.toml b/.ruff.toml index e82a0e74..23b20144 100644 --- a/.ruff.toml +++ b/.ruff.toml @@ -138,7 +138,7 @@ ignore-overlong-task-comments = true convention = "google" [lint.flake8-annotations] -allow-star-arg-any = false +allow-star-arg-any = true # Allow *args and **kwargs to use `Any` type ignore-fully-untyped = false [format] diff --git a/airbyte/_util/openai.py b/airbyte/_util/openai.py new file mode 100644 index 00000000..4075f9dc --- /dev/null +++ b/airbyte/_util/openai.py @@ -0,0 +1,118 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +"""OpenAI API utilities, including support for OpenAI-compatible APIs.""" + +import subprocess +from contextlib import suppress +from typing import TYPE_CHECKING, Any + +from openai import Client + +from airbyte.secrets import get_secret +from airbyte.secrets.base import SecretString + + +if TYPE_CHECKING: + from collections.abc import Callable + + +_has_github_model_token: dict[str, bool] = {} + + +def _try_get_openai_client_from_api_key( + **kwargs: Any, +) -> Client | None: + """Get the OpenAI API key, or None if not set. + + Anything sent as keyword argument will be passed to the OpenAI client constructor. 
+ Returns a configured Client, or None if the OPENAI_API_KEY secret is not set.
+ """
+ if api_key := get_secret("OPENAI_API_KEY", or_none=True):
+ return Client(
+ api_key=api_key,
+ base_url=get_secret("OPENAI_BASE_URL", or_none=True) or None,
+ **kwargs,
+ )
+
+ return None
+
+
+def _validate_github_models_token(token: SecretString) -> bool:
+ """Validate that a GitHub token has models:read permission."""
+ try:
+ # Create a temporary client to test the token
+ test_client = Client(
+ base_url="https://models.github.ai/inference",
+ api_key=token,
+ )
+ # Try to list available models - this requires models:read permission
+ test_client.models.list()
+ except Exception:
+ # Any error (auth, permission, network) means the token isn't valid for models
+ return False
+ else:
+ # If we reach here, the token is valid and has the required permission
+ return True
+
+
+def _try_get_github_models_token() -> SecretString | None:
+ """Try to get a GitHub token from various sources."""
+ methods: dict[str, Callable] = {
+ "GITHUB_MODELS_PAT": lambda: get_secret("GITHUB_MODELS_PAT", or_none=True),
+ "GITHUB_TOKEN": lambda: get_secret("GITHUB_TOKEN", or_none=True),
+ "gh_cli": lambda: subprocess.run(
+ ["gh", "auth", "token"],
+ capture_output=True,
+ text=True,
+ check=True,
+ timeout=5
+ ).stdout.strip(),
+ }
+ # First try the specific GitHub Models PAT
+ for method_name, token_fn in methods.items():
+ match _has_github_model_token.get(method_name, None):
+ case True:
+ # If we already know this method works, return the token
+ # This could trigger an exception if the token has recently
+ # become invalid. But that is a true exception and should
+ # be raised to the caller. 
+ if raw_token := token_fn(): + return SecretString(raw_token) + # If token_fn() fails, let the exception propagate + + case False: + # Skip this method, we know it doesn't work + continue + + case None: + # Not yet validated, proceed to validate + token: SecretString | None = None + with suppress(Exception): + if raw_token := token_fn(): + token = SecretString(raw_token) + if token and _validate_github_models_token(token): + _has_github_model_token[method_name] = True + return token + + # Otherwise, mark this method as not valid: + _has_github_model_token[method_name] = False + + # If we reach here, no valid GitHub token was found + return None + + +def try_get_openai_client(**kwargs: Any) -> Client | None: + """Get the OpenAI client, or None if not available. + + Any keyword arguments are passed to the OpenAI client constructor. + """ + if client := _try_get_openai_client_from_api_key(**kwargs): + return client + + if token := _try_get_github_models_token(): + return Client( + base_url="https://models.github.ai/inference", # << OpenAI-compatible endpoint + api_key=token, + **kwargs, + ) + + return None diff --git a/airbyte/mcp/_coding.py b/airbyte/mcp/_coding.py new file mode 100644 index 00000000..473188e6 --- /dev/null +++ b/airbyte/mcp/_coding.py @@ -0,0 +1,86 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. 
+"""Local development MCP operations.""" + +from typing import Annotated + +from fastmcp import FastMCP +from pydantic import Field + +from airbyte.mcp._coding_templates import DOCS_TEMPLATE, SCRIPT_TEMPLATE +from airbyte.sources import get_available_connectors + + +def generate_pyairbyte_pipeline( + source_connector_name: Annotated[ + str, + Field(description="The name of the source connector (e.g., 'source-faker')."), + ], + destination_connector_name: Annotated[ + str, + Field(description="The name of the destination connector (e.g., 'destination-duckdb')."), + ], + pipeline_name: Annotated[ + str | None, + Field( + description="A descriptive name for the pipeline. " + "If not provided, a default name will be generated.", + ), + ] = None, +) -> dict[str, str]: + """Generate a PyAirbyte pipeline script with setup instructions. + + This tool creates a complete PyAirbyte pipeline script that extracts data from + a source connector and loads it to a destination connector, along with setup + instructions for running the pipeline. + + Returns a dictionary with 'code' and 'instructions' keys containing the + generated pipeline script and setup instructions respectively. + """ + source_short_name = source_connector_name.replace("source-", "") + destination_short_name = destination_connector_name.replace("destination-", "") + if not pipeline_name: + pipeline_name = f"{source_short_name}_to_{destination_short_name}_pipeline" + + pipeline_id = pipeline_name.lower().replace(" ", "_").replace("'", "") + available_connectors: list[str] = get_available_connectors() + if source_connector_name not in available_connectors: + return { + "error": ( + f"Source connector '{source_connector_name}' not found. " + f"Available connectors: {', '.join(sorted(available_connectors))}" + ) + } + + if destination_connector_name not in available_connectors: + return { + "error": ( + f"Destination connector '{destination_connector_name}' not found. 
" + f"Available connectors: {', '.join(sorted(available_connectors))}" + ) + } + + pipeline_code: str = SCRIPT_TEMPLATE.format( + source_connector_name=source_connector_name, + source_config_dict={}, # Placeholder for source config + destination_connector_name=destination_connector_name, + destination_config_dict={}, # Placeholder for destination config + ) + + setup_instructions: str = DOCS_TEMPLATE.format( + source_connector_name=source_short_name, + destination_connector_name=destination_short_name, + pipeline_id=pipeline_id, + source_short_name=source_short_name, + dest_short_name=destination_short_name, + ) + + return { + "code": pipeline_code, + "instructions": setup_instructions, + "filename": f"{pipeline_id}.py", + } + + +def register_coding_tools(app: FastMCP) -> None: + """Register development tools with the FastMCP app.""" + app.tool(generate_pyairbyte_pipeline) diff --git a/airbyte/mcp/_coding_templates.py b/airbyte/mcp/_coding_templates.py new file mode 100644 index 00000000..57698113 --- /dev/null +++ b/airbyte/mcp/_coding_templates.py @@ -0,0 +1,210 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +"""Code templates for MCP local code generation.""" + +SCRIPT_TEMPLATE = """ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# --- Generated by pyairbyte-mcp-server --- + +import os +import sys +import logging + +import airbyte as ab + +from dotenv import load_dotenv + +# Configure logging +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +# Load environment variables from .env file +if not load_dotenv(): + logging.warning("'.env' file not found. Please ensure it exists and contains the necessary credentials.") + # Optionally exit if .env is strictly required + # sys.exit("'.env' file is required. 
Please create it with the necessary credentials.") + +# --- Helper to get env vars --- +def get_required_env(var_name: str) -> str: + value = os.getenv(var_name) + if value is None: + logging.error(f"Missing required environment variable: {{var_name}}") + sys.exit(f"Error: Environment variable '{{var_name}}' not set. Please add it to your .env file.") + return value + +def get_optional_env(var_name: str, default_value: str = None) -> str: + value = os.getenv(var_name) + if value is None: + if default_value is not None: + logging.info(f"Using default value for optional environment variable: {{var_name}}") + return default_value + else: + logging.info(f"Optional environment variable not set: {{var_name}}") + return None + return value + +# --- Source Configuration --- +source_name = "{source_name}" +logging.info(f"Configuring source: {{source_name}}") +source_config = {{ + {source_config_dict} +}} + +# Optional: Add fixed configuration parameters here if needed +# source_config["some_other_parameter"] = "fixed_value" + +try: + source = ab.get_source( + source_name, + config=source_config, + install_if_missing=True, + ) +except Exception as e: + logging.error(f"Failed to initialize source '{{source_name}}': {{e}}") + sys.exit(1) + +# Verify the connection +logging.info("Checking source connection...") +try: + source.check() + logging.info("Source connection check successful.") +except Exception as e: + logging.error(f"Source connection check failed: {{e}}") + sys.exit(1) + +# Select streams to sync (use select_all_streams() or specify) +logging.info("Selecting all streams from source.") +source.select_all_streams() +# Example for selecting specific streams: +# source.select_streams(["users", "products"]) + +# --- Read data into Cache and then Pandas DataFrame --- +logging.info("Reading data from source into cache...") +# By default, reads into a temporary DuckDB cache +# Specify a cache explicitly: cache = ab.get_cache(config=...) 
+try: + results = source.read() + logging.info("Finished reading data.") +except Exception as e: + logging.error(f"Failed to read data from source: {{e}}") + sys.exit(1) + +# --- Process Streams into DataFrames --- +dataframes = {{}} +if results.streams: + logging.info(f"Converting {{len(results.streams)}} streams to Pandas DataFrames...") + for stream_name, stream_data in results.streams.items(): + try: + df = stream_data.to_pandas() + dataframes[stream_name] = df + logging.info( + f"Successfully converted stream '{{stream_name}}' to DataFrame ({{len(df)}} rows)." + ) + # --- !! IMPORTANT !! --- + # Add your data processing/analysis logic here! + # Example: print(f"\\nDataFrame for stream '{{stream_name}}':") + # print(df.head()) + # print("-" * 30) + except Exception as e: + logging.error(f"Failed to convert stream '{{stream_name}}' to DataFrame: {{e}}") + logging.info("Finished processing streams.") +else: + logging.info("No streams found in the read result.") + +# Example: Access a specific dataframe +# if "users" in dataframes: +# users_df = dataframes["users"] +# print("\\nUsers DataFrame Head:") +# print(users_df.head()) + +# --- Destination Configuration --- +destination_name = "{destination_name}" +logging.info(f"Configuring destination: {{destination_name}}") +dest_config = {{ + {destination_config_dict} +}} + +# Optional: Add fixed configuration parameters here if needed +# dest_config["some_other_parameter"] = "fixed_value" + +try: + destination = ab.get_destination( + destination_name, + config=dest_config, + install_if_missing=True, + ) +except Exception as e: + logging.error(f"Failed to initialize destination '{{destination_name}}': {{e}}") + sys.exit(1) + +# Verify the connection +logging.info("Checking destination connection...") +try: + destination.check() + logging.info("Destination connection check successful.") +except Exception as e: + logging.error(f"Destination connection check failed: {{e}}") + # Depending on the destination, a check might 
not be possible or fail spuriously. + # Consider logging a warning instead of exiting for some destinations. + # logging.warning(f"Destination connection check failed: {{e}} - Continuing cautiously.") + sys.exit(1) # Exit for safety by default + +# --- Read data and Write to Destination --- +# This reads incrementally and writes to the destination. +# Data is processed in memory or using a temporary cache if needed by the destination connector. +logging.info("Starting data read from source and write to destination...") +try: + # source.read() returns a result object even when writing directly + # The write() method consumes this result + read_result = source.read() # Reads into default cache first usually + logging.info(f"Finished reading data. Starting write to {{destination_name}}...") + destination.write(read_result) + logging.info("Successfully wrote data to destination.") +except Exception as e: + logging.error(f"Failed during data read/write: {{e}}") + sys.exit(1) + +# --- Main execution --- +if __name__ == "__main__": + logging.info("Starting PyAirbyte pipeline script.") + # The core logic is executed when the script runs directly + # If converting to dataframe, analysis happens within the 'if output_to_dataframe:' block above. + # If writing to destination, the write operation is the main action. + logging.info("PyAirbyte pipeline script finished.") + +""" + + +DOCS_TEMPLATE = """# PyAirbyte Pipeline Setup Instructions + +1. Install PyAirbyte: + ```bash + pip install airbyte + ``` + +2. Install the required connectors: + ```bash + python -c "import airbyte as ab; ab.get_source('{source_connector_name}').install()" + python -c "import airbyte as ab; ab.get_destination('{destination_connector_name}').install()" + ``` + +## Configuration +1. Update the source configuration in the pipeline script with your actual connection details +2. Update the destination configuration in the pipeline script with your actual connection details +3. 
Refer to the Airbyte documentation for each connector's required configuration fields + +## Running the Pipeline +```bash +python {pipeline_id}.py +``` + +- Configure your source and destination connectors with actual credentials +- Add error handling and logging as needed +- Consider using environment variables for sensitive configuration +- Add stream selection if you only need specific data streams +- Set up scheduling using your preferred orchestration tool (Airflow, Dagster, etc.) + +- Source connector docs: https://docs.airbyte.com/integrations/sources/{source_short_name} +- Destination connector docs: https://docs.airbyte.com/integrations/destinations/{dest_short_name} +- PyAirbyte docs: https://docs.airbyte.com/using-airbyte/pyairbyte/getting-started +""" diff --git a/airbyte/mcp/_connector_config.py b/airbyte/mcp/_connector_config.py new file mode 100644 index 00000000..d4d52249 --- /dev/null +++ b/airbyte/mcp/_connector_config.py @@ -0,0 +1,64 @@ +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +"""Local connector config MCP operations.""" + +from typing import Annotated + +from fastmcp import FastMCP +from pydantic import Field + +from airbyte._util.openai import try_get_openai_client + + +def is_llm_api_available() -> tuple[bool, str | None]: + """Check if LLM-assisted generation is enabled. + + Returns a tuple (is_available: bool, base_url: str | None). + """ + client = try_get_openai_client() + if client is None: + return False, None + + return True, str(client.base_url) or None + + +def generate_connector_config_template( + connector_name: Annotated[ + str, + Field(description="The name of the connector (e.g., 'source-faker')."), + ], + llm_guidance: Annotated[ + str | None, + Field( + description="Optional guidance for LLM-assisted generation. " + "Ignored if `llm_assisted=False`. " + "When provided, it will be used to guide the template generation. 
" + "Helpful guidance includes expected authentication methods, " + "data formats, and any specific requirements that would be needed " + "for the connector. Secrets should never be included in this guidance.", + default=None, + ), + ] = None, + llm_assisted: Annotated[ + bool | None, + Field( + description="Whether to use LLM-assisted generation for the template. " + "If True, the template will be guided by the LLM, failing if no LLM is available. " + "If False, a basic template will be generated without LLM assistance. " + "If omitted or None, LLM assistance will be used only if available.", + default=None, + ), + ] = None, +) -> dict[str, str]: + """Generate a connector configuration template. + + This tool creates a basic configuration template for a given connector. + The template includes common fields and can be customized further. + + Returns a dictionary with 'template' key containing the generated config template. + """ + + +def register_connector_config_tools(app: FastMCP) -> None: + """Register development tools with the FastMCP app.""" + app.tool(is_llm_api_available) + app.tool(generate_connector_config_template) diff --git a/airbyte/mcp/_local_ops.py b/airbyte/mcp/_local_ops.py index f2cb06ae..643ea27d 100644 --- a/airbyte/mcp/_local_ops.py +++ b/airbyte/mcp/_local_ops.py @@ -13,7 +13,7 @@ from airbyte import get_source from airbyte.caches.util import get_default_cache from airbyte.mcp._util import resolve_config -from airbyte.secrets.config import _get_secret_sources +from airbyte.secrets.config import get_secret_sources from airbyte.secrets.google_gsm import GoogleGSMSecretManager from airbyte.sources.registry import get_connector_metadata @@ -98,7 +98,7 @@ def list_connector_config_secrets( return the actual secret values. 
""" secrets_names: list[str] = [] - for secrets_mgr in _get_secret_sources(): + for secrets_mgr in get_secret_sources(): if isinstance(secrets_mgr, GoogleGSMSecretManager): secrets_names.extend( [ diff --git a/airbyte/mcp/_secrets.py b/airbyte/mcp/_secrets.py new file mode 100644 index 00000000..09d1c7c3 --- /dev/null +++ b/airbyte/mcp/_secrets.py @@ -0,0 +1,22 @@ +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +"""Secret manager tools for MCP.""" + +from fastmcp import FastMCP + +import airbyte.secrets.util as ab_secrets_util + + +def list_registered_secret_managers() -> list[str]: + """List all registered secret managers.""" + return [manager.name for manager in ab_secrets_util.get_secret_sources()] + + +def list_available_secrets() -> list[tuple[str, str]]: + """List all secrets from the configured secret sources.""" + return ab_secrets_util.list_available_secrets() + + +def register_secret_manager_tools(app: FastMCP) -> None: + """Register a custom secret manager.""" + app.tool(list_registered_secret_managers) + app.tool(list_available_secrets) diff --git a/airbyte/mcp/_util.py b/airbyte/mcp/_util.py index 95a7b0be..e2527077 100644 --- a/airbyte/mcp/_util.py +++ b/airbyte/mcp/_util.py @@ -5,33 +5,23 @@ from pathlib import Path from typing import Any -import dotenv import yaml from airbyte.secrets import GoogleGSMSecretManager, register_secret_manager +from airbyte.secrets.env_vars import DotenvSecretManager from airbyte.secrets.hydration import deep_update, detect_hardcoded_secrets -from airbyte.secrets.util import get_secret, is_secret_available +from airbyte.secrets.util import get_secret, is_secret_available, list_available_secrets AIRBYTE_MCP_DOTENV_PATH_ENVVAR = "AIRBYTE_MCP_ENV_FILE" -def _load_dotenv_file(dotenv_path: Path | str) -> None: - """Load environment variables from a .env file.""" - if isinstance(dotenv_path, str): - dotenv_path = Path(dotenv_path) - if not dotenv_path.exists(): - raise FileNotFoundError(f".env file not found: 
{dotenv_path}") - - dotenv.load_dotenv(dotenv_path=dotenv_path) - - def initialize_secrets() -> None: """Initialize dotenv to load environment variables from .env files.""" # Load the .env file from the current working directory. if AIRBYTE_MCP_DOTENV_PATH_ENVVAR in os.environ: dotenv_path = Path(os.environ[AIRBYTE_MCP_DOTENV_PATH_ENVVAR]) - _load_dotenv_file(dotenv_path) + register_secret_manager(DotenvSecretManager(dotenv_path=dotenv_path)) if is_secret_available("GCP_GSM_CREDENTIALS") and is_secret_available("GCP_GSM_PROJECT_ID"): # Initialize the GoogleGSMSecretManager if the credentials and project are set. diff --git a/airbyte/mcp/server.py b/airbyte/mcp/server.py index 036b2b70..2905b5c7 100644 --- a/airbyte/mcp/server.py +++ b/airbyte/mcp/server.py @@ -7,8 +7,11 @@ from fastmcp import FastMCP from airbyte.mcp._cloud_ops import register_cloud_ops_tools +from airbyte.mcp._coding import register_coding_tools +from airbyte.mcp._connector_config import register_connector_config_tools from airbyte.mcp._connector_registry import register_connector_registry_tools from airbyte.mcp._local_ops import register_local_ops_tools +from airbyte.mcp._secrets import register_secret_manager_tools from airbyte.mcp._util import initialize_secrets @@ -17,7 +20,10 @@ app: FastMCP = FastMCP("airbyte-mcp") register_connector_registry_tools(app) register_local_ops_tools(app) +register_coding_tools(app) register_cloud_ops_tools(app) +register_connector_config_tools(app) +register_secret_manager_tools(app) def main() -> None: diff --git a/airbyte/secrets/base.py b/airbyte/secrets/base.py index b9e4ab7d..c0a7bd8d 100644 --- a/airbyte/secrets/base.py +++ b/airbyte/secrets/base.py @@ -179,6 +179,38 @@ def get_secret(self, secret_name: str) -> SecretString | None: """ ... + @abstractmethod + def list_secrets(self) -> list[str] | None: + """List all secrets available in the secret manager. 
+ + This method should be implemented by subclasses to return a list of all + secrets available in the secret store. If the secret manager does not support + listing secrets, it can return None. + """ + + def is_secret_available(self, secret_name: str) -> bool: + """Check if a secret is available in the secret manager. + + This method checks if the secret exists in the secret manager. If the secret is found, + it returns `True`; otherwise, it returns `False`. + + Subclasses can override this method to provide a more optimized code path. + """ + secrets_list: list[str] | None = self.list_secrets() + if secrets_list is not None: + # If the secret manager supports listing secrets, check if the secret is in the list + return secret_name in secrets_list + + try: + # Otherwise, attempt to retrieve the secret + _ = self.get_secret(secret_name) + except Exception: + # If an exception is raised, the secret is not available + return False + else: + # If no exception was raised, the secret is available + return True + def __str__(self) -> str: """Return the name of the secret manager.""" return self.name diff --git a/airbyte/secrets/config.py b/airbyte/secrets/config.py index f793043d..b4908757 100644 --- a/airbyte/secrets/config.py +++ b/airbyte/secrets/config.py @@ -14,13 +14,12 @@ if TYPE_CHECKING: from airbyte.secrets.base import SecretSourceEnum - from airbyte.secrets.custom import CustomSecretManager _SECRETS_SOURCES: list[SecretManager] = [] -def _get_secret_sources() -> list[SecretManager]: +def get_secret_sources() -> list[SecretManager]: """Initialize the default secret sources.""" if len(_SECRETS_SOURCES) == 0: # Initialize the default secret sources @@ -40,11 +39,11 @@ def _get_secret_sources() -> list[SecretManager]: # Ensure the default secret sources are initialized -_ = _get_secret_sources() +_ = get_secret_sources() def register_secret_manager( - secret_manager: CustomSecretManager, + secret_manager: SecretManager, *, as_backup: bool = False, 
replace_existing: bool = False, diff --git a/airbyte/secrets/env_vars.py b/airbyte/secrets/env_vars.py index 4cd0b2e3..5f74bf47 100644 --- a/airbyte/secrets/env_vars.py +++ b/airbyte/secrets/env_vars.py @@ -4,12 +4,17 @@ from __future__ import annotations import os +from typing import TYPE_CHECKING from dotenv import dotenv_values from airbyte.secrets.base import SecretManager, SecretSourceEnum, SecretString +if TYPE_CHECKING: + from pathlib import Path + + class EnvVarSecretManager(SecretManager): """Secret manager that retrieves secrets from environment variables.""" @@ -22,16 +27,57 @@ def get_secret(self, secret_name: str) -> SecretString | None: return SecretString(os.environ[secret_name]) + def _should_ignore(self, env_var_name: str) -> bool: + """Determine if a given environment variable should be ignored.""" + if env_var_name in { + "PYTHONPATH", # Ignore Python path + "PATH", # Ignore system path + "HOME", # Ignore home directory + "USER", # Ignore user name + "SHELL", # Ignore shell type + "LC_CTYPE", # Ignore locale settings + "VIRTUAL_ENV", # Ignore virtual environment + "ARROW_DEFAULT_MEMORY_POOL", # Ignore Arrow memory pool setting + "__CF_USER_TEXT_ENCODING", # Ignore macOS specific encoding variable + }: + return True + + ignored_prefixes = ( + "PYAIRBYTE_", # Ignore Airbyte-specific environment variables + "AIRBYTE_", # Ignore Airbyte-specific environment variables + "GCP_GSM_", # Ignore Google Cloud Secret Manager variables + "AWS_SECRET_", # Ignore AWS Secrets Manager variables + ) + + # Ignore if variable starts with any ignored prefix + return bool(any(env_var_name.startswith(prefix) for prefix in ignored_prefixes)) + + def list_secrets(self) -> list[str]: + """List all secrets available in the environment.""" + # Return all environment variable names as a list + return sorted([key for key in os.environ if not self._should_ignore(key)]) + + def is_secret_available(self, secret_name: str) -> bool: + """Check if a secret is available in the 
environment.""" + # Check if the secret name exists in the environment variables + return secret_name in os.environ + class DotenvSecretManager(SecretManager): """Secret manager that retrieves secrets from a `.env` file.""" - name = SecretSourceEnum.DOTENV.value + def __init__(self, dotenv_path: Path | None = None) -> None: + """Initialize the DotenvSecretManager with an optional path to a `.env` file.""" + super().__init__() + self.dotenv_path: Path | None = dotenv_path + self.name = str(dotenv_path) if dotenv_path else SecretSourceEnum.DOTENV.value def get_secret(self, secret_name: str) -> SecretString | None: """Get a named secret from the `.env` file.""" try: - dotenv_vars: dict[str, str | None] = dotenv_values() + dotenv_vars: dict[str, str | None] = dotenv_values( + dotenv_path=self.dotenv_path, + ) except Exception: # Can't locate or parse a .env file return None @@ -41,3 +87,22 @@ def get_secret(self, secret_name: str) -> SecretString | None: return None return SecretString(dotenv_vars[secret_name]) + + def list_secrets(self) -> list[str]: + """List all secrets available in the `.env` file.""" + if self.dotenv_path is None: + try: + dotenv_keys = dotenv_values().keys() + except Exception: + # Can't locate or parse default .env file. This is common if no .env file exists. + # Treat as empty. + return [] + else: + return list(dotenv_keys) + + # When a specific dotenv file is provided, we should expect it to exist. + return list(dotenv_values(dotenv_path=self.dotenv_path).keys()) + + def is_secret_available(self, secret_name: str) -> bool: + """Check if a secret is available in the `.env` file.""" + return secret_name in (self.list_secrets() or []) diff --git a/airbyte/secrets/google_colab.py b/airbyte/secrets/google_colab.py index 1f4ded0f..a4f3700f 100644 --- a/airbyte/secrets/google_colab.py +++ b/airbyte/secrets/google_colab.py @@ -35,3 +35,12 @@ def get_secret(self, secret_name: str) -> SecretString | None: except Exception: # Secret name not found. 
Continue. return None + + def is_secret_available(self, secret_name: str) -> bool: + """Check if a secret is available in Google Colab user secrets.""" + return self.get_secret(secret_name) is not None + + def list_secrets(self) -> None: + """Not supported. Always returns None.""" + # Google Colab does not provide a way to list user secrets. + return diff --git a/airbyte/secrets/google_gsm.py b/airbyte/secrets/google_gsm.py index 78e190c1..3f061702 100644 --- a/airbyte/secrets/google_gsm.py +++ b/airbyte/secrets/google_gsm.py @@ -161,13 +161,20 @@ def _fully_qualified_secret_name(self, secret_name: str) -> str: return full_name - def get_secret(self, secret_name: str) -> SecretString: + def get_secret(self, secret_name: str) -> SecretString | None: """Get a named secret from Google Colab user secrets.""" - return SecretString( - self.secret_client.access_secret_version( - name=self._fully_qualified_secret_name(secret_name) - ).payload.data.decode("UTF-8") - ) + try: + return SecretString( + self.secret_client.access_secret_version( + name=self._fully_qualified_secret_name(secret_name) + ).payload.data.decode("UTF-8") + ) + except Exception: + return None + + def list_secrets(self) -> None: + """Not supported. Always returns None.""" + return def get_secret_handle( self, diff --git a/airbyte/secrets/prompt.py b/airbyte/secrets/prompt.py index b0a70f9f..12e467ef 100644 --- a/airbyte/secrets/prompt.py +++ b/airbyte/secrets/prompt.py @@ -26,3 +26,15 @@ def get_secret( return SecretString(getpass(f"Enter the value for secret '{secret_name}': ")) return None + + def is_secret_available( + self, + secret_name: str, + ) -> bool: + """Always returns True because the prompt will always ask for the secret.""" + _ = secret_name + return True + + def list_secrets(self) -> None: + """Not supported. 
Always returns None.""" + return diff --git a/airbyte/secrets/util.py b/airbyte/secrets/util.py index 69d14209..20b0102c 100644 --- a/airbyte/secrets/util.py +++ b/airbyte/secrets/util.py @@ -4,12 +4,12 @@ from __future__ import annotations import warnings -from typing import Any, cast +from typing import Any, Literal, cast, overload from airbyte import exceptions as exc from airbyte.constants import SECRETS_HYDRATION_PREFIX from airbyte.secrets.base import SecretManager, SecretSourceEnum, SecretString -from airbyte.secrets.config import _get_secret_sources +from airbyte.secrets.config import get_secret_sources def is_secret_available( @@ -29,14 +29,39 @@ def is_secret_available( return True +@overload def get_secret( secret_name: str, /, *, sources: list[SecretManager | SecretSourceEnum] | None = None, allow_prompt: bool = True, + or_none: Literal[False] = False, # Not-null return if False or not specified **kwargs: dict[str, Any], -) -> SecretString: +) -> SecretString: ... + + +@overload +def get_secret( + secret_name: str, + /, + *, + sources: list[SecretManager | SecretSourceEnum] | None = None, + allow_prompt: bool = True, + or_none: Literal[True], # Nullable return if True + **kwargs: dict[str, Any], +) -> SecretString | None: ... + + +def get_secret( + secret_name: str, + /, + *, + sources: list[SecretManager | SecretSourceEnum] | None = None, + allow_prompt: bool = True, + or_none: bool = False, + **kwargs: dict[str, Any], +) -> SecretString | None: """Get a secret from the environment. The optional `sources` argument of enum type `SecretSourceEnum` or list of `SecretSourceEnum` @@ -45,6 +70,9 @@ def get_secret( If `allow_prompt` is `True` or if SecretSourceEnum.PROMPT is declared in the `source` arg, then the user will be prompted to enter the secret if it is not found in any of the other sources. 
+ + If `or_none` is `True`, then `allow_prompt` will be assumed to be 'False' and the function + will return `None` if the secret is not found in any of the sources. """ if secret_name.startswith(SECRETS_HYDRATION_PREFIX): # If the secret name starts with the hydration prefix, we assume it's a secret reference. @@ -60,7 +88,7 @@ def get_secret( sources = kwargs.pop("source") # type: ignore [assignment] available_sources: dict[str, SecretManager] = {} - for available_source in _get_secret_sources(): + for available_source in get_secret_sources(): # Add available sources to the dict. Order matters. available_sources[available_source.name] = available_source @@ -94,8 +122,8 @@ def get_secret( secret_managers.index(SecretSourceEnum.PROMPT), # type: ignore [arg-type] ) - if allow_prompt: - # Always check prompt last. Add it to the end of the list. + if allow_prompt and or_none is False: + # Check prompt last, and only if `or_none` is False. secret_managers.append(prompt_source) for secret_mgr in secret_managers: @@ -103,7 +131,31 @@ def get_secret( if val: return SecretString(val) + if or_none: + # If or_none is True, return None when the secret is not found. + return None + raise exc.PyAirbyteSecretNotFoundError( secret_name=secret_name, sources=[str(s) for s in available_sources], ) + + +def list_available_secrets() -> list[tuple[str, str]]: + """List all available secrets from the configured secret sources. + + Returns: + A set of tuples containing the secret name and the source name. 
+ """ + def _if_none(value: Any, if_none: Any) -> Any: + """Return the default value if the value is None.""" + return value if value is not None else if_none + + secret_managers: list[SecretManager] = get_secret_sources() + result = sorted([ + (secret_name, secret_mgr.name) + for secret_mgr in secret_managers + for secret_name in (_if_none(secret_mgr.list_secrets(), ["(multiple secrets not listed)"])) + ]) + + return result # noqa: RET504 (unnecessary assignment) diff --git a/poetry.lock b/poetry.lock index 5e423ead..41373ad1 100644 --- a/poetry.lock +++ b/poetry.lock @@ -814,7 +814,7 @@ version = "1.9.0" description = "Distro - an OS platform information API" optional = false python-versions = ">=3.6" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "distro-1.9.0-py3-none-any.whl", hash = "sha256:7bffd925d65168f85027d8da9af6bddab658135b840670a223589bc0c8ef02b2"}, {file = "distro-1.9.0.tar.gz", hash = "sha256:2fa77c6fd8940f116ee1d6b94a2f90b13b5ea8d019b98bc8bafdcabcdd9bdbed"}, @@ -1886,7 +1886,7 @@ version = "0.10.0" description = "Fast iterable JSON parser." 
optional = false python-versions = ">=3.9" -groups = ["dev"] +groups = ["main", "dev"] files = [ {file = "jiter-0.10.0-cp310-cp310-macosx_10_12_x86_64.whl", hash = "sha256:cd2fb72b02478f06a900a5782de2ef47e0396b3e1f7d5aba30daeb1fce66f303"}, {file = "jiter-0.10.0-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:32bb468e3af278f095d3fa5b90314728a6916d89ba3d0ffb726dd9bf7367285e"}, @@ -2459,14 +2459,14 @@ files = [ [[package]] name = "openai" -version = "1.91.0" +version = "1.93.0" description = "The official Python library for the openai API" optional = false python-versions = ">=3.8" -groups = ["dev"] +groups = ["main", "dev"] files = [ - {file = "openai-1.91.0-py3-none-any.whl", hash = "sha256:207f87aa3bc49365e014fac2f7e291b99929f4fe126c4654143440e0ad446a5f"}, - {file = "openai-1.91.0.tar.gz", hash = "sha256:d6b07730d2f7c6745d0991997c16f85cddfc90ddcde8d569c862c30716b9fc90"}, + {file = "openai-1.93.0-py3-none-any.whl", hash = "sha256:3d746fe5498f0dd72e0d9ab706f26c91c0f646bf7459e5629af8ba7c9dbdf090"}, + {file = "openai-1.93.0.tar.gz", hash = "sha256:988f31ade95e1ff0585af11cc5a64510225e4f5cd392698c675d0a9265b8e337"}, ] [package.dependencies] @@ -5371,4 +5371,4 @@ type = ["pytest-mypy"] [metadata] lock-version = "2.1" python-versions = ">=3.10,<3.13" -content-hash = "ad72cf79abd247f49f3428e0aaed7e986bac93b1dd5f290bc407ce4fddd2208b" +content-hash = "2316723d8677e813bdf6bc0396dd1bebc92806ff6fbb7d8807dffdd023ac64f4" diff --git a/pyproject.toml b/pyproject.toml index 93486cfb..4d56b61a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -52,6 +52,7 @@ sqlalchemy-bigquery = { version = "1.12.0", python = "<3.13" } typing-extensions = "*" uuid7 = "^0.1.0" fastmcp = "^2.8.1" +openai = "^1.93.0" [tool.poetry.group.dev.dependencies] coverage = "^7.5.1"