Module Gnumed.business.gmKeywordExpansion

GNUmed keyword snippet expansions

Copyright: authors

Expand source code
# -*- coding: utf-8 -*-
"""GNUmed keyword snippet expansions

Copyright: authors
"""
#============================================================
__author__ = "Karsten Hilbert <Karsten.Hilbert@gmx.net>"
__license__ = 'GPL v2 or later (details at https://www.gnu.org)'

import sys
import os
import logging


if __name__ == '__main__':
        sys.path.insert(0, '../../')
        _ = lambda x:x
from Gnumed.pycommon import gmPG2
from Gnumed.pycommon import gmBusinessDBObject
from Gnumed.pycommon import gmTools
from Gnumed.pycommon import gmMimeLib


_log = logging.getLogger('gm.kwd_exp')

#============================================================
_SQL_get_keyword_expansions = "SELECT * FROM ref.v_your_keyword_expansions WHERE %s"

class cKeywordExpansion(gmBusinessDBObject.cBusinessDBObject):
        """Keyword indexed text snippets or chunks of data. Used as text macros or to put into documents.

        An expansion is either textual (payload field 'expansion') or binary
        (stored in ref.keyword_expansion.binary_data, see save_to_file() and
        update_data_from_file()).
        """
        _cmd_fetch_payload = _SQL_get_keyword_expansions % "pk_expansion = %s"
        # Setting a non-empty textual expansion drops any binary data. An empty
        # expansion keeps existing binary data or, if there is none, stores a
        # placeholder text.
        _cmds_store_payload = [
                """
                        UPDATE ref.keyword_expansion SET
                                keyword = gm.nullify_empty_string(%(keyword)s),
                                textual_data = CASE
                                        WHEN gm.nullify_empty_string(%(expansion)s) IS NULL
                                                THEN CASE
                                                        WHEN binary_data IS NULL THEN '---fake_data---'
                                                        ELSE NULL
                                                END
                                        ELSE gm.nullify_empty_string(%(expansion)s)
                                END,
                                binary_data = CASE
                                        WHEN gm.nullify_empty_string(%(expansion)s) IS NULL THEN binary_data
                                        ELSE NULL
                                END,
                                encrypted = %(is_encrypted)s
                        WHERE
                                pk = %(pk_expansion)s
                                        AND
                                xmin = %(xmin_expansion)s
                        RETURNING
                                xmin as xmin_expansion
                """
        ]
        _updatable_fields = [
                'keyword',
                'expansion',
                'is_encrypted'
        ]

        #--------------------------------------------------------
        def save_to_file(self, aChunkSize=0, target_mime=None, target_extension=None, ignore_conversion_problems=False):
                """Export the binary data of this expansion into a file.

                Args:
                        aChunkSize: passed to gmPG2.bytea2file() as <chunk_size>
                        target_mime: if not None, the exported file is converted via gmMimeLib.convert_file()
                        target_extension: accepted for interface compatibility, not used by this method
                        ignore_conversion_problems: if conversion fails, return the unconverted file rather than None

                Returns:
                        The name of the exported (or converted) file. None if the
                        expansion is textual, holds no data, or export/conversion failed.
                """
                # only non-empty binary snippets can be exported
                if self._payload['is_textual']:
                        return None

                if self._payload['data_size'] == 0:
                        return None

                exported_fname = gmTools.get_unique_filename(prefix = 'gm-data_snippet-')
                success = gmPG2.bytea2file (
                        data_query = {
                                'cmd': 'SELECT substring(binary_data from %(start)s for %(size)s) FROM ref.keyword_expansion WHERE pk = %(pk)s',
                                'args': {'pk': self.pk_obj}
                        },
                        filename = exported_fname,
                        chunk_size = aChunkSize,
                        data_size = self._payload['data_size']
                )
                if not success:
                        return None

                if target_mime is None:
                        return exported_fname

                converted_fname = gmMimeLib.convert_file(filename = exported_fname, target_mime = target_mime)
                if converted_fname is not None:
                        return converted_fname

                _log.warning('conversion failed')
                if ignore_conversion_problems:
                        _log.warning('programmed to ignore conversion problems, hoping receiver can handle [%s]', exported_fname)
                        return exported_fname

                return None

        #--------------------------------------------------------
        def update_data_from_file(self, filename=None):
                """Replace the content of this expansion with the data found in a file.

                The textual data of the row is set to NULL and the payload is
                refetched afterwards.

                Args:
                        filename: the file to read the binary data from

                Returns:
                        True on success, False if <filename> is missing or not a readable file.
                """
                if filename is None:
                        _log.error('no filename given, cannot update data')
                        return False

                if not (os.access(filename, os.R_OK) and os.path.isfile(filename)):
                        _log.error('[%s] is not a readable file', filename)
                        return False

                gmPG2.file2bytea (
                        query = "UPDATE ref.keyword_expansion SET binary_data = %(data)s::bytea, textual_data = NULL WHERE pk = %(pk)s",
                        filename = filename,
                        args = {'pk': self.pk_obj}
                )

                # must update XMIN now ...
                self.refetch_payload()

                # Invalidate the module level caches. They must be addressed by
                # their string names: inside a class body "global __name" is
                # subject to private name mangling and would (re)bind
                # "_cKeywordExpansion__name" rather than the actual cache.
                module_namespace = globals()
                module_namespace['__textual_expansion_keywords'] = None
                module_namespace['__keyword_expansions'] = None

                return True

        #--------------------------------------------------------
        def format(self):
                """Return a human readable, multi-line description of this expansion."""
                txt = '%s            #%s\n' % (
                        gmTools.bool2subst (
                                self._payload['is_textual'],
                                _('Textual keyword expansion'),
                                _('Binary keyword expansion')
                        ),
                        self._payload['pk_expansion']
                )
                txt += ' %s%s\n' % (
                        gmTools.bool2subst (
                                self._payload['private_expansion'],
                                _('private'),
                                _('public')
                        ),
                        gmTools.bool2subst (
                                self._payload['is_encrypted'],
                                ', %s' % _('encrypted'),
                                ''
                        )
                )
                txt += _(' Keyword: %s\n') % self._payload['keyword']
                txt += _(' Owner: %s\n') % self._payload['owner']
                # textual snippets are shown verbatim, binary ones by size only
                if self._payload['is_textual']:
                        txt += '\n%s' % self._payload['expansion']
                else:
                        txt += ' Data: %s (%s Bytes)' % (gmTools.size2str(self._payload['data_size']), self._payload['data_size'])

                return txt

#------------------------------------------------------------
__keyword_expansions = None

def get_keyword_expansions(order_by=None, force_reload=False, return_pks=False):
        """Retrieve the keyword expansions listed in ref.v_your_keyword_expansions.

        Args:
                order_by: SQL expression for an ORDER BY clause (without the keywords "ORDER BY"), None for no explicit ordering. It is interpolated verbatim into the query and must therefore never be built from untrusted input.
                force_reload: ignore the module level cache and query the database
                return_pks: return primary keys rather than instances

        Returns:
                A list of cKeywordExpansion instances, or a list of pk_expansion
                values if <return_pks> is True.
        """
        global __keyword_expansions
        # The cache holds instances, so it cannot serve a request for primary keys.
        # NOTE(review): a cached list is returned regardless of <order_by>,
        # pass force_reload=True if a particular order is required.
        if not (force_reload or return_pks):
                if __keyword_expansions is not None:
                        return __keyword_expansions

        if order_by is None:
                where_clause = 'true'
        else:
                where_clause = 'true ORDER BY %s' % order_by

        cmd = _SQL_get_keyword_expansions % where_clause
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd}])
        if return_pks:
                return [ r['pk_expansion'] for r in rows ]

        __keyword_expansions = [ cKeywordExpansion(row = {'data': r, 'pk_field': 'pk_expansion'}) for r in rows ]
        return __keyword_expansions

#------------------------------------------------------------
def get_expansion(keyword=None, textual_only=True, binary_only=False):
        """Retrieve the expansion stored under a keyword.

        Args:
                keyword: the keyword to look up
                textual_only: only consider textual expansions
                binary_only: only consider binary (non-textual) expansions

        Returns:
                A cKeywordExpansion instance for the first matching row or None
                if there is no match.

        Raises:
                ValueError: if neither <textual_only> nor <binary_only> is False
        """
        if False not in [textual_only, binary_only]:
                raise ValueError('one of <textual_only> and <binary_only> must be False')

        where_parts = ['keyword = %(kwd)s']
        args = {'kwd': keyword}

        if textual_only:
                where_parts.append('is_textual IS TRUE')
        elif binary_only:
                # previously <binary_only> was checked but never applied to the query
                where_parts.append('is_textual IS FALSE')

        cmd = _SQL_get_keyword_expansions % ' AND '.join(where_parts)
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
        if not rows:
                return None

        return cKeywordExpansion(row = {'data': rows[0], 'pk_field': 'pk_expansion'})

#------------------------------------------------------------
def create_keyword_expansion(keyword=None, text=None, data_file=None, public=True):
        """Create a new textual or binary keyword expansion.

        Args:
                keyword: the keyword under which to store the expansion
                text: the textual expansion, whitespace-only text counts as missing
                data_file: name of a file holding the data of a binary expansion
                public: True creates an expansion without owner (fk_staff NULL), False links it to the staff entry of the current database user

        Returns:
                The new cKeywordExpansion instance, or False if an expansion with
                that keyword and visibility already exists.

        Raises:
                ValueError: if both or neither of <text> and <data_file> are given
        """
        if text is not None:
                if text.strip() == '':
                        text = None

        if (text is not None) and (data_file is not None):
                raise ValueError('only one of <text> and <data_file> may be given')

        if (text is None) and (data_file is None):
                raise ValueError('either <text> or <data_file> must be non-NULL')

        # already exists ?
        cmd = "SELECT 1 FROM ref.v_your_keyword_expansions WHERE public_expansion IS %(public)s AND keyword = %(kwd)s"
        args = {'kwd': keyword, 'public': public}
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
        if len(rows) != 0:
                # can't create duplicate
                return False

        if data_file is not None:
                # placeholder text, replaced by the file content further below
                text = 'fake data'
        args = {'kwd': keyword, 'txt': text}
        if public:
                cmd = """
                        INSERT INTO ref.keyword_expansion (keyword, textual_data, fk_staff)
                        VALUES (
                                gm.nullify_empty_string(%(kwd)s),
                                gm.nullify_empty_string(%(txt)s),
                                null
                        )
                        RETURNING pk
                """
        else:
                cmd = """
                        INSERT INTO ref.keyword_expansion (keyword, textual_data, fk_staff)
                        VALUES (
                                gm.nullify_empty_string(%(kwd)s),
                                gm.nullify_empty_string(%(txt)s),
                                (SELECT pk FROM dem.staff WHERE db_user = current_user)
                        )
                        RETURNING pk
                """
        rows = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}], return_data = True)
        expansion = cKeywordExpansion(aPK_obj = rows[0]['pk'])
        if data_file is not None:
                if not expansion.update_data_from_file(filename = data_file):
                        # the row keeps its placeholder text in this case
                        _log.error('cannot import [%s] into new expansion [%s]', data_file, keyword)

        # the cached lists are stale now
        global __textual_expansion_keywords
        __textual_expansion_keywords = None
        global __keyword_expansions
        __keyword_expansions = None

        return expansion
#------------------------------------------------------------
def delete_keyword_expansion(pk=None):
        """Delete the keyword expansion with primary key <pk>.

        Both module level caches are reset afterwards.

        Returns:
                Always True.
        """
        global __textual_expansion_keywords, __keyword_expansions

        query = {
                'cmd': "DELETE FROM ref.keyword_expansion WHERE pk = %(pk)s",
                'args': {'pk': pk}
        }
        gmPG2.run_rw_queries(queries = [query])

        # force a reload on next access
        __textual_expansion_keywords = None
        __keyword_expansions = None

        return True

#------------------------------------------------------------------------
#------------------------------------------------------------------------
#------------------------------------------------------------------------
#------------------------------------------------------------------------
__textual_expansion_keywords = None

def get_textual_expansion_keywords():
        """Return the rows describing all textual keyword expansions.

        The rows (keyword, public_expansion, private_expansion, owner) are
        read from ref.v_keyword_expansions on first use and served from a
        module level cache thereafter.
        """
        global __textual_expansion_keywords
        if __textual_expansion_keywords is None:
                SQL = """SELECT keyword, public_expansion, private_expansion, owner FROM ref.v_keyword_expansions WHERE is_textual IS TRUE"""
                __textual_expansion_keywords = gmPG2.run_ro_queries(queries = [{'cmd': SQL}])
                _log.info('retrieved %s textual expansion keywords', len(__textual_expansion_keywords))

        return __textual_expansion_keywords
#------------------------------------------------------------------------
def get_matching_textual_keywords(fragment=None):
        """Return the textual expansion keywords starting with <fragment>.

        Returns:
                A list of keyword strings, empty if <fragment> is None.
        """
        matches = []
        if fragment is None:
                return matches

        for candidate in get_textual_expansion_keywords():
                if candidate['keyword'].startswith(fragment):
                        matches.append(candidate['keyword'])

        return matches

#------------------------------------------------------------------------
def expand_keyword(keyword = None):
        """Return the expansion stored under <keyword>.

        Returns:
                The 'expansion' column of the first matching row of
                ref.v_your_keyword_expansions, None if no row matches.
        """
        # Easter Egg ;-)
        if keyword == '$$steffi':
                return 'Hai, play !  Versucht das ! (Keks dazu ?)  :-)'

        query = {
                'cmd': """SELECT expansion FROM ref.v_your_keyword_expansions WHERE keyword = %(kwd)s""",
                'args': {'kwd': keyword}
        }
        rows = gmPG2.run_ro_queries(queries = [query])

        return None if len(rows) == 0 else rows[0]['expansion']
#============================================================
if __name__ == "__main__":

        # the self-test only runs when explicitly requested: "<script> test"
        if sys.argv[1:2] != ['test']:
                sys.exit()

        logging.basicConfig(level=logging.DEBUG)

        from Gnumed.pycommon import gmI18N
        gmI18N.install_domain('gnumed')
        gmI18N.activate_locale()

        #--------------------------------------------------------------------
        def test_textual_expansion():
                """Show the textual keywords (uncached, then cached) and one expansion."""
                print("keywords, from database:")
                print(get_textual_expansion_keywords())
                print("keywords, cached:")
                print(get_textual_expansion_keywords())
                print("'$keyword' expands to:")
                print(expand_keyword(keyword = '$dvt'))

        #--------------------------------------------------------------------
        def test_kwd_expansions():
                """Print the formatted description of every keyword expansion."""
                for expansion in get_keyword_expansions():
                        print(expansion.format())
                        print("")
        #--------------------------------------------------------------------
        #test_textual_expansion()
        test_kwd_expansions()

Functions

def create_keyword_expansion(keyword=None, text=None, data_file=None, public=True)
Expand source code
def create_keyword_expansion(keyword=None, text=None, data_file=None, public=True):
        """Create a new textual or binary keyword expansion.

        Args:
                keyword: the keyword under which to store the expansion
                text: the textual expansion, whitespace-only text counts as missing
                data_file: name of a file holding the data of a binary expansion
                public: True creates an expansion without owner (fk_staff NULL), False links it to the staff entry of the current database user

        Returns:
                The new cKeywordExpansion instance, or False if an expansion with
                that keyword and visibility already exists.

        Raises:
                ValueError: if both or neither of <text> and <data_file> are given
        """
        if text is not None:
                if text.strip() == '':
                        text = None

        if (text is not None) and (data_file is not None):
                raise ValueError('only one of <text> and <data_file> may be given')

        if (text is None) and (data_file is None):
                raise ValueError('either <text> or <data_file> must be non-NULL')

        # already exists ?
        cmd = "SELECT 1 FROM ref.v_your_keyword_expansions WHERE public_expansion IS %(public)s AND keyword = %(kwd)s"
        args = {'kwd': keyword, 'public': public}
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
        if len(rows) != 0:
                # can't create duplicate
                return False

        if data_file is not None:
                # placeholder text, replaced by the file content further below
                text = 'fake data'
        args = {'kwd': keyword, 'txt': text}
        if public:
                cmd = """
                        INSERT INTO ref.keyword_expansion (keyword, textual_data, fk_staff)
                        VALUES (
                                gm.nullify_empty_string(%(kwd)s),
                                gm.nullify_empty_string(%(txt)s),
                                null
                        )
                        RETURNING pk
                """
        else:
                cmd = """
                        INSERT INTO ref.keyword_expansion (keyword, textual_data, fk_staff)
                        VALUES (
                                gm.nullify_empty_string(%(kwd)s),
                                gm.nullify_empty_string(%(txt)s),
                                (SELECT pk FROM dem.staff WHERE db_user = current_user)
                        )
                        RETURNING pk
                """
        rows = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}], return_data = True)
        expansion = cKeywordExpansion(aPK_obj = rows[0]['pk'])
        if data_file is not None:
                if not expansion.update_data_from_file(filename = data_file):
                        # the row keeps its placeholder text in this case
                        _log.error('cannot import [%s] into new expansion [%s]', data_file, keyword)

        # the cached lists are stale now
        global __textual_expansion_keywords
        __textual_expansion_keywords = None
        global __keyword_expansions
        __keyword_expansions = None

        return expansion
def delete_keyword_expansion(pk=None)
Expand source code
def delete_keyword_expansion(pk=None):
        """Delete the keyword expansion with primary key <pk>.

        Both module level caches are reset afterwards.

        Returns:
                Always True.
        """
        global __textual_expansion_keywords, __keyword_expansions

        query = {
                'cmd': "DELETE FROM ref.keyword_expansion WHERE pk = %(pk)s",
                'args': {'pk': pk}
        }
        gmPG2.run_rw_queries(queries = [query])

        # force a reload on next access
        __textual_expansion_keywords = None
        __keyword_expansions = None

        return True
def expand_keyword(keyword=None)
Expand source code
def expand_keyword(keyword = None):
        """Return the expansion stored under <keyword>.

        Returns:
                The 'expansion' column of the first matching row of
                ref.v_your_keyword_expansions, None if no row matches.
        """
        # Easter Egg ;-)
        if keyword == '$$steffi':
                return 'Hai, play !  Versucht das ! (Keks dazu ?)  :-)'

        query = {
                'cmd': """SELECT expansion FROM ref.v_your_keyword_expansions WHERE keyword = %(kwd)s""",
                'args': {'kwd': keyword}
        }
        rows = gmPG2.run_ro_queries(queries = [query])

        return None if len(rows) == 0 else rows[0]['expansion']
def get_expansion(keyword=None, textual_only=True, binary_only=False)
Expand source code
def get_expansion(keyword=None, textual_only=True, binary_only=False):
        """Retrieve the expansion stored under a keyword.

        Args:
                keyword: the keyword to look up
                textual_only: only consider textual expansions
                binary_only: only consider binary (non-textual) expansions

        Returns:
                A cKeywordExpansion instance for the first matching row or None
                if there is no match.

        Raises:
                ValueError: if neither <textual_only> nor <binary_only> is False
        """
        if False not in [textual_only, binary_only]:
                raise ValueError('one of <textual_only> and <binary_only> must be False')

        where_parts = ['keyword = %(kwd)s']
        args = {'kwd': keyword}

        if textual_only:
                where_parts.append('is_textual IS TRUE')
        elif binary_only:
                # previously <binary_only> was checked but never applied to the query
                where_parts.append('is_textual IS FALSE')

        cmd = _SQL_get_keyword_expansions % ' AND '.join(where_parts)
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
        if not rows:
                return None

        return cKeywordExpansion(row = {'data': rows[0], 'pk_field': 'pk_expansion'})
def get_keyword_expansions(order_by=None, force_reload=False, return_pks=False)
Expand source code
def get_keyword_expansions(order_by=None, force_reload=False, return_pks=False):
        """Retrieve the keyword expansions listed in ref.v_your_keyword_expansions.

        Args:
                order_by: SQL expression for an ORDER BY clause (without the keywords "ORDER BY"), None for no explicit ordering. It is interpolated verbatim into the query and must therefore never be built from untrusted input.
                force_reload: ignore the module level cache and query the database
                return_pks: return primary keys rather than instances

        Returns:
                A list of cKeywordExpansion instances, or a list of pk_expansion
                values if <return_pks> is True.
        """
        global __keyword_expansions
        # The cache holds instances, so it cannot serve a request for primary keys.
        # NOTE(review): a cached list is returned regardless of <order_by>,
        # pass force_reload=True if a particular order is required.
        if not (force_reload or return_pks):
                if __keyword_expansions is not None:
                        return __keyword_expansions

        if order_by is None:
                where_clause = 'true'
        else:
                where_clause = 'true ORDER BY %s' % order_by

        cmd = _SQL_get_keyword_expansions % where_clause
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd}])
        if return_pks:
                return [ r['pk_expansion'] for r in rows ]

        __keyword_expansions = [ cKeywordExpansion(row = {'data': r, 'pk_field': 'pk_expansion'}) for r in rows ]
        return __keyword_expansions
def get_matching_textual_keywords(fragment=None)
Expand source code
def get_matching_textual_keywords(fragment=None):
        """Return the textual expansion keywords starting with <fragment>.

        Returns:
                A list of keyword strings, empty if <fragment> is None.
        """
        matches = []
        if fragment is None:
                return matches

        for candidate in get_textual_expansion_keywords():
                if candidate['keyword'].startswith(fragment):
                        matches.append(candidate['keyword'])

        return matches
def get_textual_expansion_keywords()
Expand source code
def get_textual_expansion_keywords():
        """Return the rows describing all textual keyword expansions.

        The rows (keyword, public_expansion, private_expansion, owner) are
        read from ref.v_keyword_expansions on first use and served from a
        module level cache thereafter.
        """
        global __textual_expansion_keywords
        if __textual_expansion_keywords is None:
                SQL = """SELECT keyword, public_expansion, private_expansion, owner FROM ref.v_keyword_expansions WHERE is_textual IS TRUE"""
                __textual_expansion_keywords = gmPG2.run_ro_queries(queries = [{'cmd': SQL}])
                _log.info('retrieved %s textual expansion keywords', len(__textual_expansion_keywords))

        return __textual_expansion_keywords

Classes

class cKeywordExpansion (aPK_obj: int | dict = None, row: dict = None, link_obj=None)

Keyword indexed text snippets or chunks of data. Used as text macros or to put into documents.

Call init from child classes like so:

    super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)

Args

aPK_obj
retrieve data from backend
  • a scalar value: the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
  • a dictionary of values: the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
must hold the fields
  • data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
  • pk_field: the name of the primary key column OR
  • pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
  • for example:
    row = {
            'data': rows[0],
            'pk_field': 'pk_XXX (the PK column name)',
            'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val}
    }
    
    rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
    objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
    
Expand source code
class cKeywordExpansion(gmBusinessDBObject.cBusinessDBObject):
        """Keyword indexed text snippets or chunks of data. Used as text macros or to put into documents.

        An expansion is either textual (payload field 'expansion') or binary
        (stored in ref.keyword_expansion.binary_data, see save_to_file() and
        update_data_from_file()).
        """
        _cmd_fetch_payload = _SQL_get_keyword_expansions % "pk_expansion = %s"
        # Setting a non-empty textual expansion drops any binary data. An empty
        # expansion keeps existing binary data or, if there is none, stores a
        # placeholder text.
        _cmds_store_payload = [
                """
                        UPDATE ref.keyword_expansion SET
                                keyword = gm.nullify_empty_string(%(keyword)s),
                                textual_data = CASE
                                        WHEN gm.nullify_empty_string(%(expansion)s) IS NULL
                                                THEN CASE
                                                        WHEN binary_data IS NULL THEN '---fake_data---'
                                                        ELSE NULL
                                                END
                                        ELSE gm.nullify_empty_string(%(expansion)s)
                                END,
                                binary_data = CASE
                                        WHEN gm.nullify_empty_string(%(expansion)s) IS NULL THEN binary_data
                                        ELSE NULL
                                END,
                                encrypted = %(is_encrypted)s
                        WHERE
                                pk = %(pk_expansion)s
                                        AND
                                xmin = %(xmin_expansion)s
                        RETURNING
                                xmin as xmin_expansion
                """
        ]
        _updatable_fields = [
                'keyword',
                'expansion',
                'is_encrypted'
        ]

        #--------------------------------------------------------
        def save_to_file(self, aChunkSize=0, target_mime=None, target_extension=None, ignore_conversion_problems=False):
                """Export the binary data of this expansion into a file.

                Args:
                        aChunkSize: passed to gmPG2.bytea2file() as <chunk_size>
                        target_mime: if not None, the exported file is converted via gmMimeLib.convert_file()
                        target_extension: accepted for interface compatibility, not used by this method
                        ignore_conversion_problems: if conversion fails, return the unconverted file rather than None

                Returns:
                        The name of the exported (or converted) file. None if the
                        expansion is textual, holds no data, or export/conversion failed.
                """
                # only non-empty binary snippets can be exported
                if self._payload['is_textual']:
                        return None

                if self._payload['data_size'] == 0:
                        return None

                exported_fname = gmTools.get_unique_filename(prefix = 'gm-data_snippet-')
                success = gmPG2.bytea2file (
                        data_query = {
                                'cmd': 'SELECT substring(binary_data from %(start)s for %(size)s) FROM ref.keyword_expansion WHERE pk = %(pk)s',
                                'args': {'pk': self.pk_obj}
                        },
                        filename = exported_fname,
                        chunk_size = aChunkSize,
                        data_size = self._payload['data_size']
                )
                if not success:
                        return None

                if target_mime is None:
                        return exported_fname

                converted_fname = gmMimeLib.convert_file(filename = exported_fname, target_mime = target_mime)
                if converted_fname is not None:
                        return converted_fname

                _log.warning('conversion failed')
                if ignore_conversion_problems:
                        _log.warning('programmed to ignore conversion problems, hoping receiver can handle [%s]', exported_fname)
                        return exported_fname

                return None

        #--------------------------------------------------------
        def update_data_from_file(self, filename=None):
                """Replace the content of this expansion with the data found in a file.

                The textual data of the row is set to NULL and the payload is
                refetched afterwards.

                Args:
                        filename: the file to read the binary data from

                Returns:
                        True on success, False if <filename> is missing or not a readable file.
                """
                if filename is None:
                        _log.error('no filename given, cannot update data')
                        return False

                if not (os.access(filename, os.R_OK) and os.path.isfile(filename)):
                        _log.error('[%s] is not a readable file', filename)
                        return False

                gmPG2.file2bytea (
                        query = "UPDATE ref.keyword_expansion SET binary_data = %(data)s::bytea, textual_data = NULL WHERE pk = %(pk)s",
                        filename = filename,
                        args = {'pk': self.pk_obj}
                )

                # must update XMIN now ...
                self.refetch_payload()

                # Invalidate the module level caches. They must be addressed by
                # their string names: inside a class body "global __name" is
                # subject to private name mangling and would (re)bind
                # "_cKeywordExpansion__name" rather than the actual cache.
                module_namespace = globals()
                module_namespace['__textual_expansion_keywords'] = None
                module_namespace['__keyword_expansions'] = None

                return True

        #--------------------------------------------------------
        def format(self):
                """Return a human readable, multi-line description of this expansion."""
                txt = '%s            #%s\n' % (
                        gmTools.bool2subst (
                                self._payload['is_textual'],
                                _('Textual keyword expansion'),
                                _('Binary keyword expansion')
                        ),
                        self._payload['pk_expansion']
                )
                txt += ' %s%s\n' % (
                        gmTools.bool2subst (
                                self._payload['private_expansion'],
                                _('private'),
                                _('public')
                        ),
                        gmTools.bool2subst (
                                self._payload['is_encrypted'],
                                ', %s' % _('encrypted'),
                                ''
                        )
                )
                txt += _(' Keyword: %s\n') % self._payload['keyword']
                txt += _(' Owner: %s\n') % self._payload['owner']
                # textual snippets are shown verbatim, binary ones by size only
                if self._payload['is_textual']:
                        txt += '\n%s' % self._payload['expansion']
                else:
                        txt += ' Data: %s (%s Bytes)' % (gmTools.size2str(self._payload['data_size']), self._payload['data_size'])

                return txt

Ancestors

Methods

def save_to_file(self, aChunkSize=0, target_mime=None, target_extension=None, ignore_conversion_problems=False)
Expand source code
def save_to_file(self, aChunkSize=0, target_mime=None, target_extension=None, ignore_conversion_problems=False):
        """Export the binary data of this expansion into a file.

        Args:
                aChunkSize: passed to gmPG2.bytea2file() as <chunk_size>
                target_mime: if not None, the exported file is converted via gmMimeLib.convert_file()
                target_extension: accepted for interface compatibility, not used by this method
                ignore_conversion_problems: if conversion fails, return the unconverted file rather than None

        Returns:
                The name of the exported (or converted) file. None if the
                expansion is textual, holds no data, or export/conversion failed.
        """
        # only non-empty binary snippets can be exported
        if self._payload['is_textual'] or self._payload['data_size'] == 0:
                return None

        exported_fname = gmTools.get_unique_filename(prefix = 'gm-data_snippet-')
        chunk_query = {
                'cmd': 'SELECT substring(binary_data from %(start)s for %(size)s) FROM ref.keyword_expansion WHERE pk = %(pk)s',
                'args': {'pk': self.pk_obj}
        }
        exported = gmPG2.bytea2file (
                data_query = chunk_query,
                filename = exported_fname,
                chunk_size = aChunkSize,
                data_size = self._payload['data_size']
        )
        if not exported:
                return None

        # no conversion requested
        if target_mime is None:
                return exported_fname

        converted_fname = gmMimeLib.convert_file(filename = exported_fname, target_mime = target_mime)
        if converted_fname is not None:
                return converted_fname

        _log.warning('conversion failed')
        if not ignore_conversion_problems:
                return None

        _log.warning('programmed to ignore conversion problems, hoping receiver can handle [%s]', exported_fname)
        return exported_fname
def update_data_from_file(self, filename=None)
Expand source code
def update_data_from_file(self, filename=None):
        """Replace the content of this expansion with the data found in a file.

        The textual data of the row is set to NULL and the payload is
        refetched afterwards.

        Args:
                filename: the file to read the binary data from

        Returns:
                True on success, False if <filename> is missing or not a readable file.
        """
        if filename is None:
                _log.error('no filename given, cannot update data')
                return False

        if not (os.access(filename, os.R_OK) and os.path.isfile(filename)):
                _log.error('[%s] is not a readable file', filename)
                return False

        gmPG2.file2bytea (
                query = "UPDATE ref.keyword_expansion SET binary_data = %(data)s::bytea, textual_data = NULL WHERE pk = %(pk)s",
                filename = filename,
                args = {'pk': self.pk_obj}
        )

        # must update XMIN now ...
        self.refetch_payload()

        # Invalidate the module level caches. They must be addressed by
        # their string names: inside a class body "global __name" is
        # subject to private name mangling and would (re)bind
        # "_cKeywordExpansion__name" rather than the actual cache.
        module_namespace = globals()
        module_namespace['__textual_expansion_keywords'] = None
        module_namespace['__keyword_expansions'] = None

        return True

Inherited members