| 
 | 1 | +"""  | 
 | 2 | +This module implements the communication protocol to power on/off ports on a  | 
 | 3 | +LinkPi SmartHUB, a 12-port USB3.0 HUB utilizing four RTS5411 USB3.0 4-port HUB  | 
 | 4 | +controllers, a FT232R USB UART IC and a STM32F103RB MCU for port power control.  | 
 | 5 | +
  | 
 | 6 | +The protocol is a simple line-based protocol over a serial port.  | 
 | 7 | +
  | 
 | 8 | +Known commands:  | 
 | 9 | +- onoff <port> <1|0> - switch port power on/off  | 
 | 10 | +- state - get current power state of all ports  | 
 | 11 | +- SetOWP <1|0> <1|0> ... - set the power-on state of all ports  | 
 | 12 | +- GetOWP - get the power-on state of all ports  | 
 | 13 | +
  | 
 | 14 | +Responses are in JSON format, e.g.:  | 
 | 15 | +
  | 
 | 16 | +.. code-block:: text  | 
 | 17 | +
  | 
 | 18 | +  > onoff 5 1  | 
 | 19 | +  < {"Cmd":"OnOffResp","SeqNum":1,"ret":0}  | 
 | 20 | +  > state  | 
 | 21 | +  < {"Cmd":"StateResp","SeqNum":2,"state":[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0]}  | 
 | 22 | +  > SetOWP 0 0 0 0 0 0 0 0 0 0 0 1  | 
 | 23 | +  < {"Cmd":"SetOWPResp","SeqNum":3,"ret":0}  | 
 | 24 | +  > GetOWP  | 
 | 25 | +  < {"Cmd":"GetOWPResp","SeqNum":4,"owp":[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]}  | 
 | 26 | +
  | 
 | 27 | +A version announcement is continuously sent every second:  | 
 | 28 | +
  | 
 | 29 | +.. code-block:: text  | 
 | 30 | +
  | 
 | 31 | +  < {"Cmd":"VerResp","ver":"SmartHUB_<ver>","uid":"<uid>"}  | 
 | 32 | +"""  | 
 | 33 | +import json  | 
 | 34 | +import serial  | 
 | 35 | + | 
 | 36 | + | 
 | 37 | +PORT_INDEX_MAP = {  | 
 | 38 | +    "1": 5, "2": 4, "3": 3, "4": 2, "5": 1, "6": 0,  | 
 | 39 | +    "7": 11, "8": 10, "9": 9, "10": 8, "11": 7, "12": 6,  | 
 | 40 | +}  | 
 | 41 | + | 
 | 42 | + | 
 | 43 | +# Port names printed on the device do not match the internal port index  | 
 | 44 | +def name_to_index(name_or_index):  | 
 | 45 | +    return PORT_INDEX_MAP.get(name_or_index, name_or_index)  | 
 | 46 | + | 
 | 47 | + | 
 | 48 | +class LinkPiSmartHUB:  | 
 | 49 | +    def __init__(self, path):  | 
 | 50 | +        if not path:  | 
 | 51 | +            raise ValueError("Device not found")  | 
 | 52 | +        self.path = path  | 
 | 53 | + | 
 | 54 | +    def _command(self, command):  | 
 | 55 | +        with serial.Serial(self.path, 115200, timeout=2) as s:  | 
 | 56 | +            # wait on next version announcement  | 
 | 57 | +            s.readline()  | 
 | 58 | +            # send the command  | 
 | 59 | +            s.write(f"{command}\r\n".encode())  | 
 | 60 | +            # read and return the response  | 
 | 61 | +            return s.readline().decode().strip()  | 
 | 62 | + | 
 | 63 | +    def onoff(self, index, state):  | 
 | 64 | +        return self._command(f"onoff {index} {state}")  | 
 | 65 | + | 
 | 66 | +    def state(self):  | 
 | 67 | +        return self._command("state")  | 
 | 68 | + | 
 | 69 | + | 
 | 70 | +def handle_set(path, name_or_index, state):  | 
 | 71 | +    smarthub = LinkPiSmartHUB(path)  | 
 | 72 | +    smarthub.onoff(name_to_index(name_or_index), state)  | 
 | 73 | + | 
 | 74 | + | 
 | 75 | +def handle_get(path, name_or_index):  | 
 | 76 | +    smarthub = LinkPiSmartHUB(path)  | 
 | 77 | +    state = smarthub.state()  | 
 | 78 | +    return bool(json.loads(state).get("state")[name_to_index(name_or_index)])  | 
 | 79 | + | 
 | 80 | + | 
 | 81 | +methods = {  | 
 | 82 | +    "set": handle_set,  | 
 | 83 | +    "get": handle_get,  | 
 | 84 | +}  | 
0 commit comments