-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnavigation-debugger.py
More file actions
executable file
·160 lines (140 loc) · 8.5 KB
/
navigation-debugger.py
File metadata and controls
executable file
·160 lines (140 loc) · 8.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
#!/usr/bin/env python
import rospy
import numpy as np
from moveit_msgs.msg import ExecuteTrajectoryActionGoal, RobotTrajectory
from control_msgs.msg import FollowJointTrajectoryActionGoal, FollowJointTrajectoryActionFeedback
from trajectory_msgs.msg import JointTrajectoryPoint
from actionlib_msgs.msg import GoalID
from std_msgs.msg import Header
class NavigationDebugger:
def __init__(self):
rospy.init_node('navigation_debugger', anonymous=True)
self.joint_trajectory_feedback_sub = rospy.Subscriber("/scaled_pos_joint_traj_controller/follow_joint_trajectory/feedback", FollowJointTrajectoryActionFeedback, self.joint_trajectory_feedback_callback)
self.joint_trajectory_sub = rospy.Subscriber("/scaled_pos_joint_traj_controller/follow_joint_trajectory/goal", FollowJointTrajectoryActionGoal, self.joint_trajectory_callback)
self.execute_goal_sub = rospy.Subscriber("/execute_trajectory/goal", ExecuteTrajectoryActionGoal, self.execute_goal_callback)
self.execute_cancel_pub = rospy.Publisher("/execute_trajectory/cancel", GoalID, queue_size=10)
self.execute_goal_pub = rospy.Publisher("/execute_trajectory/goal", ExecuteTrajectoryActionGoal, queue_size=10)
self.scaled_pos_Joint_cancel_pub = rospy.Publisher("/scaled_pos_joint_traj_controller/follow_joint_trajectory/cancel", GoalID, queue_size=10)
self.bad_plan = False
self.last_known_state_dict = None
def cancel_all(self, current_state_dict=None):
self.scaled_pos_Joint_cancel_pub.publish(GoalID())
self.execute_cancel_pub.publish(GoalID())
if current_state_dict is None:
print("WARNING - No current state dict provided")
return
if all([v == 0.0 for v in current_state_dict.values()]):
print("WARNING - Attempted to stop the robot but all joint positions are zero")
return
if len(current_state_dict) != 6:
print("WARNING - Current state dict does not contain 6 joints")
return
# publish zero velocity to stop the robot
# -- Note that the joint name order is important
# -- AND comes in a different order from the trajectory feedback
goal_joint_names = ['shoulder_pan_joint', 'shoulder_lift_joint', 'elbow_joint', 'wrist_1_joint', 'wrist_2_joint', 'wrist_3_joint']
zero_goal = ExecuteTrajectoryActionGoal()
zero_goal.goal.trajectory = RobotTrajectory()
zero_goal.goal.trajectory.joint_trajectory.header = Header()
zero_goal.goal.trajectory.joint_trajectory.header.frame_id = "robot_footprint"
zero_goal.goal.trajectory.joint_trajectory.joint_names = goal_joint_names
zero_point = JointTrajectoryPoint()
# -- Extract the correct joint order for a goal
zero_point.positions = np.array([current_state_dict[joint_name] for joint_name in goal_joint_names])
zero_point.velocities = [0.0]*6
zero_point.accelerations = [0.0]*6
zero_point.effort = [0.0]*6
zero_point.time_from_start = rospy.Duration()
zero_goal.goal.trajectory.joint_trajectory.points = [zero_point]
print("Stopping the robot")
print(f"-- Publishing the following goal to stop the robot\n{zero_goal}")
self.execute_goal_pub.publish(zero_goal)
def joint_trajectory_feedback_callback(self, data):
#print(f"JT Feedback: Received feedback")
positions = data.feedback.actual.positions
joint_names = data.feedback.joint_names
self.last_known_state_dict = dict(zip(joint_names, positions))
if self.bad_plan:
#print(f"JT Feedback: Canceling all goals due to bad plan")
#print(f"JT Feedback: Current state dict: {current_state_dict}")
self.cancel_all(self.last_known_state_dict)
return
# Function used just display when new plan/goal is received
def joint_trajectory_callback(self, data):
print(f"JT Goal: Received goal")
def execute_goal_callback(self, data):
print(f"Execute Goal: Received goal")
trajectory = data.goal.trajectory.joint_trajectory
if len(trajectory.points) == 0:
print("Empty trajectory")
return
if len(trajectory.points) == 1:
print("-- Detected a single point trajectory")
# check all velocities and accelerations are zero
zv = all([v == 0.0 for v in trajectory.points[0].velocities])
za = all([a == 0.0 for a in trajectory.points[0].accelerations])
if self.bad_plan and zv and za:
print("-- -- Single point with zero velocity and zero acceleration")
print("-- -- This plan is fine")
self.bad_plan = False
idx_of_lift_joint = trajectory.joint_names.index('shoulder_lift_joint')
idx_of_elbow_joint = trajectory.joint_names.index('elbow_joint')
current_plan_positions = np.array(trajectory.points[0].positions)
for idx, point in enumerate(trajectory.points):
if idx == 0:
continue
# This check should not be required...it should be handeled by moveit config
if point.positions[idx_of_lift_joint] > 0.05 or point.positions[idx_of_elbow_joint] < -0.05:
print("--------------------")
rospy.logerr("OMG - Attempting to invert the elbow joint")
print(f'Position:({idx}) in the plan -> \n{point.positions}')
print(f'Time to reach position: {point.time_from_start.to_sec()} seconds')
print("ABORTING")
# - NOTE: Avoiding console spam when error occurs
# - Typically if the error is early it is secondary attempt - no need to ouput the trajectory
if idx > 1:
print(f"-- The Trajectory in question...\n {trajectory}")
print("--------------------")
self.bad_plan = True
if self.last_known_state_dict is not None:
cancel_state = self.last_known_state_dict
else:
cancel_state = dict(zip(trajectory.joint_names, current_plan_positions))
# Just in case the 'current_plan_positions' first point is the problem
if cancel_state['shoulder_lift_joint'] > 0.05 and cancel_state['shoulder_lift_joint'] < 0.1: # Upper bound before it becomes risky
cancel_state['shoulder_lift_joint'] = cancel_state['shoulder_lift_joint']*-1
if cancel_state['elbow_joint'] < -0.05 and cancel_state['elbow_joint'] > -0.1: # Upper bound before it becomes risky
cancel_state['elbow_joint'] = cancel_state['elbow_joint']*-1
self.cancel_all(cancel_state)
break
# This check should not be required either...need to migrate to Pilz planner...
if any(np.greater(np.abs(current_plan_positions - np.array(point.positions)), 3.0)):
print("--------------------")
print("What tha..attempting to rotate at least one joint by more than 3.0 radians")
current_plan_positions = np.array([round(x, 4) for x in current_plan_positions])
next_plan_positions = [round(x, 4) for x in point.positions]
print(f'Position A:({max(idx-1, 0)}) in the plan -> \n{current_plan_positions}')
print(f'Position B:({idx}) in the plan -> \n{next_plan_positions}')
print(f'Difference: {np.abs(np.array(current_plan_positions) - np.array(next_plan_positions))}')
print(f'Time from A to B: {point.time_from_start.to_sec()} seconds')
print("ABORTING")
# - NOTE: Avoiding console spam when error occurs
# - Typically if the error is early it is secondary attempt - no need to ouput the trajectory
if idx > 1:
print(f"-- The Trajectory in question...\n {trajectory}")
print("--------------------")
self.bad_plan = True
if self.last_known_state_dict is not None:
cancel_state = self.last_known_state_dict
else:
# Not ideal...as the current state is not known, this might still be a bad plan
cancel_state = dict(zip(trajectory.joint_names, current_plan_positions))
self.cancel_all(cancel_state)
break
self.bad_plan = False
if __name__ == '__main__':
try:
NavigationDebugger()
rospy.spin()
except rospy.ROSInterruptException:
pass