diff --git a/BlynkLib.py b/BlynkLib.py index 3ba71b3..d093162 100644 --- a/BlynkLib.py +++ b/BlynkLib.py @@ -2,10 +2,12 @@ __version__ = "1.0.0" +import socket import struct -import time import sys -import os +import time + +IOError = OSError try: import machine @@ -42,6 +44,19 @@ def dummy(*args): DISCONNECTED = const(0) CONNECTING = const(1) CONNECTED = const(2) +RETRIES_TX_MAX_NUM = const(3) +RETRIES_TX_DELAY = const(2) + + +class BlynkError(Exception): + pass + + +class RedirectError(Exception): + def __init__(self, server, port): + self.server = server + self.port = port + print(""" ___ __ __ @@ -69,8 +84,13 @@ def emit(self, evt, *a, **kv): class BlynkProtocol(EventEmitter): - def __init__(self, auth, tmpl_id=None, fw_ver=None, heartbeat=50, buffin=1024, log=None): + def __init__(self, auth, tmpl_id=None, fw_ver=None, heartbeat=5, buffin=1024, log=None): EventEmitter.__init__(self) + self._init_variables(auth, tmpl_id, fw_ver, heartbeat, buffin, log) + self.connect() + + def _init_variables(self, auth, tmpl_id, fw_ver, heartbeat, buffin, log): + self.lastRecv = None self.heartbeat = heartbeat*1000 self.buffin = buffin self.log = log or dummy @@ -78,7 +98,6 @@ def __init__(self, auth, tmpl_id=None, fw_ver=None, heartbeat=50, buffin=1024, l self.tmpl_id = tmpl_id self.fw_ver = fw_ver self.state = DISCONNECTED - self.connect() def virtual_write(self, pin, *val): self._send(MSG_HW, 'vw', pin, *val) @@ -96,6 +115,11 @@ def log_event(self, *val): self._send(MSG_EVENT_LOG, *val) def _send(self, cmd, *args, **kwargs): + self.check_and_reconnect() + id, data, dlen = self._prepare_send_variables(cmd, args, kwargs) + self._log_and_send_msg(cmd, id, args, data, dlen) + + def _prepare_send_variables(self, cmd, args, kwargs): if 'id' in kwargs: id = kwargs.get('id') else: @@ -103,14 +127,15 @@ def _send(self, cmd, *args, **kwargs): self.msg_id += 1 if self.msg_id > 0xFFFF: self.msg_id = 1 - if cmd == MSG_RSP: data = b'' dlen = args[0] else: data = ('\0'.join(map(str, args))).encode('utf8') dlen = len(data) - + return id, data, dlen + + def _log_and_send_msg(self, cmd, id, args, data, dlen): self.log('<', cmd, id, '|', *args) msg = struct.pack("!BHH", cmd, id, dlen) + data self.lastSend = gettime() @@ -118,148 +143,232 @@ def _send(self, cmd, *args, **kwargs): def connect(self): if self.state != DISCONNECTED: return + self._init_connect_variables() + self._send(MSG_HW_LOGIN, self.auth) + + def _init_connect_variables(self): self.msg_id = 1 (self.lastRecv, self.lastSend, self.lastPing) = (gettime(), 0, 0) self.bin = b"" self.state = CONNECTING - self._send(MSG_HW_LOGIN, self.auth) + + def is_disconnected(self): + return self.state == DISCONNECTED + + def reconnected(self): + self.disconnect() + self.connect() + + def check_and_reconnect(self): + if self.is_disconnected(): + self.connect() + + if self._is_device_offline(): + self.reconnected() + + def _is_device_offline(self): + now = gettime() + return now - self.lastRecv > self.heartbeat+(self.heartbeat//2) def disconnect(self): - if self.state == DISCONNECTED: return + if self.state == DISCONNECTED: + return + self._init_disconnect_variables() + self.emit('disconnected') + + def _init_disconnect_variables(self): self.bin = b"" self.state = DISCONNECTED - self.emit('disconnected') def process(self, data=None): - if not (self.state == CONNECTING or self.state == CONNECTED): return + if not (self.state == CONNECTING or self.state == CONNECTED): + return + if self._is_device_offline(): + return self.reconnected() + self._send_ping() + self._process_data(data) + + def _send_ping(self): now = gettime() - if now - self.lastRecv > self.heartbeat+(self.heartbeat//2): - return self.disconnect() if (now - self.lastPing > self.heartbeat//10 and - (now - self.lastSend > self.heartbeat or - now - self.lastRecv > self.heartbeat)): + (now - self.lastSend > self.heartbeat or + now - self.lastRecv > self.heartbeat)): self._send(MSG_PING) self.lastPing = now - - if data != None and len(data): + + def _process_data(self, data): + if data is not None and len(data): self.bin += data + self._parse_commands() + def _parse_commands(self): while True: if len(self.bin) < 5: break + cmd, i, dlen = self._unpack_bin() + if self._process_msg_rsp(cmd, i, dlen): + continue + if self._check_command_length(dlen): + break + data = self.bin[5:5+dlen] + self.bin = self.bin[5+dlen:] + self.process_command(cmd, i, data) + + def _unpack_bin(self): + cmd, i, dlen = struct.unpack("!BHH", self.bin[:5]) + if i == 0: + self.disconnect() + self.lastRecv = gettime() + return cmd, i, dlen + + def _process_msg_rsp(self, cmd, i, dlen): + if cmd == MSG_RSP: + self.bin = self.bin[5:] + self.log('>', cmd, i, '|', dlen) + if self.state == CONNECTING and i == 1: + self._handle_connecting(dlen) + return True + return False + + def _handle_connecting(self, dlen): + if dlen == STA_SUCCESS: + self._handle_connection_success() + else: + if dlen == STA_INVALID_TOKEN: + self.emit("invalid_auth") + print("Invalid auth token") + self.disconnect() + + def _handle_connection_success(self): + self.state = CONNECTED + dt = gettime() - self.lastSend + info = self._create_connection_info() + self._send(MSG_INTERNAL, *info) + try: + self.emit('connected', ping=dt) + except TypeError: + self.emit('connected') + + def _create_connection_info(self): + info = ['ver', __version__, 'h-beat', self.heartbeat//1000, 'buff-in', self.buffin, 'dev', sys.platform+'-py'] + if self.tmpl_id: + info.extend(['tmpl', self.tmpl_id]) + info.extend(['fw-type', self.tmpl_id]) + if self.fw_ver: + info.extend(['fw', self.fw_ver]) + return info + + def _check_command_length(self, dlen): + if dlen >= self.buffin: + print("Cmd too big: ", dlen) + self.disconnect() + return True + return False + + def process_command(self, cmd, i, data): + args = list(map(lambda x: x.decode('utf8'), data.split(b'\0'))) + self.log('>', cmd, i, '|', ','.join(args)) + if cmd == MSG_PING: + self._send(MSG_RSP, STA_SUCCESS, id=i) + elif cmd == MSG_HW or cmd == MSG_BRIDGE: + self._process_hw_command(args) + elif cmd == MSG_INTERNAL: + self.emit("internal:"+args[0], args[1:]) + elif cmd == MSG_REDIRECT: + self.emit("redirect", args[0], int(args[1])) + else: + print("Unexpected command: ", cmd) + self.disconnect() - cmd, i, dlen = struct.unpack("!BHH", self.bin[:5]) - if i == 0: return self.disconnect() - - self.lastRecv = now - if cmd == MSG_RSP: - self.bin = self.bin[5:] - - self.log('>', cmd, i, '|', dlen) - if self.state == CONNECTING and i == 1: - if dlen == STA_SUCCESS: - self.state = CONNECTED - dt = now - self.lastSend - info = ['ver', __version__, 'h-beat', self.heartbeat//1000, 'buff-in', self.buffin, 'dev', sys.platform+'-py'] - if self.tmpl_id: - info.extend(['tmpl', self.tmpl_id]) - info.extend(['fw-type', self.tmpl_id]) - if self.fw_ver: - info.extend(['fw', self.fw_ver]) - self._send(MSG_INTERNAL, *info) - try: - self.emit('connected', ping=dt) - except TypeError: - self.emit('connected') - else: - if dlen == STA_INVALID_TOKEN: - self.emit("invalid_auth") - print("Invalid auth token") - return self.disconnect() - else: - if dlen >= self.buffin: - print("Cmd too big: ", dlen) - return self.disconnect() - - if len(self.bin) < 5+dlen: - break - - data = self.bin[5:5+dlen] - self.bin = self.bin[5+dlen:] - - args = list(map(lambda x: x.decode('utf8'), data.split(b'\0'))) - - self.log('>', cmd, i, '|', ','.join(args)) - if cmd == MSG_PING: - self._send(MSG_RSP, STA_SUCCESS, id=i) - elif cmd == MSG_HW or cmd == MSG_BRIDGE: - if args[0] == 'vw': - self.emit("V"+args[1], args[2:]) - self.emit("V*", args[1], args[2:]) - elif cmd == MSG_INTERNAL: - self.emit("internal:"+args[0], args[1:]) - elif cmd == MSG_REDIRECT: - self.emit("redirect", args[0], int(args[1])) - else: - print("Unexpected command: ", cmd) - return self.disconnect() + def _process_hw_command(self, args): + if args[0] == 'vw': + self.emit("V"+args[1], args[2:]) + self.emit("V*", args[1], args[2:]) -import socket class Blynk(BlynkProtocol): - def __init__(self, auth, **kwargs): - self.insecure = kwargs.pop('insecure', False) - self.server = kwargs.pop('server', 'blynk.cloud') - self.port = kwargs.pop('port', 80 if self.insecure else 443) + def __init__(self, auth, server='blynk.cloud', insecure=False, port=None, **kwargs): + self.conn = None + self.insecure = insecure + self.server = server + self.port = port if port else 80 if self.insecure else 443 BlynkProtocol.__init__(self, auth, **kwargs) self.on('redirect', self.redirect) def redirect(self, server, port): self.server = server self.port = port + self.disconnect_and_connect() + + def disconnect_and_connect(self): self.disconnect() self.connect() def connect(self): print('Connecting to %s:%d...' % (self.server, self.port)) + end_time = time.time() + 0.5 + while not (self.state == CONNECTED): + if self.state == DISCONNECTED: + try: + self.create_socket() + BlynkProtocol.connect(self) + return True + except BlynkError as b_err: + self.disconnect() + except RedirectError as r_err: + self.disconnect() + + if time.time() >= end_time: + return False + + def create_socket(self): s = socket.socket() - s.connect(socket.getaddrinfo(self.server, self.port)[0][-1]) try: - s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) - except: + s.connect(socket.getaddrinfo(self.server, self.port)[0][-1]) + if self.insecure: + s.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) + self.conn = s + else: + try: + import ussl + ssl_context = ussl + except ImportError: + import ssl + ssl_context = ssl.create_default_context() + self.conn = ssl_context.wrap_socket(s, server_hostname=self.server) + try: + self.conn.settimeout(SOCK_TIMEOUT) + except: + s.settimeout(SOCK_TIMEOUT) + except OSError: pass - if self.insecure: - self.conn = s - else: - try: - import ussl - ssl_context = ussl - except ImportError: - import ssl - ssl_context = ssl.create_default_context() - self.conn = ssl_context.wrap_socket(s, server_hostname=self.server) - try: - self.conn.settimeout(SOCK_TIMEOUT) - except: - s.settimeout(SOCK_TIMEOUT) - BlynkProtocol.connect(self) + def _write(self, data): - #print('<', data) - self.conn.write(data) - # TODO: handle disconnect + retries = RETRIES_TX_MAX_NUM + while retries > 0: + try: + retries -= 1 + self._last_send_time = gettime() + self.conn.write(data) + break + except (IOError, OSError): + time.sleep(RETRIES_TX_DELAY/1000) + def run(self): - data = b'' + self.check_and_reconnect() + try: data = self.conn.read(self.buffin) - #print('>', data) + self.process(data) except KeyboardInterrupt: raise - except socket.timeout: - # No data received, call process to send ping messages when needed - pass - except: # TODO: handle disconnect - return - self.process(data) + except BlynkError as b_err: + self.log(b_err) + self.disconnect() + except Exception as g_exc: + self.log(g_exc) + +