Source code for socs.agents.wiregrid_actuator.drivers.Actuator

# Built-in python modules
import os
import time

# Specific module for actuator controller
on_rtd = os.environ.get('READTHEDOCS') == 'True'
if not on_rtd:
    import gclib

from socs.agents.wiregrid_actuator.drivers.DigitalIO import DigitalIO


[docs] class Actuator: """ The Actuator class is for writing commands and reading status of the actuator via the Galil actuator controller. Args: ip_address(string) : IP address of the actuator controller sleep(float) : sleep time for every commands ls_list(list) : limit-switch IO configurations st_list(list) : stopper IO configurations verbose(int) : verbosity level """ def __init__(self, ip_address='192.168.1.100', sleep=0.05, ls_list=[], st_list=[], verbose=0): self.ip_address = ip_address self.sleep = sleep self.verbose = verbose self.STOP = False # This will become True in emergency stop. self.maxwaitloop = 10 self.maxwaitloop_for_read = 1000 # Actuator speed/distance setup self.speed_max = 2000 self.speed_min = 0 # scale factor [pulses/mm] mutiplied to distance [mm] self.distance_factor = 1000. / 5.5 # Open communication to the controller self.g = None self._connect() # Initialize Digital IO classes # for limit-switch & stopper self.ls = DigitalIO( 'limit-switch', ls_list, self.g, get_onoff_reverse=False) self.st = DigitalIO( 'stopper', st_list, self.g, get_onoff_reverse=False, set_onoff_reverse=False) def __del__(self): self._cleanG() print('Actuator:__del__(): ' 'Successfully close the actuator controller.') ###################### # Internal functions # ###################### # Send command & get return function # If an error occurs, it raises an error def _command(self, cmd, doSleep=True): if self.verbose > 0: print('Actuator:_command(): command = "{}"\\n'.format(cmd)) try: ret = self.g.GCommand(cmd) except Exception as e: msg = 'Actuator:_command(): ERROR!: Failed to send command({}).\n'\ 'Actuator:_command(): ERROR!: Exception = "{}"'\ .format(cmd, e) print(msg) raise if self.verbose > 0: print('Actuator:_command(): response = "{}"\\n'.format(ret)) if doSleep: time.sleep(self.sleep) return ret def _cleanG(self): if self.g is not None: self.g.GClose() del self.g self.g = None if self.verbose > 0: print('Actuator:_cleanG(): ' 'Successfully cleaned the Galil connection') return True def _connect(self): self._cleanG() # Open communication to the controller print('Actuator:_connect(): ' 'Initialize the Galil actuator controller') self.g = gclib.py() if self.g is None: msg = 'Actuator:_connect() : ERROR!: Failed to '\ 'initialize the connection to the actuator controller.' raise RuntimeError(msg) self.g.GOpen('{}'.format(self.ip_address)) # Connection check print('Actuator:_connect(): {}'.format(self.g.GInfo())) status = self._check_motortype() if not status: print('Actuator:_connect(): WARNING! ' 'Motor type is not correct!') print('Actuator:_connect(): ' '--> Power off & change the motor types!') # Set controller parameters # Motor OFF (need for MT command) self._set_motor_onoff(onoff=False) # Motor type: stepper with active low(2)/high(2.5) step pulses self._command('MT 2,2') # Motor ON (A,B,N[virtual gear]) self._set_motor_onoff(onoff=True) self._set_actuator_parameters() status = self.check_connect() if not status: msg = 'Actuator:_connect(): ERROR!: '\ 'check_connect() is failed.' raise RuntimeError(msg) time.sleep(1) print('Actuator:_connect(): Successfully make a connection.') return True # Set motor ON/OFF # Args: onoff = True or False def _set_motor_onoff(self, onoff): if onoff: # ON self._command('SH AB') else: # OFF self._command('MO') return True # Check the motor type ([2,2] is correct.) def _check_motortype(self): try: ret = self._command('MT ?,?') mts = [int(float(motor_type)) for motor_type in ret.split(',')] except Exception as e: msg = \ 'Actuator:_check_motortype(): ERROR!: Failed to check '\ 'the motor type! | '\ 'Exception = "{}"'.format(e) raise RuntimeError(msg) if len(mts) != 2: print('Actuator:_check_motortype(): ' 'WARNING!: Returned motor type = {}. ' 'Array size is not correct.' .format(mts)) return False else: if not (mts[0] == 2 and mts[1] == 2): print('Actuator:_check_motortype(): ' 'WARNING!: Returned motor type = {}. ' 'Motor type is not correct (should be [2,2]).' .format(mts)) return False if self.verbose > 0: print('Actuator:_check_motortype(): ' 'Motor type is correct!') return True def _set_actuator_parameters(self): # Stop motion self._command('ST') # Set master axis of A & B is N self._command('GAA=N') self._command('GAB=N') # Set gear to N self._command('GRA=1') self._command('GRB=-1') # Smoothing pulse: sample value=16 self._command('KSA=16') self._command('KSB=16') # Set current: 3A self._command('AGA=3') self._command('AGB=3') # Set microstepping: 1/8 self._command('YAA=8') self._command('YAB=8') # Set motor resolutio: 200 steps/revolution = 1.8deg/step self._command('YBA=200') self._command('YBB=200') self._command('YBN=200') # Set speed: speed_max self._command('SPA={}'.format(self.speed_max)) self._command('SPB={}'.format(self.speed_max)) self._command('SPN={}'.format(self.speed_max)) # Set positioin self._command('PRA=0') self._command('PRB=0') self._command('PRN=0') if self.verbose > 0: print('Actuator:_set_actuator_parameters(): ' 'Successfully set the actuator controller parameters!') return True ################## # Main functions # ################## # Return: success (True) or failure (False) def move(self, distance, speedrate=0.1): self._set_actuator_parameters() print('Actuator:move(): distance = {}, speedrate = {}' .format(distance, speedrate)) if self.STOP: msg = 'Actuator:move(): WARNING!: Did NOT move due to STOP flag.' print(msg) return False if speedrate < 0. or speedrate > 5.: print('Actuator:move(): WARNING!: ' 'Speedrate should be between 0 and 5.') print('Actuator:move(): WARNING!: ' 'Speedrate is set to 1.0.') speedrate = 1.0 speed = \ int(speedrate * (self.speed_max - self.speed_min) + self.speed_min) # distance_count is an absolute value distance_count = int(abs(distance) * self.distance_factor) # Change the gearing if distance >= 0.: # Forwarding self._command('GRA=1') self._command('GRB=-1') dlabel = 'forwarding' else: # Backwarding self._command('GRA=-1') self._command('GRB=1') dlabel = 'backwarding' if self.verbose > 0: print('Actuator:move(): distance_count = {} ({})' .format(distance_count, dlabel)) # Set the speed and distance self._command('SPA={}'.format(0)) self._command('SPB={}'.format(0)) self._command('SPN={}'.format(speed)) self._command('PRA={}'.format(0)) self._command('PRB={}'.format(0)) self._command('PRN={}'.format(distance_count)) # Start motion print('Actuator:move(): Start the moving...') self._command('BGN') if self.verbose > 0: print('Actuator:move(): Succsessfully sent the moving commands!') return True # Return 1 (running) or 0 (stopping) def is_run(self): ret = self._command('MG _BGN', doSleep=True) isrun = (int)((float)(ret)) if self.verbose > 0: print('Actuator:is_run() : running status = "{}"'.format(isrun)) return isrun # Wait for the end of moving # Args: max_loop_time = maximum waiting time [sec] # Return: success (True) or failure (False) def wait_idle(self, max_loop_time=180): # Number of loop for max_loop_time [sec] max_loop = int(max_loop_time / self.sleep) for i in range(max_loop): isrun = self.is_run() if not isrun: print('Actuator:wait_idle(): The running is finished!') return True print('Actuator:wait_idle(): ERROR!: ' 'Exceed max. number of loop ({} times)'.format(i)) return False # Check the connection def check_connect(self): status = self._check_motortype() if not status: print('Actuator:check_connect(): ERROR!: ' 'The connection to the actuator controller is BAD!') return False else: if self.verbose > 0: print('Actuator:check_connect(): ' 'The connection to the actuator controller is OK!') return True def reconnect(self): print('Actuator:reconnect() : *** Trying to reconnect... ***') for i in range(self.maxwaitloop): # reconnect print('Actuator:reconnect(): {}th try to reconnection'.format(i)) try: self._connect() except Exception as e: msg = 'Actuator:reconnect(): WARNING!: '\ 'Failed to reconnect to the actuator controller! (i={})'\ ' | Exception = "{}"'.format(i, e) print(msg) time.sleep(1) continue # Set g on DigitalIOs self.ls.g = self.g self.st.g = self.g print('Actuator:reconnect(): ' 'Successfully reconnect to the actuator controller!') return True print('Actuator:reconnect(): ERROR!: Exceed the max. number of ' 'trying to reconnect to the actuator controller.') return False # Get motor ON/OFF # Return: 1 (ON) or 0 (OFF) def get_motor_onoff(self): try: # 0: ON, 1:OFF retA = self._command('MG _MOA') retB = self._command('MG _MOB') onoffA = int(float(retA)) onoffB = int(float(retB)) # invert to 0: OFF, 1: ON onoffA = int(not onoffA) onoffB = int(not onoffB) except Exception as e: msg =\ 'Actuator:get_motor_onoff(): ERROR!: '\ 'Failed to get motor on/off! | '\ 'Exception = "{}"'.format(e) print(msg) raise if onoffA == 1 and onoffB == 1: return 1 elif onoffA == 0 and onoffB == 0: return 0 else: msg =\ 'Actuator:get_motor_onoff(): ERROR!: '\ 'Strange status of motor on/off! '\ 'motor A = {} / motor B = {}'.format( 'ON' if onoffA else 'OFF', 'ON' if onoffB else 'OFF') raise RuntimeError(msg) # Set motor ON/OFF # Args: onoff = True or False def set_motor_onoff(self, onoff): try: self._set_motor_onoff(onoff) except Exception as e: msg = \ 'Actuator:set_motor_onoff(): ERROR!: '\ 'Failed to set motor {}! | '\ 'Exception = "{}"'\ .format('ON' if onoff else 'OFF', e) print(msg) raise onoff_test = self.get_motor_onoff() if onoff_test != int(onoff): msg = \ 'Actuator:set_motor_onoff(): ERROR!: '\ 'Set motor {} but the current ON/OFF ("{}") is different!'\ .format('ON' if onoff else 'OFF', onoff_test) raise RuntimeError(msg) print('Actuator:set_motor_onoff(): ' 'Successfully {} the actuator motors!' .format('ON' if onoff else 'OFF')) return True # Return: success (True) or failure (False) def stop(self): if self.verbose > 0: print('Actuator:stop(): Stop the actuator') for i in range(self.maxwaitloop): self._command('ST') isrun = self.is_run() if not isrun: if self.verbose > 0: print('Actuator:stop(): Successfully stop the actuator!') return True print('Actuator:stop(): WARNING!: ' 'Could not stop the actuator! --> Retry') print('Actuator:stop(): ERROR!: ' 'Exceed the max. number of retries ({} times).'.format(i)) return False # Return: success (True) or failure (False) def hold(self): if self.verbose > 0: print('Actuator:hold(): Hold the actuator') self.STOP = True ret = self.stop() return ret # Release(unhold) the hold state def release(self): if self.verbose > 0: print('Actuator:release(): Release the actuator from hold state') self.STOP = False if self.verbose > 0: print('Actuator:release(): Successfully release the actuator!') return True