From d68829deaa3917281ae7f445f88756a25cab3889 Mon Sep 17 00:00:00 2001 From: Dillon Laird Date: Mon, 25 Mar 2024 10:36:07 -0700 Subject: [PATCH] Add Vision Agent (#24) * add vision agent * fix issues with easytool * update tools * fixed typing issues --- vision_agent/agent/__init__.py | 1 + vision_agent/agent/easytool.py | 2 +- vision_agent/agent/easytool_prompts.py | 3 +- vision_agent/agent/vision_agent.py | 442 +++++++++++++++++++++ vision_agent/agent/vision_agent_prompts.py | 129 ++++++ vision_agent/tools/tools.py | 50 +++ 6 files changed, 624 insertions(+), 3 deletions(-) create mode 100644 vision_agent/agent/vision_agent.py create mode 100644 vision_agent/agent/vision_agent_prompts.py diff --git a/vision_agent/agent/__init__.py b/vision_agent/agent/__init__.py index f0954251..7d11231f 100644 --- a/vision_agent/agent/__init__.py +++ b/vision_agent/agent/__init__.py @@ -1,3 +1,4 @@ from .agent import Agent from .easytool import EasyTool from .reflexion import Reflexion +from .vision_agent import VisionAgent diff --git a/vision_agent/agent/easytool.py b/vision_agent/agent/easytool.py index 7a7cf7d1..63aad9ba 100644 --- a/vision_agent/agent/easytool.py +++ b/vision_agent/agent/easytool.py @@ -45,7 +45,7 @@ def format_tools(tools: Dict[int, Any]) -> str: # Format this way so it's clear what the ID's are tool_str = "" for key in tools: - tool_str += f"ID: {key}, {tools[key]}\n" + tool_str += f"ID: {key} - {tools[key]}\n" return tool_str diff --git a/vision_agent/agent/easytool_prompts.py b/vision_agent/agent/easytool_prompts.py index acc0a111..445e42b3 100644 --- a/vision_agent/agent/easytool_prompts.py +++ b/vision_agent/agent/easytool_prompts.py @@ -14,7 +14,7 @@ Output: """ TASK_TOPOLOGY = """Given a complex user's question, I have decompose this question into some simple subtasks. I think there exists a logical connections and order amontg the tasks. Thus you need to help me output this logical connections and order. 
-You must ONLY output in a parsible JSON format with the following format:" +You must ONLY output in a parsible JSON format with the following format: {{"Tasks": [{{"task": task, "id", task_id, "dep": [dependency_task_id1, dependency_task_id2, ...]}}]}} @@ -31,7 +31,6 @@ CHOOSE_TOOL = """This is the user's question: {question} These are the tools you can select to solve the question: -Tool List: {tools} Please note that: diff --git a/vision_agent/agent/vision_agent.py b/vision_agent/agent/vision_agent.py new file mode 100644 index 00000000..7fc3cecd --- /dev/null +++ b/vision_agent/agent/vision_agent.py @@ -0,0 +1,442 @@ +import json +import logging +import sys +from pathlib import Path +from typing import Any, Callable, Dict, List, Optional, Tuple, Union + +from vision_agent.llm import LLM, OpenAILLM +from vision_agent.lmm import LMM, OpenAILMM +from vision_agent.tools import TOOLS + +from .agent import Agent +from .easytool_prompts import ( + ANSWER_GENERATE, + ANSWER_SUMMARIZE, + CHOOSE_PARAMETER, + CHOOSE_TOOL, + TASK_DECOMPOSE, + TASK_TOPOLOGY, +) +from .vision_agent_prompts import ( + ANSWER_GENERATE_DEPENDS, + ANSWER_SUMMARIZE_DEPENDS, + CHOOSE_PARAMETER_DEPENDS, + CHOOSE_TOOL_DEPENDS, + TASK_DECOMPOSE_DEPENDS, + VISION_AGENT_REFLECTION, +) + +logging.basicConfig(stream=sys.stdout) +_LOGGER = logging.getLogger(__name__) + + +def parse_json(s: str) -> Any: + s = ( + s.replace(": true", ": True") + .replace(": false", ": False") + .replace(":true", ": True") + .replace(":false", ": False") + .replace("```", "") + .strip() + ) + return json.loads(s) + + +def change_name(name: str) -> str: + change_list = ["from", "class", "return", "false", "true", "id", "and", "", "ID"] + if name in change_list: + name = "is_" + name.lower() + return name + + +def format_tools(tools: Dict[int, Any]) -> str: + # Format this way so it's clear what the ID's are + tool_str = "" + for key in tools: + tool_str += f"ID: {key} - {tools[key]}\n" + return tool_str + + +def topological_sort(tasks: List[Dict]) -> List[Dict]: + in_degree = {task["id"]: 0 for task in tasks} + for task in tasks: + for dep in task["dep"]: + if dep in in_degree: + in_degree[task["id"]] += 1 + + queue = [task for task in tasks if in_degree[task["id"]] == 0] + sorted_order = [] + + while queue: + current = queue.pop(0) + sorted_order.append(current) + + for task in tasks: + if current["id"] in task["dep"]: + in_degree[task["id"]] -= 1 + if in_degree[task["id"]] == 0: + queue.append(task) + + if len(sorted_order) != len(tasks): + completed_ids = set([task["id"] for task in sorted_order]) + remaining_tasks = [task for task in tasks if task["id"] not in completed_ids] + sorted_order.extend(remaining_tasks) + return sorted_order + + +def task_decompose( + model: Union[LLM, LMM, Agent], + question: str, + tools: Dict[int, Any], + reflections: str, +) -> Optional[Dict]: + if reflections: + prompt = TASK_DECOMPOSE_DEPENDS.format( + question=question, tools=format_tools(tools), reflections=reflections + ) + else: + prompt = TASK_DECOMPOSE.format(question=question, tools=format_tools(tools)) + tries = 0 + str_result = "" + while True: + try: + str_result = model(prompt) + result = parse_json(str_result) + return result["Tasks"] # type: ignore + except Exception: + if tries > 10: + _LOGGER.error(f"Failed task_decompose on: {str_result}") + return None + tries += 1 + continue + + +def task_topology( + model: Union[LLM, LMM, Agent], question: str, task_list: List[Dict] +) -> List[Dict[str, Any]]: + prompt = TASK_TOPOLOGY.format(question=question, 
task_list=task_list) + tries = 0 + str_result = "" + while True: + try: + str_result = model(prompt) + result = parse_json(str_result) + for elt in result["Tasks"]: + if isinstance(elt["dep"], str): + elt["dep"] = [int(dep) for dep in elt["dep"].split(",")] + elif isinstance(elt["dep"], int): + elt["dep"] = [elt["dep"]] + elif isinstance(elt["dep"], list): + elt["dep"] = [int(dep) for dep in elt["dep"]] + return result["Tasks"] # type: ignore + except Exception: + if tries > 10: + _LOGGER.error(f"Failed task_topology on: {str_result}") + return task_list + tries += 1 + continue + + +def choose_tool( + model: Union[LLM, LMM, Agent], + question: str, + tools: Dict[int, Any], + reflections: str, +) -> Optional[int]: + if reflections: + prompt = CHOOSE_TOOL_DEPENDS.format( + question=question, tools=format_tools(tools), reflections=reflections + ) + else: + prompt = CHOOSE_TOOL.format(question=question, tools=format_tools(tools)) + tries = 0 + str_result = "" + while True: + try: + str_result = model(prompt) + result = parse_json(str_result) + return result["ID"] # type: ignore + except Exception: + if tries > 10: + _LOGGER.error(f"Failed choose_tool on: {str_result}") + return None + tries += 1 + continue + + +def choose_parameter( + model: Union[LLM, LMM, Agent], + question: str, + tool_usage: Dict, + previous_log: str, + reflections: str, +) -> Optional[Any]: + # TODO: should format tool_usage + if reflections: + prompt = CHOOSE_PARAMETER_DEPENDS.format( + question=question, + tool_usage=tool_usage, + previous_log=previous_log, + reflections=reflections, + ) + else: + prompt = CHOOSE_PARAMETER.format( + question=question, tool_usage=tool_usage, previous_log=previous_log + ) + tries = 0 + str_result = "" + while True: + try: + str_result = model(prompt) + result = parse_json(str_result) + return result["Parameters"] + except Exception: + if tries > 10: + _LOGGER.error(f"Failed choose_parameter on: {str_result}") + return None + tries += 1 + continue + + +def answer_generate( + model: Union[LLM, LMM, Agent], + question: str, + call_results: str, + previous_log: str, + reflections: str, +) -> str: + if reflections: + prompt = ANSWER_GENERATE_DEPENDS.format( + question=question, + call_results=call_results, + previous_log=previous_log, + reflections=reflections, + ) + else: + prompt = ANSWER_GENERATE.format( + question=question, call_results=call_results, previous_log=previous_log + ) + return model(prompt) + + +def answer_summarize( + model: Union[LLM, LMM, Agent], question: str, answers: List[Dict], reflections: str +) -> str: + if reflections: + prompt = ANSWER_SUMMARIZE_DEPENDS.format( + question=question, answers=answers, reflections=reflections + ) + else: + prompt = ANSWER_SUMMARIZE.format(question=question, answers=answers) + return model(prompt) + + +def function_call(tool: Callable, parameters: Dict[str, Any]) -> Any: + try: + return tool()(**parameters) + except Exception as e: + _LOGGER.error(f"Failed function_call on: {e}") + # return error message so it can self-correct + return str(e) + + +def retrieval( + model: Union[LLM, LMM, Agent], + question: str, + tools: Dict[int, Any], + previous_log: str, + reflections: str, +) -> Tuple[List[Dict], str]: + tool_id = choose_tool( + model, question, {k: v["description"] for k, v in tools.items()}, reflections + ) + if tool_id is None: + return [{}], "" + _LOGGER.info(f"\t(Tool ID, name): ({tool_id}, {tools[tool_id]['name']})") + + tool_instructions = tools[tool_id] + tool_usage = tool_instructions["usage"] + tool_name = 
tool_instructions["name"] + + parameters = choose_parameter( + model, question, tool_usage, previous_log, reflections + ) + _LOGGER.info(f"\tParameters: {parameters} for {tool_name}") + if parameters is None: + return [{}], "" + tool_results = [ + {"task": question, "tool_name": tool_name, "parameters": parameters} + ] + + def parse_tool_results(result: Dict[str, Union[Dict, List]]) -> Any: + call_results: List[Any] = [] + if isinstance(result["parameters"], Dict): + call_results.append( + function_call(tools[tool_id]["class"], result["parameters"]) + ) + elif isinstance(result["parameters"], List): + for parameters in result["parameters"]: + call_results.append(function_call(tools[tool_id]["class"], parameters)) + return call_results + + call_results = [] + for i, result in enumerate(tool_results): + call_results.extend(parse_tool_results(result)) + tool_results[i]["call_results"] = call_results + + call_results_str = "\n\n".join([str(e) for e in call_results if e is not None]) + _LOGGER.info(f"\tCall Results: {call_results_str}") + return tool_results, call_results_str + + +def create_tasks( + task_model: Union[LLM, LMM], question: str, tools: Dict[int, Any], reflections: str +) -> List[Dict]: + tasks = task_decompose( + task_model, + question, + {k: v["description"] for k, v in tools.items()}, + reflections, + ) + + _LOGGER.info(f"Tasks: {tasks}") + if tasks is not None: + task_list = [{"task": task, "id": i + 1} for i, task in enumerate(tasks)] + task_list = task_topology(task_model, question, task_list) + try: + task_list = topological_sort(task_list) + except Exception: + _LOGGER.error(f"Failed topological_sort on: {task_list}") + else: + task_list = [] + return task_list + + +def self_reflect( + reflect_model: Union[LLM, LMM], + question: str, + tool_result: List[Dict], + final_answer: str, + image: Optional[Union[str, Path]] = None, +) -> str: + prompt = VISION_AGENT_REFLECTION.format( + question=question, tool_results=str(tool_result), final_answer=final_answer + ) + if issubclass(type(reflect_model), LMM): + return reflect_model(prompt, image=image) # type: ignore + return reflect_model(prompt) + + +def parse_reflect(reflect: str) -> bool: + return reflect.lower() == "finish" + + +class VisionAgent(Agent): + r"""Vision Agent is an agent framework that utilizes tools as well as self + reflection to accomplish tasks, in particular vision tasks. Vision Agent is based + on EasyTool https://arxiv.org/abs/2401.06201 and Reflexion + https://arxiv.org/abs/2303.11366: it will attempt to complete a task and then + reflect on whether or not it was able to accomplish the task based on the plan + and final results; if not, it will redo the task with this newly added reflection. + + Examples:: + >>> from vision_agent.agent import VisionAgent + >>> agent = VisionAgent() + >>> resp = agent("If red tomatoes cost $5 each and yellow tomatoes cost $2.50 each, what is the total cost of all the tomatoes in the image?", image="tomatoes.jpg") + >>> print(resp) + >>> "The total cost is $57.50."
+ """ + + def __init__( + self, + task_model: Optional[Union[LLM, LMM]] = None, + answer_model: Optional[Union[LLM, LMM]] = None, + reflect_model: Optional[Union[LLM, LMM]] = None, + max_retries: int = 2, + verbose: bool = False, + ): + self.task_model = ( + OpenAILLM(json_mode=True) if task_model is None else task_model + ) + self.answer_model = OpenAILLM() if answer_model is None else answer_model + self.reflect_model = OpenAILMM() if reflect_model is None else reflect_model + self.max_retries = max_retries + + self.tools = TOOLS + if verbose: + _LOGGER.setLevel(logging.INFO) + + def __call__( + self, + input: Union[List[Dict[str, str]], str], + image: Optional[Union[str, Path]] = None, + ) -> str: + if isinstance(input, str): + input = [{"role": "user", "content": input}] + return self.chat(input, image=image) + + def chat_with_workflow( + self, chat: List[Dict[str, str]], image: Optional[Union[str, Path]] = None + ) -> Tuple[str, List[Dict]]: + question = chat[0]["content"] + if image: + question += f" Image name: {image}" + + reflections = "" + final_answer = "" + all_tool_results: List[Dict] = [] + + for _ in range(self.max_retries): + task_list = create_tasks(self.task_model, question, self.tools, reflections) + + _LOGGER.info(f"Task Dependency: {task_list}") + task_depend = {"Original Question": question} + previous_log = "" + answers = [] + for task in task_list: + task_depend[task["id"]] = {"task": task["task"], "answer": "", "call_result": ""} # type: ignore + all_tool_results = [] + + for task in task_list: + task_str = task["task"] + previous_log = str(task_depend) + _LOGGER.info(f"\tSubtask: {task_str}") + tool_results, call_results = retrieval( + self.task_model, + task_str, + self.tools, + previous_log, + reflections, + ) + answer = answer_generate( + self.answer_model, task_str, call_results, previous_log, reflections + ) + + for tool_result in tool_results: + tool_result["answer"] = answer + all_tool_results.extend(tool_results) + + _LOGGER.info(f"\tAnswer: {answer}") + answers.append({"task": task_str, "answer": answer}) + task_depend[task["id"]]["answer"] = answer # type: ignore + task_depend[task["id"]]["call_result"] = call_results # type: ignore + final_answer = answer_summarize( + self.answer_model, question, answers, reflections + ) + + reflection = self_reflect( + self.reflect_model, question, all_tool_results, final_answer, image + ) + _LOGGER.info(f"\tReflection: {reflection}") + if parse_reflect(reflection): + break + else: + reflections += reflection + + return final_answer, all_tool_results + + def chat( + self, chat: List[Dict[str, str]], image: Optional[Union[str, Path]] = None + ) -> str: + answer, _ = self.chat_with_workflow(chat, image=image) + return answer diff --git a/vision_agent/agent/vision_agent_prompts.py b/vision_agent/agent/vision_agent_prompts.py new file mode 100644 index 00000000..7d4a724c --- /dev/null +++ b/vision_agent/agent/vision_agent_prompts.py @@ -0,0 +1,129 @@ +VISION_AGENT_REFLECTION = """You are an advanced reasoning agent that can improve based on self reflection. You will be given a previous reasoning trial in which you were given the user's question, the decomposed tasks and tools that the agent used to answer the question and the final answer the agent provided. You must determine if the agent's answer was correct or incorrect. If the agent's answer was correct, respond with Finish.
If the agent's answer was incorrect, you must diagnose a possible reason for failure or phrasing discrepancy and devise a new, concise, high level plan that aims to mitigate the same failure. Use complete sentences. + +User's question: {question} + +Tasks and tools used: +{tool_results} + +Final answer: +{final_answer} + +Reflection: """ + +TASK_DECOMPOSE = """You need to decompose a complex user's question into some simple subtasks and let the model execute it step by step. +This is the user's question: {question} +This is the tool list: +{tools} + +Please note that: +1. You should only decompose this complex user's question into some simple subtasks which can be executed easily by using one single tool in the tool list. +2. If one subtask needs the results from another subtask, you should write it clearly. For example: +{{"Tasks": ["Convert 23 km/h to X km/min by 'divide_'", "Multiply X km/min by 45 min to get Y by 'multiply_'"]}} +3. You must ONLY output in a parsable JSON format. An example output looks like: + +{{"Tasks": ["Task 1", "Task 2", ...]}} + +Output: """ + +TASK_DECOMPOSE_DEPENDS = """You need to decompose a complex user's question into some simple subtasks and let the model execute it step by step. +This is the user's question: {question} + +This is the tool list: +{tools} + +This is a reflection from a previous failed attempt: +{reflections} + +Please note that: +1. You should only decompose this complex user's question into some simple subtasks which can be executed easily by using one single tool in the tool list. +2. If one subtask needs the results from another subtask, you should write it clearly. For example: +{{"Tasks": ["Convert 23 km/h to X km/min by 'divide_'", "Multiply X km/min by 45 min to get Y by 'multiply_'"]}} +3. You must ONLY output in a parsable JSON format. An example output looks like: + +{{"Tasks": ["Task 1", "Task 2", ...]}} + +Output: """ + +CHOOSE_TOOL = """This is the user's question: {question} +These are the tools you can select to solve the question: + +{tools} + +Please note that: +1. You should only choose one tool from the Tool List to solve this question. +2. You must ONLY output the ID of the tool you chose in a parsable JSON format. Two example outputs look like: + +Example 1: {{"ID": 1}} +Example 2: {{"ID": 2}} + +Output: """ + +CHOOSE_TOOL_DEPENDS = """This is the user's question: {question} +These are the tools you can select to solve the question: + +{tools} + +This is a reflection from a previous failed attempt: +{reflections} + +Please note that: +1. You should only choose one tool from the Tool List to solve this question. +2. You must ONLY output the ID of the tool you chose in a parsable JSON format. Two example outputs look like: + +Example 1: {{"ID": 1}} +Example 2: {{"ID": 2}} + +Output: """ + +CHOOSE_PARAMETER_DEPENDS = """Given a user's question and an API tool's documentation, you need to output parameters according to the API tool documentation to successfully call the API to solve the user's question. +Please note that: +1. The Example in the API tool documentation can help you better understand the use of the API. +2. Ensure the parameters you output are correct. The output must contain the required parameters, and can contain the optional parameters based on the question. If there are no parameters in the required parameters and optional parameters, just leave it as {{"Parameters":{{}}}} +3. If the user's question mentions other APIs, you should ONLY consider the API tool documentation I give and do not consider other APIs. +4. 
The question may have dependencies on answers of other questions, so we will provide logs of previous questions and answers for your reference. +5. If you need to use this API multiple times, please set "Parameters" to a list. +6. You must ONLY output in a parsable JSON format. Two example outputs look like: + +Example 1: {{"Parameters":{{"input": [1,2,3]}}}} +Example 2: {{"Parameters":[{{"input": [1,2,3]}}, {{"input": [2,3,4]}}]}} + +This is a reflection from a previous failed attempt: +{reflections} + +These are the logs of previous questions and answers: +{previous_log} + +This is the current user's question: {question} +This is the API tool documentation: {tool_usage} +Output: """ + +ANSWER_GENERATE_DEPENDS = """You should answer the question based on the response output by the API tool. +Please note that: +1. Try to organize the response into a natural language answer. +2. We will not show the API response to the user, thus you need to make full use of the response and give the information in the response that can satisfy the user's question in as much detail as possible. +3. If the API tool does not provide useful information in the response, please answer with your knowledge. +4. The question may have dependencies on answers of other questions, so we will provide logs of previous questions and answers. + +This is a reflection from a previous failed attempt: +{reflections} + +These are the logs of previous questions and answers: +{previous_log} + +This is the user's question: {question} + +This is the response output by the API tool: +{call_results} + +We will not show the API response to the user, thus you need to make full use of the response and give the information in the response that can satisfy the user's question in as much detail as possible. +Output: """ + +ANSWER_SUMMARIZE_DEPENDS = """We break down a complex user's problem into simple subtasks and provide answers to each simple subtask. You need to organize these answers to each subtask and form a self-consistent final answer to the user's question. +This is the user's question: {question} + +These are subtasks and their answers: +{answers} + +This is a reflection from a previous failed attempt: +{reflections} +Final answer: """ diff --git a/vision_agent/tools/tools.py b/vision_agent/tools/tools.py index fdcece18..a2b75851 100644 --- a/vision_agent/tools/tools.py +++ b/vision_agent/tools/tools.py @@ -180,6 +180,7 @@ def __call__(self, prompt: str, image: Union[str, Path, ImageType]) -> List[Dict ] if "scores" in elt: elt["scores"] = [round(score, 2) for score in elt["scores"]] + elt["size"] = (image_size[1], image_size[0]) return cast(List[Dict], resp_data) @@ -341,6 +342,53 @@ def __call__(self, bbox: List[float], image: Union[str, Path]) -> str: return tmp.name +class BboxArea(Tool): + name = "bbox_area_" + description = "'bbox_area_' returns the area of the bounding box in pixels normalized to 2 decimal places."
+ usage = { + "required_parameters": [{"name": "bboxes", "type": "List[int]"}], + "examples": [ + { + "scenario": "If you want to calculate the area of the bounding box [0.2, 0.21, 0.34, 0.42]", + "parameters": {"bboxes": [0.2, 0.21, 0.34, 0.42]}, + } + ], + } + + def __call__(self, bboxes: List[Dict]) -> List[Dict]: + areas = [] + for elt in bboxes: + height, width = elt["size"] + for label, bbox in zip(elt["labels"], elt["bboxes"]): + x1, y1, x2, y2 = bbox + areas.append( + { + "area": round((x2 - x1) * (y2 - y1) * width * height, 2), + "label": label, + } + ) + return areas + + +class SegArea(Tool): + name = "seg_area_" + description = "'seg_area_' returns the area of the segmentation mask in pixels normalized to 2 decimal places." + usage = { + "required_parameters": [{"name": "masks", "type": "str"}], + "examples": [ + { + "scenario": "If you want to calculate the area of the segmentation mask, pass the masks file name.", + "parameters": {"masks": "mask_file.jpg"}, + }, + ], + } + + def __call__(self, masks: Union[str, Path]) -> float: + pil_mask = Image.open(str(masks)) + np_mask = np.array(pil_mask) + return cast(float, round(np.sum(np_mask) / 255, 2)) + + class Add(Tool): name = "add_" description = "'add_' returns the sum of all the arguments passed to it, normalized to 2 decimal places." @@ -418,6 +466,8 @@ def __call__(self, input: List[int]) -> float: AgentGroundingSAM, Counter, Crop, + BboxArea, + SegArea, Add, Subtract, Multiply,
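Below is a minimal usage sketch for the VisionAgent class introduced in this patch, based on its docstring and the __call__ / chat_with_workflow signatures above. It is not part of the diff; the image file name and the availability of OpenAI credentials for the default OpenAILLM / OpenAILMM models are assumptions.

# Hypothetical usage sketch; assumes OpenAI credentials are configured and a local
# "tomatoes.jpg" exists (the image used in the class docstring example).
from vision_agent.agent import VisionAgent

agent = VisionAgent(max_retries=2, verbose=True)  # verbose=True raises the logger to INFO

# A plain string is wrapped into a chat message by __call__; only the final answer is returned.
answer = agent("How many tomatoes are in the image?", image="tomatoes.jpg")
print(answer)

# chat_with_workflow also returns the per-subtask tool results that were fed to the
# self-reflection step.
answer, tool_results = agent.chat_with_workflow(
    [{"role": "user", "content": "What is the total cost of all the tomatoes in the image?"}],
    image="tomatoes.jpg",
)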