MECA_IP = "192.168.0.100" # IP of the robot MECA_PORT = 10000 # Communication port import time import sys global robot def Main(): '''Main procedure''' global robot # RobotConnect() print_message("Running program Main...") # Program generated by RoboDK v5.3.0 for Meca500 on 14/11/2021 12:12:00 # Using nominal kinematics. X_Tar01() Meca_Tar01() X_Tar02() Meca_Tar01() print_message("Program Main Sent") def Meca_Tar01(): global robot print_message("Running program Meca_Tar01...") robot.Run('SetWRF', [-75.000, 250.000, -100.000, 0.000, 0.000, -90.000]) robot.Run('SetTRF', [0.000, 0.000, 50.000, -0.000, 0.000, -0.000]) robot.Run('MoveJoints', [0.000000, 0.000000, 0.000000, 0.000000, 0.000000, -0.000000]) print_message("Program Meca_Tar01 Sent") def _MainProgram(): Main() def Gripper(set_open=0): """Gripper command for Meca R1""" global robot robot.Run("Gripper", [set_open]) def GripperOpen(): """Gripper command for Meca R2 and later""" global robot robot.Run("GripperOpen") def GripperClose(): """Gripper command for Meca R2 and later""" global robot robot.Run("GripperClose") def RobotConnect(): # Important: try to first disconnect: RobotDisconnect() # Establish connection with the robot global robot # Try to connect robot = MecaRobot(MECA_IP, MECA_PORT) def Sync(): global robot robot.Run('Delay', 0, sync=True) def RobotDisconnect(): # Disconnect from the robot if we have already been connected to the robot if not 'robot' in globals(): return global robot try: robot.disconnect() except Exception as e: print_message(str(e)) #----------- communication class ------------- def print_message(message): """Force a print output""" print(message) sys.stdout.flush() def msg_info(robot_msg): if robot_msg is None: return False, -1, "No communication" problems = False msg_id = int(robot_msg[1:5]) msg_str = robot_msg[7:-2] # msg_id = 1000 to 1500 are error codes if msg_id < 1500: problems = True else: # Other error codes error_codes = [3001, 3003] if msg_id in error_codes: problems = True if problems: print_message(robot_msg) return problems, msg_id, msg_str class MecaRobot: """Robot class for programming Mecademic robots""" def __init__(self, ip, port): import socket # This pause provokes required flushing and it needs to be longer for older Meca robots pause_com = 0.1 self.BUFFER_SIZE = 512 # bytes self.TIMEOUT = 999999 # seconds self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(self.TIMEOUT) print_message('Connecting to robot %s:%i' % (ip, port)) try: self.sock.connect((ip, port)) except ConnectionRefusedError: #print_message("Connection refused. You may need to reboot your Meca robot.") UpdateStatus(ROBOTCOM_CONNECTION_PROBLEMS) print_message("Connection Refused: Reboot Meca robot.") time.sleep(2) return print_message('Waiting for welcome message...') # receive welcome message and output to the log problems, msg_id, msg_str = msg_info(self.recv_str()) if problems: print_message(msg_str) time.sleep(2) raise Exception(msg_str) return # Reset errors, send activate robot and read confirmation self.sock.settimeout(10) self.Run('ResetError',sync=True) self.Run('ActivateRobot',sync=True) self.sock.settimeout(30) self.Run('Home',sync=True) time.sleep(pause_com) def __del__(self): self.disconnect() def disconnect(self): if self.sock != None: self.sock.close() self.sock = None def send_str(self, msg): sent = self.sock.send(bytes(msg+'\0','ascii')) if sent == 0: raise RuntimeError("Robot connection broken") def recv_str(self): bdata = self.sock.recv(self.BUFFER_SIZE) if bdata == b'': raise RuntimeError("Robot connection broken") return bdata.decode('ascii') def Run(self, cmd, values=None, sync=False): if isinstance(values, list): if cmd == "Gripper": str_send = cmd + '(' + (','.join(format(vi, ".0f") for vi in values)) + ')' else: str_send = cmd + '(' + (','.join(format(vi, ".6f") for vi in values)) + ')' elif values is None: str_send = cmd else: str_send = cmd + '(' + str(values) + ')' print_message('Running: ' + str_send) # send command to robot self.send_str(str_send) if sync: robot_msg = self.recv_str() problems, msg_id, msg_str = msg_info(robot_msg) print_message('Received: ' + robot_msg) if problems: raise Exception(msg_str) return True def UploadProgram(prog_id=1): global robot print_message("Uploading program %i" % prog_id) robot.Run("DeactivateRobot", sync=True) robot.Run("StartSaving(%i)" % prog_id, sync=True) _MainProgram() robot.Run("StopSaving", sync=True) if __name__ == "__main__": """Call Main procedure""" # It is important to disconnect the robot if we force to stop the process import atexit atexit.register(RobotDisconnect) # Connect to robot RobotConnect() # Check command line options. For example, we can upload the program as program 1 passing -up1 import sys if len(sys.argv) >= 2: prog_id = sys.argv[1] print("Running command line option: %s" % prog_id) if prog_id.startswith("-up"): prog_id = int(prog_id[3:]) UploadProgram(prog_id) print_message("Done with command line options") else: # Normal execution as a file itself _MainProgram() print_message("Program sent.") print_message("Waiting for program to finish...") robot.sock.settimeout(1e6) # Wait for the robot to process commands problems, msg_id, msg_str = msg_info(robot.recv_str()) while not problems and msg_id != 3012: print_message("Working... (" + msg_str + ")") problems, msg_id, msg_str = msg_info(robot.recv_str()) print_message("Done!") #print_message("Done. Closing in 2 seconds...") #time.sleep(2) RobotDisconnect() # pause execution before closing process (allows users to read last message) #time.sleep(2)