import sys
import time
import serial
from socs.common import moxa_serial as mx
[docs]
class PMX:
"""The PMX object is for communicating with the Kikusui PMX power supplies.
Args:
rtu_port (str): Serial RTU port
tcp_ip (str): TCP IP address
tcp_port (int): TCP port
timeout (int): Connection timeout
"""
def __init__(self, rtu_port=None, tcp_ip=None, tcp_port=None, timeout=None):
# Connect to device
self.using_tcp = None
self._rtu_port = None
self.ser = None
msg = self.__conn(rtu_port, tcp_ip, tcp_port, timeout)
print(msg)
self._remote_Mode()
# Timing variables
self._tstep = 0.1 # sec
def __del__(self):
if not self.using_tcp:
print(
"Disconnecting from RTU port %s"
% (self._rtu_port))
if self.ser:
self.ser.close()
else:
print(
"Disconnecting from TCP IP %s at port %d"
% (self._tcp_ip, self._tcp_port))
return
[docs]
def check_connect(self):
"""Check the connection."""
try:
if not self.using_tcp:
self.ser.inWaiting()
else:
self.clean_serial()
self.wait()
self.ser.write("OUTP?\n\r")
self.wait()
val = (self.ser.readline().strip())
val = int(val)
except Exception as e:
msg = 'Could not connect to the PMX serial! | Error: "{}"'.format(e)
return msg, False
return 'Successfully connect to the PMX serial.', True
[docs]
def check_voltage(self):
"""Check the voltage."""
self.clean_serial()
self.ser.write("MEAS:VOLT?\n\r")
self.wait()
try:
val = float(self.ser.readline())
msg = "Measured voltage = %.3f V" % (val)
# print(msg)
except ValueError:
val = -999.
msg = 'WARNING! Could not get correct voltage value! | Response = "%s"' % (val)
print(msg)
return msg, val
[docs]
def check_current(self):
"""Check the current."""
self.clean_serial()
self.ser.write("MEAS:CURR?\n\r")
self.wait()
try:
val = self.ser.readline()
curr = float(val)
msg = "Measured current = %.3f A" % (curr)
# print(msg)
except ValueError:
print(f"Could not convert '{val}' to float")
curr = -999.
msg = 'WARNING! Could not get correct current value! | Response = "%s"' % (val)
print(msg)
return msg, curr
[docs]
def check_voltage_current(self):
"""Check both the voltage and current."""
self.clean_serial()
voltage = self.check_voltage()[1]
current = self.check_current()[1]
# msg = (
# "Measured voltage = %.3f V\n"
# "Measured current = %.3f A\n"
# % (voltage, current))
# print(msg)
return voltage, current
[docs]
def check_voltage_setting(self):
""" Check the voltage setting """
self.clean_serial()
for i in range(10):
self.ser.write("VOLT?\n\r")
self.wait()
val = (self.ser.readline().strip())
if len(val) > 0:
break
try:
val = float(val)
msg = "Voltage setting = %.3f V" % (val)
# print(msg)
except ValueError:
val = -999.
msg = 'WARNING! Could not get correct voltage-setting value! | Response = "%s"' % (val)
print(msg)
return msg, val
[docs]
def check_current_setting(self):
""" Check the current setting """
self.clean_serial()
for i in range(10):
self.ser.write("CURR?\n\r")
self.wait()
val = (self.ser.readline().strip())
if len(val) > 0:
break
try:
val = float(val)
msg = "Current setting = %.3f A" % (val)
# print(msg)
except ValueError:
val = -999.
msg = 'WARNING! Could not get correct current-setting value! | Response = "%s"' % (val)
print(msg)
return msg, val
[docs]
def check_voltage_current_setting(self):
""" Check both the voltage and current setting """
self.clean_serial()
voltage = self.check_voltage_setting()[1]
current = self.check_current_setting()[1]
msg = (
"Voltage setting = %.3f V\n"
"Current setting = %.3f A\n"
% (voltage, current))
# print(msg)
return msg, voltage, current
[docs]
def check_output(self):
"""Return the output status."""
self.clean_serial()
self.ser.write("OUTP?\n\r")
self.wait()
try:
val = int(self.ser.readline())
except ValueError:
val = -999
msg = 'WARNING! Could not get correct output value! | Response = "%s"' % (val)
print(msg)
return msg, val
if val == 0:
msg = "Measured output state = OFF"
elif val == 1:
msg = "Measured output state = ON"
else:
msg = "Failed to measure output..."
# print(msg)
return msg, val
[docs]
def set_voltage(self, val, silent=False):
"""Set the PMX voltage."""
self.clean_serial()
self.ser.write("VOLT %f\n\r" % (float(val)))
self.wait()
self.ser.write("VOLT?\n\r")
self.wait()
val = self.ser.readline()
msg = "Voltage set = %.3f V" % (float(val))
if silent is not True:
print(msg)
return msg
[docs]
def set_current(self, val, silent=False):
"""Set the PMX on."""
self.clean_serial()
self.ser.write("CURR %f\n\r" % (float(val)))
self.wait()
self.ser.write("CURR?\n\r")
self.wait()
val = self.ser.readline()
msg = "Current set = %.3f A\n" % (float(val))
if silent is not True:
print(msg)
return msg
[docs]
def use_external_voltage(self):
"""Set PMX to use external voltage."""
self.clean_serial()
self.ser.write("VOLT:EXT:SOUR VOLT\n\r")
self.wait()
self.ser.write("VOLT:EXT:SOUR?\n\r")
self.wait()
val = self.ser.readline()
msg = "External source = %s" % (val)
print(msg)
return msg
[docs]
def ign_external_voltage(self):
"""Set PMX to ignore external voltage."""
self.clean_serial()
self.ser.write("VOLT:EXT:SOUR NONE\n\r")
self.wait()
self.ser.write("VOLT:EXT:SOUR?\n\r")
self.wait()
val = self.ser.readline()
msg = "External source = %s" % (val)
print(msg)
return msg
[docs]
def set_voltage_limit(self, val, silent=False):
"""Set the PMX voltage limit."""
self.clean_serial()
self.ser.write("VOLT:PROT %f\n\r" % (float(val)))
self.wait()
self.ser.write("VOLT:PROT?\n\r")
self.wait()
val = self.ser.readline()
msg = "Voltage limit set = %.3f V" % (float(val))
if silent is not True:
print(msg)
return msg
[docs]
def set_current_limit(self, val, silent=False):
"""Set the PMX current limit."""
self.clean_serial()
self.ser.write("CURR:PROT %f\n\r" % (float(val)))
self.wait()
self.ser.write("CURR:PROT?\n\r")
self.wait()
val = self.ser.readline()
msg = "Current limit set = %.3f A\n" % (float(val))
if silent is not True:
print(msg)
return msg
[docs]
def turn_on(self):
"""Turn the PMX on."""
self.clean_serial()
self.ser.write("OUTP ON\n\r")
self.wait()
self.ser.write("OUTP?\n\r")
self.wait()
val = self.ser.readline()
msg = "Output state = %s" % (val)
print(msg)
return msg
[docs]
def turn_off(self):
"""Turn the PMX off."""
self.clean_serial()
self.ser.write("OUTP OFF\n\r")
self.wait()
self.ser.write("OUTP?\n\r")
self.wait()
val = self.ser.readline()
msg = "Output state = %s" % (val)
print(msg)
return msg
# ***** Helper Methods *****
def __conn(self, rtu_port=None, tcp_ip=None, tcp_port=None, timeout=None):
"""Connect to the PMX module.
Args:
rtu_port (str): Serial RTU port
tcp_ip (str): TCP IP address
tcp_port (int): TCP port
timeout (int): Connection timeout
"""
if rtu_port is None and (tcp_ip is None or tcp_port is None):
raise Exception(
"Aborted PMX._conn() due to no RTU or "
"TCP port specified")
elif (rtu_port is not None
and (tcp_ip is not None or tcp_port is not None)):
raise Exception(
"Aborted PMX._conn() due to RTU and TCP port both being "
"specified. Can only have one or the other.")
elif rtu_port is not None:
self.ser = serial.Serial(
port=rtu_port, baudrate=19200, bytesize=8,
parity='N', stopbits=1, timeout=timeout)
self._rtu_port = rtu_port
self.using_tcp = False
msg = "Connected to RTU port %s" % (rtu_port)
elif tcp_ip is not None and tcp_port is not None:
self.ser = mx.Serial_TCPServer((tcp_ip, tcp_port), timeout)
self._tcp_ip = tcp_ip
self._tcp_port = int(tcp_port)
self.using_tcp = True
msg = "Connected to TCP IP %s at port %d" % (tcp_ip, tcp_port)
else:
raise Exception(
"Aborted PMX._conn() due to unknown error")
return msg
[docs]
def wait(self):
"""Sleep."""
time.sleep(0.05)
return True
[docs]
def clean_serial(self):
"""Flush the serial buffer."""
# if not False:
# self.ser.reset_input_buffer()
# self.ser.reset_output_buffer()
# self.ser.flush()
# else:
# self.ser.flushInput()
self.ser.flushInput()
return True
def _remote_Mode(self):
"""Enable remote control."""
self.clean_serial()
self.ser.write('SYST:REM\n\r')
self.wait()
return True
[docs]
class Command:
"""The Command object is used to command the PMX.
Args:
PMX (PMX): PMX object
"""
def __init__(self, input_PMX):
# PMX connection
if input_PMX is not None:
self._PMX = input_PMX
else:
raise Exception(
"PMX object not passed to Command constructor\n")
# Dict of commands
self._cmds = {
"set_port": "P",
"check_v": "V?",
"check_c": "C?",
"check_vc": "VC?",
"check_vs": "VS?",
"check_cs": "CS?",
"check_vcs": "VCS?",
"check_out": "O?",
"set_v": "V",
"set_c": "C",
"set_v_lim": "VL",
"set_c_lim": "CL",
"set_on": "ON",
"set_off": "OFF",
"get_help": "H",
"use_ext": "U",
"ign_ext": "I",
"stop": "Q"}
[docs]
def get_help(self):
"""Print possible commands."""
wrstr = (
"\nChange ttyUSB port = '%s'\n"
"Check output voltage = '%s'\n"
"Check output current = '%s'\n"
"Check output voltage and current = '%s'\n"
"Check voltage setting = '%s'\n"
"Check current setting = '%s'\n"
"Check voltage and current setting = '%s'\n"
"Check output state = '%s'\n"
"Set output voltage = '%s' [setting]\n"
"Set output current = '%s' [setting]\n"
"Set output voltage limit = '%s'\n"
"Set output current limit = '%s'\n"
"Turn output on = '%s'\n"
"Turn output off = '%s'\n"
"Print possible commands = '%s'\n"
"Use external voltage = '%s'\n"
"Ignore external voltage = '%s'\n"
"Quit program = '%s'\n"
% (self._cmds["set_port"],
self._cmds["check_v"],
self._cmds["check_c"],
self._cmds["check_vc"],
self._cmds["check_vs"],
self._cmds["check_cs"],
self._cmds["check_vcs"],
self._cmds["check_out"],
self._cmds["set_v"],
self._cmds["set_c"],
self._cmds["set_v_lim"],
self._cmds["set_c_lim"],
self._cmds["set_on"],
self._cmds["set_off"],
self._cmds["get_help"],
self._cmds["use_ext"],
self._cmds["ign_ext"],
self._cmds["stop"]))
return wrstr
# ***** Helper Methods *****
def _float(self, val):
"""Try to convert a value to a float."""
try:
return float(val)
except ValueError:
print("Input '%s' not understood, must be a float..." % (val))
return None
def _int(self, val):
"""Try to convert a value to an int."""
try:
return int(val)
except ValueError:
print("Input '%s' not understood, must be a int..." % (val))
return None