Touseef docker #733

Draft · wants to merge 7 commits into main · showing changes from all commits
5 changes: 5 additions & 0 deletions .github/workflows/tests.yml
@@ -62,6 +62,11 @@ jobs:
         run: |
           uv run pytest ./tests/test_models.py
         if: ${{ success() || failure() }}
 
+      - name: Docker tests
+        run: |
+          uv run pytest ./tests/test_docker_executor.py
+        if: ${{ success() || failure() }}
+
       - name: Memory tests
         run: |
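The `if: ${{ success() || failure() }}` guard mirrors the neighboring test steps: it lets the Docker tests run even when an earlier step has failed, while still skipping them if the job is cancelled.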
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -36,6 +36,9 @@ e2b = [
     "e2b-code-interpreter>=1.0.3",
     "python-dotenv>=1.0.1",
 ]
+docker = [
+    "docker>=7.1.0",
+]
 gradio = [
     "gradio>=5.13.2",
 ]
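To try this branch locally, the new extra can be installed from a checkout with `pip install -e ".[docker]"`. Note that docker_executor.py also imports `websocket` (provided by the `websocket-client` package), which is not listed in this extra and may need to be installed separately if it is not pulled in transitively.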
33 changes: 21 additions & 12 deletions src/smolagents/agents.py
@@ -38,6 +38,7 @@
 
 from .agent_types import AgentAudio, AgentImage, AgentType, handle_agent_output_types
 from .default_tools import TOOL_MAPPING, FinalAnswerTool
+from .docker_executor import DockerExecutor
 from .e2b_executor import E2BExecutor
 from .local_python_executor import (
     BASE_BUILTIN_MODULES,
@@ -661,7 +662,6 @@ def replay(self, detailed: bool = False):
 
     def __call__(self, task: str, **kwargs):
         """Adds additional prompting for the managed agent, runs it, and wraps the output.
-
         This method is called only by a managed agent.
         """
         full_task = populate_template(
Expand Down Expand Up @@ -1128,6 +1128,7 @@ class CodeAgent(MultiStepAgent):
additional_authorized_imports (`list[str]`, *optional*): Additional authorized imports for the agent.
planning_interval (`int`, *optional*): Interval at which the agent will run a planning step.
use_e2b_executor (`bool`, default `False`): Whether to use the E2B executor for remote code execution.
use_docker_executor (`bool`, default `False`): Whether to use the Docker executor for remote code execution.
max_print_outputs_length (`int`, *optional*): Maximum length of the print outputs.
**kwargs: Additional keyword arguments.

@@ -1142,6 +1143,7 @@ def __init__(
         additional_authorized_imports: Optional[List[str]] = None,
         planning_interval: Optional[int] = None,
         use_e2b_executor: bool = False,
+        use_docker_executor: bool = False,
         max_print_outputs_length: Optional[int] = None,
         **kwargs,
     ):
@@ -1166,25 +1168,35 @@ def __init__(
             0,
         )
 
-        if use_e2b_executor and len(self.managed_agents) > 0:
-            raise Exception(
-                f"You passed both {use_e2b_executor=} and some managed agents. Managed agents is not yet supported with remote code execution."
-            )
+        # Validate executor options
+        if use_e2b_executor and use_docker_executor:
+            raise ValueError("Cannot use both E2B executor and Docker executor at the same time.")
+        if (use_e2b_executor or use_docker_executor) and len(self.managed_agents) > 0:
+            raise Exception("Managed agents are not yet supported with remote code execution.")
 
         all_tools = {**self.tools, **self.managed_agents}
 
+        # Initialize the appropriate executor
         if use_e2b_executor:
             self.python_executor = E2BExecutor(
-                self.additional_authorized_imports,
-                list(all_tools.values()),
-                self.logger,
+                self.additional_authorized_imports, list(all_tools.values()), self.logger, initial_state=self.state
             )
+        elif use_docker_executor:
+            self.python_executor = DockerExecutor(
+                self.additional_authorized_imports, list(all_tools.values()), self.logger, initial_state=self.state
+            )
         else:
             self.python_executor = LocalPythonInterpreter(
                 self.additional_authorized_imports,
                 all_tools,
+                initial_state=self.state,
                 max_print_outputs_length=max_print_outputs_length,
             )
 
+    @property
+    def state(self):
+        return self.python_executor.state
+
     def initialize_system_prompt(self) -> str:
         system_prompt = populate_template(
             self.prompt_templates["system_prompt"],
@@ -1249,10 +1261,7 @@ def step(self, memory_step: ActionStep) -> Union[None, Any]:
             self.logger.log_code(title="Executing parsed code:", content=code_action, level=LogLevel.INFO)
             is_final_answer = False
             try:
-                output, execution_logs, is_final_answer = self.python_executor(
-                    code_action,
-                    self.state,
-                )
+                output, execution_logs, is_final_answer = self.python_executor(code_action)
                 execution_outputs_console = []
                 if len(execution_logs) > 0:
                     execution_outputs_console += [
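Taken together, these changes route execution through a pluggable python_executor chosen in the constructor. A minimal usage sketch of the new flag (hedged: `HfApiModel` stands in for any smolagents-compatible model and is not part of this diff):

import logging
from smolagents import CodeAgent, HfApiModel

agent = CodeAgent(
    tools=[],
    model=HfApiModel(),
    use_docker_executor=True,  # execute code inside the Dockerized Jupyter kernel
    additional_authorized_imports=["numpy"],  # pip-installed in the container at startup
)
agent.run("Compute 2**10 and return it with final_answer.")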
243 changes: 243 additions & 0 deletions src/smolagents/docker_executor.py
@@ -0,0 +1,243 @@
import base64
import json
import pickle
import re
import time
from pathlib import Path
from typing import Any, List, Tuple

import docker
import requests


class DockerExecutor:
    """
    Executes Python code using Jupyter Kernel Gateway in a Docker container.
    """

    def __init__(
        self,
        additional_imports: List[str],
        tools,
        logger,
        initial_state: dict = None,
        host: str = "127.0.0.1",
        port: int = 8888,
    ):
        """
        Initialize the Docker-based Jupyter Kernel Gateway executor.
        """
        self.logger = logger
        self.host = host
[Review comment · Contributor: "This won't support Unix sockets?"]
        self.port = port
        self.final_answer_pattern = re.compile(r"^final_answer\((.*)\)$")

        # Initialize Docker
        try:
            self.client = docker.from_env()
        except docker.errors.DockerException as e:
            raise RuntimeError("Could not connect to Docker daemon") from e

        # Build and start container
        try:
            # Build the Docker image
            self.logger.info("Building Docker image...")
            dockerfile_path = Path(__file__).parent / "Dockerfile"
            if not dockerfile_path.exists():
                with open(dockerfile_path, "w") as f:
                    f.write("""FROM python:3.12-slim

RUN pip install jupyter_kernel_gateway requests numpy pandas
RUN pip install jupyter_client notebook

EXPOSE 8888
CMD ["jupyter", "kernelgateway", "--KernelGatewayApp.ip='0.0.0.0'", "--KernelGatewayApp.port=8888", "--KernelGatewayApp.allow_origin='*'"]
""")
            image, build_logs = self.client.images.build(
                path=str(dockerfile_path.parent), dockerfile=str(dockerfile_path), tag="jupyter-kernel"
            )

            # Run the container
            self.logger.info(f"Starting container on {host}:{port}...")
            self.container = self.client.containers.run(
                "jupyter-kernel", ports={"8888/tcp": (host, port)}, detach=True
            )

            # Wait for kernel gateway to start
            self.logger.info("Waiting for kernel gateway to start...")
            time.sleep(2)

            # Initialize kernel session
            self.base_url = f"http://{host}:{port}"

            # Create new kernel via HTTP
            r = requests.post(f"{self.base_url}/api/kernels")
            if r.status_code != 201:
                error_details = {
                    "status_code": r.status_code,
                    "headers": dict(r.headers),
                    "url": r.url,
                    "body": r.text,
                    "request_method": r.request.method,
                    "request_headers": dict(r.request.headers),
                    "request_body": r.request.body,
                }
                self.logger.error(f"Failed to create kernel. Details: {json.dumps(error_details, indent=2)}")
                raise RuntimeError(f"Failed to create kernel: Status {r.status_code}\nResponse: {r.text}") from None

            self.kernel_id = r.json()["id"]

            # Initialize WebSocket connection
            from websocket import create_connection  # provided by the websocket-client package

            ws_url = f"ws://{host}:{port}/api/kernels/{self.kernel_id}/channels"
            self.ws = create_connection(ws_url)

            # Install additional packages
            for package in additional_imports:
                self.execute_code(f"!pip install {package}")

            # Initialize state if provided
            if initial_state:
                self.send_variables_to_kernel(initial_state)

            self.logger.info(f"Container {self.container.short_id} is running with kernel {self.kernel_id}")

        except Exception as e:
            self.cleanup()
            # Re-raise with the original traceback preserved
            raise RuntimeError(f"Failed to initialize Jupyter kernel: {e}") from e

    def execute_code(self, code: str) -> str:
        """Execute code and return output"""
        result, output, _ = self.run_code(code)
        return output

    def __call__(self, code_action: str) -> Tuple[Any, str, bool]:
        """Check if code is a final answer and run it accordingly"""
        # A bare `final_answer(...)` expression triggers pickling the result back to the host.
        return self.run_code(code_action, return_final_answer=bool(self.final_answer_pattern.match(code_action)))

    def run_code(self, code_action: str, return_final_answer: bool = False) -> Tuple[Any, str, bool]:
        """
        Execute code and return result based on whether it's a final answer.
        """
        try:
            # Default to running the code as-is; callers like get_variable_from_kernel pass
            # return_final_answer=True with code that already prints a RESULT_PICKLE line,
            # so wrapped_code must stay bound even when the pattern does not match.
            wrapped_code = code_action
            if return_final_answer:
                match = self.final_answer_pattern.match(code_action)
                if match:
                    result_expr = match.group(1)
                    wrapped_code = f"""
import pickle, base64
_result = {result_expr}
print("RESULT_PICKLE:" + base64.b64encode(pickle.dumps(_result)).decode())
"""

            # Send execute request
            msg_id = self._send_execute_request(wrapped_code)

            # Collect output and results
            outputs = []
            result = None
            waiting_for_idle = False

            while True:
                msg = json.loads(self.ws.recv())
                msg_type = msg.get("msg_type", "")
                parent_msg_id = msg.get("parent_header", {}).get("msg_id")

                # Only process messages related to our execute request
                if parent_msg_id != msg_id:
                    continue

                if msg_type == "stream":
                    text = msg["content"]["text"]
                    if return_final_answer and text.startswith("RESULT_PICKLE:"):
                        pickle_data = text[len("RESULT_PICKLE:") :].strip()
                        result = pickle.loads(base64.b64decode(pickle_data))
                        waiting_for_idle = True
                    else:
                        outputs.append(text)
                elif msg_type == "error":
                    traceback = msg["content"].get("traceback", [])
                    raise RuntimeError("\n".join(traceback)) from None
                elif msg_type == "status" and msg["content"]["execution_state"] == "idle":
                    if not return_final_answer or waiting_for_idle:
                        break

            return result, "".join(outputs), return_final_answer

        except Exception as e:
            self.logger.error(f"Code execution failed: {e}")
            raise

    def send_variables_to_kernel(self, variables: dict):
        """
        Send variables to the kernel namespace using pickle.
        """
        pickled_vars = base64.b64encode(pickle.dumps(variables)).decode()
        code = f"""
import pickle, base64
vars_dict = pickle.loads(base64.b64decode('{pickled_vars}'))
globals().update(vars_dict)
"""
        self.run_code(code)

    def get_variable_from_kernel(self, var_name: str) -> Any:
        """
        Retrieve a variable from the kernel namespace.
        """
        code = f"""
import pickle, base64
print("RESULT_PICKLE:" + base64.b64encode(pickle.dumps({var_name})).decode())
"""
        result, _, _ = self.run_code(code, return_final_answer=True)
        return result

    def _send_execute_request(self, code: str) -> str:
        """Send code execution request to kernel."""
        import uuid

        # Generate a unique message ID
        msg_id = str(uuid.uuid4())

        # Create execute request
        execute_request = {
            "header": {
                "msg_id": msg_id,
                "username": "anonymous",
                "session": str(uuid.uuid4()),
                "msg_type": "execute_request",
                "version": "5.0",
            },
            "parent_header": {},
            "metadata": {},
            "content": {
                "code": code,
                "silent": False,
                "store_history": True,
                "user_expressions": {},
                "allow_stdin": False,
            },
        }

        self.ws.send(json.dumps(execute_request))
        return msg_id

    def cleanup(self):
        """Clean up resources."""
        try:
            if hasattr(self, "kernel_id"):
                # No persistent session object is created above, so use requests directly
                # to shut the kernel down.
                requests.delete(f"{self.base_url}/api/kernels/{self.kernel_id}")
            if hasattr(self, "container"):
                self.logger.info(f"Stopping and removing container {self.container.short_id}...")
                self.container.stop()
                self.container.remove()
                self.logger.info("Container cleanup completed")
        except Exception as e:
            self.logger.error(f"Error during cleanup: {e}")

    def __del__(self):
        """Ensure cleanup on deletion."""
        self.cleanup()
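For reviewers, a minimal standalone sketch of driving the executor directly (a hypothetical demo, not part of this PR; it assumes a local Docker daemon and uses Python's logging module, since the executor only calls logger.info and logger.error):

import logging

from smolagents.docker_executor import DockerExecutor

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("docker-executor-demo")

executor = DockerExecutor(
    additional_imports=[],    # extra packages would be pip-installed in the kernel
    tools=[],
    logger=logger,
    initial_state={"x": 21},  # pickled and injected into the kernel namespace
)
try:
    result, logs, is_final = executor("final_answer(x * 2)")
    print(result)  # -> 42, unpickled from the RESULT_PICKLE stream message
finally:
    executor.cleanup()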