#!/usr/bin/env python3
"""blh motor driver controller"""
from datetime import datetime, timezone
from time import sleep
import numpy as np
import serial
from socs.agents.orientalmotor_blh.om_comm import (ALMCLR, INIT_CMD, MOVE_BKW,
MOVE_FWD, STATUS, STOP)
DEV_NAME = '/dev/ttyACM0'
UTC = timezone.utc
def calc_parity(data):
"""Parity calculation"""
return np.bitwise_xor.reduce([i for i in data])
[docs]
class BLH:
"""BLH motor driver controller"""
def __init__(self, port=DEV_NAME):
self._ser = serial.Serial(port, 9600, timeout=0.1)
self._packet_index = 0
self._start_dt = None
self._connected = False
def __w(self, data):
self._ser.write(data)
def __r(self, length):
return self._ser.read(length)
[docs]
def connect(self):
"""Make connection to BLH"""
self.__w(b'\x06')
ret = self.__r(1)
if ret[0] != 0x86:
raise Exception('Not connected. Abort.')
self._connected = True
self._start_dt = datetime.now(tz=UTC)
# Initialization codes
for cmd in INIT_CMD:
_ = self._wr(cmd)
def _wr(self, data, readlen=40):
"""Write and read"""
assert self._connected
assert len(data) == 35
tmpdt = datetime.now(tz=UTC)
# Duration from the connection establishment
ds_td = tmpdt - self._start_dt
ds_int = int((ds_td.total_seconds() * 1e3) % 65536)
ds_b = ds_int.to_bytes(2, 'little')
# Packet index modulo 256
pi_b = (self._packet_index % 256).to_bytes(1, 'little')
# Packet creation
pkt_pre = b'\xff' + data + ds_b + pi_b
# The last byte is a parity
parity = calc_parity(pkt_pre)
pkt = pkt_pre + int(parity).to_bytes(1, 'little')
self.__w(pkt)
self._packet_index += 1
return self.__r(readlen)
[docs]
def set_op_number(self, num):
"""Set operation number
Parameter
---------
num : int
Operation number
"""
assert 0 <= num < 16
mark_0 = (((0b110 << 4) + num) << 5).to_bytes(2, 'little')
mark_1 = (((num + 4) % 8) << 5) + (5 - int(num / 8))
data = bytearray([0x03, 0x00, 0x01, 0x0a, 0x00, 0x86, 0x01]) + mark_0
data += bytearray([0x04, 0x00, 0x00, mark_1]) + bytearray([0] * 22)
return self._wr(data)
[docs]
def set_speed(self, speed):
"""Set speed
Parameter
---------
speed : int
Speed in RPM
"""
assert 50 <= speed <= 3000
data = bytearray([0x03, 0x00, 0x01, 0x0c, 0x00, 0x81, 0x00, 0xc4, 0x01])
data += speed.to_bytes(2, 'little')
data += bytearray([0] * 3)
# parity byte inside the packet.
p_check = calc_parity(data[3:])
data += bytearray([p_check] + [0] * 20)
return self._wr(data)
[docs]
def set_accl_time(self, sec, accl=True):
"""Set acceleration time
Parameters
----------
sec : float
Acceleration time in seconds
accl : bool, default True
True: Acceleration setting
False: Decceleration setting
"""
assert 0.5 <= sec <= 15.0
data = bytearray([0x03, 0x00, 0x01, 0x0c, 0x00, 0x81, 0x00, 0xc5 if accl else 0xc6, 0x01])
data += int(sec * 10).to_bytes(1, 'little')
data += bytearray([0] * 4)
# parity byte inside the packet
p_check = calc_parity(data[3:])
data += bytearray([p_check] + [0] * 20)
return self._wr(data)
[docs]
def get_status(self):
"""Get status.
Returns
-------
speed : int
Speed in RPM
"""
pkt_st = self._wr(STATUS, readlen=80)
speed = int.from_bytes(pkt_st[7:11], 'little', signed=True)
error = pkt_st[47]
return speed, error
[docs]
def start(self, forward=True):
"""Start rotation.
Parameter
---------
forward : bool, default True
Move forward if True
Returns
-------
result : bool
True if rotation command published correctly.
"""
resp = self._wr(MOVE_FWD if forward else MOVE_BKW)
result = resp[7] == 0
return result
[docs]
def stop(self):
"""Stop rotation
Returns
-------
result : bool
True if stop command published correctly.
"""
resp = self._wr(STOP)
result = resp[7] == 0
return result
[docs]
def clear_alarm(self):
"""Clear alarm"""
self._wr(ALMCLR)
def main():
"""Test function"""
blh = BLH()
blh.connect()
speed, error = blh.get_status()
print(f'current speed: {speed}')
print(f'current error: {error}')
blh.set_speed(1500)
blh.set_accl_time(5, False)
blh.set_accl_time(5, True)
print('MOVING FORWARD')
blh.start(forward=True)
for _ in range(200):
print(f'{blh.get_status()}')
sleep(0.1)
print('STOPPED')
blh.stop()
for _ in range(10):
print(f'{blh.get_status()}')
sleep(0.1)
print('MOVING BACKWARD')
blh.start(forward=False)
for _ in range(200):
print(f'{blh.get_status()}')
sleep(0.1)
print('STOPPED')
blh.stop()
for _ in range(10):
print(f'{blh.get_status()}')
sleep(0.1)
if __name__ == '__main__':
main()