diff --git a/README.md b/README.md index f41bef31..88c59973 100644 --- a/README.md +++ b/README.md @@ -41,15 +41,15 @@ export OPENAI_API_KEY="your-api-key" ``` ### Vision Agent -There are two agents that you can use. Vision Agent is a conversational agent that has +There are two agents that you can use. `VisionAgent` is a conversational agent that has access to tools that allow it to write and navigate python code and file systems. It can -converse with the user in natural language. VisionAgentCoder is an agent that can write -code for vision tasks, such as counting people in an image. However, it cannot converse -and can only respond with code. VisionAgent can call VisionAgentCoder to write vision -code. +converse with the user in natural language. `VisionAgentCoder` is an agent specifically +for writing code for vision tasks, such as counting people in an image. However, it +cannot chat with you and can only respond with code. `VisionAgent` can call +`VisionAgentCoder` to write vision code. #### Basic Usage -To run the streamlit app locally to chat with Vision Agent, you can run the following +To run the streamlit app locally to chat with `VisionAgent`, you can run the following command: ```bash @@ -146,7 +146,7 @@ the code and having it update. You just need to add the code as a response from assistant: ```python -agent = va.agent.VisionAgent(verbosity=2) +agent = va.agent.VisionAgentCoder(verbosity=2) conv = [ { "role": "user", @@ -212,6 +212,10 @@ function. Make sure the documentation is in the same format above with descripti `Parameters:`, `Returns:`, and `Example\n-------`. You can find an example use case [here](examples/custom_tools/) as this is what the agent uses to pick and use the tool. +Can't find the tool you need and want to add it to `VisionAgent`? Check out our +[vision-agent-tools](https://github.com/landing-ai/vision-agent-tools) repository where +we add the source code for all the tools used in `VisionAgent`. + ## Additional Backends ### Ollama We also provide a `VisionAgentCoder` that uses Ollama. To get started you must download diff --git a/docs/index.md b/docs/index.md index 8569c5cc..0f5022f9 100644 --- a/docs/index.md +++ b/docs/index.md @@ -38,15 +38,15 @@ export OPENAI_API_KEY="your-api-key" ``` ### Vision Agent -There are two agents that you can use. Vision Agent is a conversational agent that has +There are two agents that you can use. `VisionAgent` is a conversational agent that has access to tools that allow it to write and navigate python code and file systems. It can -converse with the user in natural language. VisionAgentCoder is an agent that can write -code for vision tasks, such as counting people in an image. However, it cannot converse -and can only respond with code. VisionAgent can call VisionAgentCoder to write vision -code. +converse with the user in natural language. `VisionAgentCoder` is an agent specifically +for writing code for vision tasks, such as counting people in an image. However, it +cannot chat with you and can only respond with code. `VisionAgent` can call +`VisionAgentCoder` to write vision code. #### Basic Usage -To run the streamlit app locally to chat with Vision Agent, you can run the following +To run the streamlit app locally to chat with `VisionAgent`, you can run the following command: ```bash @@ -143,7 +143,7 @@ the code and having it update.
You just need to add the code as a response from assistant: ```python -agent = va.agent.VisionAgent(verbosity=2) +agent = va.agent.VisionAgentCoder(verbosity=2) conv = [ { "role": "user", @@ -209,6 +209,10 @@ function. Make sure the documentation is in the same format above with descripti `Parameters:`, `Returns:`, and `Example\n-------`. You can find an example use case [here](examples/custom_tools/) as this is what the agent uses to pick and use the tool. +Can't find the tool you need and want to add it to `VisionAgent`? Check out our +[vision-agent-tools](https://github.com/landing-ai/vision-agent-tools) repository where +we add the source code for all the tools used in `VisionAgent`. + ## Additional Backends ### Ollama We also provide a `VisionAgentCoder` that uses Ollama. To get started you must download @@ -230,6 +234,7 @@ tools. You can use it just like you would use `VisionAgentCoder`: >>> agent = va.agent.OllamaVisionAgentCoder() >>> agent("Count the apples in the image", media="apples.jpg") ``` +> WARNING: VisionAgent doesn't work well unless the underlying LMM is sufficiently powerful. Do not expect good results or even working code with smaller models like Llama 3.1 8B. ### Azure OpenAI We also provide an `AzureVisionAgentCoder` that uses Azure OpenAI models. To get started @@ -241,7 +246,7 @@ follow the Azure Setup section below. You can use it just like you would use= >>> agent = va.agent.AzureVisionAgentCoder() >>> agent("Count the apples in the image", media="apples.jpg") ``` -> WARNING: VisionAgent doesn't work well unless the underlying LMM is sufficiently powerful. Do not expect good results or even working code with smaller models like Llama 3.1 8B. + ### Azure Setup If you want to use Azure OpenAI models, you need to have two OpenAI model deployments: diff --git a/examples/chat/app.py b/examples/chat/app.py index f1cd62e7..9291f65a 100644 --- a/examples/chat/app.py +++ b/examples/chat/app.py @@ -26,7 +26,14 @@ "response": "saved", "style": {"bottom": "calc(50% - 4.25rem", "right": "0.4rem"}, } -agent = va.agent.VisionAgent(verbosity=1) +# set artifacts remote_path to WORKSPACE +artifacts = va.tools.Artifacts(WORKSPACE / "artifacts.pkl") +if Path("artifacts.pkl").exists(): + artifacts.load("artifacts.pkl") +else: + artifacts.save("artifacts.pkl") + +agent = va.agent.VisionAgent(verbosity=1, local_artifacts_path="artifacts.pkl") st.set_page_config(layout="wide") @@ -44,7 +51,9 @@ def update_messages(messages, lock): - new_chat = agent.chat_with_code(messages) + if Path("artifacts.pkl").exists(): + artifacts.load("artifacts.pkl") + new_chat, _ = agent.chat_with_code(messages, artifacts=artifacts) with lock: for new_message in new_chat: if new_message not in messages: @@ -122,6 +131,9 @@ def main(): with open(WORKSPACE / uploaded_file.name, "wb") as f: f.write(uploaded_file.getbuffer()) + # make it None so it won't load and overwrite the image + artifacts.artifacts[uploaded_file.name] = None + for file in WORKSPACE.iterdir(): if "__pycache__" not in str(file) and not str(file).startswith("."): if st.button(file.name): diff --git a/vision_agent/agent/agent.py b/vision_agent/agent/agent.py index 6b11f297..ca2cf181 100644 --- a/vision_agent/agent/agent.py +++ b/vision_agent/agent/agent.py @@ -11,7 +11,7 @@ def __call__( self, input: Union[str, List[Message]], media: Optional[Union[str, Path]] = None, - ) -> str: + ) -> Union[str, List[Message]]: pass @abstractmethod diff --git a/vision_agent/agent/vision_agent.py b/vision_agent/agent/vision_agent.py index cfb482e1..2bb04343 100644
--- a/vision_agent/agent/vision_agent.py +++ b/vision_agent/agent/vision_agent.py @@ -1,8 +1,9 @@ import copy import logging import os +import tempfile from pathlib import Path -from typing import Any, Dict, List, Optional, Union, cast +from typing import Any, Dict, List, Optional, Tuple, Union, cast from vision_agent.agent import Agent from vision_agent.agent.agent_utils import extract_json @@ -13,8 +14,9 @@ ) from vision_agent.lmm import LMM, Message, OpenAILMM from vision_agent.tools import META_TOOL_DOCSTRING +from vision_agent.tools.meta_tools import Artifacts from vision_agent.utils import CodeInterpreterFactory -from vision_agent.utils.execute import CodeInterpreter +from vision_agent.utils.execute import CodeInterpreter, Execution logging.basicConfig(level=logging.INFO) _LOGGER = logging.getLogger(__name__) @@ -24,23 +26,30 @@ os.environ["PYTHONPATH"] = f"{WORKSPACE}:{os.getenv('PYTHONPATH', '')}" -class DefaultImports: - code = [ +class BoilerplateCode: + pre_code = [ "from typing import *", "from vision_agent.utils.execute import CodeInterpreter", - "from vision_agent.tools.meta_tools import generate_vision_code, edit_vision_code, open_file, create_file, scroll_up, scroll_down, edit_file, get_tool_descriptions", + "from vision_agent.tools.meta_tools import Artifacts, open_code_artifact, create_code_artifact, edit_code_artifact, get_tool_descriptions, generate_vision_code, edit_vision_code, write_media_artifact", + "artifacts = Artifacts('{remote_path}')", + "artifacts.load('{remote_path}')", + ] + post_code = [ + "artifacts.save()", ] @staticmethod - def to_code_string() -> str: - return "\n".join(DefaultImports.code) - - @staticmethod - def prepend_imports(code: str) -> str: + def add_boilerplate(code: str, **format: Any) -> str: """Run this method to wrap the code with the default boilerplate (imports plus artifact loading and saving). NOTE: be sure to run this method after the custom tools have been registered. """ - return DefaultImports.to_code_string() + "\n\n" + code + return ( + "\n".join([s.format(**format) for s in BoilerplateCode.pre_code]) + + "\n\n" + + code + + "\n\n" + + "\n".join([s.format(**format) for s in BoilerplateCode.post_code]) + ) def run_conversation(orch: LMM, chat: List[Message]) -> Dict[str, Any]: @@ -60,35 +69,17 @@ def run_conversation(orch: LMM, chat: List[Message]) -> Dict[str, Any]: prompt = VA_CODE.format( documentation=META_TOOL_DOCSTRING, examples=f"{EXAMPLES_CODE1}\n{EXAMPLES_CODE2}", - dir=WORKSPACE, conversation=conversation, ) return extract_json(orch([{"role": "user", "content": prompt}], stream=False)) # type: ignore -def run_code_action(code: str, code_interpreter: CodeInterpreter) -> str: - # Note the code interpreter needs to keep running in the same environment because - the SWE tools hold state like line numbers and currently open files.
- result = code_interpreter.exec_cell(DefaultImports.prepend_imports(code)) - - return_str = "" - if result.success: - for res in result.results: - if res.text is not None: - return_str += res.text.replace("\\n", "\n") - if result.logs.stdout: - return_str += "----- stdout -----\n" - for log in result.logs.stdout: - return_str += log.replace("\\n", "\n") - else: - # for log in result.logs.stderr: - # return_str += log.replace("\\n", "\n") - if result.error: - return_str += ( - "\n" + result.error.value + "\n".join(result.error.traceback_raw) - ) - - return return_str +def run_code_action( + code: str, code_interpreter: CodeInterpreter, artifact_remote_path: str +) -> Execution: + return code_interpreter.exec_isolation( + BoilerplateCode.add_boilerplate(code, remote_path=artifact_remote_path) + ) def parse_execution(response: str) -> Optional[str]: @@ -101,8 +92,8 @@ class VisionAgent(Agent): """Vision Agent is an agent that can chat with the user and call tools or other - agents to generate code for it. Vision Agent uses python code to execute actions for - the user. Vision Agent is inspired by by OpenDev + agents to generate code for it. Vision Agent uses python code to execute actions + for the user. Vision Agent is inspired by OpenDevin https://github.com/OpenDevin/OpenDevin and CodeAct https://arxiv.org/abs/2402.01030 Example @@ -118,8 +109,20 @@ def __init__( self, agent: Optional[LMM] = None, verbosity: int = 0, + local_artifacts_path: Optional[Union[str, Path]] = None, code_sandbox_runtime: Optional[str] = None, ) -> None: + """Initialize the VisionAgent. + + Parameters: + agent (Optional[LMM]): The agent to use for conversation and orchestration + of other agents. + verbosity (int): The verbosity level of the agent. + local_artifacts_path (Optional[Union[str, Path]]): The path to the local + artifacts file. + code_sandbox_runtime (Optional[str]): The code sandbox runtime to use. + """ + self.agent = ( OpenAILMM(temperature=0.0, json_mode=True) if agent is None else agent ) @@ -128,12 +131,21 @@ def __init__( self.code_sandbox_runtime = code_sandbox_runtime if self.verbosity >= 1: _LOGGER.setLevel(logging.INFO) + self.local_artifacts_path = cast( + str, + ( + Path(local_artifacts_path) + if local_artifacts_path is not None + else Path(tempfile.NamedTemporaryFile(delete=False).name) + ), + ) def __call__( self, input: Union[str, List[Message]], media: Optional[Union[str, Path]] = None, - ) -> str: + artifacts: Optional[Artifacts] = None, + ) -> List[Message]: """Chat with VisionAgent and get the conversation response. Parameters: input (Union[str, List[Message]]): A conversation in the format of [{"role": "user", "content": "describe your task here..."}, ...] or a string of just the contents. media (Optional[Union[str, Path]]): The media file to be used in the task. + artifacts (Optional[Artifacts]): The artifacts to use in the task. Returns: List[Message]: The conversation response. @@ -149,22 +162,23 @@ if isinstance(input, str): input = [{"role": "user", "content": input}] if media is not None: input[0]["media"] = [media] - results = self.chat_with_code(input) - return results # type: ignore + results, _ = self.chat_with_code(input, artifacts) + return results def chat_with_code( self, chat: List[Message], - ) -> List[Message]: + artifacts: Optional[Artifacts] = None, + ) -> Tuple[List[Message], Artifacts]: """Chat with VisionAgent; it will use code to execute actions to accomplish its tasks.
Parameters: - chat (List[Message]): A conversation - in the format of: + chat (List[Message]): A conversation in the format of: [{"role": "user", "content": "describe your task here..."}] or if it contains media files, it should be in the format of: [{"role": "user", "content": "describe your task here...", "media": ["image1.jpg", "image2.jpg"]}] + artifacts (Optional[Artifacts]): The artifacts to use in the task. Returns: Tuple[List[Message], Artifacts]: The conversation response and the updated artifacts. @@ -173,6 +187,10 @@ if not chat: raise ValueError("chat cannot be empty") + if not artifacts: + # this sets the remote artifacts path + artifacts = Artifacts(WORKSPACE / "artifacts.pkl") + with CodeInterpreterFactory.new_instance( code_sandbox_runtime=self.code_sandbox_runtime ) as code_interpreter: @@ -182,9 +200,14 @@ for chat_i in int_chat: if "media" in chat_i: for media in chat_i["media"]: - media = code_interpreter.upload_file(media) - chat_i["content"] += f" Media name {media}" # type: ignore - media_list.append(media) + media = cast(str, media) + artifacts.artifacts[Path(media).name] = open(media, "rb").read() + + media_remote_path = ( + Path(code_interpreter.remote_path) / Path(media).name + ) + chat_i["content"] += f" Media name {media_remote_path}" # type: ignore + media_list.append(media_remote_path) int_chat = cast( List[Message], @@ -204,6 +227,22 @@ finished = False iterations = 0 + last_response = None + + # Save the current state of artifacts; this will include any images the + # user passed in. + artifacts.save(self.local_artifacts_path) + + # Upload artifacts to the remote location and show where they will be + # loaded. The actual loading happens in BoilerplateCode as + # part of the pre_code. + remote_artifacts_path = code_interpreter.upload_file( + self.local_artifacts_path + ) + artifacts_loaded = artifacts.show() + int_chat.append({"role": "observation", "content": artifacts_loaded}) + orig_chat.append({"role": "observation", "content": artifacts_loaded}) + while not finished and iterations < self.max_iterations: response = run_conversation(self.agent, int_chat) if self.verbosity >= 1: @@ -211,20 +250,39 @@ int_chat.append({"role": "assistant", "content": str(response)}) orig_chat.append({"role": "assistant", "content": str(response)}) + # sometimes it gets stuck in a loop, so we force it to exit + if last_response == response: + response["let_user_respond"] = True + if response["let_user_respond"]: break code_action = parse_execution(response["response"]) if code_action is not None: - obs = run_code_action(code_action, code_interpreter) + result = run_code_action( + code_action, code_interpreter, str(remote_artifacts_path) + ) + obs = str(result.logs) + if self.verbosity >= 1: _LOGGER.info(obs) + # attach the full execution result only to the original chat, not the internal chat int_chat.append({"role": "observation", "content": obs}) - orig_chat.append({"role": "observation", "content": obs}) + orig_chat.append( + {"role": "observation", "content": obs, "execution": result} ) iterations += 1 - return orig_chat + last_response = response + + # after running the agent, download the artifacts locally + code_interpreter.download_file( + str(remote_artifacts_path.name), str(self.local_artifacts_path) + ) + artifacts.load(self.local_artifacts_path) + artifacts.save() + return orig_chat, artifacts def log_progress(self, data: Dict[str, Any]) -> None: pass diff --git a/vision_agent/agent/vision_agent_coder.py b/vision_agent/agent/vision_agent_coder.py index
7856bdb8..c8488902 100644 --- a/vision_agent/agent/vision_agent_coder.py +++ b/vision_agent/agent/vision_agent_coder.py @@ -722,10 +722,10 @@ def chat_with_workflow( media if type(media) is str and media.startswith(("http", "https")) - else code_interpreter.upload_file(media) + else code_interpreter.upload_file(cast(str, media)) ) chat_i["content"] += f" Media name {media}" # type: ignore - media_list.append(media) + media_list.append(str(media)) int_chat = cast( List[Message], diff --git a/vision_agent/agent/vision_agent_prompts.py b/vision_agent/agent/vision_agent_prompts.py index 4774d84d..85e34cd5 100644 --- a/vision_agent/agent/vision_agent_prompts.py +++ b/vision_agent/agent/vision_agent_prompts.py @@ -1,7 +1,7 @@ VA_CODE = """ **Role**: You are a helpful conversational agent that assists users with their requests by writing code to solve it. -**Taks**: As a conversational agent, you are required to understand the user's request and provide a helpful response. Use a Chain-of-Thought approach to break down the problem, create a plan, and then provide a response. Ensure that your response is clear, concise, and helpful. You can use an interactive Python (Jupyter Notebook) environment, executing code with <execute_python>. +**Task**: As a conversational agent, you are required to understand the user's request and provide a helpful response. Use a Chain-of-Thought approach to break down the problem, create a plan, and then provide a response. Ensure that your response is clear, concise, and helpful. You can use an interactive Python (Jupyter Notebook) environment, executing code with <execute_python>. You are given access to an `artifacts` object which contains files shared between you and the user. `artifacts` will be automatically saved every time you execute python code. <execute_python> print("Hello World!") </execute_python> @@ -15,7 +15,6 @@ **Examples**: Here is an example of how you can interact with a user and Actions to complete a task: --- START EXAMPLES --- -[Current directory: /example/workspace] {examples} --- END EXAMPLES --- @@ -26,24 +25,28 @@ **Conversation**: Here is the current conversation so far: --- START CONVERSATION --- -[Current directory: {dir}] - {conversation} """ + EXAMPLES_CODE1 = """ USER: Can you detect the dogs in this image? Media name dog.jpg -AGENT: {"thoughts": "I will use the generate_vision_code to detect the dogs in the image.", "response": "generate_vision_code('/example/workspace/dog_detector.py', 'Can you write code to detect dogs in this image?', media=['/example/workspace/dog.jpg'])", "let_user_respond": false} +OBSERVATION: +[Artifacts loaded] +Artifact dog.jpg loaded to /path/to/images/dog.jpg +[End of artifacts] + +AGENT: {"thoughts": "I will use the generate_vision_code to detect the dogs in the image.", "response": "generate_vision_code(artifacts, 'dog_detector.py', 'Can you write code to detect dogs in this image?', media=['/path/to/images/dog.jpg'])", "let_user_respond": false} OBSERVATION: -[File /example/workspace/dog_detector.py] +[Artifact dog_detector.py] 0|from vision_agent.tools import load_image, owl_v2 1|def detect_dogs(image_path: str): 2| image = load_image(image_path) 3| dogs = owl_v2("dog", image) 4| return dogs -[End of file] +[End of artifact] AGENT: {"thoughts": "I have generated the code to detect the dogs in the image, I must now run the code to get the output.", "response": "from dog_detector import detect_dogs\n print(detect_dogs('/example/workspace/dog.jpg'))", "let_user_respond": false} OBSERVATION: ----- stdout ----- @@ -56,18 +59,23 @@ USER: The image only has one dog, can you fix this?
-AGENT: {"thoughts": "I will use edit_vision_code to fix the code to detect only one dog. The lower score is 0.23 so I will set the threshold above that.", "response": "edit_vision_code('/example/workspace/dog_detector.py', ['Can you write code to detect dogs in this image?', 'Can you increase the threshold to 0.24?'], media=['/example/workspace/dog.jpg'])", "let_user_respond": false} +[Artifacts loaded] +Artifact dog.jpg loaded to /path/to/images/dog.jpg +Artifact dog_detector.py loaded to /path/to/code/dog_detector.py +[End of artifacts] + +AGENT: {"thoughts": "I will use edit_vision_code to fix the code to detect only one dog. The lower score is 0.23 so I will set the threshold above that.", "response": "edit_vision_code(artifacts, 'dog_detector.py', ['Can you write code to detect dogs in this image?', 'Can you increase the threshold to 0.24?'], media=['/path/to/images/dog.jpg'])", "let_user_respond": false} OBSERVATION: -[File /example/workspace/dog_detector.py] +[Artifact dog_detector.py] 0|from vision_agent.tools import load_image, owl_v2 1|def detect_dogs(image_path: str): 2| image = load_image(image_path) 3| dogs = owl_v2("dog", image, threshold=0.24) 4| return dogs -[End of file] +[End of artifact] -AGENT: {"thoughts": "I have edited the code to detect only one dog, I must now run the code to get the output.", "response": "from dog_detector import detect_dogs\n print(detect_dogs('/example/workspace/dog.jpg'))", "let_user_respond": false} +AGENT: {"thoughts": "I have edited the code to detect only one dog, I must now run the code to get the output.", "response": "from dog_detector import detect_dogs\n print(detect_dogs('/path/to/images/dog.jpg'))", "let_user_respond": false} OBSERVATION: ----- stdout ----- @@ -76,23 +84,34 @@ AGENT: {"thoughts": "One dog is detected, I will show this to the user and ask them if the result looks good.", "response": "I have written the code to detect one dog and shown you the output, do the results look good to you?", "let_user_respond": true} """ + EXAMPLES_CODE2 = """ -USER: Can you create a function to count workers with helmets? +USER: Can you create a function to count workers with helmets, return the count and save a visaulization of the bounding boxes? + +OBSERVATION: +[Artifacts loaded] +[End of artifacts] AGENT: {"thoughts": "The user has asked to count workers with helmets but has not provided an image. 
I will ask the user for an image and then generate the code to count workers with helmets.", "response": "Can you provide an image of workers with helmets?", "let_user_respond": true} USER: Yes you can use workers.png -AGENT: {"thoughts": "I will use the generate_vision_code to count the workers with helmets in the image.", "response": "generate_vision_code('/example/workspace/code.py', 'Can you write code to count workers with helmets in this image?', media=['/example/workspace/workers.png'])", "let_user_respond": false} +OBSERVATION: +[Artifacts loaded] +Artifact workers.png loaded to /path/to/images/workers.png +[End of artifacts] + +AGENT: {"thoughts": "I will use the generate_vision_code to count the workers with helmets in the image.", "response": "generate_vision_code(artifacts, 'code.py', 'Can you create a function to count workers with helmets, return the count and save a visualization of the bounding boxes?', media=['/path/to/images/workers.png'])", "let_user_respond": false} OBSERVATION: -[File /example/workspace/code.py] -0|from vision_agent.tools import load_image, owl_v2, closest_box_distance -1|def count_workers_with_helmets(image_path: str): +[Artifact code.py] +0|from vision_agent.tools import load_image, owl_v2, closest_box_distance, overlay_bounding_boxes, save_image +1|def count_workers_with_helmets(image_path: str, output_path: str): 2| image = load_image(image_path) -3| workers = owl_v2("worker", image) -4| helmets = owl_v2("helmet", image) -5| count = 0 +3| detections = owl_v2("worker, helmet", image) +4| workers = [d for d in detections if d['label'] == 'worker'] +5| helmets = [d for d in detections if d['label'] == 'helmet'] +6| count = 0 6| for worker in workers: 7| person_box = worker['bbox'] 8| person_has_helmet = False @@ -102,14 +121,16 @@ 12| break 13| if person_has_helmet: 14| count += 1 +15| overlay_bounding_boxes(image, detections) +16| save_image(output_path, image) 15| return count -[End of file] +[End of artifact] -AGENT: {"thoughts": "I have generated the code to count the workers with helmets in the image, I must now run the code to get the output.", "response": "from code import count_workers_with_helmets\n print(count_workers_with_helmets('/example/workspace/workers.png'))", "let_user_respond": false} +AGENT: {"thoughts": "I have generated the code to count the workers with helmets in the image, I must now run the code to get the output and write the visualization to the artifacts so the user can see it.", "response": "from code import count_workers_with_helmets\n print(count_workers_with_helmets('/path/to/images/workers.png', 'workers_viz.png'))\n write_media_artifact(artifacts, 'workers_viz.png')", "let_user_respond": false} OBSERVATION: ----- stdout ----- 2 -AGENT: {"thoughts": "Two workers with helmets are detected, I will show this to the user and ask them if the result looks good.", "response": "I have written the code to count the workers wearing helmets in code.py", "let_user_respond": true} +AGENT: {"thoughts": "Two workers with helmets are detected, I will show this to the user and ask them if the result looks good.", "response": "I have written the code to count the workers wearing helmets in code.py and saved the visualization under 'workers_viz.png'.", "let_user_respond": true} """ diff --git a/vision_agent/clients/landing_public_api.py b/vision_agent/clients/landing_public_api.py index eec218ad..2319bf89 100644 --- a/vision_agent/clients/landing_public_api.py +++ b/vision_agent/clients/landing_public_api.py @@ -5,9 +5,9 @@ from
requests.exceptions import HTTPError from vision_agent.clients.http import BaseHTTP -from vision_agent.utils.type_defs import LandingaiAPIKey +from vision_agent.tools.tools_types import BboxInputBase64, JobStatus, PromptTask from vision_agent.utils.exceptions import FineTuneModelNotFound -from vision_agent.tools.tools_types import BboxInputBase64, PromptTask, JobStatus +from vision_agent.utils.type_defs import LandingaiAPIKey class LandingPublicAPI(BaseHTTP): diff --git a/vision_agent/lmm/lmm.py b/vision_agent/lmm/lmm.py index 15df5ac9..76481f3f 100644 --- a/vision_agent/lmm/lmm.py +++ b/vision_agent/lmm/lmm.py @@ -138,7 +138,7 @@ def chat( fixed_c["content"] = [{"type": "text", "text": c["content"]}] # type: ignore if "media" in c: for media in c["media"]: - encoded_media = encode_media(media) + encoded_media = encode_media(cast(str, media)) fixed_c["content"].append( # type: ignore { @@ -389,7 +389,9 @@ def chat( fixed_chat = [] for message in chat: if "media" in message: - message["images"] = [encode_media(m) for m in message["media"]] + message["images"] = [ + encode_media(cast(str, m)) for m in message["media"] + ] del message["media"] fixed_chat.append(message) url = f"{self.url}/chat" diff --git a/vision_agent/lmm/types.py b/vision_agent/lmm/types.py index ded6a42b..b9c99fe2 100644 --- a/vision_agent/lmm/types.py +++ b/vision_agent/lmm/types.py @@ -1,5 +1,7 @@ from pathlib import Path from typing import Dict, Sequence, Union +from vision_agent.utils.execute import Execution + TextOrImage = Union[str, Sequence[Union[str, Path]]] -Message = Dict[str, TextOrImage] +Message = Dict[str, Union[TextOrImage, Execution]] diff --git a/vision_agent/tools/__init__.py b/vision_agent/tools/__init__.py index 3372fcbb..e82d7553 100644 --- a/vision_agent/tools/__init__.py +++ b/vision_agent/tools/__init__.py @@ -1,6 +1,6 @@ from typing import Callable, List, Optional -from .meta_tools import META_TOOL_DOCSTRING +from .meta_tools import META_TOOL_DOCSTRING, Artifacts from .prompts import CHOOSE_PARAMS, SYSTEM_PROMPT from .tool_utils import get_tool_descriptions_by_names from .tools import ( @@ -21,8 +21,8 @@ dpt_hybrid_midas, extract_frames, florence2_image_caption, - florence2_phrase_grounding, florence2_ocr, + florence2_phrase_grounding, florence2_roberta_vqa, florence2_sam2_image, florence2_sam2_video, diff --git a/vision_agent/tools/meta_tools.py b/vision_agent/tools/meta_tools.py index 4a82436d..833ad542 100644 --- a/vision_agent/tools/meta_tools.py +++ b/vision_agent/tools/meta_tools.py @@ -1,12 +1,17 @@ import os +import pickle as pkl import subprocess +import tempfile from pathlib import Path -from typing import Any, Dict, List, Union +from typing import Any, Dict, List, Optional, Union + +from IPython.display import display import vision_agent as va from vision_agent.lmm.types import Message from vision_agent.tools.tool_utils import get_tool_documentation from vision_agent.tools.tools import TOOL_DESCRIPTIONS +from vision_agent.utils.execute import Execution, MimeType # These tools are adapted from SWE-Agent https://github.com/princeton-nlp/SWE-agent @@ -35,97 +40,91 @@ def filter_file(file_name: Union[str, Path]) -> bool: ) -def generate_vision_code(save_file: str, chat: str, media: List[str]) -> str: - """Generates python code to solve vision based tasks. - - Parameters: - save_file (str): The file path to save the code. - chat (str): The chat message from the user. - media (List[str]): The media files to use. - - Returns: - str: The generated code. 
- - Examples - -------- - >>> generate_vision_code("code.py", "Can you detect the dogs in this image?", ["image.jpg"]) - from vision_agent.tools import load_image, owl_v2 - def detect_dogs(image_path: str): - image = load_image(image_path) - dogs = owl_v2("dog", image) - return dogs +def redisplay_results(execution: Execution) -> None: + """This function is used to add previous execution results to the current output. + This is handy if you are inside a notebook environment (call it notebook1) that + has a nested notebook environment (call it notebook2), and you want the execution + results from notebook2 to be included in the execution results for notebook1. + """ + for result in execution.results: + if result.text is not None: + display({MimeType.TEXT_PLAIN: result.text}) + if result.html is not None: + display({MimeType.TEXT_HTML: result.html}) + if result.markdown is not None: + display({MimeType.TEXT_MARKDOWN: result.markdown}) + if result.svg is not None: + display({MimeType.IMAGE_SVG: result.svg}) + if result.png is not None: + display({MimeType.IMAGE_PNG: result.png}) + if result.jpeg is not None: + display({MimeType.IMAGE_JPEG: result.jpeg}) + if result.mp4 is not None: + display({MimeType.VIDEO_MP4_B64: result.mp4}) + if result.latex is not None: + display({MimeType.TEXT_LATEX: result.latex}) + if result.json is not None: + display({MimeType.APPLICATION_JSON: result.json}) + if result.extra is not None: + display(result.extra) + + +class Artifacts: + """Artifacts is a class that allows you to sync files between a local and remote + environment. In our case, the remote environment could be where the VisionAgent is + executing code; as the user adds new images or modifies files, those changes + need to be synced to the remote environment the VisionAgent is running in. """ - if ZMQ_PORT is not None: - agent = va.agent.VisionAgentCoder( - report_progress_callback=lambda inp: report_progress_callback( - int(ZMQ_PORT), inp + def __init__(self, remote_save_path: Union[str, Path]) -> None: + self.remote_save_path = Path(remote_save_path) + self.artifacts: Dict[str, Any] = {} + + self.code_sandbox_runtime = None + + def load(self, file_path: Union[str, Path]) -> None: + """Loads the artifacts into the remote environment. If an artifact value is None + it will be skipped.
+ + Parameters: + file_path (Union[str, Path]): The file path to load the artifacts from + """ + with open(file_path, "rb") as f: + self.artifacts = pkl.load(f) + for k, v in self.artifacts.items(): + if v is not None: + mode = "w" if isinstance(v, str) else "wb" + with open(self.remote_save_path.parent / k, mode) as f: + f.write(v) + + def show(self) -> str: + """Shows the artifacts that have been loaded and their remote save paths.""" + out_str = "[Artifacts loaded]\n" + for k in self.artifacts.keys(): + out_str += ( + f"Artifact {k} loaded to {str(self.remote_save_path.parent / k)}\n" ) + out_str += "[End of artifacts]\n" + return out_str + + def save(self, local_path: Optional[Union[str, Path]] = None) -> None: + save_path = ( + Path(local_path) if local_path is not None else self.remote_save_path ) - else: - agent = va.agent.VisionAgentCoder() - try: - fixed_chat: List[Message] = [{"role": "user", "content": chat, "media": media}] - response = agent.chat_with_workflow(fixed_chat) - code = response["code"] - with open(save_file, "w") as f: - f.write(code) - code_lines = code.splitlines(keepends=True) - total_lines = len(code_lines) - return view_lines(code_lines, 0, total_lines, save_file, total_lines) - except Exception as e: - return str(e) - - -def edit_vision_code(code_file: str, chat_history: List[str], media: List[str]) -> str: - """Edits python code to solve a vision based task. + with open(save_path, "wb") as f: + pkl.dump(self.artifacts, f) - Parameters: - code_file (str): The file path to the code. - chat_history (List[str]): The chat history to used to generate the code. + def __iter__(self) -> Any: + return iter(self.artifacts) - Returns: - str: The edited code. + def __getitem__(self, name: str) -> Any: + return self.artifacts[name] - Examples - -------- - >>> edit_vision_code( - >>> "code.py", - >>> ["Can you detect the dogs in this image?", "Can you use a higher threshold?"], - >>> ["dog.jpg"], - >>> ) - from vision_agent.tools import load_image, owl_v2 - def detect_dogs(image_path: str): - image = load_image(image_path) - dogs = owl_v2("dog", image, threshold=0.8) - return dogs - """ + def __setitem__(self, name: str, value: Any) -> None: + self.artifacts[name] = value - agent = va.agent.VisionAgentCoder() - with open(code_file, "r") as f: - code = f.read() - - # Append latest code to second to last message from assistant - fixed_chat_history: List[Message] = [] - for i, chat in enumerate(chat_history): - if i == 0: - fixed_chat_history.append({"role": "user", "content": chat, "media": media}) - elif i > 0 and i < len(chat_history) - 1: - fixed_chat_history.append({"role": "user", "content": chat}) - elif i == len(chat_history) - 1: - fixed_chat_history.append({"role": "assistant", "content": code}) - fixed_chat_history.append({"role": "user", "content": chat}) - - try: - response = agent.chat_with_workflow(fixed_chat_history, test_multi_plan=False) - code = response["code"] - with open(code_file, "w") as f: - f.write(code) - code_lines = code.splitlines(keepends=True) - total_lines = len(code_lines) - return view_lines(code_lines, 0, total_lines, code_file, total_lines) - except Exception as e: - return str(e) + def __contains__(self, name: str) -> bool: + return name in self.artifacts def format_lines(lines: List[str], start_idx: int) -> str: @@ -136,34 +135,40 @@ def format_lines(lines: List[str], start_idx: int) -> str: def view_lines( - lines: List[str], line_num: int, window_size: int, file_path: str, total_lines: int + lines: List[str], line_num: int, 
window_size: int, name: str, total_lines: int ) -> str: start = max(0, line_num - window_size) end = min(len(lines), line_num + window_size) - return ( - f"[File: {file_path} ({total_lines} lines total)]\n" + return_str = ( + f"[Artifact: {name} ({total_lines} lines total)]\n" + format_lines(lines[start:end], start) - + ("[End of file]" if end == len(lines) else f"[{len(lines) - end} more lines]") + + ( + "[End of artifact]" + if end == len(lines) + else f"[{len(lines) - end} more lines]" + ) ) + print(return_str) + return return_str -def open_file(file_path: str, line_num: int = 0, window_size: int = 100) -> str: - """Opens the file at at the given path in the editor. If `line_num` is provided, - the window will be moved to include that line. It only shows the first 100 lines by - default! Max `window_size` supported is 2000. use `scroll up/down` to view the file - if you want to see more. +def open_code_artifact( + artifacts: Artifacts, name: str, line_num: int = 0, window_size: int = 100 +) -> str: + """Opens the provided code artifact. If `line_num` is provided, the window will be + moved to include that line. It only shows the first 100 lines by default! Max + `window_size` supported is 2000. Parameters: - file_path (str): The file path to open, preferred absolute path. + artifacts (Artifacts): The artifacts object to open the artifact from. + name (str): The name of the artifact to open. line_num (int): The line number to move the window to. window_size (int): The number of lines to show above and below the line. """ + if name not in artifacts: + return f"[Artifact {name} does not exist]" - file_path_p = Path(file_path) - if not file_path_p.exists(): - return f"[File {file_path} does not exist]" - - total_lines = sum(1 for _ in open(file_path_p)) + total_lines = len(artifacts[name].splitlines()) window_size = min(window_size, 2000) window_size = window_size // 2 if line_num - window_size < 0: @@ -171,211 +176,218 @@ def open_file(file_path: str, line_num: int = 0, window_size: int = 100) -> str: line_num = window_size elif line_num >= total_lines: line_num = total_lines - 1 - window_size - global CURRENT_LINE, CURRENT_FILE - CURRENT_LINE = line_num - CURRENT_FILE = file_path + lines = artifacts[name].splitlines(keepends=True) - with open(file_path, "r") as f: - lines = f.readlines() + return view_lines(lines, line_num, window_size, name, total_lines) - return view_lines(lines, line_num, window_size, file_path, total_lines) - -def create_file(file_path: str) -> str: - """Creates and opens a new file with the given name. +def create_code_artifact(artifacts: Artifacts, name: str) -> str: + """Creates a new code artifact with the given name. Parameters: - file_path (str): The file path to create, preferred absolute path. + artifacts (Artifacts): The artifacts object to add the new artifact to. + name (str): The name of the new artifact.
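+
+    Example
+    -------
+    A minimal usage sketch (the artifact name here is illustrative):
+
+        >>> create_code_artifact(artifacts, "dog_detector.py")
+        [Artifact dog_detector.py created]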
""" + if name in artifacts: + return_str = f"[Artifact {name} already exists]" + else: + artifacts[name] = "" + return_str = f"[Artifact {name} created]" + print(return_str) - file_path_p = Path(file_path) - if file_path_p.exists(): - return f"[File {file_path} already exists]" - file_path_p.touch() - global CURRENT_FILE - CURRENT_FILE = file_path - return f"[File created {file_path}]" - + display({MimeType.APPLICATION_JSON: {"last_artifact": name}}) + return return_str -def scroll_up() -> str: - """Moves the window up by 100 lines.""" - if CURRENT_FILE is None: - return "[No file is open]" - return open_file(CURRENT_FILE, CURRENT_LINE + DEFAULT_WINDOW_SIZE) +def edit_code_artifact( + artifacts: Artifacts, name: str, start: int, end: int, content: str +) -> str: + """Edits the given code artifact with the provided content. The content will be + inserted between the `start` and `end` line numbers. If the `start` and `end` are + the same, the content will be inserted at the `start` line number. If the `end` is + greater than the total number of lines in the file, the content will be inserted at + the end of the file. If the `start` or `end` are negative, the function will return + an error message. + Parameters: + artifacts (Artifacts): The artifacts object to edit the artifact from. + name (str): The name of the artifact to edit. + start (int): The line number to start the edit. + end (int): The line number to end the edit. + content (str): The content to insert. + """ + # just make the artifact if it doesn't exist instead of forcing agent to call + # create_artifact + if name not in artifacts: + artifacts[name] = "" -def scroll_down() -> str: - """Moves the window down by 100 lines.""" - if CURRENT_FILE is None: - return "[No file is open]" + total_lines = len(artifacts[name].splitlines()) + if start < 0 or end < 0 or start > end or end > total_lines: + return "[Invalid line range]" + if start == end: + end += 1 - return open_file(CURRENT_FILE, CURRENT_LINE - DEFAULT_WINDOW_SIZE) + new_content_lines = content.splitlines(keepends=True) + new_content_lines = [ + line if line.endswith("\n") else line + "\n" for line in new_content_lines + ] + lines = artifacts[name].splitlines() + edited_lines = lines[:start] + new_content_lines + lines[end:] + cur_line = start + len(content.split("\n")) // 2 + with tempfile.NamedTemporaryFile(delete=True) as f: + with open(f.name, "w") as f: # type: ignore + f.writelines(edited_lines) + + process = subprocess.Popen( + [ + "flake8", + "--isolated", + "--select=F821,F822,F831,E111,E112,E113,E999,E902", + f.name, + ], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + stdout, _ = process.communicate() + + if stdout != "": + stdout = stdout.replace(f.name, name) + error_msg = "[Edit failed with the following status]\n" + stdout + original_view = view_lines( + lines, + start + ((end - start) // 2), + DEFAULT_WINDOW_SIZE, + name, + total_lines, + ) + total_lines_edit = sum(1 for _ in edited_lines) + edited_view = view_lines( + edited_lines, cur_line, DEFAULT_WINDOW_SIZE, name, total_lines_edit + ) -def search_dir(search_term: str, dir_path: str) -> str: - """Searches for search_term in all files in a directory. + error_msg += f"\n[This is how your edit would have looked like if applied]\n{edited_view}\n\n[This is the original code before your edit]\n{original_view}" + return error_msg - Parameters: - search_term (str): The search term to look for. - dir_path (str): The directory path to search in, preferred absolute path. 
- """ + artifacts[name] = "".join(edited_lines) - dir_path_p = Path(dir_path) - if not dir_path_p.exists(): - return f"[Directory {dir_path} does not exist]" - - matches = [] - for file in dir_path_p.glob("**/*"): - if filter_file(file): - with open(file, "r") as f: - lines = f.readlines() - for i, line in enumerate(lines): - if search_term in line: - matches.append(f"{file}:{i}|{line.strip()}\n") - if not matches: - return f"[No matches found for {search_term} in {dir_path}]" - if len(matches) > 100: - return f"[More than {len(matches)} matches found for {search_term} in {dir_path}. Please narrow your search]" - - return_str = f"[Found {len(matches)} matches for {search_term} in {dir_path}]\n" - for match in matches: - return_str += match - - return_str += f"[End of matches for {search_term} in {dir_path}]" - return return_str + display({MimeType.APPLICATION_JSON: {"last_artifact": name}}) + return open_code_artifact(artifacts, name, cur_line) -def search_file(search_term: str, file_path: str) -> str: - """Searches the file for the given search term. +def generate_vision_code( + artifacts: Artifacts, name: str, chat: str, media: List[str] +) -> str: + """Generates python code to solve vision based tasks. Parameters: - search_term (str): The search term to look for. - file_path (str): The file path to search in, preferred absolute path. - """ - - file_path_p = Path(file_path) - if not file_path_p.exists(): - return f"[File {file_path} does not exist]" + artifacts (Artifacts): The artifacts object to save the code to. + name (str): The name of the artifact to save the code to. + chat (str): The chat message from the user. + media (List[str]): The media files to use. - with open(file_path_p, "r") as f: - lines = f.readlines() + Returns: + str: The generated code. - search_results = [] - for i, line in enumerate(lines): - if search_term in line: - search_results.append(f"{i}|{line.strip()}\n") + Examples + -------- + >>> generate_vision_code(artifacts, "code.py", "Can you detect the dogs in this image?", ["image.jpg"]) + from vision_agent.tools import load_image, owl_v2 + def detect_dogs(image_path: str): + image = load_image(image_path) + dogs = owl_v2("dog", image) + return dogs + """ - if not search_results: - return f"[No matches found for {search_term} in {file_path}]" + if ZMQ_PORT is not None: + agent = va.agent.VisionAgentCoder( + report_progress_callback=lambda inp: report_progress_callback( + int(ZMQ_PORT), inp + ) + ) + else: + agent = va.agent.VisionAgentCoder() - return_str = ( - f"[Found {len(search_results)} matches for {search_term} in {file_path}]\n" - ) - for result in search_results: - return_str += result + fixed_chat: List[Message] = [{"role": "user", "content": chat, "media": media}] + response = agent.chat_with_workflow(fixed_chat, test_multi_plan=True) + redisplay_results(response["test_result"]) + code = response["code"] + artifacts[name] = code + code_lines = code.splitlines(keepends=True) + total_lines = len(code_lines) - return_str += f"[End of matches for {search_term} in {file_path}]" - return return_str + display({MimeType.APPLICATION_JSON: {"last_artifact": name}}) + return view_lines(code_lines, 0, total_lines, name, total_lines) -def find_file(file_name: str, dir_path: str = "./") -> str: - """Finds all files with the given name in the specified directory. +def edit_vision_code( + artifacts: Artifacts, name: str, chat_history: List[str], media: List[str] +) -> str: + """Edits python code to solve a vision based task. 
Parameters: - file_name (str): The file name to look for. - dir_path (str): The directory path to search in, preferred absolute path. - """ - - dir_path_p = Path(dir_path) - if not dir_path_p.exists(): - return f"[Directory {dir_path} does not exist]" - - files = list(dir_path_p.glob(f"**/*{file_name}*")) - files = [f for f in files if filter_file(f)] - if not files: - return f"[No files found in {dir_path} with name {file_name}]" - - return_str = f"[Found {len(files)} matches for {file_name} in {dir_path}]\n" - for match in files: - return_str += str(match) + "\n" + artifacts (Artifacts): The artifacts object to save the code to. + name (str): The name of the artifact containing the code to edit. + chat_history (List[str]): The chat history used to generate the code. - return_str += f"[End of matches for {file_name} in {dir_path}]" - return return_str + Returns: + str: The edited code. + Examples + -------- + >>> edit_vision_code( + >>> artifacts, + >>> "code.py", + >>> ["Can you detect the dogs in this image?", "Can you use a higher threshold?"], + >>> ["dog.jpg"], + >>> ) + from vision_agent.tools import load_image, owl_v2 + def detect_dogs(image_path: str): + image = load_image(image_path) + dogs = owl_v2("dog", image, threshold=0.8) + return dogs + """ -def edit_file(file_path: str, start: int, end: int, content: str) -> str: - """Edits the file at the given path with the provided content. The content will be - inserted between the `start` and `end` line numbers. If the `start` and `end` are - the same, the content will be inserted at the `start` line number. If the `end` is - greater than the total number of lines in the file, the content will be inserted at - the end of the file. If the `start` or `end` are negative, the function will return - an error message. + agent = va.agent.VisionAgentCoder() + if name not in artifacts: + return f"[Artifact {name} does not exist]" - Parameters: - file_path (str): The file path to edit, preferred absolute path. - start (int): The line number to start the edit. - end (int): The line number to end the edit. - content (str): The content to insert.
- """ - file_path_p = Path(file_path) - if not file_path_p.exists(): - return f"[File {file_path} does not exist]" + code = artifacts[name] - total_lines = sum(1 for _ in open(file_path_p)) - if start < 0 or end < 0 or start > end or end > total_lines: - return "[Invalid line range]" - if start == end: - end += 1 + # Append latest code to second to last message from assistant + fixed_chat_history: List[Message] = [] + for i, chat in enumerate(chat_history): + if i == 0: + fixed_chat_history.append({"role": "user", "content": chat, "media": media}) + elif i > 0 and i < len(chat_history) - 1: + fixed_chat_history.append({"role": "user", "content": chat}) + elif i == len(chat_history) - 1: + fixed_chat_history.append({"role": "assistant", "content": code}) + fixed_chat_history.append({"role": "user", "content": chat}) - new_content_lines = content.splitlines(keepends=True) - new_content_lines = [ - line if line.endswith("\n") else line + "\n" for line in new_content_lines - ] - with open(file_path_p, "r") as f: - lines = f.readlines() - edited_lines = lines[:start] + new_content_lines + lines[end:] + response = agent.chat_with_workflow(fixed_chat_history, test_multi_plan=False) + redisplay_results(response["test_result"]) + code = response["code"] + artifacts[name] = code + code_lines = code.splitlines(keepends=True) + total_lines = len(code_lines) - cur_line = start + len(content.split("\n")) // 2 - tmp_file = file_path_p.with_suffix(".tmp") - with open(tmp_file, "w") as f: - f.writelines(edited_lines) - - process = subprocess.Popen( - [ - "flake8", - "--isolated", - "--select=F821,F822,F831,E111,E112,E113,E999,E902", - tmp_file, - ], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - text=True, - ) - stdout, _ = process.communicate() - tmp_file.unlink() - if stdout != "": - stdout = stdout.replace(tmp_file.name, file_path) - error_msg = "[Edit failed with the following status]\n" + stdout - original_view = view_lines( - lines, - start + ((end - start) // 2), - DEFAULT_WINDOW_SIZE, - file_path, - total_lines, - ) - total_lines_edit = sum(1 for _ in edited_lines) - edited_view = view_lines( - edited_lines, cur_line, DEFAULT_WINDOW_SIZE, file_path, total_lines_edit - ) + display({MimeType.APPLICATION_JSON: {"last_artifact": name}}) + return view_lines(code_lines, 0, total_lines, name, total_lines) - error_msg += f"\n[This is how your edit would have looked like if applied]\n{edited_view}\n\n[This is the original code before your edit]\n{original_view}" - return error_msg - with open(file_path_p, "w") as f: - f.writelines(edited_lines) +def write_media_artifact(artifacts: Artifacts, local_path: str) -> str: + """Writes a media file to the artifacts object. - return open_file(file_path, cur_line) + Parameters: + artifacts (Artifacts): The artifacts object to save the media to. + local_path (str): The local path to the media file. 
+ """ + with open(local_path, "rb") as f: + media = f.read() + artifacts[Path(local_path).name] = media + return f"[Media {Path(local_path).name} saved]" def get_tool_descriptions() -> str: @@ -388,15 +400,11 @@ def get_tool_descriptions() -> str: META_TOOL_DOCSTRING = get_tool_documentation( [ get_tool_descriptions, + open_code_artifact, + create_code_artifact, + edit_code_artifact, generate_vision_code, edit_vision_code, - open_file, - create_file, - scroll_up, - scroll_down, - edit_file, - search_dir, - search_file, - find_file, + write_media_artifact, ] ) diff --git a/vision_agent/tools/tools.py b/vision_agent/tools/tools.py index 62a1908a..0695b547 100644 --- a/vision_agent/tools/tools.py +++ b/vision_agent/tools/tools.py @@ -1,8 +1,9 @@ -import os import io import json import logging +import os import tempfile +import urllib.request from importlib import resources from pathlib import Path from typing import Any, Dict, List, Optional, Tuple, Union, cast @@ -15,7 +16,6 @@ from PIL import Image, ImageDraw, ImageFont from pillow_heif import register_heif_opener # type: ignore from pytube import YouTube # type: ignore -import urllib.request from vision_agent.clients.landing_public_api import LandingPublicAPI from vision_agent.tools.tool_utils import ( @@ -1332,7 +1332,7 @@ def save_video( video.write_videofile(f.name, codec="libx264") f.close() _save_video_to_result(f.name) - return f.name + return f.name def _save_video_to_result(video_uri: str) -> None: diff --git a/vision_agent/tools/tools_types.py b/vision_agent/tools/tools_types.py index aeb45c95..7b640adb 100644 --- a/vision_agent/tools/tools_types.py +++ b/vision_agent/tools/tools_types.py @@ -1,8 +1,8 @@ -from uuid import UUID from enum import Enum -from typing import List, Tuple, Optional +from typing import List, Optional, Tuple +from uuid import UUID -from pydantic import BaseModel, ConfigDict, Field, field_serializer, SerializationInfo +from pydantic import BaseModel, ConfigDict, Field, SerializationInfo, field_serializer class BboxInput(BaseModel): diff --git a/vision_agent/utils/execute.py b/vision_agent/utils/execute.py index 033276d3..37c8d260 100644 --- a/vision_agent/utils/execute.py +++ b/vision_agent/utils/execute.py @@ -5,7 +5,6 @@ import platform import re import sys -import tempfile import traceback import warnings from enum import Enum @@ -40,6 +39,7 @@ load_dotenv() _LOGGER = logging.getLogger(__name__) _SESSION_TIMEOUT = 600 # 10 minutes +WORKSPACE = Path(os.getenv("WORKSPACE", "")) class MimeType(str, Enum): @@ -384,8 +384,15 @@ def from_e2b_execution(exec: E2BExecution) -> "Execution": class CodeInterpreter(abc.ABC): """Code interpreter interface.""" - def __init__(self, timeout: int, *args: Any, **kwargs: Any) -> None: + def __init__( + self, + timeout: int, + remote_path: Optional[Union[str, Path]] = None, + *args: Any, + **kwargs: Any, + ) -> None: self.timeout = timeout + self.remote_path = Path(remote_path if remote_path is not None else WORKSPACE) def __enter__(self) -> Self: return self @@ -406,17 +413,21 @@ def exec_isolation(self, code: str) -> Execution: self.restart_kernel() return self.exec_cell(code) - def upload_file(self, file: Union[str, Path]) -> str: + def upload_file(self, file: Union[str, Path]) -> Path: # Default behavior is a no-op (for local code interpreter) - return str(file) + return Path(file) - def download_file(self, file_path: str) -> Path: + def download_file( + self, remote_file_path: Union[str, Path], local_file_path: Union[str, Path] + ) -> Path: # Default behavior is a no-op 
(for local code interpreter) - return Path(file_path) + return Path(local_file_path) class E2BCodeInterpreter(CodeInterpreter): - def __init__(self, *args: Any, **kwargs: Any) -> None: + def __init__( + self, remote_path: Optional[Union[str, Path]] = None, *args: Any, **kwargs: Any + ) -> None: super().__init__(*args, **kwargs) assert os.getenv("E2B_API_KEY"), "E2B_API_KEY environment variable must be set" try: @@ -443,6 +454,9 @@ def __init__(self, *args: Any, **kwargs: Any) -> None: _LOGGER.info( f"E2BCodeInterpreter (sandbox id: {self.interpreter.sandbox_id}) initialized:\n{sys_versions}" ) + self.remote_path = Path( + remote_path if remote_path is not None else "/home/user" + ) def close(self, *args: Any, **kwargs: Any) -> None: try: @@ -516,19 +530,22 @@ def exec_cell(self, code: str) -> Execution: before_sleep=tenacity.before_sleep_log(_LOGGER, logging.INFO), after=tenacity.after_log(_LOGGER, logging.INFO), ) - def upload_file(self, file: Union[str, Path]) -> str: + def upload_file(self, file: Union[str, Path]) -> Path: file_name = Path(file).name - remote_path = f"/home/user/{file_name}" with open(file, "rb") as f: - self.interpreter.files.write(path=remote_path, data=f) - _LOGGER.info(f"File ({file}) is uploaded to: {remote_path}") - return remote_path - - def download_file(self, file_path: str) -> Path: - with tempfile.NamedTemporaryFile(mode="w+b", delete=False) as file: - file.write(self.interpreter.files.read(path=file_path, format="bytes")) - _LOGGER.info(f"File ({file_path}) is downloaded to: {file.name}") - return Path(file.name) + self.interpreter.files.write(path=str(self.remote_path / file_name), data=f) + _LOGGER.info(f"File ({file}) is uploaded to: {str(self.remote_path)}") + return self.remote_path / file_name + + def download_file( + self, remote_file_path: Union[str, Path], local_file_path: Union[str, Path] + ) -> Path: + with open(local_file_path, "w+b") as f: + f.write( + self.interpreter.files.read(path=str(remote_file_path), format="bytes") + ) + _LOGGER.info(f"File ({remote_file_path}) is downloaded to: {local_file_path}") + return Path(local_file_path) @staticmethod def _new_e2b_interpreter_impl(*args, **kwargs) -> E2BCodeInterpreterImpl: # type: ignore @@ -540,7 +557,11 @@ def _new_e2b_interpreter_impl(*args, **kwargs) -> E2BCodeInterpreterImpl: # typ class LocalCodeInterpreter(CodeInterpreter): - def __init__(self, timeout: int = _SESSION_TIMEOUT) -> None: + def __init__( + self, + timeout: int = _SESSION_TIMEOUT, + remote_path: Optional[Union[str, Path]] = None, + ) -> None: super().__init__(timeout=timeout) self.nb = nbformat.v4.new_notebook() self.nb_client = NotebookClient(self.nb, timeout=self.timeout) @@ -554,6 +575,7 @@ def __init__(self, timeout: int = _SESSION_TIMEOUT) -> None: ) sleep(1) self._new_kernel() + self.remote_path = Path(remote_path if remote_path is not None else WORKSPACE) def _new_kernel(self) -> None: if self.nb_client.kc is None or not run_sync(self.nb_client.kc.is_alive)(): # type: ignore @@ -607,6 +629,25 @@ def exec_cell(self, code: str) -> Execution: traceback_raw = traceback.format_exc().splitlines() return Execution.from_exception(e, traceback_raw) + def upload_file(self, file_path: Union[str, Path]) -> Path: + with open(file_path, "rb") as f: + contents = f.read() + with open(self.remote_path / Path(file_path).name, "wb") as f: + f.write(contents) + _LOGGER.info(f"File ({file_path}) is uploaded to: {str(self.remote_path)}") + + return self.remote_path / Path(file_path).name + + def download_file( + self, remote_file_path:
Union[str, Path], local_file_path: Union[str, Path] + ) -> Path: + with open(self.remote_path / remote_file_path, "rb") as f: + contents = f.read() + with open(local_file_path, "wb") as f: + f.write(contents) + _LOGGER.info(f"File ({remote_file_path}) is downloaded to: {local_file_path}") + return Path(local_file_path) + class CodeInterpreterFactory: """Factory class for creating code interpreters. @@ -630,13 +671,19 @@ def get_default_instance() -> CodeInterpreter: return instance @staticmethod - def new_instance(code_sandbox_runtime: Optional[str] = None) -> CodeInterpreter: + def new_instance( + code_sandbox_runtime: Optional[str] = None, remote_path: Optional[str] = None + ) -> CodeInterpreter: if not code_sandbox_runtime: code_sandbox_runtime = os.getenv("CODE_SANDBOX_RUNTIME", "local") if code_sandbox_runtime == "e2b": - instance: CodeInterpreter = E2BCodeInterpreter(timeout=_SESSION_TIMEOUT) + instance: CodeInterpreter = E2BCodeInterpreter( + timeout=_SESSION_TIMEOUT, remote_path=remote_path + ) elif code_sandbox_runtime == "local": - instance = LocalCodeInterpreter(timeout=_SESSION_TIMEOUT) + instance = LocalCodeInterpreter( + timeout=_SESSION_TIMEOUT, remote_path=remote_path + ) else: raise ValueError( f"Unsupported code sandbox runtime: {code_sandbox_runtime}. Supported runtimes: e2b, local" diff --git a/vision_agent/utils/image_utils.py b/vision_agent/utils/image_utils.py index d2bc8a6d..c1cc8eb6 100644 --- a/vision_agent/utils/image_utils.py +++ b/vision_agent/utils/image_utils.py @@ -70,7 +70,7 @@ def rle_decode_array(rle: Dict[str, List[int]]) -> np.ndarray: r"""Decode a run-length encoded mask. Returns numpy array, 1 - mask, 0 - background. Parameters: - mask: The mask in run-length encoded as an array. + rle: The run-length encoded mask. """ size = rle["size"] counts = rle["counts"] @@ -100,7 +100,7 @@ def frames_to_bytes( """ with tempfile.NamedTemporaryFile(delete=True) as temp_file: clip = ImageSequenceClip(frames, fps=fps) - clip.write_videofile(temp_file.name + f".{file_ext}", fps=fps) + clip.write_videofile(temp_file.name + f".{file_ext}", fps=fps, codec="libx264") with open(temp_file.name + f".{file_ext}", "rb") as f: buffer_bytes = f.read() return buffer_bytes
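Taken together, these changes replace the file-based SWE tools with an `Artifacts` object that is pickled locally, uploaded to the code sandbox before the agent runs, and downloaded back afterwards. Below is a minimal end-to-end sketch of the resulting API, using only names introduced in this diff (`Artifacts`, `local_artifacts_path`, and the new tuple return of `chat_with_code`); the pickle path, image path, and prompt are illustrative:

```python
import vision_agent as va

# Artifacts syncs files between this process and the environment where
# VisionAgent executes code; the pickle file stores the artifact contents.
artifacts = va.tools.Artifacts("artifacts.pkl")

agent = va.agent.VisionAgent(verbosity=1, local_artifacts_path="artifacts.pkl")

# chat_with_code now returns (conversation, artifacts) instead of just the
# conversation; media files named in the chat are read into the artifacts
# and uploaded before the agent runs.
conv, artifacts = agent.chat_with_code(
    [
        {
            "role": "user",
            "content": "Can you detect the dogs in this image?",
            "media": ["dog.jpg"],
        }
    ],
    artifacts=artifacts,
)
```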