Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Touseef docker #733

Draft
wants to merge 7 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .github/workflows/tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ jobs:
run: |
uv run pytest ./tests/test_models.py
if: ${{ success() || failure() }}

- name: Docker tests
run: |
uv run pytest ./tests/test_docker_executor.py
if: ${{ success() || failure() }}

- name: Memory tests
run: |
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ e2b = [
"e2b-code-interpreter>=1.0.3",
"python-dotenv>=1.0.1",
]
docker = [
"docker>=7.1.0",
]
gradio = [
"gradio>=5.13.2",
]
Expand Down
33 changes: 24 additions & 9 deletions src/smolagents/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@

from .agent_types import AgentAudio, AgentImage, AgentType, handle_agent_output_types
from .default_tools import TOOL_MAPPING, FinalAnswerTool
from .docker_executor import DockerExecutor
from .e2b_executor import E2BExecutor
from .local_python_executor import (
BASE_BUILTIN_MODULES,
Expand Down Expand Up @@ -661,7 +662,6 @@ def replay(self, detailed: bool = False):

def __call__(self, task: str, **kwargs):
"""Adds additional prompting for the managed agent, runs it, and wraps the output.

This method is called only by a managed agent.
"""
full_task = populate_template(
Expand Down Expand Up @@ -1128,6 +1128,7 @@ class CodeAgent(MultiStepAgent):
additional_authorized_imports (`list[str]`, *optional*): Additional authorized imports for the agent.
planning_interval (`int`, *optional*): Interval at which the agent will run a planning step.
use_e2b_executor (`bool`, default `False`): Whether to use the E2B executor for remote code execution.
use_docker_executor (`bool`, default `False`): Whether to use the Docker executor for remote code execution.
max_print_outputs_length (`int`, *optional*): Maximum length of the print outputs.
**kwargs: Additional keyword arguments.

Expand All @@ -1142,6 +1143,7 @@ def __init__(
additional_authorized_imports: Optional[List[str]] = None,
planning_interval: Optional[int] = None,
use_e2b_executor: bool = False,
use_docker_executor: bool = False,
max_print_outputs_length: Optional[int] = None,
**kwargs,
):
Expand All @@ -1166,25 +1168,41 @@ def __init__(
0,
)

if use_e2b_executor and len(self.managed_agents) > 0:
raise Exception(
f"You passed both {use_e2b_executor=} and some managed agents. Managed agents is not yet supported with remote code execution."
)
# Validate executor options
if use_e2b_executor and use_docker_executor:
raise ValueError("Cannot use both E2B executor and Docker executor at the same time.")
if (use_e2b_executor or use_docker_executor) and len(self.managed_agents) > 0:
raise Exception("Managed agents are not yet supported with remote code execution.")

all_tools = {**self.tools, **self.managed_agents}

# Initialize the appropriate executor
if use_e2b_executor:
self.python_executor = E2BExecutor(
self.additional_authorized_imports,
list(all_tools.values()),
self.logger,
initial_state=self.state
)
elif use_docker_executor:
self.python_executor = DockerExecutor(
self.additional_authorized_imports,
list(all_tools.values()),
self.logger,
initial_state=self.state
)
else:
self.python_executor = LocalPythonInterpreter(
self.additional_authorized_imports,
all_tools,
initial_state=self.state,
max_print_outputs_length=max_print_outputs_length,
)

@property
def state(self):
return self.python_executor.state

def initialize_system_prompt(self) -> str:
system_prompt = populate_template(
self.prompt_templates["system_prompt"],
Expand Down Expand Up @@ -1249,10 +1267,7 @@ def step(self, memory_step: ActionStep) -> Union[None, Any]:
self.logger.log_code(title="Executing parsed code:", content=code_action, level=LogLevel.INFO)
is_final_answer = False
try:
output, execution_logs, is_final_answer = self.python_executor(
code_action,
self.state,
)
output, execution_logs, is_final_answer = self.python_executor(code_action)
execution_outputs_console = []
if len(execution_logs) > 0:
execution_outputs_console += [
Expand Down
220 changes: 220 additions & 0 deletions src/smolagents/docker_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
import base64
import pickle
import re
import tarfile
import textwrap
from io import BytesIO
from typing import Any, List, Tuple

from PIL import Image

from .tool_validation import validate_tool_attributes
from .tools import Tool
from .utils import BASE_BUILTIN_MODULES, instance_to_source
import socket

class DockerExecutor:
"""
Executes Python code within a Docker container.
"""

def __init__(self, additional_imports: List[str], tools: List[Tool], logger, initial_state, host="127.0.0.1", port=65432):
"""
Initializes the Docker executor.

Args:
additional_imports (List[str]): List of additional Python packages to install.
tools (List[Tool]): List of tools to make available in the execution environment.
logger: Logger for logging messages.
host (str): Host IP to bind the container to.
port (int): Port to bind the container to.
"""
try:
import docker
from docker.models.containers import Container
except ModuleNotFoundError:
raise ModuleNotFoundError(
"""Please install 'docker' extra to use DockerExecutor: pip install `pip install "smolagents[docker]"`"""
)
try:
self.client = docker.from_env()
self.client.ping()
except docker.errors.DockerException:
raise RuntimeError("Could not connect to Docker daemon. Please ensure Docker is installed and running.")
try:
self.container: Container = self.client.containers.run(
"python:3.12-slim",
command=["sleep", "infinity"],
detach=True,
ports={f"{port}/tcp": (host, port)},
volumes={"/tmp/smolagents": {"bind": "/app", "mode": "rw"}},
)
except docker.errors.DockerException as e:
raise RuntimeError(f"Failed to create Docker container: {e}")

self.logger = logger
self.final_answer_pattern = re.compile(r"^final_answer\((.*)\)$")
self.final_answer = False

# Install additional imports
for imp in additional_imports:
exit_code, output = self.container.exec_run(f"pip install {imp}")
if exit_code != 0:
raise Exception(f"Error installing {imp}: {output.decode()}")

# # Generate and inject tool definitions
# tool_definition_code = self._generate_tool_code(tools)
# self._inject_code_into_container("/app/tools.py", tool_definition_code)
# exit_code, output = self.container.exec_run("python /app/tools.py")
# if exit_code != 0:
# raise Exception(f"Tools setup failed: {output.decode()}")
# Start a single persistent Python interactive session
self.python_session = self.container.exec_run(
"python -i",
stdin=True,
stream=True, # To handle continuous I/O
demux=True # Get separate stdout/stderr streams
)
# Connect with socket
self.sock = socket.create_connection(("localhost", 65432))



def _generate_tool_code(self, tools: List[Tool]) -> str:
"""
Generates Python code to define and instantiate tools.

Args:
tools (List[Tool]): List of tools to generate code for.

Returns:
str: Generated Python code.
"""
tool_codes = []
for tool in tools:
validate_tool_attributes(tool.__class__, check_imports=False)
tool_code = instance_to_source(tool, base_cls=Tool)
tool_code = tool_code.replace("from smolagents.tools import Tool", "")
tool_code += f"\n{tool.name} = {tool.__class__.__name__}()\n"
tool_codes.append(tool_code)

tool_definition_code = "\n".join([f"import {module}" for module in BASE_BUILTIN_MODULES])
tool_definition_code += "\nfrom typing import Any"
tool_definition_code += textwrap.dedent("""
class Tool:
def __call__(self, *args, **kwargs):
return self.forward(*args, **kwargs)

def forward(self, *args, **kwargs):
pass # To be implemented in child class
""")
tool_definition_code += "\n\n".join(tool_codes)
return tool_definition_code

def send_variables_to_server(self, state):
"""Pickle state to server"""
state_path = "/app/state.pkl"

pickle.dump(state, state_path)
remote_unloading_code = """import pickle
import os
print("File path", os.path.getsize('/home/state.pkl'))
with open('/home/state.pkl', 'rb') as f:
pickle_dict = pickle.load(f)
locals().update({key: value for key, value in pickle_dict.items()})
"""
execution = self.run_code_raise_errors(remote_unloading_code)
execution_logs = "\n".join([str(log) for log in execution.logs.stdout])
self.logger.log(execution_logs, 1)

def __call__(self, code_action: str) -> Tuple[Any, str, bool]:
"""Check if code is a final answer and run it accordingly"""
return self.run_code(code_action, return_final_answer=self.final_answer_pattern.match(code_action))

def run_code(self, code_action: str, return_final_answer=False) -> Tuple[Any, str, bool]:
"""
Executes the provided Python code in the Docker container.

Args:
code_action (str): Python code to execute.

Returns:
Tuple[Any, str, bool]: A tuple containing the result, execution logs, and a flag indicating if this is a final answer.
"""

# Inject and execute the code
marked_code = f"""
{code_action}
print('---END---')
"""
if return_final_answer:
marked_code += """with open('/app/result.pkl', 'wb') as f:
pickle.dump(_result, f)
print('---OUTPUT_END---')
"""

print("FLAGG", dir(self.python_session.output))

self.python_session.write(marked_code.encode('utf-8'))
self.python_session.flush()

# Read output until we see our marker
output = ""
for line in self.python_session.output:
if line:
output += line.decode()
if "---END---" in output:
break
with open('/tmp/smolagents/result.pkl', 'rb') as f:
result = pickle.load(f)
print("OK reached eth end")

# Return both the object and the printed output
output = output.replace("---END---", "").strip()
return result, output

stderr=False

if stderr:
raise ValueError(f"Code execution failed:\n{output}")
else:
if return_final_answer:
while "---OUTPUT_END---" not in output:
out, err = self.python_session.output # Get both streams
if out:
output += out.decode()
if err:
stderr += err.decode()

# Load output for final answer or specific results
with open('/tmp/smolagents/result.pkl', 'rb') as f: # Note path on host side
result = pickle.load(f)
else:
result = None
return result, execution_logs, return_final_answer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@aymeric-roucher There is an undefined variable execution_logs being referenced outside of any function and a chunk of code is also outside the function why is that?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code is still not working, I'll ping you when it is!


def _parse_output(self, stdout: str) -> Any:
"""
Parses the output from the executed code.

Args:
stdout (str): Standard output from the executed code.

Returns:
Any: Parsed result (e.g., image, text, etc.).
"""
if "IMAGE_BASE64:" in stdout:
img_data = stdout.split("IMAGE_BASE64:")[1].split("\n")[0]
return Image.open(BytesIO(base64.b64decode(img_data)))
return stdout

def __del__(self):
"""
Cleans up the Docker container when the executor is no longer needed.
"""
if hasattr(self, "container"):
self.container.stop()
self.container.remove()


__all__ = ["DockerExecutor"]
34 changes: 18 additions & 16 deletions src/smolagents/e2b_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@


class E2BExecutor:
def __init__(self, additional_imports: List[str], tools: List[Tool], logger):
def __init__(self, additional_imports: List[str], tools: List[Tool], logger, initial_state):
self.logger = logger
try:
from e2b_code_interpreter import Sandbox
Expand Down Expand Up @@ -91,6 +91,8 @@ def forward(self, *args, **kwargs):
tool_definition_execution = self.run_code_raise_errors(tool_definition_code)
self.logger.log(tool_definition_execution.logs)

self.send_variables_to_server(initial_state)

def run_code_raise_errors(self, code: str):
if self.final_answer_pattern.search(code) is not None:
self.final_answer = True
Expand All @@ -107,27 +109,27 @@ def run_code_raise_errors(self, code: str):
raise ValueError(logs)
return execution

def __call__(self, code_action: str, additional_args: dict) -> Tuple[Any, Any]:
if len(additional_args) > 0:
# Pickle additional_args to server
import tempfile

with tempfile.NamedTemporaryFile() as f:
pickle.dump(additional_args, f)
f.flush()
with open(f.name, "rb") as file:
self.sbx.files.write("/home/state.pkl", file)
remote_unloading_code = """import pickle
def send_variables_to_server(self, additional_args):
"""Pickle additional_args to server"""
import tempfile

with tempfile.NamedTemporaryFile() as f:
pickle.dump(additional_args, f)
f.flush()
with open(f.name, "rb") as file:
self.sbx.files.write("/home/state.pkl", file)
remote_unloading_code = """import pickle
import os
print("File path", os.path.getsize('/home/state.pkl'))
with open('/home/state.pkl', 'rb') as f:
pickle_dict = pickle.load(f)
pickle_dict = pickle.load(f)
locals().update({key: value for key, value in pickle_dict.items()})
"""
execution = self.run_code_raise_errors(remote_unloading_code)
execution_logs = "\n".join([str(log) for log in execution.logs.stdout])
self.logger.log(execution_logs, 1)
execution = self.run_code_raise_errors(remote_unloading_code)
execution_logs = "\n".join([str(log) for log in execution.logs.stdout])
self.logger.log(execution_logs, 1)

def __call__(self, code_action: str) -> Tuple[Any, Any]:
execution = self.run_code_raise_errors(code_action)
execution_logs = "\n".join([str(log) for log in execution.logs.stdout])
if not execution.results:
Expand Down
Loading
Loading