Module Gnumed.business.gmOrganization
Organization classes
author: Karsten Hilbert et al
Expand source code
"""Organization classes
author: Karsten Hilbert et al
"""
#============================================================
__license__ = "GPL"
import sys, logging
if __name__ == '__main__':
sys.path.insert(0, '../../')
_ = lambda x:x
from Gnumed.pycommon import gmPG2
from Gnumed.pycommon import gmTools
from Gnumed.pycommon import gmBusinessDBObject
from Gnumed.business import gmDemographicRecord
_log = logging.getLogger('gm.org')
#============================================================
def create_org_category(category=None, link_obj=None):
args = {'cat': category}
cmd1 = """INSERT INTO dem.org_category (description) SELECT %(cat)s
WHERE NOT EXISTS (
SELECT 1 FROM dem.org_category WHERE description = %(cat)s or _(description) = %(cat)s
)"""
cmd2 = """SELECT pk FROM dem.org_category WHERE description = %(cat)s or _(description) = %(cat)s LIMIT 1"""
queries = [
{'cmd': cmd1, 'args': args},
{'cmd': cmd2, 'args': args}
]
rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = queries, return_data = True)
return rows[0][0]
#============================================================
# organization API
#------------------------------------------------------------
_SQL_get_org = 'SELECT * FROM dem.v_orgs WHERE %s'
class cOrg(gmBusinessDBObject.cBusinessDBObject):
_cmd_fetch_payload = _SQL_get_org % 'pk_org = %s'
_cmds_store_payload = [
"""UPDATE dem.org SET
description = %(organization)s,
fk_category = %(pk_category_org)s
WHERE
pk = %(pk_org)s
AND
xmin = %(xmin_org)s
RETURNING
xmin AS xmin_org"""
]
_updatable_fields = [
'organization',
'pk_category_org'
]
#--------------------------------------------------------
def add_unit(self, unit=None):
return create_org_unit(pk_organization = self._payload['pk_org'], unit = unit)
#--------------------------------------------------------
def format(self):
lines = []
lines.append(_('Organization #%s') % self._payload['pk_org'])
lines.append('')
lines.append(' %s "%s"' % (
self._payload['l10n_category'],
self._payload['organization']
))
if self._payload['is_praxis']:
lines.append('')
lines.append(' ' + _('This is your praxis !'))
return '\n'.join(lines)
#--------------------------------------------------------
# properties
#--------------------------------------------------------
def _get_units(self):
return get_org_units(order_by = 'unit', org = self._payload['pk_org'])
units = property(_get_units)
#------------------------------------------------------------
def org_exists(organization:str=None, category=None, link_obj=None) -> cOrg:
args = {'desc': organization, 'cat': category}
if category is None:
cat_part = 'True'
elif isinstance(category, str):
cat_part = 'fk_category = (SELECT pk FROM dem.org_category WHERE description = %(cat)s)'
else:
cat_part = 'fk_category = %(cat)s'
cmd = 'SELECT pk FROM dem.org WHERE description = %%(desc)s AND %s' % cat_part
rows = gmPG2.run_ro_queries(link_obj = link_obj, queries = [{'cmd': cmd, 'args': args}])
if len(rows) > 0:
return cOrg(aPK_obj = rows[0][0])
return None
#------------------------------------------------------------
def create_org(organization=None, category=None, link_obj=None):
org = org_exists(link_obj = link_obj, organization = organization, category = category)
if org is not None:
return org
args = {'desc': organization, 'cat': category}
if isinstance(category, str):
cat_part = '(SELECT pk FROM dem.org_category WHERE description = %(cat)s)'
else:
cat_part = '%(cat)s'
cmd = 'INSERT INTO dem.org (description, fk_category) VALUES (%%(desc)s, %s) RETURNING pk' % cat_part
rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = [{'cmd': cmd, 'args': args}], return_data = True)
return cOrg(aPK_obj = rows[0][0], link_obj = link_obj)
#------------------------------------------------------------
def delete_org(organization=None):
args = {'pk': organization}
cmd = """
DELETE FROM dem.org
WHERE
pk = %(pk)s
AND NOT EXISTS (
SELECT 1 FROM dem.org_unit WHERE fk_org = %(pk)s
)
"""
gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
return True
#------------------------------------------------------------
def get_orgs(order_by=None, return_pks=False):
if order_by is None:
order_by = ''
else:
order_by = 'ORDER BY %s' % order_by
cmd = _SQL_get_org % ('TRUE %s' % order_by)
rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd}])
if return_pks:
return [ r['pk_org'] for r in rows ]
return [ cOrg(row = {'data': r, 'pk_field': 'pk_org'}) for r in rows ]
#============================================================
# organizational units API
#------------------------------------------------------------
_SQL_get_org_unit = 'SELECT * FROM dem.v_org_units WHERE %s'
class cOrgUnit(gmBusinessDBObject.cBusinessDBObject):
_cmd_fetch_payload = _SQL_get_org_unit % 'pk_org_unit = %s'
_cmds_store_payload = [
"""UPDATE dem.org_unit SET
description = %(unit)s,
fk_org = %(pk_org)s,
fk_category = %(pk_category_unit)s,
fk_address = %(pk_address)s
WHERE
pk = %(pk_org_unit)s
AND
xmin = %(xmin_org_unit)s
RETURNING
xmin AS xmin_org_unit"""
]
_updatable_fields = [
'unit',
'pk_org',
'pk_category_unit',
'pk_address'
]
#--------------------------------------------------------
# comms API
#--------------------------------------------------------
def get_comm_channels(self, comm_medium=None):
args = {'pk': self.pk_obj, 'medium': comm_medium}
if comm_medium is None:
cmd = """
SELECT *
FROM dem.v_org_unit_comms
WHERE
pk_org_unit = %(pk)s
"""
else:
cmd = """
SELECT *
FROM dem.v_org_unit_comms
WHERE
pk_org_unit = %(pk)s
AND
comm_type = %(medium)s
"""
rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
return [ gmDemographicRecord.cOrgCommChannel(row = {
'pk_field': 'pk_lnk_org_unit2comm',
'data': r
}) for r in rows
]
comm_channels = property(get_comm_channels)
#--------------------------------------------------------
def link_comm_channel(self, comm_medium=None, url=None, is_confidential=False, pk_channel_type=None):
"""Link a communication medium with this org unit.
@param comm_medium The name of the communication medium.
@param url The communication resource locator.
@type url A str instance.
@param is_confidential Whether the data must be treated as confidential.
@type is_confidential A bool instance.
"""
return gmDemographicRecord.create_comm_channel (
comm_medium = comm_medium,
url = url,
is_confidential = is_confidential,
pk_channel_type = pk_channel_type,
pk_org_unit = self.pk_obj
)
#--------------------------------------------------------
def unlink_comm_channel(self, comm_channel=None):
gmDemographicRecord.delete_comm_channel (
pk = comm_channel['pk_lnk_org_unit2comm'],
pk_org_unit = self.pk_obj
)
#--------------------------------------------------------
# external IDs
#--------------------------------------------------------
def get_external_ids(self, id_type=None, issuer=None):
where_parts = ['pk_org_unit = %(unit)s']
args = {'unit': self.pk_obj}
if id_type is not None:
where_parts.append('name = %(name)s')
args['name'] = id_type.strip()
if issuer is not None:
where_parts.append('issuer = %(issuer)s')
args['issuer'] = issuer.strip()
cmd = "SELECT * FROM dem.v_external_ids4org_unit WHERE %s" % ' AND '.join(where_parts)
rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
return rows
external_ids = property(get_external_ids)
#--------------------------------------------------------
def add_external_id(self, type_name=None, value=None, issuer=None, comment=None, pk_type=None):
"""Adds an external ID to an org unit.
creates ID type if necessary
"""
args = {
'unit': self.pk_obj,
'val': value,
'type_name': type_name,
'pk_type': pk_type,
'issuer': issuer,
'comment': comment
}
# check for existing ID
if pk_type is not None:
cmd = """
SELECT * FROM dem.v_external_ids4org_unit WHERE
pk_org_unit = %(unit)s
AND
pk_type = %(pk_type)s
AND
value = %(val)s"""
else:
# by type/value/issuer
if issuer is None:
cmd = """
SELECT * FROM dem.v_external_ids4org_unit WHERE
pk_org_unit = %(unit)s
AND
name = %(type_name)s
AND
value = %(val)s"""
else:
cmd = """
SELECT * FROM dem.v_external_ids4org_unit WHERE
pk_org_unit = %(unit)s
AND
name = %(type_name)s
AND
value = %(val)s
AND
issuer = %(issuer)s"""
rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
# create new ID if not found
if len(rows) == 0:
if pk_type is None:
cmd = """INSERT INTO dem.lnk_org_unit2ext_id (external_id, fk_type, comment, fk_org_unit) VALUES (
%(val)s,
(SELECT dem.add_external_id_type(%(type_name)s, %(issuer)s)),
%(comment)s,
%(unit)s
)"""
else:
cmd = """INSERT INTO dem.lnk_org_unit2ext_id (external_id, fk_type, comment, fk_org_unit) VALUES (
%(val)s,
%(pk_type)s,
%(comment)s,
%(unit)s
)"""
rows = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
# or update comment of existing ID
else:
row = rows[0]
if comment is not None:
# comment not already there ?
if gmTools.coalesce(row['comment'], '').find(comment.strip()) == -1:
comment = '%s%s' % (gmTools.coalesce(row['comment'], '', '%s // '), comment.strip)
cmd = "UPDATE dem.lnk_org_unit2ext_id SET comment = %(comment)s WHERE pk = %(pk)s"
args = {'comment': comment, 'pk': row['pk_id']}
rows = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
#--------------------------------------------------------
def update_external_id(self, pk_id=None, type=None, value=None, issuer=None, comment=None):
"""Edits an existing external ID.
Creates ID type if necessary.
"""
cmd = """
UPDATE dem.lnk_org_unit2ext_id SET
fk_type = (SELECT dem.add_external_id_type(%(type)s, %(issuer)s)),
external_id = %(value)s,
comment = gm.nullify_empty_string(%(comment)s)
WHERE
pk = %(pk)s
"""
args = {'pk': pk_id, 'value': value, 'type': type, 'issuer': issuer, 'comment': comment}
gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
#--------------------------------------------------------
def delete_external_id(self, pk_ext_id=None):
cmd = """
DELETE FROM dem.lnk_org_unit2ext_id
WHERE fk_org_unit = %(unit)s AND pk = %(pk)s
"""
args = {'unit': self.pk_obj, 'pk': pk_ext_id}
gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
#--------------------------------------------------------
# address API
#--------------------------------------------------------
def link_address(self, id_type=None, address=None):
self.address = address
return address
#--------------------------------------------------------
def unlink_address(self, address=None, pk_address=None):
"""Remove an address from the org unit.
The address itself remains in the database.
The address can be either cAdress or cPatientAdress.
"""
self.address = None
#--------------------------------------------------------
def format(self, with_address=False, with_org=True, with_comms=False):
lines = []
lines.append(_('Unit%s: %s%s') % (
gmTools.bool2subst (
self._payload['is_praxis_branch'],
_(' (of your praxis)'),
''
),
self._payload['unit'],
gmTools.coalesce(self._payload['l10n_unit_category'], '', ' (%s)')
))
if with_org:
lines.append(_('Organization: %s (%s)') % (
self._payload['organization'],
self._payload['l10n_organization_category']
))
if with_address:
adr = self.address
if adr is not None:
lines.extend(adr.format())
if with_comms:
for comm in self.comm_channels:
lines.append('%s: %s%s' % (
comm['l10n_comm_type'],
comm['url'],
gmTools.bool2subst(comm['is_confidential'], _(' (confidential)'), '', '')
))
return lines
#--------------------------------------------------------
# properties
#--------------------------------------------------------
def _get_address(self):
if self._payload['pk_address'] is None:
return None
return gmDemographicRecord.cAddress(aPK_obj = self._payload['pk_address'])
def _set_address(self, address):
self['pk_address'] = address['pk_address']
self.save()
address = property(_get_address, _set_address)
#--------------------------------------------------------
def _get_org(self):
return cOrg(aPK_obj = self._payload['pk_org'])
organization = property(_get_org)
org = property(_get_org)
comm_channels = property(get_comm_channels)
#------------------------------------------------------------
def create_org_unit(pk_organization:str=None, unit:str=None, link_obj=None) -> cOrgUnit:
_log.debug('creating org unit [%s:%s]', unit, pk_organization)
args = {'desc': unit, 'pk_org': pk_organization}
cmd1 = """
INSERT INTO dem.org_unit (description, fk_org) SELECT
%(desc)s,
%(pk_org)s
WHERE NOT EXISTS (
SELECT 1 FROM dem.org_unit WHERE description = %(desc)s AND fk_org = %(pk_org)s
)"""
cmd2 = _SQL_get_org_unit % 'unit = %(desc)s AND pk_org = %(pk_org)s'
queries = [
{'cmd': cmd1, 'args': args},
{'cmd': cmd2, 'args': args}
]
rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = queries, return_data = True)
return cOrgUnit(row = {'data': rows[0], 'pk_field': 'pk_org_unit'})
#------------------------------------------------------------
def delete_org_unit(unit:int=None) -> bool:
args = {'pk': unit}
cmd = "DELETE FROM dem.org_unit WHERE pk = %(pk)s"
#-- AND
#-- NOT EXISTS (
#-- SELECT 1 FROM blobs.doc_med where fk_org_unit = %(pk)s
#-- ) AND
#-- NOT EXISTS (
#-- SELECT 1 FROM clin.external_care where fk_org_unit = %(pk)s
#-- ) AND
#-- NOT EXISTS (
#-- SELECT 1 FROM blobs.doc_med where fk_org_unit = %(pk)s
#-- ) AND
#-- NOT EXISTS (
#-- SELECT 1 FROM clin.hospital_stay where fk_org_unit = %(pk)s
#-- ) AND
#-- NOT EXISTS (
#-- SELECT 1 FROM clin.procedure where fk_org_unit = %(pk)s
#-- ) AND
#-- NOT EXISTS (
#-- SELECT 1 FROM clin.test_org where fk_org_unit = %(pk)s
#-- ) AND
#-- NOT EXISTS (
#-- SELECT 1 FROM dem.lnk_org_unit2comm where fk_org_unit = %(pk)s
#-- ) AND
#-- NOT EXISTS (
#-- SELECT 1 FROM dem.lnk_org_unit2ext_id where fk_org_unit = %(pk)s
#-- )
try:
gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
except gmPG2.dbapi.errors.ForeignKeyViolation:
_log.exception('error deleting org unit')
return False
return True
#------------------------------------------------------------
def get_org_units(order_by:str=None, org:int=None, return_pks:bool=False) -> list:
if order_by is None:
order_by = ''
else:
order_by = ' ORDER BY %s' % order_by
if org is None:
where_part = 'TRUE'
else:
where_part = 'pk_org = %(org)s'
args = {'org': org}
cmd = (_SQL_get_org_unit % where_part) + order_by
rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}])
if return_pks:
return [ r['pk_org_unit'] for r in rows ]
return [ cOrgUnit(row = {'data': r, 'pk_field': 'pk_org_unit'}) for r in rows ]
#======================================================================
# main
#----------------------------------------------------------------------
if __name__ == "__main__":
if len(sys.argv) < 2:
sys.exit()
if sys.argv[1] != 'test':
sys.exit()
gmPG2.request_login_params(setup_pool = True)
#print(cOrgUnit(aPK_obj = 21825))
delete_org_unit(2)
#for unit in get_org_units():
# print(unit)
Functions
def create_org(organization=None, category=None, link_obj=None)
-
Expand source code
def create_org(organization=None, category=None, link_obj=None): org = org_exists(link_obj = link_obj, organization = organization, category = category) if org is not None: return org args = {'desc': organization, 'cat': category} if isinstance(category, str): cat_part = '(SELECT pk FROM dem.org_category WHERE description = %(cat)s)' else: cat_part = '%(cat)s' cmd = 'INSERT INTO dem.org (description, fk_category) VALUES (%%(desc)s, %s) RETURNING pk' % cat_part rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = [{'cmd': cmd, 'args': args}], return_data = True) return cOrg(aPK_obj = rows[0][0], link_obj = link_obj)
def create_org_category(category=None, link_obj=None)
-
Expand source code
def create_org_category(category=None, link_obj=None): args = {'cat': category} cmd1 = """INSERT INTO dem.org_category (description) SELECT %(cat)s WHERE NOT EXISTS ( SELECT 1 FROM dem.org_category WHERE description = %(cat)s or _(description) = %(cat)s )""" cmd2 = """SELECT pk FROM dem.org_category WHERE description = %(cat)s or _(description) = %(cat)s LIMIT 1""" queries = [ {'cmd': cmd1, 'args': args}, {'cmd': cmd2, 'args': args} ] rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = queries, return_data = True) return rows[0][0]
def create_org_unit(pk_organization: str = None, unit: str = None, link_obj=None) ‑> cOrgUnit
-
Expand source code
def create_org_unit(pk_organization:str=None, unit:str=None, link_obj=None) -> cOrgUnit: _log.debug('creating org unit [%s:%s]', unit, pk_organization) args = {'desc': unit, 'pk_org': pk_organization} cmd1 = """ INSERT INTO dem.org_unit (description, fk_org) SELECT %(desc)s, %(pk_org)s WHERE NOT EXISTS ( SELECT 1 FROM dem.org_unit WHERE description = %(desc)s AND fk_org = %(pk_org)s )""" cmd2 = _SQL_get_org_unit % 'unit = %(desc)s AND pk_org = %(pk_org)s' queries = [ {'cmd': cmd1, 'args': args}, {'cmd': cmd2, 'args': args} ] rows = gmPG2.run_rw_queries(link_obj = link_obj, queries = queries, return_data = True) return cOrgUnit(row = {'data': rows[0], 'pk_field': 'pk_org_unit'})
def delete_org(organization=None)
-
Expand source code
def delete_org(organization=None): args = {'pk': organization} cmd = """ DELETE FROM dem.org WHERE pk = %(pk)s AND NOT EXISTS ( SELECT 1 FROM dem.org_unit WHERE fk_org = %(pk)s ) """ gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}]) return True
def delete_org_unit(unit: int = None) ‑> bool
-
Expand source code
def delete_org_unit(unit:int=None) -> bool: args = {'pk': unit} cmd = "DELETE FROM dem.org_unit WHERE pk = %(pk)s" #-- AND #-- NOT EXISTS ( #-- SELECT 1 FROM blobs.doc_med where fk_org_unit = %(pk)s #-- ) AND #-- NOT EXISTS ( #-- SELECT 1 FROM clin.external_care where fk_org_unit = %(pk)s #-- ) AND #-- NOT EXISTS ( #-- SELECT 1 FROM blobs.doc_med where fk_org_unit = %(pk)s #-- ) AND #-- NOT EXISTS ( #-- SELECT 1 FROM clin.hospital_stay where fk_org_unit = %(pk)s #-- ) AND #-- NOT EXISTS ( #-- SELECT 1 FROM clin.procedure where fk_org_unit = %(pk)s #-- ) AND #-- NOT EXISTS ( #-- SELECT 1 FROM clin.test_org where fk_org_unit = %(pk)s #-- ) AND #-- NOT EXISTS ( #-- SELECT 1 FROM dem.lnk_org_unit2comm where fk_org_unit = %(pk)s #-- ) AND #-- NOT EXISTS ( #-- SELECT 1 FROM dem.lnk_org_unit2ext_id where fk_org_unit = %(pk)s #-- ) try: gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}]) except gmPG2.dbapi.errors.ForeignKeyViolation: _log.exception('error deleting org unit') return False return True
def get_org_units(order_by: str = None, org: int = None, return_pks: bool = False) ‑> list
-
Expand source code
def get_org_units(order_by:str=None, org:int=None, return_pks:bool=False) -> list: if order_by is None: order_by = '' else: order_by = ' ORDER BY %s' % order_by if org is None: where_part = 'TRUE' else: where_part = 'pk_org = %(org)s' args = {'org': org} cmd = (_SQL_get_org_unit % where_part) + order_by rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}]) if return_pks: return [ r['pk_org_unit'] for r in rows ] return [ cOrgUnit(row = {'data': r, 'pk_field': 'pk_org_unit'}) for r in rows ]
def get_orgs(order_by=None, return_pks=False)
-
Expand source code
def get_orgs(order_by=None, return_pks=False): if order_by is None: order_by = '' else: order_by = 'ORDER BY %s' % order_by cmd = _SQL_get_org % ('TRUE %s' % order_by) rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd}]) if return_pks: return [ r['pk_org'] for r in rows ] return [ cOrg(row = {'data': r, 'pk_field': 'pk_org'}) for r in rows ]
def org_exists(organization: str = None, category=None, link_obj=None) ‑> cOrg
-
Expand source code
def org_exists(organization:str=None, category=None, link_obj=None) -> cOrg: args = {'desc': organization, 'cat': category} if category is None: cat_part = 'True' elif isinstance(category, str): cat_part = 'fk_category = (SELECT pk FROM dem.org_category WHERE description = %(cat)s)' else: cat_part = 'fk_category = %(cat)s' cmd = 'SELECT pk FROM dem.org WHERE description = %%(desc)s AND %s' % cat_part rows = gmPG2.run_ro_queries(link_obj = link_obj, queries = [{'cmd': cmd, 'args': args}]) if len(rows) > 0: return cOrg(aPK_obj = rows[0][0]) return None
Classes
class cOrg (aPK_obj: int | dict = None, row: dict = None, link_obj=None)
-
Represents business objects in the database.
Rules
- instances ARE ASSUMED TO EXIST in the database
- PK construction (aPK_obj): DOES verify its existence on instantiation (fetching data fails)
- Row construction (row): allowed by using a dict of pairs of field name: field value (PERFORMANCE improvement)
- does NOT verify FK target existence
- does NOT create new entries in the database
- does NOT lazy-fetch fields on access
Class scope SQL commands and variables:
_cmd_fetch_payload:
- must return exactly one row
- WHERE clause argument values are expected in self.pk_obj (taken from init(aPK_obj))
- must return xmin of all rows that _cmds_store_payload will be updating, so views must support the xmin columns of their underlying tables
_cmds_store_payload:
- one or multiple "update … set … where xmin_ = … and pk = …" statements which actually update the database from the data in self._payload,
- the last query must refetch at least the XMIN values needed to detect concurrent updates, their field names had better be the same as in _cmd_fetch_payload,
- the last query CAN return other fields which is particularly useful when those other fields are computed in the backend and may thus change upon save but will not have been set by the client code explicitly - this is only really of concern if the saved subclass is to be reused after saving rather than re-instantiated
- when subclasses tend to live a while after save_payload() was called and they support computed fields (say, _(some_column) you need to return all columns (see cEncounter)
_updatable_fields:
- a list of fields available for update via object['field']
Call init from child classes like so:
super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
Args
aPK_obj
- retrieve data from backend
- an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
- a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
- must hold the fields
- data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
- pk_field: the name of the primary key column OR
- pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
- for example:
row = { 'data': rows[0], 'pk_field': 'pk_XXX (the PK column name)', 'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val} } rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}]) objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
Expand source code
class cOrg(gmBusinessDBObject.cBusinessDBObject): _cmd_fetch_payload = _SQL_get_org % 'pk_org = %s' _cmds_store_payload = [ """UPDATE dem.org SET description = %(organization)s, fk_category = %(pk_category_org)s WHERE pk = %(pk_org)s AND xmin = %(xmin_org)s RETURNING xmin AS xmin_org""" ] _updatable_fields = [ 'organization', 'pk_category_org' ] #-------------------------------------------------------- def add_unit(self, unit=None): return create_org_unit(pk_organization = self._payload['pk_org'], unit = unit) #-------------------------------------------------------- def format(self): lines = [] lines.append(_('Organization #%s') % self._payload['pk_org']) lines.append('') lines.append(' %s "%s"' % ( self._payload['l10n_category'], self._payload['organization'] )) if self._payload['is_praxis']: lines.append('') lines.append(' ' + _('This is your praxis !')) return '\n'.join(lines) #-------------------------------------------------------- # properties #-------------------------------------------------------- def _get_units(self): return get_org_units(order_by = 'unit', org = self._payload['pk_org']) units = property(_get_units)
Ancestors
Instance variables
var units
-
Expand source code
def _get_units(self): return get_org_units(order_by = 'unit', org = self._payload['pk_org'])
Methods
def add_unit(self, unit=None)
-
Expand source code
def add_unit(self, unit=None): return create_org_unit(pk_organization = self._payload['pk_org'], unit = unit)
Inherited members
class cOrgUnit (aPK_obj: int | dict = None, row: dict = None, link_obj=None)
-
Represents business objects in the database.
Rules
- instances ARE ASSUMED TO EXIST in the database
- PK construction (aPK_obj): DOES verify its existence on instantiation (fetching data fails)
- Row construction (row): allowed by using a dict of pairs of field name: field value (PERFORMANCE improvement)
- does NOT verify FK target existence
- does NOT create new entries in the database
- does NOT lazy-fetch fields on access
Class scope SQL commands and variables:
_cmd_fetch_payload:
- must return exactly one row
- WHERE clause argument values are expected in self.pk_obj (taken from init(aPK_obj))
- must return xmin of all rows that _cmds_store_payload will be updating, so views must support the xmin columns of their underlying tables
_cmds_store_payload:
- one or multiple "update … set … where xmin_ = … and pk = …" statements which actually update the database from the data in self._payload,
- the last query must refetch at least the XMIN values needed to detect concurrent updates, their field names had better be the same as in _cmd_fetch_payload,
- the last query CAN return other fields which is particularly useful when those other fields are computed in the backend and may thus change upon save but will not have been set by the client code explicitly - this is only really of concern if the saved subclass is to be reused after saving rather than re-instantiated
- when subclasses tend to live a while after save_payload() was called and they support computed fields (say, _(some_column) you need to return all columns (see cEncounter)
_updatable_fields:
- a list of fields available for update via object['field']
Call init from child classes like so:
super().__init__(aPK_obj = aPK_obj, row = row, link_obj = link_obj)
Args
aPK_obj
- retrieve data from backend
- an scalar value the ._cmd_fetch_payload WHERE condition must be a simple column: "… WHERE pk_col = %s"
- a dictionary of values the ._cmd_fetch_payload WHERE condition must consume the dictionary and produce a unique row
row
- must hold the fields
- data: list of column values for the row selected by ._cmd_fetch_payload (as returned by cursor.fetchone() in the DB-API)
- pk_field: the name of the primary key column OR
- pk_obj: a dictionary suitable for being passed to cursor.execute and holding the primary key values, used for composite PKs
- for example:
row = { 'data': rows[0], 'pk_field': 'pk_XXX (the PK column name)', 'pk_obj': {'pk_col1': pk_col1_val, 'pk_col2': pk_col2_val} } rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}]) objects = [ cChildClass(row = {'data': r, 'pk_field': 'the PK column name'}) for r in rows ]
Expand source code
class cOrgUnit(gmBusinessDBObject.cBusinessDBObject): _cmd_fetch_payload = _SQL_get_org_unit % 'pk_org_unit = %s' _cmds_store_payload = [ """UPDATE dem.org_unit SET description = %(unit)s, fk_org = %(pk_org)s, fk_category = %(pk_category_unit)s, fk_address = %(pk_address)s WHERE pk = %(pk_org_unit)s AND xmin = %(xmin_org_unit)s RETURNING xmin AS xmin_org_unit""" ] _updatable_fields = [ 'unit', 'pk_org', 'pk_category_unit', 'pk_address' ] #-------------------------------------------------------- # comms API #-------------------------------------------------------- def get_comm_channels(self, comm_medium=None): args = {'pk': self.pk_obj, 'medium': comm_medium} if comm_medium is None: cmd = """ SELECT * FROM dem.v_org_unit_comms WHERE pk_org_unit = %(pk)s """ else: cmd = """ SELECT * FROM dem.v_org_unit_comms WHERE pk_org_unit = %(pk)s AND comm_type = %(medium)s """ rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}]) return [ gmDemographicRecord.cOrgCommChannel(row = { 'pk_field': 'pk_lnk_org_unit2comm', 'data': r }) for r in rows ] comm_channels = property(get_comm_channels) #-------------------------------------------------------- def link_comm_channel(self, comm_medium=None, url=None, is_confidential=False, pk_channel_type=None): """Link a communication medium with this org unit. @param comm_medium The name of the communication medium. @param url The communication resource locator. @type url A str instance. @param is_confidential Whether the data must be treated as confidential. @type is_confidential A bool instance. """ return gmDemographicRecord.create_comm_channel ( comm_medium = comm_medium, url = url, is_confidential = is_confidential, pk_channel_type = pk_channel_type, pk_org_unit = self.pk_obj ) #-------------------------------------------------------- def unlink_comm_channel(self, comm_channel=None): gmDemographicRecord.delete_comm_channel ( pk = comm_channel['pk_lnk_org_unit2comm'], pk_org_unit = self.pk_obj ) #-------------------------------------------------------- # external IDs #-------------------------------------------------------- def get_external_ids(self, id_type=None, issuer=None): where_parts = ['pk_org_unit = %(unit)s'] args = {'unit': self.pk_obj} if id_type is not None: where_parts.append('name = %(name)s') args['name'] = id_type.strip() if issuer is not None: where_parts.append('issuer = %(issuer)s') args['issuer'] = issuer.strip() cmd = "SELECT * FROM dem.v_external_ids4org_unit WHERE %s" % ' AND '.join(where_parts) rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}]) return rows external_ids = property(get_external_ids) #-------------------------------------------------------- def add_external_id(self, type_name=None, value=None, issuer=None, comment=None, pk_type=None): """Adds an external ID to an org unit. creates ID type if necessary """ args = { 'unit': self.pk_obj, 'val': value, 'type_name': type_name, 'pk_type': pk_type, 'issuer': issuer, 'comment': comment } # check for existing ID if pk_type is not None: cmd = """ SELECT * FROM dem.v_external_ids4org_unit WHERE pk_org_unit = %(unit)s AND pk_type = %(pk_type)s AND value = %(val)s""" else: # by type/value/issuer if issuer is None: cmd = """ SELECT * FROM dem.v_external_ids4org_unit WHERE pk_org_unit = %(unit)s AND name = %(type_name)s AND value = %(val)s""" else: cmd = """ SELECT * FROM dem.v_external_ids4org_unit WHERE pk_org_unit = %(unit)s AND name = %(type_name)s AND value = %(val)s AND issuer = %(issuer)s""" rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}]) # create new ID if not found if len(rows) == 0: if pk_type is None: cmd = """INSERT INTO dem.lnk_org_unit2ext_id (external_id, fk_type, comment, fk_org_unit) VALUES ( %(val)s, (SELECT dem.add_external_id_type(%(type_name)s, %(issuer)s)), %(comment)s, %(unit)s )""" else: cmd = """INSERT INTO dem.lnk_org_unit2ext_id (external_id, fk_type, comment, fk_org_unit) VALUES ( %(val)s, %(pk_type)s, %(comment)s, %(unit)s )""" rows = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}]) # or update comment of existing ID else: row = rows[0] if comment is not None: # comment not already there ? if gmTools.coalesce(row['comment'], '').find(comment.strip()) == -1: comment = '%s%s' % (gmTools.coalesce(row['comment'], '', '%s // '), comment.strip) cmd = "UPDATE dem.lnk_org_unit2ext_id SET comment = %(comment)s WHERE pk = %(pk)s" args = {'comment': comment, 'pk': row['pk_id']} rows = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}]) #-------------------------------------------------------- def update_external_id(self, pk_id=None, type=None, value=None, issuer=None, comment=None): """Edits an existing external ID. Creates ID type if necessary. """ cmd = """ UPDATE dem.lnk_org_unit2ext_id SET fk_type = (SELECT dem.add_external_id_type(%(type)s, %(issuer)s)), external_id = %(value)s, comment = gm.nullify_empty_string(%(comment)s) WHERE pk = %(pk)s """ args = {'pk': pk_id, 'value': value, 'type': type, 'issuer': issuer, 'comment': comment} gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}]) #-------------------------------------------------------- def delete_external_id(self, pk_ext_id=None): cmd = """ DELETE FROM dem.lnk_org_unit2ext_id WHERE fk_org_unit = %(unit)s AND pk = %(pk)s """ args = {'unit': self.pk_obj, 'pk': pk_ext_id} gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}]) #-------------------------------------------------------- # address API #-------------------------------------------------------- def link_address(self, id_type=None, address=None): self.address = address return address #-------------------------------------------------------- def unlink_address(self, address=None, pk_address=None): """Remove an address from the org unit. The address itself remains in the database. The address can be either cAdress or cPatientAdress. """ self.address = None #-------------------------------------------------------- def format(self, with_address=False, with_org=True, with_comms=False): lines = [] lines.append(_('Unit%s: %s%s') % ( gmTools.bool2subst ( self._payload['is_praxis_branch'], _(' (of your praxis)'), '' ), self._payload['unit'], gmTools.coalesce(self._payload['l10n_unit_category'], '', ' (%s)') )) if with_org: lines.append(_('Organization: %s (%s)') % ( self._payload['organization'], self._payload['l10n_organization_category'] )) if with_address: adr = self.address if adr is not None: lines.extend(adr.format()) if with_comms: for comm in self.comm_channels: lines.append('%s: %s%s' % ( comm['l10n_comm_type'], comm['url'], gmTools.bool2subst(comm['is_confidential'], _(' (confidential)'), '', '') )) return lines #-------------------------------------------------------- # properties #-------------------------------------------------------- def _get_address(self): if self._payload['pk_address'] is None: return None return gmDemographicRecord.cAddress(aPK_obj = self._payload['pk_address']) def _set_address(self, address): self['pk_address'] = address['pk_address'] self.save() address = property(_get_address, _set_address) #-------------------------------------------------------- def _get_org(self): return cOrg(aPK_obj = self._payload['pk_org']) organization = property(_get_org) org = property(_get_org) comm_channels = property(get_comm_channels)
Ancestors
Instance variables
var address
-
Expand source code
def _get_address(self): if self._payload['pk_address'] is None: return None return gmDemographicRecord.cAddress(aPK_obj = self._payload['pk_address'])
var comm_channels
-
Expand source code
def get_comm_channels(self, comm_medium=None): args = {'pk': self.pk_obj, 'medium': comm_medium} if comm_medium is None: cmd = """ SELECT * FROM dem.v_org_unit_comms WHERE pk_org_unit = %(pk)s """ else: cmd = """ SELECT * FROM dem.v_org_unit_comms WHERE pk_org_unit = %(pk)s AND comm_type = %(medium)s """ rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}]) return [ gmDemographicRecord.cOrgCommChannel(row = { 'pk_field': 'pk_lnk_org_unit2comm', 'data': r }) for r in rows ]
var external_ids
-
Expand source code
def get_external_ids(self, id_type=None, issuer=None): where_parts = ['pk_org_unit = %(unit)s'] args = {'unit': self.pk_obj} if id_type is not None: where_parts.append('name = %(name)s') args['name'] = id_type.strip() if issuer is not None: where_parts.append('issuer = %(issuer)s') args['issuer'] = issuer.strip() cmd = "SELECT * FROM dem.v_external_ids4org_unit WHERE %s" % ' AND '.join(where_parts) rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}]) return rows
var org
-
Expand source code
def _get_org(self): return cOrg(aPK_obj = self._payload['pk_org'])
var organization
-
Expand source code
def _get_org(self): return cOrg(aPK_obj = self._payload['pk_org'])
Methods
def add_external_id(self, type_name=None, value=None, issuer=None, comment=None, pk_type=None)
-
Adds an external ID to an org unit.
creates ID type if necessary
Expand source code
def add_external_id(self, type_name=None, value=None, issuer=None, comment=None, pk_type=None): """Adds an external ID to an org unit. creates ID type if necessary """ args = { 'unit': self.pk_obj, 'val': value, 'type_name': type_name, 'pk_type': pk_type, 'issuer': issuer, 'comment': comment } # check for existing ID if pk_type is not None: cmd = """ SELECT * FROM dem.v_external_ids4org_unit WHERE pk_org_unit = %(unit)s AND pk_type = %(pk_type)s AND value = %(val)s""" else: # by type/value/issuer if issuer is None: cmd = """ SELECT * FROM dem.v_external_ids4org_unit WHERE pk_org_unit = %(unit)s AND name = %(type_name)s AND value = %(val)s""" else: cmd = """ SELECT * FROM dem.v_external_ids4org_unit WHERE pk_org_unit = %(unit)s AND name = %(type_name)s AND value = %(val)s AND issuer = %(issuer)s""" rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}]) # create new ID if not found if len(rows) == 0: if pk_type is None: cmd = """INSERT INTO dem.lnk_org_unit2ext_id (external_id, fk_type, comment, fk_org_unit) VALUES ( %(val)s, (SELECT dem.add_external_id_type(%(type_name)s, %(issuer)s)), %(comment)s, %(unit)s )""" else: cmd = """INSERT INTO dem.lnk_org_unit2ext_id (external_id, fk_type, comment, fk_org_unit) VALUES ( %(val)s, %(pk_type)s, %(comment)s, %(unit)s )""" rows = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}]) # or update comment of existing ID else: row = rows[0] if comment is not None: # comment not already there ? if gmTools.coalesce(row['comment'], '').find(comment.strip()) == -1: comment = '%s%s' % (gmTools.coalesce(row['comment'], '', '%s // '), comment.strip) cmd = "UPDATE dem.lnk_org_unit2ext_id SET comment = %(comment)s WHERE pk = %(pk)s" args = {'comment': comment, 'pk': row['pk_id']} rows = gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
def delete_external_id(self, pk_ext_id=None)
-
Expand source code
def delete_external_id(self, pk_ext_id=None): cmd = """ DELETE FROM dem.lnk_org_unit2ext_id WHERE fk_org_unit = %(unit)s AND pk = %(pk)s """ args = {'unit': self.pk_obj, 'pk': pk_ext_id} gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
def get_comm_channels(self, comm_medium=None)
-
Expand source code
def get_comm_channels(self, comm_medium=None): args = {'pk': self.pk_obj, 'medium': comm_medium} if comm_medium is None: cmd = """ SELECT * FROM dem.v_org_unit_comms WHERE pk_org_unit = %(pk)s """ else: cmd = """ SELECT * FROM dem.v_org_unit_comms WHERE pk_org_unit = %(pk)s AND comm_type = %(medium)s """ rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}]) return [ gmDemographicRecord.cOrgCommChannel(row = { 'pk_field': 'pk_lnk_org_unit2comm', 'data': r }) for r in rows ]
def get_external_ids(self, id_type=None, issuer=None)
-
Expand source code
def get_external_ids(self, id_type=None, issuer=None): where_parts = ['pk_org_unit = %(unit)s'] args = {'unit': self.pk_obj} if id_type is not None: where_parts.append('name = %(name)s') args['name'] = id_type.strip() if issuer is not None: where_parts.append('issuer = %(issuer)s') args['issuer'] = issuer.strip() cmd = "SELECT * FROM dem.v_external_ids4org_unit WHERE %s" % ' AND '.join(where_parts) rows = gmPG2.run_ro_queries(queries = [{'cmd': cmd, 'args': args}]) return rows
def link_address(self, id_type=None, address=None)
-
Expand source code
def link_address(self, id_type=None, address=None): self.address = address return address
def link_comm_channel(self, comm_medium=None, url=None, is_confidential=False, pk_channel_type=None)
-
Link a communication medium with this org unit.
@param comm_medium The name of the communication medium. @param url The communication resource locator. @type url A str instance. @param is_confidential Whether the data must be treated as confidential. @type is_confidential A bool instance.
Expand source code
def link_comm_channel(self, comm_medium=None, url=None, is_confidential=False, pk_channel_type=None): """Link a communication medium with this org unit. @param comm_medium The name of the communication medium. @param url The communication resource locator. @type url A str instance. @param is_confidential Whether the data must be treated as confidential. @type is_confidential A bool instance. """ return gmDemographicRecord.create_comm_channel ( comm_medium = comm_medium, url = url, is_confidential = is_confidential, pk_channel_type = pk_channel_type, pk_org_unit = self.pk_obj )
def unlink_address(self, address=None, pk_address=None)
-
Remove an address from the org unit.
The address itself remains in the database. The address can be either cAdress or cPatientAdress.
Expand source code
def unlink_address(self, address=None, pk_address=None): """Remove an address from the org unit. The address itself remains in the database. The address can be either cAdress or cPatientAdress. """ self.address = None
def unlink_comm_channel(self, comm_channel=None)
-
Expand source code
def unlink_comm_channel(self, comm_channel=None): gmDemographicRecord.delete_comm_channel ( pk = comm_channel['pk_lnk_org_unit2comm'], pk_org_unit = self.pk_obj )
def update_external_id(self, pk_id=None, type=None, value=None, issuer=None, comment=None)
-
Edits an existing external ID.
Creates ID type if necessary.
Expand source code
def update_external_id(self, pk_id=None, type=None, value=None, issuer=None, comment=None): """Edits an existing external ID. Creates ID type if necessary. """ cmd = """ UPDATE dem.lnk_org_unit2ext_id SET fk_type = (SELECT dem.add_external_id_type(%(type)s, %(issuer)s)), external_id = %(value)s, comment = gm.nullify_empty_string(%(comment)s) WHERE pk = %(pk)s """ args = {'pk': pk_id, 'value': value, 'type': type, 'issuer': issuer, 'comment': comment} gmPG2.run_rw_queries(queries = [{'cmd': cmd, 'args': args}])
Inherited members