import sys from xarm.wrapper import XArmAPI def print_message(arg): print(arg) def UpdateStatus(arg): pass ROBOTCOM_UNKNOWN = -1000 ROBOTCOM_CONNECTION_PROBLEMS = -3 ROBOTCOM_DISCONNECTED = -2 ROBOTCOM_NOT_CONNECTED = -1 ROBOTCOM_READY = 0 ROBOTCOM_WORKING = 1 ROBOTCOM_WAITING = 2 nDOFs_MIN = 7 class ComRobot: """Robot class for programming Kinova robots""" LAST_MSG = None # Keep a copy of the last message received CONNECTED = False # Connection status is known at all times KINOVAAPI = None BUFFER_SIZE = None #TIMEOUT = None #Speed and accelerations LINEARSPEED = 100 LINEARACEL = 30 JOINTSPEED = 100 JOINTACEL = 80 LAST_TARGET_JOINTS = [] def check_for_end_or_abort(self,e): """Return a closure checking for END or ABORT notifications Arguments: e -- event to signal when the action is completed (will be set when an END or ABORT occurs) """ from kortex_api.autogen.messages import Base_pb2 def check(notification, e=e): print("EVENT : " + \ Base_pb2.ActionEvent.Name(notification.action_event)) if notification.action_event == Base_pb2.ACTION_END \ or notification.action_event == Base_pb2.ACTION_ABORT: e.set() return check def joints_to_kinova(self,jnts): jnts2 = [] for j in jnts: jnts2.append(j+360) return jnts2 def joints_from_kinova(self,jnts): jnts2 = [] for j in jnts: jnts2.append(j-360) return jnts2 # This is executed when the object is created def __init__(self): self.BUFFER_SIZE = 512 # bytes #self.TIMEOUT = 5 * 60 # seconds: it must be enough time for a movement to complete # self.TIMEOUT = 10 # seconds self.CONNECTED = False self.router = None self.base = None self.base_cyclic = None def __del__(self): self.disconnect() # Disconnect from robot def disconnect(self): if self.router is not None: try: self.router.finish() except: print("Failed to properly disconnect") self.CONNECTED = False self.router = None self.base = None self.base_cyclic = None return True # Connect to robot def connect(self, ip, port=-1): self.disconnect() print_message('Connecting to robot %s:%i' % (ip, port)) # Create new socket connection UpdateStatus(ROBOTCOM_WORKING) #try: if True: from kortex_api.autogen.client_stubs.BaseClientRpc import BaseClient from kortex_api.autogen.client_stubs.BaseCyclicClientRpc import BaseCyclicClient #self.router = DeviceConnection(ip, port=port, credentials=('admin', "admin")) self.router = DeviceConnection.createTcpConnection(ip) #with self.router: # self.base = BaseClient(self.router) # self.base_cyclic = BaseCyclicClient(self.router) # Start communication self.router.start() # Create required services self.base = BaseClient(self.router) self.base_cyclic = BaseCyclicClient(self.router) #except Exception as e: # print_message(str(e)) # return False self.CONNECTED = True sys.stdout.flush() return True def MoveJ(self,coordArray): # Joint move to the provided angles from kortex_api.autogen.messages import Base_pb2 print("Starting angular action movement ...") action = Base_pb2.Action() action.name = "Angular action movement" action.application_data = "" actuator_count = self.base.GetActuatorCount() # Set the joint angles for i, joint_id in enumerate(range(actuator_count.count)): joint_angle = action.reach_joint_angles.joint_angles.joint_angles.add() joint_angle.joint_identifier = joint_id offset = 0.0 scale = 1.0 #if joint_id == 3: # # Requires a 90 degree offset # offset = 90 #if joint_id == 0 or joint_id == 5: # # Reverse rotation of the first joint # scale *= -1 #if joint_id == 5: # # Kinova robot must operate from -149 to 149 # if coordArray[i] < -149 or coordArray[i] > 149: # offset -= 180 #if scale * coordArray[i] + offset < 0: # offset += 360 joint_angle.value = scale * coordArray[i] + offset #if joint_angle.value > 360: # joint_angle.value -= 360 #if joint_angle.value < 0: # joint_angle.value += 360 import threading e = threading.Event() notification_handle = self.base.OnNotificationActionTopic( self.check_for_end_or_abort(e), Base_pb2.NotificationOptions() ) print("Executing action") self.base.ExecuteAction(action) #print("Waiting for movement to finish ...") while True: finished = e.wait(0.02) if finished: #print("Angular movement completed") self.base.Unsubscribe(notification_handle) return True # Monitor joints if MONITOR_MOVE: jnts = self.getJoints() print_joints(jnts, True) pass self.base.Unsubscribe(notification_handle) return False def MoveL(self,xyzabc): import threading """Linear move (joint values provided in degrees)""" # Create new action from kortex_api.autogen.messages import Base_pb2 action = Base_pb2.Action() action.name = "Linear move from RoboDK" action.application_data = "" x,y,z,rx,ry,rz = xyzabc[:6] cartesian_pose = action.reach_pose.target_pose cartesian_pose.x = x cartesian_pose.y = y cartesian_pose.z = z cartesian_pose.theta_x = rx cartesian_pose.theta_y = ry cartesian_pose.theta_z = rz e = threading.Event() notification_handle = self.base.OnNotificationActionTopic( self.check_for_end_or_abort(e), Base_pb2.NotificationOptions() ) #print("Executing action") self.base.ExecuteAction(action) #print("Waiting for movement to finish ...") while True: finished = e.wait(0.02) if finished: #print("Angular movement completed") self.base.Unsubscribe(notification_handle) return True # Monitor joints if MONITOR_MOVE: jnts = self.getJoints() print_joints(jnts, True) pass self.base.Unsubscribe(notification_handle) return False def MoveC(self,joints): raise Exception("Circular move not implemented") #try: # self.UARMAPI.set_mode(mode=1) #Mode 1 corresponds to moving # self.UARMAPI.motion_enable(True) # self.UARMAPI.move_circle(pose1=joints[0:6], pose2=joints[7:], percent=50, speed=self.JOINTSPEED, mvacc=self.JOINTACELL, #wait=True) #self.LAST_TARGET_JOINTS = joints #except Exception as e: # print_message(str(e)) # return False #return True def GripperPosition(self, gripperPosition): # Open the gripper the indicated amount # Passed: Integer position between 0 (open) and 1 (closed) # Create the GripperCommand we will send gripper_command = Base_pb2.GripperCommand() finger = gripper_command.gripper.finger.add() # Move the gripper gripper_command.mode = Base_pb2.GRIPPER_POSITION finger.value = gripperPosition finger.finger_identifier = 1 print("Actuating gripper to {:0.2f}...".format(finger.value)) self.base.SendGripperCommand(gripper_command) # Wait for reported position to be reached or speed reaches zero gripper_request1 = Base_pb2.GripperRequest() gripper_request1.mode = Base_pb2.GRIPPER_POSITION gripper_request2 = Base_pb2.GripperRequest() gripper_request2.mode = Base_pb2.GRIPPER_SPEED while True: gripper_measure = self.base.GetMeasuredGripperMovement(gripper_request1) gripper_speed = self.base.GetMeasuredGripperMovement(gripper_request2) if len(gripper_measure.finger): print("Current position is : {0}".format(gripper_measure.finger[0].value)) if abs(gripper_measure.finger[0].value - gripperPosition) < 0.01: break if gripper_speed == 0.0: break else: # Else, no finger present in answer, end loop break def getJoints(self): actuator_count = self.base.GetActuatorCount() # TODO: Debug data: print(actuator_count) print("TODO: DEBUG DATA") joints = actuator_count.values return joints_from_kinova(joints) def setSpeed(self, speed_values): # speed_values[0] = speed_values[0] # linear speed in mm/s # speed_values[1] = speed_values[1] # joint speed in mm/s # speed_values[2] = speed_values[2] # linear acceleration in mm/s2 # speed_values[3] = speed_values[3] # joint acceleration in deg/s2 if (speed_values[0] != -1): self.LINEARSPEED = speed_values[0] if (speed_values[1] != -1): self.JOINTSPEED = speed_values[1] if (speed_values[2] != -1): self.LINEARACEL = speed_values[2] if (speed_values[3] != -1): self.JOINTACEL = speed_values[3] return True def setTool(self,tool_pose): #self.UARMAPI.set_tcp_offset(tool_pose) return True def Pause(self,timeMS): import time time.sleep(timeMS/1000) return True def setRounding(self,rounding): return True def setDO(self,digital_IO_State): return False def WaitDI(self,digital_IO_Num): return False class DeviceConnection: TCP_PORT = 10000 UDP_PORT = 10001 @staticmethod def createTcpConnection(ip, username="admin", password="admin"): """ returns RouterClient required to create services and send requests to device or sub-devices, """ print("Using TCP connection") return DeviceConnection(ip, port=DeviceConnection.TCP_PORT, credentials=(username, password)) @staticmethod def createUdpConnection(ip, username="admin", password="admin"): """ returns RouterClient that allows to create services and send requests to a device or its sub-devices @ 1khz. """ print("Using UDP connection") return DeviceConnection(ip, port=DeviceConnection.UDP_PORT, credentials=(username, password)) def __init__(self, ipAddress, port=TCP_PORT, credentials = ("","")): from kortex_api.TCPTransport import TCPTransport from kortex_api.UDPTransport import UDPTransport from kortex_api.RouterClient import RouterClient, RouterClientSendOptions self.ipAddress = ipAddress self.port = port self.credentials = credentials self.sessionManager = None # Setup API self.transport = TCPTransport() if port == DeviceConnection.TCP_PORT else UDPTransport() self.router = RouterClient(self.transport, RouterClient.basicErrorCallback) def start(self): self.transport.connect(self.ipAddress, self.port) if (self.credentials[0] != ""): from kortex_api.SessionManager import SessionManager from kortex_api.autogen.messages import Session_pb2 session_info = Session_pb2.CreateSessionInfo() session_info.username = self.credentials[0] session_info.password = self.credentials[1] session_info.session_inactivity_timeout = 10000 # (milliseconds) session_info.connection_inactivity_timeout = 2000 # (milliseconds) self.sessionManager = SessionManager(self.router) print("Logging as", self.credentials[0], "on device", self.ipAddress) self.sessionManager.CreateSession(session_info) return self.router def finish(self): if self.sessionManager != None: from kortex_api.RouterClient import RouterClient, RouterClientSendOptions router_options = RouterClientSendOptions() router_options.timeout_ms = 1000 self.sessionManager.CloseSession(router_options) self.transport.disconnect() # Called when entering 'with' statement def __enter__(self): return self.start() # Called when exiting 'with' statement def __exit__(self, exc_type, exc_value, traceback): self.finish() KinovaGen3 = ComRobot() KinovaGen3.connect('192.168.1.10', 12345) # Program Start,prog def prog(): global KinovaGen3 # Generating program: prog #Program generated by RoboDK v5.4.3 for Kinova Gen3 on 27/09/2022 16:46:51 #Using nominal kinematics. KinovaGen3.MoveJ([0.000000,0.000000,0.000000,-90.000200,0.000000,42.971800,37.242300]) KinovaGen3.MoveJ([0.000000,0.000000,0.000000,-90.000200,0.000000,42.971800,37.242300]) KinovaGen3.MoveJ([1.866630,1.685410,-2.212350,-86.515200,0.785025,40.706300,35.315000]) KinovaGen3.MoveJ([3.733260,3.370820,-4.424700,-83.030200,1.570050,38.440700,33.387700]) KinovaGen3.MoveJ([5.599900,5.056230,-6.637050,-79.545300,2.355080,36.175100,31.460400]) KinovaGen3.MoveJ([5.938930,4.792890,-6.223390,-81.586000,2.258240,40.008700,27.772200]) KinovaGen3.MoveJ([6.277960,4.529560,-5.809740,-83.626700,2.161410,43.842200,24.084000]) KinovaGen3.MoveJ([6.616990,4.266220,-5.396080,-85.667500,2.064580,47.675700,20.395800]) KinovaGen3.MoveJ([6.956020,4.002890,-4.982430,-87.708200,1.967750,51.509200,16.707600]) KinovaGen3.MoveJ([7.295050,3.739550,-4.568780,-89.748900,1.870920,55.342800,13.019400]) KinovaGen3.MoveJ([7.634080,3.476220,-4.155120,-91.789700,1.774090,59.176300,9.331170]) KinovaGen3.MoveJ([7.973110,3.212880,-3.741470,-93.830400,1.677260,63.009800,5.642970]) KinovaGen3.MoveJ([8.312140,2.949550,-3.327820,-95.871100,1.580430,66.843300,1.954780]) KinovaGen3.MoveJ([8.651170,2.686210,-2.914160,-97.911900,1.483600,70.676800,-1.733420]) KinovaGen3.MoveJ([8.990200,2.422880,-2.500510,-99.952600,1.386760,74.510400,-5.421620]) KinovaGen3.MoveJ([9.329230,2.159540,-2.086860,-101.993000,1.289930,78.343900,-9.109820]) KinovaGen3.MoveJ([9.668260,1.896200,-1.673200,-104.034000,1.193100,82.177400,-12.798000]) KinovaGen3.MoveJ([10.007300,1.632870,-1.259550,-106.075000,1.096270,86.010900,-16.486200]) KinovaGen3.MoveJ([10.346300,1.369530,-0.845895,-108.116000,0.999440,89.844500,-20.174400]) KinovaGen3.MoveJ([10.685400,1.106200,-0.432241,-110.156000,0.902609,93.678000,-23.862600]) KinovaGen3.MoveJ([11.024400,0.842862,-0.018587,-112.197000,0.805778,97.511500,-27.550800]) KinovaGen3.MoveJ([11.363400,0.579526,0.395066,-114.238000,0.708947,101.345000,-31.239000]) KinovaGen3.MoveJ([11.702400,0.316191,0.808720,-116.278000,0.612116,105.179000,-34.927200]) KinovaGen3.MoveJ([12.041500,0.052855,1.222370,-118.319000,0.515285,109.012000,-38.615400]) KinovaGen3.MoveJ([12.380500,-0.210480,1.636030,-120.360000,0.418454,112.846000,-42.303600]) KinovaGen3.MoveJ([12.719500,-0.473816,2.049680,-122.401000,0.321622,116.679000,-45.991800]) return #Call main prog()