Skip to content

Add symbolic context #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jul 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,14 @@ but the intention is to do automatic plugin discovery. Automatic plugin
discovery will enable all downstream planning plugins to be implemented without
touching the omniplanner node.

There's an edge case in `compile_plan` that may require special care: If the
output from your planner (your "plan" type) is a subclass of list, then you
need to implement a `compile_plan` override for `compile_plan(adaptor,
SymbolicContext[YourPlanType])` *even if you do not need the symbolic context*.
This issue is caused by limitations of generic type inference of parameterized
types. You can choose to either not inherit from list, or ensure that you have
this compile plan override implemented.

### Is Omniplanner right for me?

Omniplanner is inherently experimental in nature, but it seems to work pretty
Expand Down
26 changes: 7 additions & 19 deletions omniplanner/examples/goto_points_example.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,12 @@
import numpy as np
from robot_executor_interface.action_descriptions import ActionSequence, Follow
from utils import build_test_dsg
from utils import DummyRobotPlanningAdaptor, build_test_dsg

from omniplanner.goto_points import GotoPointsDomain, GotoPointsGoal
from omniplanner.omniplanner import (
PlanRequest,
full_planning_pipeline,
)


def compile_plan(plan, plan_id, robot_name, frame_id):
"""This function turns the output of the planner into a ROS message that is ingestible by a robot"""
actions = []
for p in plan.plan:
xs = np.interp(np.linspace(0, 1, 10), [0, 1], [p.start[0], p.goal[0]])
ys = np.interp(np.linspace(0, 1, 10), [0, 1], [p.start[1], p.goal[1]])
p_interp = np.vstack([xs, ys])
actions.append(Follow(frame=frame_id, path2d=p_interp.T))

seq = ActionSequence(plan_id=plan_id, robot_name=robot_name, actions=actions)
return seq

from omniplanner_ros.goto_points_ros import compile_plan

print("================================")
print("== Goto Points Domain, no DSG ==")
Expand Down Expand Up @@ -51,7 +37,9 @@ def compile_plan(plan, plan_id, robot_name, frame_id):
print("Plan from planning domain:")
print(plan)

compiled_plan = compile_plan(plan, "abc123", "spot", "a_coordinate_frame")
adaptor = DummyRobotPlanningAdaptor("spot", "spot", "map", "body")

compiled_plan = compile_plan(adaptor, plan)
print("compiled plan:")
print(compiled_plan)

Expand All @@ -61,7 +49,7 @@ def compile_plan(plan, plan_id, robot_name, frame_id):
print("==================================")

robot_poses = {"spot": np.array([0.0, 0.1])}
goal = GotoPointsGoal(["o(0)", "o(1)"], "spot")
goal = GotoPointsGoal(["O(0)", "O(1)"], "spot")

robot_states = robot_poses
req = PlanRequest(domain=GotoPointsDomain(), goal=goal, robot_states=robot_states)
Expand All @@ -72,6 +60,6 @@ def compile_plan(plan, plan_id, robot_name, frame_id):
print("Plan from planning domain:")
print(plan)

compiled_plan = compile_plan(plan, "abc123", "spot", "a_coordinate_frame")
compiled_plan = compile_plan(adaptor, plan)
print("compiled plan:")
print(compiled_plan)
52 changes: 7 additions & 45 deletions omniplanner/examples/language_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,59 +4,25 @@
import dsg_pddl.domains
import nlu_interface.resources
import numpy as np
from dsg_pddl.dsg_pddl_interface import PddlDomain, PddlPlan
from dsg_pddl.pddl_grounding import PddlDomain
from nlu_interface.llm_interface import OpenAIWrapper
from robot_executor_interface.action_descriptions import ActionSequence, Follow, Gaze
from ruamel.yaml import YAML
from utils import build_test_dsg
from utils import DummyRobotPlanningAdaptor, build_test_dsg

from omniplanner.language_planner import LanguageDomain, LanguageGoal
from omniplanner.omniplanner import (
PlanRequest,
full_planning_pipeline,
)
from omniplanner_ros.pddl_planner_ros import compile_plan

logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)

yaml = YAML(typ="safe")


def compile_plan_goto(plan, plan_id, robot_name, frame_id):
"""This function turns the output of the planner into a ROS message that is ingestible by a robot"""
actions = []
for p in plan.plan:
xs = np.interp(np.linspace(0, 1, 10), [0, 1], [p.start[0], p.goal[0]])
ys = np.interp(np.linspace(0, 1, 10), [0, 1], [p.start[1], p.goal[1]])
p_interp = np.vstack([xs, ys])
actions.append(Follow(frame=frame_id, path2d=p_interp.T))

seq = ActionSequence(plan_id=plan_id, robot_name=robot_name, actions=actions)
return seq


def compile_plan(plan: PddlPlan, plan_id, robot_name, frame_id):
actions = []
for symbolic_action, parameters in zip(
plan.symbolic_actions, plan.parameterized_actions
):
match symbolic_action[0]:
case "goto-poi":
actions.append(Follow(frame=frame_id, path2d=parameters))
case "inspect":
robot_point, gaze_point = parameters
actions.append(
Gaze(
frame=frame_id,
robot_point=robot_point,
gaze_point=gaze_point,
stow_after=True,
)
)

seq = ActionSequence(plan_id=plan_id, robot_name=robot_name, actions=actions)
return seq

adaptor = DummyRobotPlanningAdaptor("spot", "spot", "map", "body")
adaptors = {"euclid": adaptor}

print("================================")
print("== Goto Point Language Domain ==")
Expand All @@ -80,9 +46,7 @@ def compile_plan(plan: PddlPlan, plan_id, robot_name, frame_id):
print("Plan from planning domain:")
print(robot_plan)

compiled_plan = compile_plan_goto(
robot_plan.value, "abc123", robot_plan.name, "a_coordinate_frame"
)
compiled_plan = compile_plan(adaptor, robot_plan)
print("compiled plan:")
print(compiled_plan)

Expand Down Expand Up @@ -141,8 +105,6 @@ def compile_plan(plan: PddlPlan, plan_id, robot_name, frame_id):
print(plan)


compiled_plan = compile_plan(
plan[0].value, "abc123", plan[0].name, "a_coordinate_frame"
)
compiled_plan = compile_plan(adaptors, plan)
print("compiled plan:")
print(compiled_plan)
88 changes: 20 additions & 68 deletions omniplanner/examples/pddl_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,82 +4,31 @@
import dsg_pddl.domains
import numpy as np
import spark_dsg
from dsg_pddl.dsg_pddl_planning import PddlPlan
from dsg_pddl.pddl_grounding import PddlDomain, PddlGoal
from robot_executor_interface.action_descriptions import (
ActionSequence,
Follow,
Gaze,
Pick,
Place,
)
from ruamel.yaml import YAML
from utils import DummyRobotPlanningAdaptor

from omniplanner.omniplanner import (
PlanRequest,
full_planning_pipeline,
)
from omniplanner.compile_plan import collect_plans
from omniplanner.omniplanner import PlanRequest, full_planning_pipeline
from omniplanner_ros.pddl_planner_ros import compile_plan

logging.basicConfig()
logging.getLogger().setLevel(logging.INFO)

yaml = YAML(typ="safe")


def compile_plan(plan: PddlPlan, plan_id, robot_name, frame_id):
actions = []
for symbolic_action, parameters in zip(
plan.symbolic_actions, plan.parameterized_actions
):
match symbolic_action[0]:
case "goto-poi":
actions.append(Follow(frame=frame_id, path2d=parameters))
case "inspect":
robot_point, gaze_point = parameters
actions.append(
Gaze(
frame=frame_id,
robot_point=robot_point,
gaze_point=gaze_point,
stow_after=True,
)
)
case "pick-object":
robot_point, pick_point = parameters
actions.append(
Pick(
frame=frame_id,
object_class="",
robot_point=robot_point,
object_point=pick_point,
)
)
case "place-object":
robot_point, place_point = parameters
actions.append(
Place(
frame=frame_id,
object_class="",
robot_point=robot_point,
object_point=place_point,
)
)
case _:
raise NotImplementedError(
f"I don't know how to compile {symbolic_action[0]}"
)

seq = ActionSequence(plan_id=plan_id, robot_name=robot_name, actions=actions)
return seq


# G = build_test_dsg()
# goal = PddlGoal(robot_id="euclid", pddl_goal="(and (visited-object o0) (visited-object o1))")
# goal = PddlGoal(robot_id="euclid", pddl_goal="(object-in-place o1 p0)")
G = spark_dsg.DynamicSceneGraph.load(
"/home/ubuntu/lxc_datashare/west_point_fused_map_wregions.json"
"/home/ubuntu/lxc_datashare/west_point_fused_map_wregions_labelspace.json"
)


adaptor = DummyRobotPlanningAdaptor("euclid", "spot", "map", "body")
adaptors = {"euclid": adaptor}

robot_poses = {"euclid": np.array([0.0, 0.1])}

print("================================")
Expand Down Expand Up @@ -113,10 +62,13 @@ def compile_plan(plan: PddlPlan, plan_id, robot_name, frame_id):
print("Plan from planning domain:")
print(plan)

compiled_plan = compile_plan(plan.value, "abc123", plan.name, "a_coordinate_frame")
print("compiled plan:")
compiled_plan = compile_plan(adaptors, plan)
print(compiled_plan)

collected_plans = collect_plans(compiled_plan)
print("Collected plans:")
print(collected_plans)


print("================================")
print("== PDDL Domain (Pick/Place) ==")
Expand Down Expand Up @@ -146,9 +98,9 @@ def compile_plan(plan: PddlPlan, plan_id, robot_name, frame_id):
print("Plan from planning domain:")
print(plan)

compiled_plan = compile_plan(plan.value, "abc123", plan.name, "a_coordinate_frame")
print("compiled plan:")
print(compiled_plan)
collected_plan = collect_plans(compile_plan(adaptors, plan))
print("collected plan: ", collected_plan)


print("================================")
print("== PDDL Domain (Regions) ==")
Expand Down Expand Up @@ -186,6 +138,6 @@ def compile_plan(plan: PddlPlan, plan_id, robot_name, frame_id):
print("Plan from planning domain:")
print(plan)

compiled_plan = compile_plan(plan.value, "abc123", plan.name, "a_coordinate_frame")
print("compiled plan:")
print(compiled_plan)

collected_plan = collect_plans(compile_plan(adaptors, plan))
print("collected plan: ", collected_plan)
21 changes: 6 additions & 15 deletions omniplanner/examples/tsp_example.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,26 @@
import logging

import numpy as np
from robot_executor_interface.action_descriptions import ActionSequence, Follow
from utils import build_test_dsg
from utils import DummyRobotPlanningAdaptor, build_test_dsg

from omniplanner.omniplanner import (
PlanRequest,
full_planning_pipeline,
)
from omniplanner.tsp import FollowPathPlan, TspDomain, TspGoal
from omniplanner.tsp import TspDomain, TspGoal
from omniplanner_ros.goto_points_ros import compile_plan

logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)


def compile_plan(plan: FollowPathPlan, plan_id, robot_name, frame_id):
actions = []
for p in plan:
actions.append(Follow(frame=frame_id, path2d=p.path))
seq = ActionSequence(plan_id=plan_id, robot_name=robot_name, actions=actions)
return seq

adaptor = DummyRobotPlanningAdaptor("euclid", "spot", "map", "body")

print("==========================")
print("== TSP Domain ==")
print("==========================")
print("")

goal = TspGoal(goal_points=["o(0)", "o(1)"], robot_id="spot")
goal = TspGoal(goal_points=["O(0)", "O(1)"], robot_id="spot")

robot_domain = TspDomain(solver="2opt")

Expand All @@ -45,8 +38,6 @@ def compile_plan(plan: FollowPathPlan, plan_id, robot_name, frame_id):
print("Plan from planning domain:")
print(robot_plan)

compiled_plan = compile_plan(
robot_plan.value, "abc123", robot_plan.name, "a_coordinate_frame"
)
compiled_plan = compile_plan(adaptor, robot_plan)
print("compiled plan:")
print(compiled_plan)
10 changes: 10 additions & 0 deletions omniplanner/examples/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,17 @@
from dataclasses import dataclass

import numpy as np
import spark_dsg


@dataclass
class DummyRobotPlanningAdaptor:
name: str
robot_type: str
parent_frame: str
child_frame: str


def build_test_dsg():
"""An example scene graph for testing"""
G = spark_dsg.DynamicSceneGraph()
Expand Down
Loading