diff --git a/.circleci/config.yml b/.circleci/config.yml
index 78a4f25..2637fde 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -5,7 +5,7 @@ jobs:
docker:
- image: public.ecr.aws/shadowrobot/build-tools:focal-noetic
environment:
- toolset_branch: master
+ toolset_branch: lint
server_type: circle
ros_release_name: noetic
ubuntu_version_name: focal
diff --git a/aws.yml b/aws.yml
index 86f4cdc..0424dbf 100644
--- a/aws.yml
+++ b/aws.yml
@@ -9,7 +9,7 @@ settings:
tag: focal-noetic
template_project_name: template_unit_tests_and_code_coverage
toolset:
- branch: master
+ branch: lint
modules:
- check_cache
- code_coverage
diff --git a/sr_3dmouse/scripts/mousecontrol b/sr_3dmouse/scripts/mousecontrol
index 94233c6..3570d6d 100755
--- a/sr_3dmouse/scripts/mousecontrol
+++ b/sr_3dmouse/scripts/mousecontrol
@@ -1,6 +1,6 @@
-#!/usr/bin/env python 3
+#!/usr/bin/env python3
-# Copyright 2019 Shadow Robot Company Ltd.
+# Copyright 2019, 2022 Shadow Robot Company Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
@@ -15,7 +15,6 @@
# with this program. If not, see .
"3D Mouse controller"
-from __future__ import absolute_import, division
from copy import deepcopy
import rospy
import tf
@@ -37,7 +36,7 @@ class MouseCommand():
"angularX": 3,
"angularY": 4,
"angularZ": 5}
- self.toggleT = False
+ self.toggle_t = False
self.data = Joy()
self.start_pose = Pose()
# subscribe to mouse topic
@@ -58,11 +57,11 @@ class MouseCommand():
if self.data.axes:
if self.data.buttons[1]:
# toggle teach switch
- self.toggleT = not self.toggleT
- self._arm_commander.set_teach_mode(self.toggleT)
- rospy.loginfo("Teaching toggle is:" + str(self.toggleT))
+ self.toggle_t = not self.toggle_t
+ self._arm_commander.set_teach_mode(self.toggle_t)
+ rospy.loginfo("Teaching toggle is:" + str(self.toggle_t))
rospy.sleep(1)
- if self.toggleT is False:
+ if self.toggle_t is False:
self.compute_joy_pose(self.start_pose)
else:
rospy.logwarn("no data from spacenav_node. Is it running?")
diff --git a/sr_3dmouse/test/test_controller.py b/sr_3dmouse/test/test_controller.py
index 7d40d46..68198f1 100755
--- a/sr_3dmouse/test/test_controller.py
+++ b/sr_3dmouse/test/test_controller.py
@@ -17,7 +17,6 @@
'''
3d mouse tester module
'''
-from __future__ import absolute_import, division
import unittest
import actionlib
import rospy
diff --git a/sr_grasp_fast_planner/src/sr_fast_grasp/fast_grasp.py b/sr_grasp_fast_planner/src/sr_fast_grasp/fast_grasp.py
index 5012223..fe25bed 100755
--- a/sr_grasp_fast_planner/src/sr_fast_grasp/fast_grasp.py
+++ b/sr_grasp_fast_planner/src/sr_fast_grasp/fast_grasp.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python 3
-# Copyright 2019 Shadow Robot Company Ltd.
+# Copyright 2019, 2022 Shadow Robot Company Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
@@ -14,71 +14,70 @@
# You should have received a copy of the GNU General Public License along
# with this program. If not, see .
-from __future__ import absolute_import, division
-import numpy
from copy import deepcopy
-
+import math
+import numpy
import rospy
from shape_msgs.msg import SolidPrimitive
from visualization_msgs.msg import Marker
from sr_robot_msgs.srv import GetFastGraspFromBoundingBox
-from moveit_msgs.msg import Grasp
from moveit_commander import MoveGroupCommander
+from moveit_msgs.msg import Grasp
from moveit_msgs.srv import GetRobotStateFromWarehouse as GetState
from moveit_msgs.srv import GetPositionIK
-def quaternion_from_matrix(matrix, isprecise=False):
- M = numpy.array(matrix, dtype=numpy.float64, copy=False)[:4, :4]
+def quaternion_from_matrix(input_matrix, isprecise=False):
+ matrix = numpy.array(input_matrix, dtype=numpy.float64, copy=False)[:4, :4]
if isprecise:
- q = numpy.empty((4, ))
- t = numpy.trace(M)
- if t > M[3, 3]:
- q[0] = t
- q[3] = M[1, 0] - M[0, 1]
- q[2] = M[0, 2] - M[2, 0]
- q[1] = M[2, 1] - M[1, 2]
+ quaternion = numpy.empty((4, ))
+ matrix_trace = numpy.trace(matrix)
+ if matrix_trace > matrix[3, 3]:
+ quaternion[0] = matrix_trace
+ quaternion[3] = matrix[1, 0] - matrix[0, 1]
+ quaternion[2] = matrix[0, 2] - matrix[2, 0]
+ quaternion[1] = matrix[2, 1] - matrix[1, 2]
else:
i, j, k = 1, 2, 3
- if M[1, 1] > M[0, 0]:
+ if matrix[1, 1] > matrix[0, 0]:
i, j, k = 2, 3, 1
- if M[2, 2] > M[i, i]:
+ if matrix[2, 2] > matrix[i, i]:
i, j, k = 3, 1, 2
- t = M[i, i] - (M[j, j] + M[k, k]) + M[3, 3]
- q[i] = t
- q[j] = M[i, j] + M[j, i]
- q[k] = M[k, i] + M[i, k]
- q[3] = M[k, j] - M[j, k]
- q *= 0.5 / math.sqrt(t * M[3, 3])
+ matrix_trace = matrix[i, i] - (matrix[j, j] + matrix[k, k]) + matrix[3, 3]
+ quaternion[i] = matrix_trace
+ quaternion[j] = matrix[i, j] + matrix[j, i]
+ quaternion[k] = matrix[k, i] + matrix[i, k]
+ quaternion[3] = matrix[k, j] - matrix[j, k]
+ quaternion *= 0.5 / math.sqrt(matrix_trace * matrix[3, 3])
else:
- m00 = M[0, 0]
- m01 = M[0, 1]
- m02 = M[0, 2]
- m10 = M[1, 0]
- m11 = M[1, 1]
- m12 = M[1, 2]
- m20 = M[2, 0]
- m21 = M[2, 1]
- m22 = M[2, 2]
- # symmetric matrix K
- K = numpy.array([[m00-m11-m22, 0.0, 0.0, 0.0],
- [m01+m10, m11-m00-m22, 0.0, 0.0],
- [m02+m20, m12+m21, m22-m00-m11, 0.0],
- [m21-m12, m02-m20, m10-m01, m00+m11+m22]])
- K /= 3.0
- # quaternion is eigenvector of K that corresponds to largest eigenvalue
- w, V = numpy.linalg.eigh(K)
- q = V[[3, 0, 1, 2], numpy.argmax(w)]
- if q[0] < 0.0:
- numpy.negative(q, q)
- return q
+ m00 = matrix[0, 0]
+ m01 = matrix[0, 1]
+ m02 = matrix[0, 2]
+ m10 = matrix[1, 0]
+ m11 = matrix[1, 1]
+ m12 = matrix[1, 2]
+ m20 = matrix[2, 0]
+ m21 = matrix[2, 1]
+ m22 = matrix[2, 2]
+ # symmetric matrix
+ sym_matrix = numpy.array([[m00-m11-m22, 0.0, 0.0, 0.0],
+ [m01+m10, m11-m00-m22, 0.0, 0.0],
+ [m02+m20, m12+m21, m22-m00-m11, 0.0],
+ [m21-m12, m02-m20, m10-m01, m00+m11+m22]])
+ sym_matrix /= 3.0
+ # quaternion is eigenvector of sym_matrix that corresponds to largest eigenvalue
+ eigh_val, eigh_matrix = numpy.linalg.eigh(sym_matrix)
+ quaternion = eigh_matrix[[3, 0, 1, 2], numpy.argmax(eigh_val)]
+ if quaternion[0] < 0.0:
+ numpy.negative(quaternion, quaternion)
+ return quaternion
class SrFastGrasp:
def __init__(self):
self.__marker_pub = rospy.Publisher("visualization_marker",
Marker, queue_size=1)
- self.__grasp_server = rospy.Service("grasp_from_bounding_box",
+ self.__grasp_server = rospy.Service("grasp_from_bounding_box", # pylint: disable=W0238
GetFastGraspFromBoundingBox,
self.__bounding_box_cb)
self.__default_grasp = 'super_amazing_grasp'
@@ -89,43 +88,42 @@ def __init__(self):
arm_group = rospy.get_param("~arm_group", "right_arm")
self.__group = MoveGroupCommander(hand_group)
- self.__arm_g = MoveGroupCommander(arm_group)
- self.__ik = rospy.ServiceProxy("compute_ik", GetPositionIK)
+ self.__arm_g = MoveGroupCommander(arm_group) # pylint: disable=W0238
+ self.__ik = rospy.ServiceProxy("compute_ik", GetPositionIK) # pylint: disable=W0238
def __modify_grasp_pose(self, grasp, pose):
+ # pylint: disable=R0201
"""
Aligns grasp with axis from origin to center of object.
A crude way to make a vaguely sane orientation for the hand
that seems to more or less work.
"""
- v1 = numpy.array([pose.pose.position.x,
- pose.pose.position.y,
- pose.pose.position.z])
- v1_length = numpy.linalg.norm(v1)
+ val1 = numpy.array([pose.pose.position.x,
+ pose.pose.position.y,
+ pose.pose.position.z])
+ val1_length = numpy.linalg.norm(val1)
- v1 = v1/v1_length
+ val1 = val1/val1_length
- v2 = [1, 0, -v1[0]/v1[2]]
- v2 = v2/numpy.linalg.norm(v2)
+ val2 = [1, 0, -val1[0]/val1[2]]
+ val2 = val2/numpy.linalg.norm(val2)
- v3 = numpy.cross(v1, v2)
- v3 = v3/numpy.linalg.norm(v3)
+ val3 = numpy.cross(val1, val2)
+ val3 = val3/numpy.linalg.norm(val3)
- m = [
- [v3[0], v1[0], v2[0]],
- [v3[1], v1[1], v2[1]],
- [v3[2], v1[2], v2[2]]
+ matrix = [
+ [val3[0], val1[0], val2[0]],
+ [val3[1], val1[1], val2[1]],
+ [val3[2], val1[2], val2[2]]
]
- q = quaternion_from_matrix(m)
-
+ quaternion = quaternion_from_matrix(matrix)
grasp.grasp_pose = deepcopy(pose)
-
- grasp.grasp_pose.pose.orientation.x = q[0]
- grasp.grasp_pose.pose.orientation.y = q[1]
- grasp.grasp_pose.pose.orientation.z = q[2]
- grasp.grasp_pose.pose.orientation.w = q[3]
+ grasp.grasp_pose.pose.orientation.x = quaternion[0]
+ grasp.grasp_pose.pose.orientation.y = quaternion[1]
+ grasp.grasp_pose.pose.orientation.z = quaternion[2]
+ grasp.grasp_pose.pose.orientation.w = quaternion[3]
def __bounding_box_cb(self, request):
box = request.bounding_box
@@ -174,16 +172,16 @@ def __get_grasp(self, name):
return grasp
- def __get_major_axis(self, box):
- m = max(box.dimensions)
- max_index = [i for i, j in enumerate(box.dimensions) if j == m]
+ def __get_major_axis(self, box): # pylint: disable=W0238,R0201
+ max_box = max(box.dimensions)
+ max_index = [i for i, j in enumerate(box.dimensions) if j == max_box]
return max_index[-1] # Get the LAST axis with max val.
def __send_marker_to_rviz(self, box, pose):
marker = self.__get_marker_from_box(box, pose)
self.__marker_pub.publish(marker)
- def __get_marker_from_box(self, box, pose):
+ def __get_marker_from_box(self, box, pose): # pylint: disable=R0201
marker = Marker()
marker.pose = pose.pose
marker.header.frame_id = pose.header.frame_id
@@ -204,7 +202,7 @@ def __get_marker_from_box(self, box, pose):
return marker
-if "__main__" == __name__:
+if __name__ == "__main__":
rospy.init_node('sr_fast_grasp')
- grasp = SrFastGrasp()
+ grasp_class = SrFastGrasp()
rospy.spin()
diff --git a/sr_grasp_fast_planner/src/sr_fast_grasp/grasp_saver.py b/sr_grasp_fast_planner/src/sr_fast_grasp/grasp_saver.py
index 1872f1b..abc5dd1 100755
--- a/sr_grasp_fast_planner/src/sr_fast_grasp/grasp_saver.py
+++ b/sr_grasp_fast_planner/src/sr_fast_grasp/grasp_saver.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python 3
-# Copyright 2019 Shadow Robot Company Ltd.
+# Copyright 2019, 2022 Shadow Robot Company Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
@@ -14,33 +14,32 @@
# You should have received a copy of the GNU General Public License along
# with this program. If not, see .
-from __future__ import absolute_import
-from sys import argv
+import sys
import rospy
from moveit_msgs.srv import SaveRobotStateToWarehouse as SaveState
-from sensor_msgs.msg import JointState
from moveit_msgs.msg import RobotState
+from sensor_msgs.msg import JointState
class GraspSaver:
def __init__(self, name):
- self.__group_name = "right_hand"
+ self.__group_name = "right_hand" # pylint: disable=W0238
self.__save = rospy.ServiceProxy(
'/moveit_warehouse_services/save_robot_state', SaveState)
- self.__js_subscriber = rospy.Subscriber("joint_states",
+ self.__js_subscriber = rospy.Subscriber("joint_states", # pylint: disable=W0238
JointState,
self.__js_cb)
self.__js = None
self.__done = False
self.__name = name
- def __js_cb(self, js):
+ def __js_cb(self, js): # pylint: disable=C0103
self.__js = js
def __save_out(self):
- rs = RobotState()
- rs.joint_state = self.__js
- self.__save(self.__name, "", rs)
+ robotstate = RobotState()
+ robotstate.joint_state = self.__js
+ self.__save(self.__name, "", robotstate)
def spin(self):
while not self.__done and not rospy.is_shutdown():
@@ -51,10 +50,10 @@ def spin(self):
rospy.sleep(.1)
-if "__main__" == __name__:
+if __name__ == "__main__":
rospy.init_node("grasp_saver")
- if len(argv) <= 1 or "" == argv[1]:
+ if len(sys.argv) <= 1 or sys.argv[1] == "":
rospy.logerr("You didn't enter a name.")
- exit(-1)
- gs = GraspSaver(argv[1])
+ sys.exit(-1)
+ gs = GraspSaver(sys.argv[1])
gs.spin()
diff --git a/sr_grasp_fast_planner/test/test_fast_grasp_planner.py b/sr_grasp_fast_planner/test/test_fast_grasp_planner.py
index 386d834..536e601 100755
--- a/sr_grasp_fast_planner/test/test_fast_grasp_planner.py
+++ b/sr_grasp_fast_planner/test/test_fast_grasp_planner.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python 3
-# Copyright 2019 Shadow Robot Company Ltd.
+# Copyright 2019, 2022 Shadow Robot Company Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
@@ -14,12 +14,11 @@
# You should have received a copy of the GNU General Public License along
# with this program. If not, see .
-from __future__ import absolute_import
from builtins import round
from math import log10, floor
from unittest import TestCase
-
+import rostest
import rospy
from sr_robot_msgs.srv import GetFastGraspFromBoundingBox \
as GetGrasp
@@ -27,8 +26,8 @@
from geometry_msgs.msg import PoseStamped
-def round_sig(x, sig):
- return round(x, sig-int(floor(log10(x)))-1)
+def round_sig(val, sig):
+ return round(val, sig-int(floor(log10(val)))-1)
class TestFastGraspPlanner(TestCase):
@@ -47,6 +46,7 @@ def setUp(self):
# pass
def make_box_and_pose(self):
+ # pylint: disable=R0201
pose = PoseStamped()
pose.pose.position.x = 0.5
pose.pose.position.y = 0.5
@@ -86,8 +86,6 @@ def test_grasp(self):
if __name__ == '__main__':
- import rostest
-
rospy.sleep(20)
# rostest.rosrun("test_service_running",
diff --git a/sr_hand_health_report/CMakeLists.txt b/sr_hand_health_report/CMakeLists.txt
index ef95f04..c35874f 100644
--- a/sr_hand_health_report/CMakeLists.txt
+++ b/sr_hand_health_report/CMakeLists.txt
@@ -146,7 +146,6 @@ include_directories(
## Mark executable scripts (Python etc.) for installation
## in contrast to setup.py, you can choose the destination
install(PROGRAMS
- scripts/aws_manager.py
DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}
)
diff --git a/sr_hand_health_report/launch/sr_hand_health_report.launch b/sr_hand_health_report/launch/sr_hand_health_report.launch
index a73a624..ea87334 100644
--- a/sr_hand_health_report/launch/sr_hand_health_report.launch
+++ b/sr_hand_health_report/launch/sr_hand_health_report.launch
@@ -14,7 +14,7 @@
-
+
$(arg checks_to_run)
$(arg fingers_to_test)
$(arg results_path)
diff --git a/sr_hand_health_report/launch/sr_log_aws_files.launch b/sr_hand_health_report/launch/sr_log_aws_files.launch
index 46b8df2..65daa6c 100644
--- a/sr_hand_health_report/launch/sr_log_aws_files.launch
+++ b/sr_hand_health_report/launch/sr_log_aws_files.launch
@@ -1,24 +1,23 @@
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
-
- $(arg download)
- $(arg upload)
- $(arg file_path)
- $(arg folder_path)
- $(arg report_name)
- $(arg bag_name)
- $(arg param_dump_file_name)
+
+ $(arg function_mode)
+ $(arg skip_check)
+ $(arg files_base_path)
+ $(arg files_folder_path)
+ $(arg bucket_name)
+ $(arg bucket_subfolder)
+ $(arg file_names)
diff --git a/sr_hand_health_report/package.xml b/sr_hand_health_report/package.xml
index 2601c73..338986d 100644
--- a/sr_hand_health_report/package.xml
+++ b/sr_hand_health_report/package.xml
@@ -16,6 +16,7 @@
message_runtime
python-numpy
rospy
+ sr_utilities_common
sr_system_info
sr_controllers_tools
sr_logging_common
diff --git a/sr_hand_health_report/scripts/aws_manager.py b/sr_hand_health_report/scripts/aws_manager.py
deleted file mode 100755
index a93cfb0..0000000
--- a/sr_hand_health_report/scripts/aws_manager.py
+++ /dev/null
@@ -1,110 +0,0 @@
-#!/usr/bin/env python 3
-
-# Copyright 2020 Shadow Robot Company Ltd.
-#
-# This program is free software: you can redistribute it and/or modify it
-# under the terms of the GNU General Public License as published by the Free
-# Software Foundation version 2 of the License.
-#
-# This program is distributed in the hope that it will be useful, but WITHOUT
-# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
-# FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
-# more details.
-#
-# You should have received a copy of the GNU General Public License along
-# with this program. If not, see .
-
-from __future__ import absolute_import
-import rospy
-import logging
-import boto3
-from botocore.exceptions import ClientError
-import subprocess
-import requests
-import re
-import json
-import os
-
-if __name__ == "__main__":
- rospy.init_node("aws_manager_node")
-
- download_param = rospy.get_param("~download")
- upload_param = rospy.get_param("~upload")
-
- file_path = rospy.get_param("~file_path")
- folder_path = rospy.get_param("~folder_path")
- report_file_name = rospy.get_param("~report_name")
- rosbag_file_name = rospy.get_param("~bag_name")
- param_dump_file_name = rospy.get_param("~param_dump_file_name")
-
- full_report_path = "{}/{}/{}".format(file_path, folder_path, report_file_name)
- full_bag_path = "{}/{}/{}".format(file_path, folder_path, rosbag_file_name)
- full_yaml_dump_path = "{}/{}/{}".format(file_path, folder_path, param_dump_file_name)
-
- aws_report_path = "{}/{}".format(folder_path, report_file_name)
- aws_bag_path = "{}/{}".format(folder_path, rosbag_file_name)
- aws_yaml_dump_path = "{}/{}".format(folder_path, param_dump_file_name)
-
- try:
- with open('/usr/local/bin/customer.key', 'r') as customer_key_file:
- customer_key = customer_key_file.read()
- except Exception:
- rospy.logerr("Could not find customer key, ask software team for help!")
-
- headers = {
- 'x-api-key': '{}'.format(customer_key[:-1]),
- }
-
- try:
- response = requests.get('https://5vv2z6j3a7.execute-api.eu-west-2.amazonaws.com/prod', headers=headers)
- except Exception:
- rospy.logerr("Could request secret AWS access key, ask software team for help!")
-
- result = re.search('ACCESS_KEY_ID=(.*)\nSECRET_ACCESS', response.text)
- aws_access_key_id = result.group(1)
-
- result = re.search('SECRET_ACCESS_KEY=(.*)\nSESSION_TOKEN', response.text)
- aws_secret_access_key = result.group(1)
-
- result = re.search('SESSION_TOKEN=(.*)\nEXPIRATION', response.text)
- aws_session_token = result.group(1)
-
- client = boto3.client(
- 's3',
- aws_access_key_id=aws_access_key_id,
- aws_secret_access_key=aws_secret_access_key,
- aws_session_token=aws_session_token
- )
-
- bucket_name = "shadowrobot.healthreport.results"
- if upload_param is True:
- rospy.loginfo("Uploading report yaml file..")
- client.upload_file(full_report_path, bucket_name, aws_report_path)
- rospy.loginfo("Upload download of report yaml file!")
-
- rospy.loginfo("Uploading bag file..")
- client.upload_file(full_bag_path, bucket_name, aws_bag_path)
- rospy.loginfo("Completed Upload of bag file!")
-
- rospy.loginfo("Uploading param dump yaml file..")
- client.upload_file(full_yaml_dump_path, bucket_name, aws_yaml_dump_path)
- rospy.loginfo("Completed Upload of param dump yaml file!")
-
- if download_param is True:
- directory = "{}/{}".format(file_path, folder_path)
- if not os.path.exists(directory):
- os.makedirs(directory)
- rospy.loginfo("Downloading report yaml file..")
- client.download_file(bucket_name, aws_report_path, "{}/{}/{}".format(file_path, folder_path, report_file_name))
- rospy.loginfo("Completed download of report yaml file!")
-
- rospy.loginfo("Downloading bag file..")
- client.download_file(bucket_name, aws_bag_path, "{}/{}/{}".format(file_path, folder_path, rosbag_file_name))
- rospy.loginfo("Completed download of bag file!")
-
- rospy.loginfo("Downloading param dump yaml file..")
- client.download_file(bucket_name, aws_yaml_dump_path, "{}/{}/{}".format(file_path, folder_path,
- param_dump_file_name))
- rospy.loginfo("Completed download of param dump yaml file!")
-
- rospy.signal_shutdown("")
diff --git a/sr_hand_health_report/setup.py b/sr_hand_health_report/setup.py
index 8e237c2..09e25d5 100644
--- a/sr_hand_health_report/setup.py
+++ b/sr_hand_health_report/setup.py
@@ -1,6 +1,5 @@
# ! DO NOT MANUALLY INVOKE THIS setup.py, USE CATKIN INSTEAD
-from __future__ import absolute_import
from setuptools import setup
from catkin_pkg.python_setup import generate_distutils_setup
diff --git a/sr_hand_health_report/src/sr_hand_health_report/monotonicity_check.py b/sr_hand_health_report/src/sr_hand_health_report/monotonicity_check.py
index 4a597d3..c549fa7 100644
--- a/sr_hand_health_report/src/sr_hand_health_report/monotonicity_check.py
+++ b/sr_hand_health_report/src/sr_hand_health_report/monotonicity_check.py
@@ -13,21 +13,17 @@
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see .
-
-from __future__ import absolute_import, division
from builtins import round
import rospy
from sr_hand_health_report_check import SrHealthReportCheck, SENSOR_CUTOUT_THRESHOLD, NR_OF_BITS_NOISE_WARNING
-from std_msgs.msg import Float64
import numpy as np
-import decimal
SW_LIMITS_FOR_JOINTS = {"wrj1": -0.785, "thj5": 1.047}
class MonotonicityCheck(SrHealthReportCheck):
def __init__(self, hand_side, fingers_to_test):
- super(MonotonicityCheck, self).__init__(hand_side, fingers_to_test)
+ super().__init__(hand_side, fingers_to_test)
self._is_joint_monotonous = True
self._dict_of_monotonic_joints = {}
self._publishing_rate = rospy.Rate(50) # 50 Hz
@@ -35,6 +31,8 @@ def __init__(self, hand_side, fingers_to_test):
self._previous_difference = 0
self._pwm_command = 250
self._check_duration = rospy.Duration(7.0)
+ self._first_end_stop_sensor_value = None
+ self._second_end_stop_sensor_value = None
def run_check(self):
result = {"monotonicity_check": []}
@@ -72,10 +70,9 @@ def _run_check_per_joint(self, joint, command, finger):
self._is_joint_monotonous = True
end_reached = False
is_joint_monotonous = True
- joint_limit_reached = False
time = rospy.Time.now() + self._check_duration
- while (rospy.Time.now() < time):
+ while rospy.Time.now() < time:
if end_reached is False:
joint.move_joint(extend_command, "effort")
else:
@@ -87,14 +84,13 @@ def _run_check_per_joint(self, joint, command, finger):
if (round(rospy.Time.now().to_sec(), 1) == round(time.to_sec(), 1)) and end_reached is False:
time = rospy.Time.now() + self._check_duration
end_reached = True
- self._first_end_stop_sensor_value = self._get_raw_sensor_value(joint._raw_sensor_data)
- joint_limit_reached = False
- self._second_end_stop_sensor_value = self._get_raw_sensor_value(joint._raw_sensor_data)
+ self._first_end_stop_sensor_value = get_raw_sensor_value(joint.get_raw_sensor_data())
+ self._second_end_stop_sensor_value = get_raw_sensor_value(joint.get_raw_sensor_data())
- higher_value, lower_value = self._check_sensor_range(self._first_end_stop_sensor_value,
- self._second_end_stop_sensor_value)
+ higher_value, lower_value = check_sensor_range(self._first_end_stop_sensor_value,
+ self._second_end_stop_sensor_value)
- self._add_result_to_dict(joint.joint_name, self._is_joint_monotonous, higher_value, lower_value)
+ self._add_result_to_dict(joint.joint_name, higher_value, lower_value)
self._reset_joint_to_position(finger, joint, extend_command, flex_command)
def _reset_joint_to_position(self, finger, joint, extend_command, flex_command):
@@ -114,25 +110,22 @@ def _reset_joint_to_position(self, finger, joint, extend_command, flex_command):
self.command_sign_map[finger.finger_name + "j3"] * 250, 3.0,
self._publishing_rate)
- def _add_result_to_dict(self, joint_name, is_joint_monotonous, higher_value, lower_value):
+ def _add_result_to_dict(self, joint_name, higher_value, lower_value):
self._dict_of_monotonic_joints[joint_name] = {}
self._dict_of_monotonic_joints[joint_name]["is_monotonic"] = self._is_joint_monotonous
self._dict_of_monotonic_joints[joint_name]["higher_raw_sensor_value"] = higher_value
self._dict_of_monotonic_joints[joint_name]["lower_raw_sensor_value"] = lower_value
- def _get_raw_sensor_value(self, data):
- return sum(data) / len(data)
-
def _check_monotonicity(self, joint):
if self._older_raw_sensor_value == 0:
- self._older_raw_sensor_value = self._get_raw_sensor_value(joint._raw_sensor_data)
+ self._older_raw_sensor_value = get_raw_sensor_value(joint.get_raw_sensor_data())
- difference_between_raw_data = (self._get_raw_sensor_value(joint._raw_sensor_data) -
+ difference_between_raw_data = (get_raw_sensor_value(joint.get_raw_sensor_data()) -
self._older_raw_sensor_value)
- self._older_raw_sensor_value = self._get_raw_sensor_value(joint._raw_sensor_data)
+ self._older_raw_sensor_value = get_raw_sensor_value(joint.get_raw_sensor_data())
if abs(difference_between_raw_data) <= SENSOR_CUTOUT_THRESHOLD:
- if (abs(difference_between_raw_data) > NR_OF_BITS_NOISE_WARNING):
- if (abs(self._previous_difference) > NR_OF_BITS_NOISE_WARNING):
+ if abs(difference_between_raw_data) > NR_OF_BITS_NOISE_WARNING:
+ if abs(self._previous_difference) > NR_OF_BITS_NOISE_WARNING:
if np.sign(difference_between_raw_data) != 0 and np.sign(self._previous_difference) != 0:
if np.sign(difference_between_raw_data) != np.sign(self._previous_difference):
rospy.logwarn("Unmonotonic behaviour detected")
@@ -142,20 +135,6 @@ def _check_monotonicity(self, joint):
self._previous_difference = difference_between_raw_data
return True
- def _check_sensor_range(self, first_sensor_value, second_sensor_value):
- """
- This function records the minimum and maximum range hit by the joint
- during the monotonicity check, this is collected to sanity check the
- sensor range
- """
- if first_sensor_value > second_sensor_value:
- higher_value = first_sensor_value
- lower_value = second_sensor_value
- else:
- higher_value = second_sensor_value
- lower_value = first_sensor_value
- return higher_value, lower_value
-
def _check_joint_limit(self, joint):
"""
This function check the joint position to avoid intense stress on WR1 and TH5,
@@ -163,9 +142,28 @@ def _check_joint_limit(self, joint):
"""
limit_reached = False
if joint.joint_name == self._hand_prefix + "_wrj1":
- if joint._current_position - SW_LIMITS_FOR_JOINTS["wrj1"] < 0.01:
+ if joint.get_current_position() - SW_LIMITS_FOR_JOINTS["wrj1"] < 0.01:
limit_reached = True
elif joint.joint_name == self._hand_prefix + "_thj5":
- if abs(abs(joint._current_position) - SW_LIMITS_FOR_JOINTS["thj5"]) < 0.01:
+ if abs(abs(joint.get_current_position()) - SW_LIMITS_FOR_JOINTS["thj5"]) < 0.01:
limit_reached = True
return limit_reached
+
+
+def check_sensor_range(first_sensor_value, second_sensor_value):
+ """
+ This function records the minimum and maximum range hit by the joint
+ during the monotonicity check, this is collected to sanity check the
+ sensor range
+ """
+ if first_sensor_value > second_sensor_value:
+ higher_value = first_sensor_value
+ lower_value = second_sensor_value
+ else:
+ higher_value = second_sensor_value
+ lower_value = first_sensor_value
+ return higher_value, lower_value
+
+
+def get_raw_sensor_value(data):
+ return sum(data) / len(data)
diff --git a/sr_hand_health_report/src/sr_hand_health_report/position_sensor_noise_check.py b/sr_hand_health_report/src/sr_hand_health_report/position_sensor_noise_check.py
index be6ede8..4ba3daf 100644
--- a/sr_hand_health_report/src/sr_hand_health_report/position_sensor_noise_check.py
+++ b/sr_hand_health_report/src/sr_hand_health_report/position_sensor_noise_check.py
@@ -14,17 +14,17 @@
# You should have received a copy of the GNU General Public License along
# with this program. If not, see .
-from __future__ import absolute_import
import rospy
from sr_hand_health_report_check import SrHealthReportCheck, SENSOR_CUTOUT_THRESHOLD, NR_OF_BITS_NOISE_WARNING
class PositionSensorNoiseCheck(SrHealthReportCheck):
def __init__(self, hand_side, fingers_to_test):
- super(PositionSensorNoiseCheck, self).__init__(hand_side, fingers_to_test)
+ super().__init__(hand_side, fingers_to_test)
self._check_duration = rospy.Duration(5.0)
- self._shared_dict = dict()
+ self._shared_dict = {}
self._publishing_rate = rospy.Rate(200)
+ self._initial_raw_value = None
def run_check(self):
result = {"position_sensor_noise_check": []}
@@ -35,7 +35,7 @@ def run_check(self):
rospy.loginfo("collecting and analyzing data for FINGER {}".format(finger.finger_name))
for joint in finger.joints_dict.values():
rospy.loginfo("collecting and analyzing data for JOINT {}".format(joint.joint_name))
- self._initial_raw_value = joint._raw_sensor_data
+ self._initial_raw_value = joint.get_raw_sensor_data()
self.check_joint_raw_sensor_value(self._initial_raw_value, joint, self._shared_dict)
result["position_sensor_noise_check"].append(dict(self._shared_dict))
rospy.loginfo("Position Sensor Noise Check finished, exporting results")
@@ -43,14 +43,13 @@ def run_check(self):
def check_joint_raw_sensor_value(self, initial_raw_value, joint, dictionary):
status = ""
- warning = False
test_failed = False
if joint.joint_name != self._hand_prefix + "_wrj1" and joint.joint_name != self._hand_prefix + "_thj5":
initial_raw_value = initial_raw_value[-1:]
- for i, value in enumerate(initial_raw_value):
+ for index in range(len(initial_raw_value)):
time = rospy.Time.now() + self._check_duration
while rospy.Time.now() < time and test_failed is not True:
- difference = joint._raw_sensor_data[i] - initial_raw_value[i]
+ difference = joint.get_raw_sensor_data()[index] - initial_raw_value[index]
if abs(difference) <= SENSOR_CUTOUT_THRESHOLD:
if abs(difference) < NR_OF_BITS_NOISE_WARNING:
status = "{} bits noise - CHECK PASSED".format(abs(difference) + 1)
diff --git a/sr_hand_health_report/src/sr_hand_health_report/rosbag_manager.py b/sr_hand_health_report/src/sr_hand_health_report/rosbag_manager.py
index 02f355b..2981e60 100755
--- a/sr_hand_health_report/src/sr_hand_health_report/rosbag_manager.py
+++ b/sr_hand_health_report/src/sr_hand_health_report/rosbag_manager.py
@@ -13,16 +13,12 @@
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see .
-
-
-from __future__ import absolute_import
-import rospy
import os
-import datetime
import subprocess
+import rospy
-class RosbagManager(object):
+class RosbagManager:
def __init__(self, log_test_directory):
self._rosbag_proc = None
self._log_test_directory = log_test_directory
@@ -31,24 +27,23 @@ def __init__(self, log_test_directory):
def start_log(self):
if not os.path.exists(self._log_test_directory):
os.mkdir(self._log_test_directory)
- f = open("{}/README.txt".format(self._log_test_directory), "w+")
- f.write("Test")
- f.close()
- os.system("rosparam dump {}/param_dump.yaml".format(self._log_test_directory))
+ with open(f"{self._log_test_directory}/README.txt", "w+", encoding="ASCII") as log:
+ log.write("Test")
+ os.system(f"rosparam dump {self._log_test_directory}/param_dump.yaml")
def record_bag(self, log_test_directory, bag_name):
"""
Start data logging in folder and with test description
"""
- rospy.loginfo("Loggin data in {} in {} bag file".format(log_test_directory, bag_name))
+ rospy.loginfo(f"Loggin data in {log_test_directory} in {bag_name} bag file")
os.system("killall rosbag")
os.system("killall rostopic")
try:
- self._rosbag_proc = subprocess.Popen(['rosbag record \
- -O {}/{} -a __name:=bag_node'.format(log_test_directory, bag_name)], shell=True)
+ self._rosbag_proc = subprocess.Popen([f'rosbag record \
+ -O {log_test_directory}/{bag_name} -a __name:=bag_node'], shell=True) # pylint: disable=R1732
rospy.sleep(2.0) # give time to record to start
- except OSError as e:
+ except OSError:
rospy.logerr("Could not start rosbag record")
self._rosbag_proc.kill()
@@ -56,12 +51,12 @@ def play_bag(self, log_test_directory, bag_name):
"""
Start playing a rosbg in a given folder
"""
- rospy.loginfo("Playing rosbag {}/{}".format(log_test_directory, bag_name))
+ rospy.loginfo(f"Playing rosbag {log_test_directory}/{bag_name}")
try:
- self._rosbag_proc = subprocess.Popen(['rosbag play -q {}/{} \
- --clock'.format(log_test_directory, bag_name)], shell=True)
- except OSError as e:
+ self._rosbag_proc = subprocess.Popen([f'rosbag play -q {log_test_directory}/{bag_name} \
+ --clock'], shell=True) # pylint: disable=R1732
+ except OSError:
rospy.logerr("Could not play rosbag")
self._rosbag_proc.kill()
diff --git a/sr_hand_health_report/src/sr_hand_health_report/sr_hand_health_report_check.py b/sr_hand_health_report/src/sr_hand_health_report/sr_hand_health_report_check.py
index d426ff8..5e18cf9 100644
--- a/sr_hand_health_report/src/sr_hand_health_report/sr_hand_health_report_check.py
+++ b/sr_hand_health_report/src/sr_hand_health_report/sr_hand_health_report_check.py
@@ -13,20 +13,12 @@
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see .
-
-
-from __future__ import absolute_import
+from collections import OrderedDict
+from std_msgs.msg import Float64
import rospy
-import os
-from controller_manager_msgs.srv import ListControllers
-from controller_manager_msgs.srv import SwitchController, LoadController, UnloadController
from sensor_msgs.msg import JointState
-from std_msgs.msg import Float64
-from sr_robot_commander.sr_hand_commander import SrHandCommander
-from std_srvs.srv import SetBool
-from collections import OrderedDict
-from sr_robot_msgs.msg import EthercatDebug
from sr_controllers_tools.sr_controller_helper import ControllerHelper
+from sr_robot_msgs.msg import EthercatDebug
from sr_robot_msgs.msg import ControlType
from sr_robot_msgs.srv import ChangeControlType
@@ -36,18 +28,18 @@
NR_OF_BITS_NOISE_WARNING = 3
-class Finger(object):
+class Finger:
def __init__(self, hand_prefix, finger_name):
self._hand_prefix = hand_prefix
self.finger_name = finger_name
self.joints_dict = OrderedDict()
def move_finger(self, command, control_type):
- for j in self.joints_dict.values():
- j.move_joint(command, control_type)
+ for joint in self.joints_dict.values():
+ joint.move_joint(command, control_type)
-class Joint(object):
+class Joint:
def __init__(self, hand_prefix, finger_name, joint_index):
self._finger_name = finger_name
self.joint_index = joint_index
@@ -72,13 +64,25 @@ def __init__(self, hand_prefix, finger_name, joint_index):
self._current_position = float()
def move_joint(self, command, control_type):
- if control_type is "effort":
+ if control_type == "effort":
self._pwm_command_publisher.publish(command)
- elif control_type is "position":
+ elif control_type == "position":
self._position_command_publisher.publish(command)
+ def set_current_position(self, new_position):
+ self._current_position = new_position
+
+ def get_current_position(self):
+ return self._current_position
+
+ def set_raw_sensor_data(self, new_raw_sensor_data):
+ self._raw_sensor_data = new_raw_sensor_data
+
+ def get_raw_sensor_data(self):
+ return self._raw_sensor_data
+
-class SrHealthReportCheck(object):
+class SrHealthReportCheck:
def __init__(self, hand_side, fingers_to_test):
self._hand_prefix = hand_side[0] + "h"
self._hand_name = hand_side + "_hand"
@@ -118,7 +122,7 @@ def __init__(self, hand_side, fingers_to_test):
def _init_map_finger_joints(self):
fingers_to_joint_map = OrderedDict()
- for joint in self._joint_msg.name:
+ for joint in self._joint_msg.name: # pylint: disable=E1101
finger_name = joint[3:-2]
if finger_name not in fingers_to_joint_map:
fingers_to_joint_map[finger_name] = []
@@ -168,8 +172,8 @@ def _init_raw_sensor_data_list(self):
if "TH" in self._fingers_to_joint_map:
for joint_index in self._fingers_to_joint_map["TH"]:
if joint_index == "J5":
- for s in range(0, 2):
- name_to_append = self._hand_prefix + "_TH" + joint_index + "_{}".format(s)
+ for iteration in range(0, 2):
+ name_to_append = self._hand_prefix + "_TH" + joint_index + f"_{iteration}"
raw_sensor_names_list.append(name_to_append.lower())
else:
name_to_append = self._hand_prefix + "_TH" + joint_index
@@ -177,8 +181,8 @@ def _init_raw_sensor_data_list(self):
if "WR" in self._fingers_to_joint_map:
for joint_index in self._fingers_to_joint_map["WR"]:
if joint_index == "J1":
- for s in range(0, 2):
- name_to_append = self._hand_prefix + "_WR" + joint_index + "_{}".format(s)
+ for iteration in range(0, 2):
+ name_to_append = self._hand_prefix + "_WR" + joint_index + f"_{iteration}"
raw_sensor_names_list.append(name_to_append.lower())
else:
name_to_append = self._hand_prefix + "_WR" + joint_index
@@ -190,7 +194,7 @@ def _joint_states_callback(self, sensor_msg):
for finger in self.fingers_to_check:
for joint in finger.joints_dict.values():
if joint.joint_name == sensor_msg.name[count].lower():
- joint._current_position = sensor_msg.position[count]
+ joint.set_current_position(sensor_msg.position[count])
count += 1
def _raw_data_sensor_callback(self, ethercat_data):
@@ -208,18 +212,18 @@ def _raw_data_sensor_callback(self, ethercat_data):
raw_sensor_data_list.append(self._raw_sensor_data_map[self._hand_prefix + '_wrj1_1'])
else:
raw_sensor_data_list.insert(0, self._raw_sensor_data_map[joint.joint_name])
- joint._raw_sensor_data = raw_sensor_data_list
+ joint.set_raw_sensor_data(raw_sensor_data_list)
def switch_controller_mode(self, control_type):
- if control_type is "trajectory":
+ if control_type == "trajectory":
rospy.loginfo("Changing trajectory controllers to RUN")
self.ctrl_helper.change_trajectory_ctrl("run")
- elif control_type is "position" or control_type is "effort":
+ elif control_type in ("position", "effort"):
self.ctrl_helper.change_trajectory_ctrl("stop")
change_type_msg = ChangeControlType()
change_type_msg.control_type = ControlType.PWM
self.ctrl_helper.change_force_ctrl_type(change_type_msg)
- rospy.loginfo("Changing controllers to: %s", control_type)
+ rospy.loginfo(f"Changing controllers to: {control_type}")
self.ctrl_helper.change_hand_ctrl(control_type)
def drive_joint_to_position(self, joint, command):
@@ -227,9 +231,9 @@ def drive_joint_to_position(self, joint, command):
joint.move_joint(command, "position")
self.switch_controller_mode("effort")
- def drive_joint_with_pwm(self, joint, command, duration, rate):
+ def drive_joint_with_pwm(self, joint, command, duration, rate): # pylint: disable=R0201
now = rospy.Time.now()
- while (rospy.Time.now() < now + rospy.Duration(duration)):
+ while rospy.Time.now() < now + rospy.Duration(duration):
joint.move_joint(command, "effort")
rate.sleep()
joint.move_joint(0, "effort")
diff --git a/sr_hand_health_report/src/sr_hand_health_report/sr_hand_health_report_node_v0.0.1.py b/sr_hand_health_report/src/sr_hand_health_report/sr_hand_health_report_node.py
similarity index 93%
rename from sr_hand_health_report/src/sr_hand_health_report/sr_hand_health_report_node_v0.0.1.py
rename to sr_hand_health_report/src/sr_hand_health_report/sr_hand_health_report_node.py
index 91d5efe..71590ac 100755
--- a/sr_hand_health_report/src/sr_hand_health_report/sr_hand_health_report_node_v0.0.1.py
+++ b/sr_hand_health_report/src/sr_hand_health_report/sr_hand_health_report_node.py
@@ -13,25 +13,20 @@
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see .
-
-
-from __future__ import absolute_import
import argparse
-import rospy
-import rospkg
import time
-import yaml
import os
-from sr_hand_health_report_check import SrHealthReportCheck
+import yaml
+import rospy
+import rospkg
from monotonicity_check import MonotonicityCheck
from position_sensor_noise_check import PositionSensorNoiseCheck
-from sr_hand_health_report.msg import CheckStatus
-from sr_system_info.system_info import SystemInfo
-from collections import OrderedDict
from rosbag_manager import RosbagManager
+from sr_system_info.system_info import SystemInfo
+from sr_hand_health_report.msg import CheckStatus
-class HealthReportScriptNode(object):
+class HealthReportScriptNode:
def __init__(self, real_hand, home_folder_path):
self._real_hand = real_hand
self._health_report_checks_status_publisher = rospy.Publisher("/health_report_checks_status_publisher",
@@ -100,7 +95,7 @@ def run_position_sensor_noise_check(self, publish_state=True):
def write_results_to_file(self, filename=None):
if filename is None:
filename = self._results_path
- with open(filename, 'w') as yaml_file:
+ with open(filename, 'w', encoding="ASCII") as yaml_file:
yaml.safe_dump(self._results, stream=yaml_file, default_flow_style=False)
rospy.signal_shutdown("All checks completed!")
@@ -142,14 +137,14 @@ def run_checks_bag_file(self, rosbag_path, rosbag_name):
args, unknown_args = parser.parse_known_args()
rospy.init_node('sr_hand_health_report_script')
- real_hand = rospy.get_param("~real_hand")
- home_folder_path = rospy.get_param("~results_path")
- sr_hand_health_report_script = HealthReportScriptNode(real_hand, home_folder_path)
+ real_hand_param = rospy.get_param("~real_hand")
+ home_folder_path_param = rospy.get_param("~results_path")
+ sr_hand_health_report_script = HealthReportScriptNode(real_hand_param, home_folder_path_param)
- if real_hand is True:
+ if real_hand_param is True:
sr_hand_health_report_script.run_checks_real_hand()
else:
- rosbag_path = rospy.get_param("~rosbag_path")
- rosbag_name = rospy.get_param("~rosbag_name")
- sr_hand_health_report_script.run_checks_bag_file(rosbag_path, rosbag_name)
+ rosbag_path_param = rospy.get_param("~rosbag_path")
+ rosbag_name_param = rospy.get_param("~rosbag_name")
+ sr_hand_health_report_script.run_checks_bag_file(rosbag_path_param, rosbag_name_param)
rospy.spin()
diff --git a/sr_world_generator/scripts/world_generator_gui.py b/sr_world_generator/scripts/world_generator_gui.py
index d80b020..432b754 100755
--- a/sr_world_generator/scripts/world_generator_gui.py
+++ b/sr_world_generator/scripts/world_generator_gui.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
-# Copyright 2019 Shadow Robot Company Ltd.
+# Copyright 2019, 2022 Shadow Robot Company Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
@@ -13,31 +13,22 @@
#
# You should have received a copy of the GNU General Public License along
# with this program. If not, see .
-
-from __future__ import absolute_import
+import os
import sys
-from python_qt_binding.QtGui import *
-from python_qt_binding.QtCore import *
-from qt_gui.plugin import Plugin
-from python_qt_binding import loadUi
-
-try:
- from python_qt_binding.QtWidgets import *
-except ImportError:
- pass
-
-from sr_world_generator.save_world_file import GazeboWorldSaver
-
+import subprocess
import signal
import rospy
-import os
import rospkg
-import subprocess
+from qt_gui.plugin import Plugin
+from python_qt_binding.QtGui import QWidget, QPushButton, QRadioButton, QLineEdit, \
+ QGroupBox, QFileDialog, QMessageBox, QApplication
+from python_qt_binding import loadUi
+from sr_world_generator.save_world_file import GazeboWorldSaver
class SrWorldGeneratorGui(Plugin):
def __init__(self, context):
- super(SrWorldGeneratorGui, self).__init__(context)
+ super().__init__(context)
self.setObjectName("SrWorldGeneratorGui")
self._widget = QWidget()
@@ -59,6 +50,7 @@ def __init__(self, context):
self.empty_world_yes_radio.clicked.connect(self.disable_world_path)
self.empty_world_no_radio.clicked.connect(self.enable_world_path)
+ self.process = None
def init_widget_children(self):
self.open_gazebo_button = self._widget.findChild(QPushButton, "open_gazebo_button")
@@ -95,7 +87,7 @@ def start_gazebo_process(self):
if not is_starting_with_empty_world:
world_file_path = self.world_line_edit.displayText()
if not os.path.isfile(world_file_path):
- self.throw_warning_dialog("File does not exist!")
+ throw_warning_dialog("File does not exist!")
return
gazebo_start_command += ' world:={}'.format(world_file_path)
@@ -103,8 +95,7 @@ def start_gazebo_process(self):
self.close_gazebo_button.setEnabled(True)
self.transform_file_group_box.setEnabled(False)
- self.process = subprocess.Popen([gazebo_start_command],
- shell=True)
+ self.process = subprocess.Popen([gazebo_start_command], shell=True) # pylint: disable=R1732
def stop_gazebo_process(self):
self.process.kill()
@@ -120,9 +111,9 @@ def transform_world_file(self):
'World Files (*.world)')
gazebo_generated_world_file_path = self.gazebo_generated_world_path_line_edit.displayText()
if not os.path.isfile(gazebo_generated_world_file_path):
- self.throw_warning_dialog("File chosen to be transformed does not exist!")
+ throw_warning_dialog("File chosen to be transformed does not exist!")
return
- gws = GazeboWorldSaver(gazebo_generated_world_file_path, output_file_path[0])
+ GazeboWorldSaver(gazebo_generated_world_file_path, output_file_path[0])
def enable_world_path(self):
self.world_line_edit.setEnabled(True)
@@ -146,19 +137,20 @@ def get_file_path(self):
chosen_path = QFileDialog.getOpenFileName(self._widget, 'Open file', "")
return chosen_path[0]
- def throw_warning_dialog(self, message):
- msg = QMessageBox()
- msg.setIcon(QMessageBox.Warning)
- msg.setText(message)
- msg.setWindowTitle("Warning!")
- msg.setStandardButtons(QMessageBox.Ok)
- msg.exec_()
+
+def throw_warning_dialog(message):
+ msg = QMessageBox()
+ msg.setIcon(QMessageBox.Warning)
+ msg.setText(message)
+ msg.setWindowTitle("Warning!")
+ msg.setStandardButtons(QMessageBox.Ok)
+ msg.exec_()
if __name__ == "__main__":
rospy.init_node("sr_gazebo_world_generator")
app = QApplication(sys.argv)
planner_benchmarking_gui = SrWorldGeneratorGui(None)
- planner_benchmarking_gui._widget.show()
+ planner_benchmarking_gui._widget.show() # pylint: disable=W0212
signal.signal(signal.SIGINT, signal.SIG_DFL)
sys.exit(app.exec_())
diff --git a/sr_world_generator/setup.py b/sr_world_generator/setup.py
index ec30eea..17a6bd6 100644
--- a/sr_world_generator/setup.py
+++ b/sr_world_generator/setup.py
@@ -1,5 +1,4 @@
# ! DO NOT MANUALLY INVOKE THIS setup.py, USE CATKIN INSTEAD
-from __future__ import absolute_import
from setuptools import setup
from catkin_pkg.python_setup import generate_distutils_setup
diff --git a/sr_world_generator/src/sr_world_generator/save_world_file.py b/sr_world_generator/src/sr_world_generator/save_world_file.py
index 7bedc5c..cb42d13 100755
--- a/sr_world_generator/src/sr_world_generator/save_world_file.py
+++ b/sr_world_generator/src/sr_world_generator/save_world_file.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
-# Copyright 2019 Shadow Robot Company Ltd.
+# Copyright 2019, 2022 Shadow Robot Company Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
@@ -14,22 +14,19 @@
# You should have received a copy of the GNU General Public License along
# with this program. If not, see .
-from __future__ import absolute_import
import re
import os
-import rospy
-import rospkg
import subprocess
+import rospy
import rospkg
from gazebo_msgs.msg import ModelStates
from tf.transformations import euler_from_quaternion
-class GazeboWorldSaver(object):
+class GazeboWorldSaver:
def __init__(self, gazebo_generated_world_file_path=None, output_world_file_path=None):
self.model_and_pose = {}
-
if gazebo_generated_world_file_path is None:
self.gazebo_generated_world_file_path = rospy.get_param('~gazebo_generated_world_file_path')
else:
@@ -60,6 +57,7 @@ def _check_if_world_file_exists(self):
raise IOError("Gazebo generated world file does not exist!")
def _start_gazebo_with_newly_created_world(self):
+ # pylint: disable=R1732
self.process = subprocess.Popen(['xterm -e roslaunch sr_world_generator \
create_world_template.launch gui:=false \
scene:=true world:={}'.format(self.gazebo_generated_world_file_path)],
@@ -73,8 +71,9 @@ def _get_gazebo_models_states(self):
self.gazebo_model_states_msg = rospy.wait_for_message('/gazebo/model_states', ModelStates)
def _extract_model_data_from_msg(self):
- for model_name, pose in zip(self.gazebo_model_states_msg.name, self.gazebo_model_states_msg.pose):
- if 'ursr' == model_name:
+ for model_name, pose in zip(self.gazebo_model_states_msg.name,
+ self.gazebo_model_states_msg.pose): # pylint: disable=E1101
+ if model_name == 'ursr':
continue
position_as_list = [pose.position.x, pose.position.y, pose.position.z]
orientation_as_list = [pose.orientation.x, pose.orientation.y, pose.orientation.z, pose.orientation.w]
@@ -94,13 +93,13 @@ def _save_models_to_world_file(self):
self._save_to_world_file(all_objects_string)
def _save_lighting_config_to_world_file(self):
- with open(self.config_path + '/gazebo_light_string', 'r') as myfile:
+ with open(self.config_path + '/gazebo_light_string', 'r', encoding="ASCII") as myfile:
data = myfile.readlines()
data = ''.join(data) + '\n'
self._save_to_world_file(data)
def _save_physics_config_to_world_file(self):
- with open(self.config_path + '/gazebo_physics_string', 'r') as myfile:
+ with open(self.config_path + '/gazebo_physics_string', 'r', encoding="ASCII") as myfile:
data = myfile.readlines()
data = ''.join(data) + '\n'
self._save_to_world_file(data)
@@ -115,7 +114,7 @@ def _finish_up_world_file(self):
self._save_to_world_file(trailing_string)
def _save_to_world_file(self, string):
- with open(self.output_world_file_path, 'a') as myfile:
+ with open(self.output_world_file_path, 'a', encoding="ASCII") as myfile:
myfile.write(string)
def _remove_output_file_if_exists(self):
diff --git a/strain_gauge_calibration/scripts/calib_strain_gauges.py b/strain_gauge_calibration/scripts/calib_strain_gauges.py
index dc4a3c7..129d696 100755
--- a/strain_gauge_calibration/scripts/calib_strain_gauges.py
+++ b/strain_gauge_calibration/scripts/calib_strain_gauges.py
@@ -1,6 +1,6 @@
-#!/usr/bin/env python 3
+#!/usr/bin/env python3
-# Copyright 2019 Shadow Robot Company Ltd.
+# Copyright 2019, 2022 Shadow Robot Company Ltd.
#
# This program is free software: you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the Free
@@ -14,18 +14,15 @@
# You should have received a copy of the GNU General Public License along
# with this program. If not, see .
-from __future__ import absolute_import, division
from builtins import round, input
-import os
-import rospy
-import numpy
import csv
+import numpy
+import rospy
from diagnostic_msgs.msg import DiagnosticArray
-class Calib_gauges():
+class CalibGauges():
def __init__(self):
-
self.initial_weight = rospy.get_param('~initial_weight')
self.incremental_weight = rospy.get_param('~incremental_weight')
self.final_weight = rospy.get_param('~final_weight')
@@ -37,23 +34,18 @@ def __init__(self):
self.temp_0 = []
self.headline_1 = []
self.temp_1 = []
+ self.sub = None
def data_callback(self, data):
-
# iterate over the DiagnosticStatus message
- for i, value in enumerate(data.status):
-
+ for i, ds_value in enumerate(data.status):
fields = {}
-
# iterate over values array and store them in a dictionary with index and relative value
- for key, s in enumerate(value.values):
-
- fields[i, s.key] = s.value
-
+ for _, value in enumerate(ds_value.values):
+ fields[i, value.key] = value.value
# iterate over dictionary elements to search the wanted motor_id and the relative strain_gauge value
- for index, motor_id in list(fields.items()):
-
- if fields.get((index[0], 'Motor ID')) == "%s" % self.motor_id:
+ for index, _ in list(fields.items()):
+ if fields.get((index[0], 'Motor ID')) == f"{self.motor_id}":
if self.strain_gauges_id == 0:
# store measurement in array
self.measurement.append(fields.get((index[0], 'Strain Gauge Left')))
@@ -62,36 +54,27 @@ def data_callback(self, data):
self.measurement.append(fields.get((index[0], 'Strain Gauge Right')))
def run_test(self):
-
self.motor_id = input("insert motor id: ")
-
for self.strain_gauges_id in range(0, 2):
-
self.initial_weight = 0
-
- for weight in range(0, self.number_of_tests):
-
+ for _ in range(0, self.number_of_tests):
self.initial_weight += self.incremental_weight
-
- print(("Apply %s g to strain_gauges %s" % (self.initial_weight, self.strain_gauges_id)))
+ print(f"Apply {self.initial_weight} g to strain_gauges {self.strain_gauges_id}")
input("Press enter when you are done...")
-
# Subscribe diagnostic topic to collect data
self.sub = rospy.Subscriber("/diagnostics_agg", DiagnosticArray, self.data_callback)
print("Measuring...")
-
rospy.sleep(5.0) # give time to collect measurements
self.sub.unregister() # stop subscribing topic
-
self.update_csv_file(self.measurement, self.strain_gauges_id) # write measurements into csv file
self.measurement = [] # empty measurement array
def update_csv_file(self, measurement, strain_gauges_id):
-
+ # pylint: disable=R1732
if strain_gauges_id == 0:
# create csv to write
- csv_output = csv.writer(open("Motor_%s_strain_gauge_%s.csv" % (self.motor_id, strain_gauges_id), "wb"),
- lineterminator='\n')
+ csv_output = csv.writer(open(f"Motor_{self.motor_id}_strain_gauge_{strain_gauges_id}.csv",
+ "wb", encoding="ASCII"), lineterminator='\n')
self.headline_0.append(self.initial_weight)
csv_output.writerow(self.headline_0) # write headline row with weight values
measurement = list(map(int, measurement)) # cast values to int
@@ -100,8 +83,8 @@ def update_csv_file(self, measurement, strain_gauges_id):
elif strain_gauges_id == 1:
# create csv to write
- csv_output = csv.writer(open("Motor_%s_strain_gauge_%s.csv" % (self.motor_id, strain_gauges_id), "wb"),
- lineterminator='\n')
+ csv_output = csv.writer(open(f"Motor_{self.motor_id}_strain_gauge_{strain_gauges_id}.csv", "wb",
+ encoding="ASCII"), lineterminator='\n')
self.headline_1.append(self.initial_weight)
csv_output.writerow(self.headline_1) # write headline row with weight values
measurement = list(map(int, measurement))
@@ -110,9 +93,8 @@ def update_csv_file(self, measurement, strain_gauges_id):
if __name__ == '__main__':
-
rospy.init_node("Calib_strain_gauges", anonymous=True)
- calib = Calib_gauges()
+ calib = CalibGauges()
calib.run_test()
# Output of the file Motor_11_strain_gauge_0.csv