Skip to content

Commit

Permalink
Agent V3 Tuning (#94)
Browse files Browse the repository at this point in the history
* Adding minor prompt tuning to make sure test cases run without any errors

* fix linting

* fix linting

* fixing type errors

* fix type errors

* ignoring type errors on return value of derived class call function

* last try on fixing mypy errors

* fixing a bug on retries and adding prompt for logging
  • Loading branch information
shankar-vision-eng authored May 24, 2024
1 parent 42e64ca commit c16820a
Show file tree
Hide file tree
Showing 3 changed files with 52 additions and 28 deletions.
60 changes: 38 additions & 22 deletions vision_agent/agent/vision_agent_v3.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Union, cast, Callable
from typing import Any, Dict, List, Optional, Union, cast, Callable, no_type_check

from rich.console import Console
from rich.syntax import Syntax
Expand Down Expand Up @@ -117,21 +117,26 @@ def write_and_test_code(
log_progress: Callable[[Dict[str, Any]], None],
verbosity: int = 0,
max_retries: int = 3,
input_media: Optional[Union[str, Path]] = None,
) -> Dict[str, Any]:
code = extract_code(
coder(CODE.format(docstring=tool_info, question=task, feedback=working_memory))
)
test = extract_code(
tester(
SIMPLE_TEST.format(
docstring=tool_utils, question=task, code=code, feedback=working_memory
docstring=tool_utils,
question=task,
code=code,
feedback=working_memory,
media=input_media,
)
)
)

success, result = _EXECUTE.run_isolation(f"{code}\n{test}")
if verbosity == 2:
_LOGGER.info("First code and tests:")
_LOGGER.info("Initial code and tests:")
log_progress(
{
"log": "Code:",
Expand All @@ -153,7 +158,7 @@ def write_and_test_code(
"result": result,
}
)
_LOGGER.info(f"First result: {result}")
_LOGGER.info(f"Initial result: {result}")

count = 0
new_working_memory = []
Expand Down Expand Up @@ -198,16 +203,18 @@ def write_and_test_code(
_LOGGER.info(f"Debug result: {result}")
count += 1

if verbosity == 1:
if verbosity >= 1:
_LOGGER.info("Final code and tests:")
_CONSOLE.print(
Syntax(f"{code}\n{test}", "python", theme="gruvbox-dark", line_numbers=True)
)
_LOGGER.info(f"Result: {result}")
_LOGGER.info(f"Final Result: {result}")

return {
"code": code,
"test": test,
"success": success,
"test_result": result,
"working_memory": new_working_memory,
}

Expand Down Expand Up @@ -263,23 +270,26 @@ def __init__(
else tool_recommender
)
self.verbosity = verbosity
self.max_retries = 3
self.max_retries = 2
self.report_progress_callback = report_progress_callback

@no_type_check
def __call__(
self,
input: Union[List[Dict[str, str]], str],
image: Optional[Union[str, Path]] = None,
) -> str:
) -> Dict[str, Any]:
if isinstance(input, str):
input = [{"role": "user", "content": input}]
results = self.chat_with_workflow(input, image)
return results["code"] # type: ignore
results.pop("working_memory")
return results

def chat_with_workflow(
self,
chat: List[Dict[str, str]],
image: Optional[Union[str, Path]] = None,
self_reflection: bool = False,
) -> Dict[str, Any]:
if len(chat) == 0:
raise ValueError("Chat cannot be empty.")
Expand All @@ -302,13 +312,14 @@ def chat_with_workflow(
chat, TOOL_DESCRIPTIONS, format_memory(working_memory), self.planner
)
plan_i_str = "\n-".join([e["instructions"] for e in plan_i])
if self.verbosity == 1 or self.verbosity == 2:
if self.verbosity >= 1:
self.log_progress(
{
"log": "Going to run the following plan(s) in sequence:\n",
"plan": plan_i,
}
)

_LOGGER.info(
f"""
{tabulate(tabular_data=plan_i, headers="keys", tablefmt="mixed_grid", maxcolwidths=_MAX_TABULATE_COL_WIDTH)}"""
Expand All @@ -330,25 +341,29 @@ def chat_with_workflow(
self.debugger,
self.log_progress,
verbosity=self.verbosity,
input_media=image,
)
success = cast(bool, results["success"])
code = cast(str, results["code"])
test = cast(str, results["test"])
working_memory.extend(results["working_memory"]) # type: ignore
plan.append({"code": code, "test": test, "plan": plan_i})

reflection = reflect(chat, plan_i_str, code, self.planner)
if self.verbosity > 0:
self.log_progress(
{
"log": "Reflection:",
"reflection": reflection,
}
)
_LOGGER.info(f"Reflection: {reflection}")
feedback = cast(str, reflection["feedback"])
success = cast(bool, reflection["success"])
working_memory.append({"code": f"{code}\n{test}", "feedback": feedback})
if self_reflection:
reflection = reflect(chat, plan_i_str, code, self.planner)
if self.verbosity > 0:
self.log_progress(
{
"log": "Reflection:",
"reflection": reflection,
}
)
_LOGGER.info(f"Reflection: {reflection}")
feedback = cast(str, reflection["feedback"])
success = cast(bool, reflection["success"])
working_memory.append({"code": f"{code}\n{test}", "feedback": feedback})

retries += 1

self.log_progress(
{
Expand All @@ -360,6 +375,7 @@ def chat_with_workflow(
return {
"code": code,
"test": test,
"test_result": results["test_result"],
"plan": plan,
"working_memory": working_memory,
}
Expand Down
11 changes: 8 additions & 3 deletions vision_agent/agent/vision_agent_v3_prompts.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
2. **Algorithm/Method Selection**: Decide on the most efficient way.
3. **Pseudocode Creation**: Write down the steps you will follow in pseudocode.
4. **Code Generation**: Translate your pseudocode into executable Python code.
5. **Logging**: Log the output of the custom functions that were provided to you from `from vision_agent.tools.tools_v2 import *`. Use a debug flag in the function parameters to toggle logging on and off.
"""

TEST = """
Expand Down Expand Up @@ -149,7 +150,7 @@ def find_text(image_path: str, text: str) -> str:
**Input Code Snippet**:
```python
### Please decided how would you want to generate test cases. Based on incomplete code or completed version.
### Please decide how would you want to generate test cases. Based on incomplete code or completed version.
{code}
```
Expand All @@ -159,8 +160,12 @@ def find_text(image_path: str, text: str) -> str:
**Instructions**:
1. Verify the fundamental functionality under normal conditions.
2. Ensure each test case is well-documented with comments explaining the scenario it covers.
3. DO NOT use any files that are not provided by the user's instructions, your test must be run and will crash if it tries to load a non-existent file.
4. DO NOT mock any functions, you must test their functionality as is.
3. Your test case MUST run only on the given image which is {media}
4. DO NOT use any non-existent or dummy image or video files that are not provided by the user's instructions.
5. DO NOT mock any functions, you must test their functionality as is.
6. DO NOT assert the output value, run the code and verify it runs without any errors and assert only the output format or data structure.
7. DO NOT import the testing function as it will available in the testing environment.
8. Print the output of the function that is being tested.
"""


Expand Down
9 changes: 6 additions & 3 deletions vision_agent/tools/tools_v2.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,12 +416,15 @@ def closest_mask_distance(mask1: np.ndarray, mask2: np.ndarray) -> float:
return cast(float, np.min(dist_matrix))


def closest_box_distance(box1: List[float], box2: List[float]) -> float:
def closest_box_distance(
box1: List[float], box2: List[float], image_size: Tuple[int, int]
) -> float:
"""'closest_box_distance' calculates the closest distance between two bounding boxes.
Parameters:
box1 (List[float]): The first bounding box.
box2 (List[float]): The second bounding box.
image_size (Tuple[int, int]): The size of the image given as (height, width).
Returns:
float: The closest distance between the two bounding boxes.
Expand All @@ -432,8 +435,8 @@ def closest_box_distance(box1: List[float], box2: List[float]) -> float:
141.42
"""

x11, y11, x12, y12 = box1
x21, y21, x22, y22 = box2
x11, y11, x12, y12 = denormalize_bbox(box1, image_size)
x21, y21, x22, y22 = denormalize_bbox(box2, image_size)

horizontal_distance = np.max([0, x21 - x12, x11 - x22])
vertical_distance = np.max([0, y21 - y12, y11 - y22])
Expand Down

0 comments on commit c16820a

Please sign in to comment.