Module Gnumed.pycommon.gmConnectionPool

GNUmed connection pooler.

Currently, only readonly connections are pooled.

This pool is (supposedly) thread safe.

Expand source code
# -*- coding: utf-8 -*-

"""GNUmed connection pooler.

Currently, only readonly connections are pooled.

This pool is (supposedly) thread safe.
"""
#============================================================
# SPDX-License-Identifier: GPL-2.0-or-later
__author__ = "karsten.hilbert@gmx.net"
__license__ = "GPL v2 or later (details at https://www.gnu.org)"


_DISABLE_CONNECTION_POOL = False                # set to True to disable the connection pool for debugging (= always return new connection)
_VERBOSE_PG_LOG = False                                 # set to True to force-enable verbose connections

# standard library imports
import os
import sys
import inspect
import logging
import threading
import types
import re as regex
import datetime as pydt


# 3rd party library imports
import psycopg2
import psycopg2.extensions
import psycopg2.extras
import psycopg2.errors
import psycopg2.errorcodes as PG_error_codes


# GNUmed module imports
if __name__ == '__main__':
        sys.path.insert(0, '../../')
from Gnumed.pycommon import gmLog2
from Gnumed.pycommon import gmBorg
from Gnumed.pycommon import gmTools
from Gnumed.pycommon import gmDateTime

# globals
_log = logging.getLogger('gm.db_pool')

_log.info('psycopg2 module version: %s' % psycopg2.__version__)                         # type: ignore [attr-defined]
_log.info('PostgreSQL via DB-API module "%s": API level %s, thread safety %s, parameter style "%s"' % (psycopg2, psycopg2.apilevel, psycopg2.threadsafety, psycopg2.paramstyle))
_log.info('libpq version (compiled in): %s', psycopg2.__libpq_version__)
_log.info('libpq version (loaded now) : %s', psycopg2.extensions.libpq_version())
#if '2.8' in psycopg2.__version__:
#       _log.info('psycopg2 v2.8 detected, disabling connection pooling for the time being')
#       _DISABLE_CONNECTION_POOL = True

if not (float(psycopg2.apilevel) >= 2.0):
        raise ImportError('gmPG2: supported DB-API level too low')

if psycopg2.threadsafety != 2:
        raise ImportError('gmPG2: lacking minimum thread safety in psycopg2')

if psycopg2.paramstyle != 'pyformat':
        raise ImportError('gmPG2: lacking pyformat (%%(<name>)s style) placeholder support in psycopg2')

if 'dt' not in psycopg2.__version__:                            # type: ignore [attr-defined]
        raise ImportError('gmPG2: lacking datetime support in psycopg2')

if 'ext' not in psycopg2.__version__:                           # type: ignore [attr-defined]
        raise ImportError('gmPG2: lacking extensions support in psycopg2')

if 'pq3' not in psycopg2.__version__:                           # type: ignore [attr-defined]
        raise ImportError('gmPG2: lacking v3 backend protocol support in psycopg2')


# CONSTANTS
_SQL_expand_tz_name = """
SELECT DISTINCT ON (abbrev) name
FROM pg_timezone_names
WHERE
        abbrev = %(tz)s
                AND
        name ~ '^[^/]+/[^/]+$'
                AND
        name !~ '^Etc/'
"""

dbapi = psycopg2                                                # for external use
postgresql_version = None

_timestamp_template = "cast('%s' as timestamp with time zone)"          # MUST NOT be uniocde or else getquoted will not work (true in py3 ?)

_map_psyco_tx_status2str = [
        'TRANSACTION_STATUS_IDLE',
        'TRANSACTION_STATUS_ACTIVE',
        'TRANSACTION_STATUS_INTRANS',
        'TRANSACTION_STATUS_INERROR',
        'TRANSACTION_STATUS_UNKNOWN'
]

_map_psyco_conn_status2str = [
        '0 - ?',
        'STATUS_READY',
        'STATUS_BEGIN_ALIAS_IN_TRANSACTION',
        'STATUS_PREPARED'
]

_map_psyco_iso_level2str = {
        None: 'ISOLATION_LEVEL_DEFAULT (configured on server)',
        0: 'ISOLATION_LEVEL_AUTOCOMMIT',
        1: 'ISOLATION_LEVEL_READ_UNCOMMITTED',
        2: 'ISOLATION_LEVEL_REPEATABLE_READ',
        3: 'ISOLATION_LEVEL_SERIALIZABLE',
        4: 'ISOLATION_LEVEL_READ_UNCOMMITTED'
}

_connection_loss_markers = [
        'terminating connection due to administrator command'
]

#============================================================
class cPGCredentials:
        """Holds PostgreSQL credentials"""

        def __init__(self) -> None:
                self.__host:str = None                  # None: left out -> defaults to $PGHOST or implicit <localhost>
                self.__port:int = None                  # None: left out -> defaults to $PGPORT or libpq compiled-in default (typically 5432)
                self.__database:str = None              # must be set before connecting
                self.__user:str = None                  # must be set before connecting
                self.__password:str = None              # None: left out
                                                                                # -> try password-less connect (TRUST/IDENT/PEER)
                                                                                # -> try connect with password from <passfile> parameter or $PGPASSFILE or ~/.pgpass

        #--------------------------------------------------
        # properties
        #--------------------------------------------------
        def __format_credentials(self) -> str:
                """Database credentials formatted as string."""
                cred_parts = [
                        'dbname=%s' % self.__database,
                        'host=%s' % self.__host,
                        'port=%s' % self.__port,
                        'user=%s' % self.__user
                ]
                return ' '.join(cred_parts)

        formatted_credentials = property(__format_credentials)

        #--------------------------------------------------
        def generate_credentials_kwargs(self, connection_name:str=None) -> dict:
                """Return dictionary with credentials suitable for psycopg2.connection() keyword arguments."""
                assert (self.__database is not None), 'self.__database must be defined'
                assert (self.__user is not None), 'self.__user must be defined'

                kwargs = {
                        'dbname': self.__database,
                        'user': self.__user,
                        'application_name': gmTools.coalesce(connection_name, 'GNUmed'),
                        'fallback_application_name': 'GNUmed',
                        'sslmode': 'prefer',
                        # try to enforce a useful encoding early on so that we
                        # have a good chance of decoding authentication errors
                        # containing foreign language characters
                        'client_encoding': 'UTF8'
                }
                if self.__host is not None:
                        kwargs['host'] = self.__host
                if self.__port is not None:
                        kwargs['port'] = self.__port
                if self.__password is not None:
                        kwargs['password'] = self.__password
                return kwargs

        credentials_kwargs = property(generate_credentials_kwargs)

        #--------------------------------------------------
        def _get_database(self) -> str:
                return self.__database

        def _set_database(self, database:str=None):
                assert database, '<database> must not be None'
                assert database.strip(), '<database> must not be empty'
                assert ('salaam.homeunix' not in database), 'The public database is not hosted by <salaam.homeunix.com> anymore.\n\nPlease point your configuration files to <publicdb.gnumed.de>.'
                self.__database = database.strip()
                _log.info('[%s]', self.__database)

        database = property(_get_database, _set_database)

        #--------------------------------------------------
        def _get_host(self) -> str:
                return self.__host

        def _set_host(self, host:str=None):
                if host is not None:
                        host = host.strip()
                        if host == '':
                                host = None
                self.__host = host
                _log.info('[%s]', self.__host)

        host = property(_get_host, _set_host)

        #--------------------------------------------------
        def _get_port(self) -> int:
                return self.__port

        def _set_port(self, port=None):
                _log.info('[%s]', port)
                if port is None:
                        self.__port = None
                        return

                self.__port = int(port)

        port = property(_get_port, _set_port)

        #--------------------------------------------------
        def _get_user(self) -> str:
                return self.__user

        def _set_user(self, user:str=None):
                assert (user is not None), '<user> must not be None'
                assert (user.strip() != ''), '<user> must not be empty'
                self.__user = user.strip()
                _log.info('[%s]', self.__user)

        user = property(_get_user, _set_user)

        #--------------------------------------------------
        def _get_password(self) -> str:
                return self.__password

        def _set_password(self, password:str=None):
                if password is not None:
                        gmLog2.add_word2hide(password)
                self.__password = password
                _log.info('password was set')

        password = property(_get_password, _set_password)

#============================================================
class gmConnectionPool(gmBorg.cBorg):
        """The Singleton connection pool class.

        Any normal connection from GNUmed to PostgreSQL should go
        through this pool. It needs credentials to be provided
        via .credentials = <cPGCredentials>.
        """
        def __init__(self) -> None:
                try:
                        self.__initialized
                        return

                except AttributeError:
                        self.__initialized:bool = True

                _log.info('[%s]: first instantiation', self.__class__.__name__)
                self.__ro_conn_pool:dict[str, psycopg2.extras.DictConnection] = {}      # keyed by "credentials::thread ID"
                self.__SQL_set_client_timezone:str = None
                self.__client_timezone = None
                self.__creds:cPGCredentials = None
                self.__log_auth_environment()

        #--------------------------------------------------
        # connection API
        #--------------------------------------------------
        def get_connection(self, readonly:bool=True, verbose:bool=False, pooled:bool=True, connection_name:str=None, autocommit:bool=False, credentials:cPGCredentials=None) -> psycopg2.extras.DictConnection:
                """Provide a database connection.

                Readonly connections can be pooled. If there is no
                suitable connection in the pool a new one will be
                created and stored. The pool is per-thread and
                per-credentials.

                Args:
                        readonly: make connection read only
                        verbose: make connection log more things
                        pooled: return a pooled connection, if possible
                        connection_name: a human readable name for the connection, avoid spaces
                        autocommit: whether to autocommit
                        credentials: use for getting a connection with other credentials different from what the pool was set to before

                Returns:
                        a working connection to a PostgreSQL database
                """
#               if _DISABLE_CONNECTION_POOL:
#                       pooled = False

                if credentials is not None:
                        pooled = False
                conn = None
                if readonly and pooled:
                        try:
                                conn = self.__ro_conn_pool[self.pool_key]
                        except KeyError:
                                _log.info('pooled RO conn with key [%s] requested, but not in pool, setting up', self.pool_key)
                        if conn is not None:
                                #if verbose:
                                #       _log.debug('using pooled conn [%s]', self.pool_key)
                                return conn

                if conn is None:
                        conn = self.get_raw_connection (
                                verbose = verbose,
                                readonly = readonly,
                                connection_name = connection_name,
                                autocommit = autocommit,
                                credentials = credentials
                        )
                if readonly and pooled:
                        # monkey patch close() for pooled RO connections
                        conn.original_close = conn.close                                                                # type: ignore [attr-defined]
                        conn.close = _raise_exception_on_pooled_ro_conn_close                   # type: ignore [assignment]
                # set connection properties
                # - client encoding
                encoding = 'UTF8'
                _log.debug('setting client (wire) encoding to %s', encoding)
                conn.set_client_encoding(encoding)
                # - transaction isolation level
                if not readonly:
                        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
                # - client time zone
                _log.debug('client timezone [%s]', self.__client_timezone)
                curs = conn.cursor()
                curs.execute(self.__SQL_set_client_timezone, {'tz': self.__client_timezone})
                curs.close()
                conn.commit()
                if readonly and pooled:
                        _log.debug('putting RO conn with key [%s] into pool', self.pool_key)
                        self.__ro_conn_pool[self.pool_key] = conn
                if verbose:
                        log_conn_state(conn)
                return conn

        #--------------------------------------------------
        def get_rw_conn(self, verbose:bool=False, connection_name:str=None, autocommit:bool=False) -> psycopg2.extras.DictConnection:
                return self.get_connection(verbose = verbose, readonly = False, connection_name = connection_name, autocommit = autocommit)

        #--------------------------------------------------
        def get_ro_conn(self, verbose:bool=False, connection_name:str=None, autocommit:bool=False) -> psycopg2.extras.DictConnection:
                return self.get_connection(verbose = verbose, readonly = False, connection_name = connection_name, autocommit = autocommit)

        #--------------------------------------------------
        def get_raw_connection(self, verbose:bool=False, readonly:bool=True, connection_name:str=None, autocommit:bool=False, credentials:cPGCredentials=None) -> psycopg2.extras.DictConnection:
                """Get a raw, unadorned connection.

                This will not set any parameters such as encoding,
                timezone, or datestyle, hence it can be used for
                "service" connections for verifying encodings etc
                """
#               # FIXME: support verbose
                if credentials is None:
                        creds2use = self.__creds
                else:
                        creds2use = credentials
                creds_kwargs = creds2use.generate_credentials_kwargs(connection_name = connection_name)
                try:
                        # DictConnection now _is_ a real dictionary
                        conn = psycopg2.connect(connection_factory = psycopg2.extras.DictConnection, **creds_kwargs)
                except psycopg2.OperationalError as e:
                        _log.error('failed to establish connection [%s]', creds2use.formatted_credentials)
                        t, v, tb = sys.exc_info()
                        try:
                                msg = e.args[0]
                        except (AttributeError, IndexError, TypeError):
                                raise

                        if not self.__is_auth_fail_msg(msg):
                                raise

                        raise cAuthenticationError(creds2use.formatted_credentials, msg).with_traceback(tb)

                _log.debug('established connection "%s", backend PID: %s', gmTools.coalesce(connection_name, 'anonymous'), conn.get_backend_pid())
                # safe-guard
                conn._original_rollback = conn.rollback
                conn.rollback = types.MethodType(_safe_transaction_rollback, conn)

                # - inspect server
                self.__log_on_first_contact(conn)
                # - verify PG understands client time zone
                self.__detect_client_timezone(conn)
                # - set access mode
                if readonly:
                        _log.debug('readonly: forcing autocommit=True to avoid <IDLE IN TRANSACTION>')
                        autocommit = True
                else:
                        _log.debug('autocommit is desired to be: %s', autocommit)
                conn.commit()
                conn.autocommit = autocommit
                conn.readonly = readonly
                # - assume verbose=True to mean we want debugging in the database, too
                if verbose or _VERBOSE_PG_LOG:
                        _log.debug('enabling <plpgsql.extra_warnings/_errors>')
                        curs = conn.cursor()
                        try:
                                curs.execute("SET plpgsql.extra_warnings TO 'all'")
                        except Exception:
                                _log.exception('cannot enable <plpgsql.extra_warnings>')
                        finally:
                                curs.close()
                                conn.commit()
                        curs = conn.cursor()
                        try:
                                curs.execute("SET plpgsql.extra_errors TO 'all'")
                        except Exception:
                                _log.exception('cannot enable <plpgsql.extra_errors>')
                        finally:
                                curs.close()
                                conn.commit()
                        _log.debug('enabling auto_explain')
                        curs = conn.cursor()
                        try:
                                curs.execute("SELECT gm.load_auto_explain(3000)")
                        except Exception:
                                _log.exception('cannot enable auto_explain')
                        finally:
                                curs.close()
                                conn.commit()
                return conn

        #--------------------------------------------------
        def get_dbowner_connection(self, readonly:bool=True, verbose:bool=False, connection_name:str=None, autocommit:bool=False, dbo_password:str=None, dbo_account:str='gm-dbo') -> psycopg2.extras.DictConnection:
                """Return a connection for the database owner.

                Will not touch the pool.
                """
                dbo_creds = cPGCredentials()
                dbo_creds.user = dbo_account
                dbo_creds.password = dbo_password
                dbo_creds.database = self.__creds.database
                dbo_creds.host = self.__creds.host
                dbo_creds.port = self.__creds.port
                return self.get_connection (
                        pooled = False,
                        readonly = readonly,
                        verbose = verbose,
                        connection_name = connection_name,
                        autocommit = autocommit,
                        credentials = dbo_creds
                )

        #--------------------------------------------------
        def discard_pooled_connection_of_thread(self):
                """Discard from pool the connection of the current thread."""
                try:
                        conn = self.__ro_conn_pool[self.pool_key]
                except KeyError:
                        _log.debug('no connection pooled for thread [%s]', self.pool_key)
                        return

                del self.__ro_conn_pool[self.pool_key]
                if conn.closed:
                        return

                conn.close = conn.original_close
                conn.close()

        #--------------------------------------------------
        def shutdown(self):
                """Close and discard all pooled connections."""
                for conn_key in self.__ro_conn_pool:
                        conn = self.__ro_conn_pool[conn_key]
                        if conn.closed:
                                continue
                        _log.debug('closing open database connection, pool key: %s', conn_key)
                        log_conn_state(conn)
                        conn.close = conn.original_close
                        conn.close()
                del self.__ro_conn_pool

        #--------------------------------------------------
        # utility functions
        #--------------------------------------------------
        def __log_on_first_contact(self, conn:psycopg2.extras.DictConnection):
                global postgresql_version
                if postgresql_version is not None:
                        return

                _log.debug('_\\\\// heed Prime Directive _\\\\//')
                # FIXME: verify PG version
                curs = conn.cursor()
                curs.execute ("""
                        SELECT
                                substring(setting, E'^\\\\d{1,2}\\\\.\\\\d{1,2}')::numeric AS version
                        FROM
                                pg_settings
                        WHERE
                                name = 'server_version'"""
                )
                postgresql_version = curs.fetchone()['version']
                _log.info('PostgreSQL version (numeric): %s' % postgresql_version)
                try:
                        curs.execute("SELECT pg_size_pretty(pg_database_size(current_database()))")
                        _log.info('database size: %s', curs.fetchone()[0])
                except Exception:
                        _log.exception('cannot get database size')
                finally:
                        curs.close()
                        conn.commit()
                curs = conn.cursor()
                log_pg_settings(curs = curs)
                curs.close()
                conn.commit()
                _log.debug('done')

        #--------------------------------------------------
        def __log_auth_environment(self):
                pgpass_file = os.path.expanduser(os.path.join('~', '.pgpass'))
                if os.path.exists(pgpass_file):
                        _log.debug('standard .pgpass (%s) exists', pgpass_file)
                else:
                        _log.debug('standard .pgpass (%s) not found', pgpass_file)
                pgpass_var = os.getenv('PGPASSFILE')
                if pgpass_var is None:
                        _log.debug('$PGPASSFILE not set')
                else:
                        if os.path.exists(pgpass_var):
                                _log.debug('$PGPASSFILE=%s -> file exists', pgpass_var)
                        else:
                                _log.debug('$PGPASSFILE=%s -> file not found')

        #--------------------------------------------------
        def __detect_client_timezone(self, conn:psycopg2.extras.DictConnection):
                """This is run on the very first connection."""

                if self.__client_timezone is not None:
                        return

                _log.debug('trying to detect timezone from system')
                # we need gmDateTime to be initialized
                if gmDateTime.current_local_iso_numeric_timezone_string is None:
                        gmDateTime.init()
                tz_candidates = [gmDateTime.current_local_timezone_name]
                try:
                        tz_candidates.append(os.environ['TZ'])
                except KeyError:
                        pass
                expanded_tzs = []
                for tz in tz_candidates:
                        expanded = self.__expand_timezone(conn, timezone = tz)
                        if expanded != tz:
                                expanded_tzs.append(expanded)
                tz_candidates.extend(expanded_tzs)
                _log.debug('candidates: %s', tz_candidates)
                # find best among candidates
                found = False
                for tz in tz_candidates:
                        if self.__validate_timezone(conn = conn, timezone = tz):
                                self.__client_timezone = tz
                                self.__SQL_set_client_timezone = 'SET timezone TO %(tz)s'
                                found = True
                                break
                if not found:
                        self.__client_timezone = gmDateTime.current_local_iso_numeric_timezone_string
                        self.__SQL_set_client_timezone = 'set time zone interval %(tz)s hour to minute'
                _log.info('client system timezone detected as equivalent to [%s]', self.__client_timezone)
                # FIXME: check whether server.timezone is the same
                # FIXME: value as what we eventually detect

        #--------------------------------------------------
        def __expand_timezone(self, conn:psycopg2.extras.DictConnection, timezone:str):
                """Some timezone defs are abbreviations so try to expand
                them because "set time zone" doesn't take abbreviations"""

                cmd = _SQL_expand_tz_name
                args = {'tz': timezone}
                conn.commit()
                curs = conn.cursor()
                result = timezone
                try:
                        curs.execute(cmd, args)
                        rows = curs.fetchall()
                except Exception:
                        _log.exception('cannot expand timezone abbreviation [%s]', timezone)
                finally:
                        curs.close()
                        conn.rollback()
                if rows:
                        result = rows[0]['name']
                        _log.debug('[%s] maps to [%s]', timezone, result)
                return result

        #---------------------------------------------------
        def __validate_timezone(self, conn:psycopg2.extras.DictConnection, timezone:str) -> bool:
                _log.debug('validating timezone [%s]', timezone)
                cmd = 'SET timezone TO %(tz)s'
                args = {'tz': timezone}
                curs = conn.cursor()
                try:
                        curs.execute(cmd, args)
                except psycopg2.DataError:
                        _log.warning('timezone [%s] is not settable', timezone)
                        return False

                except Exception:
                        _log.exception('failed to set timezone to [%s]', timezone)
                        return False

                finally:
                        conn.rollback()
                _log.info('time zone [%s] is settable', timezone)
                # can we actually use it, though ?
                SQL = "SELECT '1931-03-26 11:11:11+0'::timestamp with time zone"
                try:
                        curs.execute(SQL)
                        curs.fetchone()
                except Exception:
                        _log.exception('error using timezone [%s]', timezone)
                        return False

                finally:
                        curs.close()
                        conn.rollback()
                _log.info('timezone [%s] is usable', timezone)
                return True

        #--------------------------------------------------
        # properties
        #--------------------------------------------------
        def _get_credentials(self) -> cPGCredentials:
                return self.__creds

        def _set_credentials(self, creds:cPGCredentials=None):
                if self.__creds is None:
                        self.__creds = creds
                        return

                _log.debug('invalidating pooled connections on credentials change')
                pool_key_start_from_curr_creds = self.__creds.formatted_credentials + '::thread='
                for pool_key in self.__ro_conn_pool:
                        if not pool_key.startswith(pool_key_start_from_curr_creds):
                                continue
                        conn = self.__ro_conn_pool[pool_key]
                        del self.__ro_conn_pool[pool_key]
                        if conn.closed:
                                del conn
                                continue
                        _log.debug('closing open database connection, pool key: %s', pool_key)
                        log_conn_state(conn)
                        conn.original_close()                                                                   # type: ignore [attr-defined]
                        del conn
                self.__creds = creds

        credentials = property(_get_credentials, _set_credentials)

        #--------------------------------------------------
        def _get_pool_key(self) -> str:
                return '%s::thread=%s' % (
                        self.__creds.formatted_credentials,
                        threading.current_thread().ident
                )

        pool_key = property(_get_pool_key)

        #--------------------------------------------------
        def __is_auth_fail_msg(self, msg:str) -> bool:
                if 'fe_sendauth' in msg:
                        return True

                if regex.search(r'user ".*" does not exist', msg) is not None:
                        return True

                if 'uthenti' in msg:
                        return True

                if ((
                                (regex.search(r'user ".*"', msg) is not None)
                                        or
                                (regex.search(r'(R|r)ol{1,2}e', msg) is not None)
                        )
                        and ('exist' in msg)
                        and (regex.search(r'n(o|ich)t', msg) is not None)
                ):
                        return True

                # to the best of our knowledge
                return False

#============================================================
# internal helpers
#------------------------------------------------------------
def exception_is_connection_loss(exc: Exception) -> bool:
        """Checks whether exception represents connection loss."""
        if not isinstance(exc, psycopg2.Error):
                # not a PG/psycopg2 exception
                return False

        try:
                if isinstance(exc, psycopg2.errors.AdminShutdown):
                        _log.debug('indicates connection loss due to admin shutdown')
                        return True

        except AttributeError:  # psycopg2 2.7/2.8 transition (no AdminShutdown exception)
                pass
        try:
                msg = '%s' % exc.args[0]
        except (AttributeError, IndexError, TypeError):
                _log.debug('cannot extract message from exception')
                return False

        _log.debug('interpreting: %s', msg)
        for snippet in _connection_loss_markers:
                if snippet in msg:
                        _log.debug('indicates connection loss')
                        return True

        is_conn_loss = (
                # OperationalError
                ('erver' in msg)
                        and
                (
                        ('terminat' in msg)
                                or
                        ('abnorm' in msg)
                                or
                        ('end' in msg)
                                or
                        ('no route' in msg)
                )
        ) or (
                # InterfaceError
                ('onnect' in msg)
                        and
                (
                        ('close' in msg)
                                or
                        ('end' in msg)
                )
        )
        if is_conn_loss:
                _log.debug('indicates connection loss')
        return is_conn_loss

#------------------------------------------------------------
def log_pg_exception_details(exc: Exception) -> bool:
        """Logs details from a database exception."""
        if not isinstance(exc, psycopg2.Error):
                return False

        _log.error(type(exc))
        try:
                for arg in exc.args:
                        _log.debug('exc.arg: %s', arg)
        except AttributeError:
                _log.debug('exception has no <.args>')
        _log.debug('pgerror: [%s]', exc.pgerror)
        if exc.pgcode is None:
                _log.debug('pgcode : %s', exc.pgcode)
        else:
                _log.debug('pgcode : %s (%s)', exc.pgcode, PG_error_codes.lookup(exc.pgcode))
        log_cursor_state(exc.cursor)
        try:
                diags = exc.diag
        except AttributeError:
                _log.debug('<.diag> not available')
                diags = None
        if diags is None:
                return True

        for attr in dir(diags):
                if attr.startswith('__'):
                        continue
                val = getattr(diags, attr)
                if val is None:
                        continue
                _log.debug('%s: %s', attr, val)
        return True

#--------------------------------------------------
def log_pg_settings(curs) -> bool:
        """Log PostgreSQL server settings."""
        # config settings
        try:
                curs.execute('SELECT * FROM pg_settings')
        except psycopg2.Error:
                _log.exception('cannot retrieve PG settings ("SELECT ... FROM pg_settings" failed)')
                return False

        settings = curs.fetchall()
        if settings:
                for setting in settings:
                        if setting['unit'] is None:
                                unit = ''
                        else:
                                unit = ' %s' % setting['unit']
                        if setting['sourcefile'] is None:
                                sfile = ''
                        else:
                                sfile = '// %s @ %s' % (setting['sourcefile'], setting['sourceline'])
                        pending_restart = u''
                        try:
                                if setting['pending_restart']:
                                        pending_restart = u'// needs restart'
                        except KeyError:
                                pass    # 'pending_restart' does not exist in PG 9.4 yet
                        _log.debug('%s: %s%s (set from: [%s] // session RESET will set to: [%s]%s%s)',
                                setting['name'],
                                setting['setting'],
                                unit,
                                setting['source'],
                                setting['reset_val'],
                                pending_restart,
                                sfile
                        )
        # extensions
        try:
                curs.execute('select pg_available_extensions()')
        except Exception:
                _log.exception('cannot log available PG extensions')
                return False

        extensions = curs.fetchall()
        if extensions:
                for ext in extensions:
                        _log.debug('PG extension: %s', ext['pg_available_extensions'])
        else:
                _log.error('no PG extensions available')
        # log pg_config -- can only be read by superusers :-/
        # database collation
        try:
                curs.execute('SELECT *, pg_database_collation_actual_version(oid), pg_encoding_to_char(encoding) FROM pg_database WHERE datname = current_database()')
        except psycopg2.Error:
                _log.exception('cannot log actual collation version (probably PG < 15)')
                curs.execute('SELECT * FROM pg_database WHERE datname = current_database()')
        config = curs.fetchall()
        gmLog2.log_multiline(10, message = 'PG database config', text = gmTools.format_dict_like(dict(config[0]), tabular = True))
        return True

#--------------------------------------------------
def log_cursor_state(cursor) -> None:
        """Log details about a DB-API cursor."""
        if cursor is None:
                _log.debug('cursor: None')
                return

        conn = cursor.connection
        tx_status = conn.get_transaction_status()
        if tx_status in [ psycopg2.extensions.TRANSACTION_STATUS_INERROR, psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN ]:
                isolation_level = '<tx aborted or unknown, cannot retrieve>'
        else:
                isolation_level = conn.isolation_level
        try:
                conn_deferrable = conn.deferrable
        except AttributeError:
                conn_deferrable = '<unavailable>'
        if cursor.query is None:
                query = '<no query>'
        else:
                query = cursor.query.decode(errors = 'replace')
        if conn.closed != 0:
                backend_pid = '<conn closed, cannot retrieve>'
        else:
                backend_pid = conn.get_backend_pid()
        txt = """Cursor
 identity: %s; name: %s
 closed: %s; scrollable: %s; with hold: %s; arraysize: %s; itersize: %s;
 last rowcount: %s; rownumber: %s; lastrowid (OID): %s;
 last description: %s
 statusmessage: %s
Connection
 identity: %s; backend pid: %s; protocol version: %s;
 closed: %s; autocommit: %s; isolation level: %s; encoding: %s; async: %s; deferrable: %s; readonly: %s;
 TX status: %s; CX status: %s; executing async op: %s;
Query
 %s""" % (
                # cursor level:
                id(cursor),
                cursor.name,
                cursor.closed,
                cursor.scrollable,
                cursor.withhold,
                cursor.arraysize,
                cursor.itersize,
                cursor.rowcount,
                cursor.rownumber,
                cursor.lastrowid,
                cursor.description,
                cursor.statusmessage,
                # connection level:
                id(conn),
                backend_pid,
                conn.protocol_version,
                conn.closed,
                conn.autocommit,
                isolation_level,
                conn.encoding,
                conn.async_,
                conn_deferrable,
                conn.readonly,
                _map_psyco_tx_status2str[tx_status],
                _map_psyco_conn_status2str[conn.status],
                conn.isexecuting(),
                # query level:
                query
        )
        gmLog2.log_multiline(logging.DEBUG, message = 'Link state:', line_prefix = '', text = txt)

#--------------------------------------------------
def log_conn_state(conn:psycopg2.extras.DictConnection) -> None:
        """Log details about a DB-API connection."""
        tx_status = conn.get_transaction_status()
        if tx_status in [ psycopg2.extensions.TRANSACTION_STATUS_INERROR, psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN ]:
                isolation_level = '%s (tx aborted or unknown, cannot retrieve)' % conn.isolation_level
        else:
                isolation_level = '%s (%s)' % (conn.isolation_level, _map_psyco_iso_level2str[conn.isolation_level])
        conn_status = '%s (%s)' % (conn.status, _map_psyco_conn_status2str[conn.status])
        if conn.closed != 0:
                conn_status = 'undefined (%s)' % conn_status
                backend_pid = '<conn closed, cannot retrieve>'
        else:
                backend_pid = str(conn.get_backend_pid())
        try:
                conn_deferrable = str(conn.deferrable)
        except AttributeError:
                conn_deferrable = '<unavailable>'
        d = {
                'identity': id(conn),
                'backend PID': backend_pid,
                'protocol version': conn.protocol_version,
                'encoding': conn.encoding,
                'closed': conn.closed,
                'readonly': conn.readonly,
                'autocommit': conn.autocommit,
                'isolation level (psyco)': isolation_level,
                'async': conn.async_,
                'deferrable': conn_deferrable,
                'transaction status': '%s (%s)' % (tx_status, _map_psyco_tx_status2str[tx_status]),
                'connection status': conn_status,
                'executing async op': conn.isexecuting(),
                'type': type(conn)
        }
        _log.debug(conn)
        for key in d:
                _log.debug('%s: %s', key, d[key])

#------------------------------------------------------------
def _safe_transaction_rollback(self) -> bool:
        """Make connection.rollback() somewhat fault tolerant.

        Will *not* fail if the connection is already closed.

        Args:
                conn: a psycopg2 connection object
        """
        if self.closed:
                _log.debug('fishy: connection already closed, cannot roll back')
                return True

        return self._original_rollback()

#------------------------------------------------------------
def _raise_exception_on_pooled_ro_conn_close():
        call_stack = inspect.stack()
        call_stack.reverse()
        for idx in range(1, len(call_stack)):
                caller = call_stack[idx]
                _log.debug('%s[%s] @ [%s] in [%s]', ' '* idx, caller[3], caller[2], caller[1])
        del call_stack
        raise TypeError('close() called on read-only connection')

#========================================================================
class cAuthenticationError(psycopg2.OperationalError):

        def __init__(self, creds=None, prev_val=None):
                self.creds = creds
                self.prev_val = prev_val

        def __str__(self):
                return 'PostgreSQL: %sDSN: %s' % (self.prev_val, self.creds)

#============================================================
# Python -> PostgreSQL
#------------------------------------------------------------
# test when Squeeze (and thus psycopg2 2.2 becomes Stable
class cAdapterPyDateTime(object):

        def __init__(self, dt):
                if dt.tzinfo is None:
                        raise ValueError('datetime.datetime instance is lacking a time zone: [%s]' % _timestamp_template % dt.isoformat())
                self.__dt = dt

        def getquoted(self):
                return _timestamp_template % self.__dt.isoformat()

#============================================================
# main
#------------------------------------------------------------
# make sure psycopg2 knows how to handle unicode ...
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
psycopg2.extensions.register_type(psycopg2._psycopg.UNICODEARRAY)

# tell psycopg2 how to adapt datetime types with timestamps when locales are in use
# check in 0.9:
psycopg2.extensions.register_adapter(pydt.datetime, cAdapterPyDateTime)

# turn dict()s into JSON - only works > 9.2
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)

# do NOT adapt *lists* to "... IN (*) ..." syntax because we want
# them adapted to "... ARRAY[]..." so we can support PG arrays

#============================================================
if __name__ == "__main__":

        if len(sys.argv) < 2:
                sys.exit()

        if sys.argv[1] != 'test':
                sys.exit()

        #--------------------------------------------------------------------
        def test_exceptions():
                print("testing exceptions")

                try:
                        raise cAuthenticationError('no credentials', 'no previous exception')
                except cAuthenticationError:
                        t, v, tb = sys.exc_info()
                        print(t)
                        print(v)
                        print(tb)

        #--------------------------------------------------------------------
        def test_verbose_get_connection():
                creds = cPGCredentials()
                creds.database = 'gnumed_v23'
                creds.user = 'any-doc'
                creds.host = 'localhost'
                pool = gmConnectionPool()
                pool.credentials = creds
                conn = pool.get_raw_connection()
                print(conn.info)
                print(conn.info.dsn_parameters)
                #conn = pool.get_connection(verbose = True)
                #conn = pool.get_connection(verbose = False)
                #curs = conn.cursor()
                #curs.execute('select pg_sleep(4);')

        #--------------------------------------------------------------------
        def test_get_connection():
                print("testing get_connection() from new pool")

                pool = gmConnectionPool()
                creds = cPGCredentials()
                pool.credentials = creds

                print('')
                try:
                        _log.debug('3')
                        conn = pool.get_connection()
                        print("1) ERROR: get_connection() did not fail")
                except AssertionError:
                        _log.error('failed, as expected')
                        print("1) SUCCESS: get_connection(%s) failed as expected" % pool)
                        t, v = sys.exc_info()[:2]
                        print (' ', t)
                        print (' ', v)

                print('')
                creds.database = 'gnumed_v22'
                try:
                        conn = pool.get_connection()
                        print("2) ERROR: get_connection() did not fail")
                except AssertionError:
                        _log.error('failed, as expected')
                        print("2) SUCCESS: get_connection() failed as expected")
                        t, v = sys.exc_info()[:2]
                        print(' ', t)
                        print(' ', v)

                print('')
                creds.database = 'gnumed_v22'
                creds.user = 'abc'
                try:
                        conn = pool.get_connection()
                        print("3) ERROR: get_connection() did not fail")
                except cAuthenticationError:
                        _log.error('failed, as expected')
                        print("3) SUCCESS: get_connection() failed as expected")
                        t, v = sys.exc_info()[:2]
                        print(' ', t)
                        print(' ', v)

                print('')
                creds.database = 'gnumed_v22'
                creds.user = 'any-doc'
                creds.password = 'abcd'
                try:
                        conn = pool.get_connection()
                        print("4) ERROR: get_connection() did not fail")
                except cAuthenticationError:
                        _log.error('failed, as expected')
                        print("4) SUCCESS: get_connection() failed as expected")
                        t, v = sys.exc_info()[:2]
                        print(' ', t)
                        print(' ', v)

                print('')
                creds.password = 'any-doc'
                conn = pool.get_connection(readonly=True)
                print('5) SUCCESS: get_connection(ro)')

                print('')
                conn = pool.get_connection(readonly=False, verbose=True)
                print('6) SUCCESS: get_connection(rw)')

                print('')
                try:
                        conn = pool.get_connection()
                        print("7) SUCCESS:")
                        print('pid:', conn.get_backend_pid())
                except cAuthenticationError:
                        print("7) SUCCESS: get_connection() failed")
                        t, v = sys.exc_info()[:2]
                        print(' ', t)
                        print(' ', v)

                try:
                        conn = pool.get_connection()
                        curs = conn.cursor()
                        input('hit enter to run query')
                        curs.execute('selec 1')
                except Exception as exc:
                        _log.error('failed, as expected')
                        print('ERROR')
                        _log.exception('exception occurred')
                        log_pg_exception_details(exc)
                        if exception_is_connection_loss(exc):
                                _log.error('lost connection')

                try:
                        conn = pool.get_connection()
                        curs = conn.cursor()
                        input('hit enter to run query')
                        curs.execute('select 1 from table_does_not_exist')
                except Exception as exc:
                        _log.error('failed, as expected')
                        print('ERROR')
                        _log.exception('exception occurred')
                        log_pg_exception_details(exc)
                        if exception_is_connection_loss(exc):
                                _log.error('lost connection')

        #--------------------------------------------------------------------
        def test_change_creds():
                print("testing credentials change")
                pool = gmConnectionPool()
                creds = cPGCredentials()
                creds.database = 'gnumed_v22'
                creds.user = 'any-doc'
                pool.credentials = creds
                conn = pool.get_connection()
                _log.debug('changing credentials')
                creds.user = 'gm-dbo'
                pool.credentials = creds
                conn = pool.get_connection()
                print(conn)

        #--------------------------------------------------------------------
        def test_credentials():
                print("testing credentials with spaces")
                pool = gmConnectionPool()
                creds = cPGCredentials()
                creds.database = 'gnumed_v22'
                creds.user = 'any-doc'
                creds.password = 'any-doc'
                pool.credentials = creds
                conn = pool.get_connection()
                print(conn)
                creds.password = 'a - b'
                pool.credentials = creds
                conn = pool.get_connection()

        #--------------------------------------------------------------------
        #test_credentials()
        #test_exceptions()
        #test_get_connection()
        test_verbose_get_connection()
        #test_change_creds()

Functions

def exception_is_connection_loss(exc: Exception) ‑> bool

Checks whether exception represents connection loss.

Expand source code
def exception_is_connection_loss(exc: Exception) -> bool:
        """Checks whether exception represents connection loss."""
        if not isinstance(exc, psycopg2.Error):
                # not a PG/psycopg2 exception
                return False

        try:
                if isinstance(exc, psycopg2.errors.AdminShutdown):
                        _log.debug('indicates connection loss due to admin shutdown')
                        return True

        except AttributeError:  # psycopg2 2.7/2.8 transition (no AdminShutdown exception)
                pass
        try:
                msg = '%s' % exc.args[0]
        except (AttributeError, IndexError, TypeError):
                _log.debug('cannot extract message from exception')
                return False

        _log.debug('interpreting: %s', msg)
        for snippet in _connection_loss_markers:
                if snippet in msg:
                        _log.debug('indicates connection loss')
                        return True

        is_conn_loss = (
                # OperationalError
                ('erver' in msg)
                        and
                (
                        ('terminat' in msg)
                                or
                        ('abnorm' in msg)
                                or
                        ('end' in msg)
                                or
                        ('no route' in msg)
                )
        ) or (
                # InterfaceError
                ('onnect' in msg)
                        and
                (
                        ('close' in msg)
                                or
                        ('end' in msg)
                )
        )
        if is_conn_loss:
                _log.debug('indicates connection loss')
        return is_conn_loss
def log_conn_state(conn: psycopg2.extras.DictConnection) ‑> None

Log details about a DB-API connection.

Expand source code
def log_conn_state(conn:psycopg2.extras.DictConnection) -> None:
        """Log details about a DB-API connection."""
        tx_status = conn.get_transaction_status()
        if tx_status in [ psycopg2.extensions.TRANSACTION_STATUS_INERROR, psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN ]:
                isolation_level = '%s (tx aborted or unknown, cannot retrieve)' % conn.isolation_level
        else:
                isolation_level = '%s (%s)' % (conn.isolation_level, _map_psyco_iso_level2str[conn.isolation_level])
        conn_status = '%s (%s)' % (conn.status, _map_psyco_conn_status2str[conn.status])
        if conn.closed != 0:
                conn_status = 'undefined (%s)' % conn_status
                backend_pid = '<conn closed, cannot retrieve>'
        else:
                backend_pid = str(conn.get_backend_pid())
        try:
                conn_deferrable = str(conn.deferrable)
        except AttributeError:
                conn_deferrable = '<unavailable>'
        d = {
                'identity': id(conn),
                'backend PID': backend_pid,
                'protocol version': conn.protocol_version,
                'encoding': conn.encoding,
                'closed': conn.closed,
                'readonly': conn.readonly,
                'autocommit': conn.autocommit,
                'isolation level (psyco)': isolation_level,
                'async': conn.async_,
                'deferrable': conn_deferrable,
                'transaction status': '%s (%s)' % (tx_status, _map_psyco_tx_status2str[tx_status]),
                'connection status': conn_status,
                'executing async op': conn.isexecuting(),
                'type': type(conn)
        }
        _log.debug(conn)
        for key in d:
                _log.debug('%s: %s', key, d[key])
def log_cursor_state(cursor) ‑> None

Log details about a DB-API cursor.

Expand source code
def log_cursor_state(cursor) -> None:
        """Log details about a DB-API cursor."""
        if cursor is None:
                _log.debug('cursor: None')
                return

        conn = cursor.connection
        tx_status = conn.get_transaction_status()
        if tx_status in [ psycopg2.extensions.TRANSACTION_STATUS_INERROR, psycopg2.extensions.TRANSACTION_STATUS_UNKNOWN ]:
                isolation_level = '<tx aborted or unknown, cannot retrieve>'
        else:
                isolation_level = conn.isolation_level
        try:
                conn_deferrable = conn.deferrable
        except AttributeError:
                conn_deferrable = '<unavailable>'
        if cursor.query is None:
                query = '<no query>'
        else:
                query = cursor.query.decode(errors = 'replace')
        if conn.closed != 0:
                backend_pid = '<conn closed, cannot retrieve>'
        else:
                backend_pid = conn.get_backend_pid()
        txt = """Cursor
 identity: %s; name: %s
 closed: %s; scrollable: %s; with hold: %s; arraysize: %s; itersize: %s;
 last rowcount: %s; rownumber: %s; lastrowid (OID): %s;
 last description: %s
 statusmessage: %s
Connection
 identity: %s; backend pid: %s; protocol version: %s;
 closed: %s; autocommit: %s; isolation level: %s; encoding: %s; async: %s; deferrable: %s; readonly: %s;
 TX status: %s; CX status: %s; executing async op: %s;
Query
 %s""" % (
                # cursor level:
                id(cursor),
                cursor.name,
                cursor.closed,
                cursor.scrollable,
                cursor.withhold,
                cursor.arraysize,
                cursor.itersize,
                cursor.rowcount,
                cursor.rownumber,
                cursor.lastrowid,
                cursor.description,
                cursor.statusmessage,
                # connection level:
                id(conn),
                backend_pid,
                conn.protocol_version,
                conn.closed,
                conn.autocommit,
                isolation_level,
                conn.encoding,
                conn.async_,
                conn_deferrable,
                conn.readonly,
                _map_psyco_tx_status2str[tx_status],
                _map_psyco_conn_status2str[conn.status],
                conn.isexecuting(),
                # query level:
                query
        )
        gmLog2.log_multiline(logging.DEBUG, message = 'Link state:', line_prefix = '', text = txt)
def log_pg_exception_details(exc: Exception) ‑> bool

Logs details from a database exception.

Expand source code
def log_pg_exception_details(exc: Exception) -> bool:
        """Logs details from a database exception."""
        if not isinstance(exc, psycopg2.Error):
                return False

        _log.error(type(exc))
        try:
                for arg in exc.args:
                        _log.debug('exc.arg: %s', arg)
        except AttributeError:
                _log.debug('exception has no <.args>')
        _log.debug('pgerror: [%s]', exc.pgerror)
        if exc.pgcode is None:
                _log.debug('pgcode : %s', exc.pgcode)
        else:
                _log.debug('pgcode : %s (%s)', exc.pgcode, PG_error_codes.lookup(exc.pgcode))
        log_cursor_state(exc.cursor)
        try:
                diags = exc.diag
        except AttributeError:
                _log.debug('<.diag> not available')
                diags = None
        if diags is None:
                return True

        for attr in dir(diags):
                if attr.startswith('__'):
                        continue
                val = getattr(diags, attr)
                if val is None:
                        continue
                _log.debug('%s: %s', attr, val)
        return True
def log_pg_settings(curs) ‑> bool

Log PostgreSQL server settings.

Expand source code
def log_pg_settings(curs) -> bool:
        """Log PostgreSQL server settings."""
        # config settings
        try:
                curs.execute('SELECT * FROM pg_settings')
        except psycopg2.Error:
                _log.exception('cannot retrieve PG settings ("SELECT ... FROM pg_settings" failed)')
                return False

        settings = curs.fetchall()
        if settings:
                for setting in settings:
                        if setting['unit'] is None:
                                unit = ''
                        else:
                                unit = ' %s' % setting['unit']
                        if setting['sourcefile'] is None:
                                sfile = ''
                        else:
                                sfile = '// %s @ %s' % (setting['sourcefile'], setting['sourceline'])
                        pending_restart = u''
                        try:
                                if setting['pending_restart']:
                                        pending_restart = u'// needs restart'
                        except KeyError:
                                pass    # 'pending_restart' does not exist in PG 9.4 yet
                        _log.debug('%s: %s%s (set from: [%s] // session RESET will set to: [%s]%s%s)',
                                setting['name'],
                                setting['setting'],
                                unit,
                                setting['source'],
                                setting['reset_val'],
                                pending_restart,
                                sfile
                        )
        # extensions
        try:
                curs.execute('select pg_available_extensions()')
        except Exception:
                _log.exception('cannot log available PG extensions')
                return False

        extensions = curs.fetchall()
        if extensions:
                for ext in extensions:
                        _log.debug('PG extension: %s', ext['pg_available_extensions'])
        else:
                _log.error('no PG extensions available')
        # log pg_config -- can only be read by superusers :-/
        # database collation
        try:
                curs.execute('SELECT *, pg_database_collation_actual_version(oid), pg_encoding_to_char(encoding) FROM pg_database WHERE datname = current_database()')
        except psycopg2.Error:
                _log.exception('cannot log actual collation version (probably PG < 15)')
                curs.execute('SELECT * FROM pg_database WHERE datname = current_database()')
        config = curs.fetchall()
        gmLog2.log_multiline(10, message = 'PG database config', text = gmTools.format_dict_like(dict(config[0]), tabular = True))
        return True

Classes

class cAdapterPyDateTime (dt)
Expand source code
class cAdapterPyDateTime(object):

        def __init__(self, dt):
                if dt.tzinfo is None:
                        raise ValueError('datetime.datetime instance is lacking a time zone: [%s]' % _timestamp_template % dt.isoformat())
                self.__dt = dt

        def getquoted(self):
                return _timestamp_template % self.__dt.isoformat()

Methods

def getquoted(self)
Expand source code
def getquoted(self):
        return _timestamp_template % self.__dt.isoformat()
class cAuthenticationError (creds=None, prev_val=None)

Error related to database operation (disconnect, memory allocation etc).

Expand source code
class cAuthenticationError(psycopg2.OperationalError):

        def __init__(self, creds=None, prev_val=None):
                self.creds = creds
                self.prev_val = prev_val

        def __str__(self):
                return 'PostgreSQL: %sDSN: %s' % (self.prev_val, self.creds)

Ancestors

  • psycopg2.OperationalError
  • psycopg2.DatabaseError
  • psycopg2.Error
  • builtins.Exception
  • builtins.BaseException
class cPGCredentials

Holds PostgreSQL credentials

Expand source code
class cPGCredentials:
        """Holds PostgreSQL credentials"""

        def __init__(self) -> None:
                self.__host:str = None                  # None: left out -> defaults to $PGHOST or implicit <localhost>
                self.__port:int = None                  # None: left out -> defaults to $PGPORT or libpq compiled-in default (typically 5432)
                self.__database:str = None              # must be set before connecting
                self.__user:str = None                  # must be set before connecting
                self.__password:str = None              # None: left out
                                                                                # -> try password-less connect (TRUST/IDENT/PEER)
                                                                                # -> try connect with password from <passfile> parameter or $PGPASSFILE or ~/.pgpass

        #--------------------------------------------------
        # properties
        #--------------------------------------------------
        def __format_credentials(self) -> str:
                """Database credentials formatted as string."""
                cred_parts = [
                        'dbname=%s' % self.__database,
                        'host=%s' % self.__host,
                        'port=%s' % self.__port,
                        'user=%s' % self.__user
                ]
                return ' '.join(cred_parts)

        formatted_credentials = property(__format_credentials)

        #--------------------------------------------------
        def generate_credentials_kwargs(self, connection_name:str=None) -> dict:
                """Return dictionary with credentials suitable for psycopg2.connection() keyword arguments."""
                assert (self.__database is not None), 'self.__database must be defined'
                assert (self.__user is not None), 'self.__user must be defined'

                kwargs = {
                        'dbname': self.__database,
                        'user': self.__user,
                        'application_name': gmTools.coalesce(connection_name, 'GNUmed'),
                        'fallback_application_name': 'GNUmed',
                        'sslmode': 'prefer',
                        # try to enforce a useful encoding early on so that we
                        # have a good chance of decoding authentication errors
                        # containing foreign language characters
                        'client_encoding': 'UTF8'
                }
                if self.__host is not None:
                        kwargs['host'] = self.__host
                if self.__port is not None:
                        kwargs['port'] = self.__port
                if self.__password is not None:
                        kwargs['password'] = self.__password
                return kwargs

        credentials_kwargs = property(generate_credentials_kwargs)

        #--------------------------------------------------
        def _get_database(self) -> str:
                return self.__database

        def _set_database(self, database:str=None):
                assert database, '<database> must not be None'
                assert database.strip(), '<database> must not be empty'
                assert ('salaam.homeunix' not in database), 'The public database is not hosted by <salaam.homeunix.com> anymore.\n\nPlease point your configuration files to <publicdb.gnumed.de>.'
                self.__database = database.strip()
                _log.info('[%s]', self.__database)

        database = property(_get_database, _set_database)

        #--------------------------------------------------
        def _get_host(self) -> str:
                return self.__host

        def _set_host(self, host:str=None):
                if host is not None:
                        host = host.strip()
                        if host == '':
                                host = None
                self.__host = host
                _log.info('[%s]', self.__host)

        host = property(_get_host, _set_host)

        #--------------------------------------------------
        def _get_port(self) -> int:
                return self.__port

        def _set_port(self, port=None):
                _log.info('[%s]', port)
                if port is None:
                        self.__port = None
                        return

                self.__port = int(port)

        port = property(_get_port, _set_port)

        #--------------------------------------------------
        def _get_user(self) -> str:
                return self.__user

        def _set_user(self, user:str=None):
                assert (user is not None), '<user> must not be None'
                assert (user.strip() != ''), '<user> must not be empty'
                self.__user = user.strip()
                _log.info('[%s]', self.__user)

        user = property(_get_user, _set_user)

        #--------------------------------------------------
        def _get_password(self) -> str:
                return self.__password

        def _set_password(self, password:str=None):
                if password is not None:
                        gmLog2.add_word2hide(password)
                self.__password = password
                _log.info('password was set')

        password = property(_get_password, _set_password)

Instance variables

var credentials_kwargs : dict

Return dictionary with credentials suitable for psycopg2.connection() keyword arguments.

Expand source code
def generate_credentials_kwargs(self, connection_name:str=None) -> dict:
        """Return dictionary with credentials suitable for psycopg2.connection() keyword arguments."""
        assert (self.__database is not None), 'self.__database must be defined'
        assert (self.__user is not None), 'self.__user must be defined'

        kwargs = {
                'dbname': self.__database,
                'user': self.__user,
                'application_name': gmTools.coalesce(connection_name, 'GNUmed'),
                'fallback_application_name': 'GNUmed',
                'sslmode': 'prefer',
                # try to enforce a useful encoding early on so that we
                # have a good chance of decoding authentication errors
                # containing foreign language characters
                'client_encoding': 'UTF8'
        }
        if self.__host is not None:
                kwargs['host'] = self.__host
        if self.__port is not None:
                kwargs['port'] = self.__port
        if self.__password is not None:
                kwargs['password'] = self.__password
        return kwargs
var database : str
Expand source code
def _get_database(self) -> str:
        return self.__database
var formatted_credentials : str

Database credentials formatted as string.

Expand source code
def __format_credentials(self) -> str:
        """Database credentials formatted as string."""
        cred_parts = [
                'dbname=%s' % self.__database,
                'host=%s' % self.__host,
                'port=%s' % self.__port,
                'user=%s' % self.__user
        ]
        return ' '.join(cred_parts)
var host : str
Expand source code
def _get_host(self) -> str:
        return self.__host
var password : str
Expand source code
def _get_password(self) -> str:
        return self.__password
var port : int
Expand source code
def _get_port(self) -> int:
        return self.__port
var user : str
Expand source code
def _get_user(self) -> str:
        return self.__user

Methods

def generate_credentials_kwargs(self, connection_name: str = None) ‑> dict

Return dictionary with credentials suitable for psycopg2.connection() keyword arguments.

Expand source code
def generate_credentials_kwargs(self, connection_name:str=None) -> dict:
        """Return dictionary with credentials suitable for psycopg2.connection() keyword arguments."""
        assert (self.__database is not None), 'self.__database must be defined'
        assert (self.__user is not None), 'self.__user must be defined'

        kwargs = {
                'dbname': self.__database,
                'user': self.__user,
                'application_name': gmTools.coalesce(connection_name, 'GNUmed'),
                'fallback_application_name': 'GNUmed',
                'sslmode': 'prefer',
                # try to enforce a useful encoding early on so that we
                # have a good chance of decoding authentication errors
                # containing foreign language characters
                'client_encoding': 'UTF8'
        }
        if self.__host is not None:
                kwargs['host'] = self.__host
        if self.__port is not None:
                kwargs['port'] = self.__port
        if self.__password is not None:
                kwargs['password'] = self.__password
        return kwargs
class gmConnectionPool

The Singleton connection pool class.

Any normal connection from GNUmed to PostgreSQL should go through this pool. It needs credentials to be provided via .credentials = .

Expand source code
class gmConnectionPool(gmBorg.cBorg):
        """The Singleton connection pool class.

        Any normal connection from GNUmed to PostgreSQL should go
        through this pool. It needs credentials to be provided
        via .credentials = <cPGCredentials>.
        """
        def __init__(self) -> None:
                try:
                        self.__initialized
                        return

                except AttributeError:
                        self.__initialized:bool = True

                _log.info('[%s]: first instantiation', self.__class__.__name__)
                self.__ro_conn_pool:dict[str, psycopg2.extras.DictConnection] = {}      # keyed by "credentials::thread ID"
                self.__SQL_set_client_timezone:str = None
                self.__client_timezone = None
                self.__creds:cPGCredentials = None
                self.__log_auth_environment()

        #--------------------------------------------------
        # connection API
        #--------------------------------------------------
        def get_connection(self, readonly:bool=True, verbose:bool=False, pooled:bool=True, connection_name:str=None, autocommit:bool=False, credentials:cPGCredentials=None) -> psycopg2.extras.DictConnection:
                """Provide a database connection.

                Readonly connections can be pooled. If there is no
                suitable connection in the pool a new one will be
                created and stored. The pool is per-thread and
                per-credentials.

                Args:
                        readonly: make connection read only
                        verbose: make connection log more things
                        pooled: return a pooled connection, if possible
                        connection_name: a human readable name for the connection, avoid spaces
                        autocommit: whether to autocommit
                        credentials: use for getting a connection with other credentials different from what the pool was set to before

                Returns:
                        a working connection to a PostgreSQL database
                """
#               if _DISABLE_CONNECTION_POOL:
#                       pooled = False

                if credentials is not None:
                        pooled = False
                conn = None
                if readonly and pooled:
                        try:
                                conn = self.__ro_conn_pool[self.pool_key]
                        except KeyError:
                                _log.info('pooled RO conn with key [%s] requested, but not in pool, setting up', self.pool_key)
                        if conn is not None:
                                #if verbose:
                                #       _log.debug('using pooled conn [%s]', self.pool_key)
                                return conn

                if conn is None:
                        conn = self.get_raw_connection (
                                verbose = verbose,
                                readonly = readonly,
                                connection_name = connection_name,
                                autocommit = autocommit,
                                credentials = credentials
                        )
                if readonly and pooled:
                        # monkey patch close() for pooled RO connections
                        conn.original_close = conn.close                                                                # type: ignore [attr-defined]
                        conn.close = _raise_exception_on_pooled_ro_conn_close                   # type: ignore [assignment]
                # set connection properties
                # - client encoding
                encoding = 'UTF8'
                _log.debug('setting client (wire) encoding to %s', encoding)
                conn.set_client_encoding(encoding)
                # - transaction isolation level
                if not readonly:
                        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
                # - client time zone
                _log.debug('client timezone [%s]', self.__client_timezone)
                curs = conn.cursor()
                curs.execute(self.__SQL_set_client_timezone, {'tz': self.__client_timezone})
                curs.close()
                conn.commit()
                if readonly and pooled:
                        _log.debug('putting RO conn with key [%s] into pool', self.pool_key)
                        self.__ro_conn_pool[self.pool_key] = conn
                if verbose:
                        log_conn_state(conn)
                return conn

        #--------------------------------------------------
        def get_rw_conn(self, verbose:bool=False, connection_name:str=None, autocommit:bool=False) -> psycopg2.extras.DictConnection:
                return self.get_connection(verbose = verbose, readonly = False, connection_name = connection_name, autocommit = autocommit)

        #--------------------------------------------------
        def get_ro_conn(self, verbose:bool=False, connection_name:str=None, autocommit:bool=False) -> psycopg2.extras.DictConnection:
                return self.get_connection(verbose = verbose, readonly = False, connection_name = connection_name, autocommit = autocommit)

        #--------------------------------------------------
        def get_raw_connection(self, verbose:bool=False, readonly:bool=True, connection_name:str=None, autocommit:bool=False, credentials:cPGCredentials=None) -> psycopg2.extras.DictConnection:
                """Get a raw, unadorned connection.

                This will not set any parameters such as encoding,
                timezone, or datestyle, hence it can be used for
                "service" connections for verifying encodings etc
                """
#               # FIXME: support verbose
                if credentials is None:
                        creds2use = self.__creds
                else:
                        creds2use = credentials
                creds_kwargs = creds2use.generate_credentials_kwargs(connection_name = connection_name)
                try:
                        # DictConnection now _is_ a real dictionary
                        conn = psycopg2.connect(connection_factory = psycopg2.extras.DictConnection, **creds_kwargs)
                except psycopg2.OperationalError as e:
                        _log.error('failed to establish connection [%s]', creds2use.formatted_credentials)
                        t, v, tb = sys.exc_info()
                        try:
                                msg = e.args[0]
                        except (AttributeError, IndexError, TypeError):
                                raise

                        if not self.__is_auth_fail_msg(msg):
                                raise

                        raise cAuthenticationError(creds2use.formatted_credentials, msg).with_traceback(tb)

                _log.debug('established connection "%s", backend PID: %s', gmTools.coalesce(connection_name, 'anonymous'), conn.get_backend_pid())
                # safe-guard
                conn._original_rollback = conn.rollback
                conn.rollback = types.MethodType(_safe_transaction_rollback, conn)

                # - inspect server
                self.__log_on_first_contact(conn)
                # - verify PG understands client time zone
                self.__detect_client_timezone(conn)
                # - set access mode
                if readonly:
                        _log.debug('readonly: forcing autocommit=True to avoid <IDLE IN TRANSACTION>')
                        autocommit = True
                else:
                        _log.debug('autocommit is desired to be: %s', autocommit)
                conn.commit()
                conn.autocommit = autocommit
                conn.readonly = readonly
                # - assume verbose=True to mean we want debugging in the database, too
                if verbose or _VERBOSE_PG_LOG:
                        _log.debug('enabling <plpgsql.extra_warnings/_errors>')
                        curs = conn.cursor()
                        try:
                                curs.execute("SET plpgsql.extra_warnings TO 'all'")
                        except Exception:
                                _log.exception('cannot enable <plpgsql.extra_warnings>')
                        finally:
                                curs.close()
                                conn.commit()
                        curs = conn.cursor()
                        try:
                                curs.execute("SET plpgsql.extra_errors TO 'all'")
                        except Exception:
                                _log.exception('cannot enable <plpgsql.extra_errors>')
                        finally:
                                curs.close()
                                conn.commit()
                        _log.debug('enabling auto_explain')
                        curs = conn.cursor()
                        try:
                                curs.execute("SELECT gm.load_auto_explain(3000)")
                        except Exception:
                                _log.exception('cannot enable auto_explain')
                        finally:
                                curs.close()
                                conn.commit()
                return conn

        #--------------------------------------------------
        def get_dbowner_connection(self, readonly:bool=True, verbose:bool=False, connection_name:str=None, autocommit:bool=False, dbo_password:str=None, dbo_account:str='gm-dbo') -> psycopg2.extras.DictConnection:
                """Return a connection for the database owner.

                Will not touch the pool.
                """
                dbo_creds = cPGCredentials()
                dbo_creds.user = dbo_account
                dbo_creds.password = dbo_password
                dbo_creds.database = self.__creds.database
                dbo_creds.host = self.__creds.host
                dbo_creds.port = self.__creds.port
                return self.get_connection (
                        pooled = False,
                        readonly = readonly,
                        verbose = verbose,
                        connection_name = connection_name,
                        autocommit = autocommit,
                        credentials = dbo_creds
                )

        #--------------------------------------------------
        def discard_pooled_connection_of_thread(self):
                """Discard from pool the connection of the current thread."""
                try:
                        conn = self.__ro_conn_pool[self.pool_key]
                except KeyError:
                        _log.debug('no connection pooled for thread [%s]', self.pool_key)
                        return

                del self.__ro_conn_pool[self.pool_key]
                if conn.closed:
                        return

                conn.close = conn.original_close
                conn.close()

        #--------------------------------------------------
        def shutdown(self):
                """Close and discard all pooled connections."""
                for conn_key in self.__ro_conn_pool:
                        conn = self.__ro_conn_pool[conn_key]
                        if conn.closed:
                                continue
                        _log.debug('closing open database connection, pool key: %s', conn_key)
                        log_conn_state(conn)
                        conn.close = conn.original_close
                        conn.close()
                del self.__ro_conn_pool

        #--------------------------------------------------
        # utility functions
        #--------------------------------------------------
        def __log_on_first_contact(self, conn:psycopg2.extras.DictConnection):
                global postgresql_version
                if postgresql_version is not None:
                        return

                _log.debug('_\\\\// heed Prime Directive _\\\\//')
                # FIXME: verify PG version
                curs = conn.cursor()
                curs.execute ("""
                        SELECT
                                substring(setting, E'^\\\\d{1,2}\\\\.\\\\d{1,2}')::numeric AS version
                        FROM
                                pg_settings
                        WHERE
                                name = 'server_version'"""
                )
                postgresql_version = curs.fetchone()['version']
                _log.info('PostgreSQL version (numeric): %s' % postgresql_version)
                try:
                        curs.execute("SELECT pg_size_pretty(pg_database_size(current_database()))")
                        _log.info('database size: %s', curs.fetchone()[0])
                except Exception:
                        _log.exception('cannot get database size')
                finally:
                        curs.close()
                        conn.commit()
                curs = conn.cursor()
                log_pg_settings(curs = curs)
                curs.close()
                conn.commit()
                _log.debug('done')

        #--------------------------------------------------
        def __log_auth_environment(self):
                pgpass_file = os.path.expanduser(os.path.join('~', '.pgpass'))
                if os.path.exists(pgpass_file):
                        _log.debug('standard .pgpass (%s) exists', pgpass_file)
                else:
                        _log.debug('standard .pgpass (%s) not found', pgpass_file)
                pgpass_var = os.getenv('PGPASSFILE')
                if pgpass_var is None:
                        _log.debug('$PGPASSFILE not set')
                else:
                        if os.path.exists(pgpass_var):
                                _log.debug('$PGPASSFILE=%s -> file exists', pgpass_var)
                        else:
                                _log.debug('$PGPASSFILE=%s -> file not found')

        #--------------------------------------------------
        def __detect_client_timezone(self, conn:psycopg2.extras.DictConnection):
                """This is run on the very first connection."""

                if self.__client_timezone is not None:
                        return

                _log.debug('trying to detect timezone from system')
                # we need gmDateTime to be initialized
                if gmDateTime.current_local_iso_numeric_timezone_string is None:
                        gmDateTime.init()
                tz_candidates = [gmDateTime.current_local_timezone_name]
                try:
                        tz_candidates.append(os.environ['TZ'])
                except KeyError:
                        pass
                expanded_tzs = []
                for tz in tz_candidates:
                        expanded = self.__expand_timezone(conn, timezone = tz)
                        if expanded != tz:
                                expanded_tzs.append(expanded)
                tz_candidates.extend(expanded_tzs)
                _log.debug('candidates: %s', tz_candidates)
                # find best among candidates
                found = False
                for tz in tz_candidates:
                        if self.__validate_timezone(conn = conn, timezone = tz):
                                self.__client_timezone = tz
                                self.__SQL_set_client_timezone = 'SET timezone TO %(tz)s'
                                found = True
                                break
                if not found:
                        self.__client_timezone = gmDateTime.current_local_iso_numeric_timezone_string
                        self.__SQL_set_client_timezone = 'set time zone interval %(tz)s hour to minute'
                _log.info('client system timezone detected as equivalent to [%s]', self.__client_timezone)
                # FIXME: check whether server.timezone is the same
                # FIXME: value as what we eventually detect

        #--------------------------------------------------
        def __expand_timezone(self, conn:psycopg2.extras.DictConnection, timezone:str):
                """Some timezone defs are abbreviations so try to expand
                them because "set time zone" doesn't take abbreviations"""

                cmd = _SQL_expand_tz_name
                args = {'tz': timezone}
                conn.commit()
                curs = conn.cursor()
                result = timezone
                try:
                        curs.execute(cmd, args)
                        rows = curs.fetchall()
                except Exception:
                        _log.exception('cannot expand timezone abbreviation [%s]', timezone)
                finally:
                        curs.close()
                        conn.rollback()
                if rows:
                        result = rows[0]['name']
                        _log.debug('[%s] maps to [%s]', timezone, result)
                return result

        #---------------------------------------------------
        def __validate_timezone(self, conn:psycopg2.extras.DictConnection, timezone:str) -> bool:
                _log.debug('validating timezone [%s]', timezone)
                cmd = 'SET timezone TO %(tz)s'
                args = {'tz': timezone}
                curs = conn.cursor()
                try:
                        curs.execute(cmd, args)
                except psycopg2.DataError:
                        _log.warning('timezone [%s] is not settable', timezone)
                        return False

                except Exception:
                        _log.exception('failed to set timezone to [%s]', timezone)
                        return False

                finally:
                        conn.rollback()
                _log.info('time zone [%s] is settable', timezone)
                # can we actually use it, though ?
                SQL = "SELECT '1931-03-26 11:11:11+0'::timestamp with time zone"
                try:
                        curs.execute(SQL)
                        curs.fetchone()
                except Exception:
                        _log.exception('error using timezone [%s]', timezone)
                        return False

                finally:
                        curs.close()
                        conn.rollback()
                _log.info('timezone [%s] is usable', timezone)
                return True

        #--------------------------------------------------
        # properties
        #--------------------------------------------------
        def _get_credentials(self) -> cPGCredentials:
                return self.__creds

        def _set_credentials(self, creds:cPGCredentials=None):
                if self.__creds is None:
                        self.__creds = creds
                        return

                _log.debug('invalidating pooled connections on credentials change')
                pool_key_start_from_curr_creds = self.__creds.formatted_credentials + '::thread='
                for pool_key in self.__ro_conn_pool:
                        if not pool_key.startswith(pool_key_start_from_curr_creds):
                                continue
                        conn = self.__ro_conn_pool[pool_key]
                        del self.__ro_conn_pool[pool_key]
                        if conn.closed:
                                del conn
                                continue
                        _log.debug('closing open database connection, pool key: %s', pool_key)
                        log_conn_state(conn)
                        conn.original_close()                                                                   # type: ignore [attr-defined]
                        del conn
                self.__creds = creds

        credentials = property(_get_credentials, _set_credentials)

        #--------------------------------------------------
        def _get_pool_key(self) -> str:
                return '%s::thread=%s' % (
                        self.__creds.formatted_credentials,
                        threading.current_thread().ident
                )

        pool_key = property(_get_pool_key)

        #--------------------------------------------------
        def __is_auth_fail_msg(self, msg:str) -> bool:
                if 'fe_sendauth' in msg:
                        return True

                if regex.search(r'user ".*" does not exist', msg) is not None:
                        return True

                if 'uthenti' in msg:
                        return True

                if ((
                                (regex.search(r'user ".*"', msg) is not None)
                                        or
                                (regex.search(r'(R|r)ol{1,2}e', msg) is not None)
                        )
                        and ('exist' in msg)
                        and (regex.search(r'n(o|ich)t', msg) is not None)
                ):
                        return True

                # to the best of our knowledge
                return False

Ancestors

Instance variables

var credentialscPGCredentials
Expand source code
def _get_credentials(self) -> cPGCredentials:
        return self.__creds
var pool_key : str
Expand source code
def _get_pool_key(self) -> str:
        return '%s::thread=%s' % (
                self.__creds.formatted_credentials,
                threading.current_thread().ident
        )

Methods

def discard_pooled_connection_of_thread(self)

Discard from pool the connection of the current thread.

Expand source code
def discard_pooled_connection_of_thread(self):
        """Discard from pool the connection of the current thread."""
        try:
                conn = self.__ro_conn_pool[self.pool_key]
        except KeyError:
                _log.debug('no connection pooled for thread [%s]', self.pool_key)
                return

        del self.__ro_conn_pool[self.pool_key]
        if conn.closed:
                return

        conn.close = conn.original_close
        conn.close()
def get_connection(self, readonly: bool = True, verbose: bool = False, pooled: bool = True, connection_name: str = None, autocommit: bool = False, credentials: cPGCredentials = None) ‑> psycopg2.extras.DictConnection

Provide a database connection.

Readonly connections can be pooled. If there is no suitable connection in the pool a new one will be created and stored. The pool is per-thread and per-credentials.

Args

readonly
make connection read only
verbose
make connection log more things
pooled
return a pooled connection, if possible
connection_name
a human readable name for the connection, avoid spaces
autocommit
whether to autocommit
credentials
use for getting a connection with other credentials different from what the pool was set to before

Returns

a working connection to a PostgreSQL database

Expand source code
        def get_connection(self, readonly:bool=True, verbose:bool=False, pooled:bool=True, connection_name:str=None, autocommit:bool=False, credentials:cPGCredentials=None) -> psycopg2.extras.DictConnection:
                """Provide a database connection.

                Readonly connections can be pooled. If there is no
                suitable connection in the pool a new one will be
                created and stored. The pool is per-thread and
                per-credentials.

                Args:
                        readonly: make connection read only
                        verbose: make connection log more things
                        pooled: return a pooled connection, if possible
                        connection_name: a human readable name for the connection, avoid spaces
                        autocommit: whether to autocommit
                        credentials: use for getting a connection with other credentials different from what the pool was set to before

                Returns:
                        a working connection to a PostgreSQL database
                """
#               if _DISABLE_CONNECTION_POOL:
#                       pooled = False

                if credentials is not None:
                        pooled = False
                conn = None
                if readonly and pooled:
                        try:
                                conn = self.__ro_conn_pool[self.pool_key]
                        except KeyError:
                                _log.info('pooled RO conn with key [%s] requested, but not in pool, setting up', self.pool_key)
                        if conn is not None:
                                #if verbose:
                                #       _log.debug('using pooled conn [%s]', self.pool_key)
                                return conn

                if conn is None:
                        conn = self.get_raw_connection (
                                verbose = verbose,
                                readonly = readonly,
                                connection_name = connection_name,
                                autocommit = autocommit,
                                credentials = credentials
                        )
                if readonly and pooled:
                        # monkey patch close() for pooled RO connections
                        conn.original_close = conn.close                                                                # type: ignore [attr-defined]
                        conn.close = _raise_exception_on_pooled_ro_conn_close                   # type: ignore [assignment]
                # set connection properties
                # - client encoding
                encoding = 'UTF8'
                _log.debug('setting client (wire) encoding to %s', encoding)
                conn.set_client_encoding(encoding)
                # - transaction isolation level
                if not readonly:
                        conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
                # - client time zone
                _log.debug('client timezone [%s]', self.__client_timezone)
                curs = conn.cursor()
                curs.execute(self.__SQL_set_client_timezone, {'tz': self.__client_timezone})
                curs.close()
                conn.commit()
                if readonly and pooled:
                        _log.debug('putting RO conn with key [%s] into pool', self.pool_key)
                        self.__ro_conn_pool[self.pool_key] = conn
                if verbose:
                        log_conn_state(conn)
                return conn
def get_dbowner_connection(self, readonly: bool = True, verbose: bool = False, connection_name: str = None, autocommit: bool = False, dbo_password: str = None, dbo_account: str = 'gm-dbo') ‑> psycopg2.extras.DictConnection

Return a connection for the database owner.

Will not touch the pool.

Expand source code
def get_dbowner_connection(self, readonly:bool=True, verbose:bool=False, connection_name:str=None, autocommit:bool=False, dbo_password:str=None, dbo_account:str='gm-dbo') -> psycopg2.extras.DictConnection:
        """Return a connection for the database owner.

        Will not touch the pool.
        """
        dbo_creds = cPGCredentials()
        dbo_creds.user = dbo_account
        dbo_creds.password = dbo_password
        dbo_creds.database = self.__creds.database
        dbo_creds.host = self.__creds.host
        dbo_creds.port = self.__creds.port
        return self.get_connection (
                pooled = False,
                readonly = readonly,
                verbose = verbose,
                connection_name = connection_name,
                autocommit = autocommit,
                credentials = dbo_creds
        )
def get_raw_connection(self, verbose: bool = False, readonly: bool = True, connection_name: str = None, autocommit: bool = False, credentials: cPGCredentials = None) ‑> psycopg2.extras.DictConnection

Get a raw, unadorned connection.

This will not set any parameters such as encoding, timezone, or datestyle, hence it can be used for "service" connections for verifying encodings etc

Expand source code
        def get_raw_connection(self, verbose:bool=False, readonly:bool=True, connection_name:str=None, autocommit:bool=False, credentials:cPGCredentials=None) -> psycopg2.extras.DictConnection:
                """Get a raw, unadorned connection.

                This will not set any parameters such as encoding,
                timezone, or datestyle, hence it can be used for
                "service" connections for verifying encodings etc
                """
#               # FIXME: support verbose
                if credentials is None:
                        creds2use = self.__creds
                else:
                        creds2use = credentials
                creds_kwargs = creds2use.generate_credentials_kwargs(connection_name = connection_name)
                try:
                        # DictConnection now _is_ a real dictionary
                        conn = psycopg2.connect(connection_factory = psycopg2.extras.DictConnection, **creds_kwargs)
                except psycopg2.OperationalError as e:
                        _log.error('failed to establish connection [%s]', creds2use.formatted_credentials)
                        t, v, tb = sys.exc_info()
                        try:
                                msg = e.args[0]
                        except (AttributeError, IndexError, TypeError):
                                raise

                        if not self.__is_auth_fail_msg(msg):
                                raise

                        raise cAuthenticationError(creds2use.formatted_credentials, msg).with_traceback(tb)

                _log.debug('established connection "%s", backend PID: %s', gmTools.coalesce(connection_name, 'anonymous'), conn.get_backend_pid())
                # safe-guard
                conn._original_rollback = conn.rollback
                conn.rollback = types.MethodType(_safe_transaction_rollback, conn)

                # - inspect server
                self.__log_on_first_contact(conn)
                # - verify PG understands client time zone
                self.__detect_client_timezone(conn)
                # - set access mode
                if readonly:
                        _log.debug('readonly: forcing autocommit=True to avoid <IDLE IN TRANSACTION>')
                        autocommit = True
                else:
                        _log.debug('autocommit is desired to be: %s', autocommit)
                conn.commit()
                conn.autocommit = autocommit
                conn.readonly = readonly
                # - assume verbose=True to mean we want debugging in the database, too
                if verbose or _VERBOSE_PG_LOG:
                        _log.debug('enabling <plpgsql.extra_warnings/_errors>')
                        curs = conn.cursor()
                        try:
                                curs.execute("SET plpgsql.extra_warnings TO 'all'")
                        except Exception:
                                _log.exception('cannot enable <plpgsql.extra_warnings>')
                        finally:
                                curs.close()
                                conn.commit()
                        curs = conn.cursor()
                        try:
                                curs.execute("SET plpgsql.extra_errors TO 'all'")
                        except Exception:
                                _log.exception('cannot enable <plpgsql.extra_errors>')
                        finally:
                                curs.close()
                                conn.commit()
                        _log.debug('enabling auto_explain')
                        curs = conn.cursor()
                        try:
                                curs.execute("SELECT gm.load_auto_explain(3000)")
                        except Exception:
                                _log.exception('cannot enable auto_explain')
                        finally:
                                curs.close()
                                conn.commit()
                return conn
def get_ro_conn(self, verbose: bool = False, connection_name: str = None, autocommit: bool = False) ‑> psycopg2.extras.DictConnection
Expand source code
def get_ro_conn(self, verbose:bool=False, connection_name:str=None, autocommit:bool=False) -> psycopg2.extras.DictConnection:
        return self.get_connection(verbose = verbose, readonly = False, connection_name = connection_name, autocommit = autocommit)
def get_rw_conn(self, verbose: bool = False, connection_name: str = None, autocommit: bool = False) ‑> psycopg2.extras.DictConnection
Expand source code
def get_rw_conn(self, verbose:bool=False, connection_name:str=None, autocommit:bool=False) -> psycopg2.extras.DictConnection:
        return self.get_connection(verbose = verbose, readonly = False, connection_name = connection_name, autocommit = autocommit)
def shutdown(self)

Close and discard all pooled connections.

Expand source code
def shutdown(self):
        """Close and discard all pooled connections."""
        for conn_key in self.__ro_conn_pool:
                conn = self.__ro_conn_pool[conn_key]
                if conn.closed:
                        continue
                _log.debug('closing open database connection, pool key: %s', conn_key)
                log_conn_state(conn)
                conn.close = conn.original_close
                conn.close()
        del self.__ro_conn_pool