Module Gnumed.business.gmOrganization

Organization classes

author: Karsten Hilbert et al

Expand source code
"""Organization classes

author: Karsten Hilbert et al
"""
#============================================================
__license__ = "GPL"


import sys, logging


if __name__ == '__main__':
        sys.path.insert(0, '../../')
        _ = lambda x:x
from Gnumed.pycommon import gmPG2
from Gnumed.pycommon import gmTools
from Gnumed.pycommon import gmBusinessDBObject

from Gnumed.business import gmDemographicRecord


_log = logging.getLogger('gm.org')

#============================================================
def create_org_category(category=None, link_obj=None):
        args = {'cat': category}
        cmd1 = """INSERT INTO dem.org_category (description) SELECT %(cat)s
WHERE NOT EXISTS (
        SELECT 1 FROM dem.org_category WHERE description = %(cat)s or _(description) = %(cat)s
)"""
        cmd2 = """SELECT pk FROM dem.org_category WHERE description = %(cat)s or _(description) = %(cat)s LIMIT 1"""
        queries = [
                {'cmd': cmd1, 'args': args},
                {'cmd': cmd2, 'args': args}
        ]
        rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = queries, return_data = True)
        return rows[0][0]

#============================================================
# organization API
#------------------------------------------------------------
_SQL_get_org = 'SELECT * FROM dem.v_orgs WHERE %s'

class cOrg(gmBusinessDBObject.cBusinessDBObject):

        _cmd_fetch_payload = _SQL_get_org % 'pk_org = %s'
        _cmds_store_payload = [
                """UPDATE dem.org SET
                                description = %(organization)s,
                                fk_category = %(pk_category_org)s
                        WHERE
                                pk = %(pk_org)s
                                        AND
                                xmin = %(xmin_org)s
                        RETURNING
                                xmin AS xmin_org"""
        ]
        _updatable_fields = [
                'organization',
                'pk_category_org'
        ]
        #--------------------------------------------------------
        def add_unit(self, unit=None):
                return create_org_unit(pk_organization = self._payload['pk_org'], unit = unit)
        #--------------------------------------------------------
        def format(self):
                lines = []
                lines.append(_('Organization #%s') % self._payload['pk_org'])
                lines.append('')
                lines.append(' %s "%s"' % (
                        self._payload['l10n_category'],
                        self._payload['organization']
                ))
                if self._payload['is_praxis']:
                        lines.append('')
                        lines.append(' ' + _('This is your praxis !'))
                return '\n'.join(lines)
        #--------------------------------------------------------
        # properties
        #--------------------------------------------------------
        def _get_units(self):
                return get_org_units(order_by = 'unit', org = self._payload['pk_org'])

        units = property(_get_units)

#------------------------------------------------------------
def org_exists(organization:str=None, category=None, link_obj=None) -> cOrg:
        args = {'desc': organization, 'cat': category}

        if category is None:
                cat_part = 'True'
        elif isinstance(category, str):
                cat_part = 'fk_category = (SELECT pk FROM dem.org_category WHERE description = %(cat)s)'
        else:
                cat_part = 'fk_category = %(cat)s'

        cmd = 'SELECT pk FROM dem.org WHERE description = %%(desc)s AND %s' % cat_part
        rows = gmPG2.run_ro_queries(link_obj = link_obj, queries = [{'cmd': cmd, 'args': args}])
        if len(rows) > 0:
                return cOrg(aPK_obj = rows[0][0])

        return None

#------------------------------------------------------------
def create_org(organization=None, category=None, link_obj=None):

        org = org_exists(link_obj = link_obj, organization = organization, category = category)
        if org is not None:
                return org

        args = {'desc': organization, 'cat': category}

        if isinstance(category, str):
                cat_part = '(SELECT pk FROM dem.org_category WHERE description = %(cat)s)'
        else:
                cat_part = '%(cat)s'

        cmd = 'INSERT INTO dem.org (description, fk_category) VALUES (%%(desc)s, %s) RETURNING pk' % cat_part
        rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = [{'cmd': cmd, 'args': args}], return_data = True)

        return cOrg(aPK_obj = rows[0][0], link_obj = link_obj)

#------------------------------------------------------------
def delete_org(organization=None):
        args = {'pk': organization}
        cmd = """
                DELETE FROM dem.org
                WHERE
                        pk = %(pk)s
                        AND NOT EXISTS (
                                SELECT 1 FROM dem.org_unit WHERE fk_org = %(pk)s
                        )
        """
        gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
        return True

#------------------------------------------------------------
def get_orgs(order_by=None, return_pks=False):

        if order_by is None:
                order_by = ''
        else:
                order_by = 'ORDER BY %s' % order_by

        cmd = _SQL_get_org % ('TRUE %s' % order_by)
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd}])
        if return_pks:
                return [ r['pk_org'] for r in rows ]
        return [ cOrg(row = {'data': r, 'pk_field': 'pk_org'}) for r in rows ]

#============================================================
# organizational units API
#------------------------------------------------------------
_SQL_get_org_unit = 'SELECT * FROM dem.v_org_units WHERE %s'

class cOrgUnit(gmBusinessDBObject.cBusinessDBObject):

        _cmd_fetch_payload = _SQL_get_org_unit % 'pk_org_unit = %s'
        _cmds_store_payload = [
                """UPDATE dem.org_unit SET
                                description = %(unit)s,
                                fk_org = %(pk_org)s,
                                fk_category = %(pk_category_unit)s,
                                fk_address = %(pk_address)s
                        WHERE
                                pk = %(pk_org_unit)s
                                        AND
                                xmin = %(xmin_org_unit)s
                        RETURNING
                                xmin AS xmin_org_unit"""
        ]
        _updatable_fields = [
                'unit',
                'pk_org',
                'pk_category_unit',
                'pk_address'
        ]
        #--------------------------------------------------------
        # comms API
        #--------------------------------------------------------
        def get_comm_channels(self, comm_medium=None):

                args = {'pk': self.pk_obj, 'medium': comm_medium}

                if comm_medium is None:
                        cmd = """
                                SELECT *
                                FROM dem.v_org_unit_comms
                                WHERE
                                        pk_org_unit = %(pk)s
                        """
                else:
                        cmd = """
                                SELECT *
                                FROM dem.v_org_unit_comms
                                WHERE
                                        pk_org_unit = %(pk)s
                                                AND
                                        comm_type = %(medium)s
                        """
                rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])

                return [ gmDemographicRecord.cOrgCommChannel(row = {
                                        'pk_field': 'pk_lnk_org_unit2comm',
                                        'data': r
                                }) for r in rows
                        ]

        comm_channels = property(get_comm_channels)

        #--------------------------------------------------------
        def link_comm_channel(self, comm_medium=None, url=None, is_confidential=False, pk_channel_type=None):
                """Link a communication medium with this org unit.

                @param comm_medium The name of the communication medium.
                @param url The communication resource locator.
                @type url A str instance.
                @param is_confidential Whether the data must be treated as confidential.
                @type is_confidential A bool instance.
                """
                return gmDemographicRecord.create_comm_channel (
                        comm_medium = comm_medium,
                        url = url,
                        is_confidential = is_confidential,
                        pk_channel_type = pk_channel_type,
                        pk_org_unit = self.pk_obj
                )
        #--------------------------------------------------------
        def unlink_comm_channel(self, comm_channel=None):
                gmDemographicRecord.delete_comm_channel (
                        pk = comm_channel['pk_lnk_org_unit2comm'],
                        pk_org_unit = self.pk_obj
                )
        #--------------------------------------------------------
        # external IDs
        #--------------------------------------------------------
        def get_external_ids(self, id_type=None, issuer=None):
                where_parts = ['pk_org_unit = %(unit)s']
                args = {'unit': self.pk_obj}

                if id_type is not None:
                        where_parts.append('name = %(name)s')
                        args['name'] = id_type.strip()

                if issuer is not None:
                        where_parts.append('issuer = %(issuer)s')
                        args['issuer'] = issuer.strip()

                cmd = "SELECT * FROM dem.v_external_ids4org_unit WHERE %s" % ' AND '.join(where_parts)
                rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])

                return rows

        external_ids = property(get_external_ids)
        #--------------------------------------------------------
        def add_external_id(self, type_name=None, value=None, issuer=None, comment=None, pk_type=None):
                """Adds an external ID to an org unit.

                creates ID type if necessary
                """
                args = {
                        'unit': self.pk_obj,
                        'val': value,
                        'type_name': type_name,
                        'pk_type': pk_type,
                        'issuer': issuer,
                        'comment': comment
                }
                # check for existing ID
                if pk_type is not None:
                        cmd = """
                                SELECT * FROM dem.v_external_ids4org_unit WHERE
                                pk_org_unit = %(unit)s
                                        AND
                                pk_type = %(pk_type)s
                                        AND
                                value = %(val)s"""
                else:
                        # by type/value/issuer
                        if issuer is None:
                                cmd = """
                                        SELECT * FROM dem.v_external_ids4org_unit WHERE
                                        pk_org_unit = %(unit)s
                                                AND
                                        name = %(type_name)s
                                                AND
                                        value = %(val)s"""
                        else:
                                cmd = """
                                        SELECT * FROM dem.v_external_ids4org_unit WHERE
                                        pk_org_unit = %(unit)s
                                                AND
                                        name = %(type_name)s
                                                AND
                                        value = %(val)s
                                                AND
                                        issuer = %(issuer)s"""
                rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])

                # create new ID if not found
                if len(rows) == 0:
                        if pk_type is None:
                                cmd = """INSERT INTO dem.lnk_org_unit2ext_id (external_id, fk_type, comment, fk_org_unit) VALUES (
                                        %(val)s,
                                        (SELECT dem.add_external_id_type(%(type_name)s, %(issuer)s)),
                                        %(comment)s,
                                        %(unit)s
                                )"""
                        else:
                                cmd = """INSERT INTO dem.lnk_org_unit2ext_id (external_id, fk_type, comment, fk_org_unit) VALUES (
                                        %(val)s,
                                        %(pk_type)s,
                                        %(comment)s,
                                        %(unit)s
                                )"""
                        rows = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])

                # or update comment of existing ID
                else:
                        row = rows[0]
                        if comment is not None:
                                # comment not already there ?
                                if gmTools.coalesce(row['comment'], '').find(comment.strip()) == -1:
                                        comment = '%s%s' % (gmTools.coalesce(row['comment'], '', '%s // '), comment.strip)
                                        cmd = "UPDATE dem.lnk_org_unit2ext_id SET comment = %(comment)s WHERE pk = %(pk)s"
                                        args = {'comment': comment, 'pk': row['pk_id']}
                                        rows = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])

        #--------------------------------------------------------
        def update_external_id(self, pk_id=None, type=None, value=None, issuer=None, comment=None):
                """Edits an existing external ID.

                Creates ID type if necessary.
                """
                cmd = """
                        UPDATE dem.lnk_org_unit2ext_id SET
                                fk_type = (SELECT dem.add_external_id_type(%(type)s, %(issuer)s)),
                                external_id = %(value)s,
                                comment = gm.nullify_empty_string(%(comment)s)
                        WHERE
                                pk = %(pk)s
                """
                args = {'pk': pk_id, 'value': value, 'type': type, 'issuer': issuer, 'comment': comment}
                gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])

        #--------------------------------------------------------
        def delete_external_id(self, pk_ext_id=None):
                cmd = """
                        DELETE FROM dem.lnk_org_unit2ext_id
                        WHERE fk_org_unit = %(unit)s AND pk = %(pk)s
                """
                args = {'unit': self.pk_obj, 'pk': pk_ext_id}
                gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])

        #--------------------------------------------------------
        # address API
        #--------------------------------------------------------
        def link_address(self, id_type=None, address=None):
                self.address = address
                return address
        #--------------------------------------------------------
        def unlink_address(self, address=None, pk_address=None):
                """Remove an address from the org unit.

                The address itself remains in the database.
                The address can be either cAdress or cPatientAdress.
                """
                self.address = None
        #--------------------------------------------------------
        def format(self, with_address=False, with_org=True, with_comms=False):
                lines = []
                lines.append(_('Unit%s: %s%s') % (
                        gmTools.bool2subst (
                                self._payload['is_praxis_branch'],
                                _(' (of your praxis)'),
                                ''
                        ),
                        self._payload['unit'],
                        gmTools.coalesce(self._payload['l10n_unit_category'], '', ' (%s)')
                ))
                if with_org:
                        lines.append(_('Organization: %s (%s)') % (
                                self._payload['organization'],
                                self._payload['l10n_organization_category']
                        ))
                if with_address:
                        adr = self.address
                        if adr is not None:
                                lines.extend(adr.format())
                if with_comms:
                        for comm in self.comm_channels:
                                lines.append('%s: %s%s' % (
                                        comm['l10n_comm_type'],
                                        comm['url'],
                                        gmTools.bool2subst(comm['is_confidential'], _(' (confidential)'), '', '')
                                ))
                return lines

        #--------------------------------------------------------
        # properties
        #--------------------------------------------------------
        def _get_address(self):
                if self._payload['pk_address'] is None:
                        return None
                return gmDemographicRecord.cAddress(aPK_obj = self._payload['pk_address'])

        def _set_address(self, address):
                self['pk_address'] = address['pk_address']
                self.save()

        address = property(_get_address, _set_address)

        #--------------------------------------------------------
        def _get_org(self):
                return cOrg(aPK_obj = self._payload['pk_org'])

        organization = property(_get_org)
        org = property(_get_org)

        comm_channels = property(get_comm_channels)

#------------------------------------------------------------
def create_org_unit(pk_organization:str=None, unit:str=None, link_obj=None) -> cOrgUnit:
        _log.debug('creating org unit [%s:%s]', unit, pk_organization)
        args = {'desc': unit, 'pk_org': pk_organization}
        cmd1 = """
                INSERT INTO dem.org_unit (description, fk_org) SELECT
                        %(desc)s,
                        %(pk_org)s
                WHERE NOT EXISTS (
                        SELECT 1 FROM dem.org_unit WHERE description = %(desc)s AND fk_org = %(pk_org)s
                )"""
        cmd2 = _SQL_get_org_unit % 'unit = %(desc)s AND pk_org = %(pk_org)s'
        queries = [
                {'cmd': cmd1, 'args': args},
                {'cmd': cmd2, 'args': args}
        ]
        rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = queries, return_data = True)
        return cOrgUnit(row = {'data': rows[0], 'pk_field': 'pk_org_unit'})

#------------------------------------------------------------
def delete_org_unit(unit:int=None) -> bool:
        args = {'pk': unit}
        cmd = "DELETE FROM dem.org_unit WHERE pk = %(pk)s"
                        #--                     AND
                        #--             NOT EXISTS (
                        #--                     SELECT 1 FROM blobs.doc_med where fk_org_unit = %(pk)s
                        #--             )       AND
                        #--             NOT EXISTS (
                        #--                     SELECT 1 FROM clin.external_care where fk_org_unit = %(pk)s
                        #--             )       AND
                        #--             NOT EXISTS (
                        #--                     SELECT 1 FROM blobs.doc_med where fk_org_unit = %(pk)s
                        #--             )       AND
                        #--             NOT EXISTS (
                        #--                     SELECT 1 FROM clin.hospital_stay where fk_org_unit = %(pk)s
                        #--             )       AND
                        #--             NOT EXISTS (
                        #--                     SELECT 1 FROM clin.procedure where fk_org_unit = %(pk)s
                        #--             )       AND
                        #--             NOT EXISTS (
                        #--                     SELECT 1 FROM clin.test_org where fk_org_unit = %(pk)s
                        #--             )       AND
                        #--             NOT EXISTS (
                        #--                     SELECT 1 FROM dem.lnk_org_unit2comm where fk_org_unit = %(pk)s
                        #--             )       AND
                        #--             NOT EXISTS (
                        #--                     SELECT 1 FROM dem.lnk_org_unit2ext_id where fk_org_unit = %(pk)s
                        #--             )
        try:
                gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
        except gmPG2.dbapi.errors.ForeignKeyViolation:
                _log.exception('error deleting org unit')
                return False

        return True

#------------------------------------------------------------
def get_org_units(order_by:str=None, org:int=None, return_pks:bool=False) -> list:
        if order_by is None:
                order_by = ''
        else:
                order_by = ' ORDER BY %s' % order_by

        if org is None:
                where_part = 'TRUE'
        else:
                where_part = 'pk_org = %(org)s'

        args = {'org': org}
        cmd = (_SQL_get_org_unit % where_part) + order_by
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
        if return_pks:
                return [ r['pk_org_unit'] for r in rows ]

        return [ cOrgUnit(row = {'data': r, 'pk_field': 'pk_org_unit'}) for r in rows ]

#======================================================================
# main
#----------------------------------------------------------------------
if __name__ == "__main__":

        if len(sys.argv) < 2:
                sys.exit()

        if sys.argv[1] != 'test':
                sys.exit()

        gmPG2.request_login_params(setup_pool = True)

        #print(cOrgUnit(aPK_obj = 21825))
        delete_org_unit(2)
        #for unit in get_org_units():
        #       print(unit)

Functions

def create_org(organization=None, category=None, link_obj=None)
Expand source code
def create_org(organization=None, category=None, link_obj=None):

        org = org_exists(link_obj = link_obj, organization = organization, category = category)
        if org is not None:
                return org

        args = {'desc': organization, 'cat': category}

        if isinstance(category, str):
                cat_part = '(SELECT pk FROM dem.org_category WHERE description = %(cat)s)'
        else:
                cat_part = '%(cat)s'

        cmd = 'INSERT INTO dem.org (description, fk_category) VALUES (%%(desc)s, %s) RETURNING pk' % cat_part
        rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = [{'cmd': cmd, 'args': args}], return_data = True)

        return cOrg(aPK_obj = rows[0][0], link_obj = link_obj)
def create_org_category(category=None, link_obj=None)
Expand source code
def create_org_category(category=None, link_obj=None):
        args = {'cat': category}
        cmd1 = """INSERT INTO dem.org_category (description) SELECT %(cat)s
WHERE NOT EXISTS (
        SELECT 1 FROM dem.org_category WHERE description = %(cat)s or _(description) = %(cat)s
)"""
        cmd2 = """SELECT pk FROM dem.org_category WHERE description = %(cat)s or _(description) = %(cat)s LIMIT 1"""
        queries = [
                {'cmd': cmd1, 'args': args},
                {'cmd': cmd2, 'args': args}
        ]
        rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = queries, return_data = True)
        return rows[0][0]
def create_org_unit(pk_organization: str = None, unit: str = None, link_obj=None) ‑> cOrgUnit
Expand source code
def create_org_unit(pk_organization:str=None, unit:str=None, link_obj=None) -> cOrgUnit:
        _log.debug('creating org unit [%s:%s]', unit, pk_organization)
        args = {'desc': unit, 'pk_org': pk_organization}
        cmd1 = """
                INSERT INTO dem.org_unit (description, fk_org) SELECT
                        %(desc)s,
                        %(pk_org)s
                WHERE NOT EXISTS (
                        SELECT 1 FROM dem.org_unit WHERE description = %(desc)s AND fk_org = %(pk_org)s
                )"""
        cmd2 = _SQL_get_org_unit % 'unit = %(desc)s AND pk_org = %(pk_org)s'
        queries = [
                {'cmd': cmd1, 'args': args},
                {'cmd': cmd2, 'args': args}
        ]
        rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = queries, return_data = True)
        return cOrgUnit(row = {'data': rows[0], 'pk_field': 'pk_org_unit'})
def delete_org(organization=None)
Expand source code
def delete_org(organization=None):
        args = {'pk': organization}
        cmd = """
                DELETE FROM dem.org
                WHERE
                        pk = %(pk)s
                        AND NOT EXISTS (
                                SELECT 1 FROM dem.org_unit WHERE fk_org = %(pk)s
                        )
        """
        gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
        return True
def delete_org_unit(unit: int = None) ‑> bool
Expand source code
def delete_org_unit(unit:int=None) -> bool:
        args = {'pk': unit}
        cmd = "DELETE FROM dem.org_unit WHERE pk = %(pk)s"
                        #--                     AND
                        #--             NOT EXISTS (
                        #--                     SELECT 1 FROM blobs.doc_med where fk_org_unit = %(pk)s
                        #--             )       AND
                        #--             NOT EXISTS (
                        #--                     SELECT 1 FROM clin.external_care where fk_org_unit = %(pk)s
                        #--             )       AND
                        #--             NOT EXISTS (
                        #--                     SELECT 1 FROM blobs.doc_med where fk_org_unit = %(pk)s
                        #--             )       AND
                        #--             NOT EXISTS (
                        #--                     SELECT 1 FROM clin.hospital_stay where fk_org_unit = %(pk)s
                        #--             )       AND
                        #--             NOT EXISTS (
                        #--                     SELECT 1 FROM clin.procedure where fk_org_unit = %(pk)s
                        #--             )       AND
                        #--             NOT EXISTS (
                        #--                     SELECT 1 FROM clin.test_org where fk_org_unit = %(pk)s
                        #--             )       AND
                        #--             NOT EXISTS (
                        #--                     SELECT 1 FROM dem.lnk_org_unit2comm where fk_org_unit = %(pk)s
                        #--             )       AND
                        #--             NOT EXISTS (
                        #--                     SELECT 1 FROM dem.lnk_org_unit2ext_id where fk_org_unit = %(pk)s
                        #--             )
        try:
                gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
        except gmPG2.dbapi.errors.ForeignKeyViolation:
                _log.exception('error deleting org unit')
                return False

        return True
def get_org_units(order_by: str = None, org: int = None, return_pks: bool = False) ‑> list
Expand source code
def get_org_units(order_by:str=None, org:int=None, return_pks:bool=False) -> list:
        if order_by is None:
                order_by = ''
        else:
                order_by = ' ORDER BY %s' % order_by

        if org is None:
                where_part = 'TRUE'
        else:
                where_part = 'pk_org = %(org)s'

        args = {'org': org}
        cmd = (_SQL_get_org_unit % where_part) + order_by
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
        if return_pks:
                return [ r['pk_org_unit'] for r in rows ]

        return [ cOrgUnit(row = {'data': r, 'pk_field': 'pk_org_unit'}) for r in rows ]
def get_orgs(order_by=None, return_pks=False)
Expand source code
def get_orgs(order_by=None, return_pks=False):

        if order_by is None:
                order_by = ''
        else:
                order_by = 'ORDER BY %s' % order_by

        cmd = _SQL_get_org % ('TRUE %s' % order_by)
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd}])
        if return_pks:
                return [ r['pk_org'] for r in rows ]
        return [ cOrg(row = {'data': r, 'pk_field': 'pk_org'}) for r in rows ]
def org_exists(organization: str = None, category=None, link_obj=None) ‑> cOrg
Expand source code
def org_exists(organization:str=None, category=None, link_obj=None) -> cOrg:
        args = {'desc': organization, 'cat': category}

        if category is None:
                cat_part = 'True'
        elif isinstance(category, str):
                cat_part = 'fk_category = (SELECT pk FROM dem.org_category WHERE description = %(cat)s)'
        else:
                cat_part = 'fk_category = %(cat)s'

        cmd = 'SELECT pk FROM dem.org WHERE description = %%(desc)s AND %s' % cat_part
        rows = gmPG2.run_ro_queries(link_obj = link_obj, queries = [{'cmd': cmd, 'args': args}])
        if len(rows) > 0:
                return cOrg(aPK_obj = rows[0][0])

        return None

Classes

class cOrg (aPK_obj: int | dict = None, row: dict = None, link_obj=None)

Represents business objects in the database.

Rules

  • instances ARE ASSUMED TO EXIST in the database
  • PK construction (aPK_obj): DOES verify its existence on instantiation (fetching data fails)
  • Row construction (row): allowed by using a dict of pairs of field name: field value (PERFORMANCE improvement)
  • does NOT verify FK target existence
  • does NOT create new entries in the database
  • does NOT lazy-fetch fields on access

Class scope SQL commands and variables:

_cmd_fetch_payload:

  • must return exactly one row
  • WHERE clause argument values are expected in self.pk_obj (taken from init(aPK_obj))
  • must return xmin of all rows that _cmds_store_payload will be updating, so views must support the xmin columns of their underlying tables

_cmds_store_payload:

  • one or multiple "update … set … where xmin_ = … and pk = …" statements which actually update the database from the data in self._payload,
  • the last query must refetch at least the XMIN values needed to detect concurrent updates, their field names had better be the same as in _cmd_fetch_payload,
  • the last query CAN return other fields which is particularly useful when those other fields are computed in the backend and may thus change upon save but will not have been set by the client code explicitly - this is only really of concern if the saved subclass is to be reused after saving rather than re-instantiated
  • when subclasses tend to live a while after save_payload() was called and they support computed fields (say, _(some_column) you need to return all columns (see cEncounter)

_updatable_fields:

  • a list of fields available for update via object['field']

Call init from child classes like so:

    super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)

Args

aPK_obj
retrieve data from backend
  • an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
  • a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
must hold the fields
  • data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
  • pk_field: the name of the primary key column OR
  • pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
  • for example:
    row = {
            'data': rows[0],
            'pk_field': 'pk_XXX (the PK column name)',
            'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val}
    }
    
    rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
    objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
    
Expand source code
class cOrg(gmBusinessDBObject.cBusinessDBObject):

        _cmd_fetch_payload = _SQL_get_org % 'pk_org = %s'
        _cmds_store_payload = [
                """UPDATE dem.org SET
                                description = %(organization)s,
                                fk_category = %(pk_category_org)s
                        WHERE
                                pk = %(pk_org)s
                                        AND
                                xmin = %(xmin_org)s
                        RETURNING
                                xmin AS xmin_org"""
        ]
        _updatable_fields = [
                'organization',
                'pk_category_org'
        ]
        #--------------------------------------------------------
        def add_unit(self, unit=None):
                return create_org_unit(pk_organization = self._payload['pk_org'], unit = unit)
        #--------------------------------------------------------
        def format(self):
                lines = []
                lines.append(_('Organization #%s') % self._payload['pk_org'])
                lines.append('')
                lines.append(' %s "%s"' % (
                        self._payload['l10n_category'],
                        self._payload['organization']
                ))
                if self._payload['is_praxis']:
                        lines.append('')
                        lines.append(' ' + _('This is your praxis !'))
                return '\n'.join(lines)
        #--------------------------------------------------------
        # properties
        #--------------------------------------------------------
        def _get_units(self):
                return get_org_units(order_by = 'unit', org = self._payload['pk_org'])

        units = property(_get_units)

Ancestors

Instance variables

var units
Expand source code
def _get_units(self):
        return get_org_units(order_by = 'unit', org = self._payload['pk_org'])

Methods

def add_unit(self, unit=None)
Expand source code
def add_unit(self, unit=None):
        return create_org_unit(pk_organization = self._payload['pk_org'], unit = unit)

Inherited members

class cOrgUnit (aPK_obj: int | dict = None, row: dict = None, link_obj=None)

Represents business objects in the database.

Rules

  • instances ARE ASSUMED TO EXIST in the database
  • PK construction (aPK_obj): DOES verify its existence on instantiation (fetching data fails)
  • Row construction (row): allowed by using a dict of pairs of field name: field value (PERFORMANCE improvement)
  • does NOT verify FK target existence
  • does NOT create new entries in the database
  • does NOT lazy-fetch fields on access

Class scope SQL commands and variables:

_cmd_fetch_payload:

  • must return exactly one row
  • WHERE clause argument values are expected in self.pk_obj (taken from init(aPK_obj))
  • must return xmin of all rows that _cmds_store_payload will be updating, so views must support the xmin columns of their underlying tables

_cmds_store_payload:

  • one or multiple "update … set … where xmin_ = … and pk = …" statements which actually update the database from the data in self._payload,
  • the last query must refetch at least the XMIN values needed to detect concurrent updates, their field names had better be the same as in _cmd_fetch_payload,
  • the last query CAN return other fields which is particularly useful when those other fields are computed in the backend and may thus change upon save but will not have been set by the client code explicitly - this is only really of concern if the saved subclass is to be reused after saving rather than re-instantiated
  • when subclasses tend to live a while after save_payload() was called and they support computed fields (say, _(some_column) you need to return all columns (see cEncounter)

_updatable_fields:

  • a list of fields available for update via object['field']

Call init from child classes like so:

    super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)

Args

aPK_obj
retrieve data from backend
  • an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
  • a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
must hold the fields
  • data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
  • pk_field: the name of the primary key column OR
  • pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
  • for example:
    row = {
            'data': rows[0],
            'pk_field': 'pk_XXX (the PK column name)',
            'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val}
    }
    
    rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
    objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
    
Expand source code
class cOrgUnit(gmBusinessDBObject.cBusinessDBObject):

        _cmd_fetch_payload = _SQL_get_org_unit % 'pk_org_unit = %s'
        _cmds_store_payload = [
                """UPDATE dem.org_unit SET
                                description = %(unit)s,
                                fk_org = %(pk_org)s,
                                fk_category = %(pk_category_unit)s,
                                fk_address = %(pk_address)s
                        WHERE
                                pk = %(pk_org_unit)s
                                        AND
                                xmin = %(xmin_org_unit)s
                        RETURNING
                                xmin AS xmin_org_unit"""
        ]
        _updatable_fields = [
                'unit',
                'pk_org',
                'pk_category_unit',
                'pk_address'
        ]
        #--------------------------------------------------------
        # comms API
        #--------------------------------------------------------
        def get_comm_channels(self, comm_medium=None):

                args = {'pk': self.pk_obj, 'medium': comm_medium}

                if comm_medium is None:
                        cmd = """
                                SELECT *
                                FROM dem.v_org_unit_comms
                                WHERE
                                        pk_org_unit = %(pk)s
                        """
                else:
                        cmd = """
                                SELECT *
                                FROM dem.v_org_unit_comms
                                WHERE
                                        pk_org_unit = %(pk)s
                                                AND
                                        comm_type = %(medium)s
                        """
                rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])

                return [ gmDemographicRecord.cOrgCommChannel(row = {
                                        'pk_field': 'pk_lnk_org_unit2comm',
                                        'data': r
                                }) for r in rows
                        ]

        comm_channels = property(get_comm_channels)

        #--------------------------------------------------------
        def link_comm_channel(self, comm_medium=None, url=None, is_confidential=False, pk_channel_type=None):
                """Link a communication medium with this org unit.

                @param comm_medium The name of the communication medium.
                @param url The communication resource locator.
                @type url A str instance.
                @param is_confidential Whether the data must be treated as confidential.
                @type is_confidential A bool instance.
                """
                return gmDemographicRecord.create_comm_channel (
                        comm_medium = comm_medium,
                        url = url,
                        is_confidential = is_confidential,
                        pk_channel_type = pk_channel_type,
                        pk_org_unit = self.pk_obj
                )
        #--------------------------------------------------------
        def unlink_comm_channel(self, comm_channel=None):
                gmDemographicRecord.delete_comm_channel (
                        pk = comm_channel['pk_lnk_org_unit2comm'],
                        pk_org_unit = self.pk_obj
                )
        #--------------------------------------------------------
        # external IDs
        #--------------------------------------------------------
        def get_external_ids(self, id_type=None, issuer=None):
                where_parts = ['pk_org_unit = %(unit)s']
                args = {'unit': self.pk_obj}

                if id_type is not None:
                        where_parts.append('name = %(name)s')
                        args['name'] = id_type.strip()

                if issuer is not None:
                        where_parts.append('issuer = %(issuer)s')
                        args['issuer'] = issuer.strip()

                cmd = "SELECT * FROM dem.v_external_ids4org_unit WHERE %s" % ' AND '.join(where_parts)
                rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])

                return rows

        external_ids = property(get_external_ids)
        #--------------------------------------------------------
        def add_external_id(self, type_name=None, value=None, issuer=None, comment=None, pk_type=None):
                """Adds an external ID to an org unit.

                creates ID type if necessary
                """
                args = {
                        'unit': self.pk_obj,
                        'val': value,
                        'type_name': type_name,
                        'pk_type': pk_type,
                        'issuer': issuer,
                        'comment': comment
                }
                # check for existing ID
                if pk_type is not None:
                        cmd = """
                                SELECT * FROM dem.v_external_ids4org_unit WHERE
                                pk_org_unit = %(unit)s
                                        AND
                                pk_type = %(pk_type)s
                                        AND
                                value = %(val)s"""
                else:
                        # by type/value/issuer
                        if issuer is None:
                                cmd = """
                                        SELECT * FROM dem.v_external_ids4org_unit WHERE
                                        pk_org_unit = %(unit)s
                                                AND
                                        name = %(type_name)s
                                                AND
                                        value = %(val)s"""
                        else:
                                cmd = """
                                        SELECT * FROM dem.v_external_ids4org_unit WHERE
                                        pk_org_unit = %(unit)s
                                                AND
                                        name = %(type_name)s
                                                AND
                                        value = %(val)s
                                                AND
                                        issuer = %(issuer)s"""
                rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])

                # create new ID if not found
                if len(rows) == 0:
                        if pk_type is None:
                                cmd = """INSERT INTO dem.lnk_org_unit2ext_id (external_id, fk_type, comment, fk_org_unit) VALUES (
                                        %(val)s,
                                        (SELECT dem.add_external_id_type(%(type_name)s, %(issuer)s)),
                                        %(comment)s,
                                        %(unit)s
                                )"""
                        else:
                                cmd = """INSERT INTO dem.lnk_org_unit2ext_id (external_id, fk_type, comment, fk_org_unit) VALUES (
                                        %(val)s,
                                        %(pk_type)s,
                                        %(comment)s,
                                        %(unit)s
                                )"""
                        rows = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])

                # or update comment of existing ID
                else:
                        row = rows[0]
                        if comment is not None:
                                # comment not already there ?
                                if gmTools.coalesce(row['comment'], '').find(comment.strip()) == -1:
                                        comment = '%s%s' % (gmTools.coalesce(row['comment'], '', '%s // '), comment.strip)
                                        cmd = "UPDATE dem.lnk_org_unit2ext_id SET comment = %(comment)s WHERE pk = %(pk)s"
                                        args = {'comment': comment, 'pk': row['pk_id']}
                                        rows = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])

        #--------------------------------------------------------
        def update_external_id(self, pk_id=None, type=None, value=None, issuer=None, comment=None):
                """Edits an existing external ID.

                Creates ID type if necessary.
                """
                cmd = """
                        UPDATE dem.lnk_org_unit2ext_id SET
                                fk_type = (SELECT dem.add_external_id_type(%(type)s, %(issuer)s)),
                                external_id = %(value)s,
                                comment = gm.nullify_empty_string(%(comment)s)
                        WHERE
                                pk = %(pk)s
                """
                args = {'pk': pk_id, 'value': value, 'type': type, 'issuer': issuer, 'comment': comment}
                gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])

        #--------------------------------------------------------
        def delete_external_id(self, pk_ext_id=None):
                cmd = """
                        DELETE FROM dem.lnk_org_unit2ext_id
                        WHERE fk_org_unit = %(unit)s AND pk = %(pk)s
                """
                args = {'unit': self.pk_obj, 'pk': pk_ext_id}
                gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])

        #--------------------------------------------------------
        # address API
        #--------------------------------------------------------
        def link_address(self, id_type=None, address=None):
                self.address = address
                return address
        #--------------------------------------------------------
        def unlink_address(self, address=None, pk_address=None):
                """Remove an address from the org unit.

                The address itself remains in the database.
                The address can be either cAdress or cPatientAdress.
                """
                self.address = None
        #--------------------------------------------------------
        def format(self, with_address=False, with_org=True, with_comms=False):
                lines = []
                lines.append(_('Unit%s: %s%s') % (
                        gmTools.bool2subst (
                                self._payload['is_praxis_branch'],
                                _(' (of your praxis)'),
                                ''
                        ),
                        self._payload['unit'],
                        gmTools.coalesce(self._payload['l10n_unit_category'], '', ' (%s)')
                ))
                if with_org:
                        lines.append(_('Organization: %s (%s)') % (
                                self._payload['organization'],
                                self._payload['l10n_organization_category']
                        ))
                if with_address:
                        adr = self.address
                        if adr is not None:
                                lines.extend(adr.format())
                if with_comms:
                        for comm in self.comm_channels:
                                lines.append('%s: %s%s' % (
                                        comm['l10n_comm_type'],
                                        comm['url'],
                                        gmTools.bool2subst(comm['is_confidential'], _(' (confidential)'), '', '')
                                ))
                return lines

        #--------------------------------------------------------
        # properties
        #--------------------------------------------------------
        def _get_address(self):
                if self._payload['pk_address'] is None:
                        return None
                return gmDemographicRecord.cAddress(aPK_obj = self._payload['pk_address'])

        def _set_address(self, address):
                self['pk_address'] = address['pk_address']
                self.save()

        address = property(_get_address, _set_address)

        #--------------------------------------------------------
        def _get_org(self):
                return cOrg(aPK_obj = self._payload['pk_org'])

        organization = property(_get_org)
        org = property(_get_org)

        comm_channels = property(get_comm_channels)

Ancestors

Instance variables

var address
Expand source code
def _get_address(self):
        if self._payload['pk_address'] is None:
                return None
        return gmDemographicRecord.cAddress(aPK_obj = self._payload['pk_address'])
var comm_channels
Expand source code
def get_comm_channels(self, comm_medium=None):

        args = {'pk': self.pk_obj, 'medium': comm_medium}

        if comm_medium is None:
                cmd = """
                        SELECT *
                        FROM dem.v_org_unit_comms
                        WHERE
                                pk_org_unit = %(pk)s
                """
        else:
                cmd = """
                        SELECT *
                        FROM dem.v_org_unit_comms
                        WHERE
                                pk_org_unit = %(pk)s
                                        AND
                                comm_type = %(medium)s
                """
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])

        return [ gmDemographicRecord.cOrgCommChannel(row = {
                                'pk_field': 'pk_lnk_org_unit2comm',
                                'data': r
                        }) for r in rows
                ]
var external_ids
Expand source code
def get_external_ids(self, id_type=None, issuer=None):
        where_parts = ['pk_org_unit = %(unit)s']
        args = {'unit': self.pk_obj}

        if id_type is not None:
                where_parts.append('name = %(name)s')
                args['name'] = id_type.strip()

        if issuer is not None:
                where_parts.append('issuer = %(issuer)s')
                args['issuer'] = issuer.strip()

        cmd = "SELECT * FROM dem.v_external_ids4org_unit WHERE %s" % ' AND '.join(where_parts)
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])

        return rows
var org
Expand source code
def _get_org(self):
        return cOrg(aPK_obj = self._payload['pk_org'])
var organization
Expand source code
def _get_org(self):
        return cOrg(aPK_obj = self._payload['pk_org'])

Methods

def add_external_id(self, type_name=None, value=None, issuer=None, comment=None, pk_type=None)

Adds an external ID to an org unit.

creates ID type if necessary

Expand source code
def add_external_id(self, type_name=None, value=None, issuer=None, comment=None, pk_type=None):
        """Adds an external ID to an org unit.

        creates ID type if necessary
        """
        args = {
                'unit': self.pk_obj,
                'val': value,
                'type_name': type_name,
                'pk_type': pk_type,
                'issuer': issuer,
                'comment': comment
        }
        # check for existing ID
        if pk_type is not None:
                cmd = """
                        SELECT * FROM dem.v_external_ids4org_unit WHERE
                        pk_org_unit = %(unit)s
                                AND
                        pk_type = %(pk_type)s
                                AND
                        value = %(val)s"""
        else:
                # by type/value/issuer
                if issuer is None:
                        cmd = """
                                SELECT * FROM dem.v_external_ids4org_unit WHERE
                                pk_org_unit = %(unit)s
                                        AND
                                name = %(type_name)s
                                        AND
                                value = %(val)s"""
                else:
                        cmd = """
                                SELECT * FROM dem.v_external_ids4org_unit WHERE
                                pk_org_unit = %(unit)s
                                        AND
                                name = %(type_name)s
                                        AND
                                value = %(val)s
                                        AND
                                issuer = %(issuer)s"""
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])

        # create new ID if not found
        if len(rows) == 0:
                if pk_type is None:
                        cmd = """INSERT INTO dem.lnk_org_unit2ext_id (external_id, fk_type, comment, fk_org_unit) VALUES (
                                %(val)s,
                                (SELECT dem.add_external_id_type(%(type_name)s, %(issuer)s)),
                                %(comment)s,
                                %(unit)s
                        )"""
                else:
                        cmd = """INSERT INTO dem.lnk_org_unit2ext_id (external_id, fk_type, comment, fk_org_unit) VALUES (
                                %(val)s,
                                %(pk_type)s,
                                %(comment)s,
                                %(unit)s
                        )"""
                rows = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])

        # or update comment of existing ID
        else:
                row = rows[0]
                if comment is not None:
                        # comment not already there ?
                        if gmTools.coalesce(row['comment'], '').find(comment.strip()) == -1:
                                comment = '%s%s' % (gmTools.coalesce(row['comment'], '', '%s // '), comment.strip)
                                cmd = "UPDATE dem.lnk_org_unit2ext_id SET comment = %(comment)s WHERE pk = %(pk)s"
                                args = {'comment': comment, 'pk': row['pk_id']}
                                rows = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
def delete_external_id(self, pk_ext_id=None)
Expand source code
def delete_external_id(self, pk_ext_id=None):
        cmd = """
                DELETE FROM dem.lnk_org_unit2ext_id
                WHERE fk_org_unit = %(unit)s AND pk = %(pk)s
        """
        args = {'unit': self.pk_obj, 'pk': pk_ext_id}
        gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
def get_comm_channels(self, comm_medium=None)
Expand source code
def get_comm_channels(self, comm_medium=None):

        args = {'pk': self.pk_obj, 'medium': comm_medium}

        if comm_medium is None:
                cmd = """
                        SELECT *
                        FROM dem.v_org_unit_comms
                        WHERE
                                pk_org_unit = %(pk)s
                """
        else:
                cmd = """
                        SELECT *
                        FROM dem.v_org_unit_comms
                        WHERE
                                pk_org_unit = %(pk)s
                                        AND
                                comm_type = %(medium)s
                """
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])

        return [ gmDemographicRecord.cOrgCommChannel(row = {
                                'pk_field': 'pk_lnk_org_unit2comm',
                                'data': r
                        }) for r in rows
                ]
def get_external_ids(self, id_type=None, issuer=None)
Expand source code
def get_external_ids(self, id_type=None, issuer=None):
        where_parts = ['pk_org_unit = %(unit)s']
        args = {'unit': self.pk_obj}

        if id_type is not None:
                where_parts.append('name = %(name)s')
                args['name'] = id_type.strip()

        if issuer is not None:
                where_parts.append('issuer = %(issuer)s')
                args['issuer'] = issuer.strip()

        cmd = "SELECT * FROM dem.v_external_ids4org_unit WHERE %s" % ' AND '.join(where_parts)
        rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])

        return rows
Expand source code
def link_address(self, id_type=None, address=None):
        self.address = address
        return address

Link a communication medium with this org unit.

@param comm_medium The name of the communication medium. @param url The communication resource locator. @type url A str instance. @param is_confidential Whether the data must be treated as confidential. @type is_confidential A bool instance.

Expand source code
def link_comm_channel(self, comm_medium=None, url=None, is_confidential=False, pk_channel_type=None):
        """Link a communication medium with this org unit.

        @param comm_medium The name of the communication medium.
        @param url The communication resource locator.
        @type url A str instance.
        @param is_confidential Whether the data must be treated as confidential.
        @type is_confidential A bool instance.
        """
        return gmDemographicRecord.create_comm_channel (
                comm_medium = comm_medium,
                url = url,
                is_confidential = is_confidential,
                pk_channel_type = pk_channel_type,
                pk_org_unit = self.pk_obj
        )

Remove an address from the org unit.

The address itself remains in the database. The address can be either cAdress or cPatientAdress.

Expand source code
def unlink_address(self, address=None, pk_address=None):
        """Remove an address from the org unit.

        The address itself remains in the database.
        The address can be either cAdress or cPatientAdress.
        """
        self.address = None
Expand source code
def unlink_comm_channel(self, comm_channel=None):
        gmDemographicRecord.delete_comm_channel (
                pk = comm_channel['pk_lnk_org_unit2comm'],
                pk_org_unit = self.pk_obj
        )
def update_external_id(self, pk_id=None, type=None, value=None, issuer=None, comment=None)

Edits an existing external ID.

Creates ID type if necessary.

Expand source code
def update_external_id(self, pk_id=None, type=None, value=None, issuer=None, comment=None):
        """Edits an existing external ID.

        Creates ID type if necessary.
        """
        cmd = """
                UPDATE dem.lnk_org_unit2ext_id SET
                        fk_type = (SELECT dem.add_external_id_type(%(type)s, %(issuer)s)),
                        external_id = %(value)s,
                        comment = gm.nullify_empty_string(%(comment)s)
                WHERE
                        pk = %(pk)s
        """
        args = {'pk': pk_id, 'value': value, 'type': type, 'issuer': issuer, 'comment': comment}
        gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])

Inherited members