Module Gnumed.pycommon.gmBackendListener
GNUmed database backend listener.
This module implements threaded listening for asynchronuous notifications from the database backend.
Expand source code
"""GNUmed database backend listener.
This module implements threaded listening for asynchronuous
notifications from the database backend.
"""
#=====================================================================
__author__ = "H. Herb <hherb@gnumed.net>, K.Hilbert <karsten.hilbert@gmx.net>"
__license__ = "GPL v2 or later"
import sys
import time
import threading
import select
import logging
if __name__ == '__main__':
sys.path.insert(0, '../../')
from Gnumed.pycommon import gmDispatcher
from Gnumed.pycommon import gmBorg
_log = logging.getLogger('gm.db')
signals2listen4 = [
'db_maintenance_warning', # warns of impending maintenance and asks for disconnect
'db_maintenance_disconnect', # announces a forced disconnect and disconnects
'gm_table_mod' # sent for any (registered) table modification, payload contains details
]
#=====================================================================
class gmBackendListener(gmBorg.cBorg):
"""The backend listener singleton class."""
def __init__(self, conn=None, poll_interval:int=3):
if hasattr(self, 'already_inited'):
return
assert conn, '<conn> must be given'
_log.info('setting up backend notifications listener')
self.debug = False
self.__notifications_received = 0
self.__messages_sent = 0
# the listener thread will regularly try to acquire
# this lock, when it succeeds the thread will quit
self._quit_lock = threading.Lock()
# take the lock now so it cannot be taken by the worker
# thread until it is released in shutdown()
if not self._quit_lock.acquire(blocking = False):
_log.error('cannot acquire thread-quit lock, aborting')
raise EnvironmentError("cannot acquire thread-quit lock")
self._conn = conn
_log.debug('DB listener connection: %s', self._conn)
self.backend_pid = self._conn.get_backend_pid()
_log.debug('notification listener connection has backend PID [%s]', self.backend_pid)
self._conn.set_isolation_level(0) # autocommit mode = psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT
self._cursor = self._conn.cursor()
try:
self._conn_fd = self._conn.fileno()
except AttributeError:
self._conn_fd = self._cursor.fileno()
self._conn_lock = threading.Lock() # lock for access to connection object
self.__register_interests()
# check for messages every 'poll_interval' seconds
self._poll_interval = poll_interval
self._listener_thread = None
self.__start_thread()
self.already_inited = True
#-------------------------------
# public API
#-------------------------------
def shutdown(self):
"""Cleanly shut down listening.
Unregister notifications. Rejoin listener thread.
"""
_log.debug('received %s notifications', self.__notifications_received)
_log.debug('sent %s messages', self.__messages_sent)
if self._listener_thread is None:
self.__shutdown_connection()
return
_log.info('stopping backend notifications listener thread')
self._quit_lock.release()
try:
# give the worker thread time to terminate
self._listener_thread.join(self._poll_interval+2.0)
try:
if self._listener_thread.is_alive():
_log.error('listener thread still alive after join()')
_log.debug('active threads: %s' % threading.enumerate())
except Exception:
pass
except Exception:
print(sys.exc_info())
self._listener_thread = None
try:
self.__unregister_unspecific_notifications()
except Exception:
_log.exception('unable to unregister unspecific notifications')
self.__shutdown_connection()
return
#-------------------------------
# event handlers
#-------------------------------
# internal helpers
#-------------------------------
def __register_interests(self):
# determine unspecific notifications
self.unspecific_notifications = signals2listen4
_log.info('configured unspecific notifications:')
_log.info('%s' % self.unspecific_notifications)
gmDispatcher.known_signals.extend(self.unspecific_notifications)
# listen to unspecific notifications
self.__register_unspecific_notifications()
#-------------------------------
def __register_unspecific_notifications(self):
for sig in self.unspecific_notifications:
_log.info('starting to listen for [%s]' % sig)
cmd = 'LISTEN "%s"' % sig
self._conn_lock.acquire(blocking = True)
try:
self._cursor.execute(cmd)
finally:
self._conn_lock.release()
#-------------------------------
def __unregister_unspecific_notifications(self):
for sig in self.unspecific_notifications:
_log.info('stopping to listen for [%s]' % sig)
cmd = 'UNLISTEN "%s"' % sig
self._conn_lock.acquire(1)
try:
self._cursor.execute(cmd)
finally:
self._conn_lock.release()
#-------------------------------
def __shutdown_connection(self):
_log.debug('shutting down connection with backend PID [%s]', self.backend_pid)
self._conn_lock.acquire(1)
try:
self._conn.rollback()
except Exception:
pass
finally:
self._conn_lock.release()
#-------------------------------
def __start_thread(self):
if self._conn is None:
raise ValueError("no connection to backend available, useless to start thread")
self._listener_thread = threading.Thread (
target = self._process_notifications,
name = self.__class__.__name__,
daemon = True
)
_log.info('starting listener thread')
self._listener_thread.start()
#-------------------------------
def __parse_notification(self, notification) -> dict:
if self.debug:
print(notification)
_log.debug('#%s: %s (first param: PID of sending backend; this backend: %s)', self.__notifications_received, notification, self.backend_pid)
payload = notification.payload.split('::')
data = {
'channel': notification.channel,
'notification_pid': notification.pid,
'operation': None,
'table': None,
'pk_column_name': None,
'pk_of_row': None,
'pk_identity': None
}
for item in payload:
if item.startswith('operation='):
data['operation'] = item.split('=')[1]
if item.startswith('table='):
data['table'] = item.split('=')[1]
if item.startswith('PK name='):
data['pk_column_name'] = item.split('=')[1]
if item.startswith('row PK='):
data['pk_of_row'] = int(item.split('=')[1])
if item.startswith('person PK='):
data['pk_identity'] = -1
tmp = item.split('=')[1]
if tmp != 'NULL':
try:
data['pk_identity'] = int(tmp)
except ValueError:
_log.error(payload)
_log.exception('error in change notification trigger')
return data
#-------------------------------
def __send_old_style_table_signal(self, data:dict):
if data['table'] is None:
return
self.__messages_sent += 1
signal = '%s_mod_db' % data['table']
_log.debug('emulating old-style table specific signal [%s]', signal)
try:
gmDispatcher.send (
signal = signal,
originated_in_database = True,
listener_pid = self.backend_pid,
sending_backend_pid = data['notification_pid'],
pk_identity = data['pk_identity'],
operation = data['operation'],
table = data['table'],
pk_column_name = data['pk_column_name'],
pk_of_row = data['pk_of_row'],
message_index = self.__messages_sent,
notification_index = self.__notifications_received
)
except Exception:
print("problem routing notification [%s] from backend [%s] to intra-client dispatcher" % (signal, data['notification_pid']))
print(sys.exc_info())
#-------------------------------
def __send_generic_signal(self, data:dict):
self.__messages_sent += 1
try:
gmDispatcher.send (
signal = data['channel'],
originated_in_database = True,
listener_pid = self.backend_pid,
sending_backend_pid = data['notification_pid'],
pk_identity = data['pk_identity'],
operation = data['operation'],
table = data['table'],
pk_column_name = data['pk_column_name'],
pk_of_row = data['pk_of_row'],
message_index = self.__messages_sent,
notification_index = self.__notifications_received
)
except Exception:
print("problem routing notification [%s] from backend [%s] to intra-client dispatcher" % (data['channel'], data['notification_pid']))
print(sys.exc_info())
#-------------------------------
# the actual thread code
#-------------------------------
def _process_notifications(self):
# loop until quitting
_have_quit_lock = None
while not _have_quit_lock:
# quitting ?
if self._quit_lock.acquire(0):
break
# wait at most self._poll_interval for new data
self._conn_lock.acquire(1)
try:
ready_input_sockets = select.select([self._conn_fd], [], [], self._poll_interval)[0]
finally:
self._conn_lock.release()
# any input available ?
if len(ready_input_sockets) == 0:
# no, select.select() timed out
# give others a chance to grab the conn lock (eg listen/unlisten)
time.sleep(0.3)
continue
# data available, wait for it to fully arrive
self._conn_lock.acquire(1)
try:
self._conn.poll()
finally:
self._conn_lock.release()
# any notifications ?
while len(self._conn.notifies) > 0:
# if self._quit_lock can be acquired we may be in
# __del__ in which case gmDispatcher is not
# guaranteed to exist anymore
if self._quit_lock.acquire(0):
_have_quit_lock = 1
break
self._conn_lock.acquire(1)
try:
notification = self._conn.notifies.pop()
finally:
self._conn_lock.release()
self.__notifications_received += 1
data = self.__parse_notification(notification)
# try sending intra-client signals:
self.__send_generic_signal(data)
self.__send_old_style_table_signal(data)
if self._quit_lock.acquire(0):
# there may be more notifications pendings
# but we don't care when quitting
_have_quit_lock = 1
break
# exit thread activity
return
#=====================================================================
# main
#=====================================================================
if __name__ == "__main__":
if len(sys.argv) < 2:
sys.exit()
if sys.argv[1] not in ['test', 'monitor']:
sys.exit()
notifies = 0
from Gnumed.pycommon import gmI18N
gmI18N.activate_locale()
gmI18N.install_domain(domain='gnumed')
from Gnumed.pycommon import gmPG2
from Gnumed.business import gmPerson, gmPersonSearch
#-------------------------------
def run_test():
#-------------------------------
def dummy(n):
return float(n)*n/float(1+n)
#-------------------------------
def OnPatientModified():
global notifies
notifies += 1
sys.stdout.flush()
print("\nBackend says: patient data has been modified (%s. notification)" % notifies)
#-------------------------------
try:
n = int(sys.argv[2])
except Exception:
print("You can set the number of iterations\nwith the second command line argument")
n = 100000
# try loop without backend listener
print("Looping", n, "times through dummy function")
i = 0
t1 = time.time()
while i < n:
dummy(i)
i += 1
t2 = time.time()
t_nothreads = t2-t1
print("Without backend thread, it took", t_nothreads, "seconds")
listener = gmBackendListener(conn = gmPG2.get_raw_connection())
# now try with listener to measure impact
print("Now in a new shell connect psql to the")
print("database <gnumed_vXX> on localhost, return")
print("here and hit <enter> to continue.")
input('hit <enter> when done starting psql')
print("You now have about 30 seconds to go")
print("to the psql shell and type")
print(" notify patient_changed<enter>")
print("several times.")
print("This should trigger our backend listening callback.")
print("You can also try to stop the demo with Ctrl-C !")
#listener.register_callback('patient_changed', OnPatientModified)
try:
counter = 0
while counter < 20:
counter += 1
time.sleep(1)
sys.stdout.flush()
print('.')
print("Looping",n,"times through dummy function")
i = 0
t1 = time.time()
while i < n:
dummy(i)
i += 1
t2 = time.time()
t_threaded = t2-t1
print("With backend thread, it took", t_threaded, "seconds")
print("Difference:", t_threaded-t_nothreads)
except KeyboardInterrupt:
print("cancelled by user")
listener.shutdown()
#-------------------------------
def run_monitor():
print("starting up backend notifications monitor")
def monitoring_callback(*args, **kwargs):
try:
kwargs['originated_in_database']
print('==> got notification from database "%s":' % kwargs['signal'])
except KeyError:
print('==> received signal from client: "%s"' % kwargs['signal'])
del kwargs['signal']
for key in kwargs:
print(' [%s]: %s' % (key, kwargs[key]))
gmDispatcher.connect(receiver = monitoring_callback)
listener = gmBackendListener(conn = gmPG2.get_raw_connection())
print("listening for the following notifications:")
print("1) unspecific:")
for sig in listener.unspecific_notifications:
print(' - %s' % sig)
while True:
pat = gmPersonSearch.ask_for_patient()
if pat is None:
break
print("found patient", pat)
gmPerson.set_active_patient(patient=pat)
print("now waiting for notifications, hit <ENTER> to select another patient")
input()
print("cleanup")
listener.shutdown()
print("shutting down backend notifications monitor")
#-------------------------------
gmPG2.request_login_params(setup_pool = True, force_tui = True)
if sys.argv[1] == 'monitor':
run_monitor()
else:
run_test()
#=====================================================================
Classes
class gmBackendListener (conn=None, poll_interval: int = 3)
-
The backend listener singleton class.
Expand source code
class gmBackendListener(gmBorg.cBorg): """The backend listener singleton class.""" def __init__(self, conn=None, poll_interval:int=3): if hasattr(self, 'already_inited'): return assert conn, '<conn> must be given' _log.info('setting up backend notifications listener') self.debug = False self.__notifications_received = 0 self.__messages_sent = 0 # the listener thread will regularly try to acquire # this lock, when it succeeds the thread will quit self._quit_lock = threading.Lock() # take the lock now so it cannot be taken by the worker # thread until it is released in shutdown() if not self._quit_lock.acquire(blocking = False): _log.error('cannot acquire thread-quit lock, aborting') raise EnvironmentError("cannot acquire thread-quit lock") self._conn = conn _log.debug('DB listener connection: %s', self._conn) self.backend_pid = self._conn.get_backend_pid() _log.debug('notification listener connection has backend PID [%s]', self.backend_pid) self._conn.set_isolation_level(0) # autocommit mode = psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT self._cursor = self._conn.cursor() try: self._conn_fd = self._conn.fileno() except AttributeError: self._conn_fd = self._cursor.fileno() self._conn_lock = threading.Lock() # lock for access to connection object self.__register_interests() # check for messages every 'poll_interval' seconds self._poll_interval = poll_interval self._listener_thread = None self.__start_thread() self.already_inited = True #------------------------------- # public API #------------------------------- def shutdown(self): """Cleanly shut down listening. Unregister notifications. Rejoin listener thread. """ _log.debug('received %s notifications', self.__notifications_received) _log.debug('sent %s messages', self.__messages_sent) if self._listener_thread is None: self.__shutdown_connection() return _log.info('stopping backend notifications listener thread') self._quit_lock.release() try: # give the worker thread time to terminate self._listener_thread.join(self._poll_interval+2.0) try: if self._listener_thread.is_alive(): _log.error('listener thread still alive after join()') _log.debug('active threads: %s' % threading.enumerate()) except Exception: pass except Exception: print(sys.exc_info()) self._listener_thread = None try: self.__unregister_unspecific_notifications() except Exception: _log.exception('unable to unregister unspecific notifications') self.__shutdown_connection() return #------------------------------- # event handlers #------------------------------- # internal helpers #------------------------------- def __register_interests(self): # determine unspecific notifications self.unspecific_notifications = signals2listen4 _log.info('configured unspecific notifications:') _log.info('%s' % self.unspecific_notifications) gmDispatcher.known_signals.extend(self.unspecific_notifications) # listen to unspecific notifications self.__register_unspecific_notifications() #------------------------------- def __register_unspecific_notifications(self): for sig in self.unspecific_notifications: _log.info('starting to listen for [%s]' % sig) cmd = 'LISTEN "%s"' % sig self._conn_lock.acquire(blocking = True) try: self._cursor.execute(cmd) finally: self._conn_lock.release() #------------------------------- def __unregister_unspecific_notifications(self): for sig in self.unspecific_notifications: _log.info('stopping to listen for [%s]' % sig) cmd = 'UNLISTEN "%s"' % sig self._conn_lock.acquire(1) try: self._cursor.execute(cmd) finally: self._conn_lock.release() #------------------------------- def __shutdown_connection(self): _log.debug('shutting down connection with backend PID [%s]', self.backend_pid) self._conn_lock.acquire(1) try: self._conn.rollback() except Exception: pass finally: self._conn_lock.release() #------------------------------- def __start_thread(self): if self._conn is None: raise ValueError("no connection to backend available, useless to start thread") self._listener_thread = threading.Thread ( target = self._process_notifications, name = self.__class__.__name__, daemon = True ) _log.info('starting listener thread') self._listener_thread.start() #------------------------------- def __parse_notification(self, notification) -> dict: if self.debug: print(notification) _log.debug('#%s: %s (first param: PID of sending backend; this backend: %s)', self.__notifications_received, notification, self.backend_pid) payload = notification.payload.split('::') data = { 'channel': notification.channel, 'notification_pid': notification.pid, 'operation': None, 'table': None, 'pk_column_name': None, 'pk_of_row': None, 'pk_identity': None } for item in payload: if item.startswith('operation='): data['operation'] = item.split('=')[1] if item.startswith('table='): data['table'] = item.split('=')[1] if item.startswith('PK name='): data['pk_column_name'] = item.split('=')[1] if item.startswith('row PK='): data['pk_of_row'] = int(item.split('=')[1]) if item.startswith('person PK='): data['pk_identity'] = -1 tmp = item.split('=')[1] if tmp != 'NULL': try: data['pk_identity'] = int(tmp) except ValueError: _log.error(payload) _log.exception('error in change notification trigger') return data #------------------------------- def __send_old_style_table_signal(self, data:dict): if data['table'] is None: return self.__messages_sent += 1 signal = '%s_mod_db' % data['table'] _log.debug('emulating old-style table specific signal [%s]', signal) try: gmDispatcher.send ( signal = signal, originated_in_database = True, listener_pid = self.backend_pid, sending_backend_pid = data['notification_pid'], pk_identity = data['pk_identity'], operation = data['operation'], table = data['table'], pk_column_name = data['pk_column_name'], pk_of_row = data['pk_of_row'], message_index = self.__messages_sent, notification_index = self.__notifications_received ) except Exception: print("problem routing notification [%s] from backend [%s] to intra-client dispatcher" % (signal, data['notification_pid'])) print(sys.exc_info()) #------------------------------- def __send_generic_signal(self, data:dict): self.__messages_sent += 1 try: gmDispatcher.send ( signal = data['channel'], originated_in_database = True, listener_pid = self.backend_pid, sending_backend_pid = data['notification_pid'], pk_identity = data['pk_identity'], operation = data['operation'], table = data['table'], pk_column_name = data['pk_column_name'], pk_of_row = data['pk_of_row'], message_index = self.__messages_sent, notification_index = self.__notifications_received ) except Exception: print("problem routing notification [%s] from backend [%s] to intra-client dispatcher" % (data['channel'], data['notification_pid'])) print(sys.exc_info()) #------------------------------- # the actual thread code #------------------------------- def _process_notifications(self): # loop until quitting _have_quit_lock = None while not _have_quit_lock: # quitting ? if self._quit_lock.acquire(0): break # wait at most self._poll_interval for new data self._conn_lock.acquire(1) try: ready_input_sockets = select.select([self._conn_fd], [], [], self._poll_interval)[0] finally: self._conn_lock.release() # any input available ? if len(ready_input_sockets) == 0: # no, select.select() timed out # give others a chance to grab the conn lock (eg listen/unlisten) time.sleep(0.3) continue # data available, wait for it to fully arrive self._conn_lock.acquire(1) try: self._conn.poll() finally: self._conn_lock.release() # any notifications ? while len(self._conn.notifies) > 0: # if self._quit_lock can be acquired we may be in # __del__ in which case gmDispatcher is not # guaranteed to exist anymore if self._quit_lock.acquire(0): _have_quit_lock = 1 break self._conn_lock.acquire(1) try: notification = self._conn.notifies.pop() finally: self._conn_lock.release() self.__notifications_received += 1 data = self.__parse_notification(notification) # try sending intra-client signals: self.__send_generic_signal(data) self.__send_old_style_table_signal(data) if self._quit_lock.acquire(0): # there may be more notifications pendings # but we don't care when quitting _have_quit_lock = 1 break # exit thread activity return
Ancestors
Methods
def shutdown(self)
-
Cleanly shut down listening.
Unregister notifications. Rejoin listener thread.
Expand source code
def shutdown(self): """Cleanly shut down listening. Unregister notifications. Rejoin listener thread. """ _log.debug('received %s notifications', self.__notifications_received) _log.debug('sent %s messages', self.__messages_sent) if self._listener_thread is None: self.__shutdown_connection() return _log.info('stopping backend notifications listener thread') self._quit_lock.release() try: # give the worker thread time to terminate self._listener_thread.join(self._poll_interval+2.0) try: if self._listener_thread.is_alive(): _log.error('listener thread still alive after join()') _log.debug('active threads: %s' % threading.enumerate()) except Exception: pass except Exception: print(sys.exc_info()) self._listener_thread = None try: self.__unregister_unspecific_notifications() except Exception: _log.exception('unable to unregister unspecific notifications') self.__shutdown_connection() return