-
Notifications
You must be signed in to change notification settings - Fork 58
feat(mcp): add local dev script tool #711
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 60 commits
7491600
98adeec
fce3abd
8c8f8c1
c416ef1
9f77901
96591ac
0411ccc
c38bebb
2fd6092
fe3f255
14a4b52
1ccb06e
f233d56
60dc850
02acd8e
ddbcb91
885beb1
ce64fd3
b1dc629
341966c
77ec0ab
0b9a4ce
ef413e4
3a6e7ce
72cb487
b28d1ae
2f72d04
023ce3e
5f66122
e355bfc
8005e52
e58aa27
3ba5749
6d95bc6
0ace9fc
5a1e112
4b63df2
9d9c45e
1547baa
8b80029
0386e4a
f5c45ce
bc9532c
3f8bf21
d53fb33
78eecf8
e0c0008
f79423c
ae8ce5b
0e6c6c6
12aa3eb
bd299ee
eee61a6
61f68e5
041f0eb
5815443
f1afcd7
d94192c
23799e3
ed3d871
543d5e8
cf456a9
b520416
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
"""Local development MCP operations.""" | ||
|
||
from typing import Annotated | ||
|
||
from fastmcp import FastMCP | ||
from pydantic import Field | ||
|
||
from airbyte.mcp._coding_templates import DOCS_TEMPLATE, SCRIPT_TEMPLATE | ||
from airbyte.sources import get_available_connectors | ||
|
||
|
||
def generate_pyairbyte_pipeline(
    source_connector_name: Annotated[
        str,
        Field(description="The name of the source connector (e.g., 'source-faker')."),
    ],
    destination_connector_name: Annotated[
        str,
        Field(description="The name of the destination connector (e.g., 'destination-duckdb')."),
    ],
    pipeline_name: Annotated[
        str | None,
        Field(
            description="A descriptive name for the pipeline. "
            "If not provided, a default name will be generated.",
        ),
    ] = None,
) -> dict[str, str]:
    """Generate a PyAirbyte pipeline script with setup instructions.

    This tool creates a complete PyAirbyte pipeline script that extracts data from
    a source connector and loads it to a destination connector, along with setup
    instructions for running the pipeline.

    Args:
        source_connector_name: Full source connector name, e.g. 'source-faker'.
        destination_connector_name: Full destination connector name,
            e.g. 'destination-duckdb'.
        pipeline_name: Optional human-readable pipeline name. When omitted, a name
            of the form '<source>_to_<destination>_pipeline' is derived from the
            connector names.

    Returns:
        On success, a dictionary with the keys 'code' (the generated script),
        'instructions' (the setup instructions) and 'filename' (suggested script
        file name). If either connector name is not among the available
        connectors, a dictionary with a single 'error' key is returned instead.
    """
    # Validate both connector names before doing any other work.
    available_connectors: list[str] = get_available_connectors()
    for role, connector_name in (
        ("Source", source_connector_name),
        ("Destination", destination_connector_name),
    ):
        if connector_name not in available_connectors:
            return {
                "error": (
                    f"{role} connector '{connector_name}' not found. "
                    f"Available connectors: {', '.join(sorted(available_connectors))}"
                )
            }

    # Strip only a leading prefix; `str.replace` would also remove the token
    # from the middle of a connector name.
    source_short_name = source_connector_name.removeprefix("source-")
    destination_short_name = destination_connector_name.removeprefix("destination-")
    if not pipeline_name:
        pipeline_name = f"{source_short_name}_to_{destination_short_name}_pipeline"

    pipeline_id = pipeline_name.lower().replace(" ", "_").replace("'", "")

    pipeline_code: str = SCRIPT_TEMPLATE.format(
        # The connector name is passed under both placeholder spellings so the call
        # cannot fail with KeyError whichever one the template uses; `str.format`
        # ignores keyword arguments that the template does not reference.
        source_name=source_connector_name,
        source_connector_name=source_connector_name,
        destination_name=destination_connector_name,
        destination_connector_name=destination_connector_name,
        # The template already wraps these placeholders in a dict literal, so they
        # must receive the literal's *contents*. Passing a dict object would render
        # `{ {} }`, which is a set containing a dict and fails when the script runs.
        source_config_dict="    # TODO: Add your source configuration keys and values here.",
        destination_config_dict=(
            "    # TODO: Add your destination configuration keys and values here."
        ),
    )

    setup_instructions: str = DOCS_TEMPLATE.format(
        # The install commands in the docs call `ab.get_source(...)` and
        # `ab.get_destination(...)`, which need the full connector names.
        source_connector_name=source_connector_name,
        destination_connector_name=destination_connector_name,
        pipeline_id=pipeline_id,
        source_short_name=source_short_name,
        dest_short_name=destination_short_name,
    )

    return {
        "code": pipeline_code,
        "instructions": setup_instructions,
        "filename": f"{pipeline_id}.py",
    }
|
||
|
||
def register_coding_tools(app: FastMCP) -> None:
    """Attach the pipeline code-generation tool to the given FastMCP app."""
    register = app.tool
    register(generate_pyairbyte_pipeline)
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,210 @@ | ||||||||||||||||||||||
# Copyright (c) 2025 Airbyte, Inc., all rights reserved. | ||||||||||||||||||||||
"""Code templates for MCP local code generation.""" | ||||||||||||||||||||||
|
||||||||||||||||||||||
# Template for the generated pipeline script, rendered with `str.format`.
#
# Placeholders:
#   source_connector_name / destination_connector_name: full connector names.
#   source_config_dict / destination_config_dict: text spliced *between* the braces
#     of a dict literal, i.e. the dict's items (or a comment), not a dict object.
#
# Every brace that must survive into the generated script is doubled. The literal
# starts directly with the shebang because a shebang only works on the first line.
SCRIPT_TEMPLATE = """#!/usr/bin/env python
# -*- coding: utf-8 -*-

# --- Generated by pyairbyte-mcp-server ---

import os
import sys
import logging

import airbyte as ab

from dotenv import load_dotenv

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Load environment variables from .env file
if not load_dotenv():
    logging.warning(
        "'.env' file not found. Please ensure it exists and contains the necessary credentials."
    )
    # Optionally exit if .env is strictly required
    # sys.exit("'.env' file is required. Please create it with the necessary credentials.")


# --- Helper to get env vars ---
def get_required_env(var_name: str) -> str:
    value = os.getenv(var_name)
    if value is None:
        logging.error(f"Missing required environment variable: {{var_name}}")
        sys.exit(
            f"Error: Environment variable '{{var_name}}' not set. Please add it to your .env file."
        )
    return value


def get_optional_env(var_name: str, default_value: str | None = None) -> str | None:
    value = os.getenv(var_name)
    if value is None:
        if default_value is not None:
            logging.info(f"Using default value for optional environment variable: {{var_name}}")
            return default_value
        else:
            logging.info(f"Optional environment variable not set: {{var_name}}")
            return None
    return value


# --- Source Configuration ---
source_name = "{source_connector_name}"
logging.info(f"Configuring source: {{source_name}}")
source_config = {{
{source_config_dict}
}}

# Optional: Add fixed configuration parameters here if needed
# source_config["some_other_parameter"] = "fixed_value"

try:
    source = ab.get_source(
        source_name,
        config=source_config,
        install_if_missing=True,
    )
except Exception as e:
    logging.error(f"Failed to initialize source '{{source_name}}': {{e}}")
    sys.exit(1)

# Verify the connection
logging.info("Checking source connection...")
try:
    source.check()
    logging.info("Source connection check successful.")
except Exception as e:
    logging.error(f"Source connection check failed: {{e}}")
    sys.exit(1)

# Select streams to sync (use select_all_streams() or specify)
logging.info("Selecting all streams from source.")
source.select_all_streams()
# Example for selecting specific streams:
# source.select_streams(["users", "products"])

# --- Read data into Cache and then Pandas DataFrame ---
logging.info("Reading data from source into cache...")
# By default, reads into a temporary DuckDB cache
# Specify a cache explicitly: cache = ab.get_cache(config=...)
try:
    results = source.read()
    logging.info("Finished reading data.")
except Exception as e:
    logging.error(f"Failed to read data from source: {{e}}")
    sys.exit(1)

# --- Process Streams into DataFrames ---
dataframes = {{}}
if results.streams:
    logging.info(f"Converting {{len(results.streams)}} streams to Pandas DataFrames...")
    for stream_name, stream_data in results.streams.items():
        try:
            df = stream_data.to_pandas()
            dataframes[stream_name] = df
            logging.info(
                f"Successfully converted stream '{{stream_name}}' to DataFrame ({{len(df)}} rows)."
            )
            # --- !! IMPORTANT !! ---
            # Add your data processing/analysis logic here!
            # Example: print(f"\\nDataFrame for stream '{{stream_name}}':")
            # print(df.head())
            # print("-" * 30)
        except Exception as e:
            logging.error(f"Failed to convert stream '{{stream_name}}' to DataFrame: {{e}}")
    logging.info("Finished processing streams.")
else:
    logging.info("No streams found in the read result.")

# Example: Access a specific dataframe
# if "users" in dataframes:
#     users_df = dataframes["users"]
#     print("\\nUsers DataFrame Head:")
#     print(users_df.head())

# --- Destination Configuration ---
destination_name = "{destination_connector_name}"
logging.info(f"Configuring destination: {{destination_name}}")
dest_config = {{
{destination_config_dict}
}}

# Optional: Add fixed configuration parameters here if needed
# dest_config["some_other_parameter"] = "fixed_value"

try:
    destination = ab.get_destination(
        destination_name,
        config=dest_config,
        install_if_missing=True,
    )
except Exception as e:
    logging.error(f"Failed to initialize destination '{{destination_name}}': {{e}}")
    sys.exit(1)

# Verify the connection
logging.info("Checking destination connection...")
try:
    destination.check()
    logging.info("Destination connection check successful.")
except Exception as e:
    logging.error(f"Destination connection check failed: {{e}}")
    # Depending on the destination, a check might not be possible or fail spuriously.
    # Consider logging a warning instead of exiting for some destinations.
    # logging.warning(f"Destination connection check failed: {{e}} - Continuing cautiously.")
    sys.exit(1)  # Exit for safety by default

# --- Read data and Write to Destination ---
# This reads incrementally and writes to the destination.
# Data is processed in memory or using a temporary cache if needed by the destination connector.
logging.info("Starting data read from source and write to destination...")
try:
    # source.read() returns a result object even when writing directly
    # The write() method consumes this result
    read_result = source.read()  # Reads into default cache first usually
    logging.info(f"Finished reading data. Starting write to {{destination_name}}...")
    destination.write(read_result)
    logging.info("Successfully wrote data to destination.")
except Exception as e:
    logging.error(f"Failed during data read/write: {{e}}")
    sys.exit(1)

# --- Main execution ---
if __name__ == "__main__":
    logging.info("Starting PyAirbyte pipeline script.")
    # The core logic is executed when the script runs directly
    # DataFrame analysis happens in the "Process Streams into DataFrames" section above.
    # Writing to the destination is the final action of the pipeline.
    logging.info("PyAirbyte pipeline script finished.")
"""
|
||||||||||||||||||||||
|
||||||||||||||||||||||
# Markdown setup guide for a generated pipeline, rendered with `str.format`.
#
# Placeholders: source_connector_name, destination_connector_name, pipeline_id,
# source_short_name, dest_short_name. The two connector-name placeholders are
# interpolated into `ab.get_source(...)` / `ab.get_destination(...)` calls.
DOCS_TEMPLATE = """# PyAirbyte Pipeline Setup Instructions

1. Install PyAirbyte:
   ```bash
   pip install airbyte
   ```

2. Install the required connectors:
   ```bash
   python -c "import airbyte as ab; ab.get_source('{source_connector_name}').install()"
   python -c "import airbyte as ab; ab.get_destination('{destination_connector_name}').install()"
   ```

## Configuration

1. Update the source configuration in the pipeline script with your actual connection details
2. Update the destination configuration in the pipeline script with your actual connection details
3. Refer to the Airbyte documentation for each connector's required configuration fields

## Running the Pipeline

```bash
python {pipeline_id}.py
```

## Next Steps

- Configure your source and destination connectors with actual credentials
- Add error handling and logging as needed
- Consider using environment variables for sensitive configuration
- Add stream selection if you only need specific data streams
- Set up scheduling using your preferred orchestration tool (Airflow, Dagster, etc.)

## Resources

- Source connector docs: https://docs.airbyte.com/integrations/sources/{source_short_name}
- Destination connector docs: https://docs.airbyte.com/integrations/destinations/{dest_short_name}
- PyAirbyte docs: https://docs.airbyte.com/using-airbyte/pyairbyte/getting-started
"""
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
# Copyright (c) 2024 Airbyte, Inc., all rights reserved. | ||
"""Local connector config MCP operations.""" | ||
|
||
from typing import Annotated | ||
|
||
from fastmcp import FastMCP | ||
from pydantic import Field | ||
aaronsteers marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
|
||
def register_connector_config_tools(app: FastMCP) -> None:
    """Register connector-config tools on the FastMCP app.

    No connector-config tools are defined yet, so this currently does nothing.
    """
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Verification agent
🧩 Analysis chain
Check template variable consistency?
I noticed the template is being formatted with `source_connector_name` and `destination_connector_name`, but the template in `_coding_templates.py` uses `{source_name}` and `{destination_name}`. Should we verify that these variable names match between the template and the formatting call? wdyt?
🏁 Script executed:
Length of output: 2307
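Align pipeline template variables with `SCRIPT_TEMPLATE` placeholders
It looks like `SCRIPT_TEMPLATE` in `_coding_templates.py` uses `{source_name}` and `{destination_name}` (along with `{source_config_dict}` and `{destination_config_dict}`), but in `airbyte/mcp/_coding.py` (lines 62–67) we're calling:
    SCRIPT_TEMPLATE.format(
        source_connector_name=source_connector_name,
        source_config_dict={},
        destination_connector_name=destination_connector_name,
        destination_config_dict={},
    )
This mismatch will raise a `KeyError` at runtime. Should we update it to:
    SCRIPT_TEMPLATE.format(
        source_name=source_connector_name,
        source_config_dict={},
        destination_name=destination_connector_name,
        destination_config_dict={},
    )
wdyt?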
🤖 Prompt for AI Agents