Module Gnumed.pycommon.gmScriptingListener

GNUmed scripting listener.

This module implements threaded listening for scripting.

Expand source code
"""GNUmed scripting listener.

This module implements threaded listening for scripting.
"""
#=====================================================================
__author__ = "K.Hilbert <karsten.hilbert@gmx.net>"

import sys
import time
import threading
import select
import logging
import xmlrpc.server


_log = logging.getLogger('gm.scripting')

#=====================================================================
# FIXME: this should use /var/run/gnumed/xml-rpc-port.pid
# FIXME: and store the current port there
class cScriptingListener:
        """This class handles how GNUmed listens for external requests.

        It starts an XML-RPC server and forks a thread which
        listens for incoming requests. Those requests are then
        handed over to a macro executor and the results handed
        back to the caller.
        """
        def __init__(self, port=None, macro_executor=None, poll_interval:int=3):
                """Setup the listener.

                Args:
                        port: Port number (int or str) to listen on. Always on <localhost>.
                        macro_executor: Class processing GNUmed macros.
                        poll_interval: Listening socket select() timeout.
                """
                # listener thread will regularly try to acquire
                # this lock, when it succeeds it will quit
                self._quit_lock = threading.Lock()
                if not self._quit_lock.acquire(blocking = True, timeout = 2):
                        _log.error('cannot acquire thread quit lock !?! aborting')
                        raise OSError("cannot acquire thread quit-lock")

                # check for data every 'poll_interval' seconds
                self._poll_interval = poll_interval
                # localhost only for somewhat better security
                self._listener_address = '127.0.0.1'
                self._port = int(port)
                self._macro_executor = macro_executor

                self._server = xmlrpc.server.SimpleXMLRPCServer(addr=(self._listener_address, self._port), logRequests=False)
                self._server.register_instance(self._macro_executor)
                self._server.allow_reuse_address = True

                self._thread = threading.Thread (
                        target = self._process_RPCs,
                        name = self.__class__.__name__
                )
                self._thread.setDaemon(True)
                self._thread.start()

                _log.info('scripting listener started on [%s:%s]' % (self._listener_address, self._port))
                _log.info('macro executor: %s' % self._macro_executor)
                _log.info('poll interval: %s seconds', self._poll_interval)

        #-------------------------------
        # public API
        #-------------------------------
        def shutdown(self):
                """Cleanly shut down resources."""
                if self._thread is None:
                        return

                _log.info('stopping frontend scripting listener thread')
                self._quit_lock.release()
                try:
                        self._thread.join(self._poll_interval+5)
                        try:
                                if self._thread.is_alive():
                                        _log.error('listener thread still alive after join()')
                                        _log.debug('active threads: %s' % threading.enumerate())
                        except Exception:
                                pass
                except Exception:
                        print(sys.exc_info())
                self._thread = None
                try:
                        self._server.socket.shutdown(2)
                except Exception:
                        _log.exception('cannot cleanly shutdown(2) scripting listener socket')
                try:
                        self._server.socket.close()
                except Exception:
                        _log.exception('cannot cleanly close() scripting listener socket')

        #-------------------------------
        # internal helpers
        #-------------------------------
        def _process_RPCs(self):
                """Poll for incoming requests on the XML RPC server socket and invoke the request handler."""
                while 1:
                        if self._quit_lock.acquire(blocking = False):
                                break
                        time.sleep(0.35)                                        # give others time to acquire lock
                        if self._quit_lock.acquire(blocking = False):
                                break
                        # wait at most self.__poll_interval for new data
                        ready_input_sockets = select.select([self._server.socket], [], [], self._poll_interval)[0]
                        # any input available ?
                        if len(ready_input_sockets) == 0:
                                time.sleep(0.35)
                                if self._quit_lock.acquire(blocking = False):
                                        break
                                continue
                        # we may be in __del__ so we might fail here
                        try:
                                self._server.handle_request()
                        except Exception:
                                _log.exception('cannot serve RPC')
                                print("ERROR: cannot serve RPC")
                                break
                        if self._quit_lock.acquire(blocking = False):
                                break
                        time.sleep(0.25)
                        if self._quit_lock.acquire(blocking = False):
                                break
                # exit thread activity
                return

#=====================================================================
# main
#=====================================================================
if __name__ == "__main__":

        #-------------------------------
        class runner:
                def tell_time(self):
                        return time.asctime()

        #-------------------------------
        if (len(sys.argv) > 1) and (sys.argv[1] == 'test'):
                import xmlrpc.client
                try:
                        listener = cScriptingListener(macro_executor=runner(), port=9999)
                except Exception:
                        _log.exception('cannot instantiate scripting listener')
                        sys.exit(1)

                srv = xmlrpc.client.ServerProxy('http://localhost:9999')
                try:
                        t = srv.tell_time()
                        print(t)
                except Exception:
                        _log.exception('cannot interact with server')
                listener.shutdown()

Classes

class cScriptingListener (port=None, macro_executor=None, poll_interval: int = 3)

This class handles how GNUmed listens for external requests.

It starts an XML-RPC server and forks a thread which listens for incoming requests. Those requests are then handed over to a macro executor and the results handed back to the caller.

Setup the listener.

Args

port
Port number (int or str) to listen on. Always on .
macro_executor
Class processing GNUmed macros.
poll_interval
Listening socket select() timeout.
Expand source code
class cScriptingListener:
        """This class handles how GNUmed listens for external requests.

        It starts an XML-RPC server and forks a thread which
        listens for incoming requests. Those requests are then
        handed over to a macro executor and the results handed
        back to the caller.
        """
        def __init__(self, port=None, macro_executor=None, poll_interval:int=3):
                """Setup the listener.

                Args:
                        port: Port number (int or str) to listen on. Always on <localhost>.
                        macro_executor: Class processing GNUmed macros.
                        poll_interval: Listening socket select() timeout.
                """
                # listener thread will regularly try to acquire
                # this lock, when it succeeds it will quit
                self._quit_lock = threading.Lock()
                if not self._quit_lock.acquire(blocking = True, timeout = 2):
                        _log.error('cannot acquire thread quit lock !?! aborting')
                        raise OSError("cannot acquire thread quit-lock")

                # check for data every 'poll_interval' seconds
                self._poll_interval = poll_interval
                # localhost only for somewhat better security
                self._listener_address = '127.0.0.1'
                self._port = int(port)
                self._macro_executor = macro_executor

                self._server = xmlrpc.server.SimpleXMLRPCServer(addr=(self._listener_address, self._port), logRequests=False)
                self._server.register_instance(self._macro_executor)
                self._server.allow_reuse_address = True

                self._thread = threading.Thread (
                        target = self._process_RPCs,
                        name = self.__class__.__name__
                )
                self._thread.setDaemon(True)
                self._thread.start()

                _log.info('scripting listener started on [%s:%s]' % (self._listener_address, self._port))
                _log.info('macro executor: %s' % self._macro_executor)
                _log.info('poll interval: %s seconds', self._poll_interval)

        #-------------------------------
        # public API
        #-------------------------------
        def shutdown(self):
                """Cleanly shut down resources."""
                if self._thread is None:
                        return

                _log.info('stopping frontend scripting listener thread')
                self._quit_lock.release()
                try:
                        self._thread.join(self._poll_interval+5)
                        try:
                                if self._thread.is_alive():
                                        _log.error('listener thread still alive after join()')
                                        _log.debug('active threads: %s' % threading.enumerate())
                        except Exception:
                                pass
                except Exception:
                        print(sys.exc_info())
                self._thread = None
                try:
                        self._server.socket.shutdown(2)
                except Exception:
                        _log.exception('cannot cleanly shutdown(2) scripting listener socket')
                try:
                        self._server.socket.close()
                except Exception:
                        _log.exception('cannot cleanly close() scripting listener socket')

        #-------------------------------
        # internal helpers
        #-------------------------------
        def _process_RPCs(self):
                """Poll for incoming requests on the XML RPC server socket and invoke the request handler."""
                while 1:
                        if self._quit_lock.acquire(blocking = False):
                                break
                        time.sleep(0.35)                                        # give others time to acquire lock
                        if self._quit_lock.acquire(blocking = False):
                                break
                        # wait at most self.__poll_interval for new data
                        ready_input_sockets = select.select([self._server.socket], [], [], self._poll_interval)[0]
                        # any input available ?
                        if len(ready_input_sockets) == 0:
                                time.sleep(0.35)
                                if self._quit_lock.acquire(blocking = False):
                                        break
                                continue
                        # we may be in __del__ so we might fail here
                        try:
                                self._server.handle_request()
                        except Exception:
                                _log.exception('cannot serve RPC')
                                print("ERROR: cannot serve RPC")
                                break
                        if self._quit_lock.acquire(blocking = False):
                                break
                        time.sleep(0.25)
                        if self._quit_lock.acquire(blocking = False):
                                break
                # exit thread activity
                return

Methods

def shutdown(self)

Cleanly shut down resources.

Expand source code
def shutdown(self):
        """Cleanly shut down resources."""
        if self._thread is None:
                return

        _log.info('stopping frontend scripting listener thread')
        self._quit_lock.release()
        try:
                self._thread.join(self._poll_interval+5)
                try:
                        if self._thread.is_alive():
                                _log.error('listener thread still alive after join()')
                                _log.debug('active threads: %s' % threading.enumerate())
                except Exception:
                        pass
        except Exception:
                print(sys.exc_info())
        self._thread = None
        try:
                self._server.socket.shutdown(2)
        except Exception:
                _log.exception('cannot cleanly shutdown(2) scripting listener socket')
        try:
                self._server.socket.close()
        except Exception:
                _log.exception('cannot cleanly close() scripting listener socket')