-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathddar.py
More file actions
238 lines (194 loc) · 11.2 KB
/
ddar.py
File metadata and controls
238 lines (194 loc) · 11.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
"""
DDAR (Deductive Database + Algebraic Reasoning) System
Integrated approach where DD and AR work together in alternating cycles.
"""
from typing import List, Tuple, Dict
from dd import DeductiveDatabase
from ar import AlgebraicReasoner
from Problem import GeometricProblem as GP
from rabbit_generation import RabbitGenerator
class DDARSystem:
"""Unified system coordinating Deductive Database and Algebraic Reasoning"""
def __init__(self, rules_file: str = "rules.txt", max_iterations: int = 1000, check_diagram: bool = True, max_ar_equations: int = None, use_rabbit: bool = False, rabbit_mode: str = 'user', max_rabbit_calls: int = 1, num_random_constructions: int = 3, sanitize_on_retry: bool = False):
self.dd = DeductiveDatabase(rules_file)
self.ar = AlgebraicReasoner(check_diagram)
self.max_iterations = max_iterations
self.max_ar_equations = max_ar_equations
self.reasoning_steps = []
self.use_rabbit = use_rabbit
if use_rabbit:
self.rabbit_generator = RabbitGenerator(mode=rabbit_mode, max_rabbit_calls=max_rabbit_calls, num_random_constructions=num_random_constructions, sanitize_on_retry=sanitize_on_retry)
else:
self.rabbit_generator = None
def solve_problem(self, problem) -> Tuple[bool, List[str], List]:
"""
Main DDAR solving loop alternating between DD and AR
Returns: (solved, trace, all_derived_facts)
"""
print("Starting DDAR (DD + AR) solving...")
print("="*60)
# Ensure AR state is clean between runs of the same DDARSystem instance
self.ar.clear()
problem.derived_facts = list(problem.assumptions)
problem.dd_derived = list(problem.assumptions)
goals = list(problem.goals)
trace = []
applied_combinations = []
problem.reasoning_steps = [[{
'predicate': assumption,
'dependencies': {
'type': 'ASSUMPTION',
'premises': []
}
} for assumption in list(problem.assumptions)]]
problem.proved_goals = []
# Rabbit generation tracking
consecutive_failed_rounds = 0
if self.rabbit_generator:
self.rabbit_generator.reset_count()
# Give AR access to the diagram
self.ar.set_problem(problem)
print(f"Assumptions: {len(problem.derived_facts)}, Goals: {len(goals)}\n")
for iteration in range(1, self.max_iterations + 1):
print(f"Iteration {iteration}")
print("-" * 40)
initial_count = len(problem.derived_facts)
# Phase 1: Deductive Database
print("DD Phase:")
dd_new, dd_rules, dd_combinations, dd_reasoning_level = self._run_dd_phase(problem.derived_facts, applied_combinations)
problem.reasoning_steps.append([])
if dd_new:
print(f" +{len(dd_new)} facts")
problem.derived_facts.extend(dd_new)
problem.dd_derived.extend(dd_new)
applied_combinations.extend(dd_combinations)
trace.extend(f"DD: {rule}" for rule in dd_rules)
problem.reasoning_steps[iteration].extend(dd_reasoning_level)
are_satisified, goals_satisfied = self._all_goals_solved(goals,
dd_reasoning_level + problem.proved_goals)
# This bit could be improved, as I believe we will never have two dictionaries with the same predicate, that is, no two ways of deriving the same conclusion will be recorded
for g in goals_satisfied:
# Goal had already accounted for
if any(g['predicate'] == p_g['predicate'] for p_g in problem.proved_goals):
continue
problem.proved_goals.append(g)
if are_satisified:
self._print_success("DD", iteration, goals)
return True, trace, problem.derived_facts
# Phase 2: Algebraic Reasoning
print("AR Phase:")
ar_new, ar_reasoning_level = self._run_ar_phase(problem, goals)
if ar_new:
print(f" +{len(ar_new)} facts")
problem.derived_facts.extend(ar_new)
# trace.extend(ar_explanations)
problem.reasoning_steps[iteration].extend(ar_reasoning_level)
# Might need to be careful with this part - check that when goals are indeed fulfilled they get correctly added to the proved goals list
are_satisified, goals_satisfied = self._all_goals_solved(goals,
ar_reasoning_level + problem.proved_goals)
# This bit could be improved, as I believe we will never have two dictionaries with the same predicate, that is, no two ways of deriving the same conclusion will be recorded
for g in goals_satisfied:
# Goal had already accounted for
if any(g['predicate'] == p_g['predicate'] for p_g in problem.proved_goals):
continue
problem.proved_goals.append(g)
if are_satisified:
self._print_success("AR", iteration, goals)
return True, trace, problem.derived_facts
# Check termination
if len(problem.derived_facts) == initial_count:
consecutive_failed_rounds += 1
# Try rabbit generation if enabled and available
if self.use_rabbit and self.rabbit_generator and self.rabbit_generator.can_generate():
if consecutive_failed_rounds == 1:
# First failure - try rabbit generation
# Sanitize before generating if this is a retry (we've already tried rabbit before)
sanitize_before = self.rabbit_generator.rabbit_call_count > 0
success, new_predicates, rabbit_reasoning = self.rabbit_generator.generate_construction(
problem, problem.derived_facts, sanitize_before=sanitize_before
)
if success and new_predicates:
# Add new predicates to derived facts
problem.derived_facts.extend(new_predicates)
problem.dd_derived.extend(new_predicates)
problem.reasoning_steps[iteration].extend(rabbit_reasoning)
# Check if rabbit construction solved any goals
are_satisfied, goals_satisfied = self._all_goals_solved(goals,
rabbit_reasoning + problem.proved_goals)
for g in goals_satisfied:
if any(g['predicate'] == p_g['predicate'] for p_g in problem.proved_goals):
continue
problem.proved_goals.append(g)
if are_satisfied:
self._print_success("RABBIT", iteration, goals)
return True, trace, problem.derived_facts
# Reset counter since we made progress
consecutive_failed_rounds = 0
print(f"Total facts: {len(problem.derived_facts)}\n")
continue
# If we've failed twice in a row (or rabbit not available), give up
if consecutive_failed_rounds >= 2 or not (self.use_rabbit and self.rabbit_generator and self.rabbit_generator.can_generate()):
print("\n✗ Fixed point reached")
solved = [g for g in goals if any(g ==f for f in problem.derived_facts)]
print(f"Solved: {len(solved)}/{len(goals)} goals")
return len(solved) == len(goals), trace, problem.derived_facts
else:
# Made progress, reset counter
consecutive_failed_rounds = 0
print(f"Total facts: {len(problem.derived_facts)}\n")
print(f"\n✗ Max iterations ({self.max_iterations}) reached")
return False, trace, problem.derived_facts
def _run_dd_phase(self, facts: List, prev_combinations: List) -> Tuple[List, List[str], List, List[dict]]:
"""Run DD reasoning phase"""
new_implied, implied_reasonings = self.dd.check_implied_predicates(facts)
new_derivations, rules, combinations, new_reasoning_level = self.dd.check_applicable_predicates(facts + new_implied, prev_combinations)
new_derivations.extend(new_implied)
new_reasoning_level.extend(implied_reasonings)
return new_derivations, rules, combinations, new_reasoning_level
def _run_ar_phase(self, problem: GP, goals: List) -> Tuple[List, List[str], List[Dict]]:
"""Run AR reasoning phase"""
# self.ar.clear()
self.ar.add_assumptions(problem.dd_derived)
if self.max_ar_equations is not None and len(self.ar.equations) > self.max_ar_equations:
print(f" Skipping AR: too many equations ({len(self.ar.equations)} > {self.max_ar_equations})")
return [], []
if not self.ar.equations:
print(" No AR equations")
return [], []
print(f" AR: {len(self.ar.variables)} vars, {len(self.ar.equations)} eqs")
derived = []
reasonings = []
# Try to derive goals
for goal in (set(goals) - set([g['predicate'] for g in problem.proved_goals])):
can_derive, pred_reason = self.ar.can_derive(goal)
if can_derive:
derived.append(goal)
reasonings.append(pred_reason)
print(f" ✓ Derived goal: {goal}")
# Find other derivable facts
all_derivable = self.ar.find_all_derivable_facts()
for fact, reason in all_derivable:
if fact and not any(fact == f for f in problem.derived_facts + derived):
print(f" → {str(fact)}")
derived.append(fact)
reasonings.append(reason)
return derived, reasonings
def _all_goals_solved(self, goals: List, facts: List) -> Tuple[bool, List[dict]]:
"""Check if all goals are solved"""
if not goals:
return False, []
all_solved = True
goals_fullfiled = []
for g in goals:
match = next((f for f in facts if g == f['predicate']), None)
if not match:
all_solved = False
continue
goals_fullfiled.append(match)
return all_solved, goals_fullfiled
def _print_success(self, method: str, iterations: int, goals: List):
"""Print success message"""
print("\n" + "="*60)
print(f"🎉 SOLVED by {method} in {iterations} iterations")
print(f"Goals: {len(goals)}")
print("="*60)