diff --git a/README.md b/README.md index 17f1d6085..bc5aac813 100644 --- a/README.md +++ b/README.md @@ -58,9 +58,7 @@ git submodule init git submodule update ``` -Wait for the datasets to download, it may take a while. Once they are downloaded, you need to install `pytest`, so that you can run the test suite: - -`pip install pytest` +Wait for the datasets to download, it may take a while. Then to run the tests: diff --git a/planning.py b/planning.py index 1e4a19209..35c2ef70d 100644 --- a/planning.py +++ b/planning.py @@ -4,7 +4,6 @@ import itertools from collections import deque, defaultdict from functools import reduce as _reduce - import numpy as np import search @@ -13,7 +12,6 @@ from search import Node from utils import Expr, expr, first - class PlanningProblem: """ Planning Domain Definition Language (PlanningProblem) used to define a search problem. @@ -80,8 +78,9 @@ def expand_fluents(self, name=None): return expansions + def expand_actions(self, name=None): - """Generate all possible actions with variable bindings for precondition selection heuristic""" + "Generate all possible actions with variable bindings for precondition selection heuristic" has_domains = all(action.domain for action in self.actions if action.precond) kb = None @@ -136,7 +135,6 @@ def expand_actions(self, name=None): new_effect = Expr(str(effect.op), *new_effect_args) new_effects.append(new_effect) expansions.append(Action(new_expr, new_preconds, new_effects)) - return expansions def is_strips(self): @@ -221,7 +219,7 @@ def relaxed(self): list(filter(lambda effect: effect.op[:3] != 'Not', self.effect))) def substitute(self, e, args): - """Replaces variables in expression with their respective Propositional symbol""" + "Replaces variables in expression with their respective Propositional symbol" new_args = list(e.args) for num, x in enumerate(e.args): @@ -231,7 +229,7 @@ def substitute(self, e, args): return Expr(e.op, *new_args) def check_precond(self, kb, args): - """Checks if the precondition is satisfied in the current state""" + "Checks if the precondition is satisfied in the current state" if isinstance(kb, list): kb = FolKB(kb) @@ -274,292 +272,6 @@ def goal_test(goals, state): return all(kb.ask(q) is not False for q in goals) -def air_cargo(): - """ - [Figure 10.1] AIR-CARGO-PROBLEM - - An air-cargo shipment problem for delivering cargo to different locations, - given the starting location and airplanes. - - Example: - >>> from planning import * - >>> ac = air_cargo() - >>> ac.goal_test() - False - >>> ac.act(expr('Load(C2, P2, JFK)')) - >>> ac.act(expr('Load(C1, P1, SFO)')) - >>> ac.act(expr('Fly(P1, SFO, JFK)')) - >>> ac.act(expr('Fly(P2, JFK, SFO)')) - >>> ac.act(expr('Unload(C2, P2, SFO)')) - >>> ac.goal_test() - False - >>> ac.act(expr('Unload(C1, P1, JFK)')) - >>> ac.goal_test() - True - >>> - """ - - return PlanningProblem(initial='At(C1, SFO) & At(C2, JFK) & At(P1, SFO) & At(P2, JFK)', - goals='At(C1, JFK) & At(C2, SFO)', - actions=[Action('Load(c, p, a)', - precond='At(c, a) & At(p, a)', - effect='In(c, p) & ~At(c, a)', - domain='Cargo(c) & Plane(p) & Airport(a)'), - Action('Unload(c, p, a)', - precond='In(c, p) & At(p, a)', - effect='At(c, a) & ~In(c, p)', - domain='Cargo(c) & Plane(p) & Airport(a)'), - Action('Fly(p, f, to)', - precond='At(p, f)', - effect='At(p, to) & ~At(p, f)', - domain='Plane(p) & Airport(f) & Airport(to)')], - domain='Cargo(C1) & Cargo(C2) & Plane(P1) & Plane(P2) & Airport(SFO) & Airport(JFK)') - - -def spare_tire(): - """ - [Figure 10.2] SPARE-TIRE-PROBLEM - - A problem involving changing the flat tire of a car - with a spare tire from the trunk. - - Example: - >>> from planning import * - >>> st = spare_tire() - >>> st.goal_test() - False - >>> st.act(expr('Remove(Spare, Trunk)')) - >>> st.act(expr('Remove(Flat, Axle)')) - >>> st.goal_test() - False - >>> st.act(expr('PutOn(Spare, Axle)')) - >>> st.goal_test() - True - >>> - """ - - return PlanningProblem(initial='At(Flat, Axle) & At(Spare, Trunk)', - goals='At(Spare, Axle) & At(Flat, Ground)', - actions=[Action('Remove(obj, loc)', - precond='At(obj, loc)', - effect='At(obj, Ground) & ~At(obj, loc)', - domain='Tire(obj)'), - Action('PutOn(t, Axle)', - precond='At(t, Ground) & ~At(Flat, Axle)', - effect='At(t, Axle) & ~At(t, Ground)', - domain='Tire(t)'), - Action('LeaveOvernight', - precond='', - effect='~At(Spare, Ground) & ~At(Spare, Axle) & ~At(Spare, Trunk) & \ - ~At(Flat, Ground) & ~At(Flat, Axle) & ~At(Flat, Trunk)')], - domain='Tire(Flat) & Tire(Spare)') - - -def three_block_tower(): - """ - [Figure 10.3] THREE-BLOCK-TOWER - - A blocks-world problem of stacking three blocks in a certain configuration, - also known as the Sussman Anomaly. - - Example: - >>> from planning import * - >>> tbt = three_block_tower() - >>> tbt.goal_test() - False - >>> tbt.act(expr('MoveToTable(C, A)')) - >>> tbt.act(expr('Move(B, Table, C)')) - >>> tbt.goal_test() - False - >>> tbt.act(expr('Move(A, Table, B)')) - >>> tbt.goal_test() - True - >>> - """ - return PlanningProblem(initial='On(A, Table) & On(B, Table) & On(C, A) & Clear(B) & Clear(C)', - goals='On(A, B) & On(B, C)', - actions=[Action('Move(b, x, y)', - precond='On(b, x) & Clear(b) & Clear(y)', - effect='On(b, y) & Clear(x) & ~On(b, x) & ~Clear(y)', - domain='Block(b) & Block(y)'), - Action('MoveToTable(b, x)', - precond='On(b, x) & Clear(b)', - effect='On(b, Table) & Clear(x) & ~On(b, x)', - domain='Block(b) & Block(x)')], - domain='Block(A) & Block(B) & Block(C)') - - -def simple_blocks_world(): - """ - SIMPLE-BLOCKS-WORLD - - A simplified definition of the Sussman Anomaly problem. - - Example: - >>> from planning import * - >>> sbw = simple_blocks_world() - >>> sbw.goal_test() - False - >>> sbw.act(expr('ToTable(A, B)')) - >>> sbw.act(expr('FromTable(B, A)')) - >>> sbw.goal_test() - False - >>> sbw.act(expr('FromTable(C, B)')) - >>> sbw.goal_test() - True - >>> - """ - - return PlanningProblem(initial='On(A, B) & Clear(A) & OnTable(B) & OnTable(C) & Clear(C)', - goals='On(B, A) & On(C, B)', - actions=[Action('ToTable(x, y)', - precond='On(x, y) & Clear(x)', - effect='~On(x, y) & Clear(y) & OnTable(x)'), - Action('FromTable(y, x)', - precond='OnTable(y) & Clear(y) & Clear(x)', - effect='~OnTable(y) & ~Clear(x) & On(y, x)')]) - - -def have_cake_and_eat_cake_too(): - """ - [Figure 10.7] CAKE-PROBLEM - - A problem where we begin with a cake and want to - reach the state of having a cake and having eaten a cake. - The possible actions include baking a cake and eating a cake. - - Example: - >>> from planning import * - >>> cp = have_cake_and_eat_cake_too() - >>> cp.goal_test() - False - >>> cp.act(expr('Eat(Cake)')) - >>> cp.goal_test() - False - >>> cp.act(expr('Bake(Cake)')) - >>> cp.goal_test() - True - >>> - """ - - return PlanningProblem(initial='Have(Cake)', - goals='Have(Cake) & Eaten(Cake)', - actions=[Action('Eat(Cake)', - precond='Have(Cake)', - effect='Eaten(Cake) & ~Have(Cake)'), - Action('Bake(Cake)', - precond='~Have(Cake)', - effect='Have(Cake)')]) - - -def shopping_problem(): - """ - SHOPPING-PROBLEM - - A problem of acquiring some items given their availability at certain stores. - - Example: - >>> from planning import * - >>> sp = shopping_problem() - >>> sp.goal_test() - False - >>> sp.act(expr('Go(Home, HW)')) - >>> sp.act(expr('Buy(Drill, HW)')) - >>> sp.act(expr('Go(HW, SM)')) - >>> sp.act(expr('Buy(Banana, SM)')) - >>> sp.goal_test() - False - >>> sp.act(expr('Buy(Milk, SM)')) - >>> sp.goal_test() - True - >>> - """ - - return PlanningProblem(initial='At(Home) & Sells(SM, Milk) & Sells(SM, Banana) & Sells(HW, Drill)', - goals='Have(Milk) & Have(Banana) & Have(Drill)', - actions=[Action('Buy(x, store)', - precond='At(store) & Sells(store, x)', - effect='Have(x)', - domain='Store(store) & Item(x)'), - Action('Go(x, y)', - precond='At(x)', - effect='At(y) & ~At(x)', - domain='Place(x) & Place(y)')], - domain='Place(Home) & Place(SM) & Place(HW) & Store(SM) & Store(HW) & ' - 'Item(Milk) & Item(Banana) & Item(Drill)') - - -def socks_and_shoes(): - """ - SOCKS-AND-SHOES-PROBLEM - - A task of wearing socks and shoes on both feet - - Example: - >>> from planning import * - >>> ss = socks_and_shoes() - >>> ss.goal_test() - False - >>> ss.act(expr('RightSock')) - >>> ss.act(expr('RightShoe')) - >>> ss.act(expr('LeftSock')) - >>> ss.goal_test() - False - >>> ss.act(expr('LeftShoe')) - >>> ss.goal_test() - True - >>> - """ - - return PlanningProblem(initial='', - goals='RightShoeOn & LeftShoeOn', - actions=[Action('RightShoe', - precond='RightSockOn', - effect='RightShoeOn'), - Action('RightSock', - precond='', - effect='RightSockOn'), - Action('LeftShoe', - precond='LeftSockOn', - effect='LeftShoeOn'), - Action('LeftSock', - precond='', - effect='LeftSockOn')]) - - -def double_tennis_problem(): - """ - [Figure 11.10] DOUBLE-TENNIS-PROBLEM - - A multiagent planning problem involving two partner tennis players - trying to return an approaching ball and repositioning around in the court. - - Example: - >>> from planning import * - >>> dtp = double_tennis_problem() - >>> goal_test(dtp.goals, dtp.initial) - False - >>> dtp.act(expr('Go(A, RightBaseLine, LeftBaseLine)')) - >>> dtp.act(expr('Hit(A, Ball, RightBaseLine)')) - >>> goal_test(dtp.goals, dtp.initial) - False - >>> dtp.act(expr('Go(A, LeftNet, RightBaseLine)')) - >>> goal_test(dtp.goals, dtp.initial) - True - >>> - """ - - return PlanningProblem( - initial='At(A, LeftBaseLine) & At(B, RightNet) & Approaching(Ball, RightBaseLine) & Partner(A, B) & Partner(B, A)', - goals='Returned(Ball) & At(a, LeftNet) & At(a, RightNet)', - actions=[Action('Hit(actor, Ball, loc)', - precond='Approaching(Ball, loc) & At(actor, loc)', - effect='Returned(Ball)'), - Action('Go(actor, to, loc)', - precond='At(actor, loc)', - effect='At(actor, to) & ~At(actor, loc)')]) - - class ForwardPlan(search.Problem): """ [Section 10.2.1] @@ -591,6 +303,7 @@ def h(self, state): actions=[action.relaxed() for action in self.planning_problem.actions]) try: + # Relies upon GraphPlan exiting when leveled off return len(linearize(GraphPlan(relaxed_planning_problem).execute())) except: return np.inf @@ -762,20 +475,45 @@ def __init__(self, kb): self.kb = kb # current state self.current_state = kb.clauses + # current action to state link + # Action -> preconditions for that action self.current_action_links = {} + # current state to action link + # Precondition -> what is applicable (actions) self.current_state_links = {} - # current action to next state link + + # current action to next state link (E.g. Go(Home, HW) --> At(HW) and NotAt(Home)) + # aka forward link in time (dependency) self.next_action_links = {} - # next state to current action link + + # next state to current action link (E.g. NotAt(Home): [Go(Home, HW), Go(Home, SM)]) + # aka backwards link in time (dependency) self.next_state_links = {} + # mutually exclusive actions - self.mutex = [] + self.action_mutexes = [] + # mutually exclusive states + self.state_mutexes = [] + def __call__(self, actions, objects): self.build(actions, objects) self.find_mutex() + + def __str__(self): + state_str = ", ".join(str(s) for s in self.current_state) + action_str = ", ".join(str(a) for a in self.current_action_links.keys()) + mutex_str = ", ".join([str(m) for m in self.action_mutexes]) + return ( + f"\n" + f" Current State: {{{state_str}}}\n" + f" Actions: {{{action_str}}}\n" + f" Mutex: {{{mutex_str}}}\n" + ) + + __repr__ = __str__ def separate(self, e): """Separates an iterable of elements into positive and negative parts""" @@ -789,46 +527,129 @@ def separate(self, e): positive.append(clause) return positive, negative + def find_mutex(self): - """Finds mutually exclusive actions""" - - # Inconsistent effects - pos_nsl, neg_nsl = self.separate(self.next_state_links) - - for negeff in neg_nsl: - new_negeff = Expr(negeff.op[3:], *negeff.args) - for poseff in pos_nsl: - if new_negeff == poseff: - for a in self.next_state_links[poseff]: - for b in self.next_state_links[negeff]: - if {a, b} not in self.mutex: - self.mutex.append({a, b}) - - # Interference will be calculated with the last step - pos_csl, neg_csl = self.separate(self.current_state_links) - - # Competing needs - for pos_precond in pos_csl: - for neg_precond in neg_csl: - new_neg_precond = Expr(neg_precond.op[3:], *neg_precond.args) - if new_neg_precond == pos_precond: - for a in self.current_state_links[pos_precond]: - for b in self.current_state_links[neg_precond]: - if {a, b} not in self.mutex: - self.mutex.append({a, b}) - - # Inconsistent support + "Finds mutually exclusive actions" + + self.action_mutexes = [] # clear out effects from state mutex prior computation + + # Competing Needs - two actions are mutex if any of their preconditions are mutex at the previous state level + for a1, a2 in itertools.combinations(self.current_action_links.keys(), 2): + preconds_a1 = self.current_action_links[a1] + preconds_a2 = self.current_action_links[a2] + + if any({p, q} in self.state_mutexes for p in preconds_a1 for q in preconds_a2): + mutex_pair = {a1, a2} + if mutex_pair not in self.action_mutexes: + self.action_mutexes.append(mutex_pair) + + # Interference AND Inconsistent Effects Mutex Calculation + for a1, a2 in itertools.combinations(self.next_action_links.keys(), 2): + preconds_a1 = self.current_action_links.get(a1, []) + preconds_a2 = self.current_action_links.get(a2, []) + effects_a1 = self.next_action_links.get(a1, []) + effects_a2 = self.next_action_links.get(a2, []) + + interference = False + # Interference Check + for p1 in preconds_a1: + if p1.predicate_negate() in effects_a2: + interference = True + for p2 in preconds_a2: + if p2.predicate_negate() in effects_a1: + interference = True + + # Inconsistent Effects Check + for e1 in effects_a1: + if e1.predicate_negate() in effects_a2: + interference = True + for e2 in effects_a2: + if e2.predicate_negate() in effects_a1: + interference = True + + if interference: + mutex_pair = {a1, a2} + if mutex_pair not in self.action_mutexes: + self.action_mutexes.append(mutex_pair) + + def populate_prop_mutexes(self): + "Compute the next level's proposition mutexes based on our current action mutexes" + + # Inconsistent support - two props cannot be true given competing supporting actions state_mutex = [] - for pair in self.mutex: - next_state_0 = self.next_action_links[list(pair)[0]] - if len(pair) == 2: - next_state_1 = self.next_action_links[list(pair)[1]] - else: - next_state_1 = self.next_action_links[list(pair)[0]] - if (len(next_state_0) == 1) and (len(next_state_1) == 1): - state_mutex.append({next_state_0[0], next_state_1[0]}) + next_state_pairs = itertools.combinations(self.next_state_links.keys(), 2) + for next_state_pair in list(next_state_pairs): + s1, s2 = list(next_state_pair) + acts_to_s1 = self.next_state_links.get(s1, []) + acts_to_s2 = self.next_state_links.get(s2, []) + + # ensure our mutexes only apply to pairs, not single states. + if acts_to_s1 == [] or acts_to_s2 == []: + continue + + # if any two actions that lead to these states is not mutex, do not add a mutex to these states. + if all([{a1,a2} in self.action_mutexes or {a2,a1} in self.action_mutexes for a1 in acts_to_s1 for a2 in acts_to_s2]): + mutex_pair = {s1, s2} + if mutex_pair not in state_mutex: + state_mutex.append(mutex_pair) + + # If there are pairs of propositions that are negations of each other, they need to be mutex + for s1i in range(len(self.current_state)): + for s2i in range(s1i,len(self.current_state)): + s1, s2 = self.current_state[s1i], self.current_state[s2i] + if repr(s2)[0:3] == "Not" and repr(s1) == repr(s2)[3:] or repr(s1)[0:3] == "Not" and repr(s1)[3:] == repr(s2): + mutex_pair = {s1, s2} + if mutex_pair not in state_mutex: + state_mutex.append(mutex_pair) + + return state_mutex + + def prune_invalid_actions(self): + """Remove actions whose own preconditions are mutex (unsupportable).""" + to_remove = [] + + # Normalize state mutex set for fast membership checks: + state_mutex_lookup = set() + for m in self.state_mutexes: + state_mutex_lookup.add(frozenset(m)) + + for action, preconds in list(self.current_action_links.items()): + invalid = False + + for p1, p2 in itertools.combinations(preconds, 2): + if frozenset({p1, p2}) in state_mutex_lookup: + invalid = True + break - self.mutex = self.mutex + state_mutex + if invalid: + to_remove.append(action) + + # Remove invalid actions from all mappings + for action in to_remove: + # forward mappings + self.current_action_links.pop(action, None) + self.next_action_links.pop(action, None) + + # reverse mapping: state -> actions (current_state_links) + for precond in list(self.current_state_links.keys()): + actions_for_pre = self.current_state_links.get(precond, []) + if action in actions_for_pre: + actions_for_pre.remove(action) + # if no more actions support this precond, drop the key + if not actions_for_pre: + self.current_state_links.pop(precond, None) + else: + self.current_state_links[precond] = actions_for_pre + + # reverse mapping: next_state -> actions (next_state_links) + for effect in list(self.next_state_links.keys()): + actions_for_eff = self.next_state_links.get(effect, []) + if action in actions_for_eff: + actions_for_eff.remove(action) + if not actions_for_eff: + self.next_state_links.pop(effect, None) + else: + self.next_state_links[effect] = actions_for_eff def build(self, actions, objects): """Populates the lists and dictionaries containing the state action dependencies""" @@ -847,7 +668,7 @@ def build(self, actions, objects): for arg in possible_args: if a.check_precond(self.kb, arg): for num, symbol in enumerate(a.args): - if not symbol.op.islower(): + if not symbol.op.islower(): # If not lowercase, we care about exact match to object? arg = list(arg) arg[num] = symbol arg = tuple(arg) @@ -874,7 +695,7 @@ def build(self, actions, objects): self.next_state_links[new_clause] = [new_action] def perform_actions(self): - """Performs the necessary actions and returns a new Level""" + "Performs the necessary actions and returns a new Level" new_kb = FolKB(list(set(self.next_state_links.keys()))) return Level(new_kb) @@ -895,19 +716,34 @@ def __init__(self, planning_problem): def __call__(self): self.expand_graph() + def __str__(self): + levels_str = "\n".join( + f"Level {i}:\n{str(level)}" for i, level in enumerate(self.levels) + ) + return ( + f"\n" + f" Objects: {self.objects}\n" + f"{levels_str}\n" + ) + + __repr__ = __str__ + def expand_graph(self): """Expands the graph by a level""" last_level = self.levels[-1] - last_level(self.planning_problem.actions, self.objects) - self.levels.append(last_level.perform_actions()) + last_level(self.planning_problem.actions, self.objects) # populate state/actions/mutexes + last_level.prune_invalid_actions() + new_level = last_level.perform_actions() # Create new level + new_level.state_mutexes = last_level.populate_prop_mutexes() # Populate the mutexes for the next state level to come + self.levels.append(new_level) def non_mutex_goals(self, goals, index): - """Checks whether the goals are mutually exclusive""" + "Checks whether the goals are mutually exclusive" goal_perm = itertools.combinations(goals, 2) for g in goal_perm: - if set(g) in self.levels[index].mutex: + if set(g) in self.levels[index].state_mutexes: return False return True @@ -924,110 +760,169 @@ def __init__(self, planning_problem): self.no_goods = [] self.solution = [] + def __str__(self): + sol_str = ( + "No solution found" + if not self.solution + else f"Solution with {len(self.solution)} steps" + ) + return ( + f"\n" + f" Nogoods: {len(self.no_goods)}\n" + f" {sol_str}\n" + ) + + __repr__ = __str__ + def check_leveloff(self): - """Checks if the graph has levelled off""" + """Checks if the graph has leveled off""" + if len(self.graph.levels) < 2: + return False - check = (set(self.graph.levels[-1].current_state) == set(self.graph.levels[-2].current_state)) + level = self.graph.levels[-1] + prev_level = self.graph.levels[-2] - if check: - return True + same_state = set(level.current_state) == set(prev_level.current_state) - def extract_solution(self, goals, index): - """Extracts the solution""" + level_mutex = set(frozenset(m) for m in level.state_mutexes) + prev_mutex = set(frozenset(m) for m in prev_level.state_mutexes) + same_mutex = level_mutex == prev_mutex + + return same_state and same_mutex - level = self.graph.levels[index] - if not self.graph.non_mutex_goals(goals, index): - self.no_goods.append((level, goals)) - return + def _get_preconditions_for(self, action_set, level): + """Collects all unique preconditions for a given set of actions in a level""" + all_preconditions = set() + for action in action_set: + preconditions = level.current_action_links.get(action, []) + all_preconditions.update(preconditions) + return all_preconditions - level = self.graph.levels[index - 1] + def _find_valid_action_sets(self, goals, level): + """ + Finds sets of actions in the given level that are not mutually exclusive + and that collectively satisfy all the goals. + """ + valid_sets = [] - # Create all combinations of actions that satisfy the goal - actions = [] - for goal in goals: - actions.append(level.next_state_links[goal]) + actions_for_goal = {g: level.next_state_links.get(g, []) for g in goals} + potential_action_groups = [actions_for_goal[g] for g in goals] - all_actions = list(itertools.product(*actions)) + for action_combination in itertools.product(*potential_action_groups): + action_set = set(action_combination) - # Filter out non-mutex actions - non_mutex_actions = [] - for action_tuple in all_actions: - action_pairs = itertools.combinations(list(set(action_tuple)), 2) - non_mutex_actions.append(list(set(action_tuple))) - for pair in action_pairs: - if set(pair) in level.mutex: - non_mutex_actions.pop(-1) + is_mutex = False + for a1, a2 in itertools.combinations(action_set, 2): + if {a1, a2} in level.action_mutexes: + is_mutex = True break - # Recursion - for action_list in non_mutex_actions: - if [action_list, index] not in self.solution: - self.solution.append([action_list, index]) + if not is_mutex and action_set not in valid_sets: + valid_sets.append(action_set) - new_goals = [] - for act in set(action_list): - if act in level.current_action_links: - new_goals = new_goals + level.current_action_links[act] + return valid_sets - if abs(index) + 1 == len(self.graph.levels): - return - elif (level, new_goals) in self.no_goods: - return - else: - self.extract_solution(new_goals, index - 1) + def extract_solution(self, goals): + """ + Primary method to start the solution extraction process. + It calls the recursive helper and returns the final plan. + """ + return self._extract_solution_recursive(set(goals), len(self.graph.levels)-1) - # Level-Order multiple solutions - solution = [] - for item in self.solution: - if item[1] == -1: - solution.append([]) - solution[-1].append(item[0]) - else: - solution[-1].append(item[0]) + # Place this within your GraphPlan class, replacing the previous version + def _extract_solution_recursive(self, goals, level_index): + """ + Recursively searches for a plan backwards from a given level using negative indexing. + + Args: + goals (set): The set of goal propositions to satisfy. + level_index (int): level index + """ - for num, item in enumerate(solution): - item.reverse() - solution[num] = item + #BASE CASE: We've recursed back to the initial proposition layer (Level 0). + #The 'goals' at this point are the preconditions for the very first set of actions. + #We must check if they exist in the initial state. No further recursion is needed. - return solution + if level_index == 0: + initial_state = set(self.graph.levels[0].current_state) + if goals.issubset(initial_state): + return [] # Success! Return the empty plan to be built upon. + else: + return None # Failure. This path is invalid as preconditions are not met. + + # MEMOIZATION: Check if we've already proven this subproblem is unsolvable. + # TODO: ADD SUPERSET CHECK + if (level_index, frozenset(goals)) in self.no_goods: + return None + + # RECURSIVE STEP: + # To satisfy goals at `level_index`, we need to find a suitable set of non-mutex actions + # from the *previous* level's action layer. + action_level = self.graph.levels[level_index - 1] + valid_action_sets = self._find_valid_action_sets(goals, action_level) + + # Iterate through each valid action set and try to find a path. + for action_set in valid_action_sets: + # The new sub-goals are the combined preconditions for this action set. + new_goals = self._get_preconditions_for(action_set, action_level) + + # Recurse to solve for the new goals at the previous proposition layer. + sub_plan = self._extract_solution_recursive(new_goals, level_index - 1) + + # If the recursive call succeeded, we have a solution! + if sub_plan is not None: + # Append the current level's actions and return the full plan. + return sub_plan + [list(action_set)] + + # If the loop finishes without a solution, this subproblem is a "no-good". + nogood_item = (level_index, frozenset(goals)) + if nogood_item not in self.no_goods: + self.no_goods.append(nogood_item) + return None def goal_test(self, kb): - return all(kb.ask(q) is not False for q in self.graph.planning_problem.goals) + goal_achieved = all(kb.ask(q) is not False for q in self.graph.planning_problem.goals) + return goal_achieved def execute(self): - """Executes the GraphPlan algorithm for the given problem""" + "Executes the GraphPlan algorithm for the given problem" while True: self.graph.expand_graph() if (self.goal_test(self.graph.levels[-1].kb) and self.graph.non_mutex_goals( self.graph.planning_problem.goals, -1)): - solution = self.extract_solution(self.graph.planning_problem.goals, -1) + + solution = self.extract_solution(self.graph.planning_problem.goals) if solution: - return solution + return [solution] - if len(self.graph.levels) >= 2 and self.check_leveloff(): + if self.check_leveloff(): return None - class Linearize: + """ + Problem wrapper / coordinator that linearizes partially ordered solutions generated by GraphPlan object. + """ def __init__(self, planning_problem): self.planning_problem = planning_problem def filter(self, solution): - """Filter out persistence actions from a solution""" + "Filter out persistence actions from a solution" new_solution = [] - for section in solution[0]: + for section in solution: new_section = [] for operation in section: if not (operation.op[0] == 'P' and operation.op[1].isupper()): new_section.append(operation) - new_solution.append(new_section) + # filter may remove all actions if all actions are persistent + if new_section != []: + new_solution.append(new_section) return new_solution def orderlevel(self, level, planning_problem): - """Return valid linear order of actions for a given level""" + "Return valid linear order of actions for a given level" for permutation in itertools.permutations(level): temp = copy.deepcopy(planning_problem) @@ -1039,25 +934,43 @@ def orderlevel(self, level, planning_problem): except: count = 0 temp = copy.deepcopy(planning_problem) - break + continue if count == len(permutation): return list(permutation), temp - return None + # identifying a linear ordering for level failed ... return no solution and same planning problem state + return None, planning_problem def execute(self): - """Finds total-order solution for a planning graph""" + "Finds a total-order solution for a planning graph. Possibly not the only linearization possible." graphPlan_solution = GraphPlan(self.planning_problem).execute() - filtered_solution = self.filter(graphPlan_solution) - ordered_solution = [] - planning_problem = self.planning_problem - for level in filtered_solution: - level_solution, planning_problem = self.orderlevel(level, planning_problem) - for element in level_solution: - ordered_solution.append(element) - return ordered_solution + # Exit if no plan found + if graphPlan_solution is None: + return None + + for possible_plan in graphPlan_solution: + filtered_solution = self.filter(possible_plan) + ordered_solution = [] + # planning_problem will maintain the current state as we iterate over levels, allowing test application of actions + planning_problem = self.planning_problem + for level in filtered_solution: + level_solution, planning_problem = self.orderlevel(level, planning_problem) + if not level_solution: + # level failed to apply, this plan shouldn't work + ordered_solution = None + break # technically could try `continue` anyway, but we shouldn't need to + + for element in level_solution: + ordered_solution.append(element) + + if not ordered_solution: + continue ## no plan possible from the partial plan at the level + else: + break + + return ordered_solution def linearize(solution): """Converts a level-ordered solution into a linear solution""" @@ -1380,41 +1293,6 @@ def execute(self, display=True): return self.constraints, self.causal_links -def spare_tire_graphPlan(): - """Solves the spare tire problem using GraphPlan""" - return GraphPlan(spare_tire()).execute() - - -def three_block_tower_graphPlan(): - """Solves the Sussman Anomaly problem using GraphPlan""" - return GraphPlan(three_block_tower()).execute() - - -def air_cargo_graphPlan(): - """Solves the air cargo problem using GraphPlan""" - return GraphPlan(air_cargo()).execute() - - -def have_cake_and_eat_cake_too_graphPlan(): - """Solves the cake problem using GraphPlan""" - return [GraphPlan(have_cake_and_eat_cake_too()).execute()[1]] - - -def shopping_graphPlan(): - """Solves the shopping problem using GraphPlan""" - return GraphPlan(shopping_problem()).execute() - - -def socks_and_shoes_graphPlan(): - """Solves the socks and shoes problem using GraphPlan""" - return GraphPlan(socks_and_shoes()).execute() - - -def simple_blocks_world_graphPlan(): - """Solves the simple blocks world problem""" - return GraphPlan(simple_blocks_world()).execute() - - class HLA(Action): """ Define Actions for the real-world (that may be refined further), and satisfy resource diff --git a/planning_envs.py b/planning_envs.py new file mode 100644 index 000000000..6e2c8ac63 --- /dev/null +++ b/planning_envs.py @@ -0,0 +1,573 @@ +from planning import * + +def air_cargo(): + """ + [Figure 10.1] AIR-CARGO-PROBLEM + + An air-cargo shipment problem for delivering cargo to different locations, + given the starting location and airplanes. + + Example: + >>> from planning import * + >>> ac = air_cargo() + >>> ac.goal_test() + False + >>> ac.act(expr('Load(C2, P2, JFK)')) + >>> ac.act(expr('Load(C1, P1, SFO)')) + >>> ac.act(expr('Fly(P1, SFO, JFK)')) + >>> ac.act(expr('Fly(P2, JFK, SFO)')) + >>> ac.act(expr('Unload(C2, P2, SFO)')) + >>> ac.goal_test() + False + >>> ac.act(expr('Unload(C1, P1, JFK)')) + >>> ac.goal_test() + True + >>> + """ + + return PlanningProblem(initial='At(C1, SFO) & At(C2, JFK) & At(P1, SFO) & At(P2, JFK)', + goals='At(C1, JFK) & At(C2, SFO)', + actions=[Action('Load(c, p, a)', + precond='At(c, a) & At(p, a)', + effect='In(c, p) & ~At(c, a)', + domain='Cargo(c) & Plane(p) & Airport(a)'), + Action('Unload(c, p, a)', + precond='In(c, p) & At(p, a)', + effect='At(c, a) & ~In(c, p)', + domain='Cargo(c) & Plane(p) & Airport(a)'), + Action('Fly(p, f, to)', + precond='At(p, f)', + effect='At(p, to) & ~At(p, f)', + domain='Plane(p) & Airport(f) & Airport(to)')], + domain='Cargo(C1) & Cargo(C2) & Plane(P1) & Plane(P2) & Airport(SFO) & Airport(JFK)') + + +def spare_tire(): + """ + [Figure 10.2] SPARE-TIRE-PROBLEM + + A problem involving changing the flat tire of a car + with a spare tire from the trunk. + + Example: + >>> from planning import * + >>> st = spare_tire() + >>> st.goal_test() + False + >>> st.act(expr('Remove(Spare, Trunk)')) + >>> st.act(expr('Remove(Flat, Axle)')) + >>> st.goal_test() + False + >>> st.act(expr('PutOn(Spare, Axle)')) + >>> st.goal_test( + True + >>> + """ + + return PlanningProblem(initial='At(Flat, Axle) & At(Spare, Trunk)', + goals='At(Spare, Axle) & At(Flat, Ground)', + actions=[Action('Remove(obj, loc)', + precond='At(obj, loc)', + effect='At(obj, Ground) & ~At(obj, loc)', + domain='Tire(obj)'), + Action('PutOn(t, Axle)', + precond='At(t, Ground) & ~At(Flat, Axle)', + effect='At(t, Axle) & ~At(t, Ground)', + domain='Tire(t)'), + Action('LeaveOvernight', + precond='', + effect='~At(Spare, Ground) & ~At(Spare, Axle) & ~At(Spare, Trunk) & \ + ~At(Flat, Ground) & ~At(Flat, Axle) & ~At(Flat, Trunk)')], + domain='Tire(Flat) & Tire(Spare)') + + +def three_block_tower(): + """ + [Figure 10.3] THREE-BLOCK-TOWER + + A blocks-world problem of stacking three blocks in a certain configuration, + also known as the Sussman Anomaly. + + Example: + >>> from planning import * + >>> tbt = three_block_tower() + >>> tbt.goal_test() + False + >>> tbt.act(expr('MoveToTable(C, A)')) + >>> tbt.act(expr('Move(B, Table, C)')) + >>> tbt.goal_test() + False + >>> tbt.act(expr('Move(A, Table, B)')) + >>> tbt.goal_test() + True + >>> + """ + return PlanningProblem(initial='On(A, Table) & On(B, Table) & On(C, A) & Clear(B) & Clear(C)', + goals='On(A, B) & On(B, C)', + actions=[Action('Move(b, x, y)', + precond='On(b, x) & Clear(b) & Clear(y)', + effect='On(b, y) & Clear(x) & ~On(b, x) & ~Clear(y)', + domain='Block(b) & Block(y)'), + Action('MoveToTable(b, x)', + precond='On(b, x) & Clear(b)', + effect='On(b, Table) & Clear(x) & ~On(b, x)', + domain='Block(b) & Block(x)')], + domain='Block(A) & Block(B) & Block(C)') + +def logisticsPlanCustom(initial_state=None, goal_state=None): + if initial_state == None: + initial_state = 'In(C1, R1) & In(C2, D1) & In(C3, D2) & In(R1, D1) & Holding(R1)' + if goal_state == None: + raise ValueError("Goal must be defined") + + planning_problem = \ + PlanningProblem(initial = initial_state, + goals = goal_state, + actions=[Action('PickUp(r, c, d)', + precond='In(r, d) & In (c, d) & ~Holding(r)', + effect='Holding(r) & ~In(c, d) & In(c, r)', + domain='Robot(r) & Place(d) & Container(c)'), + Action('PutDown(r, c, d)', + precond='In(r, d) & In(c, r) & Holding(r)', + effect='~Holding(r) & ~In(c, r) & In(c, d)', + domain='Robot(r) & Place(d) & Container(c)'), + Action('Move(r, d_start, d_end)', + precond='In(r,d_start)', + effect='~In(r, d_start) & In(r, d_end)', + domain='Robot(r) & Place(d_start) & Place(d_end)')], + domain='Container(C1) & Container(C2) & Container(C3) & Place(D1) & Place(D2) & Place(D3) & Robot(R1)') + + return planning_problem + + +def simple_blocks_world(): + """ + SIMPLE-BLOCKS-WORLD + + A simplified definition of the Sussman Anomaly problem. + + Example: + >>> from planning import * + >>> sbw = simple_blocks_world() + >>> sbw.goal_test() + False + >>> sbw.act(expr('ToTable(A, B)')) + >>> sbw.act(expr('FromTable(B, A)')) + >>> sbw.goal_test() + False + >>> sbw.act(expr('FromTable(C, B)')) + >>> sbw.goal_test() + True + >>> + """ + + return PlanningProblem(initial='On(A, B) & Clear(A) & OnTable(B) & OnTable(C) & Clear(C)', + goals='On(B, A) & On(C, B)', + actions=[Action('ToTable(x, y)', + precond='On(x, y) & Clear(x)', + effect='~On(x, y) & Clear(y) & OnTable(x)'), + Action('FromTable(y, x)', + precond='OnTable(y) & Clear(y) & Clear(x)', + effect='~OnTable(y) & ~Clear(x) & On(y, x)')]) + + +def blocks_world(initial, goals, blocks): + """ + GENERALIZED-BLOCKS-WORLD-PROBLEM + + A flexible constructor for creating blocks-world planning problems. + You can specify any initial and goal configuration for a given set of blocks. + + Example: + >>> from planning import * + >>> # Let's define the classic Sussman Anomaly + >>> initial_state = 'On(C, A) & On(A, Table) & On(B, Table) & Clear(C) & Clear(B)' + >>> goal_state = 'On(A, B) & On(B, C)' + >>> block_names = ['A', 'B', 'C'] + >>> sussman_anomaly = blocks_world(initial_state, goal_state, block_names) + >>> + >>> sussman_anomaly.goal_test() + False + >>> # A sequence of moves to solve it + >>> sussman_anomaly.act(expr('MoveToTable(C, A)')) + >>> sussman_anomaly.act(expr('Move(B, Table, C)')) + >>> sussman_anomaly.act(expr('Move(A, Table, B)')) + >>> sussman_anomaly.goal_test() + True + >>> + """ + # Dynamically generate the domain knowledge based on the list of blocks + domain_knowledge = ' & '.join([f'Block({b})' for b in blocks]) + + # Define the fundamental actions for moving blocks + actions = [ + Action('Move(b, x, y)', + precond='On(b, x) & Clear(b) & Clear(y)', + effect='On(b, y) & Clear(x) & ~On(b, x) & ~Clear(y)', + domain='Block(b) & Block(y)'), # 'x' can be another block or 'Table' + Action('MoveToTable(b, x)', + precond='On(b, x) & Clear(b)', + effect='On(b, Table) & Clear(x) & ~On(b, x)', + domain='Block(b) & Block(x)') # 'x' must be a block + ] + + return PlanningProblem(initial=initial, + goals=goals, + actions=actions, + domain=domain_knowledge) + + +def have_cake_and_eat_cake_too(): + """ + [Figure 10.7] CAKE-PROBLEM + + A problem where we begin with a cake and want to + reach the state of having a cake and having eaten a cake. + The possible actions include baking a cake and eating a cake. + + Example: + >>> from planning import * + >>> cp = have_cake_and_eat_cake_too() + >>> cp.goal_test() + False + >>> cp.act(expr('Eat(Cake)')) + >>> cp.goal_test() + False + >>> cp.act(expr('Bake(Cake)')) + >>> cp.goal_test() + True + >>> + """ + + return PlanningProblem(initial='Have(Cake)', + goals='Have(Cake) & Eaten(Cake)', + actions=[Action('Eat(Cake)', + precond='Have(Cake)', + effect='Eaten(Cake) & ~Have(Cake)'), + Action('Bake(Cake)', + precond='~Have(Cake)', + effect='Have(Cake)')]) + + +def shopping_problem(): + """ + SHOPPING-PROBLEM + + A problem of acquiring some items given their availability at certain stores. + + Example: + >>> from planning import * + >>> sp = shopping_problem() + >>> sp.goal_test() + False + >>> sp.act(expr('Go(Home, HW)')) + >>> sp.act(expr('Buy(Drill, HW)')) + >>> sp.act(expr('Go(HW, SM)')) + >>> sp.act(expr('Buy(Banana, SM)')) + >>> sp.goal_test() + False + >>> sp.act(expr('Buy(Milk, SM)')) + >>> sp.goal_test() + True + >>> + """ + + return PlanningProblem(initial='At(Home) & Sells(SM, Milk) & Sells(SM, Banana) & Sells(HW, Drill)', + goals='Have(Milk) & Have(Banana) & Have(Drill)', + actions=[Action('Buy(x, store)', + precond='At(store) & Sells(store, x)', + effect='Have(x)', + domain='Store(store) & Item(x)'), + Action('Go(x, y)', + precond='At(x)', + effect='At(y) & ~At(x)', + domain='Place(x) & Place(y)')], + domain='Place(Home) & Place(SM) & Place(HW) & Store(SM) & Store(HW) & ' + 'Item(Milk) & Item(Banana) & Item(Drill)') + +def socks_and_shoes(): + """ + SOCKS-AND-SHOES-PROBLEM + + A task of wearing socks and shoes on both feet + + Example: + >>> from planning import * + >>> ss = socks_and_shoes() + >>> ss.goal_test() + False + >>> ss.act(expr('RightSock')) + >>> ss.act(expr('RightShoe')) + >>> ss.act(expr('LeftSock')) + >>> ss.goal_test() + False + >>> ss.act(expr('LeftShoe')) + >>> ss.goal_test() + True + >>> + """ + + return PlanningProblem(initial='', + goals='RightShoeOn & LeftShoeOn', + actions=[Action('RightShoe', + precond='RightSockOn', + effect='RightShoeOn'), + Action('RightSock', + precond='', + effect='RightSockOn'), + Action('LeftShoe', + precond='LeftSockOn', + effect='LeftShoeOn'), + Action('LeftSock', + precond='', + effect='LeftSockOn')]) + + +def double_tennis_problem_simple(): + return PlanningProblem( + initial='At(A, LeftNet) & At(B, RightNet) & Approaching(ball, RightBaseline)', + goals='At(A, LeftBaseline) & Returned(ball)', + actions=[Action('Hit(actor, ball, loc)', + precond='Approaching(ball, loc) & At(actor, loc)', + effect='Returned(ball)'), + Action('Go(actor, to, loc)', + precond='At(actor, loc)', + effect='At(actor, to) & ~At(actor, loc)')], + domain="Loc(LeftBaseline)") + +def double_tennis_problem_simple2(): + return PlanningProblem( + initial='At(A, LeftNet) & At(B, LeftNet) & Approaching(ball, RightNet)', + goals='At(A, LeftNet) & Returned(ball) & At(B, LeftNet)', + actions=[Action('Hit(actor, ball, loc)', + precond='Approaching(ball, loc) & At(actor, loc)', + effect='Returned(ball)'), + Action('Go(actor, to, loc)', + precond='At(actor, loc)', + effect='At(actor, to) & ~At(actor, loc)')]) + +def double_tennis_problem_simple3(): + return PlanningProblem( + initial='At(A, LeftNet) & Approaching(ball, RightNet)', + goals='At(A, LeftNet) & Returned(ball)', + actions=[Action('Hit(actor, ball, loc)', + precond='Approaching(ball, loc) & At(actor, loc)', + effect='Returned(ball)'), + Action('Go(actor, to, loc)', + precond='At(actor, loc)', + effect='At(actor, to) & ~At(actor, loc)')]) + + +def double_tennis_problem(): + """ + [Figure 11.10] DOUBLE-TENNIS-PROBLEM + + A multiagent planning problem involving two partner tennis players + trying to return an approaching ball and repositioning around in the court. + + Example: + >>> from planning import * + >>> dtp = double_tennis_problem() + >>> goal_test(dtp.goals, dtp.initial) + False + >>> dtp.act(expr('Go(A, RightBaseLine, LeftNet)')) + >>> dtp.act(expr('Hit(A, Ball, RightBaseLine)')) + >>> goal_test(dtp.goals, dtp.initial) + False + >>> dtp.act(expr('Go(A, LeftBaseLine, RightBaseLine)')) + >>> goal_test(dtp.goals, dtp.initial) + True + """ + + return PlanningProblem( + initial='At(A, LeftNet) & At(B, RightNet) & Approaching(Ball, RightBaseLine)', + goals='At(A, LeftBaseLine) & At(B, LeftNet) & Returned(Ball)', + actions=[Action('Hit(actor, ball, loc)', + precond='Approaching(ball, loc) & At(actor, loc)', + effect='Returned(ball)'), + Action('Go(actor, to, loc)', + precond='At(actor, loc)', + effect='At(actor, to) & ~At(actor, loc)')], + domain="Loc(LeftBaseLine)") + +def rush_hour(): + """ + RUSH-HOUR-PROBLEM (Non-Numeric Version) + + A planning problem for the Rush Hour sliding block puzzle. The goal is to + maneuver the RedCar to the exit. This version uses non-numeric symbols for + grid positions (e.g., R1, C1) instead of integers. + + This specific instance uses: + - RedCar (2x1, horizontal) starting at (R3, C1) + - GreenTruck (3x1, vertical) starting at (R1, C4) + - BlueCar (2x1, vertical) starting at (R5, C2) + """ + # Initial state: Define vehicle locations and clear spots using non-numeric identifiers. + initial_state = 'At(RedCar, R3, C1) & At(GreenTruck, R1, C4) & At(BlueCar, R5, C2) & ' \ + 'IsHorizontal(RedCar) & IsVertical(GreenTruck) & IsVertical(BlueCar) & ' \ + 'Clear(R1, C1) & Clear(R1, C2) & Clear(R1, C3) & Clear(R1, C5) & Clear(R1, C6) & ' \ + 'Clear(R2, C1) & Clear(R2, C2) & Clear(R2, C3) & Clear(R2, C5) & Clear(R2, C6) & ' \ + 'Clear(R3, C3) & Clear(R3, C4) & Clear(R3, C5) & Clear(R3, C6) & ' \ + 'Clear(R4, C1) & Clear(R4, C2) & Clear(R4, C3) & Clear(R4, C4) & Clear(R4, C5) & Clear(R4, C6) & ' \ + 'Clear(R5, C1) & Clear(R5, C3) & Clear(R5, C4) & Clear(R5, C5) & Clear(R5, C6) & ' \ + 'Clear(R6, C1) & Clear(R6, C3) & Clear(R6, C4) & Clear(R6, C5) & Clear(R6, C6)' + + # Goal state: The RedCar's left-most part is at column C5. + goal_state = 'At(RedCar, R3, C5)' + + # Domain: Define objects, types (Row, Col), and adjacency relationships. + domain_knowledge = 'Vehicle(RedCar) & Vehicle(GreenTruck) & Vehicle(BlueCar) & ' \ + 'Car(RedCar) & Truck(GreenTruck) & Car(BlueCar) & ' \ + 'Row(R1) & Row(R2) & Row(R3) & Row(R4) & Row(R5) & Row(R6) & ' \ + 'Col(C1) & Col(C2) & Col(C3) & Col(C4) & Col(C5) & Col(C6) & ' \ + 'NextTo(R1, R2) & NextTo(R2, R3) & NextTo(R3, R4) & NextTo(R4, R5) & NextTo(R5, R6) & ' \ + 'NextTo(C1, C2) & NextTo(C2, C3) & NextTo(C3, C4) & NextTo(C4, C5) & NextTo(C5, C6)' + + actions = [ + # --- CAR ACTIONS (length 2) --- + Action('MoveRightCar(v, r, c1, c2, c3)', + precond='At(v, r, c1) & Car(v) & IsHorizontal(v) & NextTo(c1, c2) & NextTo(c2, c3) & Clear(r, c3)', + effect='At(v, r, c2) & ~At(v, r, c1) & Clear(r, c1) & ~Clear(r, c3)', + domain='Vehicle(v) & Row(r) & Col(c1) & Col(c2) & Col(c3)'), + Action('MoveLeftCar(v, r, c1, c2, c3)', + precond='At(v, r, c2) & Car(v) & IsHorizontal(v) & NextTo(c1, c2) & NextTo(c2, c3) & Clear(r, c1)', + effect='At(v, r, c1) & ~At(v, r, c2) & Clear(r, c3) & ~Clear(r, c1)', + domain='Vehicle(v) & Row(r) & Col(c1) & Col(c2) & Col(c3)'), + Action('MoveDownCar(v, r1, r2, r3, c)', + precond='At(v, r1, c) & Car(v) & IsVertical(v) & NextTo(r1, r2) & NextTo(r2, r3) & Clear(r3, c)', + effect='At(v, r2, c) & ~At(v, r1, c) & Clear(r1, c) & ~Clear(r3, c)', + domain='Vehicle(v) & Row(r1) & Row(r2) & Row(r3) & Col(c)'), + Action('MoveUpCar(v, r1, r2, r3, c)', + precond='At(v, r2, c) & Car(v) & IsVertical(v) & NextTo(r1, r2) & NextTo(r2, r3) & Clear(r1, c)', + effect='At(v, r1, c) & ~At(v, r2, c) & Clear(r3, c) & ~Clear(r1, c)', + domain='Vehicle(v) & Row(r1) & Row(r2) & Row(r3) & Col(c)'), + + # --- TRUCK ACTIONS (length 3) --- + Action('MoveRightTruck(v, r, c1, c2, c3, c4)', + precond='At(v, r, c1) & Truck(v) & IsHorizontal(v) & NextTo(c1, c2) & NextTo(c2, c3) & NextTo(c3, c4) & Clear(r, c4)', + effect='At(v, r, c2) & ~At(v, r, c1) & Clear(r, c1) & ~Clear(r, c4)', + domain='Vehicle(v) & Row(r) & Col(c1) & Col(c2) & Col(c3) & Col(c4)'), + Action('MoveLeftTruck(v, r, c1, c2, c3, c4)', + precond='At(v, r, c2) & Truck(v) & IsHorizontal(v) & NextTo(c1, c2) & NextTo(c2, c3) & NextTo(c3, c4) & Clear(r, c1)', + effect='At(v, r, c1) & ~At(v, r, c2) & Clear(r, c4) & ~Clear(r, c1)', + domain='Vehicle(v) & Row(r) & Col(c1) & Col(c2) & Col(c3) & Col(c4)'), + Action('MoveDownTruck(v, r1, r2, r3, r4, c)', + precond='At(v, r1, c) & Truck(v) & IsVertical(v) & NextTo(r1, r2) & NextTo(r2, r3) & NextTo(r3, r4) & Clear(r4, c)', + effect='At(v, r2, c) & ~At(v, r1, c) & Clear(r1, c) & ~Clear(r4, c)', + domain='Vehicle(v) & Row(r1) & Row(r2) & Row(r3) & Row(r4) & Col(c)'), + Action('MoveUpTruck(v, r1, r2, r3, r4, c)', + precond='At(v, r2, c) & Truck(v) & IsVertical(v) & NextTo(r1, r2) & NextTo(r2, r3) & NextTo(r3, r4) & Clear(r1, c)', + effect='At(v, r1, c) & ~At(v, r2, c) & Clear(r4, c) & ~Clear(r1, c)', + domain='Vehicle(v) & Row(r1) & Row(r2) & Row(r3) & Row(r4) & Col(c)') + ] + + return PlanningProblem(initial=initial_state, + goals=goal_state, + actions=actions, + domain=domain_knowledge) + + +def rush_hour_optimized(): + """ + RUSH-HOUR-PROBLEM (Optimized Version) + + This version optimizes the planning problem by creating vehicle-specific + actions. Since each vehicle's orientation is fixed, we can remove generic + predicates like `IsHorizontal` and create actions that only apply to the + correct vehicle on its fixed axis of movement. This drastically reduces + the number of permutations the planner needs to generate and check. + """ + # Initial state is simpler as orientation is now baked into the actions. + initial_state = 'At(RedCar, R3, C1) & At(GreenTruck, R1, C4) & At(BlueCar, R5, C2) & ' \ + 'Clear(R1, C1) & Clear(R1, C2) & Clear(R1, C3) & Clear(R1, C5) & Clear(R1, C6) & ' \ + 'Clear(R2, C1) & Clear(R2, C2) & Clear(R2, C3) & Clear(R2, C5) & Clear(R2, C6) & ' \ + 'Clear(R3, C3) & Clear(R3, C4) & Clear(R3, C5) & Clear(R3, C6) & ' \ + 'Clear(R4, C1) & Clear(R4, C2) & Clear(R4, C3) & Clear(R4, C4) & Clear(R4, C5) & Clear(R4, C6) & ' \ + 'Clear(R5, C1) & Clear(R5, C3) & Clear(R5, C4) & Clear(R5, C5) & Clear(R5, C6) & ' \ + 'Clear(R6, C1) & Clear(R6, C3) & Clear(R6, C4) & Clear(R6, C5) & Clear(R6, C6)' + + # Goal state remains the same. + goal_state = 'At(RedCar, R3, C5)' + + # Domain knowledge defines the grid and vehicles. + domain_knowledge = 'Vehicle(RedCar) & Vehicle(GreenTruck) & Vehicle(BlueCar) & ' \ + 'Row(R1) & Row(R2) & Row(R3) & Row(R4) & Row(R5) & Row(R6) & ' \ + 'Col(C1) & Col(C2) & Col(C3) & Col(C4) & Col(C5) & Col(C6) & ' \ + 'NextTo(R1, R2) & NextTo(R2, R3) & NextTo(R3, R4) & NextTo(R4, R5) & NextTo(R5, R6) & ' \ + 'NextTo(C1, C2) & NextTo(C2, C3) & NextTo(C3, C4) & NextTo(C4, C5) & NextTo(C5, C6)' + + # Optimized Actions: Specific to each vehicle and its fixed orientation. + actions = [ + # RedCar is horizontal on Row 3, length 2 + Action('MoveRedCarRight(c1, c2, c3)', + precond='At(RedCar, R3, c1) & NextTo(c1, c2) & NextTo(c2, c3) & Clear(R3, c3)', + effect='At(RedCar, R3, c2) & ~At(RedCar, R3, c1) & Clear(R3, c1) & ~Clear(R3, c3)', + domain='Col(c1) & Col(c2) & Col(c3)'), + Action('MoveRedCarLeft(c1, c2, c3)', + precond='At(RedCar, R3, c2) & NextTo(c1, c2) & NextTo(c2, c3) & Clear(R3, c1)', + effect='At(RedCar, R3, c1) & ~At(RedCar, R3, c2) & Clear(R3, c3) & ~Clear(R3, c1)', + domain='Col(c1) & Col(c2) & Col(c3)'), + + # GreenTruck is vertical on Column 4, length 3 + Action('MoveGreenTruckDown(r1, r2, r3, r4)', + precond='At(GreenTruck, r1, C4) & NextTo(r1, r2) & NextTo(r2, r3) & NextTo(r3, r4) & Clear(r4, C4)', + effect='At(GreenTruck, r2, C4) & ~At(GreenTruck, r1, C4) & Clear(r1, C4) & ~Clear(r4, C4)', + domain='Row(r1) & Row(r2) & Row(r3) & Row(r4)'), + Action('MoveGreenTruckUp(r1, r2, r3, r4)', + precond='At(GreenTruck, r2, C4) & NextTo(r1, r2) & NextTo(r2, r3) & NextTo(r3, r4) & Clear(r1, C4)', + effect='At(GreenTruck, r1, C4) & ~At(GreenTruck, r2, C4) & Clear(r4, C4) & ~Clear(r1, C4)', + domain='Row(r1) & Row(r2) & Row(r3) & Row(r4)'), + + # BlueCar is vertical on Column 2, length 2 + Action('MoveBlueCarDown(r1, r2, r3)', + precond='At(BlueCar, r1, C2) & NextTo(r1, r2) & NextTo(r2, r3) & Clear(r3, C2)', + effect='At(BlueCar, r2, C2) & ~At(BlueCar, r1, C2) & Clear(r1, C2) & ~Clear(r3, C2)', + domain='Row(r1) & Row(r2) & Row(r3)'), + Action('MoveBlueCarUp(r1, r2, r3)', + precond='At(BlueCar, r2, C2) & NextTo(r1, r2) & NextTo(r2, r3) & Clear(r1, C2)', + effect='At(BlueCar, r1, C2) & ~At(BlueCar, r2, C2) & Clear(r3, C2) & ~Clear(r1, C2)', + domain='Row(r1) & Row(r2) & Row(r3)'), + ] + + return PlanningProblem(initial=initial_state, + goals=goal_state, + actions=actions, + domain=domain_knowledge) + + +#### For pytests + +def spare_tire_graphPlan(): + """Solves the spare tire problem using GraphPlan""" + return GraphPlan(spare_tire()).execute() + + +def three_block_tower_graphPlan(): + """Solves the Sussman Anomaly problem using GraphPlan""" + return GraphPlan(three_block_tower()).execute() + + +def air_cargo_graphPlan(): + """Solves the air cargo problem using GraphPlan""" + return GraphPlan(air_cargo()).execute() + + +def have_cake_and_eat_cake_too_graphPlan(): + """Solves the cake problem using GraphPlan""" + return GraphPlan(have_cake_and_eat_cake_too()).execute() + + +def shopping_graphPlan(): + """Solves the shopping problem using GraphPlan""" + return GraphPlan(shopping_problem()).execute() + + +def socks_and_shoes_graphPlan(): + """Solves the socks and shoes problem using GraphPlan""" + return GraphPlan(socks_and_shoes()).execute() + + +def simple_blocks_world_graphPlan(): + """Solves the simple blocks world problem""" + return GraphPlan(simple_blocks_world()).execute() diff --git a/requirements.txt b/requirements.txt index dd6b1be8a..1241f7f67 100644 --- a/requirements.txt +++ b/requirements.txt @@ -11,8 +11,9 @@ numpy opencv-python pandas pillow -pytest-cov qpsolvers scipy sortedcontainers -tensorflow \ No newline at end of file +tensorflow +pytest +pytest-cov \ No newline at end of file diff --git a/tests/test_graphplan.py b/tests/test_graphplan.py new file mode 100644 index 000000000..b241a77f1 --- /dev/null +++ b/tests/test_graphplan.py @@ -0,0 +1,262 @@ +import sys, os +import collections +from multiprocessing import Process, Queue +from planning import * +from planning_envs import * +from logic import * + +import pytest + +def test_blocksworld_manual(): + sbw = simple_blocks_world() + assert sbw.goal_test() == False + sbw.act(expr('ToTable(A, B)')) + sbw.act(expr('FromTable(B, A)')) + assert sbw.goal_test() == False + sbw.act(expr('FromTable(C, B)')) + assert sbw.goal_test() == True + + +def test_logistics_manual(): + init = "In(C1, R1) & In(C2, D1) & In(C3, D2) & In(R1, D1) & Holding(R1)" + goal_state = "In(C2, D3) & In(C3, D3)" + P = logisticsPlanCustom(init, goal_state) + assert P.goal_test() == False + P.act(expr('PutDown(R1, C1, D1)')) + P.act(expr('PickUp(R1, C2, D1)')) + P.act(expr('Move(R1, D1, D3)')) + P.act(expr('PutDown(R1, C2, D3)')) + P.act(expr('Move(R1, D3, D2)')) + P.act(expr('PickUp(R1, C3, D2)')) + P.act(expr('Move(R1, D2, D3)')) + assert P.goal_test() == False + P.act(expr('PutDown(R1, C3, D3)')) + assert P.goal_test() == True + +def test_double_tennis_manual(): + p = double_tennis_problem() + + assert not p.goal_test() + p.act(expr('Go(A, RightBaseLine, LeftNet)')) + assert not p.goal_test() + p.act(expr('Hit(A, Ball, RightBaseLine)')) + assert not p.goal_test() + p.act(expr('Go(A, LeftBaseLine, RightBaseLine)')) + assert not p.goal_test() + p.act(expr('Go(B, LeftNet, RightNet)')) + assert p.goal_test() + + +def test_generalized_blocksworld_manual(): + """ + Manual test for the generalized Blocks World problem constructor. + This test case involves stacking four blocks (A, B, C, D) into a single tower. + """ + # 1. Define the problem parameters + initial_state = 'On(A, Table) & On(B, Table) & On(C, Table) & On(D, Table) & Clear(A) & Clear(B) & Clear(C) & Clear(D)' + goal_state = 'On(A, B) & On(B, C) & On(C, D)' + block_names = ['A', 'B', 'C', 'D'] + + bw_problem = blocks_world(initial_state, goal_state, block_names) + assert bw_problem.goal_test() == False + bw_problem.act(expr('Move(C, Table, D)')) + assert bw_problem.goal_test() == False + bw_problem.act(expr('Move(B, Table, C)')) + assert bw_problem.goal_test() == False + bw_problem.act(expr('Move(A, Table, B)')) + assert bw_problem.goal_test() == True + + +def verify_solution(P): + sol = Linearize(P).execute() + print(sol) + assert P.goal_test() == False + for act in sol: + P.act(expr(act)) + assert P.goal_test() == True + +def test_air_cargo(): + P = air_cargo() + verify_solution(P) + +def test_spare_tire(): + P = spare_tire() + verify_solution(P) + +def test_three_block_tower(): + P = three_block_tower() + verify_solution(P) + +def test_simple_blocks_world(): + P = simple_blocks_world() + verify_solution(P) + +def test_shopping_problem(): + P = shopping_problem() + verify_solution(P) + +def test_socks_and_shoes(): + P = socks_and_shoes() + verify_solution(P) + +def test_have_cake_and_eat_cake_too(): + P = have_cake_and_eat_cake_too() + verify_solution(P) + +@pytest.mark.parametrize("goal_state", [ + "In(C1, D1)", + "In(C1, D2)", + "In(C1, D1) & In(R1, D2)", + "In(R1, D2) & In(C1, D1)", + "In(C1, D1) & In(C3, R1)", + "In(C1, D1) & In(C3, R1) & In(R1, D3)", + "In(C1, D1) & In(R1, D3) & In(C3, R1)", + "In(C1, D1) & In(C3, D3)", + "In(C1, D1) & In(R1, D2) & In(C3, R1)", + "In(C1, D1) & In(C3, R1) & In(R1, D3)", + "In(C1, D1) & In(C2, D3)", + "In(C3, D1)", + "In(C2, D3)", + "In(C2, D3) & In(C3, D3)", \ + "In(C3, D3) & In(C2, D3)", \ + "In(C1, D2) & In(C3, D3)", + "In(C1, D3) & In(C2, D3) & In(C3, D3)", \ + "In(C1, D2) & In(C3, D3) & In(C2, D1)", + "In(C3, D3)", + "In(C1, D2) & In(C3, D3) & In(C2, D3) & In(R1, D1)" \ +]) +def test_logistics_plan_valid(goal_state): + """These should yield a valid (non-crashing) plan, even if empty.""" + init = "In(C1, R1) & In(C2, D1) & In(C3, D2) & In(R1, D1) & Holding(R1)" + P = logisticsPlanCustom(init, goal_state) + verify_solution(P) + +def test_double_tennis_problem_simple(): + P = double_tennis_problem_simple() + verify_solution(P) + + +def test_double_tennis_problem_simple2(): + P = double_tennis_problem_simple2() + verify_solution(P) + +def test_double_tennis_problem_simple3(): + P = double_tennis_problem_simple3() + verify_solution(P) + +def test_double_tennis_problem(): + P = double_tennis_problem() + verify_solution(P) + +def test_rush_hour_manual_alt_sequence(): + """ + Provides an alternative manual test for the Rush Hour problem. + + This solution is less efficient but still valid. It interleaves + the movements of different vehicles and includes an unnecessary move + to verify that the actions correctly modify the game state without + breaking the rules. + """ + # Initialize the problem + problem = rush_hour() + + # Initial state is not the goal + assert not problem.goal_test() + + # Step 1: Make an unnecessary move with the BlueCar to show it works. + # Move BlueCar from (R5, C2) down to (R6, C2). + # Note: The BlueCar occupies R5 and R6, so we must move it up first to free R6. + # Let's move it from R5,C2 -> R4,C2 instead. + problem.act(expr('MoveUpCar(BlueCar, R4, R5, R6, C2)')) + assert not problem.goal_test(), "Moving the BlueCar should not solve the puzzle." + + # Step 2: Start clearing the main path by moving the GreenTruck. + # Move GreenTruck down once: R1,C4 -> R2,C4 + problem.act(expr('MoveDownTruck(GreenTruck, R1, R2, R3, R4, C4)')) + + # Step 3: Move the RedCar into the newly available space. + # Move RedCar right once: R3,C1 -> R3,C2 + problem.act(expr('MoveRightCar(RedCar, R3, C1, C2, C3)')) + assert not problem.goal_test() + + # Step 4: Continue clearing the path. + # Move GreenTruck down again: R2,C4 -> R3,C4 + problem.act(expr('MoveDownTruck(GreenTruck, R2, R3, R4, R5, C4)')) + # Move RedCar right again: R3,C2 -> R3,C3 + problem.act(expr('MoveRightCar(RedCar, R3, C2, C3, C4)')) + assert not problem.goal_test() + + # Step 5: Final moves to solve the puzzle. + # Move GreenTruck a final time to completely clear the row: R3,C4 -> R4,C4 + problem.act(expr('MoveDownTruck(GreenTruck, R3, R4, R5, R6, C4)')) + + # Move RedCar to the goal position. + problem.act(expr('MoveRightCar(RedCar, R3, C3, C4, C5)')) + problem.act(expr('MoveRightCar(RedCar, R3, C4, C5, C6)')) + + # The sequence of actions should now result in the goal state. + assert problem.goal_test() + + +# Fails due to massive search space (hangs during building layer 1 maps) +""" +def test_rush_hour(): + P = rush_hour() + verify_solution(P) +""" + +def test_rush_hour_optimized(): + P = rush_hour_optimized() + verify_solution(P) + + +def test_planner_leveloff(): + def run_planner_in_queue(problem, queue): + queue.put(Linearize(problem).execute()) + + P = blocks_world( + 'On(A, Table) & On(B, Table) & On(C, Table) & Clear(A) & Clear(B) & Clear(C)', + 'On(A, B) & On(B, C) & On(C, A)', + ['A', 'B', 'C'] + ) + + result_queue = Queue() + proc = Process(target=run_planner_in_queue, args=(P, result_queue)) + proc.start() + proc.join(timeout=3) + + if proc.is_alive(): + proc.terminate() + proc.join() + assert False # Ran for 3 seconds and didn't exit in leveloff + else: + result = result_queue.get() + assert result is None or result == [] or result == [[]] + +def test_impossible_cake_exits_via_leveloff(): + """ + Verify that GraphPlan terminates and returns None for the impossible cake problem. + """ + + def impossible_cake_problem(): + """ + An impossible planning problem to demonstrate GraphPlan's level-off detection. + + The goal is to both Have(Cake) and Eaten(Cake). However, the only available + action, Eat(Cake), has the effect of ~Have(Cake). The propositions + Have(Cake) and Eaten(Cake) will become mutually exclusive at the first + level, and the graph will quickly level off, proving the goal is unreachable. + """ + return PlanningProblem( + initial='Have(Cake) & ~Eaten(Cake)', + goals='Have(Cake) & Eaten(Cake)', + actions=[ + Action('Eat(Cake)', + precond='Have(Cake)', + effect='Eaten(Cake) & ~Have(Cake)') + ] + ) + + problem = impossible_cake_problem() + solution = Linearize(problem).execute() + assert solution is None \ No newline at end of file diff --git a/tests/test_planning.py b/tests/test_planning.py index a39152adc..c24f13174 100644 --- a/tests/test_planning.py +++ b/tests/test_planning.py @@ -3,6 +3,7 @@ import pytest from planning import * +from planning_envs import * from search import astar_search from utils import expr from logic import FolKB, conjuncts @@ -229,8 +230,7 @@ def test_graphPlan(): shopping_problem_solution = shopping_graphPlan() shopping_problem_solution = linearize(shopping_problem_solution) - assert expr('Go(Home, HW)') in shopping_problem_solution - assert expr('Go(Home, SM)') in shopping_problem_solution + assert expr('Go(Home, HW)') in shopping_problem_solution or expr('Go(Home, SM)') in shopping_problem_solution assert expr('Buy(Drill, HW)') in shopping_problem_solution assert expr('Buy(Banana, SM)') in shopping_problem_solution assert expr('Buy(Milk, SM)') in shopping_problem_solution @@ -514,9 +514,12 @@ def test_double_tennis(): p = double_tennis_problem() assert not goal_test(p.goals, p.initial) - solution = [expr('Go(A, RightBaseLine, LeftBaseLine)'), - expr('Hit(A, Ball, RightBaseLine)'), - expr('Go(A, LeftNet, RightBaseLine)')] + solution = [ + expr('Go(A, RightBaseLine, LeftNet)'), + expr('Hit(A, Ball, RightBaseLine)'), + expr('Go(A, LeftBaseLine, RightBaseLine)'), + expr('Go(B, LeftNet, RightNet)') + ] for action in solution: p.act(action) diff --git a/utils.py b/utils.py index 3158e3793..d05d2756e 100644 --- a/utils.py +++ b/utils.py @@ -618,6 +618,17 @@ def __repr__(self): else: # (x - y) opp = (' ' + op + ' ') return '(' + opp.join(args) + ')' + + def predicate_negate(self): + """ + Return the logical negation of an Expr. + Assumes we are using "Not" as our negation operator. Avoids double "Not" as prefix + """ + if self.op.startswith("Not"): + # strip the Not prefix + return Expr(self.op[3:], *self.args) + else: + return Expr("Not" + self.op, *self.args) # An 'Expression' is either an Expr or a Number.