Source code for remi.server

#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
   Licensed under the Apache License, Version 2.0 (the "License");
   you may not use this file except in compliance with the License.
   You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
"""
import logging
try:
    from http.server import HTTPServer, BaseHTTPRequestHandler
except ImportError:
    from BaseHTTPServer import HTTPServer, BaseHTTPRequestHandler
try:
    import socketserver
except ImportError:
    import SocketServer as socketserver
import socket
import ssl

import mimetypes
import webbrowser
import struct
import socket
import base64
import hashlib
import sys
import threading
import signal
import time
import os
import re
try:
    from urllib import unquote
    from urllib import quote
    from urlparse import urlparse
    from urlparse import parse_qs
except ImportError:
    from urllib.parse import unquote
    from urllib.parse import quote
    from urllib.parse import unquote_to_bytes
    from urllib.parse import urlparse
    from urllib.parse import parse_qs
import cgi
import weakref


clients = {}
runtimeInstances = weakref.WeakValueDictionary()

pyLessThan3 = sys.version_info < (3,)


_MSG_ACK = '3'
_MSG_JS = '2'
_MSG_UPDATE = '1'


[docs]def to_websocket(data): # encoding end decoding utility function if pyLessThan3: return quote(data) return quote(data, encoding='utf-8')
[docs]def from_websocket(data): # encoding end decoding utility function if pyLessThan3: return unquote(data) return unquote(data, encoding='utf-8')
[docs]def encode_text(data): if not pyLessThan3: return data.encode('utf-8') return data
[docs]def get_method_by_name(root_node, name): val = None if hasattr(root_node, name): val = getattr(root_node, name) return val
[docs]def get_method_by_id(_id): global runtimeInstances return runtimeInstances.get(str(_id), None)
[docs]class WebSocketsHandler(socketserver.StreamRequestHandler): magic = b'258EAFA5-E914-47DA-95CA-C5AB0DC85B11' def __init__(self, headers, *args, **kwargs): self.headers = headers self.handshake_done = False self._log = logging.getLogger('remi.server.ws') socketserver.StreamRequestHandler.__init__(self, *args, **kwargs)
[docs] def setup(self): socketserver.StreamRequestHandler.setup(self) self._log.info('connection established: %r' % (self.client_address,)) self.handshake_done = False
[docs] def handle(self): global clients self._log.debug('handle') # on some systems like ROS, the default socket timeout # is less than expected, we force it to infinite (None) as default socket value self.request.settimeout(None) if self.handshake(): while True: if not self.read_next_message(): clients[self.session].websockets.remove(self) self.handshake_done = False self._log.debug('ws ending websocket service') break
[docs] @staticmethod def bytetonum(b): if pyLessThan3: b = ord(b) return b
[docs] def read_next_message(self): # noinspection PyBroadException try: try: length = self.rfile.read(2) except ValueError: # socket was closed, just return without errors return False length = self.bytetonum(length[1]) & 127 if length == 126: length = struct.unpack('>H', self.rfile.read(2))[0] elif length == 127: length = struct.unpack('>Q', self.rfile.read(8))[0] masks = [self.bytetonum(byte) for byte in self.rfile.read(4)] decoded = '' for char in self.rfile.read(length): decoded += chr(self.bytetonum(char) ^ masks[len(decoded) % 4]) self.on_message(from_websocket(decoded)) except socket.timeout: return False except Exception: return False return True
[docs] def send_message(self, message): if not self.handshake_done: self._log.warning("ignoring message %s (handshake not done)" % message[:10]) return self._log.debug('send_message: %s... -> %s' % (message[:10], self.client_address)) out = bytearray() out.append(129) length = len(message) if length <= 125: out.append(length) elif 126 <= length <= 65535: out.append(126) out += struct.pack('>H', length) else: out.append(127) out += struct.pack('>Q', length) if not pyLessThan3: message = message.encode('utf-8') out = out + message self.request.send(out)
[docs] def handshake(self): self._log.debug('handshake') key = self.headers['Sec-WebSocket-Key'] self.session = None if 'cookie' in self.headers: if self.headers['cookie']!=None: self.session = parse_session_cookie(self.headers['cookie']) if self.session == None: return False if not self.session in clients.keys(): return False digest = hashlib.sha1((key.encode("utf-8")+self.magic)) digest = digest.digest() digest = base64.b64encode(digest) response = 'HTTP/1.1 101 Switching Protocols\r\n' response += 'Upgrade: websocket\r\n' response += 'Connection: Upgrade\r\n' response += 'Sec-WebSocket-Accept: %s\r\n\r\n' % digest.decode("utf-8") self._log.info('handshake complete') self.request.sendall(response.encode("utf-8")) self.handshake_done = True #if an update happens since the websocket connection to its handshake, # it gets not displayed. it is required to inform App about handshake done, # to get a full refresh clients[self.session].websocket_handshake_done(self) return True
[docs] def on_message(self, message): global runtimeInstances self.send_message(_MSG_ACK) with clients[self.session].update_lock: # noinspection PyBroadException try: # saving the websocket in order to update the client if self not in clients[self.session].websockets: clients[self.session].websockets.append(self) # parsing messages chunks = message.split('/') self._log.debug('on_message: %s' % chunks[0]) if len(chunks) > 3: # msgtype,widget,function,params # if this is a callback msg_type = 'callback' if chunks[0] == msg_type: widget_id = chunks[1] function_name = chunks[2] params = message[ len(msg_type) + len(widget_id) + len(function_name) + 3:] param_dict = parse_parametrs(params) callback = get_method_by_name(runtimeInstances[widget_id], function_name) if callback is not None: callback(**param_dict) except Exception: self._log.error('error parsing websocket', exc_info=True)
[docs] def close(self): try: self.request.shutdown(socket.SHUT_WR) self.finish() self.server.shutdown() except: self._log.error("exception in WebSocketsHandler.close method", exc_info=True)
[docs]def parse_parametrs(p): """ Parses the parameters given from POST or websocket reqs expecting the parameters as: "11|par1='asd'|6|par2=1" returns a dict like {par1:'asd',par2:1} """ ret = {} while len(p) > 1 and p.count('|') > 0: s = p.split('|') l = int(s[0]) # length of param field if l > 0: p = p[len(s[0]) + 1:] field_name = p.split('|')[0].split('=')[0] field_value = p[len(field_name) + 1:l] p = p[l + 1:] ret[field_name] = field_value return ret
# noinspection PyPep8Naming
[docs]class App(BaseHTTPRequestHandler, object): """ This class will handles any incoming request from the browser The main application class can subclass this In the do_POST and do_GET methods it is expected to receive requests such as: - function calls with parameters - file requests """ re_static_file = re.compile(r"^([\/]*[\w\d]+:[-_. $@?#£'%=()\/\[\]!+°§^,\w\d]+)") #https://regex101.com/r/uK1sX1/6 re_attr_call = re.compile(r"^/*(\w+)\/(\w+)\?{0,1}(\w*\={1}(\w|\.)+\&{0,1})*$") def __init__(self, request, client_address, server, **app_args): self._app_args = app_args self.root = None self._log = logging.getLogger('remi.request') super(App, self).__init__(request, client_address, server) def _get_list_from_app_args(self, name): try: v = self._app_args[name] if isinstance(v, (tuple, list)): vals = v else: vals = [v] except KeyError: vals = [] return vals def _instance(self): global clients global runtimeInstances """ This method is used to get the Application instance previously created managing on this, it is possible to switch to "single instance for multiple clients" or "multiple instance for multiple clients" execution way """ self.session = 0 #checking previously defined session if 'cookie' in self.headers: self.session = parse_session_cookie(self.headers['cookie']) #if not a valid session id if self.session == None: self.session = 0 if not self.session in clients.keys(): self.session = 0 #if no session id if self.session == 0: if self.server.multiple_instance: self.session = int(time.time()*1000) #send session to browser del self.headers['cookie'] #if the client instance doesn't exist if not(self.session in clients): self.update_interval = self.server.update_interval from remi import gui head = gui.HEAD(self.server.title) # use the default css, but append a version based on its hash, to stop browser caching head.add_child('internal_css', "<link href='/res:style.css' rel='stylesheet' />\n") body = gui.BODY() body.onload.connect(self.onload) body.onerror.connect(self.onerror) body.ononline.connect(self.ononline) body.onpagehide.connect(self.onpagehide) body.onpageshow.connect(self.onpageshow) body.onresize.connect(self.onresize) self.page = gui.HTML() self.page.add_child('head', head) self.page.add_child('body', body) if not hasattr(self, 'websockets'): self.websockets = [] self.update_lock = threading.RLock() if not hasattr(self, '_need_update_flag'): self._need_update_flag = False self._stop_update_flag = False if self.update_interval > 0: self._update_thread = threading.Thread(target=self._idle_loop) self._update_thread.setDaemon(True) self._update_thread.start() runtimeInstances[str(id(self))] = self clients[self.session] = self else: #restore instance attributes client = clients[self.session] self.websockets = client.websockets self.page = client.page self.update_lock = client.update_lock self.update_interval = client.update_interval self._need_update_flag = client._need_update_flag if hasattr(client, '_update_thread'): self._update_thread = client._update_thread net_interface_ip = self.headers.get('Host', "%s:%s"%(self.connection.getsockname()[0],self.server.server_address[1])) websocket_timeout_timer_ms = str(self.server.websocket_timeout_timer_ms) pending_messages_queue_length = str(self.server.pending_messages_queue_length) self.page.children['head'].set_internal_js(net_interface_ip, pending_messages_queue_length, websocket_timeout_timer_ms)
[docs] def main(self, *_): """ Subclasses of App class *must* declare a main function that will be the entry point of the application. Inside the main function you have to declare the GUI structure and return the root widget. """ raise NotImplementedError("Applications must implement 'main()' function.")
def _idle_loop(self): """ This is used to exec the idle function in a safe context and a separate thread """ while not self._stop_update_flag: time.sleep(self.update_interval) with self.update_lock: try: self.idle() except: self._log.error("exception in App.idle method", exc_info=True) if self._need_update_flag: try: self.do_gui_update() except: self._log.error('''exception during gui update. It is advisable to use App.update_lock using external threads.''', exc_info=True)
[docs] def idle(self): """ Idle function called every UPDATE_INTERVAL before the gui update. Useful to schedule tasks. """ pass
def _need_update(self, emitter=None): if self.update_interval == 0: #no interval, immadiate update self.do_gui_update() else: #will be updated after idle loop self._need_update_flag = True
[docs] def do_gui_update(self): """ This method gets called also by Timer, a new thread, and so needs to lock the update """ with self.update_lock: changed_widget_dict = {} self.root.repr(changed_widget_dict) for widget in changed_widget_dict.keys(): html = changed_widget_dict[widget] __id = str(widget.identifier) self._send_spontaneous_websocket_message(_MSG_UPDATE + __id + ',' + to_websocket(html)) self._need_update_flag = False
[docs] def websocket_handshake_done(self, ws_instance_to_update): with self.update_lock: msg = "0" + self.root.identifier + ',' + to_websocket(self.page.children['body'].innerHTML({})) ws_instance_to_update.send_message(msg)
[docs] def set_root_widget(self, widget): self.page.children['body'].append(widget, 'root') self.root = widget self.root.disable_refresh() self.root.attributes['data-parent-widget'] = str(id(self)) self.root._parent = self self.root.enable_refresh() msg = "0" + self.root.identifier + ',' + to_websocket(self.page.children['body'].innerHTML({})) self._send_spontaneous_websocket_message(msg)
def _send_spontaneous_websocket_message(self, message): for ws in self.websockets: # noinspection PyBroadException try: #self._log.debug("sending websocket spontaneous message") ws.send_message(message) except: self._log.error("sending websocket spontaneous message", exc_info=True) try: self.websockets.remove(ws) ws.close() except: self._log.error("unable to remove websocket client - already not in list", exc_info=True)
[docs] def execute_javascript(self, code): self._send_spontaneous_websocket_message(_MSG_JS + code)
[docs] def notification_message(self, title, content, icon=""): """This function sends "javascript" message to the client, that executes its content. In this particular code, a notification message is shown """ code = """ var options = { body: "%(content)s", icon: "%(icon)s" } if (!("Notification" in window)) { alert("%(content)s"); }else if (Notification.permission === "granted") { var notification = new Notification("%(title)s", options); }else if (Notification.permission !== 'denied') { Notification.requestPermission(function (permission) { if (permission === "granted") { var notification = new Notification("%(title)s", options); } }); } """ % {'title': title, 'content': content, 'icon': icon} self.execute_javascript(code)
[docs] def do_POST(self): self._instance() file_data = None # listener_widget = None # listener_function = None try: # Parse the form data posted filename = self.headers['filename'] listener_widget = runtimeInstances[self.headers['listener']] listener_function = self.headers['listener_function'] form = cgi.FieldStorage(fp=self.rfile, headers=self.headers, environ={'REQUEST_METHOD': 'POST', 'CONTENT_TYPE': self.headers['Content-Type']}) # Echo back information about what was posted in the form for field in form.keys(): field_item = form[field] if field_item.filename: # The field contains an uploaded file file_data = field_item.file.read() file_len = len(file_data) self._log.debug('post: uploaded %s as "%s" (%d bytes)\n' % (field, field_item.filename, file_len)) get_method_by_name(listener_widget, listener_function)(file_data, filename) else: # Regular form value self._log.debug('post: %s=%s\n' % (field, form[field].value)) if file_data is not None: # the filedata is sent to the listener self._log.debug('GUI - server.py do_POST: fileupload name= %s' % (filename)) self.send_response(200) except Exception: self._log.error('post: failed', exc_info=True) self.send_response(400) self.send_header('Content-type', 'text/plain') self.end_headers()
[docs] def do_HEAD(self): self.send_response(200) self.end_headers()
[docs] def do_AUTHHEAD(self): self.send_response(401) self.send_header('WWW-Authenticate', 'Basic realm=\"Protected\"') self.send_header('Content-type', 'text/html') self.end_headers()
[docs] def do_GET(self): # check here request header to identify the type of req, if http or ws # if this is a ws req, instance a ws handler, add it to App's ws list, return if "Upgrade" in self.headers: if self.headers['Upgrade'] == 'websocket': #passing arguments to websocket handler, otherwise it will lost the last message, # and will be unable to handshake ws = WebSocketsHandler(self.headers, self.request, self.client_address, self.server) return """Handler for the GET requests.""" do_process = False if self.server.auth is None: do_process = True else: if not ('Authorization' in self.headers) or self.headers['Authorization'] is None: self._log.info("Authenticating") self.do_AUTHHEAD() self.wfile.write(encode_text('no auth header received')) elif self.headers['Authorization'] == 'Basic ' + self.server.auth.decode(): do_process = True else: self.do_AUTHHEAD() self.wfile.write(encode_text(self.headers['Authorization'])) self.wfile.write(encode_text('not authenticated')) if do_process: path = str(unquote(self.path)) # noinspection PyBroadException try: self._instance() # build the page (call main()) in user code, if not built yet with self.update_lock: # build the root page once if necessary if not 'root' in self.page.children['body'].children.keys(): self._log.info('built UI (path=%s)' % path) self.set_root_widget(self.main(*self.server.userdata)) self._process_all(path) except: self._log.error('error processing GET request', exc_info=True)
def _get_static_file(self, filename): filename = filename.replace("..", "") #avoid backdirs __i = filename.find(':') if __i < 0: return None key = filename[:__i] path = filename[__i+1:] key = key.replace("/","") paths = {'res': os.path.join(os.path.dirname(__file__), "res")} static_paths = self._app_args.get('static_file_path', {}) if not type(static_paths)==dict: self._log.error("App's parameter static_file_path must be a Dictionary.", exc_info=False) static_paths = {} paths.update(static_paths) if not key in paths: return None return os.path.join(paths[key], path) def _process_all(self, func): self._log.debug('get: %s' % func) static_file = self.re_static_file.match(func) attr_call = self.re_attr_call.match(func) if (func == '/') or (not func): self.send_response(200) self.send_header("Set-Cookie", "remi_session=%s"%(self.session)) self.send_header('Content-type', 'text/html') self.end_headers() with self.update_lock: # render the HTML page_content = self.page.repr() self.wfile.write(encode_text("<!DOCTYPE html>\n")) self.wfile.write(encode_text(page_content)) elif static_file: filename = self._get_static_file(static_file.groups()[0]) if not filename: self.send_response(404) return mimetype, encoding = mimetypes.guess_type(filename) self.send_response(200) self.send_header('Content-type', mimetype if mimetype else 'application/octet-stream') if self.server.enable_file_cache: self.send_header('Cache-Control', 'public, max-age=86400') self.end_headers() with open(filename, 'rb') as f: content = f.read() self.wfile.write(content) elif attr_call: with self.update_lock: param_dict = parse_qs(urlparse(func).query) # parse_qs returns patameters as list, here we take the first element for k in param_dict: param_dict[k] = param_dict[k][0] widget, func = attr_call.group(1, 2) try: content, headers = get_method_by_name(get_method_by_id(widget), func)(**param_dict) if content is None: self.send_response(503) return self.send_response(200) except IOError: self._log.error('attr %s/%s call error' % (widget, func), exc_info=True) self.send_response(404) return except (TypeError, AttributeError): self._log.error('attr %s/%s not available' % (widget, func)) self.send_response(503) return for k in headers: self.send_header(k, headers[k]) self.end_headers() try: self.wfile.write(content) except TypeError: self.wfile.write(encode_text(content))
[docs] def close(self): """ Command to initiate an App to close """ self._log.debug('shutting down...') self.server.server_starter_instance.stop()
[docs] def on_close(self): """ Called by the server when the App have to be terminated """ self._stop_update_flag = True for ws in self.websockets: ws.close()
[docs] def onload(self, emitter): """ WebPage Event that occurs on webpage loaded """ self._log.debug('App.onload event occurred')
[docs] def onerror(self, emitter, message, source, lineno, colno): """ WebPage Event that occurs on webpage errors """ self._log.debug("""App.onerror event occurred in webpage: \nMESSAGE:%s\nSOURCE:%s\nLINENO:%s\nCOLNO:%s\n"""%(message, source, lineno, colno))
[docs] def ononline(self, emitter): """ WebPage Event that occurs on webpage goes online after a disconnection """ self._log.debug('App.ononline event occurred')
[docs] def onpagehide(self, emitter): """ WebPage Event that occurs on webpage when the user navigates away """ self._log.debug('App.onpagehide event occurred')
[docs] def onpageshow(self, emitter): """ WebPage Event that occurs on webpage gets shown """ self._log.debug('App.onpageshow event occurred')
[docs] def onresize(self, emitter, width, height): """ WebPage Event that occurs on webpage gets resized """ self._log.debug('App.onresize event occurred. Width:%s Height:%s'%(width, height))
[docs]class ThreadedHTTPServer(socketserver.ThreadingMixIn, HTTPServer): daemon_threads = False # noinspection PyPep8Naming def __init__(self, server_address, RequestHandlerClass, auth, multiple_instance, enable_file_cache, update_interval, websocket_timeout_timer_ms, pending_messages_queue_length, title, server_starter_instance, certfile, keyfile, ssl_version, *userdata): HTTPServer.__init__(self, server_address, RequestHandlerClass) self.auth = auth self.multiple_instance = multiple_instance self.enable_file_cache = enable_file_cache self.update_interval = update_interval self.websocket_timeout_timer_ms = websocket_timeout_timer_ms self.pending_messages_queue_length = pending_messages_queue_length self.title = title self.server_starter_instance = server_starter_instance self.userdata = userdata self.certfile = certfile self.keyfile = keyfile self.ssl_version = ssl_version if self.ssl_version!=None: self.socket = ssl.wrap_socket(self.socket, keyfile=self.keyfile, certfile=self.certfile, server_side=True, ssl_version=self.ssl_version, do_handshake_on_connect=True)
[docs]class Server(object): # noinspection PyShadowingNames def __init__(self, gui_class, title='', start=True, address='127.0.0.1', port=8081, username=None, password=None, multiple_instance=False, enable_file_cache=True, update_interval=0.1, start_browser=True, websocket_timeout_timer_ms=1000, pending_messages_queue_length=1000, certfile=None, keyfile=None, ssl_version=None, userdata=()): self._gui = gui_class self._title = title or gui_class.__name__ self._sserver = None self._sth = None self._base_address = '' self._address = address self._sport = port self._multiple_instance = multiple_instance self._enable_file_cache = enable_file_cache self._update_interval = update_interval self._start_browser = start_browser self._websocket_timeout_timer_ms = websocket_timeout_timer_ms self._pending_messages_queue_length = pending_messages_queue_length self._certfile = certfile self._keyfile = keyfile self._ssl_version = ssl_version self._userdata = userdata if username and password: self._auth = base64.b64encode(encode_text("%s:%s" % (username, password))) else: self._auth = None if not isinstance(userdata, tuple): raise ValueError('userdata must be a tuple') self._log = logging.getLogger('remi.server') self._alive = True if start: self._myid = threading.Thread.ident self.start() self.serve_forever() @property def title(self): return self._title @property def address(self): return self._base_address
[docs] def start(self): # Create a web server and define the handler to manage the incoming # request self._sserver = ThreadedHTTPServer((self._address, self._sport), self._gui, self._auth, self._multiple_instance, self._enable_file_cache, self._update_interval, self._websocket_timeout_timer_ms, self._pending_messages_queue_length, self._title, self, self._certfile, self._keyfile, self._ssl_version, *self._userdata) shost, sport = self._sserver.socket.getsockname()[:2] # when listening on multiple net interfaces the browsers connects to localhost if shost == '0.0.0.0': shost = '127.0.0.1' self._base_address = 'http://%s:%s/' % (shost,sport) self._log.info('Started httpserver %s' % self._base_address) if self._start_browser: try: import android android.webbrowser.open(self._base_address) except ImportError: # use default browser instead of always forcing IE on Windows if os.name == 'nt': webbrowser.get('windows-default').open(self._base_address) else: webbrowser.open(self._base_address) self._sth = threading.Thread(target=self._sserver.serve_forever) self._sth.daemon = False self._sth.start()
[docs] def serve_forever(self): # we could join on the threads, but join blocks all interrupts (including # ctrl+c, so just spin here # noinspection PyBroadException try: def sig_manager(sig, callstack): self.stop() self._log.info('*** signal %d received.' % sig) return signal.SIG_IGN prev_handler = signal.signal(signal.SIGINT, sig_manager) except Exception: # signal.pause() is missing for Windows; wait 1ms and loop instead pass except KeyboardInterrupt: pass while self._alive: try: time.sleep(1) except: self._alive = False self._log.debug(' ** serve_forever() quitting')
[docs] def stop(self): global clients self._alive = False self._sserver.shutdown() for client in clients.values(): client.on_close()
[docs]class StandaloneServer(Server): def __init__(self, gui_class, title='', width=800, height=600, resizable=True, fullscreen=False, start=True, userdata=()): Server.__init__(self, gui_class, title=title, start=False, address='127.0.0.1', port=0, username=None, password=None, multiple_instance=False, enable_file_cache=True, update_interval=0.1, start_browser=False, websocket_timeout_timer_ms=1000, pending_messages_queue_length=1000, userdata=userdata) self._application_conf = {'width': width, 'height': height, 'resizable': resizable, 'fullscreen': fullscreen} if start: self.serve_forever()
[docs] def serve_forever(self): try: import webview except ImportError: raise ImportError('PyWebView is missing. Please install it by:\n ' 'pip install pywebview\n ' 'more info at https://github.com/r0x0r/pywebview') else: Server.start(self) webview.create_window(self.title, self.address, **self._application_conf) Server.stop(self)
[docs]def start(main_gui_class, **kwargs): """This method starts the webserver with a specific App subclass.""" debug = kwargs.pop('debug', False) standalone = kwargs.pop('standalone', False) logging.basicConfig(level=logging.DEBUG if debug else logging.INFO, format='%(name)-16s %(levelname)-8s %(message)s') logging.getLogger('remi').setLevel( level=logging.DEBUG if debug else logging.INFO) if standalone: s = StandaloneServer(main_gui_class, start=True, **kwargs) else: s = Server(main_gui_class, start=True, **kwargs)